أنماط أمن الإنتاج
المراقبة والتسجيل
3 دقيقة للقراءة
المراقبة والتسجيل الفعالان ضروريان لكشف الهجمات وتحليل الحوادث وتحسين وضعك الأمني. يغطي هذا الدرس ما يجب تسجيله وكيفية كشف الشذوذ وإعداد التنبيهات.
ما يجب تسجيله في تطبيقات LLM
┌─────────────────────────────────────────────────────────────┐
│ تسجيل أمان LLM │
│ │
│ سجل هذه الأحداث: │
│ ✓ جميع مدخلات المستخدم (مطهرة) │
│ ✓ مخرجات LLM (أو تجزئات للخصوصية) │
│ ✓ الطلبات المحظورة مع السبب │
│ ✓ تجاوزات حد المعدل │
│ ✓ فشل المصادقة │
│ ✓ تشغيل الحواجز │
│ ✓ تأخير API واستخدام الرموز │
│ ✓ معدلات الأخطاء وأنواعها │
└─────────────────────────────────────────────────────────────┘
تنفيذ التسجيل المنظم
import json
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any
from pathlib import Path
import hashlib
@dataclass
class LLMSecurityLog:
"""إدخال سجل منظم لأحداث أمان LLM."""
timestamp: str
event_type: str
request_id: str
user_id: Optional[str]
ip_address: str
input_hash: str # تجزئة للخصوصية
input_length: int
output_hash: Optional[str] = None
output_length: Optional[int] = None
blocked: bool = False
block_reason: Optional[str] = None
guardrail_triggered: Optional[str] = None
latency_ms: Optional[float] = None
token_count: Optional[int] = None
metadata: Optional[Dict[str, Any]] = None
class SecurityLogger:
"""مسجل موجه للأمان لتطبيقات LLM."""
def __init__(self, log_path: Path):
self.log_path = log_path
self._setup_logger()
def _setup_logger(self):
"""تكوين التسجيل المنظم."""
self.logger = logging.getLogger("llm_security")
self.logger.setLevel(logging.INFO)
# معالج ملف JSON
handler = logging.FileHandler(self.log_path)
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def _hash_content(self, content: str) -> str:
"""إنشاء تجزئة تحافظ على الخصوصية للمحتوى."""
return hashlib.sha256(content.encode()).hexdigest()[:16]
def log_request(
self,
request_id: str,
user_id: Optional[str],
ip_address: str,
user_input: str,
llm_output: Optional[str] = None,
blocked: bool = False,
block_reason: Optional[str] = None,
guardrail: Optional[str] = None,
latency_ms: Optional[float] = None,
token_count: Optional[int] = None,
metadata: Optional[Dict] = None
):
"""تسجيل طلب مع سياق الأمان."""
log_entry = LLMSecurityLog(
timestamp=datetime.utcnow().isoformat(),
event_type="llm_request",
request_id=request_id,
user_id=user_id,
ip_address=ip_address,
input_hash=self._hash_content(user_input),
input_length=len(user_input),
output_hash=self._hash_content(llm_output) if llm_output else None,
output_length=len(llm_output) if llm_output else None,
blocked=blocked,
block_reason=block_reason,
guardrail_triggered=guardrail,
latency_ms=latency_ms,
token_count=token_count,
metadata=metadata
)
self.logger.info(json.dumps(asdict(log_entry)))
def log_security_event(
self,
event_type: str,
request_id: str,
details: Dict[str, Any]
):
"""تسجيل حدث متعلق بالأمان."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"request_id": request_id,
**details
}
self.logger.info(json.dumps(entry))
# الاستخدام
logger = SecurityLogger(Path("./security.log"))
logger.log_request(
request_id="req-123",
user_id="user-456",
ip_address="192.168.1.1",
user_input="مرحباً، كيف حالك؟",
llm_output="أنا بخير، شكراً لك!",
latency_ms=245.5,
token_count=15
)
كشف الشذوذ
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Tuple
import statistics
class AnomalyDetector:
"""كشف الأنماط المشبوهة في استخدام LLM."""
def __init__(self):
self.user_patterns = defaultdict(list)
self.blocked_attempts = defaultdict(list)
self.latency_history = []
def record_request(
self,
user_id: str,
timestamp: datetime,
was_blocked: bool,
latency_ms: float
):
"""تسجيل الطلب لتحليل الأنماط."""
self.user_patterns[user_id].append(timestamp)
self.latency_history.append(latency_ms)
if was_blocked:
self.blocked_attempts[user_id].append(timestamp)
def check_anomalies(self, user_id: str) -> List[Tuple[str, str]]:
"""فحص الأنماط الشاذة."""
anomalies = []
# الفحص 1: طلبات سريعة (أتمتة محتملة)
recent = self._get_recent_requests(user_id, minutes=1)
if len(recent) > 30: # أكثر من 30 طلب/دقيقة
anomalies.append((
"rapid_requests",
f"المستخدم أرسل {len(recent)} طلب في دقيقة واحدة"
))
# الفحص 2: معدل حظر عالي
block_rate = self._calculate_block_rate(user_id)
if block_rate > 0.3: # أكثر من 30% محظور
anomalies.append((
"high_block_rate",
f"معدل الحظر: {block_rate:.1%}"
))
# الفحص 3: ساعات غير عادية (إذا كان للمستخدم تاريخ)
if self._is_unusual_hour(user_id):
anomalies.append((
"unusual_activity_time",
"نشاط خارج الساعات الطبيعية"
))
return anomalies
def _get_recent_requests(
self,
user_id: str,
minutes: int
) -> List[datetime]:
"""الحصول على الطلبات في آخر N دقيقة."""
cutoff = datetime.utcnow() - timedelta(minutes=minutes)
return [
t for t in self.user_patterns[user_id]
if t > cutoff
]
def _calculate_block_rate(self, user_id: str) -> float:
"""حساب نسبة الطلبات المحظورة."""
total = len(self.user_patterns[user_id])
blocked = len(self.blocked_attempts[user_id])
if total == 0:
return 0.0
return blocked / total
def _is_unusual_hour(self, user_id: str) -> bool:
"""تحقق إذا كانت الساعة الحالية غير عادية للمستخدم."""
if len(self.user_patterns[user_id]) < 100:
return False # تاريخ غير كافٍ
current_hour = datetime.utcnow().hour
historical_hours = [
t.hour for t in self.user_patterns[user_id]
]
# تحقق إذا كانت الساعة الحالية نادرة (أقل من 5% من النشاط)
hour_count = historical_hours.count(current_hour)
return hour_count / len(historical_hours) < 0.05
# الاستخدام
detector = AnomalyDetector()
# تسجيل الطلبات
detector.record_request("user123", datetime.utcnow(), False, 200)
detector.record_request("user123", datetime.utcnow(), True, 50)
# فحص الشذوذ
anomalies = detector.check_anomalies("user123")
for anomaly_type, description in anomalies:
print(f"[تنبيه] {anomaly_type}: {description}")
نظام التنبيهات
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List
import json
class AlertSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class SecurityAlert:
severity: AlertSeverity
alert_type: str
message: str
user_id: Optional[str]
request_id: Optional[str]
metadata: Dict[str, Any]
class AlertingSystem:
"""نظام تنبيهات الأمان."""
def __init__(self):
self.handlers: List[Callable[[SecurityAlert], None]] = []
self.alert_history = []
def add_handler(self, handler: Callable[[SecurityAlert], None]):
"""إضافة معالج تنبيهات (بريد، Slack، PagerDuty، إلخ)."""
self.handlers.append(handler)
def alert(self, alert: SecurityAlert):
"""إرسال تنبيه لجميع المعالجين."""
self.alert_history.append(alert)
for handler in self.handlers:
try:
handler(alert)
except Exception as e:
print(f"فشل معالج التنبيه: {e}")
def check_and_alert(
self,
anomalies: List[Tuple[str, str]],
user_id: str,
request_id: str
):
"""إنشاء تنبيهات للشذوذ المكتشف."""
severity_map = {
"rapid_requests": AlertSeverity.MEDIUM,
"high_block_rate": AlertSeverity.HIGH,
"unusual_activity_time": AlertSeverity.LOW,
"injection_attempt": AlertSeverity.CRITICAL,
}
for anomaly_type, description in anomalies:
severity = severity_map.get(anomaly_type, AlertSeverity.MEDIUM)
self.alert(SecurityAlert(
severity=severity,
alert_type=anomaly_type,
message=description,
user_id=user_id,
request_id=request_id,
metadata={"detected_at": datetime.utcnow().isoformat()}
))
# معالجات مثال
def console_handler(alert: SecurityAlert):
"""طباعة التنبيهات للوحدة."""
print(f"[{alert.severity.value.upper()}] {alert.alert_type}: {alert.message}")
def slack_handler(alert: SecurityAlert):
"""إرسال تنبيه إلى Slack (عنصر نائب)."""
# في الإنتاج، استخدم Slack SDK
webhook_payload = {
"text": f"*{alert.severity.value.upper()}* - {alert.alert_type}",
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": alert.message}
}
]
}
# requests.post(SLACK_WEBHOOK_URL, json=webhook_payload)
# الإعداد
alerting = AlertingSystem()
alerting.add_handler(console_handler)
مقاييس لوحة المعلومات
from collections import Counter
from datetime import datetime, timedelta
class SecurityMetrics:
"""جمع وعرض مقاييس الأمان."""
def __init__(self):
self.counters = Counter()
self.request_times = []
def record(self, metric: str, value: int = 1):
"""تسجيل مقياس."""
self.counters[metric] += value
def get_summary(self) -> Dict[str, Any]:
"""الحصول على ملخص المقاييس للوحة المعلومات."""
return {
"total_requests": self.counters["requests"],
"blocked_requests": self.counters["blocked"],
"block_rate": self._calculate_rate("blocked", "requests"),
"injection_attempts": self.counters["injection_detected"],
"rate_limit_hits": self.counters["rate_limited"],
"auth_failures": self.counters["auth_failed"],
"guardrail_triggers": self.counters["guardrail_triggered"],
}
def _calculate_rate(self, numerator: str, denominator: str) -> float:
denom = self.counters[denominator]
if denom == 0:
return 0.0
return self.counters[numerator] / denom
# عرض كمقاييس Prometheus (مثال)
def expose_prometheus_metrics(metrics: SecurityMetrics) -> str:
"""تنسيق المقاييس لجمع Prometheus."""
summary = metrics.get_summary()
lines = []
for key, value in summary.items():
metric_name = f"llm_security_{key}"
lines.append(f"{metric_name} {value}")
return "\n".join(lines)
النقطة الرئيسية: التسجيل والمراقبة الشاملة تمكنك من كشف الهجمات في الوقت الفعلي، والتحقيق في الحوادث بعد وقوعها، وتحسين وضعك الأمني باستمرار بناءً على الأنماط المُلاحظة. :::