Advanced Graph Patterns

Conditional Branching & Parallel Execution


Why Branching Matters in Production

Real Production Scenario (January 2026):

An enterprise document processing system needed to route documents to different processing pipelines based on type (PDF, image, structured data) and then process multiple sections in parallel. With sequential processing, a 100-page report took 45 minutes. With parallel branching using the Send API, it now takes 8 minutes.

This lesson teaches you how to implement conditional routing with three or more branches and how to use the Send API for parallel execution in production LangGraph workflows.


Conditional Edges with Multiple Branches

Basic Pattern: 2-Way Branch

from typing import Literal
from langgraph.graph import StateGraph, END

def route_simple(state: dict) -> Literal["path_a", "path_b"]:
    if state["score"] > 0.8:
        return "path_a"
    return "path_b"

graph.add_conditional_edges("router", route_simple, {
    "path_a": "node_a",
    "path_b": "node_b"
})

Production Pattern: 3+ Way Branch

from typing import TypedDict, Literal, Optional

class DocumentState(TypedDict):
    content: str
    doc_type: Optional[str]
    language: Optional[str]
    priority: str
    result: Optional[str]

def classify_document(state: DocumentState) -> Literal[
    "process_pdf",
    "process_image",
    "process_structured",
    "process_text",
    "error_handler"
]:
    """Route to appropriate processor based on document type."""
    # doc_type is Optional, so guard against an explicit None before lowercasing
    doc_type = (state.get("doc_type") or "").lower()

    # Priority override for urgent documents
    if state.get("priority") == "urgent":
        return "process_text"  # Fast path

    # Type-based routing
    routing_map = {
        "pdf": "process_pdf",
        "image": "process_image",
        "jpg": "process_image",
        "png": "process_image",
        "json": "process_structured",
        "csv": "process_structured",
        "xml": "process_structured",
    }

    route = routing_map.get(doc_type, "process_text")

    # Validate we can handle this type
    if doc_type and doc_type not in routing_map and doc_type != "text":
        return "error_handler"

    return route

# Build graph with 5-way conditional
graph = StateGraph(DocumentState)
graph.add_node("classifier", classify_node)
graph.add_node("process_pdf", pdf_processor)
graph.add_node("process_image", image_processor)
graph.add_node("process_structured", structured_processor)
graph.add_node("process_text", text_processor)
graph.add_node("error_handler", error_node)
graph.add_node("finalizer", finalize_node)

graph.add_conditional_edges(
    "classifier",
    classify_document,
    {
        "process_pdf": "process_pdf",
        "process_image": "process_image",
        "process_structured": "process_structured",
        "process_text": "process_text",
        "error_handler": "error_handler",
    }
)

# All processors lead to finalizer
for node in ["process_pdf", "process_image", "process_structured", "process_text"]:
    graph.add_edge(node, "finalizer")
graph.add_edge("error_handler", "finalizer")
graph.add_edge("finalizer", END)

graph.set_entry_point("classifier")
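
Because a routing function is plain Python, its rules can be checked without compiling a graph. The following is a minimal sketch under that assumption: `route_document`, `ROUTES`, and `CASES` are hypothetical names, and the function is a standalone copy of the routing rules above rather than the lesson's own `classify_document`.

```python
from typing import Optional

# Hypothetical standalone copy of the routing table, with "text" made explicit
ROUTES = {
    "pdf": "process_pdf",
    "image": "process_image",
    "jpg": "process_image",
    "png": "process_image",
    "json": "process_structured",
    "csv": "process_structured",
    "xml": "process_structured",
    "text": "process_text",
}

def route_document(doc_type: Optional[str], priority: str = "normal") -> str:
    """Return the target node name for a document type and priority."""
    if priority == "urgent":
        return "process_text"  # fast path, regardless of type
    key = (doc_type or "").lower()
    if not key:
        return "process_text"  # a missing type falls back to plain text
    return ROUTES.get(key, "error_handler")  # unknown types are rejected

# Table-driven checks: (doc_type, priority) -> expected target node
CASES = {
    ("PDF", "normal"): "process_pdf",
    ("png", "normal"): "process_image",
    ("csv", "normal"): "process_structured",
    (None, "normal"): "process_text",
    ("docx", "normal"): "error_handler",
    ("docx", "urgent"): "process_text",
}

for (doc_type, priority), expected in CASES.items():
    assert route_document(doc_type, priority) == expected
```

Keeping the rules in one table makes it easy to add a branch: a new document type needs one table entry and one test case.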

The Send API: Parallel Execution (January 2026)

The Problem: Sequential processing of independent tasks is slow.

The Solution: LangGraph's Send API launches parallel executions.

from langgraph.types import Send  # Send lives in langgraph.types in current releases
from typing import TypedDict, Annotated, List
import operator

class ParallelState(TypedDict):
    documents: List[str]
    results: Annotated[List[str], operator.add]  # Accumulates from parallel runs
    summary: str  # Written by fan_in

def fan_out(state: ParallelState) -> List[Send]:
    """Launch parallel processing for each document."""
    sends = []
    for i, doc in enumerate(state["documents"]):
        sends.append(
            Send(
                "process_document",  # Target node
                {"doc_id": i, "content": doc}  # State for this branch
            )
        )
    return sends

def process_document(state: dict) -> dict:
    """Process a single document (runs in parallel)."""
    doc_id = state["doc_id"]
    content = state["content"]

    # Simulate processing
    result = f"Processed doc {doc_id}: {len(content)} chars"

    return {"results": [result]}  # Will be merged via operator.add

def fan_in(state: ParallelState) -> dict:
    """Collect results from parallel processing."""
    all_results = state.get("results", [])
    summary = f"Processed {len(all_results)} documents"
    return {"summary": summary}

# Build parallel graph
from langgraph.graph import START

graph = StateGraph(ParallelState)
graph.add_node("process_document", process_document)
graph.add_node("fan_in", fan_in)

# fan_out returns List[Send], so it is attached as a conditional edge rather than
# registered as a node (a node must return a state update, not Send objects)
graph.add_conditional_edges(START, fan_out, ["process_document"])
graph.add_edge("process_document", "fan_in")
graph.add_edge("fan_in", END)

Key Concepts:

  • Send(node_name, state) creates a parallel execution
  • Return List[Send] from a conditional-edge (routing) function to fan out; a node itself must return a state update
  • Use Annotated[List, operator.add] to merge parallel results
  • All parallel branches automatically sync before the next node
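
The merge step in the third bullet can be illustrated without LangGraph. The sketch below is a plain-Python simulation of reducer semantics, not LangGraph's internal implementation: `merge_updates` is a hypothetical helper that folds each branch's partial update into the accumulated list with `operator.add`.

```python
import operator
from functools import reduce
from typing import List

def merge_updates(existing: List[str], updates: List[List[str]]) -> List[str]:
    """Fold every branch's partial update into the accumulated value."""
    return reduce(operator.add, updates, existing)

# Each parallel branch returns a one-element list for the "results" key
branch_updates = [
    ["Processed doc 0: 11 chars"],
    ["Processed doc 1: 5 chars"],
    ["Processed doc 2: 8 chars"],
]

merged = merge_updates([], branch_updates)
print(len(merged))  # 3
```

This is why each branch returns its result wrapped in a list: `operator.add` on two lists concatenates them, so no branch overwrites another's output.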

Map-Reduce Pattern with Send

Production Pattern: Process chunks in parallel, then reduce:

from langgraph.types import Send  # Send lives in langgraph.types in current releases
from typing import TypedDict, Annotated, List, Optional
import operator

class MapReduceState(TypedDict):
    # Input
    large_document: str
    chunk_size: int

    # Parallel processing
    chunk_results: Annotated[List[dict], operator.add]

    # Final output
    final_summary: Optional[str]

def split_and_map(state: MapReduceState) -> List[Send]:
    """Split document into chunks and process in parallel."""
    doc = state["large_document"]
    chunk_size = state.get("chunk_size", 1000)

    # Split into chunks
    chunks = [doc[i:i+chunk_size] for i in range(0, len(doc), chunk_size)]

    # Launch parallel processing
    sends = []
    for i, chunk in enumerate(chunks):
        sends.append(Send(
            "process_chunk",
            {
                "chunk_id": i,
                "chunk_content": chunk,
                "total_chunks": len(chunks)
            }
        ))

    return sends

def process_chunk(state: dict) -> dict:
    """Process a single chunk (runs in parallel)."""
    chunk_id = state["chunk_id"]
    content = state["chunk_content"]

    # Simulate LLM summarization
    summary = f"Chunk {chunk_id}: {len(content)} chars, key points extracted"

    return {
        "chunk_results": [{
            "chunk_id": chunk_id,
            "summary": summary,
            "word_count": len(content.split())
        }]
    }

def reduce_results(state: MapReduceState) -> dict:
    """Combine all chunk results into final summary."""
    results = state.get("chunk_results", [])

    # Sort by chunk_id to maintain order
    sorted_results = sorted(results, key=lambda x: x["chunk_id"])

    # Combine summaries
    combined = "\n".join([r["summary"] for r in sorted_results])
    total_words = sum(r["word_count"] for r in sorted_results)

    final = f"""
Document Analysis Complete
==========================
Total Chunks: {len(results)}
Total Words: {total_words}

Summary by Section:
{combined}
"""

    return {"final_summary": final}

# Build map-reduce graph
from langgraph.graph import StateGraph, START, END

graph = StateGraph(MapReduceState)
graph.add_node("process_chunk", process_chunk)
graph.add_node("reduce_results", reduce_results)

# split_and_map returns List[Send], so it is attached as a conditional edge rather
# than registered as a node (a node must return a state update, not Send objects)
graph.add_conditional_edges(START, split_and_map, ["process_chunk"])
graph.add_edge("process_chunk", "reduce_results")
graph.add_edge("reduce_results", END)
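
The split and re-ordering steps can be checked in isolation. This is a minimal sketch, assuming fixed-size character chunks as above: `split_into_chunks` is a hypothetical helper, and the shuffle only simulates branches finishing in an arbitrary order.

```python
import random
from typing import List

def split_into_chunks(doc: str, chunk_size: int) -> List[str]:
    """Split a document into consecutive fixed-size character chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [doc[i:i + chunk_size] for i in range(0, len(doc), chunk_size)]

doc = "abcdefghij" * 25  # 250 characters
chunks = split_into_chunks(doc, 100)

# One result per chunk, tagged with its position in the document
results = [
    {"chunk_id": i, "summary": f"Chunk {i}: {len(c)} chars"}
    for i, c in enumerate(chunks)
]

# Simulate results arriving out of order, then restore document order
random.Random(0).shuffle(results)
ordered = sorted(results, key=lambda r: r["chunk_id"])

print([len(c) for c in chunks])  # [100, 100, 50]
```

Carrying `chunk_id` in every result is what makes the reduce step independent of completion order. The guard on `chunk_size` matters because a step of zero makes `range` raise.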

Parallel Branches with Different Processors

Scenario: Route to multiple processors simultaneously:

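from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class AnalysisState(TypedDict):  # illustrative state schema for this example
    content: str
    analysis_results: Annotated[List[dict], operator.add]  # merged across branches

def analyze_multi_aspect(state: AnalysisState) -> List[Send]:
    """Analyze document from multiple perspectives in parallel."""
    content = state["content"]

    return [
        Send("sentiment_analyzer", {"content": content, "aspect": "sentiment"}),
        Send("entity_extractor", {"content": content, "aspect": "entities"}),
        Send("topic_classifier", {"content": content, "aspect": "topics"}),
        Send("language_detector", {"content": content, "aspect": "language"}),
    ]

# Each processor runs in parallel
def sentiment_analyzer(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "sentiment", "result": "positive"}]}

def entity_extractor(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "entities", "result": ["OpenAI", "LangGraph"]}]}

def topic_classifier(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "topics", "result": ["AI", "agents"]}]}

def language_detector(state: dict) -> dict:
    return {"analysis_results": [{"aspect": "language", "result": "en"}]}

# Wire the fan-out: the routing function is a conditional edge from START
processors = {
    "sentiment_analyzer": sentiment_analyzer,
    "entity_extractor": entity_extractor,
    "topic_classifier": topic_classifier,
    "language_detector": language_detector,
}
graph = StateGraph(AnalysisState)
for name, fn in processors.items():
    graph.add_node(name, fn)
    graph.add_edge(name, END)
graph.add_conditional_edges(START, analyze_multi_aspect, list(processors))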
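
Once the branches have run, `analysis_results` is a flat list with one entry per processor. A downstream step may find it easier to look results up by aspect. The sketch below is one possible approach: `index_by_aspect` is a hypothetical helper, and the sample list is hand-written to mirror the placeholder outputs above.

```python
from typing import Any, Dict, List

def index_by_aspect(analysis_results: List[dict]) -> Dict[str, Any]:
    """Turn the merged list into an aspect -> result mapping."""
    indexed: Dict[str, Any] = {}
    for entry in analysis_results:
        aspect = entry["aspect"]
        if aspect in indexed:
            # Two branches reporting the same aspect is treated as a wiring bug
            raise ValueError(f"duplicate result for aspect {aspect!r}")
        indexed[aspect] = entry["result"]
    return indexed

# Sample merged list; the entry order is deliberately arbitrary
merged = [
    {"aspect": "language", "result": "en"},
    {"aspect": "sentiment", "result": "positive"},
    {"aspect": "topics", "result": ["AI", "agents"]},
    {"aspect": "entities", "result": ["OpenAI", "LangGraph"]},
]

report = index_by_aspect(merged)
print(report["sentiment"])  # positive
```

Indexing by aspect means the consumer does not depend on the order in which branch results were appended.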

Error Handling in Parallel Execution

Critical: Handle failures in individual branches:

from typing import TypedDict, Annotated, List, Optional
import operator
import traceback

class RobustParallelState(TypedDict):
    items: List[str]
    successes: Annotated[List[dict], operator.add]
    failures: Annotated[List[dict], operator.add]
    summary: dict  # Written by summarize_parallel_run

def process_with_error_handling(state: dict) -> dict:
    """Process with error catching."""
    item_id = state["item_id"]
    content = state["content"]

    try:
        # Attempt processing
        if "error" in content.lower():
            raise ValueError(f"Simulated error in item {item_id}")

        result = f"Processed: {content}"
        return {
            "successes": [{"item_id": item_id, "result": result}]
        }

    except Exception as e:
        return {
            "failures": [{
                "item_id": item_id,
                "error": str(e),
                "traceback": traceback.format_exc()
            }]
        }

def summarize_parallel_run(state: RobustParallelState) -> dict:
    """Summarize results including failures."""
    successes = state.get("successes", [])
    failures = state.get("failures", [])

    summary = {
        "total_processed": len(successes) + len(failures),
        "successful": len(successes),
        "failed": len(failures),
        "success_rate": len(successes) / max(1, len(successes) + len(failures)),
        "failure_details": failures
    }

    return {"summary": summary}
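
A failures list like the one collected above can also feed a retry pass. The following is a minimal sketch, not part of the lesson's code: `build_retry_payloads`, the `attempt` counter, and `max_attempts` are all assumptions introduced for illustration.

```python
from typing import Dict, List

def build_retry_payloads(
    items: List[str],
    failures: List[dict],
    attempt: int,
    max_attempts: int = 3,
) -> List[Dict]:
    """Build one branch payload per failed item, or none once attempts run out."""
    if attempt >= max_attempts:
        return []
    # De-duplicate and order the failed ids so retries are deterministic
    failed_ids = sorted({f["item_id"] for f in failures})
    return [
        {"item_id": i, "content": items[i], "attempt": attempt + 1}
        for i in failed_ids
    ]

items = ["alpha", "error in beta", "gamma"]
failures = [{"item_id": 1, "error": "Simulated error in item 1"}]

payloads = build_retry_payloads(items, failures, attempt=1)
print(payloads)
```

In a graph, a conditional edge after the summary node could wrap each payload in a `Send` aimed back at the processing node. The attempt cap keeps a permanently failing item from looping forever.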

Common Interview Questions

Q1: "How does the Send API differ from simple conditional edges?"

Strong Answer:

"A conditional edge picks the next step from a fixed set of nodes based on state, and every node it selects receives the same shared graph state; in the common case it is one-of-N selection. The Send API enables dynamic fan-out: the routing function returns one Send per work item, each with its own input payload, so the number of parallel branches is decided at runtime. Use conditional edges when you need to choose a path based on state. Use Send when you need to process multiple items or aspects in parallel and merge results. Send is essential for map-reduce patterns where you split work, process in parallel, and combine results."

Q2: "How do you handle failures in parallel branches?"

Answer:

"Each parallel branch should wrap its logic in try/except and write its outcome to either a 'successes' or a 'failures' list in state, both using operator.add reducers. The fan-in node then summarizes results, calculates success rates, and can trigger alerts or retries for failures. Never let one branch failure crash the entire parallel run; isolate failures and continue processing other branches."

Q3: "When would you use map-reduce vs. simple parallel processing?"

Answer:

"Use map-reduce when processing large data that needs to be chunked—like long documents, large datasets, or batch API calls. The 'map' phase splits and processes in parallel, the 'reduce' phase combines results. Use simple parallel when you have a fixed set of independent operations—like running sentiment analysis, entity extraction, and topic classification simultaneously on the same input."


Key Takeaways for Production

✅ Use 3+ branch routing for complex classification (document types, priorities)
✅ Send API for parallel execution - essential for performance at scale
✅ Map-reduce pattern for large document/data processing
✅ Always handle errors in parallel branches individually
✅ Use operator.add reducers to merge parallel results
✅ Sort by ID after fan-in to maintain order if needed

Next: Learn error recovery patterns including try-catch nodes, fallback paths, and circuit breakers in Lesson 3.

