أنماط أمن الإنتاج
بنية الدفاع العميق
3 دقيقة للقراءة
الدفاع العميق هو استراتيجية أمنية تُطبق طبقات متعددة من الضوابط بحيث لا يمكن لأي نقطة فشل واحدة اختراق النظام بأكمله. يوضح هذا الدرس كيفية تطبيق هذا المبدأ على تطبيقات LLM.
نموذج الأمان متعدد الطبقات
┌─────────────────────────────────────────────────────────────┐
│ طبقات الدفاع العميق │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ الطبقة 1: الشبكة والبنية التحتية │ │
│ │ WAF، حماية DDoS، TLS، بوابة API │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ الطبقة 2: المصادقة والتفويض │ │
│ │ مفاتيح API، OAuth، RBAC، إدارة الجلسات │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ الطبقة 3: التحقق من المدخلات │ │
│ │ حدود الطول، كشف الأنماط، مرشحات المحتوى │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ الطبقة 4: حواجز LLM │ │
│ │ NeMo Guardrails، LLaMA Guard، قواعد الموضوع │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ الطبقة 5: تطهير المخرجات │ │
│ │ منع XSS، تنقيح PII، إشراف المحتوى │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ الطبقة 6: المراقبة والاستجابة │ │
│ │ التسجيل، التنبيه، الاستجابة للحوادث │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
تنفيذ خط أنابيب الأمان
from dataclasses import dataclass
from typing import Optional, List, Callable
from enum import Enum
import time
class SecurityDecision(Enum):
ALLOW = "allow"
BLOCK = "block"
RATE_LIMIT = "rate_limit"
REQUIRE_AUTH = "require_auth"
@dataclass
class SecurityContext:
user_id: Optional[str]
ip_address: str
request_id: str
timestamp: float
is_authenticated: bool = False
roles: List[str] = None
@dataclass
class SecurityCheckResult:
passed: bool
layer: str
decision: SecurityDecision
reason: Optional[str] = None
class SecurityPipeline:
"""خط أنابيب أمان الدفاع العميق."""
def __init__(self):
self.layers: List[Callable] = []
self.metrics = {"checks": 0, "blocked": 0}
def add_layer(self, check_func: Callable, name: str):
"""إضافة طبقة أمان لخط الأنابيب."""
self.layers.append((name, check_func))
def check(
self,
content: str,
context: SecurityContext
) -> List[SecurityCheckResult]:
"""تمرير المحتوى عبر جميع طبقات الأمان."""
results = []
self.metrics["checks"] += 1
for layer_name, check_func in self.layers:
result = check_func(content, context)
result.layer = layer_name
results.append(result)
# توقف عند أول حظر
if not result.passed:
self.metrics["blocked"] += 1
break
return results
def is_allowed(self, results: List[SecurityCheckResult]) -> bool:
"""تحقق إذا مرت جميع الطبقات."""
return all(r.passed for r in results)
تنفيذات الطبقات
الطبقة 1: تحديد المعدل
from collections import defaultdict
import time
class RateLimiter:
"""محدد معدل بخوارزمية دلو الرموز."""
def __init__(self, requests_per_minute: int = 60):
self.requests_per_minute = requests_per_minute
self.buckets = defaultdict(list)
def check(
self,
content: str,
context: SecurityContext
) -> SecurityCheckResult:
"""تحقق من حد المعدل للمستخدم."""
key = context.user_id or context.ip_address
now = time.time()
minute_ago = now - 60
# تنظيف الطلبات القديمة
self.buckets[key] = [
t for t in self.buckets[key] if t > minute_ago
]
if len(self.buckets[key]) >= self.requests_per_minute:
return SecurityCheckResult(
passed=False,
layer="rate_limit",
decision=SecurityDecision.RATE_LIMIT,
reason=f"تم تجاوز حد المعدل: {self.requests_per_minute}/دقيقة"
)
self.buckets[key].append(now)
return SecurityCheckResult(
passed=True,
layer="rate_limit",
decision=SecurityDecision.ALLOW
)
الطبقة 2: فحص المصادقة
class AuthenticationLayer:
"""فحص المصادقة والتفويض."""
def __init__(self, required_roles: List[str] = None):
self.required_roles = required_roles or []
def check(
self,
content: str,
context: SecurityContext
) -> SecurityCheckResult:
"""التحقق من المصادقة."""
if not context.is_authenticated:
return SecurityCheckResult(
passed=False,
layer="authentication",
decision=SecurityDecision.REQUIRE_AUTH,
reason="المصادقة مطلوبة"
)
# تحقق من الأدوار المطلوبة
if self.required_roles:
user_roles = set(context.roles or [])
required = set(self.required_roles)
if not required.intersection(user_roles):
return SecurityCheckResult(
passed=False,
layer="authorization",
decision=SecurityDecision.BLOCK,
reason=f"دور مطلوب مفقود: {self.required_roles}"
)
return SecurityCheckResult(
passed=True,
layer="authentication",
decision=SecurityDecision.ALLOW
)
الطبقة 3: التحقق من المدخلات
import re
class InputValidationLayer:
"""التحقق من محتوى المدخلات."""
def __init__(self, max_length: int = 4000):
self.max_length = max_length
self.blocked_patterns = [
r"ignore\s+previous\s+instructions",
r"system\s*prompt",
r"<script>",
]
def check(
self,
content: str,
context: SecurityContext
) -> SecurityCheckResult:
"""التحقق من المدخل."""
# فحص الطول
if len(content) > self.max_length:
return SecurityCheckResult(
passed=False,
layer="input_validation",
decision=SecurityDecision.BLOCK,
reason=f"المدخل يتجاوز {self.max_length} حرف"
)
# فحص الأنماط
content_lower = content.lower()
for pattern in self.blocked_patterns:
if re.search(pattern, content_lower):
return SecurityCheckResult(
passed=False,
layer="input_validation",
decision=SecurityDecision.BLOCK,
reason="تم كشف نمط محظور"
)
return SecurityCheckResult(
passed=True,
layer="input_validation",
decision=SecurityDecision.ALLOW
)
مثال خط الأنابيب الكامل
class SecureLLMApplication:
"""تطبيق LLM مع دفاع عميق."""
def __init__(self, llm_client):
self.llm = llm_client
self.pipeline = self._build_pipeline()
def _build_pipeline(self) -> SecurityPipeline:
"""بناء خط أنابيب الأمان."""
pipeline = SecurityPipeline()
# إضافة الطبقات بالترتيب
pipeline.add_layer(
RateLimiter(requests_per_minute=30).check,
"rate_limiting"
)
pipeline.add_layer(
AuthenticationLayer().check,
"authentication"
)
pipeline.add_layer(
InputValidationLayer(max_length=4000).check,
"input_validation"
)
return pipeline
async def process_request(
self,
user_input: str,
context: SecurityContext
) -> dict:
"""معالجة الطلب عبر خط أنابيب الأمان."""
# تشغيل فحوصات الأمان
results = self.pipeline.check(user_input, context)
if not self.pipeline.is_allowed(results):
failed = next(r for r in results if not r.passed)
return {
"success": False,
"error": failed.reason,
"decision": failed.decision.value
}
# توليد استجابة LLM
response = await self.llm.generate(user_input)
# معالجة لاحقة للمخرج (طبقة إضافية)
sanitized = self._sanitize_output(response)
return {
"success": True,
"response": sanitized,
"security_checks": len(results)
}
def _sanitize_output(self, response: str) -> str:
"""تطهير مخرج LLM."""
import html
return html.escape(response)
# الاستخدام
app = SecureLLMApplication(llm_client)
context = SecurityContext(
user_id="user123",
ip_address="192.168.1.1",
request_id="req-abc",
timestamp=time.time(),
is_authenticated=True,
roles=["user"]
)
result = await app.process_request("مرحباً، كيف حالك؟", context)
أفضل الممارسات
| الممارسة | التنفيذ |
|---|---|
| الفشل المغلق | احظر عند فشل أي طبقة |
| سجل في كل طبقة | تتبع أين تُلتقط الهجمات |
| طبقات مستقلة | كل طبقة تعمل منفردة |
| تدهور سلس | رسائل خطأ واضحة للمستخدمين |
| اختبار منتظم | تحقق أن كل طبقة تلتقط تهديداتها |
النقطة الرئيسية: لا يوجد ضابط أمان واحد مثالي. طبّق طبقات دفاع متعددة بحيث يجب على المهاجمين تجاوزها جميعاً للنجاح. :::