Advanced Graph Patterns

Error Recovery Patterns: Try-Catch, Fallbacks & Circuit Breakers

5 min read

Why Error Recovery Is Critical in Production

Real Production Incident (January 2026):

A multi-agent customer support system at an e-commerce company crashed 23 times in one day during Black Friday. Root cause: Claude API rate limits triggered cascading failures across all agents. Customer tickets piled up, costing $150K in lost revenue.

After implementing proper error recovery patterns—try-catch nodes, fallback paths, exponential backoff, and circuit breakers—the same system handled 10x the traffic on Cyber Monday with zero crashes.

This lesson teaches you: Production-hardened error recovery patterns that prevent cascading failures and keep your LangGraph workflows running 24/7.


The Three Pillars of Error Recovery

  1. Try-Catch Nodes: Isolate failures so they don't crash the graph
  2. Fallback Paths: Provide degraded but functional alternatives
  3. Circuit Breakers: Prevent cascading failures to external services

Try-Catch Node Pattern

The Problem: Unhandled Exceptions

# ❌ BAD: Crashes entire graph
def risky_node(state: dict) -> dict:
    result = call_external_api()  # Can throw!
    return {"result": result}

# One API timeout = entire workflow crashes
# Checkpoint lost, must restart from scratch

The Solution: Try-Catch Wrapper

from typing import TypedDict, Annotated, Optional, Literal
import operator
import traceback

class RobustState(TypedDict):
    """State with comprehensive error handling."""
    # Core data
    query: str
    documents: Annotated[list[str], operator.add]
    analysis: Optional[str]

    # Error tracking
    error_message: Optional[str]
    error_type: Optional[str]
    error_traceback: Optional[str]
    failed_node: Optional[str]

    # Retry management
    retry_count: int
    max_retries: int

def try_research_node(state: RobustState) -> dict:
    """
    Research node wrapped in try-catch.
    Never crashes, always returns state update.
    """
    try:
        # Risky operation
        documents = search_documents(state["query"])

        # Success: clear any previous errors
        return {
            "documents": documents,
            "error_message": None,
            "error_type": None,
            "failed_node": None
        }

    except TimeoutError as e:
        return {
            "error_message": f"Search timed out: {str(e)}",
            "error_type": "timeout",
            "error_traceback": traceback.format_exc(),
            "failed_node": "research",
            "retry_count": state["retry_count"] + 1
        }

    except RateLimitError as e:  # raised by your API client SDK on rate-limit (HTTP 429) responses
        return {
            "error_message": f"Rate limited: {str(e)}",
            "error_type": "rate_limit",
            "failed_node": "research",
            "retry_count": state["retry_count"] + 1
        }

    except Exception as e:
        return {
            "error_message": f"Unexpected error: {str(e)}",
            "error_type": "unknown",
            "error_traceback": traceback.format_exc(),
            "failed_node": "research",
            "retry_count": state["retry_count"] + 1
        }

def route_after_research(state: RobustState) -> Literal["continue", "retry", "fallback"]:
    """Route based on error state."""
    # Success case
    if not state.get("error_message"):
        return "continue"

    # Check if we can retry
    if state["retry_count"] < state["max_retries"]:
        error_type = state.get("error_type", "unknown")

        # Only retry transient errors
        if error_type in ["timeout", "rate_limit", "connection_error"]:
            return "retry"

    # Max retries exceeded or non-retriable error
    return "fallback"

Production Pattern: Decorator-Based Try-Catch

from functools import wraps
from typing import Callable

def catch_errors(node_name: str):
    """Decorator to add error handling to any node."""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(state: dict) -> dict:
            try:
                return func(state)
            except Exception as e:
                return {
                    "error_message": str(e),
                    "error_type": type(e).__name__,
                    "error_traceback": traceback.format_exc(),
                    "failed_node": node_name,
                    "retry_count": state.get("retry_count", 0) + 1
                }
        return wrapper
    return decorator

# Apply to any node
@catch_errors("research")
def research_node(state: RobustState) -> dict:
    documents = search_documents(state["query"])
    return {"documents": documents}

@catch_errors("analysis")
def analysis_node(state: RobustState) -> dict:
    analysis = analyze_documents(state["documents"])
    return {"analysis": analysis}

Fallback Paths: Graceful Degradation

Problem: What Happens After Retries Fail?

Without fallbacks, exhausted retries leave you with nothing:

  • User sees generic error
  • No partial results
  • Workflow stops completely

Solution: Multi-Level Fallbacks

from typing import TypedDict, Optional, Literal

class FallbackState(TypedDict):
    query: str
    primary_result: Optional[str]
    fallback_result: Optional[str]
    cached_result: Optional[str]
    error_message: Optional[str]
    fallback_level: int  # 0=primary, 1=alternative, 2=cached, 3=static

def primary_processor(state: FallbackState) -> dict:
    """Primary path: Use latest model."""
    try:
        result = call_claude_opus(state["query"])
        return {
            "primary_result": result,
            "fallback_level": 0
        }
    except Exception as e:
        # Record the failed level so route_fallback advances to the next fallback
        return {"error_message": str(e), "fallback_level": 0}

def alternative_processor(state: FallbackState) -> dict:
    """Fallback 1: Use cheaper/faster model."""
    try:
        result = call_claude_haiku(state["query"])  # Faster, cheaper
        return {
            "fallback_result": result,
            "fallback_level": 1,
            "error_message": None
        }
    except Exception as e:
        # Mark level 1 so route_fallback moves on to the cache fallback
        return {"error_message": str(e), "fallback_level": 1}

def cached_processor(state: FallbackState) -> dict:
    """Fallback 2: Return cached similar result."""
    cache_key = compute_query_hash(state["query"])
    cached = cache.get(cache_key)

    if cached:
        return {
            "cached_result": cached,
            "fallback_level": 2,
            "error_message": None
        }

    return {"error_message": "No cached result available"}

def static_fallback(state: FallbackState) -> dict:
    """Fallback 3: Return static response."""
    return {
        "fallback_result": """
I apologize, but I'm experiencing technical difficulties.
Here's what I can tell you about your query:

- Your request has been logged for later processing
- A support ticket has been created
- Please try again in a few minutes

In the meantime, you might find these resources helpful:
- Documentation: https://docs.example.com
- FAQ: https://example.com/faq
        """.strip(),
        "fallback_level": 3,
        "error_message": "All services unavailable, used static fallback"
    }

def route_fallback(state: FallbackState) -> Literal["done", "try_alternative", "try_cache", "static"]:
    """Route through fallback chain."""
    if state.get("primary_result") or state.get("fallback_result"):
        return "done"

    level = state.get("fallback_level", -1)
    error = state.get("error_message")

    if not error:
        return "done"

    # Move to next fallback level
    if level < 1:
        return "try_alternative"
    elif level < 2:
        return "try_cache"
    else:
        return "static"

# Build fallback chain (finalize_result is your node that picks whichever
# result field was populated)
from langgraph.graph import StateGraph, END

graph = StateGraph(FallbackState)
graph.add_node("primary", primary_processor)
graph.add_node("alternative", alternative_processor)
graph.add_node("cached", cached_processor)
graph.add_node("static", static_fallback)
graph.add_node("finalize", finalize_result)

graph.set_entry_point("primary")

graph.add_conditional_edges("primary", route_fallback, {
    "done": "finalize",
    "try_alternative": "alternative",
    "try_cache": "cached",
    "static": "static"
})

graph.add_conditional_edges("alternative", route_fallback, {
    "done": "finalize",
    "try_cache": "cached",
    "static": "static"
})

graph.add_conditional_edges("cached", route_fallback, {
    "done": "finalize",
    "static": "static"
})

graph.add_edge("static", "finalize")
graph.add_edge("finalize", END)

Exponential Backoff with Jitter

Why Basic Retries Fail

# ❌ BAD: Hammers the API
def bad_retry(state):
    for i in range(5):
        try:
            return call_api()
        except:
            time.sleep(1)  # Fixed delay = API stays overloaded

Production Pattern: Exponential Backoff

import time
import random
from typing import TypedDict, Optional, Literal

class RetryState(TypedDict):
    task: str
    result: Optional[str]
    error_message: Optional[str]
    attempt: int
    max_attempts: int
    base_delay: float  # seconds
    max_delay: float   # cap delay

def calculate_backoff(attempt: int, base: float, max_delay: float) -> float:
    """
    Calculate delay with exponential backoff and jitter.
    Formula: min(base * 2^attempt + random(0,1), max_delay)
    """
    exponential = base * (2 ** attempt)
    jitter = random.uniform(0, 1)  # Prevent thundering herd
    return min(exponential + jitter, max_delay)

def retry_with_backoff(state: RetryState) -> dict:
    """Execute with exponential backoff retry."""
    attempt = state["attempt"]

    if attempt > 0:
        # Wait before retry (not on first attempt)
        delay = calculate_backoff(
            attempt - 1,
            state.get("base_delay", 1.0),
            state.get("max_delay", 60.0)
        )
        print(f"Attempt {attempt}: Waiting {delay:.2f}s before retry")
        time.sleep(delay)

    try:
        result = perform_operation(state["task"])
        return {
            "result": result,
            "attempt": attempt + 1,
            "error_message": None
        }
    except Exception as e:
        return {
            "error_message": str(e),
            "attempt": attempt + 1
        }

def should_retry(state: RetryState) -> Literal["continue", "retry", "give_up"]:
    """Decide whether to retry."""
    if state.get("result"):
        return "continue"

    if state["attempt"] < state["max_attempts"]:
        return "retry"

    return "give_up"

# Example backoff sequence (base=1s):
# Attempt 1: Immediate
# Attempt 2: ~1-2s delay (1*2^0 + jitter)
# Attempt 3: ~2-3s delay (1*2^1 + jitter)
# Attempt 4: ~4-5s delay (1*2^2 + jitter)
# Attempt 5: ~8-9s delay (1*2^3 + jitter)
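
To wire the backoff node into a retry loop, one sketch looks like the following; process_result and handle_give_up are hypothetical downstream nodes, and the task string in the invoke call is illustrative. Note that the retry branch points back at the same node, which sleeps before the next attempt.

from langgraph.graph import StateGraph, END

graph = StateGraph(RetryState)
graph.add_node("attempt", retry_with_backoff)
graph.add_node("process", process_result)   # hypothetical: consumes the result
graph.add_node("give_up", handle_give_up)   # hypothetical: logs / surfaces the failure

graph.set_entry_point("attempt")

graph.add_conditional_edges("attempt", should_retry, {
    "continue": "process",
    "retry": "attempt",      # self-loop: the node sleeps before retrying
    "give_up": "give_up"
})

graph.add_edge("process", END)
graph.add_edge("give_up", END)

app = graph.compile()

# The initial state carries the retry budget and delay bounds
result = app.invoke({
    "task": "summarize quarterly report",
    "result": None,
    "error_message": None,
    "attempt": 0,
    "max_attempts": 5,
    "base_delay": 1.0,
    "max_delay": 60.0,
})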

Async Backoff Pattern

import asyncio
import random

async def async_retry_with_backoff(
    func,
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    """Async retry with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            if attempt == max_attempts - 1:
                raise  # Last attempt, re-raise

            delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)

# Usage in async node
async def async_research_node(state: dict) -> dict:
    async def do_research():
        return await async_search(state["query"])

    try:
        result = await async_retry_with_backoff(do_research)
        return {"documents": result}
    except Exception as e:
        return {"error_message": str(e)}

Circuit Breaker Pattern

The Problem: Cascading Failures

When an external service goes down:

  • All requests timeout (slow)
  • Timeouts exhaust thread pools
  • Memory fills with pending requests
  • Entire system becomes unresponsive

Solution: Circuit Breaker

from typing import TypedDict, Literal, Optional
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Blocking requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreakerState(TypedDict):
    """State for circuit breaker pattern."""
    query: str
    result: Optional[str]
    error_message: Optional[str]

    # Circuit breaker tracking
    circuit_status: str  # closed, open, half_open
    consecutive_failures: int
    last_failure_time: Optional[str]

    # Configuration
    failure_threshold: int    # Open after N failures
    recovery_timeout: int     # Seconds before trying again
    half_open_max_calls: int  # Test calls in half-open

class CircuitBreaker:
    """Production circuit breaker implementation."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    def can_execute(self) -> tuple[bool, str]:
        """Check if request should proceed."""
        if self.state == CircuitState.CLOSED:
            return True, "closed"

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if self.last_failure_time:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                    return True, "half_open"
            return False, "open"

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls < 3:  # Allow limited test calls
                self.half_open_calls += 1
                return True, "half_open"
            return False, "half_open_limit"

        return False, "unknown"

    def record_success(self):
        """Record successful call."""
        self.failures = 0
        self.state = CircuitState.CLOSED
        self.half_open_calls = 0

    def record_failure(self):
        """Record failed call."""
        self.failures += 1
        self.last_failure_time = datetime.now()

        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit OPENED after {self.failures} failures")

# Global circuit breaker per service
api_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

def node_with_circuit_breaker(state: CircuitBreakerState) -> dict:
    """Node that respects circuit breaker."""
    can_execute, circuit_status = api_circuit.can_execute()

    if not can_execute:
        return {
            "error_message": f"Circuit breaker {circuit_status}: Service unavailable",
            "circuit_status": circuit_status
        }

    try:
        result = call_external_api(state["query"])
        api_circuit.record_success()
        return {
            "result": result,
            "circuit_status": "closed"
        }

    except Exception as e:
        api_circuit.record_failure()
        return {
            "error_message": str(e),
            "circuit_status": api_circuit.state.value,
            "consecutive_failures": api_circuit.failures
        }

State-Based Circuit Breaker (Stateless Nodes)

from datetime import datetime

def check_circuit(state: CircuitBreakerState) -> Literal["proceed", "blocked", "test"]:
    """Check circuit breaker using state (no global)."""
    status = state.get("circuit_status", "closed")
    recovery = state.get("recovery_timeout", 60)

    if status == "closed":
        return "proceed"

    if status == "open":
        last_failure = state.get("last_failure_time")
        if last_failure:
            elapsed = (datetime.now() - datetime.fromisoformat(last_failure)).total_seconds()
            if elapsed >= recovery:
                return "test"  # Try half-open
        return "blocked"

    if status == "half_open":
        return "test"

    return "proceed"

def update_circuit_on_success(state: CircuitBreakerState) -> dict:
    """Reset circuit on success."""
    return {
        "circuit_status": "closed",
        "consecutive_failures": 0,
        "last_failure_time": None
    }

def update_circuit_on_failure(state: CircuitBreakerState) -> dict:
    """Update circuit on failure."""
    failures = state.get("consecutive_failures", 0) + 1
    threshold = state.get("failure_threshold", 5)

    new_status = "open" if failures >= threshold else state.get("circuit_status", "closed")

    return {
        "circuit_status": new_status,
        "consecutive_failures": failures,
        "last_failure_time": datetime.now().isoformat()
    }
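
One way to wire these stateless helpers into a graph is sketched below, assuming hypothetical call_api_node and fallback_node nodes; the circuit_gate pass-through node exists only so check_circuit has a source for its conditional edge.

from langgraph.graph import StateGraph, END

def circuit_gate(state: CircuitBreakerState) -> dict:
    """Pass-through node; the routing decision happens on the conditional edge."""
    return {}

graph = StateGraph(CircuitBreakerState)
graph.add_node("gate", circuit_gate)
graph.add_node("call_api", call_api_node)        # hypothetical: performs the external call
graph.add_node("on_success", update_circuit_on_success)
graph.add_node("on_failure", update_circuit_on_failure)
graph.add_node("fallback", fallback_node)        # hypothetical degraded path

graph.set_entry_point("gate")

graph.add_conditional_edges("gate", check_circuit, {
    "proceed": "call_api",
    "test": "call_api",     # half-open: let a probe request through
    "blocked": "fallback"
})

# Route on whether the call produced a result
graph.add_conditional_edges(
    "call_api",
    lambda s: "ok" if s.get("result") else "err",
    {"ok": "on_success", "err": "on_failure"}
)

graph.add_edge("on_success", END)
graph.add_edge("on_failure", "fallback")
graph.add_edge("fallback", END)

app = graph.compile()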

Complete Production Error Recovery Graph

from typing import TypedDict, Annotated, Optional, Literal
from datetime import datetime
import operator
from langgraph.graph import StateGraph, END

class ProductionState(TypedDict):
    """Complete production state with error handling."""
    # Core data
    query: str
    documents: Annotated[list[str], operator.add]
    result: Optional[str]

    # Error tracking
    error_message: Optional[str]
    error_type: Optional[str]
    failed_node: Optional[str]

    # Retry management
    retry_count: int
    max_retries: int

    # Circuit breaker
    circuit_status: str
    consecutive_failures: int
    failure_threshold: int
    last_failure_time: Optional[str]

    # Fallback tracking
    fallback_level: int
    used_fallback: bool

def research_with_recovery(state: ProductionState) -> dict:
    """Research with full error recovery."""
    # Check circuit breaker
    if state.get("circuit_status") == "open":
        last_failure = state.get("last_failure_time")
        if last_failure:
            elapsed = (datetime.now() - datetime.fromisoformat(last_failure)).total_seconds()
            if elapsed < 60:  # Recovery timeout
                return {
                    "error_message": "Circuit open, skipping request",
                    "error_type": "circuit_open"
                }

    try:
        documents = search_with_api(state["query"])
        return {
            "documents": documents,
            "error_message": None,
            "circuit_status": "closed",
            "consecutive_failures": 0
        }

    except RateLimitError as e:
        return handle_error(state, e, "rate_limit")
    except TimeoutError as e:
        return handle_error(state, e, "timeout")
    except Exception as e:
        return handle_error(state, e, "unknown")

def handle_error(state: dict, error: Exception, error_type: str) -> dict:
    """Unified error handler."""
    failures = state.get("consecutive_failures", 0) + 1
    threshold = state.get("failure_threshold", 5)

    return {
        "error_message": str(error),
        "error_type": error_type,
        "failed_node": "research",
        "retry_count": state.get("retry_count", 0) + 1,
        "circuit_status": "open" if failures >= threshold else "closed",
        "consecutive_failures": failures,
        "last_failure_time": datetime.now().isoformat()
    }

def route_after_research(state: ProductionState) -> str:
    """Smart routing based on error type."""
    if state.get("documents"):
        return "continue"

    error_type = state.get("error_type", "unknown")
    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    # Retriable errors
    if error_type in ["timeout", "rate_limit"] and retry_count < max_retries:
        return "retry_with_backoff"

    # Circuit open
    if error_type == "circuit_open":
        return "fallback"

    # Non-retriable or max retries
    return "fallback"

# Build production graph (retry_with_backoff_node, fallback_node, and
# finalize_node follow the retry, fallback, and finalization patterns above)
graph = StateGraph(ProductionState)
graph.add_node("research", research_with_recovery)
graph.add_node("retry_with_backoff", retry_with_backoff_node)
graph.add_node("fallback", fallback_node)
graph.add_node("finalize", finalize_node)

graph.set_entry_point("research")

graph.add_conditional_edges("research", route_after_research, {
    "continue": "finalize",
    "retry_with_backoff": "retry_with_backoff",
    "fallback": "fallback"
})

graph.add_edge("retry_with_backoff", "research")
graph.add_edge("fallback", "finalize")
graph.add_edge("finalize", END)

app = graph.compile()
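
A usage sketch for the compiled graph (the query string is illustrative): every counter and threshold the nodes and routers rely on is initialized explicitly up front, so no node has to guess at defaults.

initial_state: ProductionState = {
    "query": "What is our refund policy for digital goods?",
    "documents": [],
    "result": None,
    "error_message": None,
    "error_type": None,
    "failed_node": None,
    "retry_count": 0,
    "max_retries": 3,
    "circuit_status": "closed",
    "consecutive_failures": 0,
    "failure_threshold": 5,
    "last_failure_time": None,
    "fallback_level": 0,
    "used_fallback": False,
}

final_state = app.invoke(initial_state)

if final_state.get("result"):
    print(final_state["result"])
else:
    print(f"Degraded response (fallback level {final_state.get('fallback_level')})")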

Interview Questions

Q1: "How do you prevent a single API failure from crashing your entire LangGraph workflow?"

Strong Answer:

"I wrap all external calls in try-catch blocks within nodes. Instead of raising exceptions, nodes return error state updates like {error_message, error_type, failed_node}. A conditional edge then routes to either retry (for transient errors like timeouts) or fallback (for persistent failures). The graph never crashes—it gracefully degrades. I also use the decorator pattern to add error handling uniformly across all nodes."

Q2: "When would you use exponential backoff vs. fixed retry delays?"

Answer:

"Fixed delays cause 'thundering herd'—when an API recovers, all waiting requests hit it simultaneously. Exponential backoff spreads retry load over time: 1s, 2s, 4s, 8s. I add jitter (random 0-1s) to further spread retries across clients. This gives the API time to recover. I use exponential backoff for rate limits and overload errors, but might use shorter fixed delays for network blips where I expect immediate recovery."

Q3: "Explain the circuit breaker pattern and when to use it in LangGraph."

Answer:

"A circuit breaker tracks consecutive failures to an external service. When failures exceed a threshold (e.g., 5), it 'opens' and immediately rejects requests instead of waiting for timeouts. After a recovery period (e.g., 60s), it enters 'half-open' state and allows a few test requests. If they succeed, it 'closes' and resumes normal operation. I use it for external APIs that might go down—it prevents cascading failures where timeout-waiting requests exhaust resources. In LangGraph, I track circuit state in the graph state and check it before making API calls."


Key Takeaways

  • Try-catch in every node: Never let exceptions crash the graph
  • Error state fields: Track error_message, error_type, failed_node
  • Exponential backoff with jitter: Prevents thundering herd
  • Multi-level fallbacks: Primary → Alternative → Cache → Static
  • Circuit breaker: Opens after N failures, tests recovery after timeout
  • Route by error type: Retriable errors retry, others go to fallback

Module 2 Complete! Don't forget to take the Module 2 Quiz to test your knowledge. Next up: Module 3 - Checkpointing & Persistence where you'll learn to make your workflows resumable and production-ready.
