Lesson 19 of 23

Production & Reliability

Safety & Guardrails

3 min read

Production AI systems need multiple layers of safety—input validation, output filtering, and behavioral constraints. This lesson covers implementing comprehensive guardrails.

Defense in Depth

┌─────────────────────────────────────────────────────────────┐
│                     User Input                               │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  Layer 1: Input Validation                                   │
│  - Length limits                                             │
│  - Format validation                                         │
│  - Injection detection                                       │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  Layer 2: Content Moderation                                 │
│  - Harmful content detection                                 │
│  - PII detection                                             │
│  - Topic classification                                      │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  Layer 3: LLM Processing                                     │
│  - System prompt constraints                                 │
│  - Tool restrictions                                         │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  Layer 4: Output Filtering                                   │
│  - Response validation                                       │
│  - Hallucination detection                                   │
│  - Citation verification                                     │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                     User Response                            │
└─────────────────────────────────────────────────────────────┘

Input Validation

import re
from dataclasses import dataclass
from typing import Optional

@dataclass
class ValidationResult:
    is_valid: bool
    error_message: Optional[str] = None
    sanitized_input: Optional[str] = None

class InputValidator:
    def __init__(
        self,
        max_length: int = 10000,
        max_tokens: int = 4000,
        blocked_patterns: list = None
    ):
        self.max_length = max_length
        self.max_tokens = max_tokens
        self.blocked_patterns = blocked_patterns or []

        # Common injection patterns
        self.injection_patterns = [
            r"ignore\s+(previous|above)\s+instructions",
            r"disregard\s+your\s+(system|initial)",
            r"you\s+are\s+now\s+",
            r"pretend\s+you\s+are",
            r"act\s+as\s+if",
        ]

    def validate(self, user_input: str) -> ValidationResult:
        # Check length
        if len(user_input) > self.max_length:
            return ValidationResult(
                is_valid=False,
                error_message=f"Input exceeds {self.max_length} characters"
            )

        # Check for injection attempts
        input_lower = user_input.lower()
        for pattern in self.injection_patterns:
            if re.search(pattern, input_lower):
                return ValidationResult(
                    is_valid=False,
                    error_message="Input contains prohibited patterns"
                )

        # Check blocked patterns (custom)
        for pattern in self.blocked_patterns:
            if re.search(pattern, user_input, re.IGNORECASE):
                return ValidationResult(
                    is_valid=False,
                    error_message="Input contains blocked content"
                )

        # Sanitize (remove null bytes, control characters)
        sanitized = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', user_input)

        return ValidationResult(
            is_valid=True,
            sanitized_input=sanitized
        )

Content Moderation

from enum import Enum

class ContentCategory(Enum):
    SAFE = "safe"
    HARMFUL = "harmful"
    SEXUAL = "sexual"
    HATE = "hate"
    VIOLENCE = "violence"
    SELF_HARM = "self_harm"
    PII = "pii"

class ContentModerator:
    def __init__(self, moderation_model):
        self.model = moderation_model
        self.pii_patterns = {
            "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"
        }

    async def moderate(self, text: str) -> dict:
        results = {
            "flagged": False,
            "categories": [],
            "pii_detected": [],
            "confidence": 0.0
        }

        # Check for PII
        for pii_type, pattern in self.pii_patterns.items():
            if re.search(pattern, text):
                results["pii_detected"].append(pii_type)
                results["flagged"] = True

        # Use moderation model for content categories
        moderation = await self.model.moderate(text)

        for category, score in moderation.scores.items():
            if score > 0.7:  # Threshold
                results["categories"].append(category)
                results["flagged"] = True
                results["confidence"] = max(results["confidence"], score)

        return results

    def redact_pii(self, text: str) -> str:
        """Replace PII with placeholders."""
        redacted = text
        for pii_type, pattern in self.pii_patterns.items():
            redacted = re.sub(pattern, f"[{pii_type.upper()}_REDACTED]", redacted)
        return redacted

Output Filtering

class OutputFilter:
    def __init__(self, allowed_domains: list = None):
        self.allowed_domains = allowed_domains or []

    async def filter(self, response: str, context: dict = None) -> dict:
        """Filter and validate LLM output."""
        issues = []

        # Check for leaked system prompt indicators
        system_leak_patterns = [
            "my system prompt",
            "i was instructed to",
            "my instructions say",
            "i am programmed to"
        ]

        for pattern in system_leak_patterns:
            if pattern in response.lower():
                issues.append("potential_system_leak")

        # Verify URLs are from allowed domains
        urls = re.findall(r'https?://[^\s<>"{}|\\^`\[\]]+', response)
        for url in urls:
            domain = url.split('/')[2]
            if self.allowed_domains and domain not in self.allowed_domains:
                issues.append(f"unauthorized_domain: {domain}")

        # Check factual claims against context (if RAG)
        if context and "sources" in context:
            # Simplified hallucination check
            claims = self._extract_claims(response)
            for claim in claims:
                if not self._verify_claim(claim, context["sources"]):
                    issues.append(f"unverified_claim: {claim[:50]}...")

        return {
            "filtered_response": response,
            "issues": issues,
            "passed": len(issues) == 0
        }

    def _extract_claims(self, text: str) -> list:
        """Extract factual claims from text."""
        # Simplified: split by sentences
        sentences = text.split('.')
        return [s.strip() for s in sentences if len(s.strip()) > 20]

    def _verify_claim(self, claim: str, sources: list) -> bool:
        """Check if claim is supported by sources."""
        # In production: use embedding similarity or NLI model
        claim_lower = claim.lower()
        for source in sources:
            if any(word in source.lower() for word in claim_lower.split()[:5]):
                return True
        return False

Rate Limiting

import time
from collections import defaultdict

class RateLimiter:
    def __init__(
        self,
        requests_per_minute: int = 60,
        requests_per_hour: int = 1000,
        tokens_per_day: int = 1000000
    ):
        self.rpm = requests_per_minute
        self.rph = requests_per_hour
        self.tpd = tokens_per_day

        self.request_times = defaultdict(list)
        self.token_usage = defaultdict(int)
        self.token_reset_time = defaultdict(float)

    def check_limit(self, user_id: str) -> tuple[bool, str]:
        """Check if user is within rate limits."""
        now = time.time()

        # Clean old entries
        minute_ago = now - 60
        hour_ago = now - 3600

        self.request_times[user_id] = [
            t for t in self.request_times[user_id] if t > hour_ago
        ]

        # Check requests per minute
        recent_minute = sum(1 for t in self.request_times[user_id] if t > minute_ago)
        if recent_minute >= self.rpm:
            return False, f"Rate limit: {self.rpm} requests/minute exceeded"

        # Check requests per hour
        if len(self.request_times[user_id]) >= self.rph:
            return False, f"Rate limit: {self.rph} requests/hour exceeded"

        # Check daily token limit
        if now - self.token_reset_time[user_id] > 86400:
            self.token_usage[user_id] = 0
            self.token_reset_time[user_id] = now

        if self.token_usage[user_id] >= self.tpd:
            return False, f"Daily token limit of {self.tpd} exceeded"

        return True, "OK"

    def record_request(self, user_id: str, tokens_used: int = 0):
        """Record a request for rate limiting."""
        self.request_times[user_id].append(time.time())
        self.token_usage[user_id] += tokens_used

Combined Guardrail Pipeline

class GuardrailPipeline:
    def __init__(self):
        self.input_validator = InputValidator()
        self.moderator = ContentModerator(moderation_model)
        self.output_filter = OutputFilter()
        self.rate_limiter = RateLimiter()

    async def process_request(
        self,
        user_id: str,
        user_input: str
    ) -> dict:
        # Step 1: Rate limiting
        allowed, message = self.rate_limiter.check_limit(user_id)
        if not allowed:
            return {"error": message, "blocked_at": "rate_limit"}

        # Step 2: Input validation
        validation = self.input_validator.validate(user_input)
        if not validation.is_valid:
            return {"error": validation.error_message, "blocked_at": "validation"}

        # Step 3: Content moderation
        moderation = await self.moderator.moderate(validation.sanitized_input)
        if moderation["flagged"]:
            return {
                "error": "Content policy violation",
                "categories": moderation["categories"],
                "blocked_at": "moderation"
            }

        return {
            "approved": True,
            "sanitized_input": validation.sanitized_input
        }

Interview Tip

When discussing safety:

  1. Layered approach - No single point of failure
  2. False positives - Balance safety vs. usability
  3. Logging - Track blocked content for analysis
  4. Updates - How do you handle new attack vectors?

Next, we'll cover deployment strategies for AI systems. :::

Quiz

Module 5: Production & Reliability

Take Quiz