Error Handling & Fallbacks

AI systems face distinctive failure modes: API outages, rate limits, hallucinations, and runaway agent loops. This lesson covers how to build resilient systems that handle these failures gracefully.

Common Failure Modes

| Failure Type | Cause | Impact |
| --- | --- | --- |
| API timeout | Network issues, overload | Request fails |
| Rate limiting | Too many requests | Requests rejected |
| Context overflow | Input too long | Truncation or error |
| Infinite loops | Agent stuck in a tool cycle | Resource exhaustion |
| Hallucination | Model makes up facts | Incorrect output |
| Cost explosion | Runaway token usage | Budget exceeded |
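
The code in this lesson references several custom exception types. Apart from the builtin TimeoutError, none of these are standard library classes, so here is a minimal sketch of the hierarchy the examples assume; in a real system you would map your provider SDK's errors (for example openai.RateLimitError) onto these names.

class APIError(Exception):
    """Generic upstream API failure."""

class RateLimitError(APIError):
    """Request rejected due to rate limiting."""

class ModelUnavailableError(APIError):
    """The requested model is down or overloaded."""

class CircuitOpenError(Exception):
    """A circuit breaker is open and rejecting calls."""

class FallbackExhaustedError(Exception):
    """Every model in the fallback chain failed."""

class CacheHitError(Exception):
    """Carries a cached response to serve while degraded."""
    def __init__(self, cached_response: str):
        super().__init__("serving cached response")
        self.cached_response = cached_response

# Guardrail violations, raised by AgentGuardrails later in this lesson
class MaxIterationsExceeded(Exception): ...
class MaxToolCallsExceeded(Exception): ...
class AgentTimeoutError(Exception): ...
class TokenBudgetExceeded(Exception): ...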

Retry with Exponential Backoff

import asyncio
import random
from functools import wraps

class RetryConfig:
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

def retry_with_backoff(config: RetryConfig | None = None):
    config = config or RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(config.max_retries + 1):
                try:
                    return await func(*args, **kwargs)

                except (RateLimitError, TimeoutError, APIError) as e:
                    last_exception = e

                    if attempt == config.max_retries:
                        raise

                    # Calculate delay with exponential backoff
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )

                    # Add jitter to prevent thundering herd
                    if config.jitter:
                        delay = delay * (0.5 + random.random())

                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

# Usage
@retry_with_backoff(RetryConfig(max_retries=3))
async def call_llm(messages):
    return await llm_client.complete(messages)
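
A quick way to sanity-check the decorator is to point it at a deliberately flaky coroutine. In this sketch, flaky_call is made up for illustration: it fails twice with a transient error, then succeeds on the third attempt.

attempts = 0

@retry_with_backoff(RetryConfig(max_retries=3, base_delay=0.1))
async def flaky_call():
    global attempts
    attempts += 1
    if attempts < 3:
        raise APIError("simulated transient failure")
    return "ok"

# asyncio.run(flaky_call()) returns "ok" after two backoff sleeps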

Circuit Breaker Pattern

Prevent cascading failures:

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    async def call(self, func, *args, **kwargs):
        # Check if circuit should transition from OPEN to HALF_OPEN
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)

            # Success handling
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0

            return result

        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

# Usage
openai_circuit = CircuitBreaker(failure_threshold=5)

async def safe_llm_call(messages):
    return await openai_circuit.call(llm_client.complete, messages)
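
The two patterns compose naturally. One reasonable arrangement (a sketch, not the only option) puts the breaker outside the retry, so each failure the breaker counts is a request that already exhausted its retries:

@retry_with_backoff(RetryConfig(max_retries=2))
async def _complete_with_retries(messages):
    return await llm_client.complete(messages)

async def resilient_llm_call(messages):
    # The breaker sees one failure per fully retried request
    return await openai_circuit.call(_complete_with_retries, messages)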

Model Fallback Chain

When primary model fails, fall back to alternatives:

from dataclasses import dataclass
from typing import Any

@dataclass
class ModelConfig:
    name: str
    client: Any
    max_tokens: int
    cost_per_1k: float
    timeout: float = 30.0

class FallbackChain:
    def __init__(self, models: list[ModelConfig]):
        self.models = models
        self.circuit_breakers = {
            m.name: CircuitBreaker() for m in models
        }

    async def complete(self, messages: list, **kwargs) -> tuple[Any, str]:
        """Try models in order until one succeeds."""
        last_error = None

        for model in self.models:
            try:
                circuit = self.circuit_breakers[model.name]

                response = await circuit.call(
                    model.client.complete,
                    messages=messages,
                    timeout=model.timeout,
                    **kwargs
                )

                return response, model.name

            except CircuitOpenError:
                # Skip this model, circuit is open
                continue

            except Exception as e:
                last_error = e
                # Log and try next model
                continue

        raise FallbackExhaustedError(
            f"All models failed. Last error: {last_error}"
        )

# Usage
fallback = FallbackChain([
    ModelConfig("gpt-4", openai_client, 8192, 0.03),
    ModelConfig("claude-3", anthropic_client, 100000, 0.025),
    ModelConfig("gpt-3.5-turbo", openai_client, 4096, 0.002),
])

response, used_model = await fallback.complete(messages)
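
Because fallbacks mask failures, it is worth tracking which model actually served each request; a sustained shift away from the primary is an early outage signal. A minimal sketch using an in-memory counter:

from collections import Counter

model_usage = Counter()

async def tracked_complete(messages, **kwargs):
    response, model_name = await fallback.complete(messages, **kwargs)
    model_usage[model_name] += 1  # alert if the primary's share drops
    return response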

Timeout and Loop Prevention

import time
from contextlib import asynccontextmanager

class AgentGuardrails:
    def __init__(
        self,
        max_iterations: int = 20,
        max_tool_calls: int = 50,
        max_runtime_seconds: float = 300.0,
        max_tokens_per_run: int = 100000
    ):
        self.max_iterations = max_iterations
        self.max_tool_calls = max_tool_calls
        self.max_runtime_seconds = max_runtime_seconds
        self.max_tokens_per_run = max_tokens_per_run

    @asynccontextmanager
    async def guard(self):
        """Context manager for agent run with limits."""
        state = {
            "iterations": 0,
            "tool_calls": 0,
            "tokens_used": 0,
            "start_time": time.time()
        }

        try:
            yield state
        finally:
            # Log final stats
            pass

    def check_limits(self, state: dict):
        """Raise if any limit exceeded."""
        elapsed = time.time() - state["start_time"]

        if state["iterations"] >= self.max_iterations:
            raise MaxIterationsExceeded(
                f"Agent exceeded {self.max_iterations} iterations"
            )

        if state["tool_calls"] >= self.max_tool_calls:
            raise MaxToolCallsExceeded(
                f"Agent exceeded {self.max_tool_calls} tool calls"
            )

        if elapsed >= self.max_runtime_seconds:
            raise AgentTimeoutError(
                f"Agent exceeded {self.max_runtime_seconds}s runtime"
            )

        if state["tokens_used"] >= self.max_tokens_per_run:
            raise TokenBudgetExceeded(
                f"Agent exceeded {self.max_tokens_per_run} tokens"
            )

# Usage
guardrails = AgentGuardrails()

async def run_agent(task: str):
    messages = [{"role": "user", "content": task}]

    async with guardrails.guard() as state:
        while True:
            state["iterations"] += 1
            guardrails.check_limits(state)

            response = await llm_client.complete(messages)
            state["tokens_used"] += response.usage.total_tokens

            if response.tool_calls:
                state["tool_calls"] += len(response.tool_calls)
                # Execute tools...
            else:
                return response.content
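
Guardrail violations should usually surface as a controlled stop rather than a crash. A sketch of a wrapper that converts them into a user-facing message:

async def run_agent_safely(task: str) -> str:
    try:
        return await run_agent(task)
    except (MaxIterationsExceeded, MaxToolCallsExceeded,
            AgentTimeoutError, TokenBudgetExceeded) as e:
        # Stop cleanly and tell the caller why
        return f"Stopped early: {e}. Results may be incomplete."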

Graceful Degradation

class DegradedResponse:
    """Response when system is degraded."""

    def __init__(self, message: str, severity: str, fallback_used: bool):
        self.message = message
        self.severity = severity  # "partial", "cached", "unavailable"
        self.fallback_used = fallback_used

async def handle_with_degradation(request):
    """Handle request with graceful degradation."""

    try:
        # Try full capability
        return await full_agent_response(request)

    except ModelUnavailableError:
        # Fall back to simpler model
        return DegradedResponse(
            message=await simple_model_response(request),
            severity="partial",
            fallback_used=True
        )

    except CacheHitError as e:
        # Return cached response
        return DegradedResponse(
            message=e.cached_response,
            severity="cached",
            fallback_used=True
        )

    except Exception:
        # Return static fallback
        return DegradedResponse(
            message="I'm experiencing issues. Please try again later.",
            severity="unavailable",
            fallback_used=True
        )
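
On the caller side, the DegradedResponse flags let you communicate honestly with the user. A sketch of that handoff (full_agent_response and simple_model_response above, and notify_user_of_degradation here, are assumed helpers):

async def respond(request):
    result = await handle_with_degradation(request)

    if isinstance(result, DegradedResponse) and result.fallback_used:
        # e.g. show a banner: "answer may be lower quality or stale"
        notify_user_of_degradation(result.severity)  # hypothetical helper

    return result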

Interview Tip

When discussing error handling, be ready to address:

  1. Idempotency - Can requests be safely retried? (A sketch follows this list.)
  2. Partial failures - What happens if only one tool call fails?
  3. User experience - How do you communicate degradation to users?
  4. Recovery - How does the system heal after an outage?
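
For the idempotency question, one common answer is request keys: derive a stable key from the request, cache the result, and replay it on retry. A minimal in-memory sketch, reusing the llm_client from earlier examples:

import hashlib
import json

_response_cache: dict[str, object] = {}

async def idempotent_complete(messages: list, key: str | None = None):
    # Derive a stable key from the request if the caller didn't supply one
    key = key or hashlib.sha256(
        json.dumps(messages, sort_keys=True).encode()
    ).hexdigest()

    if key in _response_cache:
        return _response_cache[key]  # retried request: same key, same answer

    response = await llm_client.complete(messages)
    _response_cache[key] = response
    return response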

Next, we'll cover safety guardrails and content filtering.
