Production & Reliability
Deployment Strategies
3 min read
Deploying AI systems requires careful strategies to minimize risk while maximizing learning. This lesson covers deployment patterns specific to LLM-based applications.
Blue-Green Deployment
Run two identical environments and switch traffic between them instantly:
┌──────────────────────────────────────────────────┐
│                  Load Balancer                   │
└──────────────────────────────────────────────────┘
           │ (100%)                    │ (0%)
           ▼                           ▼
┌──────────────────────┐    ┌──────────────────────┐
│   Blue Environment   │    │  Green Environment   │
│    (Current v1.2)    │    │      (New v1.3)      │
│                      │    │                      │
│  - Agent v1.2        │    │  - Agent v1.3        │
│  - Prompts v5        │    │  - Prompts v6        │
│  - Model: GPT-4      │    │  - Model: GPT-4      │
└──────────────────────┘    └──────────────────────┘
class DeploymentError(Exception):
    """Raised when a deployment step fails validation."""

class BlueGreenDeployer:
    def __init__(self, load_balancer, environments: dict):
        self.lb = load_balancer
        self.environments = environments
        self.active = "blue"

    async def deploy_new_version(self, version: str):
        """Deploy to the inactive environment, validate, then switch traffic."""
        inactive = "green" if self.active == "blue" else "blue"

        # Step 1: Deploy to inactive environment
        await self.environments[inactive].deploy(version)

        # Step 2: Run health checks
        health = await self.environments[inactive].health_check()
        if not health["healthy"]:
            raise DeploymentError(f"Health check failed: {health}")

        # Step 3: Run smoke tests (run_smoke_tests is not shown here)
        smoke = await self.run_smoke_tests(inactive)
        if not smoke["passed"]:
            raise DeploymentError(f"Smoke tests failed: {smoke}")

        # Step 4: Switch traffic
        await self.lb.set_weights({
            self.active: 0,
            inactive: 100
        })
        self.active = inactive
        return {"status": "success", "active": self.active}

    async def rollback(self):
        """Instant rollback to the previous environment."""
        inactive = "green" if self.active == "blue" else "blue"
        await self.lb.set_weights({
            self.active: 0,
            inactive: 100
        })
        self.active = inactive
        return {"status": "rolled_back", "active": self.active}
Canary Deployment
Gradually shift traffic to the new version and validate at each stage:
import asyncio

class CanaryDeployer:
    def __init__(self, load_balancer, metrics_client):
        self.lb = load_balancer
        self.metrics = metrics_client

    async def canary_deploy(
        self,
        new_version: str,
        stages: list = None,
        validation_period: int = 300  # 5 minutes per stage
    ):
        """Gradually roll out with automatic validation."""
        stages = stages or [5, 10, 25, 50, 100]  # Percentage

        for percentage in stages:
            # Shift traffic
            await self.lb.set_weights({
                "stable": 100 - percentage,
                "canary": percentage
            })

            # Wait and collect metrics
            await asyncio.sleep(validation_period)

            # Validate canary health
            validation = await self.validate_canary()
            if not validation["passed"]:
                # Automatic rollback
                await self.lb.set_weights({
                    "stable": 100,
                    "canary": 0
                })
                return {
                    "status": "rolled_back",
                    "failed_at": percentage,
                    "reason": validation["reason"]
                }

        return {"status": "success", "version": new_version}

    async def validate_canary(self) -> dict:
        """Compare canary metrics against stable."""
        stable_metrics = await self.metrics.get("stable", window="5m")
        canary_metrics = await self.metrics.get("canary", window="5m")

        checks = {
            "error_rate": canary_metrics["error_rate"] <= stable_metrics["error_rate"] * 1.1,
            "latency_p95": canary_metrics["latency_p95"] <= stable_metrics["latency_p95"] * 1.2,
            "user_satisfaction": canary_metrics.get("satisfaction", 1.0) >= 0.9
        }

        failed = [k for k, v in checks.items() if not v]
        return {
            "passed": len(failed) == 0,
            "reason": failed if failed else None
        }
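The metrics_client above is left abstract. A minimal in-memory sketch of the shape it might take; the get signature and metric names are assumptions, not any real monitoring SDK.

class InMemoryMetricsClient:
    """Hypothetical metrics source keyed by deployment target ("stable"/"canary")."""

    def __init__(self, snapshots: dict):
        # snapshots: {"stable": {...}, "canary": {...}} with pre-aggregated values
        self._snapshots = snapshots

    async def get(self, target: str, window: str = "5m") -> dict:
        # A real client would query Prometheus/Datadog/etc. over the window;
        # here we just return the stored snapshot
        return self._snapshots[target]

# Example: canary error rate within 10% of stable, latency within 20%, so checks pass
metrics = InMemoryMetricsClient({
    "stable": {"error_rate": 0.010, "latency_p95": 1.8, "satisfaction": 0.94},
    "canary": {"error_rate": 0.011, "latency_p95": 2.0, "satisfaction": 0.93},
})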
Shadow Deployment
Test the new version against real traffic without affecting users:
import asyncio
from typing import Any

class ShadowDeployer:
    def __init__(self, production_agent, shadow_agent, comparator):
        self.production = production_agent
        self.shadow = shadow_agent
        self.comparator = comparator
        self.results = []

    async def handle_request(self, request: dict) -> Any:
        """Process request in production, shadow in background."""
        # Production response (returned to user)
        production_task = asyncio.create_task(
            self.production.run(request)
        )

        # Shadow response (never returned to user)
        shadow_task = asyncio.create_task(
            self.shadow.run(request)
        )

        # Wait only for production (user-facing)
        production_response = await production_task

        # Don't block on the shadow; compare asynchronously when it finishes
        asyncio.create_task(
            self._compare_results(request, production_response, shadow_task)
        )

        return production_response

    async def _compare_results(
        self,
        request: dict,
        production_response: Any,
        shadow_task: asyncio.Task
    ):
        """Compare production and shadow responses."""
        try:
            shadow_response = await asyncio.wait_for(shadow_task, timeout=60)
            comparison = await self.comparator.compare(
                request=request,
                production=production_response,
                shadow=shadow_response
            )
            self.results.append(comparison)

            # Log significant differences
            if comparison["divergence_score"] > 0.3:
                await self.log_divergence(comparison)
        except asyncio.TimeoutError:
            self.results.append({"error": "shadow_timeout"})

    def get_shadow_report(self) -> dict:
        """Generate report on shadow performance."""
        if not self.results:
            return {"status": "no_data"}

        valid_results = [r for r in self.results if "error" not in r]
        if not valid_results:
            return {"status": "all_shadows_failed", "total_requests": len(self.results)}

        return {
            "total_requests": len(self.results),
            "successful_shadows": len(valid_results),
            "avg_divergence": sum(r["divergence_score"] for r in valid_results) / len(valid_results),
            "shadow_better_count": sum(1 for r in valid_results if r["shadow_preferred"]),
            "ready_for_promotion": self._assess_readiness(valid_results)
        }
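The comparator, log_divergence, and _assess_readiness pieces are not shown. As a rough sketch, a comparator could score divergence with token overlap between string responses; a real system would more likely use an LLM judge or task-specific scoring, so treat the class below as hypothetical.

class SimpleComparator:
    """Hypothetical comparator: scores how far the shadow response drifts from production."""

    async def compare(self, request: dict, production: str, shadow: str) -> dict:
        prod_tokens = set(production.lower().split())
        shadow_tokens = set(shadow.lower().split())
        union = prod_tokens | shadow_tokens
        overlap = len(prod_tokens & shadow_tokens) / len(union) if union else 1.0
        return {
            "request": request,
            "divergence_score": 1.0 - overlap,  # 0.0 = identical, 1.0 = no overlap
            "shadow_preferred": False,          # would come from a judge or heuristic in practice
        }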
Feature Flags for AI
Control AI features dynamically:
class AIFeatureFlags:
    def __init__(self, flag_service):
        self.flags = flag_service

    async def get_config(self, user_id: str, context: dict = None) -> dict:
        """Get AI configuration based on feature flags."""
        config = {
            "model": "gpt-3.5-turbo",  # Default
            "max_tokens": 1000,
            "tools_enabled": [],
            "features": {}
        }

        # Check model upgrade flag
        if await self.flags.is_enabled("gpt4_rollout", user_id):
            config["model"] = "gpt-4"
            config["max_tokens"] = 4000

        # Check new tool flag
        if await self.flags.is_enabled("code_execution_tool", user_id):
            config["tools_enabled"].append("code_executor")

        # Check experimental features
        if await self.flags.is_enabled("streaming_responses", user_id):
            config["features"]["streaming"] = True

        # Percentage-based rollout
        if await self.flags.percentage_enabled("new_prompt_v2", user_id, 25):
            config["prompt_version"] = "v2"
        else:
            config["prompt_version"] = "v1"

        return config

# Usage
flags = AIFeatureFlags(flag_service)

async def handle_request(user_id: str, request: dict):
    config = await flags.get_config(user_id)
    agent = create_agent(
        model=config["model"],
        tools=config["tools_enabled"],
        prompt_version=config["prompt_version"]
    )
    return await agent.run(request)
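The flag_service itself is assumed above. A common way to implement percentage rollouts is to hash the user ID together with the flag name so the same user always lands in the same bucket. A minimal sketch; the class is hypothetical, though its method names mirror the calls above.

import hashlib

class SimpleFlagService:
    """Hypothetical flag backend with boolean flags and deterministic percentage buckets."""

    def __init__(self, enabled_flags: set):
        self.enabled_flags = enabled_flags

    async def is_enabled(self, flag: str, user_id: str) -> bool:
        return flag in self.enabled_flags

    async def percentage_enabled(self, flag: str, user_id: str, percentage: int) -> bool:
        # Hash flag + user so the bucket is stable across requests and servers
        digest = hashlib.sha256(f"{flag}:{user_id}".encode()).hexdigest()
        bucket = int(digest[:8], 16) % 100
        return bucket < percentage

Deterministic bucketing matters for AI features: a user flapping between prompt v1 and v2 mid-conversation is much harder to debug than a clean cohort split.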
Deployment Checklist
deployment_checklist = {
    "pre_deployment": [
        "Run full test suite",
        "Verify prompt regression tests pass",
        "Check model version compatibility",
        "Review cost impact estimates",
        "Prepare rollback plan"
    ],
    "during_deployment": [
        "Monitor error rates (< 1% threshold)",
        "Watch latency P95 (< 2x baseline)",
        "Check token usage (within budget)",
        "Verify safety filters active",
        "Test critical user flows"
    ],
    "post_deployment": [
        "Review user feedback",
        "Analyze cost per request",
        "Check for prompt leakage",
        "Validate logging working",
        "Update runbooks if needed"
    ]
}
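Some of the "during deployment" items are concrete enough to automate. A sketch of turning the error-rate and latency thresholds into a gate, assuming the same hypothetical metrics client shape used in the canary example:

async def deployment_gate(metrics_client, baseline_latency_p95: float) -> dict:
    """Check the automatable checklist items; return any failures."""
    current = await metrics_client.get("canary", window="5m")
    failures = []
    if current["error_rate"] >= 0.01:                        # "< 1% threshold"
        failures.append("error_rate")
    if current["latency_p95"] >= 2 * baseline_latency_p95:   # "< 2x baseline"
        failures.append("latency_p95")
    return {"passed": not failures, "failures": failures}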
Interview Tip
When discussing deployment:
- Risk assessment - What could go wrong with this deployment?
- Rollback time - How fast can you revert?
- Validation metrics - How do you know the deployment succeeded?
- Cost monitoring - The new version might use more tokens per request (a quick check is sketched below)
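For the cost point, a back-of-the-envelope comparison is usually enough to discuss. A sketch assuming you log token counts per request for each version; the prices and token counts below are placeholders, not current rates.

def cost_per_request(token_counts: list, price_per_1k_tokens: float) -> float:
    """Average cost per request given total tokens used by each request."""
    if not token_counts:
        return 0.0
    avg_tokens = sum(token_counts) / len(token_counts)
    return avg_tokens / 1000 * price_per_1k_tokens

# Placeholder numbers: v1.3 uses a longer prompt, so cost rises ~20%
v12_cost = cost_per_request([900, 1100, 1000], price_per_1k_tokens=0.01)
v13_cost = cost_per_request([1100, 1300, 1200], price_per_1k_tokens=0.01)
print(f"v1.2: ${v12_cost:.4f}/req, v1.3: ${v13_cost:.4f}/req")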
With production knowledge complete, let's apply everything to real interview case studies.