State Management Fundamentals
Advanced State Patterns in Production
Why Advanced State Patterns Matter
Real Production Incident (January 2026):
A multi-agent legal document analysis system at a law firm crashed after 48 hours of processing 100K+ pages. Root cause? No state validation: corrupt data from a failed OCR node propagated through the pipeline, causing downstream LLM calls to fail with context overflow errors.
After implementing validation, error states, and size limits, the system ran for 7 days straight processing 500K+ pages with zero crashes.
This lesson teaches you production-hardened state patterns for validation, error recovery, and long-running workflows: the patterns companies like Anthropic and Meta expect from senior LangGraph engineers.
State Validation: Guard Your Inputs
Runtime Validation with Pydantic
Problem: User input or external APIs can send malformed data.
Solution: Validate at graph entry point.
from pydantic import BaseModel, Field, field_validator, ValidationError
from typing import Annotated, Optional, TypedDict
import operator
from langgraph.graph import StateGraph
class ValidatedInput(BaseModel):
"""Pydantic model for API input validation."""
query: str = Field(..., min_length=10, max_length=1000)
max_documents: int = Field(default=10, ge=1, le=100)
search_depth: int = Field(default=1, ge=1, le=3)
@field_validator('query')
@classmethod
def no_malicious_content(cls, v: str) -> str:
"""Block potential prompt injection."""
forbidden = ['ignore previous', 'disregard', 'system:', 'admin']
if any(term in v.lower() for term in forbidden):
raise ValueError(f"Query contains forbidden terms")
return v
class AgentState(TypedDict):
"""Internal state (TypedDict for performance)."""
query: str
documents: Annotated[list[str], operator.add]
analysis: Optional[str]
error: Optional[str]
def validate_and_convert_input(user_input: dict) -> AgentState:
"""
Entry point: Validate with Pydantic, convert to TypedDict.
As of January 2026, this is the recommended pattern.
"""
try:
validated = ValidatedInput(**user_input)
return AgentState(
query=validated.query,
documents=[],
analysis=None,
error=None
)
except ValidationError as e:
# Return error state instead of raising
return AgentState(
query=user_input.get("query", ""),
documents=[],
analysis=None,
error=f"Validation failed: {e.errors()}"
)
# Usage
graph = StateGraph(AgentState)
# ... add nodes and edges ...
app = graph.compile()  # StateGraph must be compiled before it can be invoked

# Validate before invoking
raw_input = {"query": "short", "max_documents": 200}  # Invalid!
state = validate_and_convert_input(raw_input)
if state["error"]:
    print(f"Invalid input: {state['error']}")
else:
    result = app.invoke(state)
Production Pattern:
Pydantic at the boundaries (API input/output), TypedDict internally (no runtime validation overhead).
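The same discipline applies at the output boundary. Below is a minimal sketch, assuming a hypothetical ValidatedOutput model and a validate_output helper that runs on the final AgentState before it is returned to the API caller:

from pydantic import BaseModel, Field, ValidationError

class ValidatedOutput(BaseModel):
    """Pydantic model for API output validation (hypothetical)."""
    analysis: str = Field(..., min_length=1, max_length=20_000)
    document_count: int = Field(..., ge=0)

def validate_output(final_state: AgentState) -> dict:
    """Validate the final state before handing it back to the caller."""
    try:
        out = ValidatedOutput(
            analysis=final_state["analysis"] or "",
            document_count=len(final_state["documents"]),
        )
        return out.model_dump()
    except ValidationError as e:
        return {"error": f"Output validation failed: {e.errors()}"}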
State Size Management for Long-Running Workflows
Problem: Unbounded Growth
# After 1000 iterations:
state = {
"documents": ["doc1", "doc2", ..., "doc10000"], # 50MB+
"messages": [...], # Another 20MB
"traces": [...] # Another 30MB
}
# Total: 100MB+ per checkpoint! PostgresSaver slows to a crawl.
Solution 1: Periodic Pruning
from typing import TypedDict, Annotated

def add_or_replace(existing: list, update) -> list:
    """Custom reducer: append by default; a {"replace": [...]} update overwrites.

    operator.add would re-append pruned lists onto the existing state,
    so pruning requires a reducer that also supports replacement.
    """
    if isinstance(update, dict) and "replace" in update:
        return list(update["replace"])
    return existing + update

class ManagedState(TypedDict):
    """State with size management."""
    documents: Annotated[list[str], add_or_replace]
    messages: Annotated[list[dict], add_or_replace]
    # Size tracking
    state_size_mb: float
    max_state_size_mb: float  # e.g., 10MB

def estimate_size_mb(state: dict) -> float:
    """Estimate state size in MB (fast approximation)."""
    import json
    return len(json.dumps(state, default=str).encode('utf-8')) / (1024 * 1024)

def prune_state(state: ManagedState) -> dict:
    """
    Supervisor node: Prune state if too large.
    Called every N iterations (e.g., every 10 nodes).
    """
    current_size = estimate_size_mb(state)
    updates = {"state_size_mb": current_size}
    if current_size > state["max_state_size_mb"]:
        # Strategy 1: Keep only recent data (replace, don't append)
        updates["documents"] = {"replace": state["documents"][-50:]}  # Last 50 docs
        updates["messages"] = {"replace": state["messages"][-20:]}    # Last 20 messages
        print(f"⚠️ Pruned state at {current_size:.2f}MB: kept last 50 docs / 20 messages")
    return updates
# Build graph with pruning supervisor
def supervisor_node(state: ManagedState) -> dict:
"""Run every 10 iterations to prune state."""
pruned = prune_state(state)
# ... routing logic ...
return pruned
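Wiring the pruning supervisor into a graph looks roughly like this. A minimal sketch: worker_node, the node names, and the 500-document stop condition are illustrative placeholders, not part of the lesson's code.

from langgraph.graph import StateGraph, START, END

def worker_node(state: ManagedState) -> dict:
    """Hypothetical worker: appends one placeholder document per step."""
    return {"documents": [f"doc-{len(state['documents'])}"]}

def should_continue(state: ManagedState) -> str:
    """Illustrative router: stop once enough documents are collected."""
    return "end" if len(state["documents"]) >= 500 else "continue"

builder = StateGraph(ManagedState)
builder.add_node("worker", worker_node)          # produces documents/messages
builder.add_node("supervisor", supervisor_node)  # prunes state, then routes
builder.add_edge(START, "worker")
builder.add_edge("worker", "supervisor")
builder.add_conditional_edges(
    "supervisor", should_continue, {"continue": "worker", "end": END}
)
app = builder.compile()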
Solution 2: Offload to External Storage
import hashlib
import json
from typing import TypedDict, Annotated
import operator
class OffloadedState(TypedDict):
"""State with offloaded large fields."""
query: str
document_ids: Annotated[list[str], operator.add] # IDs, not full docs
analysis: str
def store_document_external(doc: str, storage_client) -> str:
"""
Store large document in S3/Redis, return ID.
Use for documents >10KB.
"""
doc_id = hashlib.sha256(doc.encode()).hexdigest()[:16]
storage_client.set(doc_id, doc) # Redis/S3
return doc_id
def retrieve_documents(doc_ids: list[str], storage_client) -> list[str]:
"""Retrieve documents when needed."""
return [storage_client.get(doc_id) for doc_id in doc_ids]
# Usage
def research_node(state: OffloadedState, config: dict) -> dict:
"""Store large documents externally."""
storage = config["configurable"]["storage_client"]
large_docs = fetch_documents(state["query"]) # Returns 100KB+ docs
doc_ids = [store_document_external(doc, storage) for doc in large_docs]
return {"document_ids": doc_ids} # Store IDs in state (tiny)
def analysis_node(state: OffloadedState, config: dict) -> dict:
"""Retrieve documents for analysis."""
storage = config["configurable"]["storage_client"]
docs = retrieve_documents(state["document_ids"], storage)
analysis = analyze(docs)
return {"analysis": analysis}
# Invoke with external storage
app.invoke(
{"query": "AI trends"},
config={"configurable": {"storage_client": redis_client}}
)
Production Limits (January 2026):
| Checkpointer | Max Size | Recommendation |
|---|---|---|
| MemorySaver | 100MB | Prune at 50MB |
| SqliteSaver | 1GB | Prune at 100MB or offload |
| PostgresSaver | 1GB (configurable) | Prune at 100MB, use external storage for >10MB |
| RedisSaver | 512MB default | Increase limit or offload |
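These limits matter at compile time, when the checkpointer is attached and every super-step is written to it. A minimal sketch using MemorySaver for brevity (in production you would swap in a persistent checkpointer such as PostgresSaver from the langgraph-checkpoint-postgres package); the thread_id and initial values are illustrative:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

builder = StateGraph(ManagedState)
# ... add nodes and edges as above ...
app = builder.compile(checkpointer=MemorySaver())

# Each super-step is checkpointed under this thread_id, so state size
# translates directly into checkpoint write size.
result = app.invoke(
    {"documents": [], "messages": [], "state_size_mb": 0.0, "max_state_size_mb": 10.0},
    config={"configurable": {"thread_id": "legal-analysis-001"}},
)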
Error States & Recovery Metadata
Pattern: Never Raise, Always Return Error State
from typing import TypedDict, Optional, Literal
class RobustState(TypedDict):
    """State with error tracking."""
    query: str
    documents: list[str]
    analysis: Optional[str]
    # Error handling fields
    error_message: Optional[str]
    failed_node: Optional[str]
    retry_count: int
    max_retries: int
    next_action: Optional[Literal["retry", "continue", "abort"]]  # set by the error handler
def fragile_node(state: RobustState) -> dict:
"""
Node that can fail.
❌ BAD: raise Exception("API failed")
✅ GOOD: Return error state
"""
try:
# Call external API (can fail)
docs = call_external_api(state["query"])
return {"documents": docs}
except Exception as e:
# Return error state instead of crashing
return {
"error_message": f"API call failed: {str(e)}",
"failed_node": "fragile_node",
"retry_count": state["retry_count"] + 1
}
def error_handler_node(state: RobustState) -> dict:
"""Supervisor checks for errors and decides retry/abort."""
if state["error_message"]:
if state["retry_count"] < state["max_retries"]:
# Clear error and retry
return {
"error_message": None, # Clear error
"next_action": "retry"
}
else:
# Max retries reached, abort gracefully
return {
"next_action": "abort",
"analysis": f"Failed after {state['retry_count']} retries"
}
# No error, continue
return {"next_action": "continue"}
Why This Matters:
- ✅ Graph never crashes, always completes gracefully
- ✅ LangSmith traces show errors in context (not just stack trace)
- ✅ Checkpoints can resume from error state
- ✅ Production dashboards can alert on error patterns
Production Pattern: Circuit Breaker
Problem: External API down → 100 nodes all fail → checkpoint bloat.
Solution: Circuit breaker stops calling failed service.
from typing import TypedDict, Literal, Optional
from datetime import datetime, timedelta
class CircuitBreakerState(TypedDict):
    """State with circuit breaker pattern."""
    query: str
    documents: list[str]
    error_message: Optional[str]  # set by the node on skipped or failed calls
    # Circuit breaker fields
    circuit_status: Literal["closed", "open", "half_open"]
    failure_count: int
    last_failure_time: Optional[str]  # ISO timestamp
    circuit_threshold: int  # Open circuit after N failures
def call_with_circuit_breaker(state: CircuitBreakerState) -> dict:
"""Check circuit before calling external service."""
# Circuit open: Skip call, return immediately
if state["circuit_status"] == "open":
# Check if cooldown period passed (e.g., 60 seconds)
last_failure = datetime.fromisoformat(state["last_failure_time"])
if datetime.now() - last_failure > timedelta(seconds=60):
            # Cooldown over: switch to half-open; the next pass makes one test call
            return {"circuit_status": "half_open"}  # partial update; LangGraph merges it
else:
# Still in cooldown
return {
"error_message": "Circuit breaker open, skipping API call",
"circuit_status": "open"
}
# Circuit closed or half-open: Try call
try:
docs = call_external_api(state["query"])
# Success: Reset circuit
return {
"documents": docs,
"circuit_status": "closed",
"failure_count": 0
}
    except Exception as e:
        # Failure: Increment counter; a failed half-open test call also re-opens the circuit
        new_failure_count = state["failure_count"] + 1
        if new_failure_count >= state["circuit_threshold"] or state["circuit_status"] == "half_open":
# Open circuit
return {
"error_message": f"API failed {new_failure_count} times, opening circuit",
"circuit_status": "open",
"failure_count": new_failure_count,
"last_failure_time": datetime.now().isoformat()
}
else:
# Stay closed, increment counter
return {
"error_message": str(e),
"failure_count": new_failure_count
}
Production Benefit:
After 3 failures, circuit opens. Subsequent nodes skip API calls instantly instead of waiting for timeouts. System degrades gracefully instead of cascading failure.
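Initializing the circuit breaker fields is the one piece not shown above. A minimal sketch; the threshold of 3 matches the example, and app stands in for a compiled graph that contains call_with_circuit_breaker:

initial_state: CircuitBreakerState = {
    "query": "AI trends",
    "documents": [],
    "error_message": None,
    "circuit_status": "closed",   # start closed: calls are allowed
    "failure_count": 0,
    "last_failure_time": None,
    "circuit_threshold": 3,       # open the circuit after 3 consecutive failures
}
# result = app.invoke(initial_state)  # app: compiled graph with call_with_circuit_breaker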
Metadata for Observability
from typing import TypedDict, Annotated
import operator
from datetime import datetime
class ObservableState(TypedDict):
"""State with rich observability metadata."""
query: str
documents: Annotated[list[str], operator.add]
# Observability metadata
workflow_id: str
started_at: str # ISO timestamp
last_updated_at: str
total_llm_calls: Annotated[int, operator.add]
total_tokens: Annotated[int, operator.add]
total_cost_usd: Annotated[float, operator.add]
node_execution_times: dict[str, float] # {"research": 2.3, "analysis": 5.1}
def instrumented_node(state: ObservableState) -> dict:
"""Node with instrumentation."""
import time
start_time = time.time()
# ... do work ...
docs = fetch_documents(state["query"])
tokens_used = len(docs) * 100
execution_time = time.time() - start_time
return {
"documents": docs,
"last_updated_at": datetime.now().isoformat(),
"total_llm_calls": 1,
"total_tokens": tokens_used,
"total_cost_usd": tokens_used * 0.00002,
"node_execution_times": {
**state["node_execution_times"],
"instrumented_node": execution_time
}
}
# After workflow: rich metrics for dashboards
# total_cost_usd: $2.45
# total_tokens: 122,500
# node_execution_times: {"research": 2.3s, "analysis": 5.1s, "writer": 8.2s}
Use in Production:
- Send to LangSmith for real-time dashboards
- Alert when total_cost_usd > $10 (budget limit); see the sketch below
- Profile slow nodes: node_execution_times > 30s
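A post-run budget check might look like the following sketch; alert_ops is a hypothetical notification hook, and app, initial_state, and the $10 / 30-second thresholds are illustrative:

final_state = app.invoke(
    initial_state,
    config={"configurable": {"thread_id": "wf-123"}},
)

# Budget alert
if final_state["total_cost_usd"] > 10.0:
    alert_ops(
        f"Workflow {final_state['workflow_id']} exceeded budget: "
        f"${final_state['total_cost_usd']:.2f}"
    )

# Profile slow nodes
slow_nodes = {
    name: seconds
    for name, seconds in final_state["node_execution_times"].items()
    if seconds > 30
}
if slow_nodes:
    alert_ops(f"Slow nodes detected: {slow_nodes}")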
Interview Questions
Q1: "How do you prevent state from growing unbounded in a 24/7 workflow?"
Strong Answer:
"I use three strategies: (1) Periodic pruning in a supervisor node—keep only the last N documents/messages based on memory limits. (2) Offload large objects to external storage (Redis/S3) and store only IDs in state. (3) Track state size with
estimate_size_mb()and alert when it exceeds thresholds. For example, in a legal document system, I pruned state every 100 documents, keeping size under 10MB for fast PostgresSaver checkpoints."
Q2: "Should nodes raise exceptions or return error states?"
Answer:
"Return error states, not exceptions. In production, raising exceptions crashes the graph and loses checkpoint context. Instead, I return
{"error_message": str(e), "failed_node": "research"}and let a supervisor node decide whether to retry, skip, or abort. This keeps the graph running, preserves error context in LangSmith traces, and allows checkpoints to resume from the error state."
Q3: "How do you implement circuit breaker pattern in LangGraph?"
Answer:
"I add circuit breaker fields to state:
circuit_status(closed/open/half_open),failure_count,last_failure_time,circuit_threshold. Before calling an external API, I check if the circuit is open. If so, I skip the call and return an error state immediately. After N consecutive failures (threshold), I open the circuit and enter a cooldown period. After cooldown, I try one 'half_open' test call. If it succeeds, I close the circuit. This prevents cascading failures when APIs are down."
Production Code Example
from typing import TypedDict, Annotated, Optional, Literal
import operator
from datetime import datetime
import json
class ProductionState(TypedDict):
"""Production-ready state with all patterns."""
# Core data
query: str
    documents: Annotated[list[str], add_or_replace]  # custom reducer from the pruning section (appends by default, supports replacement)
analysis: Optional[str]
# Size management
state_size_mb: float
max_state_size_mb: float
    # Error handling
    error_message: Optional[str]
    retry_count: int
    max_retries: int
    next_action: Optional[Literal["retry", "continue", "abort"]]
# Circuit breaker
circuit_status: Literal["closed", "open", "half_open"]
failure_count: int
circuit_threshold: int
last_failure_time: Optional[str]
# Observability
workflow_id: str
total_tokens: Annotated[int, operator.add]
total_cost_usd: Annotated[float, operator.add]
node_execution_times: dict[str, float]
def production_research_node(state: ProductionState) -> dict:
"""Node with all production patterns."""
import time
start_time = time.time()
# Check circuit breaker
if state["circuit_status"] == "open":
return {"error_message": "Circuit open, skipping API call"}
try:
# Call external API
docs = fetch_documents(state["query"])
tokens = len(docs) * 100
# Success: return updates
return {
"documents": docs,
"total_tokens": tokens,
"total_cost_usd": tokens * 0.00002,
"node_execution_times": {
**state["node_execution_times"],
"research": time.time() - start_time
},
"circuit_status": "closed",
"failure_count": 0
}
except Exception as e:
# Failure: update error state
new_failure_count = state["failure_count"] + 1
circuit_status = "open" if new_failure_count >= state["circuit_threshold"] else "closed"
return {
"error_message": str(e),
"retry_count": state["retry_count"] + 1,
"failure_count": new_failure_count,
"circuit_status": circuit_status,
"last_failure_time": datetime.now().isoformat()
}
def supervisor_with_pruning(state: ProductionState) -> dict:
"""Supervisor: error recovery + size management."""
updates = {}
    # Error recovery
    if state["error_message"]:
        if state["retry_count"] < state["max_retries"]:
            updates["error_message"] = None      # Clear the error and retry
            updates["next_action"] = "retry"
        else:
            updates["next_action"] = "abort"     # Give up
# Size management
current_size = len(json.dumps(dict(state)).encode()) / (1024 * 1024)
updates["state_size_mb"] = current_size
if current_size > state["max_state_size_mb"]:
updates["documents"] = state["documents"][-50:] # Prune
return updates
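Assembling these pieces into a graph is left implicit above; here is a minimal sketch. The route_next router, the node names, and the MemorySaver checkpointer (swap in a persistent checkpointer per the limits table earlier) are illustrative choices:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

def route_next(state: ProductionState) -> str:
    """Illustrative router driven by the supervisor's updates."""
    if state.get("next_action") == "abort":
        return "abort"
    if state.get("next_action") == "retry":
        return "retry"
    return "done" if state.get("analysis") else "continue"

builder = StateGraph(ProductionState)
builder.add_node("research", production_research_node)
builder.add_node("supervisor", supervisor_with_pruning)
builder.add_edge(START, "research")
builder.add_edge("research", "supervisor")
builder.add_conditional_edges(
    "supervisor", route_next,
    {"retry": "research", "continue": "research", "abort": END, "done": END},
)
app = builder.compile(checkpointer=MemorySaver())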
Key Takeaways
- ✅ Validate input with Pydantic at graph entry (prevent bad data)
- ✅ Prune state periodically in supervisor nodes (prevent memory bloat)
- ✅ Offload large objects to external storage (keep state small)
- ✅ Return error states instead of raising exceptions (graceful degradation)
- ✅ Implement circuit breaker for external APIs (prevent cascading failures)
- ✅ Track observability metadata (tokens, cost, execution time)
Next: You've mastered state management! Move to Module 2 to learn advanced graph patterns: recursive loops, dynamic branching, and error recovery. Don't forget to complete the Module 1 Quiz to test your knowledge!