أنماط أمن الإنتاج

تصميم الوكلاء الآمن

3 دقيقة للقراءة

وكلاء LLM الذين يمكنهم اتخاذ إجراءات في العالم الحقيقي يتطلبون تصميم أمني دقيق. يغطي هذا الدرس مبدأ أقل الامتيازات وعزل الأدوات وأنماط الإنسان في الحلقة.

تحدي أمان الوكيل

┌─────────────────────────────────────────────────────────────┐
│                    مخاطر أمان الوكيل                         │
│                                                             │
│   وكيل LLM                                                  │
│       ↓                                                     │
│   ┌─────────────────────────────────────────┐               │
│   │  الأدوات المتاحة:                       │               │
│   │  • الوصول لنظام الملفات                │  ⚠️ خطر!      │
│   │  • استعلامات قاعدة البيانات            │  ⚠️ خطر!      │
│   │  • استدعاءات API                       │  ⚠️ خطر!      │
│   │  • إرسال البريد الإلكتروني             │  ⚠️ خطر!      │
│   │  • تنفيذ الكود                         │  ⚠️ خطر!      │
│   └─────────────────────────────────────────┘               │
│                                                             │
│   إذا تم اختراقه عبر حقن المحث:                            │
│   • تسريب البيانات                                          │
│   • إجراءات غير مصرح بها                                   │
│   • اختراق النظام                                          │
└─────────────────────────────────────────────────────────────┘

مبدأ أقل الامتيازات

from dataclasses import dataclass
from typing import Set, List, Callable, Dict, Any
from enum import Enum

class Permission(Enum):
    FILE_READ = "file:read"
    FILE_WRITE = "file:write"
    DB_READ = "db:read"
    DB_WRITE = "db:write"
    API_CALL = "api:call"
    EMAIL_SEND = "email:send"
    CODE_EXECUTE = "code:execute"

@dataclass
class ToolDefinition:
    name: str
    function: Callable
    required_permissions: Set[Permission]
    description: str
    is_destructive: bool = False

class SecureToolRegistry:
    """سجل مع تحكم وصول قائم على الأذونات."""

    def __init__(self):
        self.tools: Dict[str, ToolDefinition] = {}
        self.permission_cache: Dict[str, Set[Permission]] = {}

    def register(self, tool: ToolDefinition):
        """تسجيل أداة مع أذوناتها."""
        self.tools[tool.name] = tool

    def get_allowed_tools(
        self,
        granted_permissions: Set[Permission]
    ) -> List[ToolDefinition]:
        """الحصول على الأدوات المسموح للوكيل استخدامها."""
        allowed = []
        for tool in self.tools.values():
            if tool.required_permissions.issubset(granted_permissions):
                allowed.append(tool)
        return allowed

    def can_execute(
        self,
        tool_name: str,
        granted_permissions: Set[Permission]
    ) -> bool:
        """تحقق إذا كانت الأداة يمكن تنفيذها بالأذونات المعطاة."""
        if tool_name not in self.tools:
            return False

        tool = self.tools[tool_name]
        return tool.required_permissions.issubset(granted_permissions)

# أدوات مثال
def read_file(path: str) -> str:
    """قراءة محتويات الملف."""
    from pathlib import Path
    return Path(path).read_text()

def write_file(path: str, content: str) -> bool:
    """كتابة محتوى للملف."""
    from pathlib import Path
    Path(path).write_text(content)
    return True

def query_database(query: str) -> List[Dict]:
    """تنفيذ استعلام قاعدة بيانات للقراءة فقط."""
    # في الإنتاج: اتصال DB فعلي
    return [{"result": "data"}]

# التسجيل مع الأذونات
registry = SecureToolRegistry()

registry.register(ToolDefinition(
    name="read_file",
    function=read_file,
    required_permissions={Permission.FILE_READ},
    description="قراءة ملف",
    is_destructive=False
))

registry.register(ToolDefinition(
    name="write_file",
    function=write_file,
    required_permissions={Permission.FILE_WRITE},
    description="الكتابة في ملف",
    is_destructive=True
))

registry.register(ToolDefinition(
    name="query_db",
    function=query_database,
    required_permissions={Permission.DB_READ},
    description="استعلام قاعدة البيانات (قراءة فقط)",
    is_destructive=False
))

# وكيل بأذونات محدودة
agent_permissions = {Permission.FILE_READ, Permission.DB_READ}
allowed_tools = registry.get_allowed_tools(agent_permissions)
print(f"الوكيل يمكنه استخدام: {[t.name for t in allowed_tools]}")
# المخرج: الوكيل يمكنه استخدام: ['read_file', 'query_db']

عزل الأدوات

from pathlib import Path
from typing import Optional, Any
import os

class SandboxedFileSystem:
    """عمليات الملفات مقيدة لمجلد العزل."""

    def __init__(self, sandbox_root: Path):
        self.sandbox_root = sandbox_root.resolve()
        self.sandbox_root.mkdir(parents=True, exist_ok=True)

    def _validate_path(self, path: str) -> Path:
        """التأكد أن المسار داخل العزل."""
        # التحويل لمسار مطلق
        full_path = (self.sandbox_root / path).resolve()

        # تحقق إذا داخل العزل (منع اختراق المسار)
        try:
            full_path.relative_to(self.sandbox_root)
        except ValueError:
            raise PermissionError(
                f"الوصول مرفوض: المسار خارج العزل"
            )

        return full_path

    def read(self, path: str) -> str:
        """قراءة ملف من العزل."""
        safe_path = self._validate_path(path)
        return safe_path.read_text()

    def write(self, path: str, content: str) -> bool:
        """كتابة ملف للعزل."""
        safe_path = self._validate_path(path)
        safe_path.parent.mkdir(parents=True, exist_ok=True)
        safe_path.write_text(content)
        return True

    def list_files(self, path: str = ".") -> list:
        """سرد الملفات في مجلد العزل."""
        safe_path = self._validate_path(path)
        return [str(p.relative_to(self.sandbox_root)) for p in safe_path.iterdir()]

# الاستخدام
sandbox = SandboxedFileSystem(Path("./agent_sandbox"))

# آمن - داخل العزل
sandbox.write("data/output.txt", "مرحباً بالعالم")
content = sandbox.read("data/output.txt")

# محظور - محاولة اختراق المسار
try:
    sandbox.read("../../etc/passwd")
except PermissionError as e:
    print(f"محظور: {e}")

الإنسان في الحلقة للإجراءات عالية المخاطر

from dataclasses import dataclass
from typing import Callable, Optional, Any
from enum import Enum
import asyncio

class RiskLevel(Enum):
    LOW = "low"  # موافقة تلقائية
    MEDIUM = "medium"  # تسجيل لكن موافقة
    HIGH = "high"  # تتطلب موافقة بشرية
    CRITICAL = "critical"  # تتطلب موافقة بشرية + تأكيد

@dataclass
class ActionRequest:
    action_name: str
    parameters: dict
    risk_level: RiskLevel
    justification: str

@dataclass
class ApprovalResult:
    approved: bool
    approver: Optional[str] = None
    notes: Optional[str] = None

class HumanApprovalGate:
    """بوابة للموافقة البشرية على الإجراءات عالية المخاطر."""

    def __init__(self, approval_handler: Callable):
        self.approval_handler = approval_handler
        self.pending_approvals: Dict[str, ActionRequest] = {}
        self.approval_timeout = 300  # 5 دقائق

    async def request_approval(
        self,
        request: ActionRequest
    ) -> ApprovalResult:
        """طلب موافقة بشرية على الإجراء."""
        # موافقة تلقائية للمخاطر المنخفضة
        if request.risk_level == RiskLevel.LOW:
            return ApprovalResult(approved=True)

        # تسجيل المخاطر المتوسطة لكن موافقة
        if request.risk_level == RiskLevel.MEDIUM:
            self._log_action(request)
            return ApprovalResult(approved=True)

        # تتطلب موافقة للعالية/الحرجة
        return await self._get_human_approval(request)

    async def _get_human_approval(
        self,
        request: ActionRequest
    ) -> ApprovalResult:
        """الحصول على موافقة بشرية مع مهلة."""
        request_id = self._generate_id()
        self.pending_approvals[request_id] = request

        # إخطار الإنسان (webhook، بريد، Slack، إلخ)
        await self.approval_handler(request_id, request)

        # انتظار الموافقة مع مهلة
        try:
            result = await asyncio.wait_for(
                self._wait_for_approval(request_id),
                timeout=self.approval_timeout
            )
            return result
        except asyncio.TimeoutError:
            return ApprovalResult(
                approved=False,
                notes="انتهت مهلة الموافقة"
            )

    async def _wait_for_approval(
        self,
        request_id: str
    ) -> ApprovalResult:
        """انتظار الإنسان للموافقة/الرفض."""
        # في الإنتاج: رد webhook، استقصاء، إلخ
        # تنفيذ عنصر نائب
        await asyncio.sleep(1)
        return ApprovalResult(approved=True, approver="human@example.com")

    def _log_action(self, request: ActionRequest):
        """تسجيل الإجراء للتدقيق."""
        print(f"[تدقيق] {request.action_name}: {request.parameters}")

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())[:8]

# التكامل مع الوكيل
class SecureAgent:
    """وكيل مع إنسان في الحلقة للإجراءات الخطرة."""

    RISK_LEVELS = {
        "read_file": RiskLevel.LOW,
        "write_file": RiskLevel.MEDIUM,
        "delete_file": RiskLevel.HIGH,
        "send_email": RiskLevel.HIGH,
        "execute_code": RiskLevel.CRITICAL,
        "make_payment": RiskLevel.CRITICAL,
    }

    def __init__(self, approval_gate: HumanApprovalGate):
        self.approval_gate = approval_gate

    async def execute_tool(
        self,
        tool_name: str,
        parameters: dict,
        justification: str
    ) -> Any:
        """تنفيذ الأداة مع الموافقة المناسبة."""
        risk_level = self.RISK_LEVELS.get(tool_name, RiskLevel.HIGH)

        request = ActionRequest(
            action_name=tool_name,
            parameters=parameters,
            risk_level=risk_level,
            justification=justification
        )

        approval = await self.approval_gate.request_approval(request)

        if not approval.approved:
            raise PermissionError(
                f"الإجراء '{tool_name}' لم تتم الموافقة عليه: {approval.notes}"
            )

        # تنفيذ الأداة الفعلية
        return self._do_execute(tool_name, parameters)

    def _do_execute(self, tool_name: str, parameters: dict) -> Any:
        """تنفيذ الأداة فعلياً."""
        # منطق تنفيذ الأداة هنا
        pass

الوكيل الآمن الكامل

class SecureLLMAgent:
    """وكيل LLM جاهز للإنتاج وآمن."""

    def __init__(
        self,
        llm_client,
        permissions: Set[Permission],
        sandbox_path: Path
    ):
        self.llm = llm_client
        self.permissions = permissions
        self.sandbox = SandboxedFileSystem(sandbox_path)
        self.registry = self._setup_tools()
        self.approval_gate = HumanApprovalGate(self._notify_approver)

    def _setup_tools(self) -> SecureToolRegistry:
        """إعداد سجل الأدوات مع دوال معزولة."""
        registry = SecureToolRegistry()

        # تغليف الأدوات بالعزل
        registry.register(ToolDefinition(
            name="read_file",
            function=self.sandbox.read,
            required_permissions={Permission.FILE_READ},
            description="قراءة ملف من العزل",
            is_destructive=False
        ))

        registry.register(ToolDefinition(
            name="write_file",
            function=self.sandbox.write,
            required_permissions={Permission.FILE_WRITE},
            description="كتابة ملف للعزل",
            is_destructive=True
        ))

        return registry

    async def _notify_approver(self, request_id: str, request: ActionRequest):
        """إرسال طلب موافقة للإنسان."""
        print(f"[موافقة مطلوبة] {request_id}: {request.action_name}")
        print(f"  المعاملات: {request.parameters}")
        print(f"  التبرير: {request.justification}")

    async def run(self, task: str) -> str:
        """تشغيل الوكيل على المهمة مع ضوابط أمان."""
        # الحصول على الأدوات المسموحة لهذا الوكيل
        allowed_tools = self.registry.get_allowed_tools(self.permissions)
        tool_descriptions = [
            f"{t.name}: {t.description}"
            for t in allowed_tools
        ]

        # توليد الخطة مع LLM
        plan = await self.llm.plan(
            task=task,
            available_tools=tool_descriptions
        )

        # تنفيذ الخطة مع بوابات الموافقة
        results = []
        for step in plan.steps:
            if not self.registry.can_execute(step.tool, self.permissions):
                results.append(f"تخطي {step.tool}: أذونات غير كافية")
                continue

            try:
                result = await self._execute_step(step)
                results.append(result)
            except PermissionError as e:
                results.append(f"محظور: {e}")

        return "\n".join(str(r) for r in results)

    async def _execute_step(self, step) -> Any:
        """تنفيذ خطوة واحدة مع فحوصات أمان."""
        tool = self.registry.tools.get(step.tool)

        if tool.is_destructive:
            # تتطلب موافقة للإجراءات المدمرة
            approval = await self.approval_gate.request_approval(
                ActionRequest(
                    action_name=step.tool,
                    parameters=step.parameters,
                    risk_level=RiskLevel.HIGH,
                    justification=step.justification
                )
            )

            if not approval.approved:
                raise PermissionError(f"لم تتم الموافقة على الإجراء")

        return tool.function(**step.parameters)

# الاستخدام
agent = SecureLLMAgent(
    llm_client=llm,
    permissions={Permission.FILE_READ, Permission.FILE_WRITE},
    sandbox_path=Path("./agent_workspace")
)

result = await agent.run("لخص الملفات في مجلد البيانات")

النقطة الرئيسية: الوكلاء الآمنون يتطلبون طبقات متعددة: أذونات دنيا، بيئات معزولة، وموافقة بشرية للإجراءات عالية المخاطر. لا تعطِ وكيل LLM وصولاً أكثر مما يحتاجه لمهمته المحددة. :::

اختبار

الوحدة 5: أنماط أمن الإنتاج

خذ الاختبار