Production Security Patterns
Monitoring & Logging
3 min read
Effective monitoring and logging are essential for detecting attacks, analyzing incidents, and improving your security posture. This lesson covers what to log, how to detect anomalies, and how to set up alerts.
What to Log in LLM Applications
┌─────────────────────────────────────────────────────────────┐
│ LLM Security Logging │
│ │
│ Log These Events: │
│ ✓ All user inputs (sanitized) │
│ ✓ LLM outputs (or hashes for privacy) │
│ ✓ Blocked requests with reason │
│ ✓ Rate limit hits │
│ ✓ Authentication failures │
│ ✓ Guardrail triggers │
│ ✓ API latency and token usage │
│ ✓ Error rates and types │
└─────────────────────────────────────────────────────────────┘
Structured Logging Implementation
import json
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any
from pathlib import Path
import hashlib
@dataclass
class LLMSecurityLog:
    """Structured record describing one LLM request for the security log.

    The record carries a hash and a length for the input (and optionally the
    output) instead of the text itself. The first seven fields are required;
    everything after them defaults to "not set".
    """

    # --- always present ---------------------------------------------------
    timestamp: str
    event_type: str
    request_id: str
    user_id: Optional[str]
    ip_address: str
    input_hash: str  # hash of the input, stored for privacy
    input_length: int

    # --- response details ---------------------------------------------------
    output_hash: Optional[str] = None
    output_length: Optional[int] = None

    # --- enforcement outcome ------------------------------------------------
    blocked: bool = False
    block_reason: Optional[str] = None
    guardrail_triggered: Optional[str] = None

    # --- performance and free-form context ----------------------------------
    latency_ms: Optional[float] = None
    token_count: Optional[int] = None
    metadata: Optional[Dict[str, Any]] = None
class SecurityLogger:
    """Security-focused logger for LLM applications.

    Writes one JSON document per line to ``log_path`` through the shared
    ``"llm_security"`` logger. Prompt and response text are reduced to a
    truncated SHA-256 digest plus a length before they are logged.
    """

    def __init__(self, log_path: Path):
        """Remember ``log_path`` and make sure a file handler exists for it."""
        self.log_path = log_path
        self._setup_logger()

    def _setup_logger(self):
        """Configure structured logging.

        ``logging.getLogger`` hands back the same logger object for the same
        name, so creating several ``SecurityLogger`` instances used to stack
        one ``FileHandler`` per instance and write every record several
        times. A handler is now attached only when none already targets this
        file.
        """
        import os  # local import: only needed for path normalisation here

        self.logger = logging.getLogger("llm_security")
        self.logger.setLevel(logging.INFO)

        # FileHandler keeps its target as an absolute path in ``baseFilename``.
        target = os.path.abspath(os.fspath(self.log_path))
        for existing in self.logger.handlers:
            if (
                isinstance(existing, logging.FileHandler)
                and getattr(existing, "baseFilename", None) == target
            ):
                return

        # JSON file handler: the message is already a JSON string, so emit it
        # verbatim with no level/time prefix.
        handler = logging.FileHandler(self.log_path)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)

    def _hash_content(self, content: str) -> str:
        """Create privacy-preserving hash of content.

        Returns the first 16 hex characters of the SHA-256 digest of the
        encoded text. The digest is unsalted, so identical inputs always map
        to the same value.
        """
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def _summarize_content(self, content: Optional[str]) -> tuple:
        """Return ``(hash, length)`` for ``content``, or ``(None, None)``.

        Only ``None`` means "no content". An empty string is real content
        and yields its digest together with a length of ``0``.
        """
        if content is None:
            return None, None
        return self._hash_content(content), len(content)

    def log_request(
        self,
        request_id: str,
        user_id: Optional[str],
        ip_address: str,
        user_input: str,
        llm_output: Optional[str] = None,
        blocked: bool = False,
        block_reason: Optional[str] = None,
        guardrail: Optional[str] = None,
        latency_ms: Optional[float] = None,
        token_count: Optional[int] = None,
        metadata: Optional[Dict] = None
    ):
        """Log a request with security context.

        Args:
            request_id: Identifier of the request being logged.
            user_id: Identifier of the caller, or ``None``.
            ip_address: Client address to record.
            user_input: Prompt text; only its digest and length are logged.
            llm_output: Response text, or ``None`` when there is none. Only
                its digest and length are logged.
            blocked: Whether the request was blocked.
            block_reason: Reason recorded for a blocked request.
            guardrail: Stored as ``guardrail_triggered`` in the record.
            latency_ms: Request latency in milliseconds.
            token_count: Token usage to record.
            metadata: Extra context; must be JSON-serialisable because the
                record is passed to ``json.dumps``.
        """
        # ``is None`` semantics (inside the helper) keep an empty response
        # distinguishable from a missing one.
        output_hash, output_length = self._summarize_content(llm_output)

        log_entry = LLMSecurityLog(
            timestamp=datetime.utcnow().isoformat(),
            event_type="llm_request",
            request_id=request_id,
            user_id=user_id,
            ip_address=ip_address,
            input_hash=self._hash_content(user_input),
            input_length=len(user_input),
            output_hash=output_hash,
            output_length=output_length,
            blocked=blocked,
            block_reason=block_reason,
            guardrail_triggered=guardrail,
            latency_ms=latency_ms,
            token_count=token_count,
            metadata=metadata
        )
        self.logger.info(json.dumps(asdict(log_entry)))

    def log_security_event(
        self,
        event_type: str,
        request_id: str,
        details: Dict[str, Any]
    ):
        """Log a security-specific event.

        Args:
            event_type: Value stored under ``"event_type"``.
            request_id: Value stored under ``"request_id"``.
            details: Extra key/value pairs merged into the record. They are
                unpacked after the core fields, so a key named
                ``timestamp``, ``event_type`` or ``request_id`` replaces the
                core value.
        """
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "request_id": request_id,
            **details
        }
        self.logger.info(json.dumps(entry))
# Usage: log one successful request/response exchange to ./security.log
logger = SecurityLogger(Path("./security.log"))
logger.log_request(
    request_id="req-123", user_id="user-456", ip_address="192.168.1.1",
    user_input="Hello, how are you?", llm_output="I'm doing well, thank you!",
    latency_ms=245.5, token_count=15
)
Anomaly Detection
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Tuple
import statistics
class AnomalyDetector:
    """Detect suspicious patterns in LLM usage.

    Request timestamps are kept in memory per user and checked against three
    heuristics: request burst rate, share of blocked requests, and activity
    at an hour that is rare for the user.

    Recorded timestamps are compared with ``datetime.utcnow()``, so they must
    be naive datetimes; mixing in timezone-aware values raises ``TypeError``
    on comparison.
    """

    def __init__(
        self,
        max_requests_per_minute: int = 30,
        max_block_rate: float = 0.3,
        min_history_for_hours: int = 100,
        rare_hour_fraction: float = 0.05,
    ):
        """Create an empty detector.

        Args:
            max_requests_per_minute: More requests than this within the last
                minute raise ``rapid_requests``.
            max_block_rate: A blocked/total ratio above this raises
                ``high_block_rate``.
            min_history_for_hours: Minimum number of recorded requests
                before the unusual-hour check is applied.
            rare_hour_fraction: The current hour is unusual when it accounts
                for less than this fraction of the user's history.
        """
        self.max_requests_per_minute = max_requests_per_minute
        self.max_block_rate = max_block_rate
        self.min_history_for_hours = min_history_for_hours
        self.rare_hour_fraction = rare_hour_fraction
        self.user_patterns = defaultdict(list)
        self.blocked_attempts = defaultdict(list)
        self.latency_history = []

    def record_request(
        self,
        user_id: str,
        timestamp: datetime,
        was_blocked: bool,
        latency_ms: float
    ):
        """Record request for pattern analysis.

        Appends ``timestamp`` to the user's history and ``latency_ms`` to the
        global latency list. A blocked request is also appended to the
        user's blocked list.
        """
        self.user_patterns[user_id].append(timestamp)
        self.latency_history.append(latency_ms)
        if was_blocked:
            self.blocked_attempts[user_id].append(timestamp)

    def check_anomalies(self, user_id: str) -> List[Tuple[str, str]]:
        """Check for anomalous patterns.

        Returns:
            A list of ``(anomaly_type, description)`` tuples, empty when
            nothing is flagged. Querying an unknown user returns an empty
            list and leaves the detector's state untouched.
        """
        anomalies = []

        # Check 1: Rapid requests (potential automation)
        recent = self._get_recent_requests(user_id, minutes=1)
        if len(recent) > self.max_requests_per_minute:
            anomalies.append((
                "rapid_requests",
                f"User made {len(recent)} requests in 1 minute"
            ))

        # Check 2: High block rate
        block_rate = self._calculate_block_rate(user_id)
        if block_rate > self.max_block_rate:
            anomalies.append((
                "high_block_rate",
                f"Block rate: {block_rate:.1%}"
            ))

        # Check 3: Unusual hours (if user has history)
        if self._is_unusual_hour(user_id):
            anomalies.append((
                "unusual_activity_time",
                "Activity outside normal hours"
            ))

        return anomalies

    def _get_recent_requests(
        self,
        user_id: str,
        minutes: int
    ) -> List[datetime]:
        """Get requests in last N minutes."""
        cutoff = datetime.utcnow() - timedelta(minutes=minutes)
        # .get() rather than indexing: indexing a defaultdict would create an
        # entry for every user id that is merely queried.
        return [
            t for t in self.user_patterns.get(user_id, ())
            if t > cutoff
        ]

    def _calculate_block_rate(self, user_id: str) -> float:
        """Calculate the fraction of the user's requests that were blocked."""
        total = len(self.user_patterns.get(user_id, ()))
        if total == 0:
            return 0.0
        blocked = len(self.blocked_attempts.get(user_id, ()))
        return blocked / total

    def _is_unusual_hour(self, user_id: str) -> bool:
        """Check if current hour is unusual for user."""
        history = self.user_patterns.get(user_id, ())
        if len(history) < self.min_history_for_hours:
            return False  # Not enough history

        current_hour = datetime.utcnow().hour
        # Share of past requests that fell in the current hour of day.
        hour_count = sum(1 for t in history if t.hour == current_hour)
        return hour_count / len(history) < self.rare_hour_fraction
# Usage: feed the detector two requests from one user, then report findings
detector = AnomalyDetector()

# Record requests (one allowed, one blocked)
detector.record_request(
    user_id="user123", timestamp=datetime.utcnow(), was_blocked=False, latency_ms=200
)
detector.record_request(
    user_id="user123", timestamp=datetime.utcnow(), was_blocked=True, latency_ms=50
)

# Check for anomalies
anomalies = detector.check_anomalies("user123")
for anomaly_type, description in anomalies:
    print(f"[ALERT] {anomaly_type}: {description}")
Alerting System
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List
import json
class AlertSeverity(Enum):
    """Severity levels an alert can carry, listed from least to most urgent.

    Each member's value is the lowercase string form of its name.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class SecurityAlert:
    """A single alert passed to alert handlers.

    All six fields are required and are positional in the order declared.
    """

    severity: AlertSeverity        # urgency level of the alert
    alert_type: str                # short machine-readable category
    message: str                   # human-readable description
    user_id: Optional[str]         # associated user, or None
    request_id: Optional[str]      # associated request, or None
    metadata: Dict[str, Any]       # additional key/value context
class AlertingSystem:
    """Security alerting system.

    Keeps a list of handler callables and an in-memory history of every
    alert that has been dispatched.
    """

    def __init__(self):
        """Start with no handlers and an empty history."""
        self.handlers: List[Callable[[SecurityAlert], None]] = []
        self.alert_history = []

    def add_handler(self, handler: Callable[[SecurityAlert], None]):
        """Register a callable that receives each alert (email, Slack, PagerDuty, etc.)."""
        self.handlers.append(handler)

    def alert(self, alert: SecurityAlert):
        """Record ``alert`` and pass it to every registered handler.

        A handler that raises is reported on standard output and does not
        prevent the remaining handlers from running.
        """
        self.alert_history.append(alert)
        for sink in self.handlers:
            try:
                sink(alert)
            except Exception as e:
                print(f"Alert handler failed: {e}")

    def check_and_alert(
        self,
        anomalies: List[Tuple[str, str]],
        user_id: str,
        request_id: str
    ):
        """Turn each ``(anomaly_type, description)`` pair into a dispatched alert.

        Anomaly types missing from the severity table are sent as MEDIUM.
        """
        severity_by_type = {
            "rapid_requests": AlertSeverity.MEDIUM,
            "high_block_rate": AlertSeverity.HIGH,
            "unusual_activity_time": AlertSeverity.LOW,
            "injection_attempt": AlertSeverity.CRITICAL,
        }

        for kind, text in anomalies:
            outgoing = SecurityAlert(
                severity=severity_by_type.get(kind, AlertSeverity.MEDIUM),
                alert_type=kind,
                message=text,
                user_id=user_id,
                request_id=request_id,
                metadata={"detected_at": datetime.utcnow().isoformat()}
            )
            self.alert(outgoing)
# Example handlers
def console_handler(alert: SecurityAlert):
    """Write a one-line summary of ``alert`` to standard output."""
    level = alert.severity.value.upper()
    print(f"[{level}] {alert.alert_type}: {alert.message}")
def slack_handler(alert: SecurityAlert):
    """Send alert to Slack (placeholder).

    Builds the webhook payload but does not transmit it; the actual POST is
    left commented out below.
    """
    headline = f"*{alert.severity.value.upper()}* - {alert.alert_type}"
    message_section = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": alert.message},
    }
    webhook_payload = {"text": headline, "blocks": [message_section]}
    # In production, use Slack SDK
    # requests.post(SLACK_WEBHOOK_URL, json=webhook_payload)
# Setup: create the alert dispatcher and register the console handler,
# so every dispatched alert is printed.
alerting = AlertingSystem()
alerting.add_handler(console_handler)
Dashboard Metrics
from collections import Counter
from datetime import datetime, timedelta
class SecurityMetrics:
    """Collect and expose security metrics.

    Metrics are named counters held in a ``collections.Counter``; a counter
    that was never recorded reads as ``0``.
    """

    def __init__(self):
        """Start with all counters at zero."""
        self.counters = Counter()
        self.request_times = []

    def record(self, metric: str, value: int = 1):
        """Add ``value`` (default 1) to the counter named ``metric``."""
        self.counters[metric] += value

    def get_summary(self) -> Dict[str, Any]:
        """Build the dashboard summary.

        Returns a dict whose keys appear in a fixed order: request totals,
        the block rate, then the remaining event counters.
        """
        tally = self.counters
        summary = {
            "total_requests": tally["requests"],
            "blocked_requests": tally["blocked"],
        }
        summary["block_rate"] = self._calculate_rate("blocked", "requests")
        # (summary key, counter name) pairs, in output order
        remaining = (
            ("injection_attempts", "injection_detected"),
            ("rate_limit_hits", "rate_limited"),
            ("auth_failures", "auth_failed"),
            ("guardrail_triggers", "guardrail_triggered"),
        )
        for summary_key, counter_name in remaining:
            summary[summary_key] = tally[counter_name]
        return summary

    def _calculate_rate(self, numerator: str, denominator: str) -> float:
        """Return counter ratio numerator/denominator, or 0.0 when the denominator is zero."""
        denom = self.counters[denominator]
        return self.counters[numerator] / denom if denom != 0 else 0.0
# Expose as Prometheus metrics (example)
def expose_prometheus_metrics(metrics: SecurityMetrics) -> str:
    """Format metrics for Prometheus scraping.

    Emits one ``llm_security_<key> <value>`` sample per entry of
    ``metrics.get_summary()``, in summary order.

    Every line, including the last, is terminated with a line feed: the
    Prometheus text exposition format requires the final line to end with
    ``\\n``, which a plain ``"\\n".join(...)`` does not provide.

    Returns:
        The exposition text, or an empty string when the summary is empty.
    """
    summary = metrics.get_summary()
    return "".join(
        f"llm_security_{key} {value}\n"
        for key, value in summary.items()
    )
Key Takeaway: Comprehensive logging and monitoring enable you to detect attacks in real time, investigate incidents after the fact, and continuously improve your security posture based on observed patterns.