نشر الإنتاج والمراقبة
التسجيل والتدقيق
3 دقيقة للقراءة
التسجيل والتدقيق الشاملان ضروريان للامتثال وتصحيح الأخطاء والتحسين المستمر لأنظمة الحواجز. يغطي هذا الدرس استراتيجيات التسجيل الجاهزة للإنتاج.
هيكل سجل التدقيق
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
class AuditEventType(Enum):
    """Kinds of guardrail activity recorded in the audit log."""
    INPUT_CHECK = "input_check"      # rail evaluated user input
    OUTPUT_CHECK = "output_check"    # rail evaluated model output
    BLOCK = "block"                  # a request was blocked
    ESCALATION = "escalation"        # presumably escalated for human review — confirm workflow
    OVERRIDE = "override"            # an admin overrode a rail decision
    CONFIG_CHANGE = "config_change"  # guardrail configuration changed


@dataclass
class AuditEvent:
    """Immutable audit event for compliance.

    Content is stored as a hash plus length rather than raw text, so the
    audit trail preserves user privacy while remaining comparable across
    events.
    """
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated and
    # returns a naive datetime, which interacts badly with the TIMESTAMPTZ
    # column this value is written into.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    event_type: AuditEventType = AuditEventType.INPUT_CHECK
    request_id: str = ""
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    # Content (may be redacted for privacy)
    input_hash: str = ""  # hash instead of raw content
    input_length: int = 0
    output_hash: Optional[str] = None
    output_length: Optional[int] = None
    # Decision
    decision: str = ""
    confidence: float = 0.0
    categories: List[str] = field(default_factory=list)
    rail_type: str = ""
    # Context
    model_used: str = ""
    model_version: str = ""
    config_version: str = ""
    latency_ms: float = 0.0
    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize the event to a JSON string for storage."""
        data = {
            "event_id": self.event_id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.event_type.value,
            "request_id": self.request_id,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "input_hash": self.input_hash,
            "input_length": self.input_length,
            "output_hash": self.output_hash,
            "output_length": self.output_length,
            "decision": self.decision,
            "confidence": self.confidence,
            "categories": self.categories,
            "rail_type": self.rail_type,
            "model_used": self.model_used,
            "model_version": self.model_version,
            "config_version": self.config_version,
            "latency_ms": self.latency_ms,
            "metadata": self.metadata
        }
        return json.dumps(data)
تنفيذ مسجل التدقيق
import hashlib
from abc import ABC, abstractmethod
from typing import Protocol
class AuditStorage(Protocol):
    """Structural interface for audit-log storage backends.

    Note: `query` previously took a single positional `filters: dict`, but
    every caller (ComplianceReporter) and the concrete implementation
    (PostgresAuditStorage) pass keyword filters such as `start_time=`,
    `end_time=`, `user_id=`; the signature is corrected to match actual use.
    """

    async def write(self, event: AuditEvent) -> None: ...

    async def query(self, **filters: Any) -> List[AuditEvent]: ...
class AuditLogger:
    """Production audit logger that records guardrail events to a backend.

    Content is never stored raw: only a truncated SHA-256 hash and the
    length are logged (see `_hash_content`).
    """

    def __init__(
        self,
        storage: AuditStorage,
        config_version: str = "1.0.0",
        redact_content: bool = True
    ):
        """
        Args:
            storage: Backend implementing the AuditStorage protocol.
            config_version: Guardrail config version stamped on every event.
            redact_content: Stored but not consulted here — hashing always
                redacts. NOTE(review): confirm whether a raw-content mode
                was intended.
        """
        self.storage = storage
        self.config_version = config_version
        self.redact_content = redact_content

    async def log_check(
        self,
        request_id: str,
        user_id: Optional[str],
        input_content: str,
        output_content: Optional[str] = None,  # was mis-annotated `str = None`
        decision: str = "pass",
        confidence: float = 1.0,
        categories: Optional[List[str]] = None,
        rail_type: str = "",
        model_used: str = "",
        latency_ms: float = 0.0,
        metadata: Optional[dict] = None
    ) -> AuditEvent:
        """Log a guardrail check event and return the stored AuditEvent.

        An event with no `output_content` is recorded as INPUT_CHECK,
        otherwise as OUTPUT_CHECK.
        """
        event = AuditEvent(
            event_type=AuditEventType.INPUT_CHECK if not output_content else AuditEventType.OUTPUT_CHECK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            output_hash=self._hash_content(output_content) if output_content else None,
            output_length=len(output_content) if output_content else None,
            decision=decision,
            confidence=confidence,
            categories=categories or [],
            rail_type=rail_type,
            model_used=model_used,
            config_version=self.config_version,
            latency_ms=latency_ms,
            metadata=metadata or {}
        )
        await self.storage.write(event)
        return event

    async def log_block(
        self,
        request_id: str,
        user_id: Optional[str],
        reason: str,
        categories: List[str],
        input_content: str,
        rail_type: str,
        metadata: Optional[dict] = None
    ) -> AuditEvent:
        """Log a blocked request; the human-readable reason goes into metadata."""
        event = AuditEvent(
            event_type=AuditEventType.BLOCK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            decision="blocked",
            categories=categories,
            rail_type=rail_type,
            config_version=self.config_version,
            metadata={**(metadata or {}), "reason": reason}
        )
        await self.storage.write(event)
        return event

    async def log_override(
        self,
        request_id: str,
        admin_user_id: str,
        original_decision: str,
        new_decision: str,
        justification: str
    ) -> AuditEvent:
        """Log an admin override of a guardrail decision.

        The original decision and the admin's justification are preserved
        in metadata for the compliance trail.
        """
        event = AuditEvent(
            event_type=AuditEventType.OVERRIDE,
            request_id=request_id,
            user_id=admin_user_id,
            decision=new_decision,
            config_version=self.config_version,
            metadata={
                "original_decision": original_decision,
                "justification": justification
            }
        )
        await self.storage.write(event)
        return event

    def _hash_content(self, content: Optional[str]) -> str:
        """Return a 16-hex-char SHA-256 prefix of content ("" if empty/None)."""
        if not content:
            return ""
        return hashlib.sha256(content.encode()).hexdigest()[:16]
خلفيات التخزين
import asyncpg
from datetime import datetime
import json
class PostgresAuditStorage:
    """PostgreSQL-backed audit-log storage (conforms to AuditStorage)."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created lazily by initialize()

    async def initialize(self):
        """Open the connection pool and create the table/indexes if absent."""
        self.pool = await asyncpg.create_pool(self.connection_string)
        async with self.pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS guardrail_audit (
                    event_id UUID PRIMARY KEY,
                    timestamp TIMESTAMPTZ NOT NULL,
                    event_type VARCHAR(50) NOT NULL,
                    request_id VARCHAR(100),
                    user_id VARCHAR(100),
                    session_id VARCHAR(100),
                    input_hash VARCHAR(32),
                    input_length INT,
                    output_hash VARCHAR(32),
                    output_length INT,
                    decision VARCHAR(50),
                    confidence FLOAT,
                    categories JSONB,
                    rail_type VARCHAR(100),
                    model_used VARCHAR(100),
                    model_version VARCHAR(50),
                    config_version VARCHAR(50),
                    latency_ms FLOAT,
                    metadata JSONB,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE INDEX IF NOT EXISTS idx_audit_timestamp
                    ON guardrail_audit(timestamp);
                CREATE INDEX IF NOT EXISTS idx_audit_user_id
                    ON guardrail_audit(user_id);
                CREATE INDEX IF NOT EXISTS idx_audit_event_type
                    ON guardrail_audit(event_type);
            ''')

    async def write(self, event: AuditEvent):
        """Insert one audit event; list/dict columns are JSON-encoded for JSONB."""
        async with self.pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO guardrail_audit (
                    event_id, timestamp, event_type, request_id, user_id,
                    session_id, input_hash, input_length, output_hash,
                    output_length, decision, confidence, categories,
                    rail_type, model_used, model_version, config_version,
                    latency_ms, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
                          $12, $13, $14, $15, $16, $17, $18, $19)
                ''',
                event.event_id, event.timestamp, event.event_type.value,
                event.request_id, event.user_id, event.session_id,
                event.input_hash, event.input_length, event.output_hash,
                event.output_length, event.decision, event.confidence,
                json.dumps(event.categories), event.rail_type,
                event.model_used, event.model_version, event.config_version,
                event.latency_ms, json.dumps(event.metadata)
            )

    async def query(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        user_id: Optional[str] = None,
        event_type: Optional[str] = None,
        limit: int = 1000
    ) -> List[AuditEvent]:
        """Query audit logs with optional filters, newest first.

        All values — including LIMIT, which was previously interpolated
        directly into the SQL text — are bound as numbered parameters.
        """
        conditions = []
        params = []
        param_count = 0
        if start_time:
            param_count += 1
            conditions.append(f"timestamp >= ${param_count}")
            params.append(start_time)
        if end_time:
            param_count += 1
            conditions.append(f"timestamp <= ${param_count}")
            params.append(end_time)
        if user_id:
            param_count += 1
            conditions.append(f"user_id = ${param_count}")
            params.append(user_id)
        if event_type:
            param_count += 1
            conditions.append(f"event_type = ${param_count}")
            params.append(event_type)
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        # Bind LIMIT as a parameter rather than formatting it into the query.
        param_count += 1
        params.append(limit)
        query = f'''
            SELECT * FROM guardrail_audit
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT ${param_count}
        '''
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
        return [self._row_to_event(row) for row in rows]

    def _row_to_event(self, row) -> AuditEvent:
        """Rehydrate an AuditEvent from a database row.

        This method was referenced by query() but never defined, so any
        query raised AttributeError. JSONB columns come back as JSON text
        from asyncpg by default and are decoded here.
        """
        return AuditEvent(
            event_id=str(row["event_id"]),
            timestamp=row["timestamp"],
            event_type=AuditEventType(row["event_type"]),
            request_id=row["request_id"] or "",
            user_id=row["user_id"],
            session_id=row["session_id"],
            input_hash=row["input_hash"] or "",
            input_length=row["input_length"] or 0,
            output_hash=row["output_hash"],
            output_length=row["output_length"],
            decision=row["decision"] or "",
            confidence=row["confidence"] or 0.0,
            categories=json.loads(row["categories"]) if row["categories"] else [],
            rail_type=row["rail_type"] or "",
            model_used=row["model_used"] or "",
            model_version=row["model_version"] or "",
            config_version=row["config_version"] or "",
            latency_ms=row["latency_ms"] or 0.0,
            metadata=json.loads(row["metadata"]) if row["metadata"] else {}
        )
تقارير الامتثال
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class ComplianceReport:
    """Aggregated guardrail metrics for a reporting period (see ComplianceReporter)."""
    # Reporting window covered by this report.
    period_start: datetime
    period_end: datetime
    # Volume: total audited events and how many were blocked.
    total_requests: int
    blocked_requests: int
    block_rate: float  # percentage 0-100 (blocked / total * 100)
    # Event count per guardrail category label.
    category_breakdown: Dict[str, int]
    # Latency stats, computed only over events with latency_ms > 0.
    avg_latency_ms: float
    p99_latency_ms: float
    # Counts of escalation and admin-override events in the period.
    escalation_count: int
    override_count: int
class ComplianceReporter:
    """Builds compliance reports from stored audit events."""

    def __init__(self, storage: AuditStorage):
        self.storage = storage

    async def generate_report(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> ComplianceReport:
        """Aggregate all audit events in the window into a ComplianceReport."""
        events = await self.storage.query(
            start_time=start_time,
            end_time=end_time
        )

        total = len(events)
        blocked = sum(1 for ev in events if ev.decision == "blocked")
        escalations = sum(
            1 for ev in events
            if ev.event_type == AuditEventType.ESCALATION
        )
        overrides = sum(
            1 for ev in events
            if ev.event_type == AuditEventType.OVERRIDE
        )

        # Tally category hits and collect positive latencies in one pass.
        by_category: Dict[str, int] = {}
        durations: List[float] = []
        for ev in events:
            for label in ev.categories:
                by_category[label] = by_category.get(label, 0) + 1
            if ev.latency_ms > 0:
                durations.append(ev.latency_ms)
        durations.sort()

        if durations:
            avg_latency = sum(durations) / len(durations)
            p99_latency = durations[int(len(durations) * 0.99)]
        else:
            avg_latency = 0
            p99_latency = 0

        return ComplianceReport(
            period_start=start_time,
            period_end=end_time,
            total_requests=total,
            blocked_requests=blocked,
            block_rate=blocked / total * 100 if total > 0 else 0,
            category_breakdown=by_category,
            avg_latency_ms=avg_latency,
            p99_latency_ms=p99_latency,
            escalation_count=escalations,
            override_count=overrides
        )

    async def generate_user_report(
        self,
        user_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Build a per-user compliance summary for the given period."""
        events = await self.storage.query(
            user_id=user_id,
            start_time=start_time,
            end_time=end_time
        )
        triggered = {label for ev in events for label in ev.categories}
        return {
            "user_id": user_id,
            "period": f"{start_time.date()} إلى {end_time.date()}",
            "total_interactions": len(events),
            "blocked_count": sum(1 for ev in events if ev.decision == "blocked"),
            "categories_triggered": list(triggered),
            "risk_score": self._calculate_risk_score(events)
        }

    def _calculate_risk_score(self, events: List[AuditEvent]) -> float:
        """Score a user 0-100 from block rate plus severity-weighted category hits."""
        if not events:
            return 0.0
        total = len(events)
        blocked = sum(1 for ev in events if ev.decision == "blocked")
        # Per-category severity weights; unknown categories count as 1.0.
        severity_weights = {
            "hate_speech": 3.0,
            "violence": 3.0,
            "self_harm": 2.5,
            "sexual_content": 2.0,
            "harassment": 2.0,
            "spam": 1.0,
            "off_topic": 0.5
        }
        severity_total = sum(
            severity_weights.get(label, 1.0)
            for ev in events
            for label in ev.categories
        )
        return min(100, (blocked / total * 50) + (severity_total / total * 10))
نصيحة الامتثال: احفظ سجلات التدقيق لفترة الاحتفاظ المطلوبة من صناعتك (عادةً 1-7 سنوات). استخدم التخزين بالإضافة فقط والتوقيعات التشفيرية لإثبات عدم العبث.
التالي: الخطوات التالية والتحسين المستمر.