Advanced MCP Patterns

Authentication and Authorization

5 min read

Protecting your MCP server is crucial, especially for remote deployments. Let's implement proper auth.

Authentication Strategies

Strategy Best For
API Keys Simple, single-user
JWT Multi-user, stateless
OAuth 2.0 Third-party integration
mTLS High-security environments

API Key Authentication

Simple but effective for personal servers:

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class APIKeyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, api_keys: set):
        super().__init__(app)
        self.api_keys = api_keys

    async def dispatch(self, request, call_next):
        api_key = request.headers.get("X-API-Key")

        if api_key not in self.api_keys:
            return JSONResponse(
                {"error": "Invalid API key"},
                status_code=401
            )

        return await call_next(request)

# Apply middleware
app.add_middleware(APIKeyMiddleware, api_keys={"sk_live_abc123"})

JWT Authentication

For multi-user systems:

import jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"

def create_token(user_id: str) -> str:
    payload = {
        "sub": user_id,
        "exp": datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_token(token: str) -> dict:
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise ValueError("Token expired")
    except jwt.InvalidTokenError:
        raise ValueError("Invalid token")

class JWTMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        auth_header = request.headers.get("Authorization", "")

        if not auth_header.startswith("Bearer "):
            return JSONResponse({"error": "Missing token"}, status_code=401)

        token = auth_header[7:]  # Remove "Bearer "

        try:
            payload = verify_token(token)
            request.state.user_id = payload["sub"]
        except ValueError as e:
            return JSONResponse({"error": str(e)}, status_code=401)

        return await call_next(request)

Tool-Level Authorization

Restrict tools based on user permissions:

TOOL_PERMISSIONS = {
    "search_documents": ["read"],
    "add_document": ["read", "write"],
    "delete_document": ["admin"]
}

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    user_permissions = get_user_permissions(current_user)
    required_permissions = TOOL_PERMISSIONS.get(name, [])

    if not any(p in user_permissions for p in required_permissions):
        raise McpError(
            code=-32603,
            message=f"Permission denied for tool: {name}"
        )

    # Execute tool
    return await execute_tool(name, arguments)

Rate Limiting

Protect against abuse:

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.route("/mcp")
@limiter.limit("100/minute")
async def handle_mcp(request):
    ...

Next, we'll explore real-time updates and notifications. :::

Quiz

Module 4 Quiz: Advanced MCP Patterns

Take Quiz