Advanced Graph Patterns
Error Recovery Patterns: Try-Catch, Fallbacks & Circuit Breakers
Why Error Recovery Is Critical in Production
Real Production Incident:
A multi-agent customer support system at an e-commerce company crashed 23 times in one day during Black Friday. Root cause: Claude API rate limits triggered cascading failures across all agents. Customer tickets piled up, costing $150K in lost revenue.
After implementing proper error recovery patterns—try-catch nodes, fallback paths, exponential backoff, and circuit breakers—the same system handled 10x the traffic on Cyber Monday with zero crashes.
This lesson teaches you: Production-hardened error recovery patterns that prevent cascading failures and keep your LangGraph workflows running 24/7.
The Three Pillars of Error Recovery
- Try-Catch Nodes: Isolate failures so they don't crash the graph
- Fallback Paths: Provide degraded but functional alternatives
- Circuit Breakers: Prevent cascading failures to external services
Try-Catch Node Pattern
The Problem: Unhandled Exceptions
# ❌ BAD: Crashes entire graph
def risky_node(state: dict) -> dict:
result = call_external_api() # Can throw!
return {"result": result}
# One API timeout = entire workflow crashes
# Checkpoint lost, must restart from scratch
The Solution: Try-Catch Wrapper
from typing import TypedDict, Annotated, Optional, Literal
import operator
import traceback

# RateLimitError is not a builtin; import it from your API client,
# e.g. from anthropic import RateLimitError
class RobustState(TypedDict):
"""State with comprehensive error handling."""
# Core data
query: str
documents: Annotated[list[str], operator.add]
analysis: Optional[str]
# Error tracking
error_message: Optional[str]
error_type: Optional[str]
error_traceback: Optional[str]
failed_node: Optional[str]
# Retry management
retry_count: int
max_retries: int
def try_research_node(state: RobustState) -> dict:
"""
Research node wrapped in try-catch.
Never crashes, always returns state update.
"""
try:
# Risky operation
documents = search_documents(state["query"])
# Success: clear any previous errors
return {
"documents": documents,
"error_message": None,
"error_type": None,
"failed_node": None
}
except TimeoutError as e:
return {
"error_message": f"Search timed out: {str(e)}",
"error_type": "timeout",
"error_traceback": traceback.format_exc(),
"failed_node": "research",
"retry_count": state["retry_count"] + 1
}
except RateLimitError as e:
return {
"error_message": f"Rate limited: {str(e)}",
"error_type": "rate_limit",
"failed_node": "research",
"retry_count": state["retry_count"] + 1
}
except Exception as e:
return {
"error_message": f"Unexpected error: {str(e)}",
"error_type": "unknown",
"error_traceback": traceback.format_exc(),
"failed_node": "research",
"retry_count": state["retry_count"] + 1
}
def route_after_research(state: RobustState) -> Literal["continue", "retry", "fallback"]:
"""Route based on error state."""
# Success case
if not state.get("error_message"):
return "continue"
# Check if we can retry
if state["retry_count"] < state["max_retries"]:
error_type = state.get("error_type", "unknown")
# Only retry transient errors
if error_type in ["timeout", "rate_limit", "connection_error"]:
return "retry"
# Max retries exceeded or non-retriable error
return "fallback"
Production Pattern: Decorator-Based Try-Catch
from functools import wraps
from typing import Callable
def catch_errors(node_name: str):
"""Decorator to add error handling to any node."""
def decorator(func: Callable):
@wraps(func)
def wrapper(state: dict) -> dict:
try:
return func(state)
except Exception as e:
return {
"error_message": str(e),
"error_type": type(e).__name__,
"error_traceback": traceback.format_exc(),
"failed_node": node_name,
"retry_count": state.get("retry_count", 0) + 1
}
return wrapper
return decorator
# Apply to any node
@catch_errors("research")
def research_node(state: RobustState) -> dict:
documents = search_documents(state["query"])
return {"documents": documents}
@catch_errors("analysis")
def analysis_node(state: RobustState) -> dict:
analysis = analyze_documents(state["documents"])
return {"analysis": analysis}
Fallback Paths: Graceful Degradation
Problem: What Happens After Retries Fail?
Without fallbacks, exhausted retries leave you with nothing:
- User sees generic error
- No partial results
- Workflow stops completely
Solution: Multi-Level Fallbacks
from typing import TypedDict, Optional, Literal
class FallbackState(TypedDict):
query: str
primary_result: Optional[str]
fallback_result: Optional[str]
cached_result: Optional[str]
error_message: Optional[str]
fallback_level: int # 0=primary, 1=alternative, 2=cached, 3=static
def primary_processor(state: FallbackState) -> dict:
"""Primary path: Use latest model."""
try:
result = call_claude_opus(state["query"])
return {
"primary_result": result,
"fallback_level": 0
}
    except Exception as e:
        # Tag the failed level so the router advances the fallback chain
        return {"error_message": str(e), "fallback_level": 0}
def alternative_processor(state: FallbackState) -> dict:
"""Fallback 1: Use cheaper/faster model."""
try:
result = call_claude_haiku(state["query"]) # Faster, cheaper
return {
"fallback_result": result,
"fallback_level": 1,
"error_message": None
}
    except Exception as e:
        # Tag the failed level so the router advances the fallback chain
        return {"error_message": str(e), "fallback_level": 1}
def cached_processor(state: FallbackState) -> dict:
"""Fallback 2: Return cached similar result."""
cache_key = compute_query_hash(state["query"])
cached = cache.get(cache_key)
if cached:
return {
"cached_result": cached,
"fallback_level": 2,
"error_message": None
}
return {"error_message": "No cached result available"}
def static_fallback(state: FallbackState) -> dict:
"""Fallback 3: Return static response."""
return {
"fallback_result": """
I apologize, but I'm experiencing technical difficulties.
Here's what I can tell you about your query:
- Your request has been logged for later processing
- A support ticket has been created
- Please try again in a few minutes
In the meantime, you might find these resources helpful:
- Documentation: https://docs.example.com
- FAQ: https://example.com/faq
""".strip(),
"fallback_level": 3,
"error_message": "All services unavailable, used static fallback"
}
def route_fallback(state: FallbackState) -> Literal["done", "try_alternative", "try_cache", "static"]:
"""Route through fallback chain."""
if state.get("primary_result") or state.get("fallback_result"):
return "done"
level = state.get("fallback_level", -1)
error = state.get("error_message")
if not error:
return "done"
# Move to next fallback level
if level < 1:
return "try_alternative"
elif level < 2:
return "try_cache"
else:
return "static"
# Build the fallback chain
from langgraph.graph import StateGraph, END

graph = StateGraph(FallbackState)
graph.add_node("primary", primary_processor)
graph.add_node("alternative", alternative_processor)
graph.add_node("cached", cached_processor)
graph.add_node("static", static_fallback)
graph.add_node("finalize", finalize_result)
graph.set_entry_point("primary")
graph.add_conditional_edges("primary", route_fallback, {
"done": "finalize",
"try_alternative": "alternative",
"try_cache": "cached",
"static": "static"
})
graph.add_conditional_edges("alternative", route_fallback, {
"done": "finalize",
"try_cache": "cached",
"static": "static"
})
graph.add_conditional_edges("cached", route_fallback, {
"done": "finalize",
"static": "static"
})
graph.add_edge("static", "finalize")
graph.add_edge("finalize", END)
Exponential Backoff with Jitter
Why Basic Retries Fail
# ❌ BAD: Hammers the API
def bad_retry(state):
for i in range(5):
try:
return call_api()
except:
time.sleep(1) # Fixed delay = API stays overloaded
Production Pattern: Exponential Backoff
import time
import random
from typing import TypedDict, Optional, Literal

class RetryState(TypedDict):
    task: str
    result: Optional[str]
    error_message: Optional[str]
    attempt: int
    max_attempts: int
    base_delay: float  # seconds
    max_delay: float   # cap on delay
def calculate_backoff(attempt: int, base: float, max_delay: float) -> float:
"""
Calculate delay with exponential backoff and jitter.
Formula: min(base * 2^attempt + random(0,1), max_delay)
"""
exponential = base * (2 ** attempt)
jitter = random.uniform(0, 1) # Prevent thundering herd
return min(exponential + jitter, max_delay)
def retry_with_backoff(state: RetryState) -> dict:
"""Execute with exponential backoff retry."""
attempt = state["attempt"]
if attempt > 0:
# Wait before retry (not on first attempt)
delay = calculate_backoff(
attempt - 1,
state.get("base_delay", 1.0),
state.get("max_delay", 60.0)
)
print(f"Attempt {attempt}: Waiting {delay:.2f}s before retry")
time.sleep(delay)
try:
result = perform_operation(state["task"])
return {
"result": result,
"attempt": attempt + 1,
"error_message": None
}
except Exception as e:
return {
"error_message": str(e),
"attempt": attempt + 1
}
def should_retry(state: RetryState) -> Literal["continue", "retry", "give_up"]:
"""Decide whether to retry."""
if state.get("result"):
return "continue"
if state["attempt"] < state["max_attempts"]:
return "retry"
return "give_up"
# Example backoff sequence (base=1s):
# Attempt 1: Immediate
# Attempt 2: ~1-2s delay (1*2^0 + jitter)
# Attempt 3: ~2-3s delay (1*2^1 + jitter)
# Attempt 4: ~4-5s delay (1*2^2 + jitter)
# Attempt 5: ~8-9s delay (1*2^3 + jitter)
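In a graph, the retry loop is just a conditional edge pointing back at the same node. A sketch, where `process_result` and `handle_give_up` are assumed placeholder nodes:

from langgraph.graph import StateGraph, END

graph = StateGraph(RetryState)
graph.add_node("attempt", retry_with_backoff)
graph.add_node("process", process_result)    # assumed downstream node
graph.add_node("give_up", handle_give_up)    # assumed terminal failure handler

graph.set_entry_point("attempt")
graph.add_conditional_edges("attempt", should_retry, {
    "continue": "process",
    "retry": "attempt",      # loop back; the node sleeps before retrying
    "give_up": "give_up",
})
graph.add_edge("process", END)
graph.add_edge("give_up", END)

app = graph.compile()
result = app.invoke({
    "task": "summarize_report",
    "result": None,
    "error_message": None,
    "attempt": 0,
    "max_attempts": 5,
    "base_delay": 1.0,
    "max_delay": 60.0,
})
print(result.get("result") or result.get("error_message"))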
Async Backoff Pattern
import asyncio
import random
async def async_retry_with_backoff(
func,
max_attempts: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
):
"""Async retry with exponential backoff."""
for attempt in range(max_attempts):
try:
return await func()
except Exception as e:
if attempt == max_attempts - 1:
raise # Last attempt, re-raise
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
await asyncio.sleep(delay)
# Usage in async node
async def async_research_node(state: dict) -> dict:
async def do_research():
return await async_search(state["query"])
try:
result = await async_retry_with_backoff(do_research)
return {"documents": result}
except Exception as e:
return {"error_message": str(e)}
Circuit Breaker Pattern
The Problem: Cascading Failures
When an external service goes down:
- All requests timeout (slow)
- Timeouts exhaust thread pools
- Memory fills with pending requests
- Entire system becomes unresponsive
Solution: Circuit Breaker
from typing import TypedDict, Literal, Optional
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreakerState(TypedDict):
"""State for circuit breaker pattern."""
query: str
result: Optional[str]
# Circuit breaker tracking
circuit_status: str # closed, open, half_open
consecutive_failures: int
last_failure_time: Optional[str]
# Configuration
failure_threshold: int # Open after N failures
recovery_timeout: int # Seconds before trying again
half_open_max_calls: int # Test calls in half-open
class CircuitBreaker:
"""Production circuit breaker implementation."""
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = CircuitState.CLOSED
self.failures = 0
self.last_failure_time = None
self.half_open_calls = 0
def can_execute(self) -> tuple[bool, str]:
"""Check if request should proceed."""
if self.state == CircuitState.CLOSED:
return True, "closed"
if self.state == CircuitState.OPEN:
# Check if recovery timeout has passed
if self.last_failure_time:
elapsed = (datetime.now() - self.last_failure_time).total_seconds()
if elapsed >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
return True, "half_open"
return False, "open"
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls < 3: # Allow limited test calls
self.half_open_calls += 1
return True, "half_open"
return False, "half_open_limit"
return False, "unknown"
def record_success(self):
"""Record successful call."""
self.failures = 0
self.state = CircuitState.CLOSED
self.half_open_calls = 0
def record_failure(self):
"""Record failed call."""
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit OPENED after {self.failures} failures")
# Global circuit breaker per service
api_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
def node_with_circuit_breaker(state: CircuitBreakerState) -> dict:
"""Node that respects circuit breaker."""
can_execute, circuit_status = api_circuit.can_execute()
if not can_execute:
return {
"error_message": f"Circuit breaker {circuit_status}: Service unavailable",
"circuit_status": circuit_status
}
try:
result = call_external_api(state["query"])
api_circuit.record_success()
return {
"result": result,
"circuit_status": "closed"
}
except Exception as e:
api_circuit.record_failure()
return {
"error_message": str(e),
"circuit_status": api_circuit.state.value,
"consecutive_failures": api_circuit.failures
}
State-Based Circuit Breaker (Stateless Nodes)
from datetime import datetime, timedelta
def check_circuit(state: CircuitBreakerState) -> Literal["proceed", "blocked", "test"]:
"""Check circuit breaker using state (no global)."""
status = state.get("circuit_status", "closed")
failures = state.get("consecutive_failures", 0)
threshold = state.get("failure_threshold", 5)
recovery = state.get("recovery_timeout", 60)
if status == "closed":
return "proceed"
if status == "open":
last_failure = state.get("last_failure_time")
if last_failure:
elapsed = (datetime.now() - datetime.fromisoformat(last_failure)).total_seconds()
if elapsed >= recovery:
return "test" # Try half-open
return "blocked"
if status == "half_open":
return "test"
return "proceed"
def update_circuit_on_success(state: CircuitBreakerState) -> dict:
"""Reset circuit on success."""
return {
"circuit_status": "closed",
"consecutive_failures": 0,
"last_failure_time": None
}
def update_circuit_on_failure(state: CircuitBreakerState) -> dict:
"""Update circuit on failure."""
failures = state.get("consecutive_failures", 0) + 1
threshold = state.get("failure_threshold", 5)
new_status = "open" if failures >= threshold else state.get("circuit_status", "closed")
return {
"circuit_status": new_status,
"consecutive_failures": failures,
"last_failure_time": datetime.now().isoformat()
}
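These functions compose into a guard-then-call shape: branch on `check_circuit` before touching the service, then record the outcome. A sketch, where `call_service_node` is an assumed node that sets either `result` or `error_message`:

from langgraph.graph import StateGraph, END

def route_outcome(state: CircuitBreakerState) -> Literal["success", "failure"]:
    return "success" if state.get("result") else "failure"

graph = StateGraph(CircuitBreakerState)
# Pass-through node so we have something to branch on with check_circuit
graph.add_node("guard", lambda state: {"circuit_status": state.get("circuit_status", "closed")})
graph.add_node("call_service", call_service_node)   # assumed: sets result or error_message
graph.add_node("record_success", update_circuit_on_success)
graph.add_node("record_failure", update_circuit_on_failure)
graph.add_node("blocked", lambda state: {"error_message": "Circuit open: request skipped"})

graph.set_entry_point("guard")
graph.add_conditional_edges("guard", check_circuit, {
    "proceed": "call_service",
    "test": "call_service",    # half-open probe goes through the same call path
    "blocked": "blocked",
})
graph.add_conditional_edges("call_service", route_outcome, {
    "success": "record_success",
    "failure": "record_failure",
})
graph.add_edge("record_success", END)
graph.add_edge("record_failure", END)
graph.add_edge("blocked", END)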
Complete Production Error Recovery Graph
from typing import TypedDict, Annotated, Optional, Literal
from datetime import datetime
import operator

from langgraph.graph import StateGraph, END
class ProductionState(TypedDict):
"""Complete production state with error handling."""
# Core data
query: str
documents: Annotated[list[str], operator.add]
result: Optional[str]
# Error tracking
error_message: Optional[str]
error_type: Optional[str]
failed_node: Optional[str]
# Retry management
retry_count: int
max_retries: int
# Circuit breaker
circuit_status: str
consecutive_failures: int
failure_threshold: int
last_failure_time: Optional[str]
# Fallback tracking
fallback_level: int
used_fallback: bool
def research_with_recovery(state: ProductionState) -> dict:
"""Research with full error recovery."""
# Check circuit breaker
if state.get("circuit_status") == "open":
last_failure = state.get("last_failure_time")
if last_failure:
elapsed = (datetime.now() - datetime.fromisoformat(last_failure)).total_seconds()
if elapsed < 60: # Recovery timeout
return {
"error_message": "Circuit open, skipping request",
"error_type": "circuit_open"
}
try:
documents = search_with_api(state["query"])
return {
"documents": documents,
"error_message": None,
"circuit_status": "closed",
"consecutive_failures": 0
}
except RateLimitError as e:
return handle_error(state, e, "rate_limit")
except TimeoutError as e:
return handle_error(state, e, "timeout")
except Exception as e:
return handle_error(state, e, "unknown")
def handle_error(state: dict, error: Exception, error_type: str) -> dict:
"""Unified error handler."""
failures = state.get("consecutive_failures", 0) + 1
threshold = state.get("failure_threshold", 5)
return {
"error_message": str(error),
"error_type": error_type,
"failed_node": "research",
"retry_count": state.get("retry_count", 0) + 1,
"circuit_status": "open" if failures >= threshold else "closed",
"consecutive_failures": failures,
"last_failure_time": datetime.now().isoformat()
}
def route_after_research(state: ProductionState) -> str:
"""Smart routing based on error type."""
if state.get("documents"):
return "continue"
error_type = state.get("error_type", "unknown")
retry_count = state.get("retry_count", 0)
max_retries = state.get("max_retries", 3)
# Retriable errors
if error_type in ["timeout", "rate_limit"] and retry_count < max_retries:
return "retry_with_backoff"
# Circuit open
if error_type == "circuit_open":
return "fallback"
# Non-retriable or max retries
return "fallback"
# Build the production graph
# (retry_with_backoff_node, fallback_node, and finalize_node follow the patterns shown earlier)
graph = StateGraph(ProductionState)
graph.add_node("research", research_with_recovery)
graph.add_node("retry_with_backoff", retry_with_backoff_node)
graph.add_node("fallback", fallback_node)
graph.add_node("finalize", finalize_node)
graph.set_entry_point("research")
graph.add_conditional_edges("research", route_after_research, {
"continue": "finalize",
"retry_with_backoff": "retry_with_backoff",
"fallback": "fallback"
})
graph.add_edge("retry_with_backoff", "research")
graph.add_edge("fallback", "finalize")
graph.add_edge("finalize", END)
app = graph.compile()
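Invoking the compiled graph means seeding the bookkeeping fields the routers read. A sketch:

final_state = app.invoke({
    "query": "Why was my order delayed?",
    "documents": [],
    "result": None,
    "error_message": None,
    "error_type": None,
    "failed_node": None,
    "retry_count": 0,
    "max_retries": 3,
    "circuit_status": "closed",
    "consecutive_failures": 0,
    "failure_threshold": 5,
    "last_failure_time": None,
    "fallback_level": 0,
    "used_fallback": False,
})

print(final_state.get("result"), "| used fallback:", final_state.get("used_fallback"))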
Interview Questions
Q1: "How do you prevent a single API failure from crashing your entire LangGraph workflow?"
Strong Answer:
"I wrap all external calls in try-catch blocks within nodes. Instead of raising exceptions, nodes return error state updates like
{error_message, error_type, failed_node}. A conditional edge then routes to either retry (for transient errors like timeouts) or fallback (for persistent failures). The graph never crashes—it gracefully degrades. I also use the decorator pattern to add error handling uniformly across all nodes."
Q2: "When would you use exponential backoff vs. fixed retry delays?"
Answer:
"Fixed delays cause 'thundering herd'—when an API recovers, all waiting requests hit it simultaneously. Exponential backoff spreads retry load over time: 1s, 2s, 4s, 8s. I add jitter (random 0-1s) to further spread retries across clients. This gives the API time to recover. I use exponential backoff for rate limits and overload errors, but might use shorter fixed delays for network blips where I expect immediate recovery."
Q3: "Explain the circuit breaker pattern and when to use it in LangGraph."
Answer:
"A circuit breaker tracks consecutive failures to an external service. When failures exceed a threshold (e.g., 5), it 'opens' and immediately rejects requests instead of waiting for timeouts. After a recovery period (e.g., 60s), it enters 'half-open' state and allows a few test requests. If they succeed, it 'closes' and resumes normal operation. I use it for external APIs that might go down—it prevents cascading failures where timeout-waiting requests exhaust resources. In LangGraph, I track circuit state in the graph state and check it before making API calls."
Key Takeaways
- Try-catch in every node: Never let exceptions crash the graph
- Error state fields: Track error_message, error_type, failed_node
- Exponential backoff with jitter: Prevents thundering herd
- Multi-level fallbacks: Primary → Alternative → Cache → Static
- Circuit breaker: Opens after N failures, tests recovery after timeout
- Route by error type: Retriable errors retry, others go to fallback
Module 2 Complete! Don't forget to take the Module 2 Quiz to test your knowledge. Next up: Module 3 - Checkpointing & Persistence where you'll learn to make your workflows resumable and production-ready.