Production Security Patterns

Monitoring & Logging

3 min read

Effective monitoring and logging are essential for detecting attacks, analyzing incidents, and improving your security posture. This lesson covers what to log, how to detect anomalies, and how to set up alerts.

What to Log in LLM Applications

┌─────────────────────────────────────────────────────────────┐
│                    LLM Security Logging                      │
│                                                             │
│   Log These Events:                                         │
│   ✓ All user inputs (sanitized)                             │
│   ✓ LLM outputs (or hashes for privacy)                     │
│   ✓ Blocked requests with reason                            │
│   ✓ Rate limit hits                                         │
│   ✓ Authentication failures                                 │
│   ✓ Guardrail triggers                                      │
│   ✓ API latency and token usage                            │
│   ✓ Error rates and types                                  │
└─────────────────────────────────────────────────────────────┘

Structured Logging Implementation

import json
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any
from pathlib import Path
import hashlib

@dataclass
class LLMSecurityLog:
    """Structured log entry for LLM security events.

    One instance describes a single request. The fields from ``timestamp``
    through ``input_length`` are required; every later field has a default
    (``None``, or ``False`` for ``blocked``) and may be omitted.

    The entry carries hashes and lengths of the input and output rather than
    the text itself.
    """
    timestamp: str  # When the event was recorded, as a string
    event_type: str  # Label identifying the kind of event
    request_id: str  # Identifier of the request this entry belongs to
    user_id: Optional[str]  # May be None (presumably unauthenticated callers -- confirm)
    ip_address: str
    input_hash: str  # Hash of the user input, stored instead of the raw text for privacy
    input_length: int  # Length of the user input
    output_hash: Optional[str] = None  # Hash of the LLM output, if there was one
    output_length: Optional[int] = None  # Length of the LLM output, if there was one
    blocked: bool = False  # True when the request was blocked
    block_reason: Optional[str] = None  # Why the request was blocked, if it was
    guardrail_triggered: Optional[str] = None  # Name of the guardrail that fired, if any
    latency_ms: Optional[float] = None  # Request latency in milliseconds
    token_count: Optional[int] = None  # Tokens used by the request
    metadata: Optional[Dict[str, Any]] = None  # Free-form extra context supplied by the caller

class SecurityLogger:
    """Security-focused logger for LLM applications.

    Emits one JSON object per line to ``log_path`` through the shared
    ``"llm_security"`` logger. ``log_request`` never writes the raw input
    or output text; it records a truncated SHA-256 digest and the length.
    ``metadata`` and ``details`` supplied by the caller are written as given.

    NOTE: every instance uses the same named logger, so instances created
    with *different* paths will each receive every event.
    """

    def __init__(self, log_path: Path):
        """Create a logger that writes JSON lines to ``log_path``."""
        self.log_path = log_path
        self._setup_logger()

    def _setup_logger(self):
        """Configure structured logging.

        ``logging.getLogger`` returns the same object for the same name, so
        a handler for ``log_path`` is attached only if one is not already
        present. Without this check each new ``SecurityLogger`` would add
        another handler and every event would be written multiple times.
        """
        import os  # local import: only needed to normalize the path below

        self.logger = logging.getLogger("llm_security")
        self.logger.setLevel(logging.INFO)

        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(os.fspath(self.log_path))
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if already_attached:
            return

        # JSON file handler: the message is already a JSON string, so the
        # formatter emits it verbatim.
        handler = logging.FileHandler(self.log_path)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)

    def _hash_content(self, content: str) -> str:
        """Create privacy-preserving hash of content.

        Returns the first 16 hex characters (64 bits) of the SHA-256 digest
        of the UTF-8 encoded text.

        NOTE: the hash is unsalted, so short or guessable inputs can be
        matched by hashing candidate strings.
        """
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def log_request(
        self,
        request_id: str,
        user_id: Optional[str],
        ip_address: str,
        user_input: str,
        llm_output: Optional[str] = None,
        blocked: bool = False,
        block_reason: Optional[str] = None,
        guardrail: Optional[str] = None,
        latency_ms: Optional[float] = None,
        token_count: Optional[int] = None,
        metadata: Optional[Dict] = None
    ):
        """Log a request with security context.

        Args:
            request_id: Identifier of the request.
            user_id: Identifier of the caller, or ``None``.
            ip_address: Address the request came from.
            user_input: Raw user input; only its hash and length are logged.
            llm_output: Raw model output, or ``None`` when no output was
                produced. An empty string is a real (empty) output and is
                logged with its hash and a length of 0.
            blocked: Whether the request was blocked.
            block_reason: Reason for blocking, if any.
            guardrail: Name of the guardrail that triggered, if any.
            latency_ms: Request latency in milliseconds.
            token_count: Tokens used.
            metadata: Extra context; must be JSON-serializable.
        """
        # Distinguish "no output" (None) from "empty output" ("").
        has_output = llm_output is not None

        log_entry = LLMSecurityLog(
            timestamp=datetime.utcnow().isoformat(),
            event_type="llm_request",
            request_id=request_id,
            user_id=user_id,
            ip_address=ip_address,
            input_hash=self._hash_content(user_input),
            input_length=len(user_input),
            output_hash=self._hash_content(llm_output) if has_output else None,
            output_length=len(llm_output) if has_output else None,
            blocked=blocked,
            block_reason=block_reason,
            guardrail_triggered=guardrail,
            latency_ms=latency_ms,
            token_count=token_count,
            metadata=metadata
        )

        self.logger.info(json.dumps(asdict(log_entry)))

    def log_security_event(
        self,
        event_type: str,
        request_id: str,
        details: Dict[str, Any]
    ):
        """Log a security-specific event.

        Args:
            event_type: Label for the event.
            request_id: Identifier of the related request.
            details: Extra JSON-serializable fields merged into the entry.
                Keys named ``timestamp``, ``event_type`` or ``request_id``
                are ignored so that ``details`` cannot overwrite the
                fields set by this method.
        """
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "request_id": request_id,
        }
        # setdefault keeps the reserved fields above authoritative.
        for key, value in details.items():
            entry.setdefault(key, value)
        self.logger.info(json.dumps(entry))

# Usage: create a logger that writes JSON lines to ./security.log
logger = SecurityLogger(Path("./security.log"))

# Record one request that was not blocked. log_request stores a hash and the
# length of user_input and llm_output, not the strings themselves.
logger.log_request(
    request_id="req-123",
    user_id="user-456",
    ip_address="192.168.1.1",
    user_input="Hello, how are you?",
    llm_output="I'm doing well, thank you!",
    latency_ms=245.5,
    token_count=15
)

Anomaly Detection

from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Tuple
import statistics

class AnomalyDetector:
    """Detect suspicious patterns in LLM usage.

    Keeps per-user request timestamps in memory and flags three patterns:
    too many requests in the last minute, a high share of blocked
    requests, and activity at an hour of day that is rare for the user.

    Timestamps are compared against ``datetime.utcnow()``, so they must be
    naive UTC datetimes; mixing in timezone-aware values raises
    ``TypeError`` on comparison.

    NOTE: history is never pruned, so memory grows with traffic. A
    long-running service needs to trim or rotate it.
    """

    def __init__(
        self,
        *,
        max_requests_per_minute: int = 30,
        max_block_rate: float = 0.3,
        min_hour_history: int = 100,
        rare_hour_fraction: float = 0.05,
    ):
        """Create a detector.

        Args:
            max_requests_per_minute: More requests than this within one
                minute is reported as ``rapid_requests``.
            max_block_rate: A blocked/total ratio above this is reported
                as ``high_block_rate``.
            min_hour_history: Minimum recorded requests for a user before
                the unusual-hour check is applied.
            rare_hour_fraction: An hour of day holding less than this
                share of the user's history counts as unusual.
        """
        self.max_requests_per_minute = max_requests_per_minute
        self.max_block_rate = max_block_rate
        self.min_hour_history = min_hour_history
        self.rare_hour_fraction = rare_hour_fraction

        self.user_patterns = defaultdict(list)
        self.blocked_attempts = defaultdict(list)
        self.latency_history = []

    def record_request(
        self,
        user_id: str,
        timestamp: datetime,
        was_blocked: bool,
        latency_ms: float
    ):
        """Record request for pattern analysis.

        Args:
            user_id: Identifier of the caller.
            timestamp: When the request happened (naive UTC).
            was_blocked: Whether the request was blocked.
            latency_ms: Request latency in milliseconds.
        """
        self.user_patterns[user_id].append(timestamp)
        self.latency_history.append(latency_ms)

        if was_blocked:
            self.blocked_attempts[user_id].append(timestamp)

    def check_anomalies(
        self,
        user_id: str,
        now: Optional[datetime] = None
    ) -> List[Tuple[str, str]]:
        """Check for anomalous patterns.

        Args:
            user_id: Identifier of the user to evaluate.
            now: Reference time (naive UTC). Defaults to the current time;
                pass an explicit value to evaluate deterministically.

        Returns:
            A list of ``(anomaly_type, description)`` tuples, empty when
            nothing is flagged. Checking an unknown user returns an empty
            list and does not create any state for that user.
        """
        # Read the clock once so that all checks share the same reference.
        if now is None:
            now = datetime.utcnow()

        anomalies = []

        # Check 1: Rapid requests (potential automation)
        recent = self._get_recent_requests(user_id, minutes=1, now=now)
        if len(recent) > self.max_requests_per_minute:
            anomalies.append((
                "rapid_requests",
                f"User made {len(recent)} requests in 1 minute"
            ))

        # Check 2: High block rate
        block_rate = self._calculate_block_rate(user_id)
        if block_rate > self.max_block_rate:
            anomalies.append((
                "high_block_rate",
                f"Block rate: {block_rate:.1%}"
            ))

        # Check 3: Unusual hours (if user has history)
        if self._is_unusual_hour(user_id, now=now):
            anomalies.append((
                "unusual_activity_time",
                "Activity outside normal hours"
            ))

        return anomalies

    def _get_recent_requests(
        self,
        user_id: str,
        minutes: int,
        now: Optional[datetime] = None
    ) -> List[datetime]:
        """Get requests in last N minutes (strictly after the cutoff)."""
        reference = datetime.utcnow() if now is None else now
        cutoff = reference - timedelta(minutes=minutes)
        # .get() rather than [] so that a lookup never inserts an empty
        # entry into the defaultdict for an unknown user.
        return [
            t for t in self.user_patterns.get(user_id, ())
            if t > cutoff
        ]

    def _calculate_block_rate(self, user_id: str) -> float:
        """Calculate the fraction of blocked requests over all history."""
        total = len(self.user_patterns.get(user_id, ()))
        if total == 0:
            return 0.0
        blocked = len(self.blocked_attempts.get(user_id, ()))
        return blocked / total

    def _is_unusual_hour(
        self,
        user_id: str,
        now: Optional[datetime] = None
    ) -> bool:
        """Check if the reference hour is unusual for user."""
        history = self.user_patterns.get(user_id, ())
        if not history or len(history) < self.min_hour_history:
            return False  # Not enough history

        reference = datetime.utcnow() if now is None else now
        current_hour = reference.hour

        # Rare means the hour holds less than rare_hour_fraction of activity.
        hour_count = sum(1 for t in history if t.hour == current_hour)
        return hour_count / len(history) < self.rare_hour_fraction

# Usage
detector = AnomalyDetector()

# Record requests: arguments are (user_id, timestamp, was_blocked, latency_ms)
detector.record_request("user123", datetime.utcnow(), False, 200)
detector.record_request("user123", datetime.utcnow(), True, 50)

# Check for anomalies. One of the two requests above was blocked, a 50%
# block rate, which is above the 30% limit and is reported as "high_block_rate".
anomalies = detector.check_anomalies("user123")
for anomaly_type, description in anomalies:
    print(f"[ALERT] {anomaly_type}: {description}")

Alerting System

from dataclasses import dataclass
from enum import Enum
from typing import Callable, List
import json

class AlertSeverity(Enum):
    """Severity label attached to a security alert.

    Members are declared from LOW to CRITICAL; each value is the
    lowercase form of the member name.
    """
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class SecurityAlert:
    """A single security alert handed to alert handlers.

    All fields are required; ``user_id`` and ``request_id`` accept ``None``.
    """
    severity: AlertSeverity  # How serious the alert is
    alert_type: str  # Short machine-readable label, e.g. an anomaly type
    message: str  # Human-readable description
    user_id: Optional[str]  # User the alert concerns, if known
    request_id: Optional[str]  # Request the alert concerns, if known
    metadata: Dict[str, Any]  # Extra context supplied by whoever raised the alert

class AlertingSystem:
    """Security alerting system.

    Fans each alert out to every registered handler and keeps all alerts
    in ``alert_history``.

    NOTE: ``alert_history`` is never trimmed, so it grows for as long as
    the instance lives.
    """

    def __init__(self):
        """Start with no handlers and an empty history."""
        self.handlers: List[Callable[[SecurityAlert], None]] = []
        self.alert_history = []

    def add_handler(self, handler: Callable[[SecurityAlert], None]):
        """Add alert handler (email, Slack, PagerDuty, etc.).

        Handlers are called in the order they were added.
        """
        self.handlers.append(handler)

    def alert(self, alert: SecurityAlert):
        """Send alert to all handlers.

        The alert is appended to ``alert_history`` first. Delivery is
        best-effort: an exception from one handler is logged with its
        traceback and the remaining handlers are still called.
        """
        self.alert_history.append(alert)

        for handler in self.handlers:
            try:
                handler(alert)
            except Exception as e:
                # Report through logging rather than print so that the
                # traceback is kept and the message follows the
                # application's logging configuration.
                logging.getLogger(__name__).exception(
                    "Alert handler failed: %s", e
                )

    def check_and_alert(
        self,
        anomalies: List[Tuple[str, str]],
        user_id: str,
        request_id: str
    ):
        """Create alerts for detected anomalies.

        Args:
            anomalies: ``(anomaly_type, description)`` tuples.
            user_id: User the anomalies were detected for.
            request_id: Request that triggered the check.

        Each anomaly becomes one ``SecurityAlert``. Anomaly types missing
        from the severity map are sent as ``MEDIUM``.
        """
        severity_map = {
            "rapid_requests": AlertSeverity.MEDIUM,
            "high_block_rate": AlertSeverity.HIGH,
            "unusual_activity_time": AlertSeverity.LOW,
            "injection_attempt": AlertSeverity.CRITICAL,
        }

        for anomaly_type, description in anomalies:
            severity = severity_map.get(anomaly_type, AlertSeverity.MEDIUM)

            self.alert(SecurityAlert(
                severity=severity,
                alert_type=anomaly_type,
                message=description,
                user_id=user_id,
                request_id=request_id,
                metadata={"detected_at": datetime.utcnow().isoformat()}
            ))

# Example handlers
def console_handler(alert: SecurityAlert):
    """Write a one-line summary of the alert to standard output."""
    level = alert.severity.value.upper()
    summary = f"[{level}] {alert.alert_type}: {alert.message}"
    print(summary)

def slack_handler(alert: SecurityAlert):
    """Build the Slack webhook payload for an alert (placeholder).

    Nothing is sent: the payload is only assembled, and the call that
    would post it is left commented out.
    """
    headline = f"*{alert.severity.value.upper()}* - {alert.alert_type}"
    message_section = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": alert.message},
    }
    webhook_payload = {"text": headline, "blocks": [message_section]}
    # In production, use Slack SDK
    # requests.post(SLACK_WEBHOOK_URL, json=webhook_payload)

# Setup: create the alerting system and register the console handler, so
# each alert passed to alerting.alert() is printed.
alerting = AlertingSystem()
alerting.add_handler(console_handler)

Dashboard Metrics

from collections import Counter
from datetime import datetime, timedelta

class SecurityMetrics:
    """In-memory tally of security counters, summarized for a dashboard."""

    def __init__(self):
        self.request_times = []
        self.counters = Counter()

    def record(self, metric: str, value: int = 1):
        """Add ``value`` (1 by default) to the counter named ``metric``."""
        self.counters[metric] = self.counters[metric] + value

    def get_summary(self) -> Dict[str, Any]:
        """Return the dashboard view of the counters.

        Counters that were never recorded read as 0, and ``block_rate``
        is 0.0 when no requests have been recorded.
        """
        tally = self.counters
        summary = {
            "total_requests": tally["requests"],
            "blocked_requests": tally["blocked"],
        }
        summary["block_rate"] = self._calculate_rate("blocked", "requests")

        # Remaining entries are straight reads: summary key -> counter name.
        passthrough = (
            ("injection_attempts", "injection_detected"),
            ("rate_limit_hits", "rate_limited"),
            ("auth_failures", "auth_failed"),
            ("guardrail_triggers", "guardrail_triggered"),
        )
        for label, counter_name in passthrough:
            summary[label] = tally[counter_name]
        return summary

    def _calculate_rate(self, numerator: str, denominator: str) -> float:
        """Divide one counter by another, giving 0.0 for a zero denominator."""
        total = self.counters[denominator]
        return self.counters[numerator] / total if total != 0 else 0.0

# Expose as Prometheus metrics (example)
def expose_prometheus_metrics(metrics: SecurityMetrics) -> str:
    """Format metrics for Prometheus scraping.

    Args:
        metrics: Source of the values; its ``get_summary()`` result is
            rendered.

    Returns:
        One ``llm_security_<key> <value>`` sample per line, in the order
        of the summary. Every line, including the last, ends with a line
        feed, as the Prometheus text exposition format requires.
    """
    summary = metrics.get_summary()
    return "".join(
        f"llm_security_{key} {value}\n"
        for key, value in summary.items()
    )

Key Takeaway: Comprehensive logging and monitoring enable you to detect attacks in real time, investigate incidents after the fact, and continuously improve your security posture based on observed patterns.

Quiz

Module 5: Production Security Patterns

Take Quiz