Error Handling & Fallbacks
AI systems face unique failure modes: API outages, rate limits, hallucinations, and runaway agent loops. This lesson covers how to build resilient systems that handle these failures gracefully.
Common Failure Modes
| Failure Type | Cause | Impact |
|---|---|---|
| API timeout | Network issues, overload | Request fails |
| Rate limiting | Too many requests | Requests rejected |
| Context overflow | Input too long | Truncation or error |
| Infinite loops | Agent stuck in tool cycle | Resource exhaustion |
| Hallucination | Model makes up facts | Incorrect output |
| Cost explosion | Runaway token usage | Budget exceeded |
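Not every failure in this table deserves the same response: timeouts and rate limits are usually transient and worth retrying, while context overflows or budget violations need a fallback or a hard stop. A minimal sketch of that classification (the exception types here are illustrative stand-ins for your SDK's actual errors):

```python
from enum import Enum

class FailureAction(Enum):
    RETRY = "retry"        # transient: back off and try again
    FALLBACK = "fallback"  # switch model, truncate, or return degraded output
    ABORT = "abort"        # stop and surface the error

def classify(exc: Exception) -> FailureAction:
    """Map an exception to a first response; exception types are placeholders."""
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return FailureAction.RETRY       # API timeout, rate limit
    if isinstance(exc, ValueError):
        return FailureAction.FALLBACK    # e.g. context overflow -> truncate or smaller model
    return FailureAction.ABORT           # unknown failure: fail loudly
```

The sections below implement the retry and fallback halves of this decision.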
Retry with Exponential Backoff
```python
import asyncio
import random
from functools import wraps

# Assumes the OpenAI Python SDK; substitute your provider's exception types.
from openai import APIError, RateLimitError


class RetryConfig:
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter


def retry_with_backoff(config: RetryConfig = None):
    config = config or RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(config.max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except (RateLimitError, TimeoutError, APIError) as e:
                    last_exception = e
                    if attempt == config.max_retries:
                        raise
                    # Calculate delay with exponential backoff
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )
                    # Add jitter to prevent thundering herd
                    if config.jitter:
                        delay = delay * (0.5 + random.random())
                    await asyncio.sleep(delay)
            raise last_exception
        return wrapper
    return decorator


# Usage
@retry_with_backoff(RetryConfig(max_retries=3))
async def call_llm(messages):
    return await llm_client.complete(messages)
```
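One refinement: many providers tell you how long to wait, either via a `Retry-After` header or a field on the rate-limit exception. When that hint is available, prefer it over the computed backoff. A hedged sketch, assuming the exception exposes a `retry_after` attribute (the attribute name varies by SDK):

```python
def effective_delay(exc: Exception, computed_delay: float, max_delay: float) -> float:
    """Prefer the server-suggested wait over the computed backoff when present."""
    # `retry_after` is an assumed attribute; check your SDK's rate-limit error.
    suggested = getattr(exc, "retry_after", None)
    if suggested is not None:
        return min(float(suggested), max_delay)
    return computed_delay
```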
Circuit Breaker Pattern
Prevent cascading failures:
```python
import time
from enum import Enum


class CircuitOpenError(Exception):
    """Raised when the circuit is open and calls are being rejected."""


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    async def call(self, func, *args, **kwargs):
        # Check if circuit should transition from OPEN to HALF_OPEN
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            # Success handling
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise


# Usage
openai_circuit = CircuitBreaker(failure_threshold=5)

async def safe_llm_call(messages):
    return await openai_circuit.call(llm_client.complete, messages)
```
Model Fallback Chain
When primary model fails, fall back to alternatives:
```python
from dataclasses import dataclass
from typing import Any


class FallbackExhaustedError(Exception):
    """Raised when every model in the chain has failed."""


@dataclass
class ModelConfig:
    name: str
    client: Any
    max_tokens: int
    cost_per_1k: float
    timeout: float = 30.0


class FallbackChain:
    def __init__(self, models: list[ModelConfig]):
        self.models = models
        self.circuit_breakers = {
            m.name: CircuitBreaker() for m in models
        }

    async def complete(self, messages: list, **kwargs) -> tuple:
        """Try models in order until one succeeds."""
        last_error = None
        for model in self.models:
            try:
                circuit = self.circuit_breakers[model.name]
                response = await circuit.call(
                    model.client.complete,
                    messages=messages,
                    timeout=model.timeout,
                    **kwargs
                )
                return response, model.name
            except CircuitOpenError:
                # Skip this model, its circuit is open
                continue
            except Exception as e:
                last_error = e
                # Log and try the next model
                continue
        raise FallbackExhaustedError(
            f"All models failed. Last error: {last_error}"
        )


# Usage
fallback = FallbackChain([
    ModelConfig("gpt-4", openai_client, 8192, 0.03),
    ModelConfig("claude-3", anthropic_client, 100000, 0.025),
    ModelConfig("gpt-3.5-turbo", openai_client, 4096, 0.002),
])

response, used_model = await fallback.complete(messages)
```
Timeout and Loop Prevention
```python
import time
from contextlib import asynccontextmanager


# Application-defined guardrail exceptions
class MaxIterationsExceeded(Exception): pass
class MaxToolCallsExceeded(Exception): pass
class AgentTimeoutError(Exception): pass
class TokenBudgetExceeded(Exception): pass


class AgentGuardrails:
    def __init__(
        self,
        max_iterations: int = 20,
        max_tool_calls: int = 50,
        max_runtime_seconds: float = 300.0,
        max_tokens_per_run: int = 100000
    ):
        self.max_iterations = max_iterations
        self.max_tool_calls = max_tool_calls
        self.max_runtime_seconds = max_runtime_seconds
        self.max_tokens_per_run = max_tokens_per_run

    @asynccontextmanager
    async def guard(self):
        """Context manager for an agent run with limits."""
        state = {
            "iterations": 0,
            "tool_calls": 0,
            "tokens_used": 0,
            "start_time": time.time()
        }
        try:
            yield state
        finally:
            # Log final stats
            pass

    def check_limits(self, state: dict):
        """Raise if any limit is exceeded."""
        elapsed = time.time() - state["start_time"]
        if state["iterations"] >= self.max_iterations:
            raise MaxIterationsExceeded(
                f"Agent exceeded {self.max_iterations} iterations"
            )
        if state["tool_calls"] >= self.max_tool_calls:
            raise MaxToolCallsExceeded(
                f"Agent exceeded {self.max_tool_calls} tool calls"
            )
        if elapsed >= self.max_runtime_seconds:
            raise AgentTimeoutError(
                f"Agent exceeded {self.max_runtime_seconds}s runtime"
            )
        if state["tokens_used"] >= self.max_tokens_per_run:
            raise TokenBudgetExceeded(
                f"Agent exceeded {self.max_tokens_per_run} tokens"
            )


# Usage (`llm` and `messages` come from your agent setup)
guardrails = AgentGuardrails()

async def run_agent(task: str):
    async with guardrails.guard() as state:
        while True:
            state["iterations"] += 1
            guardrails.check_limits(state)
            response = await llm.complete(messages)
            state["tokens_used"] += response.usage.total_tokens
            if response.tool_calls:
                state["tool_calls"] += len(response.tool_calls)
                # Execute tools...
            else:
                return response.content
```
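Note that `check_limits` only runs between iterations, so a single hung model call can still blow past the runtime budget. Bounding each call individually closes that gap; a minimal sketch using `asyncio.wait_for` (the 60-second value is illustrative):

```python
import asyncio

async def call_with_timeout(coro, seconds: float = 60.0):
    """Bound a single model or tool call so one hang can't stall the whole run."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        # Surface as the same timeout type the retry logic already handles
        raise TimeoutError(f"Call exceeded {seconds}s") from None

# e.g. response = await call_with_timeout(llm.complete(messages), seconds=60.0)
```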
Graceful Degradation
```python
# ModelUnavailableError, CacheHitError, full_agent_response, and
# simple_model_response are application-defined placeholders.

class DegradedResponse:
    """Response returned when the system is degraded."""
    def __init__(self, message: str, severity: str, fallback_used: bool):
        self.message = message
        self.severity = severity  # "partial", "cached", "unavailable"
        self.fallback_used = fallback_used


async def handle_with_degradation(request):
    """Handle a request with graceful degradation."""
    try:
        # Try full capability
        return await full_agent_response(request)
    except ModelUnavailableError:
        # Fall back to a simpler model
        return DegradedResponse(
            message=await simple_model_response(request),
            severity="partial",
            fallback_used=True
        )
    except CacheHitError as e:
        # Return a cached response
        return DegradedResponse(
            message=e.cached_response,
            severity="cached",
            fallback_used=True
        )
    except Exception:
        # Return a static fallback
        return DegradedResponse(
            message="I'm experiencing issues. Please try again later.",
            severity="unavailable",
            fallback_used=True
        )
```
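The `CacheHitError` branch assumes the full pipeline can signal "the live call failed, but a previously cached answer exists." One way to wire that up, as a hedged sketch (`run_full_pipeline` and the in-memory cache are illustrative, not a fixed API):

```python
class CacheHitError(Exception):
    """Signals that the live call failed but a cached answer is available."""
    def __init__(self, cached_response: str):
        super().__init__("serving cached response")
        self.cached_response = cached_response


async def full_agent_response(request: str, cache: dict | None = None):
    """Illustrative wrapper: on failure, surface a cached answer if one exists."""
    cache = cache or {}
    try:
        return await run_full_pipeline(request)  # your real agent entry point
    except Exception:
        cached = cache.get(request)
        if cached is not None:
            raise CacheHitError(cached)
        raise
```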
Interview Tip
When discussing error handling:
- Idempotency - Can requests be safely retried without duplicating side effects? (see the sketch after this list)
- Partial failures - What if only one tool fails?
- User experience - How do you communicate degradation?
- Recovery - How does the system heal after outages?
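For the idempotency point, the standard pattern is to attach a client-generated key to each logical request so retries can be deduplicated. A minimal in-memory sketch (a real system would use a shared store such as Redis; the names are illustrative):

```python
import uuid

_completed: dict[str, object] = {}  # idempotency key -> stored result

async def submit_once(key: str, func, *args, **kwargs):
    """Execute func at most once per idempotency key; retries return the stored result."""
    if key in _completed:
        return _completed[key]
    result = await func(*args, **kwargs)
    _completed[key] = result
    return result

# The caller generates the key once and reuses it on every retry
request_key = str(uuid.uuid4())
```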
Next, we'll cover safety guardrails and content filtering.