الدرس 15 من 20

الإنتاج والمؤسسات

الهوية والتحكم بالوصول

4 دقيقة للقراءة

عندما يتصرف الوكلاء نيابة عن المستخدمين، تحتاج أنظمة هوية قوية. لمن يعمل هذا الوكيل؟ ما المسموح لهم فعله؟

سلسلة الهوية

مستخدم → جلسة → وكيل → أداة → مورد
   ↓        ↓        ↓       ↓        ↓
 مصادقة   سياق    نطاق   إذن     بيانات

كل حلقة في هذه السلسلة تحتاج تحقق.

الوصول المبني على القدرات

بدلاً من فحص الأذونات عند كل استدعاء أداة، أصدر رموز قدرة:

from dataclasses import dataclass
from datetime import datetime, timedelta
import jwt

@dataclass
class Capability:
    resource: str      # أي مورد (مثل "files:/project/*")
    actions: list[str] # أي إجراءات (مثل ["read", "write"])
    expires_at: datetime
    constraints: dict  # قيود إضافية

class CapabilityManager:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def issue_capability(
        self,
        user_id: str,
        resource: str,
        actions: list[str],
        ttl_minutes: int = 60,
        constraints: dict = None
    ) -> str:
        """إصدار رمز قدرة لوصول محدد."""
        payload = {
            "sub": user_id,
            "resource": resource,
            "actions": actions,
            "constraints": constraints or {},
            "exp": datetime.utcnow() + timedelta(minutes=ttl_minutes),
            "iat": datetime.utcnow()
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_capability(self, token: str, resource: str, action: str) -> dict:
        """التحقق من رمز قدرة لإجراء محدد."""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])

            # تحقق من تطابق المورد (يدعم wildcards)
            if not self.resource_matches(payload["resource"], resource):
                return {"valid": False, "error": "عدم تطابق المورد"}

            # تحقق من أن الإجراء مسموح
            if action not in payload["actions"]:
                return {"valid": False, "error": f"الإجراء '{action}' غير مسموح"}

            return {"valid": True, "user_id": payload["sub"], "constraints": payload["constraints"]}

        except jwt.ExpiredSignatureError:
            return {"valid": False, "error": "الرمز منتهي الصلاحية"}
        except jwt.InvalidTokenError as e:
            return {"valid": False, "error": str(e)}

    def resource_matches(self, pattern: str, resource: str) -> bool:
        """تحقق إذا المورد يطابق النمط مع wildcards."""
        import fnmatch
        return fnmatch.fnmatch(resource, pattern)

جلسات الوكيل محدودة النطاق

أنشئ نسخ وكيل بنطاق محدود:

class ScopedAgentSession:
    def __init__(
        self,
        user_id: str,
        capabilities: list[str],
        tool_whitelist: list[str],
        resource_paths: list[str]
    ):
        self.user_id = user_id
        self.capabilities = capabilities
        self.tool_whitelist = tool_whitelist
        self.resource_paths = resource_paths
        self.session_id = str(uuid.uuid4())
        self.created_at = datetime.now()

    def can_use_tool(self, tool_name: str) -> bool:
        """تحقق إذا الأداة في القائمة البيضاء."""
        return tool_name in self.tool_whitelist

    def can_access_resource(self, path: str) -> bool:
        """تحقق إذا المسار ضمن الموارد المسموحة."""
        return any(
            path.startswith(allowed) for allowed in self.resource_paths
        )

    def get_filtered_tools(self, all_tools: list[dict]) -> list[dict]:
        """إرجاع الأدوات التي يمكن لهذه الجلسة استخدامها فقط."""
        return [t for t in all_tools if t["name"] in self.tool_whitelist]


# إنشاء جلسة لدور مستخدم محدد
def create_developer_session(user_id: str) -> ScopedAgentSession:
    return ScopedAgentSession(
        user_id=user_id,
        capabilities=["code:read", "code:write", "shell:execute"],
        tool_whitelist=["read_file", "write_file", "run_command", "git_commit"],
        resource_paths=["/home/projects/", "/tmp/"]
    )

def create_readonly_session(user_id: str) -> ScopedAgentSession:
    return ScopedAgentSession(
        user_id=user_id,
        capabilities=["code:read"],
        tool_whitelist=["read_file", "search_files", "list_directory"],
        resource_paths=["/home/projects/"]
    )

وصول الأدوات المبني على الأدوار

ربط أدوار المنظمة بأذونات الأدوات:

ROLE_PERMISSIONS = {
    "viewer": {
        "tools": ["read_file", "search", "list_directory"],
        "actions": ["read"],
        "resource_patterns": ["*"]
    },
    "developer": {
        "tools": ["read_file", "write_file", "run_tests", "git_*"],
        "actions": ["read", "write", "execute"],
        "resource_patterns": ["/projects/*", "/tmp/*"]
    },
    "admin": {
        "tools": ["*"],
        "actions": ["*"],
        "resource_patterns": ["*"]
    },
    "ci_bot": {
        "tools": ["run_tests", "build", "deploy_staging"],
        "actions": ["read", "execute"],
        "resource_patterns": ["/ci/*", "/builds/*"]
    }
}

class RoleBasedAccess:
    def __init__(self):
        self.user_roles = {}  # user_id -> role

    def assign_role(self, user_id: str, role: str):
        if role not in ROLE_PERMISSIONS:
            raise ValueError(f"دور غير معروف: {role}")
        self.user_roles[user_id] = role

    def get_user_tools(self, user_id: str, all_tools: list[dict]) -> list[dict]:
        """تصفية الأدوات بناءً على دور المستخدم."""
        role = self.user_roles.get(user_id, "viewer")
        permissions = ROLE_PERMISSIONS[role]

        allowed_tools = []
        for tool in all_tools:
            if self.tool_matches(tool["name"], permissions["tools"]):
                allowed_tools.append(tool)

        return allowed_tools

    def tool_matches(self, tool_name: str, patterns: list[str]) -> bool:
        """تحقق إذا الأداة تطابق أي نمط (يدعم wildcards)."""
        import fnmatch
        return any(fnmatch.fnmatch(tool_name, p) for p in patterns)

عزل المستأجرين المتعددين

عند خدمة منظمات متعددة:

class TenantIsolation:
    def __init__(self):
        self.tenant_configs = {}

    def register_tenant(self, tenant_id: str, config: dict):
        """تسجيل مستأجر بإعداداته المحددة."""
        self.tenant_configs[tenant_id] = {
            "allowed_models": config.get("allowed_models", ["claude-sonnet-4-20250514"]),
            "max_tokens_per_request": config.get("max_tokens", 4096),
            "allowed_tools": config.get("tools", []),
            "data_region": config.get("region", "us"),
            "custom_system_prompt": config.get("system_prompt", "")
        }

    def get_tenant_agent_config(self, tenant_id: str) -> dict:
        """الحصول على إعدادات الوكيل لمستأجر محدد."""
        if tenant_id not in self.tenant_configs:
            raise ValueError(f"مستأجر غير معروف: {tenant_id}")

        config = self.tenant_configs[tenant_id]

        return {
            "model": config["allowed_models"][0],
            "max_tokens": config["max_tokens_per_request"],
            "tools": self.filter_tools(config["allowed_tools"]),
            "system_prompt": config["custom_system_prompt"]
        }

    def validate_request(self, tenant_id: str, request: dict) -> tuple[bool, str]:
        """التحقق من طلب ضد سياسات المستأجر."""
        config = self.tenant_configs.get(tenant_id)
        if not config:
            return False, "مستأجر غير معروف"

        if request.get("model") not in config["allowed_models"]:
            return False, f"النموذج غير مسموح للمستأجر"

        if request.get("max_tokens", 0) > config["max_tokens_per_request"]:
            return False, f"تم تجاوز حد الرموز"

        return True, ""

سجل تدقيق للوصول

سجل جميع قرارات الوصول:

class AccessAuditLog:
    async def log_access(
        self,
        user_id: str,
        session_id: str,
        resource: str,
        action: str,
        granted: bool,
        reason: str = None
    ):
        """تسجيل كل قرار وصول للامتثال."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "session_id": session_id,
            "resource": resource,
            "action": action,
            "granted": granted,
            "reason": reason
        }
        # الكتابة لسجل تدقيق غير قابل للتغيير
        await self.write_to_audit_store(entry)

ملاحظة نيردية: مبدأ أقل امتياز يعني أن الوكلاء يجب أن يكون لديهم الحد الأدنى من الوصول اللازم لمهمتهم—لا أكثر. عند الشك، ارفض.

التالي: التحكم بالتكاليف على نطاق واسع. :::

اختبار

الوحدة 4: الإنتاج والمؤسسات

خذ الاختبار