الإنتاج والمؤسسات
الهوية والتحكم بالوصول
4 دقيقة للقراءة
عندما يتصرف الوكلاء نيابة عن المستخدمين، تحتاج أنظمة هوية قوية. لمن يعمل هذا الوكيل؟ ما المسموح لهم فعله؟
سلسلة الهوية
مستخدم → جلسة → وكيل → أداة → مورد
↓ ↓ ↓ ↓ ↓
مصادقة سياق نطاق إذن بيانات
كل حلقة في هذه السلسلة تحتاج تحقق.
الوصول المبني على القدرات
بدلاً من فحص الأذونات عند كل استدعاء أداة، أصدر رموز قدرة:
from dataclasses import dataclass
from datetime import datetime, timedelta
import jwt
@dataclass
class Capability:
resource: str # أي مورد (مثل "files:/project/*")
actions: list[str] # أي إجراءات (مثل ["read", "write"])
expires_at: datetime
constraints: dict # قيود إضافية
class CapabilityManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
def issue_capability(
self,
user_id: str,
resource: str,
actions: list[str],
ttl_minutes: int = 60,
constraints: dict = None
) -> str:
"""إصدار رمز قدرة لوصول محدد."""
payload = {
"sub": user_id,
"resource": resource,
"actions": actions,
"constraints": constraints or {},
"exp": datetime.utcnow() + timedelta(minutes=ttl_minutes),
"iat": datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm="HS256")
def verify_capability(self, token: str, resource: str, action: str) -> dict:
"""التحقق من رمز قدرة لإجراء محدد."""
try:
payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
# تحقق من تطابق المورد (يدعم wildcards)
if not self.resource_matches(payload["resource"], resource):
return {"valid": False, "error": "عدم تطابق المورد"}
# تحقق من أن الإجراء مسموح
if action not in payload["actions"]:
return {"valid": False, "error": f"الإجراء '{action}' غير مسموح"}
return {"valid": True, "user_id": payload["sub"], "constraints": payload["constraints"]}
except jwt.ExpiredSignatureError:
return {"valid": False, "error": "الرمز منتهي الصلاحية"}
except jwt.InvalidTokenError as e:
return {"valid": False, "error": str(e)}
def resource_matches(self, pattern: str, resource: str) -> bool:
"""تحقق إذا المورد يطابق النمط مع wildcards."""
import fnmatch
return fnmatch.fnmatch(resource, pattern)
جلسات الوكيل محدودة النطاق
أنشئ نسخ وكيل بنطاق محدود:
class ScopedAgentSession:
def __init__(
self,
user_id: str,
capabilities: list[str],
tool_whitelist: list[str],
resource_paths: list[str]
):
self.user_id = user_id
self.capabilities = capabilities
self.tool_whitelist = tool_whitelist
self.resource_paths = resource_paths
self.session_id = str(uuid.uuid4())
self.created_at = datetime.now()
def can_use_tool(self, tool_name: str) -> bool:
"""تحقق إذا الأداة في القائمة البيضاء."""
return tool_name in self.tool_whitelist
def can_access_resource(self, path: str) -> bool:
"""تحقق إذا المسار ضمن الموارد المسموحة."""
return any(
path.startswith(allowed) for allowed in self.resource_paths
)
def get_filtered_tools(self, all_tools: list[dict]) -> list[dict]:
"""إرجاع الأدوات التي يمكن لهذه الجلسة استخدامها فقط."""
return [t for t in all_tools if t["name"] in self.tool_whitelist]
# إنشاء جلسة لدور مستخدم محدد
def create_developer_session(user_id: str) -> ScopedAgentSession:
return ScopedAgentSession(
user_id=user_id,
capabilities=["code:read", "code:write", "shell:execute"],
tool_whitelist=["read_file", "write_file", "run_command", "git_commit"],
resource_paths=["/home/projects/", "/tmp/"]
)
def create_readonly_session(user_id: str) -> ScopedAgentSession:
return ScopedAgentSession(
user_id=user_id,
capabilities=["code:read"],
tool_whitelist=["read_file", "search_files", "list_directory"],
resource_paths=["/home/projects/"]
)
وصول الأدوات المبني على الأدوار
ربط أدوار المنظمة بأذونات الأدوات:
ROLE_PERMISSIONS = {
"viewer": {
"tools": ["read_file", "search", "list_directory"],
"actions": ["read"],
"resource_patterns": ["*"]
},
"developer": {
"tools": ["read_file", "write_file", "run_tests", "git_*"],
"actions": ["read", "write", "execute"],
"resource_patterns": ["/projects/*", "/tmp/*"]
},
"admin": {
"tools": ["*"],
"actions": ["*"],
"resource_patterns": ["*"]
},
"ci_bot": {
"tools": ["run_tests", "build", "deploy_staging"],
"actions": ["read", "execute"],
"resource_patterns": ["/ci/*", "/builds/*"]
}
}
class RoleBasedAccess:
def __init__(self):
self.user_roles = {} # user_id -> role
def assign_role(self, user_id: str, role: str):
if role not in ROLE_PERMISSIONS:
raise ValueError(f"دور غير معروف: {role}")
self.user_roles[user_id] = role
def get_user_tools(self, user_id: str, all_tools: list[dict]) -> list[dict]:
"""تصفية الأدوات بناءً على دور المستخدم."""
role = self.user_roles.get(user_id, "viewer")
permissions = ROLE_PERMISSIONS[role]
allowed_tools = []
for tool in all_tools:
if self.tool_matches(tool["name"], permissions["tools"]):
allowed_tools.append(tool)
return allowed_tools
def tool_matches(self, tool_name: str, patterns: list[str]) -> bool:
"""تحقق إذا الأداة تطابق أي نمط (يدعم wildcards)."""
import fnmatch
return any(fnmatch.fnmatch(tool_name, p) for p in patterns)
عزل المستأجرين المتعددين
عند خدمة منظمات متعددة:
class TenantIsolation:
def __init__(self):
self.tenant_configs = {}
def register_tenant(self, tenant_id: str, config: dict):
"""تسجيل مستأجر بإعداداته المحددة."""
self.tenant_configs[tenant_id] = {
"allowed_models": config.get("allowed_models", ["claude-sonnet-4-20250514"]),
"max_tokens_per_request": config.get("max_tokens", 4096),
"allowed_tools": config.get("tools", []),
"data_region": config.get("region", "us"),
"custom_system_prompt": config.get("system_prompt", "")
}
def get_tenant_agent_config(self, tenant_id: str) -> dict:
"""الحصول على إعدادات الوكيل لمستأجر محدد."""
if tenant_id not in self.tenant_configs:
raise ValueError(f"مستأجر غير معروف: {tenant_id}")
config = self.tenant_configs[tenant_id]
return {
"model": config["allowed_models"][0],
"max_tokens": config["max_tokens_per_request"],
"tools": self.filter_tools(config["allowed_tools"]),
"system_prompt": config["custom_system_prompt"]
}
def validate_request(self, tenant_id: str, request: dict) -> tuple[bool, str]:
"""التحقق من طلب ضد سياسات المستأجر."""
config = self.tenant_configs.get(tenant_id)
if not config:
return False, "مستأجر غير معروف"
if request.get("model") not in config["allowed_models"]:
return False, f"النموذج غير مسموح للمستأجر"
if request.get("max_tokens", 0) > config["max_tokens_per_request"]:
return False, f"تم تجاوز حد الرموز"
return True, ""
سجل تدقيق للوصول
سجل جميع قرارات الوصول:
class AccessAuditLog:
async def log_access(
self,
user_id: str,
session_id: str,
resource: str,
action: str,
granted: bool,
reason: str = None
):
"""تسجيل كل قرار وصول للامتثال."""
entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"session_id": session_id,
"resource": resource,
"action": action,
"granted": granted,
"reason": reason
}
# الكتابة لسجل تدقيق غير قابل للتغيير
await self.write_to_audit_store(entry)
ملاحظة نيردية: مبدأ أقل امتياز يعني أن الوكلاء يجب أن يكون لديهم الحد الأدنى من الوصول اللازم لمهمتهم—لا أكثر. عند الشك، ارفض.
التالي: التحكم بالتكاليف على نطاق واسع. :::