نشر الإنتاج والمراقبة

التسجيل والتدقيق

3 دقيقة للقراءة

التسجيل والتدقيق الشاملان ضروريان للامتثال وتصحيح الأخطاء والتحسين المستمر لأنظمة الحواجز. يغطي هذا الدرس استراتيجيات التسجيل الجاهزة للإنتاج.

هيكل سجل التدقيق

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum
import json
import uuid

class AuditEventType(Enum):
    INPUT_CHECK = "input_check"
    OUTPUT_CHECK = "output_check"
    BLOCK = "block"
    ESCALATION = "escalation"
    OVERRIDE = "override"
    CONFIG_CHANGE = "config_change"

@dataclass
class AuditEvent:
    """حدث تدقيق غير قابل للتغيير للامتثال."""
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_type: AuditEventType = AuditEventType.INPUT_CHECK
    request_id: str = ""
    user_id: Optional[str] = None
    session_id: Optional[str] = None

    # المحتوى (قد يكون محذوفاً للخصوصية)
    input_hash: str = ""  # تجزئة بدلاً من المحتوى الخام
    input_length: int = 0
    output_hash: Optional[str] = None
    output_length: Optional[int] = None

    # القرار
    decision: str = ""
    confidence: float = 0.0
    categories: List[str] = field(default_factory=list)
    rail_type: str = ""

    # السياق
    model_used: str = ""
    model_version: str = ""
    config_version: str = ""
    latency_ms: float = 0.0

    # البيانات الوصفية
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """التسلسل إلى JSON للتخزين."""
        data = {
            "event_id": self.event_id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.event_type.value,
            "request_id": self.request_id,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "input_hash": self.input_hash,
            "input_length": self.input_length,
            "output_hash": self.output_hash,
            "output_length": self.output_length,
            "decision": self.decision,
            "confidence": self.confidence,
            "categories": self.categories,
            "rail_type": self.rail_type,
            "model_used": self.model_used,
            "model_version": self.model_version,
            "config_version": self.config_version,
            "latency_ms": self.latency_ms,
            "metadata": self.metadata
        }
        return json.dumps(data)

تنفيذ مسجل التدقيق

import hashlib
from abc import ABC, abstractmethod
from typing import Protocol

class AuditStorage(Protocol):
    """بروتوكول لخلفيات تخزين سجل التدقيق."""
    async def write(self, event: AuditEvent) -> None: ...
    async def query(self, filters: dict) -> List[AuditEvent]: ...

class AuditLogger:
    """مسجل تدقيق إنتاجي مع خلفيات متعددة."""

    def __init__(
        self,
        storage: AuditStorage,
        config_version: str = "1.0.0",
        redact_content: bool = True
    ):
        self.storage = storage
        self.config_version = config_version
        self.redact_content = redact_content

    async def log_check(
        self,
        request_id: str,
        user_id: str,
        input_content: str,
        output_content: str = None,
        decision: str = "pass",
        confidence: float = 1.0,
        categories: List[str] = None,
        rail_type: str = "",
        model_used: str = "",
        latency_ms: float = 0.0,
        metadata: dict = None
    ):
        """تسجيل حدث فحص الحاجز."""
        event = AuditEvent(
            event_type=AuditEventType.INPUT_CHECK if not output_content else AuditEventType.OUTPUT_CHECK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            output_hash=self._hash_content(output_content) if output_content else None,
            output_length=len(output_content) if output_content else None,
            decision=decision,
            confidence=confidence,
            categories=categories or [],
            rail_type=rail_type,
            model_used=model_used,
            config_version=self.config_version,
            latency_ms=latency_ms,
            metadata=metadata or {}
        )

        await self.storage.write(event)
        return event

    async def log_block(
        self,
        request_id: str,
        user_id: str,
        reason: str,
        categories: List[str],
        input_content: str,
        rail_type: str,
        metadata: dict = None
    ):
        """تسجيل طلب محظور."""
        event = AuditEvent(
            event_type=AuditEventType.BLOCK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            decision="blocked",
            categories=categories,
            rail_type=rail_type,
            config_version=self.config_version,
            metadata={**(metadata or {}), "reason": reason}
        )

        await self.storage.write(event)
        return event

    async def log_override(
        self,
        request_id: str,
        admin_user_id: str,
        original_decision: str,
        new_decision: str,
        justification: str
    ):
        """تسجيل تجاوز المسؤول لقرار الحاجز."""
        event = AuditEvent(
            event_type=AuditEventType.OVERRIDE,
            request_id=request_id,
            user_id=admin_user_id,
            decision=new_decision,
            config_version=self.config_version,
            metadata={
                "original_decision": original_decision,
                "justification": justification
            }
        )

        await self.storage.write(event)
        return event

    def _hash_content(self, content: str) -> str:
        """تجزئة المحتوى للتسجيل مع الحفاظ على الخصوصية."""
        if not content:
            return ""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

خلفيات التخزين

import asyncpg
from datetime import datetime
import json

class PostgresAuditStorage:
    """تخزين سجل التدقيق في PostgreSQL."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        self.pool = await asyncpg.create_pool(self.connection_string)

        # إنشاء الجدول إذا لم يكن موجوداً
        async with self.pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS guardrail_audit (
                    event_id UUID PRIMARY KEY,
                    timestamp TIMESTAMPTZ NOT NULL,
                    event_type VARCHAR(50) NOT NULL,
                    request_id VARCHAR(100),
                    user_id VARCHAR(100),
                    session_id VARCHAR(100),
                    input_hash VARCHAR(32),
                    input_length INT,
                    output_hash VARCHAR(32),
                    output_length INT,
                    decision VARCHAR(50),
                    confidence FLOAT,
                    categories JSONB,
                    rail_type VARCHAR(100),
                    model_used VARCHAR(100),
                    model_version VARCHAR(50),
                    config_version VARCHAR(50),
                    latency_ms FLOAT,
                    metadata JSONB,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                );

                CREATE INDEX IF NOT EXISTS idx_audit_timestamp
                ON guardrail_audit(timestamp);

                CREATE INDEX IF NOT EXISTS idx_audit_user_id
                ON guardrail_audit(user_id);

                CREATE INDEX IF NOT EXISTS idx_audit_event_type
                ON guardrail_audit(event_type);
            ''')

    async def write(self, event: AuditEvent):
        async with self.pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO guardrail_audit (
                    event_id, timestamp, event_type, request_id, user_id,
                    session_id, input_hash, input_length, output_hash,
                    output_length, decision, confidence, categories,
                    rail_type, model_used, model_version, config_version,
                    latency_ms, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
                          $12, $13, $14, $15, $16, $17, $18, $19)
            ''',
                event.event_id, event.timestamp, event.event_type.value,
                event.request_id, event.user_id, event.session_id,
                event.input_hash, event.input_length, event.output_hash,
                event.output_length, event.decision, event.confidence,
                json.dumps(event.categories), event.rail_type,
                event.model_used, event.model_version, event.config_version,
                event.latency_ms, json.dumps(event.metadata)
            )

    async def query(
        self,
        start_time: datetime = None,
        end_time: datetime = None,
        user_id: str = None,
        event_type: str = None,
        limit: int = 1000
    ) -> List[AuditEvent]:
        """استعلام سجلات التدقيق مع الفلاتر."""
        conditions = []
        params = []
        param_count = 0

        if start_time:
            param_count += 1
            conditions.append(f"timestamp >= ${param_count}")
            params.append(start_time)

        if end_time:
            param_count += 1
            conditions.append(f"timestamp <= ${param_count}")
            params.append(end_time)

        if user_id:
            param_count += 1
            conditions.append(f"user_id = ${param_count}")
            params.append(user_id)

        if event_type:
            param_count += 1
            conditions.append(f"event_type = ${param_count}")
            params.append(event_type)

        where_clause = " AND ".join(conditions) if conditions else "1=1"

        query = f'''
            SELECT * FROM guardrail_audit
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT {limit}
        '''

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [self._row_to_event(row) for row in rows]

تقارير الامتثال

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class ComplianceReport:
    period_start: datetime
    period_end: datetime
    total_requests: int
    blocked_requests: int
    block_rate: float
    category_breakdown: Dict[str, int]
    avg_latency_ms: float
    p99_latency_ms: float
    escalation_count: int
    override_count: int

class ComplianceReporter:
    """توليد تقارير الامتثال من سجلات التدقيق."""

    def __init__(self, storage: AuditStorage):
        self.storage = storage

    async def generate_report(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> ComplianceReport:
        """توليد تقرير امتثال للفترة."""
        events = await self.storage.query(
            start_time=start_time,
            end_time=end_time
        )

        # تجميع المقاييس
        total = len(events)
        blocked = sum(1 for e in events if e.decision == "blocked")

        category_counts = {}
        latencies = []

        for event in events:
            for cat in event.categories:
                category_counts[cat] = category_counts.get(cat, 0) + 1
            if event.latency_ms > 0:
                latencies.append(event.latency_ms)

        escalations = sum(
            1 for e in events
            if e.event_type == AuditEventType.ESCALATION
        )

        overrides = sum(
            1 for e in events
            if e.event_type == AuditEventType.OVERRIDE
        )

        latencies.sort()

        return ComplianceReport(
            period_start=start_time,
            period_end=end_time,
            total_requests=total,
            blocked_requests=blocked,
            block_rate=blocked / total * 100 if total > 0 else 0,
            category_breakdown=category_counts,
            avg_latency_ms=sum(latencies) / len(latencies) if latencies else 0,
            p99_latency_ms=latencies[int(len(latencies) * 0.99)] if latencies else 0,
            escalation_count=escalations,
            override_count=overrides
        )

    async def generate_user_report(
        self,
        user_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """توليد تقرير امتثال لكل مستخدم."""
        events = await self.storage.query(
            user_id=user_id,
            start_time=start_time,
            end_time=end_time
        )

        return {
            "user_id": user_id,
            "period": f"{start_time.date()} إلى {end_time.date()}",
            "total_interactions": len(events),
            "blocked_count": sum(1 for e in events if e.decision == "blocked"),
            "categories_triggered": list(set(
                cat for e in events for cat in e.categories
            )),
            "risk_score": self._calculate_risk_score(events)
        }

    def _calculate_risk_score(self, events: List[AuditEvent]) -> float:
        """حساب درجة مخاطر المستخدم بناءً على الانتهاكات."""
        if not events:
            return 0.0

        blocked = sum(1 for e in events if e.decision == "blocked")
        total = len(events)

        # الوزن حسب خطورة الفئة
        severity_weights = {
            "hate_speech": 3.0,
            "violence": 3.0,
            "self_harm": 2.5,
            "sexual_content": 2.0,
            "harassment": 2.0,
            "spam": 1.0,
            "off_topic": 0.5
        }

        weighted_score = 0
        for event in events:
            for cat in event.categories:
                weighted_score += severity_weights.get(cat, 1.0)

        return min(100, (blocked / total * 50) + (weighted_score / total * 10))

نصيحة الامتثال: احفظ سجلات التدقيق لفترة الاحتفاظ المطلوبة من صناعتك (عادةً 1-7 سنوات). استخدم التخزين بالإضافة فقط والتوقيعات التشفيرية لإثبات عدم العبث.

التالي: الخطوات التالية والتحسين المستمر. :::

اختبار

الوحدة 6: نشر الإنتاج والمراقبة

خذ الاختبار
نشرة أسبوعية مجانية

ابقَ على مسار النيرد

بريد واحد أسبوعياً — دورات، مقالات معمّقة، أدوات، وتجارب ذكاء اصطناعي.

بدون إزعاج. إلغاء الاشتراك في أي وقت.