أنماط أمن الإنتاج

المراقبة والتسجيل

3 دقيقة للقراءة

المراقبة والتسجيل الفعالان ضروريان لكشف الهجمات وتحليل الحوادث وتحسين وضعك الأمني. يغطي هذا الدرس ما يجب تسجيله وكيفية كشف الشذوذ وإعداد التنبيهات.

ما يجب تسجيله في تطبيقات LLM

┌─────────────────────────────────────────────────────────────┐
│                    تسجيل أمان LLM                           │
│                                                             │
│   سجل هذه الأحداث:                                          │
│   ✓ جميع مدخلات المستخدم (مطهرة)                           │
│   ✓ مخرجات LLM (أو تجزئات للخصوصية)                        │
│   ✓ الطلبات المحظورة مع السبب                              │
│   ✓ تجاوزات حد المعدل                                      │
│   ✓ فشل المصادقة                                           │
│   ✓ تشغيل الحواجز                                          │
│   ✓ تأخير API واستخدام الرموز                              │
│   ✓ معدلات الأخطاء وأنواعها                                │
└─────────────────────────────────────────────────────────────┘

تنفيذ التسجيل المنظم

import json
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any
from pathlib import Path
import hashlib

@dataclass
class LLMSecurityLog:
    """إدخال سجل منظم لأحداث أمان LLM."""
    timestamp: str
    event_type: str
    request_id: str
    user_id: Optional[str]
    ip_address: str
    input_hash: str  # تجزئة للخصوصية
    input_length: int
    output_hash: Optional[str] = None
    output_length: Optional[int] = None
    blocked: bool = False
    block_reason: Optional[str] = None
    guardrail_triggered: Optional[str] = None
    latency_ms: Optional[float] = None
    token_count: Optional[int] = None
    metadata: Optional[Dict[str, Any]] = None

class SecurityLogger:
    """مسجل موجه للأمان لتطبيقات LLM."""

    def __init__(self, log_path: Path):
        self.log_path = log_path
        self._setup_logger()

    def _setup_logger(self):
        """تكوين التسجيل المنظم."""
        self.logger = logging.getLogger("llm_security")
        self.logger.setLevel(logging.INFO)

        # معالج ملف JSON
        handler = logging.FileHandler(self.log_path)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)

    def _hash_content(self, content: str) -> str:
        """إنشاء تجزئة تحافظ على الخصوصية للمحتوى."""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def log_request(
        self,
        request_id: str,
        user_id: Optional[str],
        ip_address: str,
        user_input: str,
        llm_output: Optional[str] = None,
        blocked: bool = False,
        block_reason: Optional[str] = None,
        guardrail: Optional[str] = None,
        latency_ms: Optional[float] = None,
        token_count: Optional[int] = None,
        metadata: Optional[Dict] = None
    ):
        """تسجيل طلب مع سياق الأمان."""
        log_entry = LLMSecurityLog(
            timestamp=datetime.utcnow().isoformat(),
            event_type="llm_request",
            request_id=request_id,
            user_id=user_id,
            ip_address=ip_address,
            input_hash=self._hash_content(user_input),
            input_length=len(user_input),
            output_hash=self._hash_content(llm_output) if llm_output else None,
            output_length=len(llm_output) if llm_output else None,
            blocked=blocked,
            block_reason=block_reason,
            guardrail_triggered=guardrail,
            latency_ms=latency_ms,
            token_count=token_count,
            metadata=metadata
        )

        self.logger.info(json.dumps(asdict(log_entry)))

    def log_security_event(
        self,
        event_type: str,
        request_id: str,
        details: Dict[str, Any]
    ):
        """تسجيل حدث متعلق بالأمان."""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "request_id": request_id,
            **details
        }
        self.logger.info(json.dumps(entry))

# الاستخدام
logger = SecurityLogger(Path("./security.log"))

logger.log_request(
    request_id="req-123",
    user_id="user-456",
    ip_address="192.168.1.1",
    user_input="مرحباً، كيف حالك؟",
    llm_output="أنا بخير، شكراً لك!",
    latency_ms=245.5,
    token_count=15
)

كشف الشذوذ

from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Tuple
import statistics

class AnomalyDetector:
    """كشف الأنماط المشبوهة في استخدام LLM."""

    def __init__(self):
        self.user_patterns = defaultdict(list)
        self.blocked_attempts = defaultdict(list)
        self.latency_history = []

    def record_request(
        self,
        user_id: str,
        timestamp: datetime,
        was_blocked: bool,
        latency_ms: float
    ):
        """تسجيل الطلب لتحليل الأنماط."""
        self.user_patterns[user_id].append(timestamp)
        self.latency_history.append(latency_ms)

        if was_blocked:
            self.blocked_attempts[user_id].append(timestamp)

    def check_anomalies(self, user_id: str) -> List[Tuple[str, str]]:
        """فحص الأنماط الشاذة."""
        anomalies = []

        # الفحص 1: طلبات سريعة (أتمتة محتملة)
        recent = self._get_recent_requests(user_id, minutes=1)
        if len(recent) > 30:  # أكثر من 30 طلب/دقيقة
            anomalies.append((
                "rapid_requests",
                f"المستخدم أرسل {len(recent)} طلب في دقيقة واحدة"
            ))

        # الفحص 2: معدل حظر عالي
        block_rate = self._calculate_block_rate(user_id)
        if block_rate > 0.3:  # أكثر من 30% محظور
            anomalies.append((
                "high_block_rate",
                f"معدل الحظر: {block_rate:.1%}"
            ))

        # الفحص 3: ساعات غير عادية (إذا كان للمستخدم تاريخ)
        if self._is_unusual_hour(user_id):
            anomalies.append((
                "unusual_activity_time",
                "نشاط خارج الساعات الطبيعية"
            ))

        return anomalies

    def _get_recent_requests(
        self,
        user_id: str,
        minutes: int
    ) -> List[datetime]:
        """الحصول على الطلبات في آخر N دقيقة."""
        cutoff = datetime.utcnow() - timedelta(minutes=minutes)
        return [
            t for t in self.user_patterns[user_id]
            if t > cutoff
        ]

    def _calculate_block_rate(self, user_id: str) -> float:
        """حساب نسبة الطلبات المحظورة."""
        total = len(self.user_patterns[user_id])
        blocked = len(self.blocked_attempts[user_id])

        if total == 0:
            return 0.0
        return blocked / total

    def _is_unusual_hour(self, user_id: str) -> bool:
        """تحقق إذا كانت الساعة الحالية غير عادية للمستخدم."""
        if len(self.user_patterns[user_id]) < 100:
            return False  # تاريخ غير كافٍ

        current_hour = datetime.utcnow().hour
        historical_hours = [
            t.hour for t in self.user_patterns[user_id]
        ]

        # تحقق إذا كانت الساعة الحالية نادرة (أقل من 5% من النشاط)
        hour_count = historical_hours.count(current_hour)
        return hour_count / len(historical_hours) < 0.05

# الاستخدام
detector = AnomalyDetector()

# تسجيل الطلبات
detector.record_request("user123", datetime.utcnow(), False, 200)
detector.record_request("user123", datetime.utcnow(), True, 50)

# فحص الشذوذ
anomalies = detector.check_anomalies("user123")
for anomaly_type, description in anomalies:
    print(f"[تنبيه] {anomaly_type}: {description}")

نظام التنبيهات

from dataclasses import dataclass
from enum import Enum
from typing import Callable, List
import json

class AlertSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class SecurityAlert:
    severity: AlertSeverity
    alert_type: str
    message: str
    user_id: Optional[str]
    request_id: Optional[str]
    metadata: Dict[str, Any]

class AlertingSystem:
    """نظام تنبيهات الأمان."""

    def __init__(self):
        self.handlers: List[Callable[[SecurityAlert], None]] = []
        self.alert_history = []

    def add_handler(self, handler: Callable[[SecurityAlert], None]):
        """إضافة معالج تنبيهات (بريد، Slack، PagerDuty، إلخ)."""
        self.handlers.append(handler)

    def alert(self, alert: SecurityAlert):
        """إرسال تنبيه لجميع المعالجين."""
        self.alert_history.append(alert)

        for handler in self.handlers:
            try:
                handler(alert)
            except Exception as e:
                print(f"فشل معالج التنبيه: {e}")

    def check_and_alert(
        self,
        anomalies: List[Tuple[str, str]],
        user_id: str,
        request_id: str
    ):
        """إنشاء تنبيهات للشذوذ المكتشف."""
        severity_map = {
            "rapid_requests": AlertSeverity.MEDIUM,
            "high_block_rate": AlertSeverity.HIGH,
            "unusual_activity_time": AlertSeverity.LOW,
            "injection_attempt": AlertSeverity.CRITICAL,
        }

        for anomaly_type, description in anomalies:
            severity = severity_map.get(anomaly_type, AlertSeverity.MEDIUM)

            self.alert(SecurityAlert(
                severity=severity,
                alert_type=anomaly_type,
                message=description,
                user_id=user_id,
                request_id=request_id,
                metadata={"detected_at": datetime.utcnow().isoformat()}
            ))

# معالجات مثال
def console_handler(alert: SecurityAlert):
    """طباعة التنبيهات للوحدة."""
    print(f"[{alert.severity.value.upper()}] {alert.alert_type}: {alert.message}")

def slack_handler(alert: SecurityAlert):
    """إرسال تنبيه إلى Slack (عنصر نائب)."""
    # في الإنتاج، استخدم Slack SDK
    webhook_payload = {
        "text": f"*{alert.severity.value.upper()}* - {alert.alert_type}",
        "blocks": [
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": alert.message}
            }
        ]
    }
    # requests.post(SLACK_WEBHOOK_URL, json=webhook_payload)

# الإعداد
alerting = AlertingSystem()
alerting.add_handler(console_handler)

مقاييس لوحة المعلومات

from collections import Counter
from datetime import datetime, timedelta

class SecurityMetrics:
    """جمع وعرض مقاييس الأمان."""

    def __init__(self):
        self.counters = Counter()
        self.request_times = []

    def record(self, metric: str, value: int = 1):
        """تسجيل مقياس."""
        self.counters[metric] += value

    def get_summary(self) -> Dict[str, Any]:
        """الحصول على ملخص المقاييس للوحة المعلومات."""
        return {
            "total_requests": self.counters["requests"],
            "blocked_requests": self.counters["blocked"],
            "block_rate": self._calculate_rate("blocked", "requests"),
            "injection_attempts": self.counters["injection_detected"],
            "rate_limit_hits": self.counters["rate_limited"],
            "auth_failures": self.counters["auth_failed"],
            "guardrail_triggers": self.counters["guardrail_triggered"],
        }

    def _calculate_rate(self, numerator: str, denominator: str) -> float:
        denom = self.counters[denominator]
        if denom == 0:
            return 0.0
        return self.counters[numerator] / denom

# عرض كمقاييس Prometheus (مثال)
def expose_prometheus_metrics(metrics: SecurityMetrics) -> str:
    """تنسيق المقاييس لجمع Prometheus."""
    summary = metrics.get_summary()
    lines = []
    for key, value in summary.items():
        metric_name = f"llm_security_{key}"
        lines.append(f"{metric_name} {value}")
    return "\n".join(lines)

النقطة الرئيسية: التسجيل والمراقبة الشاملة تمكنك من كشف الهجمات في الوقت الفعلي، والتحقيق في الحوادث بعد وقوعها، وتحسين وضعك الأمني باستمرار بناءً على الأنماط المُلاحظة. :::

اختبار

الوحدة 5: أنماط أمن الإنتاج

خذ الاختبار