# Advanced Graph Patterns: Conditional Branching & Parallel Execution
## Why Branching Matters in Production
**Real Production Scenario (January 2026):**

An enterprise document processing system needed to route documents to different processing pipelines based on type (PDF, image, structured data) and then process multiple sections in parallel. With sequential processing, a 100-page report took 45 minutes. With parallel branching using the Send API, it now takes 8 minutes.
**This lesson teaches you:** how to implement conditional routing with 3+ branches and how to leverage the Send API for parallel execution in production LangGraph workflows.
## Conditional Edges with Multiple Branches
### Basic Pattern: 2-Way Branch
```python
from typing import Literal

from langgraph.graph import StateGraph, END

def route_simple(state: dict) -> Literal["path_a", "path_b"]:
    if state["score"] > 0.8:
        return "path_a"
    return "path_b"

# Assumes `graph` is a StateGraph where "router", "node_a",
# and "node_b" have already been added.
graph.add_conditional_edges("router", route_simple, {
    "path_a": "node_a",
    "path_b": "node_b",
})
```
### Production Pattern: 3+ Way Branch
```python
from typing import TypedDict, Literal, Optional

from langgraph.graph import StateGraph, END

class DocumentState(TypedDict):
    content: str
    doc_type: Optional[str]
    language: Optional[str]
    priority: str
    result: Optional[str]

def classify_document(state: DocumentState) -> Literal[
    "process_pdf",
    "process_image",
    "process_structured",
    "process_text",
    "error_handler",
]:
    """Route to the appropriate processor based on document type."""
    # `or ""` guards against doc_type being None (it is Optional in the schema)
    doc_type = (state.get("doc_type") or "").lower()

    # Priority override for urgent documents
    if state.get("priority") == "urgent":
        return "process_text"  # Fast path

    # Type-based routing
    routing_map = {
        "pdf": "process_pdf",
        "image": "process_image",
        "jpg": "process_image",
        "png": "process_image",
        "json": "process_structured",
        "csv": "process_structured",
        "xml": "process_structured",
    }
    if doc_type in routing_map:
        return routing_map[doc_type]

    # Plain text (or no declared type) falls through to the text processor;
    # anything else is a type we can't handle.
    if doc_type in ("", "text"):
        return "process_text"
    return "error_handler"

# Build graph with a 5-way conditional.
# Node implementations (classify_node, pdf_processor, image_processor, etc.)
# are assumed to be defined elsewhere.
graph = StateGraph(DocumentState)
graph.add_node("classifier", classify_node)
graph.add_node("process_pdf", pdf_processor)
graph.add_node("process_image", image_processor)
graph.add_node("process_structured", structured_processor)
graph.add_node("process_text", text_processor)
graph.add_node("error_handler", error_node)
graph.add_node("finalizer", finalize_node)

graph.add_conditional_edges(
    "classifier",
    classify_document,
    {
        "process_pdf": "process_pdf",
        "process_image": "process_image",
        "process_structured": "process_structured",
        "process_text": "process_text",
        "error_handler": "error_handler",
    },
)

# All processors lead to the finalizer
for node in ["process_pdf", "process_image", "process_structured", "process_text"]:
    graph.add_edge(node, "finalizer")
graph.add_edge("error_handler", "finalizer")

graph.add_edge("finalizer", END)
graph.set_entry_point("classifier")
```
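To exercise the routing end to end, compile and invoke the graph. The sketch below uses hypothetical stub processors (`make_stub` is illustrative only); in a real script these definitions would have to appear before the `add_node` calls above:

```python
# Hypothetical stubs standing in for the real pipelines
def make_stub(label: str):
    def node(state: DocumentState) -> dict:
        return {"result": f"handled by {label}"}
    return node

classify_node = make_stub("classifier")  # actual routing happens in the edge function
pdf_processor = make_stub("pdf pipeline")
image_processor = make_stub("image pipeline")
structured_processor = make_stub("structured pipeline")
text_processor = make_stub("text pipeline")
error_node = make_stub("error handler")

def finalize_node(state: DocumentState) -> dict:
    return {}  # keep the processor's result as-is

app = graph.compile()
output = app.invoke({
    "content": "quarterly report",
    "doc_type": "pdf",
    "language": None,
    "priority": "normal",
    "result": None,
})
print(output["result"])  # -> "handled by pdf pipeline"
```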
## The Send API: Parallel Execution (January 2026)
**The Problem:** Sequential processing of independent tasks is slow.

**The Solution:** LangGraph's Send API launches parallel executions.
```python
import operator
from typing import TypedDict, Annotated, List, Optional

from langgraph.constants import Send
from langgraph.graph import StateGraph, END

class ParallelState(TypedDict):
    documents: List[str]
    results: Annotated[List[str], operator.add]  # Accumulates from parallel runs
    summary: Optional[str]

def fan_out(state: ParallelState) -> List[Send]:
    """Launch parallel processing for each document.

    Used as a conditional-edge function, not a node: returning a list of
    Send objects tells LangGraph to run the target node once per Send.
    """
    sends = []
    for i, doc in enumerate(state["documents"]):
        sends.append(
            Send(
                "process_document",            # Target node
                {"doc_id": i, "content": doc}  # State for this branch
            )
        )
    return sends

def process_document(state: dict) -> dict:
    """Process a single document (runs in parallel).

    Each branch sees only the dict passed to its Send, not the full graph state.
    """
    doc_id = state["doc_id"]
    content = state["content"]
    # Simulate processing
    result = f"Processed doc {doc_id}: {len(content)} chars"
    return {"results": [result]}  # Merged into graph state via operator.add

def fan_in(state: ParallelState) -> dict:
    """Collect results from parallel processing."""
    all_results = state.get("results", [])
    summary = f"Processed {len(all_results)} documents"
    return {"summary": summary}

# Build the parallel graph. Note that fan_out is attached as a conditional
# edge rather than a node: a node must return a state update, while a
# conditional-edge function may return a list of Sends.
graph = StateGraph(ParallelState)
graph.add_node("start", lambda state: {})  # pass-through entry point
graph.add_node("process_document", process_document)
graph.add_node("fan_in", fan_in)

graph.set_entry_point("start")
graph.add_conditional_edges("start", fan_out, ["process_document"])
graph.add_edge("process_document", "fan_in")
graph.add_edge("fan_in", END)
```
**Key Concepts:**

- `Send(node_name, state)` creates one parallel execution of the target node with its own branch state
- Return a `List[Send]` from a conditional-edge function to fan out
- Use `Annotated[List, operator.add]` to merge parallel results back into the shared state
- All parallel branches automatically sync before the next node
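Compiling and invoking the graph makes the merge behavior concrete. A short sketch; the three sample documents are illustrative:

```python
app = graph.compile()

output = app.invoke({
    "documents": ["alpha", "beta beta", "gamma gamma gamma"],
    "results": [],
    "summary": None,
})

# One entry per Send, merged into `results` by operator.add
print(output["results"])  # three "Processed doc N: ..." strings
print(output["summary"])  # -> "Processed 3 documents"
```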
## Map-Reduce Pattern with Send
**Production Pattern:** process chunks in parallel, then reduce:
```python
import operator
from typing import TypedDict, Annotated, List, Optional

from langgraph.constants import Send
from langgraph.graph import StateGraph, END

class MapReduceState(TypedDict):
    # Input
    large_document: str
    chunk_size: int
    # Parallel processing
    chunk_results: Annotated[List[dict], operator.add]
    # Final output
    final_summary: Optional[str]

def split_and_map(state: MapReduceState) -> List[Send]:
    """Split the document into chunks and fan out one Send per chunk."""
    doc = state["large_document"]
    chunk_size = state.get("chunk_size", 1000)

    # Split into chunks
    chunks = [doc[i:i + chunk_size] for i in range(0, len(doc), chunk_size)]

    # Launch parallel processing
    sends = []
    for i, chunk in enumerate(chunks):
        sends.append(Send(
            "process_chunk",
            {
                "chunk_id": i,
                "chunk_content": chunk,
                "total_chunks": len(chunks),
            }
        ))
    return sends

def process_chunk(state: dict) -> dict:
    """Process a single chunk (runs in parallel)."""
    chunk_id = state["chunk_id"]
    content = state["chunk_content"]
    # Simulate LLM summarization
    summary = f"Chunk {chunk_id}: {len(content)} chars, key points extracted"
    return {
        "chunk_results": [{
            "chunk_id": chunk_id,
            "summary": summary,
            "word_count": len(content.split()),
        }]
    }

def reduce_results(state: MapReduceState) -> dict:
    """Combine all chunk results into a final summary."""
    results = state.get("chunk_results", [])

    # Sort by chunk_id to restore document order
    sorted_results = sorted(results, key=lambda x: x["chunk_id"])

    # Combine summaries
    combined = "\n".join(r["summary"] for r in sorted_results)
    total_words = sum(r["word_count"] for r in sorted_results)

    final = f"""
Document Analysis Complete
==========================
Total Chunks: {len(results)}
Total Words: {total_words}

Summary by Section:
{combined}
"""
    return {"final_summary": final}

# Build the map-reduce graph. As before, the Send-returning function is
# attached as a conditional edge from a pass-through entry node.
graph = StateGraph(MapReduceState)
graph.add_node("split", lambda state: {})  # pass-through entry point
graph.add_node("process_chunk", process_chunk)
graph.add_node("reduce_results", reduce_results)

graph.set_entry_point("split")
graph.add_conditional_edges("split", split_and_map, ["process_chunk"])
graph.add_edge("process_chunk", "reduce_results")
graph.add_edge("reduce_results", END)
```
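Running the graph on a sample input shows the whole map-reduce cycle; the repeated-string document below is just a stand-in for real content:

```python
app = graph.compile()

output = app.invoke({
    "large_document": "lorem ipsum " * 400,  # ~4,800 characters -> 5 chunks
    "chunk_size": 1000,
    "chunk_results": [],
    "final_summary": None,
})

print(output["final_summary"])
# Document Analysis Complete
# ==========================
# Total Chunks: 5
# ...
```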
## Parallel Branches with Different Processors
**Scenario:** route to multiple processors simultaneously:
```python
import operator
from typing import TypedDict, Annotated, List

from langgraph.constants import Send

# A state schema (assumed here) with an operator.add reducer
# so the parallel writes to analysis_results merge
class MultiAspectState(TypedDict):
    content: str
    analysis_results: Annotated[List[dict], operator.add]

def analyze_multi_aspect(state: MultiAspectState) -> List[Send]:
    """Analyze a document from multiple perspectives in parallel."""
    content = state["content"]
    return [
        Send("sentiment_analyzer", {"content": content, "aspect": "sentiment"}),
        Send("entity_extractor", {"content": content, "aspect": "entities"}),
        Send("topic_classifier", {"content": content, "aspect": "topics"}),
        Send("language_detector", {"content": content, "aspect": "language"}),
    ]

# Each processor runs in parallel; results merge via the reducer
def sentiment_analyzer(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "sentiment", "result": "positive"}]}

def entity_extractor(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "entities", "result": ["OpenAI", "LangGraph"]}]}

def topic_classifier(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "topics", "result": ["AI", "agents"]}]}

def language_detector(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "language", "result": "en"}]}
```
## Error Handling in Parallel Execution
**Critical:** handle failures in individual branches:
```python
import operator
import traceback
from typing import TypedDict, Annotated, List, Optional

class RobustParallelState(TypedDict):
    items: List[str]
    successes: Annotated[List[dict], operator.add]
    failures: Annotated[List[dict], operator.add]
    summary: Optional[dict]

def process_with_error_handling(state: dict) -> dict:
    """Process one item, catching failures instead of letting them propagate."""
    item_id = state["item_id"]
    content = state["content"]
    try:
        # Attempt processing
        if "error" in content.lower():
            raise ValueError(f"Simulated error in item {item_id}")
        result = f"Processed: {content}"
        return {
            "successes": [{"item_id": item_id, "result": result}]
        }
    except Exception as e:
        return {
            "failures": [{
                "item_id": item_id,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }]
        }

def summarize_parallel_run(state: RobustParallelState) -> dict:
    """Summarize results, including failures."""
    successes = state.get("successes", [])
    failures = state.get("failures", [])
    summary = {
        "total_processed": len(successes) + len(failures),
        "successful": len(successes),
        "failed": len(failures),
        "success_rate": len(successes) / max(1, len(successes) + len(failures)),
        "failure_details": failures,
    }
    return {"summary": summary}
```
## Common Interview Questions
Q1: "How does the Send API differ from simple conditional edges?"
Strong Answer:
"Conditional edges route to one branch at a time—it's one-of-N selection. The Send API enables fan-out to multiple branches simultaneously—it's parallel execution. Use conditional edges when you need to choose a path based on state. Use Send when you need to process multiple items or aspects in parallel and merge results. Send is essential for map-reduce patterns where you split work, process in parallel, and combine results."
Q2: "How do you handle failures in parallel branches?"
Answer:
"Each parallel branch should wrap its logic in try-catch and return either to a 'successes' or 'failures' list in state, both using operator.add reducers. The fan-in node then summarizes results, calculates success rates, and can trigger alerts or retries for failures. Never let one branch failure crash the entire parallel run—isolate failures and continue processing other branches."
Q3: "When would you use map-reduce vs. simple parallel processing?"
Answer:
"Use map-reduce when processing large data that needs to be chunked—like long documents, large datasets, or batch API calls. The 'map' phase splits and processes in parallel, the 'reduce' phase combines results. Use simple parallel when you have a fixed set of independent operations—like running sentiment analysis, entity extraction, and topic classification simultaneously on the same input."
## Key Takeaways for Production

✅ Use 3+ branch routing for complex classification (document types, priorities)
✅ Send API for parallel execution - essential for performance at scale
✅ Map-reduce pattern for large document/data processing
✅ Always handle errors in parallel branches individually
✅ Use operator.add reducers to merge parallel results
✅ Sort by ID after fan-in to maintain order if needed
**Next:** Learn error recovery patterns including try-catch nodes, fallback paths, and circuit breakers in Lesson 3.