أنماط أمن الإنتاج
تصميم الوكلاء الآمن
3 دقيقة للقراءة
وكلاء LLM الذين يمكنهم اتخاذ إجراءات في العالم الحقيقي يتطلبون تصميم أمني دقيق. يغطي هذا الدرس مبدأ أقل الامتيازات وعزل الأدوات وأنماط الإنسان في الحلقة.
تحدي أمان الوكيل
┌─────────────────────────────────────────────────────────────┐
│ مخاطر أمان الوكيل │
│ │
│ وكيل LLM │
│ ↓ │
│ ┌─────────────────────────────────────────┐ │
│ │ الأدوات المتاحة: │ │
│ │ • الوصول لنظام الملفات │ ⚠️ خطر! │
│ │ • استعلامات قاعدة البيانات │ ⚠️ خطر! │
│ │ • استدعاءات API │ ⚠️ خطر! │
│ │ • إرسال البريد الإلكتروني │ ⚠️ خطر! │
│ │ • تنفيذ الكود │ ⚠️ خطر! │
│ └─────────────────────────────────────────┘ │
│ │
│ إذا تم اختراقه عبر حقن المحث: │
│ • تسريب البيانات │
│ • إجراءات غير مصرح بها │
│ • اختراق النظام │
└─────────────────────────────────────────────────────────────┘
مبدأ أقل الامتيازات
from dataclasses import dataclass
from typing import Set, List, Callable, Dict, Any
from enum import Enum
class Permission(Enum):
FILE_READ = "file:read"
FILE_WRITE = "file:write"
DB_READ = "db:read"
DB_WRITE = "db:write"
API_CALL = "api:call"
EMAIL_SEND = "email:send"
CODE_EXECUTE = "code:execute"
@dataclass
class ToolDefinition:
name: str
function: Callable
required_permissions: Set[Permission]
description: str
is_destructive: bool = False
class SecureToolRegistry:
"""سجل مع تحكم وصول قائم على الأذونات."""
def __init__(self):
self.tools: Dict[str, ToolDefinition] = {}
self.permission_cache: Dict[str, Set[Permission]] = {}
def register(self, tool: ToolDefinition):
"""تسجيل أداة مع أذوناتها."""
self.tools[tool.name] = tool
def get_allowed_tools(
self,
granted_permissions: Set[Permission]
) -> List[ToolDefinition]:
"""الحصول على الأدوات المسموح للوكيل استخدامها."""
allowed = []
for tool in self.tools.values():
if tool.required_permissions.issubset(granted_permissions):
allowed.append(tool)
return allowed
def can_execute(
self,
tool_name: str,
granted_permissions: Set[Permission]
) -> bool:
"""تحقق إذا كانت الأداة يمكن تنفيذها بالأذونات المعطاة."""
if tool_name not in self.tools:
return False
tool = self.tools[tool_name]
return tool.required_permissions.issubset(granted_permissions)
# أدوات مثال
def read_file(path: str) -> str:
"""قراءة محتويات الملف."""
from pathlib import Path
return Path(path).read_text()
def write_file(path: str, content: str) -> bool:
"""كتابة محتوى للملف."""
from pathlib import Path
Path(path).write_text(content)
return True
def query_database(query: str) -> List[Dict]:
"""تنفيذ استعلام قاعدة بيانات للقراءة فقط."""
# في الإنتاج: اتصال DB فعلي
return [{"result": "data"}]
# التسجيل مع الأذونات
registry = SecureToolRegistry()
registry.register(ToolDefinition(
name="read_file",
function=read_file,
required_permissions={Permission.FILE_READ},
description="قراءة ملف",
is_destructive=False
))
registry.register(ToolDefinition(
name="write_file",
function=write_file,
required_permissions={Permission.FILE_WRITE},
description="الكتابة في ملف",
is_destructive=True
))
registry.register(ToolDefinition(
name="query_db",
function=query_database,
required_permissions={Permission.DB_READ},
description="استعلام قاعدة البيانات (قراءة فقط)",
is_destructive=False
))
# وكيل بأذونات محدودة
agent_permissions = {Permission.FILE_READ, Permission.DB_READ}
allowed_tools = registry.get_allowed_tools(agent_permissions)
print(f"الوكيل يمكنه استخدام: {[t.name for t in allowed_tools]}")
# المخرج: الوكيل يمكنه استخدام: ['read_file', 'query_db']
عزل الأدوات
from pathlib import Path
from typing import Optional, Any
import os
class SandboxedFileSystem:
"""عمليات الملفات مقيدة لمجلد العزل."""
def __init__(self, sandbox_root: Path):
self.sandbox_root = sandbox_root.resolve()
self.sandbox_root.mkdir(parents=True, exist_ok=True)
def _validate_path(self, path: str) -> Path:
"""التأكد أن المسار داخل العزل."""
# التحويل لمسار مطلق
full_path = (self.sandbox_root / path).resolve()
# تحقق إذا داخل العزل (منع اختراق المسار)
try:
full_path.relative_to(self.sandbox_root)
except ValueError:
raise PermissionError(
f"الوصول مرفوض: المسار خارج العزل"
)
return full_path
def read(self, path: str) -> str:
"""قراءة ملف من العزل."""
safe_path = self._validate_path(path)
return safe_path.read_text()
def write(self, path: str, content: str) -> bool:
"""كتابة ملف للعزل."""
safe_path = self._validate_path(path)
safe_path.parent.mkdir(parents=True, exist_ok=True)
safe_path.write_text(content)
return True
def list_files(self, path: str = ".") -> list:
"""سرد الملفات في مجلد العزل."""
safe_path = self._validate_path(path)
return [str(p.relative_to(self.sandbox_root)) for p in safe_path.iterdir()]
# الاستخدام
sandbox = SandboxedFileSystem(Path("./agent_sandbox"))
# آمن - داخل العزل
sandbox.write("data/output.txt", "مرحباً بالعالم")
content = sandbox.read("data/output.txt")
# محظور - محاولة اختراق المسار
try:
sandbox.read("../../etc/passwd")
except PermissionError as e:
print(f"محظور: {e}")
الإنسان في الحلقة للإجراءات عالية المخاطر
from dataclasses import dataclass
from typing import Callable, Optional, Any
from enum import Enum
import asyncio
class RiskLevel(Enum):
LOW = "low" # موافقة تلقائية
MEDIUM = "medium" # تسجيل لكن موافقة
HIGH = "high" # تتطلب موافقة بشرية
CRITICAL = "critical" # تتطلب موافقة بشرية + تأكيد
@dataclass
class ActionRequest:
action_name: str
parameters: dict
risk_level: RiskLevel
justification: str
@dataclass
class ApprovalResult:
approved: bool
approver: Optional[str] = None
notes: Optional[str] = None
class HumanApprovalGate:
"""بوابة للموافقة البشرية على الإجراءات عالية المخاطر."""
def __init__(self, approval_handler: Callable):
self.approval_handler = approval_handler
self.pending_approvals: Dict[str, ActionRequest] = {}
self.approval_timeout = 300 # 5 دقائق
async def request_approval(
self,
request: ActionRequest
) -> ApprovalResult:
"""طلب موافقة بشرية على الإجراء."""
# موافقة تلقائية للمخاطر المنخفضة
if request.risk_level == RiskLevel.LOW:
return ApprovalResult(approved=True)
# تسجيل المخاطر المتوسطة لكن موافقة
if request.risk_level == RiskLevel.MEDIUM:
self._log_action(request)
return ApprovalResult(approved=True)
# تتطلب موافقة للعالية/الحرجة
return await self._get_human_approval(request)
async def _get_human_approval(
self,
request: ActionRequest
) -> ApprovalResult:
"""الحصول على موافقة بشرية مع مهلة."""
request_id = self._generate_id()
self.pending_approvals[request_id] = request
# إخطار الإنسان (webhook، بريد، Slack، إلخ)
await self.approval_handler(request_id, request)
# انتظار الموافقة مع مهلة
try:
result = await asyncio.wait_for(
self._wait_for_approval(request_id),
timeout=self.approval_timeout
)
return result
except asyncio.TimeoutError:
return ApprovalResult(
approved=False,
notes="انتهت مهلة الموافقة"
)
async def _wait_for_approval(
self,
request_id: str
) -> ApprovalResult:
"""انتظار الإنسان للموافقة/الرفض."""
# في الإنتاج: رد webhook، استقصاء، إلخ
# تنفيذ عنصر نائب
await asyncio.sleep(1)
return ApprovalResult(approved=True, approver="human@example.com")
def _log_action(self, request: ActionRequest):
"""تسجيل الإجراء للتدقيق."""
print(f"[تدقيق] {request.action_name}: {request.parameters}")
def _generate_id(self) -> str:
import uuid
return str(uuid.uuid4())[:8]
# التكامل مع الوكيل
class SecureAgent:
"""وكيل مع إنسان في الحلقة للإجراءات الخطرة."""
RISK_LEVELS = {
"read_file": RiskLevel.LOW,
"write_file": RiskLevel.MEDIUM,
"delete_file": RiskLevel.HIGH,
"send_email": RiskLevel.HIGH,
"execute_code": RiskLevel.CRITICAL,
"make_payment": RiskLevel.CRITICAL,
}
def __init__(self, approval_gate: HumanApprovalGate):
self.approval_gate = approval_gate
async def execute_tool(
self,
tool_name: str,
parameters: dict,
justification: str
) -> Any:
"""تنفيذ الأداة مع الموافقة المناسبة."""
risk_level = self.RISK_LEVELS.get(tool_name, RiskLevel.HIGH)
request = ActionRequest(
action_name=tool_name,
parameters=parameters,
risk_level=risk_level,
justification=justification
)
approval = await self.approval_gate.request_approval(request)
if not approval.approved:
raise PermissionError(
f"الإجراء '{tool_name}' لم تتم الموافقة عليه: {approval.notes}"
)
# تنفيذ الأداة الفعلية
return self._do_execute(tool_name, parameters)
def _do_execute(self, tool_name: str, parameters: dict) -> Any:
"""تنفيذ الأداة فعلياً."""
# منطق تنفيذ الأداة هنا
pass
الوكيل الآمن الكامل
class SecureLLMAgent:
"""وكيل LLM جاهز للإنتاج وآمن."""
def __init__(
self,
llm_client,
permissions: Set[Permission],
sandbox_path: Path
):
self.llm = llm_client
self.permissions = permissions
self.sandbox = SandboxedFileSystem(sandbox_path)
self.registry = self._setup_tools()
self.approval_gate = HumanApprovalGate(self._notify_approver)
def _setup_tools(self) -> SecureToolRegistry:
"""إعداد سجل الأدوات مع دوال معزولة."""
registry = SecureToolRegistry()
# تغليف الأدوات بالعزل
registry.register(ToolDefinition(
name="read_file",
function=self.sandbox.read,
required_permissions={Permission.FILE_READ},
description="قراءة ملف من العزل",
is_destructive=False
))
registry.register(ToolDefinition(
name="write_file",
function=self.sandbox.write,
required_permissions={Permission.FILE_WRITE},
description="كتابة ملف للعزل",
is_destructive=True
))
return registry
async def _notify_approver(self, request_id: str, request: ActionRequest):
"""إرسال طلب موافقة للإنسان."""
print(f"[موافقة مطلوبة] {request_id}: {request.action_name}")
print(f" المعاملات: {request.parameters}")
print(f" التبرير: {request.justification}")
async def run(self, task: str) -> str:
"""تشغيل الوكيل على المهمة مع ضوابط أمان."""
# الحصول على الأدوات المسموحة لهذا الوكيل
allowed_tools = self.registry.get_allowed_tools(self.permissions)
tool_descriptions = [
f"{t.name}: {t.description}"
for t in allowed_tools
]
# توليد الخطة مع LLM
plan = await self.llm.plan(
task=task,
available_tools=tool_descriptions
)
# تنفيذ الخطة مع بوابات الموافقة
results = []
for step in plan.steps:
if not self.registry.can_execute(step.tool, self.permissions):
results.append(f"تخطي {step.tool}: أذونات غير كافية")
continue
try:
result = await self._execute_step(step)
results.append(result)
except PermissionError as e:
results.append(f"محظور: {e}")
return "\n".join(str(r) for r in results)
async def _execute_step(self, step) -> Any:
"""تنفيذ خطوة واحدة مع فحوصات أمان."""
tool = self.registry.tools.get(step.tool)
if tool.is_destructive:
# تتطلب موافقة للإجراءات المدمرة
approval = await self.approval_gate.request_approval(
ActionRequest(
action_name=step.tool,
parameters=step.parameters,
risk_level=RiskLevel.HIGH,
justification=step.justification
)
)
if not approval.approved:
raise PermissionError(f"لم تتم الموافقة على الإجراء")
return tool.function(**step.parameters)
# الاستخدام
agent = SecureLLMAgent(
llm_client=llm,
permissions={Permission.FILE_READ, Permission.FILE_WRITE},
sandbox_path=Path("./agent_workspace")
)
result = await agent.run("لخص الملفات في مجلد البيانات")
النقطة الرئيسية: الوكلاء الآمنون يتطلبون طبقات متعددة: أذونات دنيا، بيئات معزولة، وموافقة بشرية للإجراءات عالية المخاطر. لا تعطِ وكيل LLM وصولاً أكثر مما يحتاجه لمهمته المحددة. :::