الدرس 14 من 20

الإنتاج والمؤسسات

أفضل ممارسات الأمان

4 دقيقة للقراءة

الوكيل مع الوصول لأنظمتك أداة قوية—وسطح هجوم قوي. الأمان ليس اختيارياً؛ إنه وجودي.

نموذج التهديد

التهديد مثال التخفيف
حقن البرومبت المستخدم يخدع الوكيل لتشغيل كود ضار تنقية المدخلات، صندوق رمل
تسريب البيانات الوكيل يرسل بيانات حساسة لخدمة خارجية سياسات الشبكة، تصفية المخرجات
تصعيد الامتيازات الوكيل يكتسب وصولاً خارج نطاقه أقل امتياز، رموز القدرة
إساءة استخدام الموارد الوكيل يشغل حلقات لانهائية، يستهلك كل الرموز حدود المعدل، ميزانيات، مهلات
إساءة استخدام الأداة الوكيل يحذف ملفات خاطئة، يسقط جداول تدفقات التأكيد، أوضاع التشغيل الجاف

تنقية المدخلات

لا تثق أبداً بمدخلات المستخدم—أو المحتوى المولّد من الوكيل:

class InputSanitizer:
    def __init__(self):
        self.dangerous_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"<\|.*?\|>",  # رموز خاصة
            r"```.*?exec\(.*?```",  # تنفيذ كود
        ]

    def sanitize(self, text: str) -> tuple[str, list[str]]:
        """إزالة الأنماط الخطرة وإرجاع تحذيرات."""
        warnings = []

        for pattern in self.dangerous_patterns:
            if re.search(pattern, text, re.IGNORECASE | re.DOTALL):
                warnings.append(f"نمط محظور: {pattern}")
                text = re.sub(pattern, "[محظور]", text, flags=re.IGNORECASE | re.DOTALL)

        return text, warnings

    def validate_tool_args(self, tool_name: str, args: dict) -> tuple[bool, str]:
        """التحقق من معاملات الأداة قبل التنفيذ."""
        if tool_name == "run_command":
            cmd = args.get("command", "")
            # حظر الأوامر الخطرة
            blocked = ["rm -rf", "sudo", "chmod 777", "curl | bash", "eval"]
            for b in blocked:
                if b in cmd:
                    return False, f"نمط أمر محظور: {b}"

        if tool_name == "write_file":
            path = args.get("path", "")
            # منع اجتياز المسار
            if ".." in path or path.startswith("/etc") or path.startswith("/root"):
                return False, f"مسار محظور: {path}"

        return True, ""

التنفيذ في صندوق رمل

شغّل الكود في بيئات معزولة:

import docker

class SandboxExecutor:
    def __init__(self, image: str = "python:3.11-slim"):
        self.client = docker.from_env()
        self.image = image
        self.timeout = 30
        self.memory_limit = "256m"
        self.network_disabled = True

    async def execute(self, code: str) -> dict:
        """تشغيل الكود في حاوية معزولة."""
        container = self.client.containers.run(
            self.image,
            command=["python", "-c", code],
            detach=True,
            mem_limit=self.memory_limit,
            network_disabled=self.network_disabled,
            read_only=True,
            security_opt=["no-new-privileges:true"]
        )

        try:
            result = container.wait(timeout=self.timeout)
            logs = container.logs().decode()
            return {
                "success": result["StatusCode"] == 0,
                "output": logs,
                "exit_code": result["StatusCode"]
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
        finally:
            container.remove(force=True)

تصفية المخرجات

منع تسريب البيانات الحساسة:

class OutputFilter:
    def __init__(self):
        self.patterns = {
            "api_key": r"(sk-[a-zA-Z0-9]{48}|api[_-]?key['\"]?\s*[:=]\s*['\"][^'\"]+)",
            "password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+",
            "jwt": r"eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+",
            "private_key": r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
            "aws_key": r"AKIA[0-9A-Z]{16}",
        }

    def filter_output(self, text: str) -> tuple[str, list[str]]:
        """تنقيح المعلومات الحساسة من مخرج الوكيل."""
        redacted = []

        for name, pattern in self.patterns.items():
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                redacted.append(f"تم تنقيح {len(matches)} نمط {name}")
                text = re.sub(pattern, f"[منقح_{name.upper()}]", text, flags=re.IGNORECASE)

        return text, redacted

    def validate_urls(self, text: str, allowed_domains: list[str]) -> tuple[str, list[str]]:
        """حظر الطلبات للنطاقات غير المصرح بها."""
        blocked = []
        urls = re.findall(r'https?://([^/\s]+)', text)

        for url in urls:
            domain = url.split(':')[0]  # إزالة المنفذ
            if not any(domain.endswith(d) for d in allowed_domains):
                blocked.append(domain)
                text = text.replace(url, "[نطاق_محظور]")

        return text, blocked

التأكيد للإجراءات المدمرة

طلب موافقة صريحة للعمليات الخطرة:

class ConfirmationGate:
    def __init__(self, dangerous_tools: list[str]):
        self.dangerous_tools = dangerous_tools
        self.pending_confirmations = {}

    async def check_tool_call(self, tool_name: str, args: dict, context: dict) -> dict:
        """اعتراض استدعاءات الأدوات الخطرة للتأكيد."""
        if tool_name not in self.dangerous_tools:
            return {"approved": True}

        # توليد طلب تأكيد
        confirmation_id = str(uuid.uuid4())
        self.pending_confirmations[confirmation_id] = {
            "tool": tool_name,
            "args": args,
            "context": context,
            "created_at": datetime.now()
        }

        return {
            "approved": False,
            "requires_confirmation": True,
            "confirmation_id": confirmation_id,
            "message": f"الإجراء '{tool_name}' يتطلب موافقة. المعاملات: {args}"
        }

    async def confirm(self, confirmation_id: str, approved: bool) -> dict:
        """معالجة قرار التأكيد."""
        if confirmation_id not in self.pending_confirmations:
            return {"error": "معرف تأكيد غير معروف"}

        pending = self.pending_confirmations.pop(confirmation_id)

        if approved:
            return {"approved": True, "tool": pending["tool"], "args": pending["args"]}
        else:
            return {"approved": False, "reason": "المستخدم رفض"}

تسجيل التدقيق

سجل كل شيء للطب الشرعي:

class SecurityAuditLog:
    def __init__(self, log_file: str):
        self.log_file = log_file

    def log_event(self, event_type: str, details: dict):
        """إدخال سجل تدقيق غير قابل للتغيير."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "details": details,
            "checksum": None
        }
        # إضافة checksum للسلامة
        entry["checksum"] = hashlib.sha256(
            json.dumps(entry, sort_keys=True).encode()
        ).hexdigest()

        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

# تسجيل جميع تنفيذات الأدوات
audit.log_event("tool_execution", {
    "tool": "run_command",
    "args": {"command": "ls -la"},
    "user_id": "user_123",
    "agent_session": "session_456",
    "result": "success"
})

ملاحظة نيردية: الدفاع في العمق. افترض أن كل طبقة ستفشل وأضف طبقة أخرى. التحقق من المدخلات + صندوق الرمل + تصفية المخرجات + التأكيد + تسجيل التدقيق.

التالي: إدارة من يمكنه الوصول لماذا. :::

اختبار

الوحدة 4: الإنتاج والمؤسسات

خذ الاختبار