Production Security Patterns
Defense in Depth Architecture
3 min read
Defense in depth is a security strategy that layers multiple independent controls so that the failure of any single control cannot compromise the entire system. This lesson shows how to apply the principle to LLM applications.
The Layered Security Model
┌────────────────────────────────────────────────────────────┐
│                  Defense in Depth Layers                   │
│                                                            │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ Layer 1: Network & Infrastructure                    │  │
│  │   WAF, DDoS protection, TLS, API gateway             │  │
│  ├──────────────────────────────────────────────────────┤  │
│  │ Layer 2: Authentication & Authorization              │  │
│  │   API keys, OAuth, RBAC, session management          │  │
│  ├──────────────────────────────────────────────────────┤  │
│  │ Layer 3: Input Validation                            │  │
│  │   Length limits, pattern detection, content filters  │  │
│  ├──────────────────────────────────────────────────────┤  │
│  │ Layer 4: LLM Guardrails                              │  │
│  │   NeMo Guardrails, LLaMA Guard, topic rails          │  │
│  ├──────────────────────────────────────────────────────┤  │
│  │ Layer 5: Output Sanitization                         │  │
│  │   XSS prevention, PII redaction, content moderation  │  │
│  ├──────────────────────────────────────────────────────┤  │
│  │ Layer 6: Monitoring & Response                       │  │
│  │   Logging, alerting, incident response               │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘
Implementing the Security Pipeline
from dataclasses import dataclass, field
from typing import Optional, List, Tuple, Callable
from enum import Enum
import time
class SecurityDecision(Enum):
ALLOW = "allow"
BLOCK = "block"
RATE_LIMIT = "rate_limit"
REQUIRE_AUTH = "require_auth"
@dataclass
class SecurityContext:
user_id: Optional[str]
ip_address: str
request_id: str
timestamp: float
is_authenticated: bool = False
    roles: List[str] = field(default_factory=list)
@dataclass
class SecurityCheckResult:
passed: bool
layer: str
decision: SecurityDecision
reason: Optional[str] = None
class SecurityPipeline:
"""Defense in depth security pipeline."""
def __init__(self):
        self.layers: List[Tuple[str, Callable]] = []
self.metrics = {"checks": 0, "blocked": 0}
def add_layer(self, check_func: Callable, name: str):
"""Add a security layer to the pipeline."""
self.layers.append((name, check_func))
def check(
self,
content: str,
context: SecurityContext
) -> List[SecurityCheckResult]:
"""Run content through all security layers."""
results = []
self.metrics["checks"] += 1
for layer_name, check_func in self.layers:
result = check_func(content, context)
result.layer = layer_name
results.append(result)
# Stop on first block
if not result.passed:
self.metrics["blocked"] += 1
break
return results
def is_allowed(self, results: List[SecurityCheckResult]) -> bool:
"""Check if all layers passed."""
return all(r.passed for r in results)
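Before wiring in real layers, it helps to see the pipeline run end to end. A minimal sketch using a hypothetical `reject_empty` layer (an illustrative check, not part of the lesson's layer set):

def reject_empty(content: str, context: SecurityContext) -> SecurityCheckResult:
    """Hypothetical layer: block blank input."""
    if not content.strip():
        return SecurityCheckResult(
            passed=False,
            layer="",  # overwritten by the pipeline with the layer name
            decision=SecurityDecision.BLOCK,
            reason="Empty input",
        )
    return SecurityCheckResult(passed=True, layer="", decision=SecurityDecision.ALLOW)

pipeline = SecurityPipeline()
pipeline.add_layer(reject_empty, "empty_check")

ctx = SecurityContext(
    user_id="u1",
    ip_address="10.0.0.1",
    request_id="req-1",
    timestamp=time.time(),
)
print(pipeline.is_allowed(pipeline.check("   ", ctx)))    # False
print(pipeline.is_allowed(pipeline.check("hello", ctx)))  # True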
Layer Implementations
Layer 1: Rate Limiting
from collections import defaultdict
import time
class RateLimiter:
"""Token bucket rate limiter."""
def __init__(self, requests_per_minute: int = 60):
self.requests_per_minute = requests_per_minute
self.buckets = defaultdict(list)
def check(
self,
content: str,
context: SecurityContext
) -> SecurityCheckResult:
"""Check rate limit for user."""
key = context.user_id or context.ip_address
now = time.time()
minute_ago = now - 60
# Clean old requests
self.buckets[key] = [
t for t in self.buckets[key] if t > minute_ago
]
if len(self.buckets[key]) >= self.requests_per_minute:
return SecurityCheckResult(
passed=False,
layer="rate_limit",
decision=SecurityDecision.RATE_LIMIT,
reason=f"Rate limit exceeded: {self.requests_per_minute}/min"
)
self.buckets[key].append(now)
return SecurityCheckResult(
passed=True,
layer="rate_limit",
decision=SecurityDecision.ALLOW
)
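The class above is a sliding-window log: simple and exact, but it stores one timestamp per request. If per-key memory matters, a true token bucket stores just two numbers per key and allows controlled bursts. A rough sketch, with parameters chosen arbitrarily for illustration:

class TokenBucketLimiter:
    """Token bucket: capacity bounds bursts; tokens refill continuously over time."""

    def __init__(self, rate_per_second: float = 0.5, capacity: int = 10):
        self.rate = rate_per_second
        self.capacity = capacity
        self.state = {}  # key -> (tokens_remaining, last_refill_timestamp)

    def allow(self, key: str) -> bool:
        now = time.time()
        tokens, last = self.state.get(key, (float(self.capacity), now))
        # Refill in proportion to elapsed time, capped at capacity.
        tokens = min(float(self.capacity), tokens + (now - last) * self.rate)
        if tokens < 1.0:
            self.state[key] = (tokens, now)
            return False
        self.state[key] = (tokens - 1.0, now)
        return True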
Layer 2: Authentication Check
class AuthenticationLayer:
"""Check authentication and authorization."""
    def __init__(self, required_roles: Optional[List[str]] = None):
self.required_roles = required_roles or []
def check(
self,
content: str,
context: SecurityContext
) -> SecurityCheckResult:
"""Verify authentication."""
if not context.is_authenticated:
return SecurityCheckResult(
passed=False,
layer="authentication",
decision=SecurityDecision.REQUIRE_AUTH,
reason="Authentication required"
)
# Check required roles
if self.required_roles:
user_roles = set(context.roles or [])
required = set(self.required_roles)
if not required.intersection(user_roles):
return SecurityCheckResult(
passed=False,
layer="authorization",
decision=SecurityDecision.BLOCK,
reason=f"Missing required role: {self.required_roles}"
)
return SecurityCheckResult(
passed=True,
layer="authentication",
decision=SecurityDecision.ALLOW
)
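A quick check that the layer fails closed for anonymous requests, before any role logic runs:

auth = AuthenticationLayer(required_roles=["admin"])

anonymous = SecurityContext(
    user_id=None,
    ip_address="10.0.0.2",
    request_id="req-2",
    timestamp=time.time(),
)
result = auth.check("hello", anonymous)
print(result.decision)  # SecurityDecision.REQUIRE_AUTH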
Layer 3: Input Validation
import re
class InputValidationLayer:
"""Validate input content."""
def __init__(self, max_length: int = 4000):
self.max_length = max_length
self.blocked_patterns = [
r"ignore\s+previous\s+instructions",
r"system\s*prompt",
r"<script>",
]
def check(
self,
content: str,
context: SecurityContext
) -> SecurityCheckResult:
"""Validate input."""
# Length check
if len(content) > self.max_length:
return SecurityCheckResult(
passed=False,
layer="input_validation",
decision=SecurityDecision.BLOCK,
reason=f"Input exceeds {self.max_length} characters"
)
# Pattern check
content_lower = content.lower()
for pattern in self.blocked_patterns:
if re.search(pattern, content_lower):
return SecurityCheckResult(
passed=False,
layer="input_validation",
decision=SecurityDecision.BLOCK,
reason="Blocked pattern detected"
)
return SecurityCheckResult(
passed=True,
layer="input_validation",
decision=SecurityDecision.ALLOW
)
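Keep in mind that a fixed pattern list only catches the most obvious injection attempts; attackers can paraphrase around it, which is exactly why Layer 4's model-based guardrails sit behind this layer. A quick sanity check of the patterns above:

validator = InputValidationLayer(max_length=100)
ctx = SecurityContext(
    user_id="u1",
    ip_address="10.0.0.1",
    request_id="req-3",
    timestamp=time.time(),
)

# Matching is case-insensitive because the layer lowercases input first.
blocked = validator.check("Please IGNORE previous instructions", ctx)
print(blocked.passed, blocked.reason)  # False Blocked pattern detected

allowed = validator.check("What is defense in depth?", ctx)
print(allowed.passed)  # True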
Complete Pipeline Example
class SecureLLMApplication:
"""LLM application with defense in depth."""
def __init__(self, llm_client):
self.llm = llm_client
self.pipeline = self._build_pipeline()
def _build_pipeline(self) -> SecurityPipeline:
"""Build the security pipeline."""
pipeline = SecurityPipeline()
# Add layers in order
pipeline.add_layer(
RateLimiter(requests_per_minute=30).check,
"rate_limiting"
)
pipeline.add_layer(
AuthenticationLayer().check,
"authentication"
)
pipeline.add_layer(
InputValidationLayer(max_length=4000).check,
"input_validation"
)
return pipeline
async def process_request(
self,
user_input: str,
context: SecurityContext
) -> dict:
"""Process request through security pipeline."""
# Run security checks
results = self.pipeline.check(user_input, context)
if not self.pipeline.is_allowed(results):
failed = next(r for r in results if not r.passed)
return {
"success": False,
"error": failed.reason,
"decision": failed.decision.value
}
# Generate LLM response
response = await self.llm.generate(user_input)
# Post-process output (additional layer)
sanitized = self._sanitize_output(response)
return {
"success": True,
"response": sanitized,
"security_checks": len(results)
}
def _sanitize_output(self, response: str) -> str:
"""Sanitize LLM output."""
import html
return html.escape(response)
# Usage (run inside an async function; llm_client is your LLM client instance)
app = SecureLLMApplication(llm_client)
context = SecurityContext(
user_id="user123",
ip_address="192.168.1.1",
request_id="req-abc",
timestamp=time.time(),
is_authenticated=True,
roles=["user"]
)
result = await app.process_request("Hello, how are you?", context)
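`_sanitize_output` covers only HTML escaping, while the diagram's Layer 5 and Layer 6 also call for PII redaction and logging. A sketch of what those might add, using a single email regex purely for illustration (real PII detection needs a dedicated scanner, not one pattern):

import logging
import re

logger = logging.getLogger("security_pipeline")
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")

def sanitize_and_log(response: str, context: SecurityContext) -> str:
    """Redact email-like strings and log the event for Layer 6 monitoring."""
    redacted = EMAIL_RE.sub("[REDACTED_EMAIL]", response)
    if redacted != response:
        logger.warning("PII redacted in output (request_id=%s)", context.request_id)
    return redacted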
Best Practices
| Practice | Implementation |
|---|---|
| Fail closed | Block on any layer failure (see the sketch below) |
| Log at each layer | Track where attacks are caught |
| Independent layers | Each layer works standalone |
| Graceful degradation | Clear error messages to users |
| Regular testing | Verify each layer catches its threats |
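"Fail closed" extends to unexpected exceptions: if a layer itself crashes, treat the request as blocked rather than letting it through unchecked. One way to wrap the pipeline:

import logging

def check_fail_closed(
    pipeline: SecurityPipeline,
    content: str,
    context: SecurityContext,
) -> bool:
    """Return True only if every layer ran and passed; block on any error."""
    try:
        results = pipeline.check(content, context)
        return pipeline.is_allowed(results)
    except Exception:
        logging.getLogger("security_pipeline").exception(
            "Security layer raised; failing closed"
        )
        return False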
Key Takeaway: No single security control is perfect. Layer multiple defenses so attackers must bypass all of them to succeed.