أنماط MCP المتقدمة
المصادقة والتفويض
5 دقيقة للقراءة
حماية خادم MCP الخاص بك أمر حاسم، خاصة للنشر البعيد. دعنا ننفذ المصادقة المناسبة.
استراتيجيات المصادقة
| الاستراتيجية | الأفضل لـ |
|---|---|
| مفاتيح API | بسيط، مستخدم واحد |
| JWT | متعدد المستخدمين، بدون حالة |
| OAuth 2.0 | تكامل طرف ثالث |
| mTLS | بيئات عالية الأمان |
مصادقة مفتاح API
بسيطة لكن فعالة للخوادم الشخصية:
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
class APIKeyMiddleware(BaseHTTPMiddleware):
def __init__(self, app, api_keys: set):
super().__init__(app)
self.api_keys = api_keys
async def dispatch(self, request, call_next):
api_key = request.headers.get("X-API-Key")
if api_key not in self.api_keys:
return JSONResponse(
{"error": "مفتاح API غير صالح"},
status_code=401
)
return await call_next(request)
# تطبيق الوسيط
app.add_middleware(APIKeyMiddleware, api_keys={"sk_live_abc123"})
مصادقة JWT
للأنظمة متعددة المستخدمين:
import jwt
from datetime import datetime, timedelta
SECRET_KEY = "your-secret-key"
def create_token(user_id: str) -> str:
payload = {
"sub": user_id,
"exp": datetime.utcnow() + timedelta(hours=24)
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_token(token: str) -> dict:
try:
return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise ValueError("انتهت صلاحية الرمز")
except jwt.InvalidTokenError:
raise ValueError("رمز غير صالح")
class JWTMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return JSONResponse({"error": "الرمز مفقود"}, status_code=401)
token = auth_header[7:] # إزالة "Bearer "
try:
payload = verify_token(token)
request.state.user_id = payload["sub"]
except ValueError as e:
return JSONResponse({"error": str(e)}, status_code=401)
return await call_next(request)
التفويض على مستوى الأداة
تقييد الأدوات بناءً على صلاحيات المستخدم:
TOOL_PERMISSIONS = {
"search_documents": ["read"],
"add_document": ["read", "write"],
"delete_document": ["admin"]
}
@server.call_tool()
async def call_tool(name: str, arguments: dict):
user_permissions = get_user_permissions(current_user)
required_permissions = TOOL_PERMISSIONS.get(name, [])
if not any(p in user_permissions for p in required_permissions):
raise McpError(
code=-32603,
message=f"الإذن مرفوض للأداة: {name}"
)
# تنفيذ الأداة
return await execute_tool(name, arguments)
تحديد المعدل
الحماية من سوء الاستخدام:
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.route("/mcp")
@limiter.limit("100/minute")
async def handle_mcp(request):
...
في القسم التالي، سنستكشف التحديثات والإشعارات الفورية. :::