Testing & Capstone Project

Debugging & Visualization

5 min read

Debugging LangGraph workflows requires specialized techniques beyond traditional debugging. This lesson covers comprehensive debugging strategies, visualization tools, LangSmith integration for production tracing, and advanced patterns for identifying and resolving complex issues in stateful agent systems.


The Challenge of Debugging Agent Workflows

Debugging LangGraph applications presents unique challenges that traditional debugging tools cannot fully address.

Why Agent Debugging is Different

"""
Traditional Debugging vs Agent Workflow Debugging

Traditional Application:
- Linear execution flow
- Predictable function calls
- Deterministic outputs
- Easy to reproduce bugs

Agent Workflow:
- Non-linear, conditional execution
- LLM outputs vary between runs
- State accumulates across nodes
- Race conditions in async execution
- External API dependencies
- Human-in-the-loop interrupts

Key Debugging Challenges:
1. Non-determinism: Same input produces different outputs
2. State complexity: Reducers combine updates in subtle ways
3. Routing decisions: Conditional edges may take unexpected paths
4. Checkpoint history: Need to understand state evolution
5. Latency attribution: Which node is slow?
6. Error propagation: Errors may surface far from origin
"""

from typing import TypedDict, Annotated, Literal, Optional, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
from datetime import datetime
import json
import logging
import traceback

# Configure comprehensive logging for debugging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s | %(levelname)s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("langgraph.debug")


class DebugState(TypedDict):
    """State with debugging information."""
    query: str
    documents: Annotated[list[dict], operator.add]
    analysis: Optional[str]
    next_step: Literal["research", "analyze", "summarize", "done"]
    iteration: int
    max_iterations: int
    # Debug fields
    _debug_trace: Annotated[list[dict], operator.add]
    _node_timings: Annotated[list[dict], operator.add]

Graph Visualization Techniques

Visualizing your graph structure helps identify routing issues, missing edges, and architectural problems before they cause runtime errors.

Built-in Mermaid Diagram Generation

from langgraph.graph import StateGraph
from IPython.display import Image, display
import base64

def create_sample_graph():
    """Create a sample graph for visualization examples."""

    class WorkflowState(TypedDict):
        query: str
        documents: list[dict]
        analysis: Optional[str]
        summary: Optional[str]
        next_step: str

    def supervisor(state: WorkflowState) -> dict:
        if not state.get("documents"):
            return {"next_step": "research"}
        elif not state.get("analysis"):
            return {"next_step": "analyze"}
        elif not state.get("summary"):
            return {"next_step": "summarize"}
        return {"next_step": "done"}

    def researcher(state: WorkflowState) -> dict:
        return {"documents": [{"source": "web", "content": "Research findings"}]}

    def analyzer(state: WorkflowState) -> dict:
        return {"analysis": "Detailed analysis of documents"}

    def summarizer(state: WorkflowState) -> dict:
        return {"summary": "Executive summary"}

    def route_supervisor(state: WorkflowState) -> str:
        return state["next_step"]

    # Build graph
    graph = StateGraph(WorkflowState)

    graph.add_node("supervisor", supervisor)
    graph.add_node("researcher", researcher)
    graph.add_node("analyzer", analyzer)
    graph.add_node("summarizer", summarizer)

    graph.add_conditional_edges(
        "supervisor",
        route_supervisor,
        {
            "research": "researcher",
            "analyze": "analyzer",
            "summarize": "summarizer",
            "done": END
        }
    )

    graph.add_edge("researcher", "supervisor")
    graph.add_edge("analyzer", "supervisor")
    graph.add_edge("summarizer", "supervisor")

    graph.set_entry_point("supervisor")

    return graph


def visualize_graph_mermaid(graph: StateGraph, title: str = "Workflow"):
    """
    Generate Mermaid diagram from graph.

    The Mermaid syntax is widely supported in:
    - GitHub README files
    - Notion documents
    - Jupyter notebooks
    - Documentation sites
    """
    app = graph.compile()

    # Get Mermaid diagram as string
    mermaid_code = app.get_graph().draw_mermaid()

    print(f"=== {title} Mermaid Diagram ===")
    print(mermaid_code)
    print()

    # The output looks like:
    # ```mermaid
    # graph TD
    #     __start__ --> supervisor
    #     supervisor -->|research| researcher
    #     supervisor -->|analyze| analyzer
    #     supervisor -->|summarize| summarizer
    #     supervisor -->|done| __end__
    #     researcher --> supervisor
    #     analyzer --> supervisor
    #     summarizer --> supervisor
    # ```

    return mermaid_code


def visualize_graph_png(graph: StateGraph, output_path: str = "graph.png"):
    """
    Export graph as PNG image.

    Requires graphviz to be installed:
    - macOS: brew install graphviz
    - Ubuntu: apt-get install graphviz
    - Windows: choco install graphviz

    Also requires pygraphviz or grandalf Python packages.
    """
    app = graph.compile()

    try:
        # Get PNG bytes
        png_bytes = app.get_graph().draw_mermaid_png()

        # Save to file
        with open(output_path, "wb") as f:
            f.write(png_bytes)

        print(f"Graph saved to {output_path}")

        # In Jupyter notebook, display inline
        # display(Image(png_bytes))

        return png_bytes

    except Exception as e:
        print(f"PNG generation failed: {e}")
        print("Falling back to Mermaid text output")
        return visualize_graph_mermaid(graph)


def visualize_graph_ascii(graph: StateGraph):
    """
    Generate ASCII representation of graph.
    Useful for terminal/CLI environments.
    """
    app = graph.compile()
    graph_repr = app.get_graph()

    print("=== Graph Structure (ASCII) ===")
    print()

    # Get nodes
    nodes = list(graph_repr.nodes.keys())
    print(f"Nodes ({len(nodes)}):")
    for node in nodes:
        print(f"  - {node}")
    print()

    # Get edges
    edges = graph_repr.edges
    print(f"Edges ({len(edges)}):")
    for edge in edges:
        if hasattr(edge, 'source') and hasattr(edge, 'target'):
            print(f"  {edge.source} --> {edge.target}")
        else:
            print(f"  {edge}")
    print()

    return {"nodes": nodes, "edges": edges}


# Usage example
graph = create_sample_graph()
visualize_graph_mermaid(graph, "Research Workflow")
visualize_graph_ascii(graph)

Visualizing Subgraphs and Complex Hierarchies

def visualize_hierarchical_graph(main_graph, subgraphs: dict):
    """
    Visualize hierarchical graph with subgraphs.

    Args:
        main_graph: The parent graph
        subgraphs: Dictionary of {name: subgraph}
    """
    print("=== Hierarchical Graph Structure ===")
    print()

    # Main graph
    print("MAIN GRAPH:")
    main_app = main_graph.compile()
    print(main_app.get_graph().draw_mermaid())
    print()

    # Each subgraph
    for name, subgraph in subgraphs.items():
        print(f"SUBGRAPH: {name}")
        sub_app = subgraph.compile()
        print(sub_app.get_graph().draw_mermaid())
        print()


def create_graph_documentation(graph: StateGraph, output_file: str):
    """
    Generate markdown documentation for a graph.
    Useful for project documentation.
    """
    app = graph.compile()
    graph_repr = app.get_graph()

    doc = []
    doc.append("# Workflow Graph Documentation")
    doc.append("")
    doc.append("## Visual Diagram")
    doc.append("")
    doc.append("```mermaid")
    doc.append(app.get_graph().draw_mermaid().replace("```mermaid\n", "").replace("\n```", ""))
    doc.append("```")
    doc.append("")
    doc.append("## Nodes")
    doc.append("")
    doc.append("| Node | Description |")
    doc.append("|------|-------------|")

    for node in graph_repr.nodes.keys():
        doc.append(f"| `{node}` | TODO: Add description |")

    doc.append("")
    doc.append("## Edges")
    doc.append("")
    doc.append("| From | To | Condition |")
    doc.append("|------|-------|-----------|")

    for edge in graph_repr.edges:
        if hasattr(edge, 'source') and hasattr(edge, 'target'):
            condition = getattr(edge, 'data', '-')
            doc.append(f"| `{edge.source}` | `{edge.target}` | {condition} |")

    doc.append("")
    doc.append(f"*Generated: {datetime.now().isoformat()}*")

    content = "\n".join(doc)

    with open(output_file, "w") as f:
        f.write(content)

    print(f"Documentation saved to {output_file}")
    return content

Step-by-Step Execution Debugging

Understanding exactly what happens at each step of execution is crucial for debugging complex workflows.

Streaming with Updates Mode

def debug_execution_stream(app, input_state: dict, config: dict):
    """
    Stream execution with detailed debugging output.

    The 'updates' stream mode shows exactly what each node
    returns as state updates.
    """
    print("=" * 60)
    print("DEBUG EXECUTION - Updates Mode")
    print("=" * 60)
    print()
    print(f"Input State: {json.dumps(input_state, indent=2, default=str)}")
    print()
    print("-" * 60)

    step = 0
    node_timings = []

    for event in app.stream(input_state, config, stream_mode="updates"):
        step += 1
        timestamp = datetime.now()

        # Extract node name and updates
        node_name = list(event.keys())[0]
        state_update = event[node_name]

        # Record timing
        timing = {
            "step": step,
            "node": node_name,
            "timestamp": timestamp.isoformat()
        }
        node_timings.append(timing)

        # Print debug info
        print(f"STEP {step}: {node_name}")
        print(f"  Timestamp: {timestamp.strftime('%H:%M:%S.%f')}")
        print(f"  Update Keys: {list(state_update.keys())}")

        # Pretty print the update
        for key, value in state_update.items():
            if isinstance(value, str) and len(value) > 100:
                print(f"  {key}: {value[:100]}... (truncated)")
            elif isinstance(value, list) and len(value) > 3:
                print(f"  {key}: [{len(value)} items]")
            else:
                print(f"  {key}: {value}")

        print("-" * 60)

    # Print timing summary
    print()
    print("EXECUTION SUMMARY:")
    print(f"  Total Steps: {step}")

    if len(node_timings) >= 2:
        start = datetime.fromisoformat(node_timings[0]["timestamp"])
        end = datetime.fromisoformat(node_timings[-1]["timestamp"])
        duration = (end - start).total_seconds()
        print(f"  Total Duration: {duration:.3f}s")

    print()

    return node_timings


def debug_execution_values(app, input_state: dict, config: dict):
    """
    Stream execution with full state after each node.

    The 'values' stream mode shows the complete accumulated
    state after each node executes.
    """
    print("=" * 60)
    print("DEBUG EXECUTION - Values Mode")
    print("=" * 60)
    print()

    step = 0

    for state in app.stream(input_state, config, stream_mode="values"):
        step += 1

        print(f"STATE AFTER STEP {step}:")

        for key, value in state.items():
            if key.startswith("_"):
                continue  # Skip internal fields

            if isinstance(value, str) and len(value) > 200:
                print(f"  {key}: {value[:200]}...")
            elif isinstance(value, list):
                print(f"  {key}: [{len(value)} items]")
            elif isinstance(value, dict):
                print(f"  {key}: {{...}}")
            else:
                print(f"  {key}: {value}")

        print("-" * 60)

    return step


def compare_stream_modes(app, input_state: dict, config: dict):
    """
    Demonstrate difference between stream modes.

    Stream Modes in LangGraph 1.0.5:
    - "values": Full state after each node
    - "updates": Only the updates from each node
    - "messages": For chat-like applications
    - "events": Low-level events for advanced debugging
    """

    print("MODE: updates")
    print("-" * 40)
    updates_result = []
    for event in app.stream(input_state, config, stream_mode="updates"):
        updates_result.append(event)
        print(event)
    print()

    # Reset state for clean comparison
    new_config = {"configurable": {"thread_id": f"compare-{datetime.now().timestamp()}"}}

    print("MODE: values")
    print("-" * 40)
    values_result = []
    for state in app.stream(input_state, new_config, stream_mode="values"):
        values_result.append(state)
        print(f"Keys: {list(state.keys())}")
    print()

    return {
        "updates_count": len(updates_result),
        "values_count": len(values_result)
    }

Interactive Debugging with Breakpoints

class DebugBreakpoint:
    """
    Debugging helper that pauses execution at specified nodes.
    Useful for interactive debugging sessions.
    """

    def __init__(self, break_at_nodes: list[str] = None, break_on_condition=None):
        self.break_at_nodes = break_at_nodes or []
        self.break_on_condition = break_on_condition
        self.execution_log = []
        self.paused = False

    def wrap_node(self, node_func, node_name: str):
        """Wrap a node function with breakpoint logic."""

        def wrapped(state: dict) -> dict:
            # Log entry
            entry = {
                "node": node_name,
                "timestamp": datetime.now().isoformat(),
                "input_state": {k: v for k, v in state.items() if not k.startswith("_")}
            }

            # Check if should break
            should_break = node_name in self.break_at_nodes

            if self.break_on_condition and self.break_on_condition(state, node_name):
                should_break = True

            if should_break:
                print()
                print("=" * 50)
                print(f"BREAKPOINT: {node_name}")
                print("=" * 50)
                print(f"Current State:")
                for k, v in state.items():
                    if not k.startswith("_"):
                        print(f"  {k}: {v}")
                print()
                print("Commands: [c]ontinue, [s]tate, [q]uit")

                while True:
                    cmd = input("> ").strip().lower()
                    if cmd == 'c':
                        break
                    elif cmd == 's':
                        print(json.dumps(state, indent=2, default=str))
                    elif cmd == 'q':
                        raise KeyboardInterrupt("Debug quit requested")
                    else:
                        print("Unknown command")

            # Execute node
            result = node_func(state)

            # Log exit
            entry["output"] = result
            self.execution_log.append(entry)

            return result

        return wrapped

    def get_log(self) -> list[dict]:
        """Get execution log."""
        return self.execution_log.copy()

    def print_log(self):
        """Print formatted execution log."""
        print("=== Execution Log ===")
        for i, entry in enumerate(self.execution_log):
            print(f"\n{i+1}. {entry['node']} @ {entry['timestamp']}")
            print(f"   Output keys: {list(entry['output'].keys())}")


def create_debuggable_graph(graph: StateGraph, breakpoints: list[str] = None):
    """
    Create a version of the graph with debugging capabilities.

    Args:
        graph: Original StateGraph
        breakpoints: List of node names to break at
    """
    debugger = DebugBreakpoint(break_at_nodes=breakpoints or [])

    # Note: This is a conceptual example
    # In practice, you would rebuild the graph with wrapped nodes

    print("Debuggable graph created with breakpoints at:", breakpoints)
    return debugger

State History Inspection

Understanding how state evolves through execution is essential for debugging state-related issues.

Comprehensive State History Analysis

def inspect_full_state_history(app, config: dict):
    """
    Inspect complete state history from checkpoints.

    This shows every saved state, allowing you to:
    - Trace how state evolved
    - Find where state became incorrect
    - Identify which node caused an issue
    """
    print("=" * 60)
    print("STATE HISTORY INSPECTION")
    print("=" * 60)
    print()

    history = list(app.get_state_history(config))

    print(f"Total Checkpoints: {len(history)}")
    print()

    for i, state_snapshot in enumerate(reversed(history)):
        checkpoint_id = state_snapshot.config.get("configurable", {}).get("checkpoint_id", "unknown")

        print(f"Checkpoint {i + 1}: {checkpoint_id[:20]}...")
        print(f"  Source Node: {state_snapshot.metadata.get('source', 'unknown')}")
        print(f"  Step: {state_snapshot.metadata.get('step', 'unknown')}")
        print(f"  Created: {state_snapshot.metadata.get('created_at', 'unknown')}")

        # Show state values
        print("  State Values:")
        for key, value in state_snapshot.values.items():
            if key.startswith("_"):
                continue

            value_preview = str(value)
            if len(value_preview) > 80:
                value_preview = value_preview[:80] + "..."

            print(f"    {key}: {value_preview}")

        # Show next nodes
        if hasattr(state_snapshot, 'next') and state_snapshot.next:
            print(f"  Next Nodes: {state_snapshot.next}")

        print("-" * 40)

    return history


def compare_checkpoints(app, config: dict, checkpoint_a: int, checkpoint_b: int):
    """
    Compare two checkpoints to see what changed.

    Useful for finding exactly which node changed which fields.
    """
    history = list(app.get_state_history(config))
    history.reverse()  # Chronological order

    if checkpoint_a >= len(history) or checkpoint_b >= len(history):
        print("Invalid checkpoint indices")
        return None

    state_a = history[checkpoint_a].values
    state_b = history[checkpoint_b].values

    print(f"Comparing Checkpoint {checkpoint_a} vs Checkpoint {checkpoint_b}")
    print("=" * 60)

    # Find differences
    all_keys = set(state_a.keys()) | set(state_b.keys())

    changes = {
        "added": [],
        "removed": [],
        "modified": []
    }

    for key in all_keys:
        if key.startswith("_"):
            continue

        in_a = key in state_a
        in_b = key in state_b

        if in_a and not in_b:
            changes["removed"].append(key)
        elif in_b and not in_a:
            changes["added"].append(key)
        elif state_a.get(key) != state_b.get(key):
            changes["modified"].append({
                "key": key,
                "before": state_a.get(key),
                "after": state_b.get(key)
            })

    # Print changes
    if changes["added"]:
        print("\nADDED:")
        for key in changes["added"]:
            print(f"  + {key}: {state_b[key]}")

    if changes["removed"]:
        print("\nREMOVED:")
        for key in changes["removed"]:
            print(f"  - {key}: {state_a[key]}")

    if changes["modified"]:
        print("\nMODIFIED:")
        for change in changes["modified"]:
            print(f"  {change['key']}:")
            print(f"    Before: {change['before']}")
            print(f"    After:  {change['after']}")

    if not any(changes.values()):
        print("\nNo differences found")

    return changes


def find_state_issue(app, config: dict, predicate) -> Optional[dict]:
    """
    Find the first checkpoint where a condition becomes true.

    Example usage:
        find_state_issue(app, config, lambda s: s.get("error") is not None)
        find_state_issue(app, config, lambda s: len(s.get("documents", [])) > 10)
    """
    history = list(app.get_state_history(config))
    history.reverse()  # Chronological order

    for i, state_snapshot in enumerate(history):
        if predicate(state_snapshot.values):
            print(f"Condition matched at checkpoint {i}")
            print(f"  Node: {state_snapshot.metadata.get('source')}")
            print(f"  Step: {state_snapshot.metadata.get('step')}")
            return {
                "checkpoint_index": i,
                "metadata": state_snapshot.metadata,
                "values": state_snapshot.values
            }

    print("Condition never matched in state history")
    return None

Time Travel Debugging

def replay_from_checkpoint(app, config: dict, checkpoint_index: int, new_input: dict = None):
    """
    Replay execution from a specific checkpoint.

    This is powerful for:
    - Testing fixes without re-running entire workflow
    - Exploring alternative execution paths
    - Debugging intermittent issues
    """
    history = list(app.get_state_history(config))
    history.reverse()

    if checkpoint_index >= len(history):
        raise ValueError(f"Checkpoint {checkpoint_index} not found")

    target_checkpoint = history[checkpoint_index]
    checkpoint_config = target_checkpoint.config

    print(f"Replaying from checkpoint {checkpoint_index}")
    print(f"  Node: {target_checkpoint.metadata.get('source')}")
    print(f"  State: {list(target_checkpoint.values.keys())}")
    print()

    # Update state if new_input provided
    if new_input:
        print(f"Applying modifications: {new_input}")
        app.update_state(checkpoint_config, new_input)

    # Resume execution
    result = app.invoke(None, checkpoint_config)

    print("Replay complete")
    return result


def fork_execution(app, original_config: dict, checkpoint_index: int, modifications: dict):
    """
    Fork execution from a checkpoint with modifications.

    Creates a new thread that diverges from the original,
    allowing you to test "what if" scenarios.
    """
    history = list(app.get_state_history(original_config))
    history.reverse()

    if checkpoint_index >= len(history):
        raise ValueError(f"Checkpoint {checkpoint_index} not found")

    target_state = history[checkpoint_index]

    # Create new thread ID for the fork
    fork_thread_id = f"fork-{datetime.now().timestamp()}"
    fork_config = {"configurable": {"thread_id": fork_thread_id}}

    # Initialize forked state
    forked_state = {**target_state.values, **modifications}

    print(f"Forking to thread: {fork_thread_id}")
    print(f"Modifications: {modifications}")

    # Run forked execution
    result = app.invoke(forked_state, fork_config)

    return {
        "fork_thread_id": fork_thread_id,
        "result": result
    }

Infinite Loop Detection and Prevention

Infinite loops are a common issue in agent workflows. Here are comprehensive strategies for detection and prevention.

Loop Detection Patterns

import hashlib

def create_state_hash(state: dict) -> str:
    """
    Create a deterministic hash of state for loop detection.

    Excludes:
    - Internal fields (starting with _)
    - Timestamps and counters that naturally change
    """
    hashable_state = {}

    for key, value in state.items():
        if key.startswith("_"):
            continue
        if key in ["iteration", "timestamp", "step"]:
            continue

        # Convert to hashable representation
        if isinstance(value, list):
            hashable_state[key] = tuple(
                json.dumps(v, sort_keys=True, default=str) if isinstance(v, dict) else v
                for v in value
            )
        elif isinstance(value, dict):
            hashable_state[key] = json.dumps(value, sort_keys=True, default=str)
        else:
            hashable_state[key] = value

    state_str = json.dumps(hashable_state, sort_keys=True, default=str)
    return hashlib.sha256(state_str.encode()).hexdigest()[:16]


class LoopDetector:
    """
    Detect infinite loops in graph execution.

    Strategies:
    1. State hashing: Same state seen twice indicates loop
    2. Node sequence: Same sequence of nodes repeating
    3. Iteration counting: Max iterations exceeded
    """

    def __init__(self, max_iterations: int = 50, max_same_state: int = 2):
        self.max_iterations = max_iterations
        self.max_same_state = max_same_state
        self.state_hashes = []
        self.node_sequence = []
        self.iteration = 0

    def check(self, state: dict, node_name: str) -> tuple[bool, str]:
        """
        Check for loop condition.

        Returns: (is_loop, reason)
        """
        self.iteration += 1
        self.node_sequence.append(node_name)

        # Check iteration limit
        if self.iteration > self.max_iterations:
            return True, f"Max iterations ({self.max_iterations}) exceeded"

        # Check state hash
        state_hash = create_state_hash(state)
        hash_count = self.state_hashes.count(state_hash)

        if hash_count >= self.max_same_state:
            return True, f"Same state seen {hash_count + 1} times"

        self.state_hashes.append(state_hash)

        # Check for repeating node sequence
        if len(self.node_sequence) >= 6:
            last_3 = tuple(self.node_sequence[-3:])
            prev_3 = tuple(self.node_sequence[-6:-3])
            if last_3 == prev_3:
                return True, f"Repeating node sequence: {last_3}"

        return False, "OK"

    def get_report(self) -> dict:
        """Get debugging report."""
        return {
            "iterations": self.iteration,
            "unique_states": len(set(self.state_hashes)),
            "node_sequence": self.node_sequence,
            "state_hash_history": self.state_hashes[-10:]  # Last 10
        }


def create_loop_safe_node(node_func, loop_detector: LoopDetector, node_name: str):
    """
    Wrap a node with loop detection.
    """
    def wrapped(state: dict) -> dict:
        is_loop, reason = loop_detector.check(state, node_name)

        if is_loop:
            logger.error(f"Loop detected at {node_name}: {reason}")
            logger.error(f"Report: {loop_detector.get_report()}")

            # Return state that routes to END
            return {
                "_loop_detected": True,
                "_loop_reason": reason,
                "next_step": "done"  # Adjust based on your routing field
            }

        return node_func(state)

    return wrapped


def add_iteration_guard(graph: StateGraph, max_iterations: int = 50):
    """
    Add iteration guard to a graph.

    This modifies routing to check iteration count
    and route to END if exceeded.
    """

    def iteration_check(state: dict) -> str:
        """Guard node that checks iteration count."""
        iteration = state.get("iteration", 0)

        if iteration >= max_iterations:
            logger.warning(f"Iteration limit reached: {iteration}")
            return "done"

        return state.get("next_step", "done")

    # Add as the first routing check
    graph.add_node("_iteration_guard", lambda s: {"iteration": s.get("iteration", 0) + 1})

    print(f"Iteration guard added with max={max_iterations}")

    return graph

Visualizing Loop Issues

def visualize_execution_path(app, config: dict):
    """
    Visualize the actual execution path taken.

    Helps identify:
    - Loops (same nodes appearing repeatedly)
    - Dead ends (paths that don't reach END)
    - Unexpected routes
    """
    history = list(app.get_state_history(config))
    history.reverse()

    path = []
    for state in history:
        node = state.metadata.get("source", "unknown")
        path.append(node)

    print("=== Execution Path ===")
    print()

    # Print as flow diagram
    for i, node in enumerate(path):
        indent = "  " if i > 0 else ""
        connector = "└─> " if i > 0 else ""
        print(f"{indent}{connector}{node}")

    print()

    # Detect repeated sequences
    node_counts = {}
    for node in path:
        node_counts[node] = node_counts.get(node, 0) + 1

    repeated = {n: c for n, c in node_counts.items() if c > 1}
    if repeated:
        print("Repeated Nodes:")
        for node, count in repeated.items():
            print(f"  {node}: {count} times")

    return path


def create_execution_timeline(app, config: dict):
    """
    Create a timeline visualization of execution.
    """
    history = list(app.get_state_history(config))
    history.reverse()

    print("=== Execution Timeline ===")
    print()

    for i, state in enumerate(history):
        node = state.metadata.get("source", "unknown")
        step = state.metadata.get("step", i)

        # Create ASCII timeline
        marker = "●" if node != "__start__" else "○"
        line = "│" if i < len(history) - 1 else " "

        print(f"  {marker} Step {step}: {node}")
        print(f"  {line}")

    print()

LangSmith Integration for Production Debugging

LangSmith provides comprehensive tracing and debugging for production LangGraph applications.

Setting Up LangSmith Tracing

import os
from langsmith import traceable
from langsmith.run_trees import RunTree

# Configure LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "langgraph-debugging"

# Optional: Enable detailed logging
os.environ["LANGCHAIN_VERBOSE"] = "true"


@traceable(name="research_node", run_type="chain")
def traced_research_node(state: dict) -> dict:
    """
    Node with LangSmith tracing.

    The @traceable decorator automatically:
    - Records input/output
    - Tracks execution time
    - Captures errors
    - Links to parent traces
    """
    # Node logic here
    return {"documents": [{"source": "traced", "content": "Research results"}]}


@traceable(name="analysis_node", run_type="chain", tags=["critical"])
def traced_analysis_node(state: dict) -> dict:
    """
    Node with custom tags for filtering.

    Tags help you filter traces in LangSmith UI:
    - "critical": High-priority nodes
    - "llm-call": Nodes that call LLMs
    - "external-api": Nodes with external dependencies
    """
    return {"analysis": "Traced analysis output"}


class LangSmithDebugger:
    """
    Helper class for LangSmith debugging operations.
    """

    def __init__(self, project_name: str = "langgraph-debug"):
        self.project_name = project_name
        os.environ["LANGCHAIN_PROJECT"] = project_name

    @traceable(name="debug_session", run_type="chain")
    def run_with_tracing(self, app, input_state: dict, config: dict):
        """
        Run graph with comprehensive tracing.
        """
        result = app.invoke(input_state, config)
        return result

    @traceable(name="debug_stream", run_type="chain")
    def stream_with_tracing(self, app, input_state: dict, config: dict):
        """
        Stream graph with tracing for each step.
        """
        results = []
        for event in app.stream(input_state, config, stream_mode="updates"):
            results.append(event)
        return results


# Example: Tracing with custom metadata
@traceable(
    name="supervised_task",
    run_type="chain",
    metadata={"version": "1.0", "environment": "development"}
)
def traced_supervisor(state: dict) -> dict:
    """
    Supervisor with rich metadata.

    Metadata appears in LangSmith UI and helps:
    - Filter by version
    - Compare environments
    - Track deployments
    """
    if not state.get("documents"):
        return {"next_step": "research"}
    return {"next_step": "done"}

Production Debugging with LangSmith

from langsmith import Client
from datetime import datetime, timedelta

class ProductionDebugger:
    """
    Debug production LangGraph applications using LangSmith.
    """

    def __init__(self, api_key: str = None):
        self.client = Client(api_key=api_key)

    def find_errors(self, project_name: str, hours: int = 24):
        """
        Find all error traces in the last N hours.
        """
        start_time = datetime.now() - timedelta(hours=hours)

        runs = self.client.list_runs(
            project_name=project_name,
            error=True,
            start_time=start_time
        )

        errors = []
        for run in runs:
            errors.append({
                "run_id": str(run.id),
                "name": run.name,
                "error": run.error,
                "start_time": run.start_time,
                "inputs": run.inputs,
                "trace_id": str(run.trace_id)
            })

        print(f"Found {len(errors)} errors in last {hours} hours")
        return errors

    def analyze_slow_runs(self, project_name: str, threshold_seconds: float = 30.0):
        """
        Find runs that exceeded latency threshold.
        """
        runs = self.client.list_runs(
            project_name=project_name,
            start_time=datetime.now() - timedelta(hours=24)
        )

        slow_runs = []
        for run in runs:
            if run.end_time and run.start_time:
                duration = (run.end_time - run.start_time).total_seconds()
                if duration > threshold_seconds:
                    slow_runs.append({
                        "run_id": str(run.id),
                        "name": run.name,
                        "duration_seconds": duration,
                        "trace_id": str(run.trace_id)
                    })

        # Sort by duration
        slow_runs.sort(key=lambda x: x["duration_seconds"], reverse=True)

        print(f"Found {len(slow_runs)} slow runs (>{threshold_seconds}s)")
        return slow_runs

    def get_run_details(self, run_id: str):
        """
        Get detailed information about a specific run.
        """
        run = self.client.read_run(run_id)

        return {
            "id": str(run.id),
            "name": run.name,
            "status": run.status,
            "error": run.error,
            "inputs": run.inputs,
            "outputs": run.outputs,
            "start_time": run.start_time,
            "end_time": run.end_time,
            "latency_ms": run.latency_ms,
            "token_usage": run.total_tokens,
            "feedback": list(self.client.list_feedback(run_ids=[run_id]))
        }

    def compare_runs(self, run_id_a: str, run_id_b: str):
        """
        Compare two runs to find differences.
        """
        run_a = self.get_run_details(run_id_a)
        run_b = self.get_run_details(run_id_b)

        comparison = {
            "latency_diff_ms": (run_a.get("latency_ms", 0) or 0) - (run_b.get("latency_ms", 0) or 0),
            "token_diff": (run_a.get("token_usage", 0) or 0) - (run_b.get("token_usage", 0) or 0),
            "status_a": run_a.get("status"),
            "status_b": run_b.get("status"),
            "error_a": run_a.get("error"),
            "error_b": run_b.get("error")
        }

        return comparison


# Example: Creating custom annotations for debugging
@traceable(name="debug_checkpoint")
def add_debug_checkpoint(state: dict, checkpoint_name: str) -> dict:
    """
    Add a debug checkpoint that appears in LangSmith.

    Use this to mark important points in execution.
    """
    print(f"DEBUG CHECKPOINT: {checkpoint_name}")
    print(f"  State keys: {list(state.keys())}")

    # This creates a trace entry in LangSmith
    return state

Advanced Debugging Patterns

Node Execution Tracing

from functools import wraps
from typing import Callable
import time

def trace_node(name: str = None, log_state: bool = True):
    """
    Decorator for comprehensive node tracing.

    Usage:
        @trace_node("research")
        def research_node(state: dict) -> dict:
            ...
    """
    def decorator(func: Callable):
        node_name = name or func.__name__

        @wraps(func)
        def wrapper(state: dict) -> dict:
            start_time = time.time()

            # Log entry
            logger.debug(f"[{node_name}] ENTER")
            if log_state:
                logger.debug(f"[{node_name}] Input: {_truncate_state(state)}")

            try:
                result = func(state)

                # Log success
                elapsed = time.time() - start_time
                logger.debug(f"[{node_name}] EXIT ({elapsed:.3f}s)")
                if log_state:
                    logger.debug(f"[{node_name}] Output: {_truncate_state(result)}")

                return result

            except Exception as e:
                # Log error
                elapsed = time.time() - start_time
                logger.error(f"[{node_name}] ERROR ({elapsed:.3f}s): {e}")
                logger.error(f"[{node_name}] Traceback:\n{traceback.format_exc()}")
                raise

        return wrapper
    return decorator


def _truncate_state(state: dict, max_len: int = 200) -> str:
    """Truncate state for logging."""
    result = {}
    for key, value in state.items():
        if key.startswith("_"):
            continue
        str_value = str(value)
        if len(str_value) > max_len:
            result[key] = str_value[:max_len] + "..."
        else:
            result[key] = value
    return str(result)


# Example usage
@trace_node("supervisor")
def traced_supervisor_node(state: dict) -> dict:
    if not state.get("documents"):
        return {"next_step": "research"}
    return {"next_step": "done"}


@trace_node("researcher", log_state=True)
def traced_researcher_node(state: dict) -> dict:
    # Simulated work
    time.sleep(0.1)
    return {"documents": [{"content": "Research findings"}]}

Assertion-Based Debugging

class StateAssertion:
    """
    Add runtime assertions to validate state.

    Catches issues early before they propagate.
    """

    def __init__(self):
        self.failures = []

    def assert_field(self, state: dict, field: str, predicate: Callable, message: str = None):
        """Assert a condition on a state field."""
        value = state.get(field)

        if not predicate(value):
            failure = {
                "field": field,
                "value": value,
                "message": message or f"Assertion failed for {field}"
            }
            self.failures.append(failure)
            logger.error(f"ASSERTION FAILED: {failure}")
            return False
        return True

    def assert_not_none(self, state: dict, field: str):
        """Assert field is not None."""
        return self.assert_field(
            state, field,
            lambda v: v is not None,
            f"{field} must not be None"
        )

    def assert_non_empty_list(self, state: dict, field: str):
        """Assert field is a non-empty list."""
        return self.assert_field(
            state, field,
            lambda v: isinstance(v, list) and len(v) > 0,
            f"{field} must be a non-empty list"
        )

    def assert_type(self, state: dict, field: str, expected_type: type):
        """Assert field is of expected type."""
        return self.assert_field(
            state, field,
            lambda v: isinstance(v, expected_type),
            f"{field} must be of type {expected_type.__name__}"
        )

    def get_failures(self) -> list[dict]:
        """Get all assertion failures."""
        return self.failures.copy()

    def clear(self):
        """Clear failures."""
        self.failures = []


def create_validated_node(node_func, input_assertions: list = None, output_assertions: list = None):
    """
    Wrap a node with input/output validation.

    Args:
        node_func: Original node function
        input_assertions: List of (field, predicate, message) tuples
        output_assertions: List of (field, predicate, message) tuples
    """
    validator = StateAssertion()

    def wrapped(state: dict) -> dict:
        # Validate input
        if input_assertions:
            for field, predicate, message in input_assertions:
                validator.assert_field(state, field, predicate, message)

        if validator.failures:
            logger.error(f"Input validation failed: {validator.get_failures()}")
            validator.clear()

        # Execute node
        result = node_func(state)

        # Validate output
        if output_assertions:
            for field, predicate, message in output_assertions:
                validator.assert_field(result, field, predicate, message)

        if validator.failures:
            logger.error(f"Output validation failed: {validator.get_failures()}")
            validator.clear()

        return result

    return wrapped


# Example usage
validated_analyzer = create_validated_node(
    lambda s: {"analysis": "Result"},
    input_assertions=[
        ("documents", lambda v: v and len(v) > 0, "Need documents to analyze")
    ],
    output_assertions=[
        ("analysis", lambda v: v and len(v) > 50, "Analysis too short")
    ]
)

Error Recovery Debugging

class ErrorRecoveryDebugger:
    """
    Debug error recovery behavior in LangGraph applications.
    """

    def __init__(self):
        self.error_log = []
        self.recovery_log = []

    def log_error(self, node: str, error: Exception, state: dict):
        """Log an error occurrence."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "node": node,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "state_snapshot": {k: v for k, v in state.items() if not k.startswith("_")}
        }
        self.error_log.append(entry)
        logger.error(f"Error in {node}: {error}")

    def log_recovery(self, node: str, strategy: str, success: bool):
        """Log a recovery attempt."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "node": node,
            "strategy": strategy,
            "success": success
        }
        self.recovery_log.append(entry)

        if success:
            logger.info(f"Recovery successful in {node} using {strategy}")
        else:
            logger.warning(f"Recovery failed in {node} using {strategy}")

    def create_recoverable_node(self, node_func, node_name: str,
                                 recovery_strategies: list = None):
        """
        Wrap a node with error recovery and logging.
        """
        strategies = recovery_strategies or [
            ("retry_once", lambda s: s),
            ("fallback_value", lambda s: {"_fallback": True})
        ]

        def wrapped(state: dict) -> dict:
            try:
                return node_func(state)
            except Exception as e:
                self.log_error(node_name, e, state)

                # Try recovery strategies
                for strategy_name, strategy_func in strategies:
                    try:
                        result = strategy_func(state)
                        self.log_recovery(node_name, strategy_name, True)
                        return result
                    except Exception as recovery_error:
                        self.log_recovery(node_name, strategy_name, False)
                        continue

                # All strategies failed
                raise RuntimeError(f"All recovery strategies failed for {node_name}") from e

        return wrapped

    def get_error_summary(self) -> dict:
        """Get summary of errors and recovery attempts."""
        return {
            "total_errors": len(self.error_log),
            "errors_by_node": self._count_by_field(self.error_log, "node"),
            "errors_by_type": self._count_by_field(self.error_log, "error_type"),
            "recovery_attempts": len(self.recovery_log),
            "recovery_success_rate": self._calculate_success_rate()
        }

    def _count_by_field(self, log: list, field: str) -> dict:
        counts = {}
        for entry in log:
            value = entry.get(field, "unknown")
            counts[value] = counts.get(value, 0) + 1
        return counts

    def _calculate_success_rate(self) -> float:
        if not self.recovery_log:
            return 0.0
        successes = sum(1 for r in self.recovery_log if r["success"])
        return successes / len(self.recovery_log)

Interview Questions

Q1: How do you debug LangGraph workflows?

Answer:

"I use a multi-layered debugging approach for LangGraph workflows:

1. Visualization First

# Always start by visualizing the graph structure
app = graph.compile()
print(app.get_graph().draw_mermaid())

This reveals structural issues like missing edges or incorrect routing.

2. Step-by-Step Streaming

# Use 'updates' mode to see exactly what each node returns
for event in app.stream(input_state, config, stream_mode='updates'):
    node = list(event.keys())[0]
    print(f'{node}: {event[node]}')

This shows the actual state changes at each step.

3. State History Inspection

# Review complete checkpoint history
for state in app.get_state_history(config):
    print(f"Node: {state.metadata.get('source')}")
    print(f"State: {state.values}")

This helps trace how state evolved and identify where issues started.

4. LangSmith for Production

  • Enable tracing with LANGCHAIN_TRACING_V2=true
  • Use @traceable decorator on critical nodes
  • Query traces by error status, latency, or custom tags

5. Loop Detection

# Hash state to detect repetition
state_hashes = []
for state in history:
    h = hash(frozenset(state.values.items()))
    if h in state_hashes:
        print('Loop detected!')
    state_hashes.append(h)

The key is combining visualization (structure), streaming (runtime), checkpoints (history), and tracing (production)."


Q2: How do you identify and prevent infinite loops?

Answer:

"Infinite loops in LangGraph typically occur from routing issues or state that never changes. Here's my approach:

Detection Strategies:

  1. State Hashing:
def create_state_hash(state):
    # Exclude volatile fields like timestamps
    hashable = {k: v for k, v in state.items()
                if k not in ['iteration', 'timestamp']}
    return hashlib.sha256(
        json.dumps(hashable, sort_keys=True).encode()
    ).hexdigest()

# If same hash appears twice, we're looping
  1. Iteration Counters:
class State(TypedDict):
    iteration: int
    max_iterations: int

def supervisor(state):
    if state['iteration'] >= state['max_iterations']:
        return {'next_step': 'done'}  # Force exit
  1. Node Sequence Detection:
# If same node sequence repeats, we're looping
if node_sequence[-3:] == node_sequence[-6:-3]:
    print('Repeating sequence detected')

Prevention Strategies:

  1. Always include iteration guard in state
  2. Visualize graph to ensure all paths reach END
  3. Add max_iterations to routing logic
  4. Use loop detection wrapper on nodes
  5. Test with various inputs before production

The most common causes I've seen are:

  • Missing edge to END from error handlers
  • Routing logic that never satisfies exit conditions
  • State updates that don't change routing-relevant fields"

Q3: What stream modes does LangGraph support and when do you use each?

Answer:

"LangGraph 1.0.5 supports several stream modes, each optimized for different use cases:

1. stream_mode='updates'

for event in app.stream(input, config, stream_mode='updates'):
    # event = {'node_name': {'field': 'new_value'}}
  • Shows only what each node returned
  • Best for debugging node behavior
  • Smallest payload size
  • Use when: Debugging specific node outputs

2. stream_mode='values'

for state in app.stream(input, config, stream_mode='values'):
    # state = complete accumulated state
  • Shows full state after each node
  • Best for understanding state evolution
  • Larger payload size
  • Use when: Tracking reducer behavior

3. stream_mode='messages'

  • Optimized for chat applications
  • Streams individual message chunks
  • Use when: Building conversational interfaces

4. stream_mode='events'

  • Low-level events for advanced debugging
  • Includes internal LangGraph events
  • Use when: Deep debugging of graph internals

For production debugging, I typically start with 'updates' to see what changed, then switch to 'values' if I need to understand accumulated state. For real-time chat applications, 'messages' provides the best user experience."


Q4: How do you use LangSmith for production debugging?

Answer:

"LangSmith is essential for debugging production LangGraph applications. Here's my setup:

1. Enable Tracing:

import os
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_API_KEY'] = 'your-key'
os.environ['LANGCHAIN_PROJECT'] = 'production-agents'

2. Annotate Critical Nodes:

from langsmith import traceable

@traceable(name='supervisor', tags=['critical'])
def supervisor(state):
    # Traces appear in LangSmith with rich context
    ...

3. Query Error Traces:

from langsmith import Client

client = Client()
errors = client.list_runs(
    project_name='production-agents',
    error=True,
    start_time=datetime.now() - timedelta(hours=24)
)

4. Analyze Latency:

slow_runs = client.list_runs(
    project_name='production-agents',
    filter='latency > 30000'  # Over 30 seconds
)

Key Benefits:

  • Full trace history for any execution
  • Input/output capture for reproduction
  • Latency breakdown by node
  • Error aggregation and alerting
  • Comparison between runs
  • Feedback collection for evaluation

I also use custom metadata to tag traces by version, environment, and user ID, making it easy to filter and analyze specific scenarios."


Key Takeaways

Technique Purpose When to Use
draw_mermaid() Visualize graph structure Development, documentation
stream(mode='updates') See node-by-node changes Debugging runtime behavior
stream(mode='values') See full state evolution Understanding reducers
get_state_history() Inspect checkpoint timeline Time-travel debugging
State hashing Detect infinite loops Loop prevention
@traceable LangSmith integration Production monitoring
Iteration guards Prevent runaway execution Safety mechanism
Assertion wrappers Validate state at runtime Early error detection

Best Practices Summary

  1. Always visualize first - Graph diagrams reveal structural issues instantly

  2. Use stream modes strategically - updates for debugging, values for state tracking

  3. Enable LangSmith in production - Essential for debugging issues you cannot reproduce locally

  4. Add iteration guards - Prevent infinite loops with max_iterations checks

  5. Log node entry/exit - Use decorators for consistent tracing

  6. Hash state for loop detection - Compare state hashes to detect repetition

  7. Validate state transitions - Add assertions to catch issues early

  8. Use time-travel debugging - Replay from checkpoints to test fixes

  9. Document your debugging process - Create runbooks for common issues

  10. Test with edge cases - Many bugs only appear with unusual inputs

Next: Capstone Project - Production Research System

:::

Quiz

Module 6: Testing & Capstone Project

Take Quiz