Production & Reliability
Safety & Guardrails
3 min read
Production AI systems need multiple layers of safety—input validation, output filtering, and behavioral constraints. This lesson covers implementing comprehensive guardrails.
Defense in Depth
┌─────────────────────────────────────────────────────────────┐
│ User Input │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Input Validation │
│ - Length limits │
│ - Format validation │
│ - Injection detection │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: Content Moderation │
│ - Harmful content detection │
│ - PII detection │
│ - Topic classification │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 3: LLM Processing │
│ - System prompt constraints │
│ - Tool restrictions │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 4: Output Filtering │
│ - Response validation │
│ - Hallucination detection │
│ - Citation verification │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ User Response │
└─────────────────────────────────────────────────────────────┘
Input Validation
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class ValidationResult:
is_valid: bool
error_message: Optional[str] = None
sanitized_input: Optional[str] = None
class InputValidator:
def __init__(
self,
max_length: int = 10000,
max_tokens: int = 4000,
blocked_patterns: list = None
):
self.max_length = max_length
self.max_tokens = max_tokens
self.blocked_patterns = blocked_patterns or []
# Common injection patterns
self.injection_patterns = [
r"ignore\s+(previous|above)\s+instructions",
r"disregard\s+your\s+(system|initial)",
r"you\s+are\s+now\s+",
r"pretend\s+you\s+are",
r"act\s+as\s+if",
]
def validate(self, user_input: str) -> ValidationResult:
# Check length
if len(user_input) > self.max_length:
return ValidationResult(
is_valid=False,
error_message=f"Input exceeds {self.max_length} characters"
)
# Check for injection attempts
input_lower = user_input.lower()
for pattern in self.injection_patterns:
if re.search(pattern, input_lower):
return ValidationResult(
is_valid=False,
error_message="Input contains prohibited patterns"
)
# Check blocked patterns (custom)
for pattern in self.blocked_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
return ValidationResult(
is_valid=False,
error_message="Input contains blocked content"
)
# Sanitize (remove null bytes, control characters)
sanitized = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', user_input)
return ValidationResult(
is_valid=True,
sanitized_input=sanitized
)
Content Moderation
from enum import Enum
class ContentCategory(Enum):
SAFE = "safe"
HARMFUL = "harmful"
SEXUAL = "sexual"
HATE = "hate"
VIOLENCE = "violence"
SELF_HARM = "self_harm"
PII = "pii"
class ContentModerator:
def __init__(self, moderation_model):
self.model = moderation_model
self.pii_patterns = {
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"
}
async def moderate(self, text: str) -> dict:
results = {
"flagged": False,
"categories": [],
"pii_detected": [],
"confidence": 0.0
}
# Check for PII
for pii_type, pattern in self.pii_patterns.items():
if re.search(pattern, text):
results["pii_detected"].append(pii_type)
results["flagged"] = True
# Use moderation model for content categories
moderation = await self.model.moderate(text)
for category, score in moderation.scores.items():
if score > 0.7: # Threshold
results["categories"].append(category)
results["flagged"] = True
results["confidence"] = max(results["confidence"], score)
return results
def redact_pii(self, text: str) -> str:
"""Replace PII with placeholders."""
redacted = text
for pii_type, pattern in self.pii_patterns.items():
redacted = re.sub(pattern, f"[{pii_type.upper()}_REDACTED]", redacted)
return redacted
Output Filtering
class OutputFilter:
def __init__(self, allowed_domains: list = None):
self.allowed_domains = allowed_domains or []
async def filter(self, response: str, context: dict = None) -> dict:
"""Filter and validate LLM output."""
issues = []
# Check for leaked system prompt indicators
system_leak_patterns = [
"my system prompt",
"i was instructed to",
"my instructions say",
"i am programmed to"
]
for pattern in system_leak_patterns:
if pattern in response.lower():
issues.append("potential_system_leak")
# Verify URLs are from allowed domains
urls = re.findall(r'https?://[^\s<>"{}|\\^`\[\]]+', response)
for url in urls:
domain = url.split('/')[2]
if self.allowed_domains and domain not in self.allowed_domains:
issues.append(f"unauthorized_domain: {domain}")
# Check factual claims against context (if RAG)
if context and "sources" in context:
# Simplified hallucination check
claims = self._extract_claims(response)
for claim in claims:
if not self._verify_claim(claim, context["sources"]):
issues.append(f"unverified_claim: {claim[:50]}...")
return {
"filtered_response": response,
"issues": issues,
"passed": len(issues) == 0
}
def _extract_claims(self, text: str) -> list:
"""Extract factual claims from text."""
# Simplified: split by sentences
sentences = text.split('.')
return [s.strip() for s in sentences if len(s.strip()) > 20]
def _verify_claim(self, claim: str, sources: list) -> bool:
"""Check if claim is supported by sources."""
# In production: use embedding similarity or NLI model
claim_lower = claim.lower()
for source in sources:
if any(word in source.lower() for word in claim_lower.split()[:5]):
return True
return False
Rate Limiting
import time
from collections import defaultdict
class RateLimiter:
def __init__(
self,
requests_per_minute: int = 60,
requests_per_hour: int = 1000,
tokens_per_day: int = 1000000
):
self.rpm = requests_per_minute
self.rph = requests_per_hour
self.tpd = tokens_per_day
self.request_times = defaultdict(list)
self.token_usage = defaultdict(int)
self.token_reset_time = defaultdict(float)
def check_limit(self, user_id: str) -> tuple[bool, str]:
"""Check if user is within rate limits."""
now = time.time()
# Clean old entries
minute_ago = now - 60
hour_ago = now - 3600
self.request_times[user_id] = [
t for t in self.request_times[user_id] if t > hour_ago
]
# Check requests per minute
recent_minute = sum(1 for t in self.request_times[user_id] if t > minute_ago)
if recent_minute >= self.rpm:
return False, f"Rate limit: {self.rpm} requests/minute exceeded"
# Check requests per hour
if len(self.request_times[user_id]) >= self.rph:
return False, f"Rate limit: {self.rph} requests/hour exceeded"
# Check daily token limit
if now - self.token_reset_time[user_id] > 86400:
self.token_usage[user_id] = 0
self.token_reset_time[user_id] = now
if self.token_usage[user_id] >= self.tpd:
return False, f"Daily token limit of {self.tpd} exceeded"
return True, "OK"
def record_request(self, user_id: str, tokens_used: int = 0):
"""Record a request for rate limiting."""
self.request_times[user_id].append(time.time())
self.token_usage[user_id] += tokens_used
Combined Guardrail Pipeline
class GuardrailPipeline:
def __init__(self):
self.input_validator = InputValidator()
self.moderator = ContentModerator(moderation_model)
self.output_filter = OutputFilter()
self.rate_limiter = RateLimiter()
async def process_request(
self,
user_id: str,
user_input: str
) -> dict:
# Step 1: Rate limiting
allowed, message = self.rate_limiter.check_limit(user_id)
if not allowed:
return {"error": message, "blocked_at": "rate_limit"}
# Step 2: Input validation
validation = self.input_validator.validate(user_input)
if not validation.is_valid:
return {"error": validation.error_message, "blocked_at": "validation"}
# Step 3: Content moderation
moderation = await self.moderator.moderate(validation.sanitized_input)
if moderation["flagged"]:
return {
"error": "Content policy violation",
"categories": moderation["categories"],
"blocked_at": "moderation"
}
return {
"approved": True,
"sanitized_input": validation.sanitized_input
}
Interview Tip
When discussing safety:
- Layered approach - No single point of failure
- False positives - Balance safety vs. usability
- Logging - Track blocked content for analysis
- Updates - How do you handle new attack vectors?
Next, we'll cover deployment strategies for AI systems. :::