State Management Fundamentals
Reducers & State Mutations
Why Reducers Matter in Production
Real Interview Question (LangChain L6):
"You have a multi-agent system where 3 agents research in parallel. Each finds 10 documents. How do you accumulate all 30 documents in state without overwriting?"
Wrong Answer:
"Return
{"documents": new_docs}from each node." ❌ Last writer wins, only 10 documents survive.
Correct Answer:
"Use
Annotated[list[str], operator.add]reducer. LangGraph merges updates by appending, not replacing." ✅
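A minimal sketch of that answer, assuming three illustrative agent nodes fanning out from START (node names and document contents are made up):

```python
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add merges parallel writes by list concatenation
    documents: Annotated[list[str], operator.add]

def make_agent(name: str):
    def agent(state: State) -> dict:
        # Each agent contributes its own 10 findings
        return {"documents": [f"{name}-doc-{i}" for i in range(10)]}
    return agent

graph = StateGraph(State)
for name in ("agent_a", "agent_b", "agent_c"):
    graph.add_node(name, make_agent(name))
    graph.add_edge(START, name)  # fan out: all three run in parallel
    graph.add_edge(name, END)

app = graph.compile()
result = app.invoke({"documents": []})
assert len(result["documents"]) == 30  # all 30 documents survive the merge
```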
This lesson teaches production patterns for state updates and reducers, and how to avoid the #1 bug in LangGraph workflows: accidental state overwrites.
State Update Semantics: Merge vs Replace
As of January 2026, LangGraph uses shallow merge by default:
Default Behavior: Shallow Merge
```python
from typing import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    query: str
    documents: list[str]
    analysis: str

# Node returns partial updates
def research_node(state: AgentState) -> dict:
    """Node returns ONLY the fields it wants to update."""
    # query and analysis remain unchanged!
    return {
        "documents": ["doc1", "doc2", "doc3"]
    }

# Graph merges updates into existing state
graph = StateGraph(AgentState)
graph.add_node("research", research_node)
```
Key Principle:
```python
# Nodes return PARTIAL state updates (dict)
# Graph performs: new_state = {**old_state, **updates}
# This is SHALLOW MERGE
```
Example:
```python
# Initial state
state = {
    "query": "AI trends 2026",
    "documents": [],
    "analysis": ""
}

# Node 1 returns
update1 = {"documents": ["doc1", "doc2"]}

# After merge: state = {**state, **update1}
state = {
    "query": "AI trends 2026",      # Unchanged
    "documents": ["doc1", "doc2"],  # REPLACED (not appended!)
    "analysis": ""                  # Unchanged
}

# Node 2 returns
update2 = {"documents": ["doc3", "doc4"]}

# After merge: state = {**state, **update2}
state = {
    "query": "AI trends 2026",
    "documents": ["doc3", "doc4"],  # OVERWROTE ["doc1", "doc2"] ❌
    "analysis": ""
}
```
Problem: Default merge replaces list fields. You lose previous documents!
Solution: Use reducers for accumulation.
Annotated Reducers: The Production Solution
operator.add for Lists (Most Common)
```python
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    """State with reducer-based accumulation."""
    query: str
    # Reducer: operator.add → appends instead of replacing
    documents: Annotated[list[str], operator.add]
    key_findings: Annotated[list[str], operator.add]
    analysis: str  # No reducer → default merge (replace)

# Now parallel nodes can accumulate documents
def research_node_1(state: AgentState) -> dict:
    return {"documents": ["doc1", "doc2"]}

def research_node_2(state: AgentState) -> dict:
    return {"documents": ["doc3", "doc4"]}

# After both nodes: documents = ["doc1", "doc2", "doc3", "doc4"] ✅
```
How It Works:
```python
# With Annotated[list[str], operator.add]:
#   new_value = old_value + update_value
#   ["doc1", "doc2"] + ["doc3", "doc4"] = ["doc1", "doc2", "doc3", "doc4"]
```
Production Use Cases:
- ✅ Accumulating search results across agents
- ✅ Collecting error messages from multiple nodes
- ✅ Building conversation history (see the `add_messages` sketch below)
- ✅ Aggregating metrics (e.g., token counts)
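For the conversation-history case in particular, LangGraph ships a built-in `add_messages` reducer that appends new messages and replaces existing ones that share an ID; a minimal sketch:

```python
from typing import Annotated, TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class ChatState(TypedDict):
    # add_messages appends by default, and updates in place any
    # message whose ID matches an existing entry
    messages: Annotated[list[AnyMessage], add_messages]
```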
Custom Reducer Functions
For complex merge logic, write custom reducers:
```python
from typing import TypedDict, Annotated

def merge_dicts(old: dict, new: dict) -> dict:
    """
    Custom reducer: Deep merge dictionaries.
    Used for nested config or metadata.
    """
    result = old.copy()
    for key, value in new.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)  # Recursive merge
        else:
            result[key] = value  # Replace primitives
    return result

def deduplicate_list(old: list, new: list) -> list:
    """
    Custom reducer: Append + deduplicate.
    Used for accumulating unique documents.
    """
    return list(dict.fromkeys(old + new))  # Preserves order, removes duplicates

class AdvancedState(TypedDict):
    documents: Annotated[list[str], deduplicate_list]
    metadata: Annotated[dict, merge_dicts]
    query: str

# Usage
def node1(state: AdvancedState) -> dict:
    return {
        "documents": ["doc1", "doc2"],
        "metadata": {"source": "web", "count": 2}
    }

def node2(state: AdvancedState) -> dict:
    return {
        "documents": ["doc2", "doc3"],  # doc2 is a duplicate
        "metadata": {"source": "academic", "quality": "high"}
    }

# Result:
# documents = ["doc1", "doc2", "doc3"] (deduplicated)
# metadata = {"source": "academic", "count": 2, "quality": "high"} (deep merged)
```
When to Use Custom Reducers:
- ✅ Deduplication logic (unique documents, IDs)
- ✅ Deep merging nested structures
- ✅ Aggregation (sum, max, min for metrics)
- ✅ Priority-based merging (newer overwrites older)
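As a sketch of the priority-based case (the `updated_at` field and record shape are assumptions for illustration): a reducer over keyed records where the newer entry wins:

```python
from typing import Annotated, TypedDict

def newest_wins(old: dict, new: dict) -> dict:
    """Merge keyed records; on conflict keep the entry with the
    more recent 'updated_at' (ISO timestamps compare lexically)."""
    result = old.copy()
    for key, record in new.items():
        current = result.get(key)
        if current is None or record["updated_at"] >= current["updated_at"]:
            result[key] = record
    return result

class PriorityState(TypedDict):
    # e.g. {"finding-1": {"text": "...", "updated_at": "2026-01-15T10:30:00Z"}}
    findings: Annotated[dict, newest_wins]
```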
Production Pattern: Counter with operator.add
Track iterations, token usage, or costs:
```python
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    query: str
    documents: Annotated[list[str], operator.add]
    # Counters with operator.add
    iteration_count: Annotated[int, operator.add]
    total_tokens: Annotated[int, operator.add]
    total_cost_usd: Annotated[float, operator.add]

# Each node increments counters
def research_node(state: AgentState) -> dict:
    # ... do research ...
    tokens_used = 1500
    cost = tokens_used * 0.00002  # $0.02 per 1K tokens
    return {
        "documents": ["doc1", "doc2"],
        "iteration_count": 1,         # Adds to counter
        "total_tokens": tokens_used,  # Accumulates
        "total_cost_usd": cost        # Accumulates
    }

def analysis_node(state: AgentState) -> dict:
    tokens_used = 2000
    cost = tokens_used * 0.00002
    return {
        "iteration_count": 1,
        "total_tokens": tokens_used,
        "total_cost_usd": cost
    }

# After both nodes:
# iteration_count = 2
# total_tokens = 3500
# total_cost_usd = 0.07
```
Why This Works:
- `operator.add` on `int`/`float` performs addition: `1 + 1 = 2`
- No need to write `state["iteration_count"] + 1` inside nodes
- Clean, declarative updates
Immutable Updates for Debugging
Production Pattern: Return new objects, don't mutate in-place.
❌ Bad: In-Place Mutation (Hard to Debug)
```python
def bad_node(state: AgentState) -> dict:
    """Mutates state directly - breaks time-travel debugging."""
    state["documents"].append("new_doc")  # ❌ Modifies the original list
    state["analysis"] = "updated"         # ❌ Rebinds the key in the shared dict
    return {}  # Returns an empty dict, but state is already mutated
```
Problems:
- Checkpointing breaks: Checkpointer stores mutated state, not clean snapshots
- Time-travel debugging fails: Can't replay from checkpoint
- Race conditions: Concurrent nodes mutate shared state
✅ Good: Immutable Updates (Production-Ready)
```python
def good_node(state: AgentState) -> dict:
    """Returns new state without mutating the original."""
    # Don't mutate: state["documents"].append(...)
    # Instead: return a new list
    new_documents = state["documents"] + ["new_doc"]
    return {
        "documents": new_documents,
        "analysis": "updated"
    }

# Original state unchanged; the graph merges the updates
```
Benefits:
- ✅ Checkpointing works correctly
- ✅ Time-travel debugging enabled
- ✅ Thread-safe for concurrent execution
- ✅ Easier to test (pure functions)
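Because an immutable node is a pure function of its input, you can unit-test it without compiling a graph; a pytest-style sketch against `good_node` above:

```python
def test_good_node_is_pure():
    state = {"query": "q", "documents": ["doc0"], "analysis": ""}
    updates = good_node(state)

    # The node reports its changes as a partial update...
    assert updates == {"documents": ["doc0", "new_doc"], "analysis": "updated"}
    # ...and the input state object is untouched
    assert state["documents"] == ["doc0"]
```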
Production Pattern: Conditional Updates
Only update fields when needed:
```python
def conditional_node(state: AgentState) -> dict:
    """Only update analysis if documents exist."""
    updates = {}

    # Conditional field update
    # (analyze_documents is a placeholder for your own analysis step)
    if len(state["documents"]) > 0:
        updates["analysis"] = analyze_documents(state["documents"])
    else:
        updates["error_message"] = "No documents to analyze"

    # Always increment iteration
    updates["iteration_count"] = 1

    return updates  # Only returns fields that changed
```
Why This Matters:
- Avoids unnecessary LLM calls
- Keeps state clean (no empty/null pollution)
- Clear control flow in LangSmith traces
Reducer for Message Passing Between Agents
Use Case: Multi-agent systems where agents communicate.
```python
from typing import TypedDict, Annotated
import operator

class Message(TypedDict):
    """Agent-to-agent message."""
    sender: str
    recipient: str
    content: str
    timestamp: str

class MultiAgentState(TypedDict):
    query: str
    # Accumulate messages between agents
    messages: Annotated[list[Message], operator.add]
    final_report: str

def researcher_node(state: MultiAgentState) -> dict:
    """Researcher agent sends a message to the writer."""
    message = Message(
        sender="researcher",
        recipient="writer",
        content="Found 5 papers on AI safety",
        timestamp="2026-01-15T10:30:00Z"
    )
    return {"messages": [message]}

def writer_node(state: MultiAgentState) -> dict:
    """Writer reads messages from the researcher."""
    researcher_messages = [
        msg for msg in state["messages"]
        if msg["sender"] == "researcher" and msg["recipient"] == "writer"
    ]
    # ... process messages ...
    response = Message(
        sender="writer",
        recipient="researcher",
        content="Draft complete",
        timestamp="2026-01-15T10:35:00Z"
    )
    return {"messages": [response]}

# messages accumulates the communication history
```
Production Pattern: Use `Annotated[list, operator.add]` for audit trails, message logs, and inter-agent communication.
Common Mistakes & Fixes
Mistake 1: Forgetting Reducer, Lists Get Overwritten
```python
# ❌ BAD
class State(TypedDict):
    documents: list[str]  # No reducer!

# Last writer wins
node1_output = {"documents": ["doc1", "doc2"]}
node2_output = {"documents": ["doc3", "doc4"]}
# Final: ["doc3", "doc4"] - lost ["doc1", "doc2"]

# ✅ GOOD
class State(TypedDict):
    documents: Annotated[list[str], operator.add]  # Reducer!

# Both updates accumulate
# Final: ["doc1", "doc2", "doc3", "doc4"]
```
Mistake 2: Using Reducer on Non-List Fields
```python
# ❌ BAD
class State(TypedDict):
    analysis: Annotated[str, operator.add]  # Reducer on a string?

node1 = {"analysis": "First insight"}
node2 = {"analysis": " Second insight"}
# Result: "First insight Second insight" (string concatenation!)
# Probably not what you want

# ✅ GOOD
class State(TypedDict):
    analysis: str  # No reducer, default merge (replace)

# Last writer wins (expected for single-value fields)
```
Mistake 3: Infinite Growth Without Pruning
```python
# ❌ BAD: Unbounded accumulation
class State(TypedDict):
    documents: Annotated[list[str], operator.add]
# After 1000 iterations: 10,000+ documents in memory!

# ✅ GOOD: Cap growth inside the reducer itself
def add_and_prune(old: list[str], new: list[str]) -> list[str]:
    """Append, then keep only the last 100 documents."""
    return (old + new)[-100:]

class State(TypedDict):
    documents: Annotated[list[str], add_and_prune]
```
Note: with `operator.add` in place, a node can't simply return a pruned list to "override" the field; its update is also fed through the reducer and would be appended. Put the cap in the reducer instead.
Interview Questions
Q1: "What's the difference between merge and replace semantics?"
Answer:
"LangGraph uses shallow merge by default:
new_state = {**old_state, **updates}. For fields without reducers, this replaces the value. For fields withAnnotated[list, operator.add], it accumulates by appending. Merge semantics prevent nodes from accidentally overwriting each other's updates, which is critical in parallel execution."
Q2: "When would you use a custom reducer instead of operator.add?"
Answer:
"Use custom reducers for complex logic that operator.add can't handle. Examples: deduplication (append + remove duplicates), deep merging nested dicts, priority-based updates (newer overwrites older), or aggregation functions (max, min, average). For instance, in a multi-agent system tracking confidence scores, I'd use a custom reducer to keep the maximum confidence per finding."
Q3: "How do reducers affect checkpointing performance?"
Answer:
"Reducers don't directly impact checkpointing—they determine how state is merged during graph execution. However, unbounded accumulation via reducers can bloat checkpoint size. For example, accumulating 10K documents with operator.add creates a 50MB checkpoint. I mitigate this by pruning state periodically in a supervisor node, keeping checkpoint size under 10MB for fast resume times."
Production Code Example: Complete Pattern
```python
from typing import TypedDict, Annotated, Optional
import operator
from langgraph.graph import StateGraph, END

class ResearchState(TypedDict):
    """Production state with reducers."""
    # Input
    query: str
    # Accumulated with reducers
    documents: Annotated[list[str], operator.add]
    key_findings: Annotated[list[str], operator.add]
    # Single-value (merge/replace)
    analysis: Optional[str]
    final_report: Optional[str]
    # Counters with reducers
    iteration_count: Annotated[int, operator.add]
    total_tokens: Annotated[int, operator.add]
    # Control flow
    max_iterations: int
    is_finished: bool

def research_node(state: ResearchState) -> dict:
    """Research documents (immutable update)."""
    new_docs = search_documents(state["query"])  # placeholder search helper
    return {
        "documents": new_docs,  # Appends via reducer
        "iteration_count": 1,   # Increments via reducer
        "total_tokens": len(new_docs) * 100
    }

def analysis_node(state: ResearchState) -> dict:
    """Analyze documents (replace semantics)."""
    analysis = analyze_docs(state["documents"])  # placeholder analysis helper
    return {
        "analysis": analysis,                        # Replaces (no reducer)
        "key_findings": extract_findings(analysis),  # Appends via reducer
        "total_tokens": 2000
    }

def should_continue(state: ResearchState) -> str:
    """Conditional edge with safety check."""
    if state["iteration_count"] >= state["max_iterations"]:
        return "end"
    if state["is_finished"]:
        return "end"
    return "research"

# Build graph
graph = StateGraph(ResearchState)
graph.add_node("research", research_node)
graph.add_node("analysis", analysis_node)
graph.add_edge("research", "analysis")  # research flows into analysis
graph.add_conditional_edges("analysis", should_continue, {
    "research": "research",
    "end": END
})
graph.set_entry_point("research")
app = graph.compile()
```
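A possible invocation of the compiled graph; the initial values are assumptions (seeding reducer counters at 0 is the conservative choice), and `search_documents`, `analyze_docs`, and `extract_findings` above are placeholders you would supply:

```python
result = app.invoke({
    "query": "AI trends 2026",
    "documents": [],
    "key_findings": [],
    "analysis": None,
    "final_report": None,
    "iteration_count": 0,  # reducer counters start at 0
    "total_tokens": 0,
    "max_iterations": 3,
    "is_finished": False,
})
print(result["iteration_count"], result["total_tokens"])
```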
Key Takeaways
✅ Use Annotated[list, operator.add] for accumulation (documents, messages, findings)
✅ Use custom reducers for deduplication, deep merge, or complex logic
✅ Return immutable updates (don't mutate state in-place)
✅ Prune accumulated state to prevent memory bloat
✅ Use reducers for counters (iteration, tokens, cost tracking)
✅ Default merge replaces single-value fields (expected behavior)
Next: Learn advanced state patterns for production in Lesson 3: validation, error states, and recovery metadata.