Production Deployment & Observability
Logging and Auditing
3 min read
Comprehensive logging and auditing are essential for compliance, debugging, and continuous improvement of guardrail systems. This lesson covers production-grade logging strategies.
Audit Log Structure
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
class AuditEventType(Enum):
    """Kinds of audit events recorded by the guardrail system."""

    INPUT_CHECK = "input_check"      # guardrail check on user input
    OUTPUT_CHECK = "output_check"    # guardrail check on model output
    BLOCK = "block"                  # a request was blocked
    ESCALATION = "escalation"        # event escalated (counted in reports)
    OVERRIDE = "override"            # admin override of a prior decision
    CONFIG_CHANGE = "config_change"  # guardrail configuration was changed
@dataclass
class AuditEvent:
    """Immutable audit event for compliance.

    Content fields hold hashes and lengths rather than raw text so the
    log stays privacy-preserving (see AuditLogger._hash_content).
    """

    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated and
    # yields naive datetimes whose isoformat() lacks a UTC offset.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    event_type: AuditEventType = AuditEventType.INPUT_CHECK
    request_id: str = ""
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    # Content (may be redacted for privacy)
    input_hash: str = ""  # Hash instead of raw content
    input_length: int = 0
    output_hash: Optional[str] = None
    output_length: Optional[int] = None
    # Decision
    decision: str = ""
    confidence: float = 0.0
    categories: List[str] = field(default_factory=list)
    rail_type: str = ""
    # Context
    model_used: str = ""
    model_version: str = ""
    config_version: str = ""
    latency_ms: float = 0.0
    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize to a JSON string for storage.

        Enum and datetime fields are converted explicitly (enum value /
        ISO-8601 text) because json.dumps cannot encode them natively.
        """
        data = {
            "event_id": self.event_id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.event_type.value,
            "request_id": self.request_id,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "input_hash": self.input_hash,
            "input_length": self.input_length,
            "output_hash": self.output_hash,
            "output_length": self.output_length,
            "decision": self.decision,
            "confidence": self.confidence,
            "categories": self.categories,
            "rail_type": self.rail_type,
            "model_used": self.model_used,
            "model_version": self.model_version,
            "config_version": self.config_version,
            "latency_ms": self.latency_ms,
            "metadata": self.metadata
        }
        return json.dumps(data)
Audit Logger Implementation
import hashlib
from abc import ABC, abstractmethod
from typing import Protocol
class AuditStorage(Protocol):
    """Protocol for audit log storage backends.

    Structural interface: any object with matching async write/query
    methods satisfies it. query takes keyword filters (e.g. start_time,
    end_time, user_id, event_type, limit) — matching how callers such as
    ComplianceReporter and PostgresAuditStorage.query actually use it;
    the previous `filters: dict` positional signature matched no call site.
    """

    async def write(self, event: "AuditEvent") -> None:
        """Persist a single audit event."""
        ...

    async def query(self, **filters: Any) -> "List[AuditEvent]":
        """Return events matching the given keyword filters."""
        ...
class AuditLogger:
    """Production audit logger with pluggable storage backends.

    Records guardrail decisions as AuditEvent records. Raw content is
    never stored: only a truncated SHA-256 hash and the length are kept
    (see _hash_content).
    """

    def __init__(
        self,
        storage: "AuditStorage",
        config_version: str = "1.0.0",
        redact_content: bool = True
    ):
        """
        Args:
            storage: Backend implementing the AuditStorage protocol.
            config_version: Guardrail configuration version stamped on
                every emitted event.
            redact_content: Currently not consulted anywhere in this
                class — hashing is unconditional. NOTE(review): confirm
                whether this flag should gate _hash_content.
        """
        self.storage = storage
        self.config_version = config_version
        self.redact_content = redact_content

    async def log_check(
        self,
        request_id: str,
        user_id: str,
        input_content: str,
        output_content: Optional[str] = None,
        decision: str = "pass",
        confidence: float = 1.0,
        categories: Optional[List[str]] = None,
        rail_type: str = "",
        model_used: str = "",
        latency_ms: float = 0.0,
        metadata: Optional[dict] = None
    ):
        """Log a guardrail check event and return the stored AuditEvent.

        The event is classified as an OUTPUT_CHECK when output_content is
        truthy (non-empty), otherwise as an INPUT_CHECK.
        """
        is_output = bool(output_content)
        event = AuditEvent(
            event_type=(
                AuditEventType.OUTPUT_CHECK if is_output
                else AuditEventType.INPUT_CHECK
            ),
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            output_hash=self._hash_content(output_content) if is_output else None,
            output_length=len(output_content) if is_output else None,
            decision=decision,
            confidence=confidence,
            categories=categories or [],
            rail_type=rail_type,
            model_used=model_used,
            config_version=self.config_version,
            latency_ms=latency_ms,
            metadata=metadata or {}
        )
        await self.storage.write(event)
        return event

    async def log_block(
        self,
        request_id: str,
        user_id: str,
        reason: str,
        categories: List[str],
        input_content: str,
        rail_type: str,
        metadata: Optional[dict] = None
    ):
        """Log a blocked request and return the stored AuditEvent.

        The human-readable reason is kept in event metadata under "reason".
        """
        event = AuditEvent(
            event_type=AuditEventType.BLOCK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            decision="blocked",
            categories=categories,
            rail_type=rail_type,
            config_version=self.config_version,
            metadata={**(metadata or {}), "reason": reason}
        )
        await self.storage.write(event)
        return event

    async def log_override(
        self,
        request_id: str,
        admin_user_id: str,
        original_decision: str,
        new_decision: str,
        justification: str
    ):
        """Log an admin override of a guardrail decision.

        The overriding admin is recorded as the event's user_id; the
        original decision and justification go into event metadata.
        """
        event = AuditEvent(
            event_type=AuditEventType.OVERRIDE,
            request_id=request_id,
            user_id=admin_user_id,
            decision=new_decision,
            config_version=self.config_version,
            metadata={
                "original_decision": original_decision,
                "justification": justification
            }
        )
        await self.storage.write(event)
        return event

    def _hash_content(self, content: Optional[str]) -> str:
        """Return the first 16 hex chars of SHA-256(content).

        Empty or None content maps to "" so absent fields stay falsy.
        The truncated hash lets identical content be correlated across
        events without storing the content itself.
        """
        if not content:
            return ""
        return hashlib.sha256(content.encode()).hexdigest()[:16]
Storage Backends
import asyncpg
from datetime import datetime
import json
class PostgresAuditStorage:
    """PostgreSQL audit log storage backed by an asyncpg connection pool."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created lazily in initialize()

    async def initialize(self):
        """Create the connection pool and the audit table/indexes."""
        self.pool = await asyncpg.create_pool(self.connection_string)
        # Create table if not exists
        async with self.pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS guardrail_audit (
                    event_id UUID PRIMARY KEY,
                    timestamp TIMESTAMPTZ NOT NULL,
                    event_type VARCHAR(50) NOT NULL,
                    request_id VARCHAR(100),
                    user_id VARCHAR(100),
                    session_id VARCHAR(100),
                    input_hash VARCHAR(32),
                    input_length INT,
                    output_hash VARCHAR(32),
                    output_length INT,
                    decision VARCHAR(50),
                    confidence FLOAT,
                    categories JSONB,
                    rail_type VARCHAR(100),
                    model_used VARCHAR(100),
                    model_version VARCHAR(50),
                    config_version VARCHAR(50),
                    latency_ms FLOAT,
                    metadata JSONB,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE INDEX IF NOT EXISTS idx_audit_timestamp
                    ON guardrail_audit(timestamp);
                CREATE INDEX IF NOT EXISTS idx_audit_user_id
                    ON guardrail_audit(user_id);
                CREATE INDEX IF NOT EXISTS idx_audit_event_type
                    ON guardrail_audit(event_type);
            ''')

    async def write(self, event: "AuditEvent"):
        """Insert a single audit event row."""
        async with self.pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO guardrail_audit (
                    event_id, timestamp, event_type, request_id, user_id,
                    session_id, input_hash, input_length, output_hash,
                    output_length, decision, confidence, categories,
                    rail_type, model_used, model_version, config_version,
                    latency_ms, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
                          $12, $13, $14, $15, $16, $17, $18, $19)
                ''',
                event.event_id, event.timestamp, event.event_type.value,
                event.request_id, event.user_id, event.session_id,
                event.input_hash, event.input_length, event.output_hash,
                event.output_length, event.decision, event.confidence,
                json.dumps(event.categories), event.rail_type,
                event.model_used, event.model_version, event.config_version,
                event.latency_ms, json.dumps(event.metadata)
            )

    async def query(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        user_id: Optional[str] = None,
        event_type: Optional[str] = None,
        limit: int = 1000
    ) -> "List[AuditEvent]":
        """Query audit logs with optional filters, newest first.

        start_time/end_time are inclusive bounds on the event timestamp.
        All filter values — including LIMIT — are bound as parameters;
        the previous version interpolated `limit` directly into the SQL.
        """
        conditions = []
        params = []
        param_count = 0
        if start_time:
            param_count += 1
            conditions.append(f"timestamp >= ${param_count}")
            params.append(start_time)
        if end_time:
            param_count += 1
            conditions.append(f"timestamp <= ${param_count}")
            params.append(end_time)
        if user_id:
            param_count += 1
            conditions.append(f"user_id = ${param_count}")
            params.append(user_id)
        if event_type:
            param_count += 1
            conditions.append(f"event_type = ${param_count}")
            params.append(event_type)
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        # Bind LIMIT as a parameter instead of f-string interpolation.
        param_count += 1
        params.append(limit)
        query = f'''
            SELECT * FROM guardrail_audit
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT ${param_count}
        '''
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [self._row_to_event(row) for row in rows]

    def _row_to_event(self, row) -> "AuditEvent":
        """Convert a fetched row (asyncpg Record) back into an AuditEvent.

        This method was referenced by query() but previously undefined,
        so any query raised AttributeError. NOTE(review): asyncpg returns
        JSONB columns as JSON text unless a custom codec is registered —
        decode defensively either way.
        """
        def _jsonb(value, default):
            if value is None:
                return default
            if isinstance(value, str):
                return json.loads(value)
            return value

        return AuditEvent(
            event_id=str(row["event_id"]),
            timestamp=row["timestamp"],
            event_type=AuditEventType(row["event_type"]),
            request_id=row["request_id"] or "",
            user_id=row["user_id"],
            session_id=row["session_id"],
            input_hash=row["input_hash"] or "",
            input_length=row["input_length"] or 0,
            output_hash=row["output_hash"],
            output_length=row["output_length"],
            decision=row["decision"] or "",
            confidence=row["confidence"] or 0.0,
            categories=_jsonb(row["categories"], []),
            rail_type=row["rail_type"] or "",
            model_used=row["model_used"] or "",
            model_version=row["model_version"] or "",
            config_version=row["config_version"] or "",
            latency_ms=row["latency_ms"] or 0.0,
            metadata=_jsonb(row["metadata"], {}),
        )
Compliance Reporting
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class ComplianceReport:
    """Aggregated guardrail metrics for one reporting period."""

    period_start: datetime              # inclusive start of the period
    period_end: datetime                # inclusive end of the period
    total_requests: int                 # all audit events in the period
    blocked_requests: int               # events with decision == "blocked"
    block_rate: float                   # blocked / total, as a percentage
    category_breakdown: Dict[str, int]  # category name -> occurrence count
    avg_latency_ms: float               # mean guardrail latency (ms)
    p99_latency_ms: float               # 99th-percentile latency (ms)
    escalation_count: int               # ESCALATION events in the period
    override_count: int                 # OVERRIDE events in the period
class ComplianceReporter:
    """Generate compliance reports from audit logs."""

    def __init__(self, storage: "AuditStorage"):
        self.storage = storage

    async def generate_report(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> "ComplianceReport":
        """Generate a compliance report for the given period.

        Aggregates block rate, per-category counts, latency statistics
        (mean and nearest-rank p99 over events with latency_ms > 0), and
        escalation/override counts over all events in the period.
        """
        events = await self.storage.query(
            start_time=start_time,
            end_time=end_time
        )

        total = len(events)
        blocked = sum(1 for e in events if e.decision == "blocked")

        category_counts: Dict[str, int] = {}
        latencies: List[float] = []
        for event in events:
            for cat in event.categories:
                category_counts[cat] = category_counts.get(cat, 0) + 1
            if event.latency_ms > 0:
                latencies.append(event.latency_ms)

        escalations = sum(
            1 for e in events
            if e.event_type == AuditEventType.ESCALATION
        )
        overrides = sum(
            1 for e in events
            if e.event_type == AuditEventType.OVERRIDE
        )

        latencies.sort()
        if latencies:
            avg_latency = sum(latencies) / len(latencies)
            # Nearest-rank p99 index is ceil(0.99 * n) - 1 (integer math).
            # The previous floor(0.99 * n) index over-selected: for n == 100
            # it returned the maximum (p100) instead of the 99th rank.
            p99_index = max(0, (len(latencies) * 99 + 99) // 100 - 1)
            p99_latency = latencies[p99_index]
        else:
            avg_latency = 0
            p99_latency = 0

        return ComplianceReport(
            period_start=start_time,
            period_end=end_time,
            total_requests=total,
            blocked_requests=blocked,
            block_rate=blocked / total * 100 if total > 0 else 0,
            category_breakdown=category_counts,
            avg_latency_ms=avg_latency,
            p99_latency_ms=p99_latency,
            escalation_count=escalations,
            override_count=overrides
        )

    async def generate_user_report(
        self,
        user_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Generate a per-user compliance summary dict.

        Includes interaction/block counts, the distinct categories the
        user triggered, and a 0-100 risk score.
        """
        events = await self.storage.query(
            user_id=user_id,
            start_time=start_time,
            end_time=end_time
        )
        return {
            "user_id": user_id,
            "period": f"{start_time.date()} to {end_time.date()}",
            "total_interactions": len(events),
            "blocked_count": sum(1 for e in events if e.decision == "blocked"),
            "categories_triggered": list(set(
                cat for e in events for cat in e.categories
            )),
            "risk_score": self._calculate_risk_score(events)
        }

    def _calculate_risk_score(self, events: "List[AuditEvent]") -> float:
        """Return a 0-100 risk score from the user's violation history.

        Combines the blocked ratio (up to 50 points) with a per-event
        category-severity average (10 points per unit weight); unknown
        categories default to weight 1.0. Empty history scores 0.0.
        """
        if not events:
            return 0.0
        blocked = sum(1 for e in events if e.decision == "blocked")
        total = len(events)
        # Weight by category severity
        severity_weights = {
            "hate_speech": 3.0,
            "violence": 3.0,
            "self_harm": 2.5,
            "sexual_content": 2.0,
            "harassment": 2.0,
            "spam": 1.0,
            "off_topic": 0.5
        }
        weighted_score = 0
        for event in events:
            for cat in event.categories:
                weighted_score += severity_weights.get(cat, 1.0)
        return min(100, (blocked / total * 50) + (weighted_score / total * 10))
Compliance Tip: Store audit logs for the retention period required by your industry (typically 1-7 years). Use append-only storage and cryptographic signatures for tamper-evidence.
Next: next steps and continuous improvement.