أنماط MCP المتقدمة

المصادقة والتفويض

5 دقيقة للقراءة

حماية خادم MCP الخاص بك أمر حاسم، خاصة للنشر البعيد. دعنا ننفذ المصادقة المناسبة.

استراتيجيات المصادقة

الاستراتيجية الأفضل لـ
مفاتيح API بسيط، مستخدم واحد
JWT متعدد المستخدمين، بدون حالة
OAuth 2.0 تكامل طرف ثالث
mTLS بيئات عالية الأمان

مصادقة مفتاح API

بسيطة لكن فعالة للخوادم الشخصية:

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class APIKeyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, api_keys: set):
        super().__init__(app)
        self.api_keys = api_keys

    async def dispatch(self, request, call_next):
        api_key = request.headers.get("X-API-Key")

        if api_key not in self.api_keys:
            return JSONResponse(
                {"error": "مفتاح API غير صالح"},
                status_code=401
            )

        return await call_next(request)

# تطبيق الوسيط
app.add_middleware(APIKeyMiddleware, api_keys={"sk_live_abc123"})

مصادقة JWT

للأنظمة متعددة المستخدمين:

import jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"

def create_token(user_id: str) -> str:
    payload = {
        "sub": user_id,
        "exp": datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_token(token: str) -> dict:
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise ValueError("انتهت صلاحية الرمز")
    except jwt.InvalidTokenError:
        raise ValueError("رمز غير صالح")

class JWTMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        auth_header = request.headers.get("Authorization", "")

        if not auth_header.startswith("Bearer "):
            return JSONResponse({"error": "الرمز مفقود"}, status_code=401)

        token = auth_header[7:]  # إزالة "Bearer "

        try:
            payload = verify_token(token)
            request.state.user_id = payload["sub"]
        except ValueError as e:
            return JSONResponse({"error": str(e)}, status_code=401)

        return await call_next(request)

التفويض على مستوى الأداة

تقييد الأدوات بناءً على صلاحيات المستخدم:

TOOL_PERMISSIONS = {
    "search_documents": ["read"],
    "add_document": ["read", "write"],
    "delete_document": ["admin"]
}

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    user_permissions = get_user_permissions(current_user)
    required_permissions = TOOL_PERMISSIONS.get(name, [])

    if not any(p in user_permissions for p in required_permissions):
        raise McpError(
            code=-32603,
            message=f"الإذن مرفوض للأداة: {name}"
        )

    # تنفيذ الأداة
    return await execute_tool(name, arguments)

تحديد المعدل

الحماية من سوء الاستخدام:

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.route("/mcp")
@limiter.limit("100/minute")
async def handle_mcp(request):
    ...

في القسم التالي، سنستكشف التحديثات والإشعارات الفورية. :::

اختبار

اختبار الوحدة 4: أنماط MCP المتقدمة

خذ الاختبار