Production Deployment & Observability

Logging and Auditing

3 min read

Comprehensive logging and auditing are essential for compliance, debugging, and continuous improvement of guardrail systems. This lesson covers production-grade logging strategies.

Audit Log Structure

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum
import json
import uuid

class AuditEventType(Enum):
    INPUT_CHECK = "input_check"
    OUTPUT_CHECK = "output_check"
    BLOCK = "block"
    ESCALATION = "escalation"
    OVERRIDE = "override"
    CONFIG_CHANGE = "config_change"

@dataclass
class AuditEvent:
    """Immutable audit event for compliance."""
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_type: AuditEventType = AuditEventType.INPUT_CHECK
    request_id: str = ""
    user_id: Optional[str] = None
    session_id: Optional[str] = None

    # Content (may be redacted for privacy)
    input_hash: str = ""  # Hash instead of raw content
    input_length: int = 0
    output_hash: Optional[str] = None
    output_length: Optional[int] = None

    # Decision
    decision: str = ""
    confidence: float = 0.0
    categories: List[str] = field(default_factory=list)
    rail_type: str = ""

    # Context
    model_used: str = ""
    model_version: str = ""
    config_version: str = ""
    latency_ms: float = 0.0

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize to JSON for storage."""
        data = {
            "event_id": self.event_id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.event_type.value,
            "request_id": self.request_id,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "input_hash": self.input_hash,
            "input_length": self.input_length,
            "output_hash": self.output_hash,
            "output_length": self.output_length,
            "decision": self.decision,
            "confidence": self.confidence,
            "categories": self.categories,
            "rail_type": self.rail_type,
            "model_used": self.model_used,
            "model_version": self.model_version,
            "config_version": self.config_version,
            "latency_ms": self.latency_ms,
            "metadata": self.metadata
        }
        return json.dumps(data)

Audit Logger Implementation

import hashlib
from abc import ABC, abstractmethod
from typing import Protocol

class AuditStorage(Protocol):
    """Protocol for audit log storage backends."""
    async def write(self, event: AuditEvent) -> None: ...
    async def query(self, filters: dict) -> List[AuditEvent]: ...

class AuditLogger:
    """Production audit logger with multiple backends."""

    def __init__(
        self,
        storage: AuditStorage,
        config_version: str = "1.0.0",
        redact_content: bool = True
    ):
        self.storage = storage
        self.config_version = config_version
        self.redact_content = redact_content

    async def log_check(
        self,
        request_id: str,
        user_id: str,
        input_content: str,
        output_content: str = None,
        decision: str = "pass",
        confidence: float = 1.0,
        categories: List[str] = None,
        rail_type: str = "",
        model_used: str = "",
        latency_ms: float = 0.0,
        metadata: dict = None
    ):
        """Log a guardrail check event."""
        event = AuditEvent(
            event_type=AuditEventType.INPUT_CHECK if not output_content else AuditEventType.OUTPUT_CHECK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            output_hash=self._hash_content(output_content) if output_content else None,
            output_length=len(output_content) if output_content else None,
            decision=decision,
            confidence=confidence,
            categories=categories or [],
            rail_type=rail_type,
            model_used=model_used,
            config_version=self.config_version,
            latency_ms=latency_ms,
            metadata=metadata or {}
        )

        await self.storage.write(event)
        return event

    async def log_block(
        self,
        request_id: str,
        user_id: str,
        reason: str,
        categories: List[str],
        input_content: str,
        rail_type: str,
        metadata: dict = None
    ):
        """Log a blocked request."""
        event = AuditEvent(
            event_type=AuditEventType.BLOCK,
            request_id=request_id,
            user_id=user_id,
            input_hash=self._hash_content(input_content),
            input_length=len(input_content),
            decision="blocked",
            categories=categories,
            rail_type=rail_type,
            config_version=self.config_version,
            metadata={**(metadata or {}), "reason": reason}
        )

        await self.storage.write(event)
        return event

    async def log_override(
        self,
        request_id: str,
        admin_user_id: str,
        original_decision: str,
        new_decision: str,
        justification: str
    ):
        """Log an admin override of guardrail decision."""
        event = AuditEvent(
            event_type=AuditEventType.OVERRIDE,
            request_id=request_id,
            user_id=admin_user_id,
            decision=new_decision,
            config_version=self.config_version,
            metadata={
                "original_decision": original_decision,
                "justification": justification
            }
        )

        await self.storage.write(event)
        return event

    def _hash_content(self, content: str) -> str:
        """Hash content for privacy-preserving logging."""
        if not content:
            return ""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

Storage Backends

import asyncpg
from datetime import datetime
import json

class PostgresAuditStorage:
    """PostgreSQL audit log storage."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        self.pool = await asyncpg.create_pool(self.connection_string)

        # Create table if not exists
        async with self.pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS guardrail_audit (
                    event_id UUID PRIMARY KEY,
                    timestamp TIMESTAMPTZ NOT NULL,
                    event_type VARCHAR(50) NOT NULL,
                    request_id VARCHAR(100),
                    user_id VARCHAR(100),
                    session_id VARCHAR(100),
                    input_hash VARCHAR(32),
                    input_length INT,
                    output_hash VARCHAR(32),
                    output_length INT,
                    decision VARCHAR(50),
                    confidence FLOAT,
                    categories JSONB,
                    rail_type VARCHAR(100),
                    model_used VARCHAR(100),
                    model_version VARCHAR(50),
                    config_version VARCHAR(50),
                    latency_ms FLOAT,
                    metadata JSONB,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                );

                CREATE INDEX IF NOT EXISTS idx_audit_timestamp
                ON guardrail_audit(timestamp);

                CREATE INDEX IF NOT EXISTS idx_audit_user_id
                ON guardrail_audit(user_id);

                CREATE INDEX IF NOT EXISTS idx_audit_event_type
                ON guardrail_audit(event_type);
            ''')

    async def write(self, event: AuditEvent):
        async with self.pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO guardrail_audit (
                    event_id, timestamp, event_type, request_id, user_id,
                    session_id, input_hash, input_length, output_hash,
                    output_length, decision, confidence, categories,
                    rail_type, model_used, model_version, config_version,
                    latency_ms, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
                          $12, $13, $14, $15, $16, $17, $18, $19)
            ''',
                event.event_id, event.timestamp, event.event_type.value,
                event.request_id, event.user_id, event.session_id,
                event.input_hash, event.input_length, event.output_hash,
                event.output_length, event.decision, event.confidence,
                json.dumps(event.categories), event.rail_type,
                event.model_used, event.model_version, event.config_version,
                event.latency_ms, json.dumps(event.metadata)
            )

    async def query(
        self,
        start_time: datetime = None,
        end_time: datetime = None,
        user_id: str = None,
        event_type: str = None,
        limit: int = 1000
    ) -> List[AuditEvent]:
        """Query audit logs with filters."""
        conditions = []
        params = []
        param_count = 0

        if start_time:
            param_count += 1
            conditions.append(f"timestamp >= ${param_count}")
            params.append(start_time)

        if end_time:
            param_count += 1
            conditions.append(f"timestamp <= ${param_count}")
            params.append(end_time)

        if user_id:
            param_count += 1
            conditions.append(f"user_id = ${param_count}")
            params.append(user_id)

        if event_type:
            param_count += 1
            conditions.append(f"event_type = ${param_count}")
            params.append(event_type)

        where_clause = " AND ".join(conditions) if conditions else "1=1"

        query = f'''
            SELECT * FROM guardrail_audit
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT {limit}
        '''

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [self._row_to_event(row) for row in rows]

Compliance Reporting

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class ComplianceReport:
    period_start: datetime
    period_end: datetime
    total_requests: int
    blocked_requests: int
    block_rate: float
    category_breakdown: Dict[str, int]
    avg_latency_ms: float
    p99_latency_ms: float
    escalation_count: int
    override_count: int

class ComplianceReporter:
    """Generate compliance reports from audit logs."""

    def __init__(self, storage: AuditStorage):
        self.storage = storage

    async def generate_report(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> ComplianceReport:
        """Generate compliance report for period."""
        events = await self.storage.query(
            start_time=start_time,
            end_time=end_time
        )

        # Aggregate metrics
        total = len(events)
        blocked = sum(1 for e in events if e.decision == "blocked")

        category_counts = {}
        latencies = []

        for event in events:
            for cat in event.categories:
                category_counts[cat] = category_counts.get(cat, 0) + 1
            if event.latency_ms > 0:
                latencies.append(event.latency_ms)

        escalations = sum(
            1 for e in events
            if e.event_type == AuditEventType.ESCALATION
        )

        overrides = sum(
            1 for e in events
            if e.event_type == AuditEventType.OVERRIDE
        )

        latencies.sort()

        return ComplianceReport(
            period_start=start_time,
            period_end=end_time,
            total_requests=total,
            blocked_requests=blocked,
            block_rate=blocked / total * 100 if total > 0 else 0,
            category_breakdown=category_counts,
            avg_latency_ms=sum(latencies) / len(latencies) if latencies else 0,
            p99_latency_ms=latencies[int(len(latencies) * 0.99)] if latencies else 0,
            escalation_count=escalations,
            override_count=overrides
        )

    async def generate_user_report(
        self,
        user_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Generate per-user compliance report."""
        events = await self.storage.query(
            user_id=user_id,
            start_time=start_time,
            end_time=end_time
        )

        return {
            "user_id": user_id,
            "period": f"{start_time.date()} to {end_time.date()}",
            "total_interactions": len(events),
            "blocked_count": sum(1 for e in events if e.decision == "blocked"),
            "categories_triggered": list(set(
                cat for e in events for cat in e.categories
            )),
            "risk_score": self._calculate_risk_score(events)
        }

    def _calculate_risk_score(self, events: List[AuditEvent]) -> float:
        """Calculate user risk score based on violations."""
        if not events:
            return 0.0

        blocked = sum(1 for e in events if e.decision == "blocked")
        total = len(events)

        # Weight by category severity
        severity_weights = {
            "hate_speech": 3.0,
            "violence": 3.0,
            "self_harm": 2.5,
            "sexual_content": 2.0,
            "harassment": 2.0,
            "spam": 1.0,
            "off_topic": 0.5
        }

        weighted_score = 0
        for event in events:
            for cat in event.categories:
                weighted_score += severity_weights.get(cat, 1.0)

        return min(100, (blocked / total * 50) + (weighted_score / total * 10))

Compliance Tip: Store audit logs for the retention period required by your industry (typically 1-7 years). Use append-only storage and cryptographic signatures for tamper-evidence.

Next: Next steps and continuous improvement. :::

Quiz

Module 6: Production Deployment & Observability

Take Quiz
FREE WEEKLY NEWSLETTER

Stay on the Nerd Track

One email per week — courses, deep dives, tools, and AI experiments.

No spam. Unsubscribe anytime.