أنماط أمن الإنتاج

بنية الدفاع العميق

3 دقيقة للقراءة

الدفاع العميق هو استراتيجية أمنية تُطبق طبقات متعددة من الضوابط بحيث لا يمكن لأي نقطة فشل واحدة اختراق النظام بأكمله. يوضح هذا الدرس كيفية تطبيق هذا المبدأ على تطبيقات LLM.

نموذج الأمان متعدد الطبقات

┌─────────────────────────────────────────────────────────────┐
│                    طبقات الدفاع العميق                       │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │  الطبقة 1: الشبكة والبنية التحتية                    │   │
│   │  WAF، حماية DDoS، TLS، بوابة API                    │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  الطبقة 2: المصادقة والتفويض                        │   │
│   │  مفاتيح API، OAuth، RBAC، إدارة الجلسات            │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  الطبقة 3: التحقق من المدخلات                       │   │
│   │  حدود الطول، كشف الأنماط، مرشحات المحتوى           │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  الطبقة 4: حواجز LLM                                │   │
│   │  NeMo Guardrails، LLaMA Guard، قواعد الموضوع       │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  الطبقة 5: تطهير المخرجات                           │   │
│   │  منع XSS، تنقيح PII، إشراف المحتوى                 │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  الطبقة 6: المراقبة والاستجابة                       │   │
│   │  التسجيل، التنبيه، الاستجابة للحوادث                │   │
│   └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

تنفيذ خط أنابيب الأمان

from dataclasses import dataclass
from typing import Optional, List, Callable
from enum import Enum
import time

class SecurityDecision(Enum):
    ALLOW = "allow"
    BLOCK = "block"
    RATE_LIMIT = "rate_limit"
    REQUIRE_AUTH = "require_auth"

@dataclass
class SecurityContext:
    user_id: Optional[str]
    ip_address: str
    request_id: str
    timestamp: float
    is_authenticated: bool = False
    roles: List[str] = None

@dataclass
class SecurityCheckResult:
    passed: bool
    layer: str
    decision: SecurityDecision
    reason: Optional[str] = None

class SecurityPipeline:
    """خط أنابيب أمان الدفاع العميق."""

    def __init__(self):
        self.layers: List[Callable] = []
        self.metrics = {"checks": 0, "blocked": 0}

    def add_layer(self, check_func: Callable, name: str):
        """إضافة طبقة أمان لخط الأنابيب."""
        self.layers.append((name, check_func))

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> List[SecurityCheckResult]:
        """تمرير المحتوى عبر جميع طبقات الأمان."""
        results = []
        self.metrics["checks"] += 1

        for layer_name, check_func in self.layers:
            result = check_func(content, context)
            result.layer = layer_name
            results.append(result)

            # توقف عند أول حظر
            if not result.passed:
                self.metrics["blocked"] += 1
                break

        return results

    def is_allowed(self, results: List[SecurityCheckResult]) -> bool:
        """تحقق إذا مرت جميع الطبقات."""
        return all(r.passed for r in results)

تنفيذات الطبقات

الطبقة 1: تحديد المعدل

from collections import defaultdict
import time

class RateLimiter:
    """محدد معدل بخوارزمية دلو الرموز."""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.buckets = defaultdict(list)

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> SecurityCheckResult:
        """تحقق من حد المعدل للمستخدم."""
        key = context.user_id or context.ip_address
        now = time.time()
        minute_ago = now - 60

        # تنظيف الطلبات القديمة
        self.buckets[key] = [
            t for t in self.buckets[key] if t > minute_ago
        ]

        if len(self.buckets[key]) >= self.requests_per_minute:
            return SecurityCheckResult(
                passed=False,
                layer="rate_limit",
                decision=SecurityDecision.RATE_LIMIT,
                reason=f"تم تجاوز حد المعدل: {self.requests_per_minute}/دقيقة"
            )

        self.buckets[key].append(now)
        return SecurityCheckResult(
            passed=True,
            layer="rate_limit",
            decision=SecurityDecision.ALLOW
        )

الطبقة 2: فحص المصادقة

class AuthenticationLayer:
    """فحص المصادقة والتفويض."""

    def __init__(self, required_roles: List[str] = None):
        self.required_roles = required_roles or []

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> SecurityCheckResult:
        """التحقق من المصادقة."""
        if not context.is_authenticated:
            return SecurityCheckResult(
                passed=False,
                layer="authentication",
                decision=SecurityDecision.REQUIRE_AUTH,
                reason="المصادقة مطلوبة"
            )

        # تحقق من الأدوار المطلوبة
        if self.required_roles:
            user_roles = set(context.roles or [])
            required = set(self.required_roles)

            if not required.intersection(user_roles):
                return SecurityCheckResult(
                    passed=False,
                    layer="authorization",
                    decision=SecurityDecision.BLOCK,
                    reason=f"دور مطلوب مفقود: {self.required_roles}"
                )

        return SecurityCheckResult(
            passed=True,
            layer="authentication",
            decision=SecurityDecision.ALLOW
        )

الطبقة 3: التحقق من المدخلات

import re

class InputValidationLayer:
    """التحقق من محتوى المدخلات."""

    def __init__(self, max_length: int = 4000):
        self.max_length = max_length
        self.blocked_patterns = [
            r"ignore\s+previous\s+instructions",
            r"system\s*prompt",
            r"<script>",
        ]

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> SecurityCheckResult:
        """التحقق من المدخل."""
        # فحص الطول
        if len(content) > self.max_length:
            return SecurityCheckResult(
                passed=False,
                layer="input_validation",
                decision=SecurityDecision.BLOCK,
                reason=f"المدخل يتجاوز {self.max_length} حرف"
            )

        # فحص الأنماط
        content_lower = content.lower()
        for pattern in self.blocked_patterns:
            if re.search(pattern, content_lower):
                return SecurityCheckResult(
                    passed=False,
                    layer="input_validation",
                    decision=SecurityDecision.BLOCK,
                    reason="تم كشف نمط محظور"
                )

        return SecurityCheckResult(
            passed=True,
            layer="input_validation",
            decision=SecurityDecision.ALLOW
        )

مثال خط الأنابيب الكامل

class SecureLLMApplication:
    """تطبيق LLM مع دفاع عميق."""

    def __init__(self, llm_client):
        self.llm = llm_client
        self.pipeline = self._build_pipeline()

    def _build_pipeline(self) -> SecurityPipeline:
        """بناء خط أنابيب الأمان."""
        pipeline = SecurityPipeline()

        # إضافة الطبقات بالترتيب
        pipeline.add_layer(
            RateLimiter(requests_per_minute=30).check,
            "rate_limiting"
        )
        pipeline.add_layer(
            AuthenticationLayer().check,
            "authentication"
        )
        pipeline.add_layer(
            InputValidationLayer(max_length=4000).check,
            "input_validation"
        )

        return pipeline

    async def process_request(
        self,
        user_input: str,
        context: SecurityContext
    ) -> dict:
        """معالجة الطلب عبر خط أنابيب الأمان."""
        # تشغيل فحوصات الأمان
        results = self.pipeline.check(user_input, context)

        if not self.pipeline.is_allowed(results):
            failed = next(r for r in results if not r.passed)
            return {
                "success": False,
                "error": failed.reason,
                "decision": failed.decision.value
            }

        # توليد استجابة LLM
        response = await self.llm.generate(user_input)

        # معالجة لاحقة للمخرج (طبقة إضافية)
        sanitized = self._sanitize_output(response)

        return {
            "success": True,
            "response": sanitized,
            "security_checks": len(results)
        }

    def _sanitize_output(self, response: str) -> str:
        """تطهير مخرج LLM."""
        import html
        return html.escape(response)

# الاستخدام
app = SecureLLMApplication(llm_client)

context = SecurityContext(
    user_id="user123",
    ip_address="192.168.1.1",
    request_id="req-abc",
    timestamp=time.time(),
    is_authenticated=True,
    roles=["user"]
)

result = await app.process_request("مرحباً، كيف حالك؟", context)

أفضل الممارسات

الممارسة التنفيذ
الفشل المغلق احظر عند فشل أي طبقة
سجل في كل طبقة تتبع أين تُلتقط الهجمات
طبقات مستقلة كل طبقة تعمل منفردة
تدهور سلس رسائل خطأ واضحة للمستخدمين
اختبار منتظم تحقق أن كل طبقة تلتقط تهديداتها

النقطة الرئيسية: لا يوجد ضابط أمان واحد مثالي. طبّق طبقات دفاع متعددة بحيث يجب على المهاجمين تجاوزها جميعاً للنجاح. :::

اختبار

الوحدة 5: أنماط أمن الإنتاج

خذ الاختبار