Production Security Patterns

Defense in Depth Architecture

3 min read

Defense in depth is a security strategy that layers multiple controls so that no single point of failure can compromise the entire system. This lesson shows how to apply this principle to LLM applications.

The Layered Security Model

┌─────────────────────────────────────────────────────────────┐
│                    Defense in Depth Layers                   │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │  Layer 1: Network & Infrastructure                   │   │
│   │  WAF, DDoS protection, TLS, API gateway             │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  Layer 2: Authentication & Authorization             │   │
│   │  API keys, OAuth, RBAC, session management          │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  Layer 3: Input Validation                          │   │
│   │  Length limits, pattern detection, content filters   │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  Layer 4: LLM Guardrails                            │   │
│   │  NeMo Guardrails, LLaMA Guard, topic rails          │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  Layer 5: Output Sanitization                       │   │
│   │  XSS prevention, PII redaction, content moderation  │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │  Layer 6: Monitoring & Response                     │   │
│   │  Logging, alerting, incident response               │   │
│   └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Implementing the Security Pipeline

import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List, Optional, Tuple

class SecurityDecision(Enum):
    """Verdict a security layer attaches to its check result.

    The member values are the lowercase strings reported to callers
    through ``decision.value``.
    """

    # The layer let the request through.
    ALLOW = "allow"
    # The layer rejected the request outright.
    BLOCK = "block"
    # The caller has used up its request budget.
    RATE_LIMIT = "rate_limit"
    # The caller has to authenticate before retrying.
    REQUIRE_AUTH = "require_auth"

@dataclass
class SecurityContext:
    """Per-request facts that the security layers base their decisions on."""

    # Caller identifier; when it is None the rate limiter keys on
    # ip_address instead.
    user_id: Optional[str]
    # Address the request came from.
    ip_address: str
    # Identifier of this individual request.
    request_id: str
    # Time of the request as a float (the usage example passes time.time()).
    timestamp: float
    # Whether the caller has been authenticated.
    is_authenticated: bool = False
    # Roles held by the caller; None is treated as "no roles" by the
    # authorization check.
    roles: Optional[List[str]] = None

@dataclass
class SecurityCheckResult:
    """Outcome of running one security layer against a request."""

    # True when the layer let the request through.
    passed: bool
    # Name of the layer that produced this result; SecurityPipeline.check()
    # overwrites it with the name the layer was registered under.
    layer: str
    # Verdict the layer reached.
    decision: SecurityDecision
    # Human-readable explanation; the layers in this file only set it
    # when a check fails.
    reason: Optional[str] = None

class SecurityPipeline:
    """Defense in depth security pipeline.

    Layers are registered with :meth:`add_layer` and executed in
    registration order by :meth:`check`, which stops at the first layer
    whose result did not pass.
    """

    def __init__(self) -> None:
        # Ordered (name, check_func) pairs; check() runs them first to last.
        self.layers: List[Tuple[str, Callable]] = []
        # Running totals: "checks" counts check() calls, "blocked" counts
        # the calls that ended on a failing layer.
        self.metrics = {"checks": 0, "blocked": 0}

    def add_layer(self, check_func: Callable, name: str) -> None:
        """Add a security layer to the pipeline.

        Args:
            check_func: Callable invoked as ``check_func(content, context)``
                that returns a result object with ``passed`` and ``layer``
                attributes.
            name: Label recorded on every result this layer produces.
        """
        self.layers.append((name, check_func))

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> List[SecurityCheckResult]:
        """Run content through the security layers until one fails.

        Args:
            content: Request text handed to every layer.
            context: Request metadata handed to every layer.

        Returns:
            The results of the layers that ran, in order. When a layer
            fails, its result is the last element and later layers are
            not executed.
        """
        results = []
        self.metrics["checks"] += 1

        for layer_name, check_func in self.layers:
            result = check_func(content, context)
            # The registered name replaces whatever the layer set itself.
            result.layer = layer_name
            results.append(result)

            # Stop on first block
            if not result.passed:
                self.metrics["blocked"] += 1
                break

        return results

    def is_allowed(self, results: List[SecurityCheckResult]) -> bool:
        """Return True when every result passed (also for an empty list)."""
        return all(r.passed for r in results)

Layer Implementations

Layer 1: Rate Limiting

from collections import defaultdict
import time

class RateLimiter:
    """Sliding-window rate limiter.

    Records the time of every accepted request per caller and rejects a
    request once ``requests_per_minute`` accepted requests already fall
    inside the last 60 seconds. Rejected requests are not recorded.

    NOTE: entries are only pruned when the same caller is seen again, so
    callers that never return keep their entry in ``buckets``.
    """

    def __init__(self, requests_per_minute: int = 60):
        """Create a limiter allowing ``requests_per_minute`` per caller."""
        self.requests_per_minute = requests_per_minute
        # Caller key -> time.monotonic() readings of accepted requests.
        self.buckets = defaultdict(list)

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> SecurityCheckResult:
        """Check rate limit for user.

        Args:
            content: Request text; not inspected by this layer.
            context: Supplies the caller key: ``user_id`` when set,
                otherwise ``ip_address``.

        Returns:
            A passing ALLOW result, or a failing RATE_LIMIT result when
            the caller has exhausted the current window.
        """
        key = context.user_id or context.ip_address
        # A monotonic clock keeps the 60 s window correct even when the
        # wall clock is adjusted while the process is running.
        now = time.monotonic()
        window_start = now - 60

        # Drop requests that have slid out of the window.
        recent = [t for t in self.buckets[key] if t > window_start]
        self.buckets[key] = recent

        if len(recent) >= self.requests_per_minute:
            return SecurityCheckResult(
                passed=False,
                layer="rate_limit",
                decision=SecurityDecision.RATE_LIMIT,
                reason=f"Rate limit exceeded: {self.requests_per_minute}/min"
            )

        recent.append(now)
        return SecurityCheckResult(
            passed=True,
            layer="rate_limit",
            decision=SecurityDecision.ALLOW
        )

Layer 2: Authentication Check

class AuthenticationLayer:
    """Gate requests on authentication and, optionally, on roles."""

    def __init__(self, required_roles: List[str] = None):
        """Store the roles of which a caller must hold at least one.

        A missing or empty list disables the role check.
        """
        self.required_roles = required_roles if required_roles else []

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> SecurityCheckResult:
        """Verify that the caller is authenticated and sufficiently privileged.

        Args:
            content: Request text; not inspected by this layer.
            context: Supplies ``is_authenticated`` and ``roles``.

        Returns:
            REQUIRE_AUTH (failed) for unauthenticated callers, BLOCK
            (failed) when roles are required and the caller holds none of
            them, otherwise a passing ALLOW result.
        """
        if not context.is_authenticated:
            return SecurityCheckResult(
                passed=False,
                layer="authentication",
                decision=SecurityDecision.REQUIRE_AUTH,
                reason="Authentication required"
            )

        if self.required_roles:
            held = set(context.roles or [])
            needed = set(self.required_roles)

            # Holding any one of the required roles is sufficient.
            if needed.isdisjoint(held):
                return SecurityCheckResult(
                    passed=False,
                    layer="authorization",
                    decision=SecurityDecision.BLOCK,
                    reason=f"Missing required role: {self.required_roles}"
                )

        return SecurityCheckResult(
            passed=True,
            layer="authentication",
            decision=SecurityDecision.ALLOW
        )

Layer 3: Input Validation

import re

class InputValidationLayer:
    """Reject input that is too long or matches a blocked pattern."""

    def __init__(self, max_length: int = 4000):
        """Set the maximum accepted length and the blocked regex patterns."""
        self.max_length = max_length
        # Patterns are matched against the lowercased input.
        self.blocked_patterns = [
            r"ignore\s+previous\s+instructions",
            r"system\s*prompt",
            r"<script>",
        ]

    def check(
        self,
        content: str,
        context: SecurityContext
    ) -> SecurityCheckResult:
        """Validate input.

        Args:
            content: Request text to validate.
            context: Request metadata; not inspected by this layer.

        Returns:
            A failing BLOCK result when ``content`` is longer than
            ``max_length`` or contains a blocked pattern, otherwise a
            passing ALLOW result.
        """
        # The length guard runs first, before any regex is evaluated.
        if len(content) > self.max_length:
            return SecurityCheckResult(
                passed=False,
                layer="input_validation",
                decision=SecurityDecision.BLOCK,
                reason=f"Input exceeds {self.max_length} characters"
            )

        lowered = content.lower()
        if any(re.search(pattern, lowered) for pattern in self.blocked_patterns):
            return SecurityCheckResult(
                passed=False,
                layer="input_validation",
                decision=SecurityDecision.BLOCK,
                reason="Blocked pattern detected"
            )

        return SecurityCheckResult(
            passed=True,
            layer="input_validation",
            decision=SecurityDecision.ALLOW
        )

Complete Pipeline Example

class SecureLLMApplication:
    """LLM application guarded by a layered security pipeline."""

    def __init__(self, llm_client):
        """Keep the LLM client and assemble the security pipeline.

        ``llm_client`` must provide an awaitable ``generate(prompt)``.
        """
        self.llm = llm_client
        self.pipeline = self._build_pipeline()

    def _build_pipeline(self) -> SecurityPipeline:
        """Create the pipeline: rate limiting, authentication, input validation."""
        # Registration order is execution order.
        layers = [
            (RateLimiter(requests_per_minute=30).check, "rate_limiting"),
            (AuthenticationLayer().check, "authentication"),
            (InputValidationLayer(max_length=4000).check, "input_validation"),
        ]

        pipeline = SecurityPipeline()
        for check_func, layer_name in layers:
            pipeline.add_layer(check_func, layer_name)
        return pipeline

    async def process_request(
        self,
        user_input: str,
        context: SecurityContext
    ) -> dict:
        """Run the security checks and, if they pass, query the LLM.

        Returns:
            On success, a dict with ``success``, the sanitized
            ``response`` and the number of ``security_checks`` that ran.
            On rejection, a dict with ``success``, the failing layer's
            ``error`` reason and its ``decision`` value.
        """
        results = self.pipeline.check(user_input, context)

        if self.pipeline.is_allowed(results):
            raw_response = await self.llm.generate(user_input)
            # Output sanitization acts as one more layer after the model.
            return {
                "success": True,
                "response": self._sanitize_output(raw_response),
                "security_checks": len(results)
            }

        # Report the first layer that rejected the request.
        failed = next(r for r in results if not r.passed)
        return {
            "success": False,
            "error": failed.reason,
            "decision": failed.decision.value
        }

    def _sanitize_output(self, response: str) -> str:
        """HTML-escape the LLM output."""
        from html import escape
        return escape(response)

# Usage
# NOTE: `llm_client` is not defined in this snippet; it must provide an
# awaitable `generate(prompt)` method, which process_request() awaits.
app = SecureLLMApplication(llm_client)

# Request context for an authenticated caller that holds the "user" role.
context = SecurityContext(
    user_id="user123",
    ip_address="192.168.1.1",
    request_id="req-abc",
    timestamp=time.time(),
    is_authenticated=True,
    roles=["user"]
)

# process_request() is a coroutine, so this line has to run where `await`
# is legal (inside an `async def`, or in an async-aware REPL/notebook).
result = await app.process_request("Hello, how are you?", context)

Best Practices

| Practice | Implementation |
|---|---|
| Fail closed | Block on any layer failure |
| Log at each layer | Track where attacks are caught |
| Independent layers | Each layer works standalone |
| Graceful degradation | Clear error messages to users |
| Regular testing | Verify each layer catches its threats |

Key Takeaway: No single security control is perfect. Layer multiple defenses so attackers must bypass all of them to succeed.

Quiz

Module 5: Production Security Patterns

Take Quiz