State Management Fundamentals

Reducers & State Mutations

4 min read

Why Reducers Matter in Production

Real Interview Question (LangChain L6):

"You have a multi-agent system where 3 agents research in parallel. Each finds 10 documents. How do you accumulate all 30 documents in state without overwriting?"

Wrong Answer:

"Return {"documents": new_docs} from each node." ❌ Last writer wins, only 10 documents survive.

Correct Answer:

"Use Annotated[list[str], operator.add] reducer. LangGraph merges updates by appending, not replacing." ✅

This lesson teaches you production patterns for state updates and reducers, and how to avoid the #1 bug in LangGraph workflows: accidental state overwrites.


State Update Semantics: Merge vs Replace

As of January 2026, LangGraph uses shallow merge by default:

Default Behavior: Shallow Merge

from typing import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    query: str
    documents: list[str]
    analysis: str

# Node returns partial updates
def research_node(state: AgentState) -> dict:
    """Node returns ONLY the fields it wants to update."""
    return {
        "documents": ["doc1", "doc2", "doc3"]
    }
    # query and analysis remain unchanged!

# Graph merges updates into existing state
graph = StateGraph(AgentState)
graph.add_node("research", research_node)

Key Principle:

# Nodes return PARTIAL state updates (dict)
# Graph performs: new_state = {**old_state, **updates}
# This is SHALLOW MERGE

Example:

# Initial state
state = {
    "query": "AI trends 2026",
    "documents": [],
    "analysis": ""
}

# Node 1 returns
update1 = {"documents": ["doc1", "doc2"]}

# After merge: state = {**state, **update1}
state = {
    "query": "AI trends 2026",      # Unchanged
    "documents": ["doc1", "doc2"],   # REPLACED (not appended!)
    "analysis": ""                   # Unchanged
}

# Node 2 returns
update2 = {"documents": ["doc3", "doc4"]}

# After merge: state = {**state, **update2}
state = {
    "query": "AI trends 2026",
    "documents": ["doc3", "doc4"],   # OVERWROTE ["doc1", "doc2"] ❌
    "analysis": ""
}

Problem: Default merge replaces list fields. You lose previous documents!

Solution: Use reducers for accumulation.


Annotated Reducers: The Production Solution

operator.add for Lists (Most Common)

from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    """State with reducer-based accumulation."""
    query: str
    # Reducer: operator.add → appends instead of replacing
    documents: Annotated[list[str], operator.add]
    key_findings: Annotated[list[str], operator.add]
    analysis: str  # No reducer → default merge (replace)

# Now parallel nodes can accumulate documents
def research_node_1(state: AgentState) -> dict:
    return {"documents": ["doc1", "doc2"]}

def research_node_2(state: AgentState) -> dict:
    return {"documents": ["doc3", "doc4"]}

# After both nodes: documents = ["doc1", "doc2", "doc3", "doc4"] ✅

How It Works:

# With Annotated[list[str], operator.add]:
# new_value = old_value + update_value
# ["doc1", "doc2"] + ["doc3", "doc4"] = ["doc1", "doc2", "doc3", "doc4"]

Production Use Cases:

  • ✅ Accumulating search results across agents
  • ✅ Collecting error messages from multiple nodes
  • ✅ Building conversation history
  • ✅ Aggregating metrics (e.g., token counts)
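To see this accumulation under true parallelism, here is a minimal runnable sketch (assuming a recent langgraph release that exposes the START and END constants; the state and nodes are re-declared so the script is self-contained):

import operator
from typing import Annotated, TypedDict

from langgraph.graph import END, START, StateGraph

class AgentState(TypedDict):
    query: str
    documents: Annotated[list[str], operator.add]

def research_node_1(state: AgentState) -> dict:
    return {"documents": ["doc1", "doc2"]}

def research_node_2(state: AgentState) -> dict:
    return {"documents": ["doc3", "doc4"]}

graph = StateGraph(AgentState)
graph.add_node("research_1", research_node_1)
graph.add_node("research_2", research_node_2)
# Fan out: both research nodes run in the same superstep
graph.add_edge(START, "research_1")
graph.add_edge(START, "research_2")
graph.add_edge("research_1", END)
graph.add_edge("research_2", END)

app = graph.compile()
result = app.invoke({"query": "AI trends 2026", "documents": []})
print(result["documents"])  # all four docs survive; the reducer merges both writes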

Custom Reducer Functions

For complex merge logic, write custom reducers:

from typing import Annotated

def merge_dicts(old: dict, new: dict) -> dict:
    """
    Custom reducer: Deep merge dictionaries.
    Used for nested config or metadata.
    """
    result = old.copy()
    for key, value in new.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)  # Recursive merge
        else:
            result[key] = value  # Replace primitives
    return result

def deduplicate_list(old: list, new: list) -> list:
    """
    Custom reducer: Append + deduplicate.
    Used for accumulating unique documents.
    """
    return list(dict.fromkeys(old + new))  # Preserves order, removes duplicates

class AdvancedState(TypedDict):
    documents: Annotated[list[str], deduplicate_list]
    metadata: Annotated[dict, merge_dicts]
    query: str

# Usage
def node1(state: AdvancedState) -> dict:
    return {
        "documents": ["doc1", "doc2"],
        "metadata": {"source": "web", "count": 2}
    }

def node2(state: AdvancedState) -> dict:
    return {
        "documents": ["doc2", "doc3"],  # doc2 is duplicate
        "metadata": {"source": "academic", "quality": "high"}
    }

# Result:
# documents = ["doc1", "doc2", "doc3"] (deduplicated)
# metadata = {"source": "academic", "count": 2, "quality": "high"} (deep merged)

When to Use Custom Reducers:

  • ✅ Deduplication logic (unique documents, IDs)
  • ✅ Deep merging nested structures
  • ✅ Aggregation (sum, max, min for metrics; see the sketch below)
  • ✅ Priority-based merging (newer overwrites older)
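As a concrete instance of the aggregation bullet above, here is a minimal sketch of a max-keeping reducer (the field name best_confidence is illustrative, not from any library):

from typing import Annotated, TypedDict

def keep_max(old: float, new: float) -> float:
    """Reducer: retain the highest value reported by any node."""
    return max(old, new)

class ScoredState(TypedDict):
    # Each node reports a confidence; state keeps the best one seen so far
    best_confidence: Annotated[float, keep_max]

# keep_max(0.7, 0.9) -> 0.9; keep_max(0.9, 0.4) -> 0.9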

Production Pattern: Counter with operator.add

Track iterations, token usage, or costs:

from typing import Annotated
import operator

class AgentState(TypedDict):
    query: str
    documents: Annotated[list[str], operator.add]

    # Counters with operator.add
    iteration_count: Annotated[int, operator.add]
    total_tokens: Annotated[int, operator.add]
    total_cost_usd: Annotated[float, operator.add]

# Each node increments counters
def research_node(state: AgentState) -> dict:
    # ... do research ...
    tokens_used = 1500
    cost = tokens_used * 0.00002  # $0.02 per 1K tokens

    return {
        "documents": ["doc1", "doc2"],
        "iteration_count": 1,        # Adds to counter
        "total_tokens": tokens_used, # Accumulates
        "total_cost_usd": cost       # Accumulates
    }

def analysis_node(state: AgentState) -> dict:
    tokens_used = 2000
    cost = tokens_used * 0.00002

    return {
        "iteration_count": 1,
        "total_tokens": tokens_used,
        "total_cost_usd": cost
    }

# After both nodes:
# iteration_count = 2
# total_tokens = 3500
# total_cost_usd = 0.07

Why This Works:

  • operator.add on int/float performs addition: 1 + 1 = 2
  • No need for state["iteration_count"] + 1 inside nodes
  • Clean, declarative updates

Immutable Updates for Debugging

Production Pattern: Return new objects, don't mutate in-place.

❌ Bad: In-Place Mutation (Hard to Debug)

def bad_node(state: AgentState) -> dict:
    """Mutates state directly - breaks time-travel debugging."""
    state["documents"].append("new_doc")  # ❌ Modifies original list
    state["analysis"] = "updated"          # ❌ Modifies original string

    return {}  # Returns empty dict, but state already mutated

Problems:

  • Checkpointing breaks: the checkpointer stores mutated state, not clean snapshots (illustrated below)
  • Time-travel debugging fails: Can't replay from checkpoint
  • Race conditions: Concurrent nodes mutate shared state
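A standalone illustration of the first problem (plain Python, no LangGraph required): a shallow snapshot shares the inner list, so in-place mutation silently rewrites history:

state = {"documents": ["doc1"]}
snapshot = dict(state)             # shallow copy - shares the inner list

state["documents"].append("doc2")  # in-place mutation

print(snapshot["documents"])       # ['doc1', 'doc2'] - the "snapshot" changed too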

✅ Good: Immutable Updates (Production-Ready)

def good_node(state: AgentState) -> dict:
    """Returns new state without mutating original."""
    # Don't mutate: state["documents"].append(...)
    # Instead: return new list
    new_documents = state["documents"] + ["new_doc"]

    return {
        "documents": new_documents,
        "analysis": "updated"
    }
    # Original state unchanged, graph merges updates

Benefits:

  • ✅ Checkpointing works correctly
  • ✅ Time-travel debugging enabled
  • ✅ Thread-safe for concurrent execution
  • ✅ Easier to test (pure functions)

Production Pattern: Conditional Updates

Only update fields when needed:

def conditional_node(state: AgentState) -> dict:
    """Only update analysis if documents exist."""
    updates: dict = {}

    # Conditional field update (analyze_documents is a placeholder helper;
    # error_message must be declared in your state schema)
    if len(state["documents"]) > 0:
        updates["analysis"] = analyze_documents(state["documents"])
    else:
        updates["error_message"] = "No documents to analyze"

    # Always increment iteration (accumulated by the operator.add reducer)
    updates["iteration_count"] = 1

    return updates  # Only returns fields that changed

Why This Matters:

  • Avoids unnecessary LLM calls
  • Keeps state clean (no empty/null pollution)
  • Clear control flow in LangSmith traces

Reducer for Message Passing Between Agents

Use Case: Multi-agent systems where agents communicate.

from typing import TypedDict, Annotated
import operator

class Message(TypedDict):
    """Agent-to-agent message."""
    sender: str
    recipient: str
    content: str
    timestamp: str

class MultiAgentState(TypedDict):
    query: str
    # Accumulate messages between agents
    messages: Annotated[list[Message], operator.add]
    final_report: str

def researcher_node(state: MultiAgentState) -> dict:
    """Researcher agent sends message to writer."""
    message = Message(
        sender="researcher",
        recipient="writer",
        content="Found 5 papers on AI safety",
        timestamp="2026-01-15T10:30:00Z"
    )

    return {"messages": [message]}

def writer_node(state: MultiAgentState) -> dict:
    """Writer reads messages from researcher."""
    researcher_messages = [
        msg for msg in state["messages"]
        if msg["sender"] == "researcher" and msg["recipient"] == "writer"
    ]

    # ... process messages ...

    response = Message(
        sender="writer",
        recipient="researcher",
        content="Draft complete",
        timestamp="2026-01-15T10:35:00Z"
    )

    return {"messages": [response]}

# messages accumulates communication history

Production Pattern:

Use Annotated[list, operator.add] for audit trails, message logs, and inter-agent communication.
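For LLM chat history specifically, recent LangGraph releases ship a built-in add_messages reducer, which appends new messages and reconciles existing ones by message ID; a minimal sketch:

from typing import Annotated, TypedDict

from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class ChatState(TypedDict):
    # add_messages appends new messages and updates existing ones by ID
    messages: Annotated[list[AnyMessage], add_messages]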


Common Mistakes & Fixes

Mistake 1: Forgetting Reducer, Lists Get Overwritten

# ❌ BAD
class State(TypedDict):
    documents: list[str]  # No reducer!

# Last writer wins
node1_output = {"documents": ["doc1", "doc2"]}
node2_output = {"documents": ["doc3", "doc4"]}
# Final: ["doc3", "doc4"] - lost ["doc1", "doc2"]

# ✅ GOOD
class State(TypedDict):
    documents: Annotated[list[str], operator.add]  # Reducer!

# Both accumulate
# Final: ["doc1", "doc2", "doc3", "doc4"]

Mistake 2: Using Reducer on Non-List Fields

# ❌ BAD
class State(TypedDict):
    analysis: Annotated[str, operator.add]  # Reducer on string?

node1 = {"analysis": "First insight"}
node2 = {"analysis": " Second insight"}
# Result: "First insight Second insight" (string concatenation!)
# Probably not what you want

# ✅ GOOD
class State(TypedDict):
    analysis: str  # No reducer, normal merge (replace)

# Last writer wins (expected for single-value fields)

Mistake 3: Infinite Growth Without Pruning

# ❌ BAD: Unbounded accumulation
class State(TypedDict):
    documents: Annotated[list[str], operator.add]
    # After 1000 iterations: 10,000+ documents in memory!

# ✅ GOOD: Cap growth inside the reducer itself
def capped_add(old: list[str], new: list[str]) -> list[str]:
    """Append, then keep only the last 100 documents."""
    return (old + new)[-100:]

class State(TypedDict):
    documents: Annotated[list[str], capped_add]

# Note: a supervisor node can't prune an operator.add field by
# returning a shorter list - the reducer would APPEND it, doubling
# the data. Pruning logic belongs in the reducer itself.

Interview Questions

Q1: "What's the difference between merge and replace semantics?"

Answer:

"LangGraph uses shallow merge by default: new_state = {**old_state, **updates}. For fields without reducers, this replaces the value. For fields with Annotated[list, operator.add], it accumulates by appending. Merge semantics prevent nodes from accidentally overwriting each other's updates, which is critical in parallel execution."

Q2: "When would you use a custom reducer instead of operator.add?"

Answer:

"Use custom reducers for complex logic that operator.add can't handle. Examples: deduplication (append + remove duplicates), deep merging nested dicts, priority-based updates (newer overwrites older), or aggregation functions (max, min, average). For instance, in a multi-agent system tracking confidence scores, I'd use a custom reducer to keep the maximum confidence per finding."

Q3: "How do reducers affect checkpointing performance?"

Answer:

"Reducers don't directly impact checkpointing—they determine how state is merged during graph execution. However, unbounded accumulation via reducers can bloat checkpoint size. For example, accumulating 10K documents with operator.add creates a 50MB checkpoint. I mitigate this by pruning state periodically in a supervisor node, keeping checkpoint size under 10MB for fast resume times."


Production Code Example: Complete Pattern

from typing import TypedDict, Annotated, Optional
import operator
from langgraph.graph import StateGraph, END

class ResearchState(TypedDict):
    """Production state with reducers."""
    # Input
    query: str

    # Accumulated with reducer
    documents: Annotated[list[str], operator.add]
    key_findings: Annotated[list[str], operator.add]

    # Single-value (merge/replace)
    analysis: Optional[str]
    final_report: Optional[str]

    # Counters with reducer
    iteration_count: Annotated[int, operator.add]
    total_tokens: Annotated[int, operator.add]

    # Control flow
    max_iterations: int
    is_finished: bool

def research_node(state: ResearchState) -> dict:
    """Research documents (immutable update)."""
    # search_documents is a placeholder for your retrieval helper
    new_docs = search_documents(state["query"])

    return {
        "documents": new_docs,          # Appends via reducer
        "iteration_count": 1,           # Increments via reducer
        "total_tokens": len(new_docs) * 100
    }

def analysis_node(state: ResearchState) -> dict:
    """Analyze documents (replace semantics)."""
    # analyze_docs and extract_findings are placeholder helpers
    analysis = analyze_docs(state["documents"])

    return {
        "analysis": analysis,           # Replaces (no reducer)
        "key_findings": extract_findings(analysis),  # Appends
        "total_tokens": 2000
    }

def should_continue(state: ResearchState) -> str:
    """Conditional edge with safety check."""
    if state["iteration_count"] >= state["max_iterations"]:
        return "end"
    if state["is_finished"]:
        return "end"
    return "research"

# Build graph
graph = StateGraph(ResearchState)
graph.add_node("research", research_node)
graph.add_node("analysis", analysis_node)
graph.add_edge("research", "analysis")  # research always flows into analysis
graph.add_conditional_edges("analysis", should_continue, {
    "research": "research",
    "end": END
})
graph.set_entry_point("research")

app = graph.compile()
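A hypothetical invocation of the compiled graph (it assumes the placeholder helpers above are implemented; every field, including the counters, gets an explicit initial value):

result = app.invoke({
    "query": "AI trends 2026",
    "documents": [],
    "key_findings": [],
    "analysis": None,
    "final_report": None,
    "iteration_count": 0,
    "total_tokens": 0,
    "max_iterations": 3,
    "is_finished": False,
})
print(result["iteration_count"], result["total_tokens"])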

Key Takeaways

  • ✅ Use Annotated[list, operator.add] for accumulation (documents, messages, findings)
  • ✅ Use custom reducers for deduplication, deep merge, or complex logic
  • ✅ Return immutable updates (don't mutate state in-place)
  • ✅ Cap accumulated state (e.g., with a capping reducer) to prevent memory bloat
  • ✅ Use reducers for counters (iteration, tokens, cost tracking)
  • ✅ Default merge replaces single-value fields (expected behavior)

Next: Lesson 3 covers advanced state patterns for production: validation, error states, and recovery metadata.

