Debugging & Visualization
Debugging LangGraph workflows requires specialized techniques beyond traditional debugging. This lesson covers comprehensive debugging strategies, visualization tools, LangSmith integration for production tracing, and advanced patterns for identifying and resolving complex issues in stateful agent systems.
The Challenge of Debugging Agent Workflows
Debugging LangGraph applications presents unique challenges that traditional debugging tools cannot fully address.
Why Agent Debugging is Different
"""
Traditional Debugging vs Agent Workflow Debugging
Traditional Application:
- Linear execution flow
- Predictable function calls
- Deterministic outputs
- Easy to reproduce bugs
Agent Workflow:
- Non-linear, conditional execution
- LLM outputs vary between runs
- State accumulates across nodes
- Race conditions in async execution
- External API dependencies
- Human-in-the-loop interrupts
Key Debugging Challenges:
1. Non-determinism: Same input produces different outputs
2. State complexity: Reducers combine updates in subtle ways
3. Routing decisions: Conditional edges may take unexpected paths
4. Checkpoint history: Need to understand state evolution
5. Latency attribution: Which node is slow?
6. Error propagation: Errors may surface far from origin
"""
from typing import TypedDict, Annotated, Literal, Optional, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
from datetime import datetime
import json
import logging
import traceback
# Configure comprehensive logging for debugging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s | %(levelname)s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("langgraph.debug")
class DebugState(TypedDict):
"""State with debugging information."""
query: str
documents: Annotated[list[dict], operator.add]
analysis: Optional[str]
next_step: Literal["research", "analyze", "summarize", "done"]
iteration: int
max_iterations: int
# Debug fields
_debug_trace: Annotated[list[dict], operator.add]
_node_timings: Annotated[list[dict], operator.add]
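The two underscore-prefixed fields use `operator.add` reducers, so nodes append to them by returning single-item lists. A minimal sketch of a wrapper that populates them (the `with_debug_trace` helper is illustrative, not a LangGraph API):

def with_debug_trace(node_func, node_name: str):
    """Wrap a node so every call appends to the debug fields."""
    def wrapped(state: DebugState) -> dict:
        start = datetime.now()
        result = node_func(state)
        elapsed = (datetime.now() - start).total_seconds()
        # Single-element lists append (rather than overwrite) because
        # _debug_trace and _node_timings use operator.add reducers
        return {
            **result,
            "_debug_trace": [{"node": node_name, "at": start.isoformat()}],
            "_node_timings": [{"node": node_name, "seconds": elapsed}],
        }
    return wrapped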
Graph Visualization Techniques
Visualizing your graph structure helps identify routing issues, missing edges, and architectural problems before they cause runtime errors.
Built-in Mermaid Diagram Generation
from langgraph.graph import StateGraph
from IPython.display import Image, display
def create_sample_graph():
"""Create a sample graph for visualization examples."""
class WorkflowState(TypedDict):
query: str
documents: list[dict]
analysis: Optional[str]
summary: Optional[str]
next_step: str
def supervisor(state: WorkflowState) -> dict:
if not state.get("documents"):
return {"next_step": "research"}
elif not state.get("analysis"):
return {"next_step": "analyze"}
elif not state.get("summary"):
return {"next_step": "summarize"}
return {"next_step": "done"}
def researcher(state: WorkflowState) -> dict:
return {"documents": [{"source": "web", "content": "Research findings"}]}
def analyzer(state: WorkflowState) -> dict:
return {"analysis": "Detailed analysis of documents"}
def summarizer(state: WorkflowState) -> dict:
return {"summary": "Executive summary"}
def route_supervisor(state: WorkflowState) -> str:
return state["next_step"]
# Build graph
graph = StateGraph(WorkflowState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("analyzer", analyzer)
graph.add_node("summarizer", summarizer)
graph.add_conditional_edges(
"supervisor",
route_supervisor,
{
"research": "researcher",
"analyze": "analyzer",
"summarize": "summarizer",
"done": END
}
)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyzer", "supervisor")
graph.add_edge("summarizer", "supervisor")
graph.set_entry_point("supervisor")
return graph
def visualize_graph_mermaid(graph: StateGraph, title: str = "Workflow"):
"""
Generate Mermaid diagram from graph.
The Mermaid syntax is widely supported in:
- GitHub README files
- Notion documents
- Jupyter notebooks
- Documentation sites
"""
app = graph.compile()
# Get Mermaid diagram as string
mermaid_code = app.get_graph().draw_mermaid()
print(f"=== {title} Mermaid Diagram ===")
print(mermaid_code)
print()
# The output looks like:
# ```mermaid
# graph TD
# __start__ --> supervisor
# supervisor -->|research| researcher
# supervisor -->|analyze| analyzer
# supervisor -->|summarize| summarizer
# supervisor -->|done| __end__
# researcher --> supervisor
# analyzer --> supervisor
# summarizer --> supervisor
# ```
return mermaid_code
def visualize_graph_png(graph: StateGraph, output_path: str = "graph.png"):
    """
    Export graph as PNG image.
    draw_mermaid_png() renders via the Mermaid.ink web service by default,
    so it needs network access but no extra local dependencies.
    For fully local rendering, use draw_png() instead, which requires
    graphviz plus the pygraphviz package:
    - macOS: brew install graphviz
    - Ubuntu: apt-get install graphviz
    - Windows: choco install graphviz
    """
app = graph.compile()
try:
# Get PNG bytes
png_bytes = app.get_graph().draw_mermaid_png()
# Save to file
with open(output_path, "wb") as f:
f.write(png_bytes)
print(f"Graph saved to {output_path}")
# In Jupyter notebook, display inline
# display(Image(png_bytes))
return png_bytes
except Exception as e:
print(f"PNG generation failed: {e}")
print("Falling back to Mermaid text output")
return visualize_graph_mermaid(graph)
def visualize_graph_ascii(graph: StateGraph):
"""
Generate ASCII representation of graph.
Useful for terminal/CLI environments.
"""
app = graph.compile()
graph_repr = app.get_graph()
print("=== Graph Structure (ASCII) ===")
print()
# Get nodes
nodes = list(graph_repr.nodes.keys())
print(f"Nodes ({len(nodes)}):")
for node in nodes:
print(f" - {node}")
print()
# Get edges
edges = graph_repr.edges
print(f"Edges ({len(edges)}):")
for edge in edges:
if hasattr(edge, 'source') and hasattr(edge, 'target'):
print(f" {edge.source} --> {edge.target}")
else:
print(f" {edge}")
print()
return {"nodes": nodes, "edges": edges}
# Usage example
graph = create_sample_graph()
visualize_graph_mermaid(graph, "Research Workflow")
visualize_graph_ascii(graph)
Visualizing Subgraphs and Complex Hierarchies
def visualize_hierarchical_graph(main_graph, subgraphs: dict):
"""
Visualize hierarchical graph with subgraphs.
Args:
main_graph: The parent graph
subgraphs: Dictionary of {name: subgraph}
"""
print("=== Hierarchical Graph Structure ===")
print()
# Main graph
print("MAIN GRAPH:")
main_app = main_graph.compile()
print(main_app.get_graph().draw_mermaid())
print()
# Each subgraph
for name, subgraph in subgraphs.items():
print(f"SUBGRAPH: {name}")
sub_app = subgraph.compile()
print(sub_app.get_graph().draw_mermaid())
print()
def create_graph_documentation(graph: StateGraph, output_file: str):
"""
Generate markdown documentation for a graph.
Useful for project documentation.
"""
app = graph.compile()
graph_repr = app.get_graph()
doc = []
doc.append("# Workflow Graph Documentation")
doc.append("")
doc.append("## Visual Diagram")
doc.append("")
doc.append("```mermaid")
doc.append(app.get_graph().draw_mermaid().replace("```mermaid\n", "").replace("\n```", ""))
doc.append("```")
doc.append("")
doc.append("## Nodes")
doc.append("")
doc.append("| Node | Description |")
doc.append("|------|-------------|")
for node in graph_repr.nodes.keys():
doc.append(f"| `{node}` | TODO: Add description |")
doc.append("")
doc.append("## Edges")
doc.append("")
doc.append("| From | To | Condition |")
doc.append("|------|-------|-----------|")
for edge in graph_repr.edges:
if hasattr(edge, 'source') and hasattr(edge, 'target'):
            condition = getattr(edge, 'data', None) or '-'
doc.append(f"| `{edge.source}` | `{edge.target}` | {condition} |")
doc.append("")
doc.append(f"*Generated: {datetime.now().isoformat()}*")
content = "\n".join(doc)
with open(output_file, "w") as f:
f.write(content)
print(f"Documentation saved to {output_file}")
return content
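Generating documentation for the sample workflow is then a single call (the output path is arbitrary):

docs_markdown = create_graph_documentation(create_sample_graph(), "workflow.md")
print(docs_markdown.splitlines()[0])  # -> "# Workflow Graph Documentation"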
Step-by-Step Execution Debugging
Understanding exactly what happens at each step of execution is crucial for debugging complex workflows.
Streaming with Updates Mode
def debug_execution_stream(app, input_state: dict, config: dict):
"""
Stream execution with detailed debugging output.
The 'updates' stream mode shows exactly what each node
returns as state updates.
"""
print("=" * 60)
print("DEBUG EXECUTION - Updates Mode")
print("=" * 60)
print()
print(f"Input State: {json.dumps(input_state, indent=2, default=str)}")
print()
print("-" * 60)
step = 0
node_timings = []
for event in app.stream(input_state, config, stream_mode="updates"):
step += 1
timestamp = datetime.now()
# Extract node name and updates
node_name = list(event.keys())[0]
state_update = event[node_name]
# Record timing
timing = {
"step": step,
"node": node_name,
"timestamp": timestamp.isoformat()
}
node_timings.append(timing)
# Print debug info
print(f"STEP {step}: {node_name}")
print(f" Timestamp: {timestamp.strftime('%H:%M:%S.%f')}")
print(f" Update Keys: {list(state_update.keys())}")
# Pretty print the update
for key, value in state_update.items():
if isinstance(value, str) and len(value) > 100:
print(f" {key}: {value[:100]}... (truncated)")
elif isinstance(value, list) and len(value) > 3:
print(f" {key}: [{len(value)} items]")
else:
print(f" {key}: {value}")
print("-" * 60)
# Print timing summary
print()
print("EXECUTION SUMMARY:")
print(f" Total Steps: {step}")
if len(node_timings) >= 2:
start = datetime.fromisoformat(node_timings[0]["timestamp"])
end = datetime.fromisoformat(node_timings[-1]["timestamp"])
duration = (end - start).total_seconds()
print(f" Total Duration: {duration:.3f}s")
print()
return node_timings
def debug_execution_values(app, input_state: dict, config: dict):
"""
Stream execution with full state after each node.
The 'values' stream mode shows the complete accumulated
state after each node executes.
"""
print("=" * 60)
print("DEBUG EXECUTION - Values Mode")
print("=" * 60)
print()
step = 0
for state in app.stream(input_state, config, stream_mode="values"):
step += 1
print(f"STATE AFTER STEP {step}:")
for key, value in state.items():
if key.startswith("_"):
continue # Skip internal fields
if isinstance(value, str) and len(value) > 200:
print(f" {key}: {value[:200]}...")
elif isinstance(value, list):
print(f" {key}: [{len(value)} items]")
elif isinstance(value, dict):
print(f" {key}: {{...}}")
else:
print(f" {key}: {value}")
print("-" * 60)
return step
def compare_stream_modes(app, input_state: dict, config: dict):
"""
Demonstrate difference between stream modes.
Stream Modes in LangGraph 1.0.5:
- "values": Full state after each node
- "updates": Only the updates from each node
- "messages": For chat-like applications
- "events": Low-level events for advanced debugging
"""
print("MODE: updates")
print("-" * 40)
updates_result = []
for event in app.stream(input_state, config, stream_mode="updates"):
updates_result.append(event)
print(event)
print()
# Reset state for clean comparison
new_config = {"configurable": {"thread_id": f"compare-{datetime.now().timestamp()}"}}
print("MODE: values")
print("-" * 40)
values_result = []
for state in app.stream(input_state, new_config, stream_mode="values"):
values_result.append(state)
print(f"Keys: {list(state.keys())}")
print()
return {
"updates_count": len(updates_result),
"values_count": len(values_result)
}
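To exercise these helpers, compile the sample graph with a checkpointer and a thread-scoped config. A sketch (the thread ID and query values are arbitrary):

app = create_sample_graph().compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "debug-session-1"}}
input_state = {
    "query": "LangGraph debugging",
    "documents": [],
    "analysis": None,
    "summary": None,
    "next_step": "research",
}
timings = debug_execution_stream(app, input_state, config)
# The checkpointer also lets us read the final accumulated state
print(app.get_state(config).values.get("summary"))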
Interactive Debugging with Breakpoints
class DebugBreakpoint:
"""
Debugging helper that pauses execution at specified nodes.
Useful for interactive debugging sessions.
"""
    def __init__(self, break_at_nodes: Optional[list[str]] = None, break_on_condition=None):
self.break_at_nodes = break_at_nodes or []
self.break_on_condition = break_on_condition
self.execution_log = []
self.paused = False
def wrap_node(self, node_func, node_name: str):
"""Wrap a node function with breakpoint logic."""
def wrapped(state: dict) -> dict:
# Log entry
entry = {
"node": node_name,
"timestamp": datetime.now().isoformat(),
"input_state": {k: v for k, v in state.items() if not k.startswith("_")}
}
# Check if should break
should_break = node_name in self.break_at_nodes
if self.break_on_condition and self.break_on_condition(state, node_name):
should_break = True
if should_break:
print()
print("=" * 50)
print(f"BREAKPOINT: {node_name}")
print("=" * 50)
print(f"Current State:")
for k, v in state.items():
if not k.startswith("_"):
print(f" {k}: {v}")
print()
print("Commands: [c]ontinue, [s]tate, [q]uit")
while True:
cmd = input("> ").strip().lower()
if cmd == 'c':
break
elif cmd == 's':
print(json.dumps(state, indent=2, default=str))
elif cmd == 'q':
raise KeyboardInterrupt("Debug quit requested")
else:
print("Unknown command")
# Execute node
result = node_func(state)
# Log exit
entry["output"] = result
self.execution_log.append(entry)
return result
return wrapped
def get_log(self) -> list[dict]:
"""Get execution log."""
return self.execution_log.copy()
def print_log(self):
"""Print formatted execution log."""
print("=== Execution Log ===")
for i, entry in enumerate(self.execution_log):
print(f"\n{i+1}. {entry['node']} @ {entry['timestamp']}")
print(f" Output keys: {list(entry['output'].keys())}")
def create_debuggable_graph(graph: StateGraph, breakpoints: list[str] = None):
"""
Create a version of the graph with debugging capabilities.
Args:
graph: Original StateGraph
breakpoints: List of node names to break at
"""
debugger = DebugBreakpoint(break_at_nodes=breakpoints or [])
# Note: This is a conceptual example
# In practice, you would rebuild the graph with wrapped nodes
print("Debuggable graph created with breakpoints at:", breakpoints)
return debugger
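A concrete version of that rebuild, wrapping each node as it is registered (the node and edge lists are whatever your original build used; this helper is a sketch, not part of LangGraph):

def build_graph_with_breakpoints(state_schema, nodes: dict, edges: list, breakpoints: list[str]):
    """Rebuild a graph with selected nodes wrapped in breakpoint logic.

    nodes: {name: node_func}; edges: [(source, target)] plain edges.
    Conditional edges are added afterwards, exactly as in the original build.
    """
    debugger = DebugBreakpoint(break_at_nodes=breakpoints)
    graph = StateGraph(state_schema)
    for name, func in nodes.items():
        graph.add_node(name, debugger.wrap_node(func, name))
    for source, target in edges:
        graph.add_edge(source, target)
    return graph, debugger

LangGraph also supports breakpoints natively: compiling with graph.compile(checkpointer=..., interrupt_before=["analyzer"]) pauses execution before the listed nodes without any wrapping.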
State History Inspection
Understanding how state evolves through execution is essential for debugging state-related issues.
Comprehensive State History Analysis
def inspect_full_state_history(app, config: dict):
"""
Inspect complete state history from checkpoints.
This shows every saved state, allowing you to:
- Trace how state evolved
- Find where state became incorrect
- Identify which node caused an issue
"""
print("=" * 60)
print("STATE HISTORY INSPECTION")
print("=" * 60)
print()
history = list(app.get_state_history(config))
print(f"Total Checkpoints: {len(history)}")
print()
    for i, state_snapshot in enumerate(reversed(history)):
        checkpoint_id = state_snapshot.config.get("configurable", {}).get("checkpoint_id", "unknown")
        # metadata["source"] is the checkpoint type ("input", "loop", "update");
        # the node(s) that wrote this checkpoint are the keys of metadata["writes"]
        writes = state_snapshot.metadata.get("writes") or {}
        print(f"Checkpoint {i + 1}: {checkpoint_id[:20]}...")
        print(f" Written By: {', '.join(writes) or state_snapshot.metadata.get('source', 'unknown')}")
        print(f" Step: {state_snapshot.metadata.get('step', 'unknown')}")
        print(f" Created: {getattr(state_snapshot, 'created_at', 'unknown')}")
# Show state values
print(" State Values:")
for key, value in state_snapshot.values.items():
if key.startswith("_"):
continue
value_preview = str(value)
if len(value_preview) > 80:
value_preview = value_preview[:80] + "..."
print(f" {key}: {value_preview}")
# Show next nodes
if hasattr(state_snapshot, 'next') and state_snapshot.next:
print(f" Next Nodes: {state_snapshot.next}")
print("-" * 40)
return history
def compare_checkpoints(app, config: dict, checkpoint_a: int, checkpoint_b: int):
"""
Compare two checkpoints to see what changed.
Useful for finding exactly which node changed which fields.
"""
history = list(app.get_state_history(config))
history.reverse() # Chronological order
if checkpoint_a >= len(history) or checkpoint_b >= len(history):
print("Invalid checkpoint indices")
return None
state_a = history[checkpoint_a].values
state_b = history[checkpoint_b].values
print(f"Comparing Checkpoint {checkpoint_a} vs Checkpoint {checkpoint_b}")
print("=" * 60)
# Find differences
all_keys = set(state_a.keys()) | set(state_b.keys())
changes = {
"added": [],
"removed": [],
"modified": []
}
for key in all_keys:
if key.startswith("_"):
continue
in_a = key in state_a
in_b = key in state_b
if in_a and not in_b:
changes["removed"].append(key)
elif in_b and not in_a:
changes["added"].append(key)
elif state_a.get(key) != state_b.get(key):
changes["modified"].append({
"key": key,
"before": state_a.get(key),
"after": state_b.get(key)
})
# Print changes
if changes["added"]:
print("\nADDED:")
for key in changes["added"]:
print(f" + {key}: {state_b[key]}")
if changes["removed"]:
print("\nREMOVED:")
for key in changes["removed"]:
print(f" - {key}: {state_a[key]}")
if changes["modified"]:
print("\nMODIFIED:")
for change in changes["modified"]:
print(f" {change['key']}:")
print(f" Before: {change['before']}")
print(f" After: {change['after']}")
if not any(changes.values()):
print("\nNo differences found")
return changes
def find_state_issue(app, config: dict, predicate) -> Optional[dict]:
"""
Find the first checkpoint where a condition becomes true.
Example usage:
find_state_issue(app, config, lambda s: s.get("error") is not None)
find_state_issue(app, config, lambda s: len(s.get("documents", [])) > 10)
"""
history = list(app.get_state_history(config))
history.reverse() # Chronological order
for i, state_snapshot in enumerate(history):
if predicate(state_snapshot.values):
print(f"Condition matched at checkpoint {i}")
print(f" Node: {state_snapshot.metadata.get('source')}")
print(f" Step: {state_snapshot.metadata.get('step')}")
return {
"checkpoint_index": i,
"metadata": state_snapshot.metadata,
"values": state_snapshot.values
}
print("Condition never matched in state history")
return None
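Assuming `app` and `config` from the streaming example earlier, a typical inspection session chains the three helpers:

history = inspect_full_state_history(app, config)
compare_checkpoints(app, config, checkpoint_a=1, checkpoint_b=2)
find_state_issue(app, config, lambda s: s.get("analysis") is not None)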
Time Travel Debugging
def replay_from_checkpoint(app, config: dict, checkpoint_index: int, new_input: dict = None):
"""
Replay execution from a specific checkpoint.
This is powerful for:
- Testing fixes without re-running entire workflow
- Exploring alternative execution paths
- Debugging intermittent issues
"""
history = list(app.get_state_history(config))
history.reverse()
if checkpoint_index >= len(history):
raise ValueError(f"Checkpoint {checkpoint_index} not found")
target_checkpoint = history[checkpoint_index]
checkpoint_config = target_checkpoint.config
print(f"Replaying from checkpoint {checkpoint_index}")
print(f" Node: {target_checkpoint.metadata.get('source')}")
print(f" State: {list(target_checkpoint.values.keys())}")
print()
# Update state if new_input provided
if new_input:
print(f"Applying modifications: {new_input}")
app.update_state(checkpoint_config, new_input)
# Resume execution
result = app.invoke(None, checkpoint_config)
print("Replay complete")
return result
def fork_execution(app, original_config: dict, checkpoint_index: int, modifications: dict):
"""
Fork execution from a checkpoint with modifications.
Creates a new thread that diverges from the original,
allowing you to test "what if" scenarios.
"""
history = list(app.get_state_history(original_config))
history.reverse()
if checkpoint_index >= len(history):
raise ValueError(f"Checkpoint {checkpoint_index} not found")
target_state = history[checkpoint_index]
# Create new thread ID for the fork
fork_thread_id = f"fork-{datetime.now().timestamp()}"
fork_config = {"configurable": {"thread_id": fork_thread_id}}
# Initialize forked state
forked_state = {**target_state.values, **modifications}
print(f"Forking to thread: {fork_thread_id}")
print(f"Modifications: {modifications}")
# Run forked execution
result = app.invoke(forked_state, fork_config)
return {
"fork_thread_id": fork_thread_id,
"result": result
}
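Both helpers in action (the checkpoint index and modifications are arbitrary):

# Re-run the original thread from checkpoint 2 with one field patched
replayed = replay_from_checkpoint(app, config, checkpoint_index=2,
                                  new_input={"analysis": None})
# Explore a "what if" on a fresh thread, leaving the original untouched
fork = fork_execution(app, config, checkpoint_index=2,
                      modifications={"query": "alternative question"})
print(fork["fork_thread_id"])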
Infinite Loop Detection and Prevention
Infinite loops are a common issue in agent workflows. Here are comprehensive strategies for detection and prevention.
Loop Detection Patterns
import hashlib
def create_state_hash(state: dict) -> str:
"""
Create a deterministic hash of state for loop detection.
Excludes:
- Internal fields (starting with _)
- Timestamps and counters that naturally change
"""
hashable_state = {}
for key, value in state.items():
if key.startswith("_"):
continue
if key in ["iteration", "timestamp", "step"]:
continue
# Convert to hashable representation
if isinstance(value, list):
hashable_state[key] = tuple(
json.dumps(v, sort_keys=True, default=str) if isinstance(v, dict) else v
for v in value
)
elif isinstance(value, dict):
hashable_state[key] = json.dumps(value, sort_keys=True, default=str)
else:
hashable_state[key] = value
state_str = json.dumps(hashable_state, sort_keys=True, default=str)
return hashlib.sha256(state_str.encode()).hexdigest()[:16]
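A quick check of the exclusion behavior: two states that differ only in a volatile field hash identically.

h1 = create_state_hash({"query": "q", "documents": [], "iteration": 3})
h2 = create_state_hash({"query": "q", "documents": [], "iteration": 7})
assert h1 == h2  # "iteration" is excluded, so only stable fields count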
class LoopDetector:
"""
Detect infinite loops in graph execution.
Strategies:
1. State hashing: Same state seen twice indicates loop
2. Node sequence: Same sequence of nodes repeating
3. Iteration counting: Max iterations exceeded
"""
def __init__(self, max_iterations: int = 50, max_same_state: int = 2):
self.max_iterations = max_iterations
self.max_same_state = max_same_state
self.state_hashes = []
self.node_sequence = []
self.iteration = 0
def check(self, state: dict, node_name: str) -> tuple[bool, str]:
"""
Check for loop condition.
Returns: (is_loop, reason)
"""
self.iteration += 1
self.node_sequence.append(node_name)
# Check iteration limit
if self.iteration > self.max_iterations:
return True, f"Max iterations ({self.max_iterations}) exceeded"
# Check state hash
state_hash = create_state_hash(state)
hash_count = self.state_hashes.count(state_hash)
if hash_count >= self.max_same_state:
return True, f"Same state seen {hash_count + 1} times"
self.state_hashes.append(state_hash)
# Check for repeating node sequence
if len(self.node_sequence) >= 6:
last_3 = tuple(self.node_sequence[-3:])
prev_3 = tuple(self.node_sequence[-6:-3])
if last_3 == prev_3:
return True, f"Repeating node sequence: {last_3}"
return False, "OK"
def get_report(self) -> dict:
"""Get debugging report."""
return {
"iterations": self.iteration,
"unique_states": len(set(self.state_hashes)),
"node_sequence": self.node_sequence,
"state_hash_history": self.state_hashes[-10:] # Last 10
}
def create_loop_safe_node(node_func, loop_detector: LoopDetector, node_name: str):
"""
Wrap a node with loop detection.
"""
def wrapped(state: dict) -> dict:
is_loop, reason = loop_detector.check(state, node_name)
if is_loop:
logger.error(f"Loop detected at {node_name}: {reason}")
logger.error(f"Report: {loop_detector.get_report()}")
# Return state that routes to END
return {
"_loop_detected": True,
"_loop_reason": reason,
"next_step": "done" # Adjust based on your routing field
}
return node_func(state)
return wrapped
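Wiring the detector in is a matter of wrapping each node as you register it (the stand-in supervisor here is illustrative):

detector = LoopDetector(max_iterations=20)

def supervisor_node(state: dict) -> dict:  # stand-in for a real node
    return {"next_step": "research"}

safe_supervisor = create_loop_safe_node(supervisor_node, detector, "supervisor")
# graph.add_node("supervisor", safe_supervisor)  # register the wrapped version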
def add_iteration_guard(graph: StateGraph, max_iterations: int = 50):
    """
    Add an iteration guard to a graph.

    The guard node increments a counter on every pass, and the returned
    routing function sends execution to END once the limit is reached.
    Note this is a sketch: the caller still has to wire the guard node
    into the graph's edges (see the example below).
    """
    def iteration_check(state: dict) -> str:
        """Routing function that enforces the iteration limit."""
        if state.get("iteration", 0) >= max_iterations:
            logger.warning(f"Iteration limit reached: {state.get('iteration', 0)}")
            return "done"
        return state.get("next_step", "done")

    graph.add_node("_iteration_guard", lambda s: {"iteration": s.get("iteration", 0) + 1})
    print(f"Iteration guard added with max={max_iterations}")
    return graph, iteration_check
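Here is the guard wired into a deliberately loopy two-node graph; the run halts after five passes instead of spinning forever (a self-contained sketch using the revised helper above):

class LoopState(TypedDict):
    iteration: int
    next_step: str

def worker(state: LoopState) -> dict:
    return {"next_step": "again"}  # never exits on its own

graph = StateGraph(LoopState)
graph.add_node("worker", worker)
graph, iteration_check = add_iteration_guard(graph, max_iterations=5)
graph.add_edge("worker", "_iteration_guard")
graph.add_conditional_edges(
    "_iteration_guard",
    iteration_check,
    {"again": "worker", "done": END},
)
graph.set_entry_point("worker")
app = graph.compile()
app.invoke({"iteration": 0, "next_step": "again"})  # routes to END at the limit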
Visualizing Loop Issues
def visualize_execution_path(app, config: dict):
"""
Visualize the actual execution path taken.
Helps identify:
- Loops (same nodes appearing repeatedly)
- Dead ends (paths that don't reach END)
- Unexpected routes
"""
history = list(app.get_state_history(config))
history.reverse()
    path = []
    for state in history:
        # the writing node's name lives in metadata["writes"]; "source" is
        # just the checkpoint type ("input"/"loop"/"update")
        writes = state.metadata.get("writes") or {}
        path.append(", ".join(writes) or state.metadata.get("source", "unknown"))
print("=== Execution Path ===")
print()
# Print as flow diagram
for i, node in enumerate(path):
indent = " " if i > 0 else ""
connector = "└─> " if i > 0 else ""
print(f"{indent}{connector}{node}")
print()
# Detect repeated sequences
node_counts = {}
for node in path:
node_counts[node] = node_counts.get(node, 0) + 1
repeated = {n: c for n, c in node_counts.items() if c > 1}
if repeated:
print("Repeated Nodes:")
for node, count in repeated.items():
print(f" {node}: {count} times")
return path
def create_execution_timeline(app, config: dict):
"""
Create a timeline visualization of execution.
"""
history = list(app.get_state_history(config))
history.reverse()
print("=== Execution Timeline ===")
print()
    for i, state in enumerate(history):
        writes = state.metadata.get("writes") or {}
        node = ", ".join(writes) or state.metadata.get("source", "unknown")
        step = state.metadata.get("step", i)
# Create ASCII timeline
marker = "●" if node != "__start__" else "○"
line = "│" if i < len(history) - 1 else " "
print(f" {marker} Step {step}: {node}")
print(f" {line}")
print()
LangSmith Integration for Production Debugging
LangSmith provides comprehensive tracing and debugging for production LangGraph applications.
Setting Up LangSmith Tracing
import os
from langsmith import traceable
from langsmith.run_trees import RunTree
# Configure LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "langgraph-debugging"
# Optional: Enable detailed logging
os.environ["LANGCHAIN_VERBOSE"] = "true"
@traceable(name="research_node", run_type="chain")
def traced_research_node(state: dict) -> dict:
"""
Node with LangSmith tracing.
The @traceable decorator automatically:
- Records input/output
- Tracks execution time
- Captures errors
- Links to parent traces
"""
# Node logic here
return {"documents": [{"source": "traced", "content": "Research results"}]}
@traceable(name="analysis_node", run_type="chain", tags=["critical"])
def traced_analysis_node(state: dict) -> dict:
"""
Node with custom tags for filtering.
Tags help you filter traces in LangSmith UI:
- "critical": High-priority nodes
- "llm-call": Nodes that call LLMs
- "external-api": Nodes with external dependencies
"""
return {"analysis": "Traced analysis output"}
class LangSmithDebugger:
"""
Helper class for LangSmith debugging operations.
"""
def __init__(self, project_name: str = "langgraph-debug"):
self.project_name = project_name
os.environ["LANGCHAIN_PROJECT"] = project_name
@traceable(name="debug_session", run_type="chain")
def run_with_tracing(self, app, input_state: dict, config: dict):
"""
Run graph with comprehensive tracing.
"""
result = app.invoke(input_state, config)
return result
@traceable(name="debug_stream", run_type="chain")
def stream_with_tracing(self, app, input_state: dict, config: dict):
"""
Stream graph with tracing for each step.
"""
results = []
for event in app.stream(input_state, config, stream_mode="updates"):
results.append(event)
return results
# Example: Tracing with custom metadata
@traceable(
name="supervised_task",
run_type="chain",
metadata={"version": "1.0", "environment": "development"}
)
def traced_supervisor(state: dict) -> dict:
"""
Supervisor with rich metadata.
Metadata appears in LangSmith UI and helps:
- Filter by version
- Compare environments
- Track deployments
"""
if not state.get("documents"):
return {"next_step": "research"}
return {"next_step": "done"}
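With the environment variables set, a traced run is just a normal invocation; assuming `app`, `input_state`, and `config` from the earlier streaming examples:

ls_debugger = LangSmithDebugger(project_name="langgraph-debugging")
result = ls_debugger.run_with_tracing(app, input_state, config)
# The run now appears in the LangSmith project, with a nested child
# run for every @traceable-decorated node that executed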
Production Debugging with LangSmith
from langsmith import Client
from datetime import datetime, timedelta
class ProductionDebugger:
"""
Debug production LangGraph applications using LangSmith.
"""
def __init__(self, api_key: str = None):
self.client = Client(api_key=api_key)
def find_errors(self, project_name: str, hours: int = 24):
"""
Find all error traces in the last N hours.
"""
start_time = datetime.now() - timedelta(hours=hours)
runs = self.client.list_runs(
project_name=project_name,
error=True,
start_time=start_time
)
errors = []
for run in runs:
errors.append({
"run_id": str(run.id),
"name": run.name,
"error": run.error,
"start_time": run.start_time,
"inputs": run.inputs,
"trace_id": str(run.trace_id)
})
print(f"Found {len(errors)} errors in last {hours} hours")
return errors
def analyze_slow_runs(self, project_name: str, threshold_seconds: float = 30.0):
"""
Find runs that exceeded latency threshold.
"""
runs = self.client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=24)
)
slow_runs = []
for run in runs:
if run.end_time and run.start_time:
duration = (run.end_time - run.start_time).total_seconds()
if duration > threshold_seconds:
slow_runs.append({
"run_id": str(run.id),
"name": run.name,
"duration_seconds": duration,
"trace_id": str(run.trace_id)
})
# Sort by duration
slow_runs.sort(key=lambda x: x["duration_seconds"], reverse=True)
print(f"Found {len(slow_runs)} slow runs (>{threshold_seconds}s)")
return slow_runs
def get_run_details(self, run_id: str):
"""
Get detailed information about a specific run.
"""
run = self.client.read_run(run_id)
return {
"id": str(run.id),
"name": run.name,
"status": run.status,
"error": run.error,
"inputs": run.inputs,
"outputs": run.outputs,
"start_time": run.start_time,
"end_time": run.end_time,
"latency_ms": run.latency_ms,
"token_usage": run.total_tokens,
"feedback": list(self.client.list_feedback(run_ids=[run_id]))
}
def compare_runs(self, run_id_a: str, run_id_b: str):
"""
Compare two runs to find differences.
"""
run_a = self.get_run_details(run_id_a)
run_b = self.get_run_details(run_id_b)
comparison = {
"latency_diff_ms": (run_a.get("latency_ms", 0) or 0) - (run_b.get("latency_ms", 0) or 0),
"token_diff": (run_a.get("token_usage", 0) or 0) - (run_b.get("token_usage", 0) or 0),
"status_a": run_a.get("status"),
"status_b": run_b.get("status"),
"error_a": run_a.get("error"),
"error_b": run_b.get("error")
}
return comparison
# Example: Creating custom annotations for debugging
@traceable(name="debug_checkpoint")
def add_debug_checkpoint(state: dict, checkpoint_name: str) -> dict:
"""
Add a debug checkpoint that appears in LangSmith.
Use this to mark important points in execution.
"""
print(f"DEBUG CHECKPOINT: {checkpoint_name}")
print(f" State keys: {list(state.keys())}")
# This creates a trace entry in LangSmith
return state
Advanced Debugging Patterns
Node Execution Tracing
from functools import wraps
from typing import Callable
import time
def trace_node(name: str = None, log_state: bool = True):
"""
Decorator for comprehensive node tracing.
Usage:
@trace_node("research")
def research_node(state: dict) -> dict:
...
"""
def decorator(func: Callable):
node_name = name or func.__name__
@wraps(func)
def wrapper(state: dict) -> dict:
start_time = time.time()
# Log entry
logger.debug(f"[{node_name}] ENTER")
if log_state:
logger.debug(f"[{node_name}] Input: {_truncate_state(state)}")
try:
result = func(state)
# Log success
elapsed = time.time() - start_time
logger.debug(f"[{node_name}] EXIT ({elapsed:.3f}s)")
if log_state:
logger.debug(f"[{node_name}] Output: {_truncate_state(result)}")
return result
except Exception as e:
# Log error
elapsed = time.time() - start_time
logger.error(f"[{node_name}] ERROR ({elapsed:.3f}s): {e}")
logger.error(f"[{node_name}] Traceback:\n{traceback.format_exc()}")
raise
return wrapper
return decorator
def _truncate_state(state: dict, max_len: int = 200) -> str:
"""Truncate state for logging."""
result = {}
for key, value in state.items():
if key.startswith("_"):
continue
str_value = str(value)
if len(str_value) > max_len:
result[key] = str_value[:max_len] + "..."
else:
result[key] = value
return str(result)
# Example usage
@trace_node("supervisor")
def traced_supervisor_node(state: dict) -> dict:
if not state.get("documents"):
return {"next_step": "research"}
return {"next_step": "done"}
@trace_node("researcher", log_state=True)
def traced_researcher_node(state: dict) -> dict:
# Simulated work
time.sleep(0.1)
return {"documents": [{"content": "Research findings"}]}
Assertion-Based Debugging
class StateAssertion:
"""
Add runtime assertions to validate state.
Catches issues early before they propagate.
"""
def __init__(self):
self.failures = []
def assert_field(self, state: dict, field: str, predicate: Callable, message: str = None):
"""Assert a condition on a state field."""
value = state.get(field)
if not predicate(value):
failure = {
"field": field,
"value": value,
"message": message or f"Assertion failed for {field}"
}
self.failures.append(failure)
logger.error(f"ASSERTION FAILED: {failure}")
return False
return True
def assert_not_none(self, state: dict, field: str):
"""Assert field is not None."""
return self.assert_field(
state, field,
lambda v: v is not None,
f"{field} must not be None"
)
def assert_non_empty_list(self, state: dict, field: str):
"""Assert field is a non-empty list."""
return self.assert_field(
state, field,
lambda v: isinstance(v, list) and len(v) > 0,
f"{field} must be a non-empty list"
)
def assert_type(self, state: dict, field: str, expected_type: type):
"""Assert field is of expected type."""
return self.assert_field(
state, field,
lambda v: isinstance(v, expected_type),
f"{field} must be of type {expected_type.__name__}"
)
def get_failures(self) -> list[dict]:
"""Get all assertion failures."""
return self.failures.copy()
def clear(self):
"""Clear failures."""
self.failures = []
def create_validated_node(node_func, input_assertions: list = None, output_assertions: list = None):
"""
Wrap a node with input/output validation.
Args:
node_func: Original node function
input_assertions: List of (field, predicate, message) tuples
output_assertions: List of (field, predicate, message) tuples
"""
validator = StateAssertion()
def wrapped(state: dict) -> dict:
# Validate input
if input_assertions:
for field, predicate, message in input_assertions:
validator.assert_field(state, field, predicate, message)
if validator.failures:
logger.error(f"Input validation failed: {validator.get_failures()}")
validator.clear()
# Execute node
result = node_func(state)
# Validate output
if output_assertions:
for field, predicate, message in output_assertions:
validator.assert_field(result, field, predicate, message)
if validator.failures:
logger.error(f"Output validation failed: {validator.get_failures()}")
validator.clear()
return result
return wrapped
# Example usage - note the stub's short "analysis" output will trip the
# "Analysis too short" assertion below and log a failure
validated_analyzer = create_validated_node(
lambda s: {"analysis": "Result"},
input_assertions=[
("documents", lambda v: v and len(v) > 0, "Need documents to analyze")
],
output_assertions=[
("analysis", lambda v: v and len(v) > 50, "Analysis too short")
]
)
Error Recovery Debugging
class ErrorRecoveryDebugger:
"""
Debug error recovery behavior in LangGraph applications.
"""
def __init__(self):
self.error_log = []
self.recovery_log = []
def log_error(self, node: str, error: Exception, state: dict):
"""Log an error occurrence."""
entry = {
"timestamp": datetime.now().isoformat(),
"node": node,
"error_type": type(error).__name__,
"error_message": str(error),
"state_snapshot": {k: v for k, v in state.items() if not k.startswith("_")}
}
self.error_log.append(entry)
logger.error(f"Error in {node}: {error}")
def log_recovery(self, node: str, strategy: str, success: bool):
"""Log a recovery attempt."""
entry = {
"timestamp": datetime.now().isoformat(),
"node": node,
"strategy": strategy,
"success": success
}
self.recovery_log.append(entry)
if success:
logger.info(f"Recovery successful in {node} using {strategy}")
else:
logger.warning(f"Recovery failed in {node} using {strategy}")
def create_recoverable_node(self, node_func, node_name: str,
recovery_strategies: list = None):
"""
Wrap a node with error recovery and logging.
"""
strategies = recovery_strategies or [
("retry_once", lambda s: s),
("fallback_value", lambda s: {"_fallback": True})
]
def wrapped(state: dict) -> dict:
try:
return node_func(state)
except Exception as e:
self.log_error(node_name, e, state)
# Try recovery strategies
for strategy_name, strategy_func in strategies:
try:
result = strategy_func(state)
self.log_recovery(node_name, strategy_name, True)
return result
except Exception as recovery_error:
self.log_recovery(node_name, strategy_name, False)
continue
# All strategies failed
raise RuntimeError(f"All recovery strategies failed for {node_name}") from e
return wrapped
def get_error_summary(self) -> dict:
"""Get summary of errors and recovery attempts."""
return {
"total_errors": len(self.error_log),
"errors_by_node": self._count_by_field(self.error_log, "node"),
"errors_by_type": self._count_by_field(self.error_log, "error_type"),
"recovery_attempts": len(self.recovery_log),
"recovery_success_rate": self._calculate_success_rate()
}
def _count_by_field(self, log: list, field: str) -> dict:
counts = {}
for entry in log:
value = entry.get(field, "unknown")
counts[value] = counts.get(value, 0) + 1
return counts
def _calculate_success_rate(self) -> float:
if not self.recovery_log:
return 0.0
successes = sum(1 for r in self.recovery_log if r["success"])
return successes / len(self.recovery_log)
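A usage sketch; the always-failing node is contrived so the recovery path is exercised:

recovery_debugger = ErrorRecoveryDebugger()

def flaky_fetch(state: dict) -> dict:
    raise TimeoutError("upstream API timed out")

safe_fetch = recovery_debugger.create_recoverable_node(
    flaky_fetch,
    "fetch",
    recovery_strategies=[
        ("cached_copy", lambda s: {"documents": s.get("_cache", [])}),
    ],
)
result = safe_fetch({"query": "q", "_cache": [{"content": "stale but usable"}]})
print(recovery_debugger.get_error_summary())
# -> total_errors: 1, recovery_success_rate: 1.0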
Interview Questions
Q1: How do you debug LangGraph workflows?
Answer:
"I use a multi-layered debugging approach for LangGraph workflows:
1. Visualization First
# Always start by visualizing the graph structure
app = graph.compile()
print(app.get_graph().draw_mermaid())
This reveals structural issues like missing edges or incorrect routing.
2. Step-by-Step Streaming
# Use 'updates' mode to see exactly what each node returns
for event in app.stream(input_state, config, stream_mode='updates'):
node = list(event.keys())[0]
print(f'{node}: {event[node]}')
This shows the actual state changes at each step.
3. State History Inspection
# Review complete checkpoint history
for state in app.get_state_history(config):
print(f"Node: {state.metadata.get('source')}")
print(f"State: {state.values}")
This helps trace how state evolved and identify where issues started.
4. LangSmith for Production
- Enable tracing with `LANGCHAIN_TRACING_V2=true`
- Use the `@traceable` decorator on critical nodes
- Query traces by error status, latency, or custom tags
5. Loop Detection
# Hash state to detect repetition
state_hashes = []
for state in history:
    h = create_state_hash(state.values)  # sha256 over stable fields only
    if h in state_hashes:
        print('Loop detected!')
    state_hashes.append(h)
The key is combining visualization (structure), streaming (runtime), checkpoints (history), and tracing (production)."
Q2: How do you identify and prevent infinite loops?
Answer:
"Infinite loops in LangGraph typically occur from routing issues or state that never changes. Here's my approach:
Detection Strategies:
- State Hashing:
def create_state_hash(state):
# Exclude volatile fields like timestamps
hashable = {k: v for k, v in state.items()
if k not in ['iteration', 'timestamp']}
return hashlib.sha256(
json.dumps(hashable, sort_keys=True).encode()
).hexdigest()
# If same hash appears twice, we're looping
- Iteration Counters:
class State(TypedDict):
iteration: int
max_iterations: int
def supervisor(state):
if state['iteration'] >= state['max_iterations']:
return {'next_step': 'done'} # Force exit
- Node Sequence Detection:
# If same node sequence repeats, we're looping
if node_sequence[-3:] == node_sequence[-6:-3]:
print('Repeating sequence detected')
Prevention Strategies:
- Always include iteration guard in state
- Visualize graph to ensure all paths reach END
- Add max_iterations to routing logic
- Use loop detection wrapper on nodes
- Test with various inputs before production
The most common causes I've seen are:
- Missing edge to END from error handlers
- Routing logic that never satisfies exit conditions
- State updates that don't change routing-relevant fields"
Q3: What stream modes does LangGraph support and when do you use each?
Answer:
"LangGraph 1.0.5 supports several stream modes, each optimized for different use cases:
1. stream_mode='updates'
for event in app.stream(input, config, stream_mode='updates'):
# event = {'node_name': {'field': 'new_value'}}
- Shows only what each node returned
- Best for debugging node behavior
- Smallest payload size
- Use when: Debugging specific node outputs
2. stream_mode='values'
for state in app.stream(input, config, stream_mode='values'):
# state = complete accumulated state
- Shows full state after each node
- Best for understanding state evolution
- Larger payload size
- Use when: Tracking reducer behavior
3. stream_mode='messages'
- Optimized for chat applications
- Streams individual message chunks
- Use when: Building conversational interfaces
4. stream_mode='debug'
- Low-level events for advanced debugging
- Includes internal LangGraph events (task starts, checkpoint writes)
- Use when: Deep debugging of graph internals
For production debugging, I typically start with 'updates' to see what changed, then switch to 'values' if I need to understand accumulated state. For real-time chat applications, 'messages' provides the best user experience."
Q4: How do you use LangSmith for production debugging?
Answer:
"LangSmith is essential for debugging production LangGraph applications. Here's my setup:
1. Enable Tracing:
import os
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_API_KEY'] = 'your-key'
os.environ['LANGCHAIN_PROJECT'] = 'production-agents'
2. Annotate Critical Nodes:
from langsmith import traceable
@traceable(name='supervisor', tags=['critical'])
def supervisor(state):
# Traces appear in LangSmith with rich context
...
3. Query Error Traces:
from langsmith import Client
client = Client()
errors = client.list_runs(
project_name='production-agents',
error=True,
start_time=datetime.now() - timedelta(hours=24)
)
4. Analyze Latency:
slow_runs = client.list_runs(
    project_name='production-agents',
    filter='gt(latency, "30s")'  # LangSmith filter syntax: over 30 seconds
)
Key Benefits:
- Full trace history for any execution
- Input/output capture for reproduction
- Latency breakdown by node
- Error aggregation and alerting
- Comparison between runs
- Feedback collection for evaluation
I also use custom metadata to tag traces by version, environment, and user ID, making it easy to filter and analyze specific scenarios."
Key Takeaways
| Technique | Purpose | When to Use |
|---|---|---|
| `draw_mermaid()` | Visualize graph structure | Development, documentation |
| `stream(mode='updates')` | See node-by-node changes | Debugging runtime behavior |
| `stream(mode='values')` | See full state evolution | Understanding reducers |
| `get_state_history()` | Inspect checkpoint timeline | Time-travel debugging |
| State hashing | Detect infinite loops | Loop prevention |
| `@traceable` | LangSmith integration | Production monitoring |
| Iteration guards | Prevent runaway execution | Safety mechanism |
| Assertion wrappers | Validate state at runtime | Early error detection |
Best Practices Summary
1. Always visualize first - Graph diagrams reveal structural issues instantly
2. Use stream modes strategically - `updates` for debugging, `values` for state tracking
3. Enable LangSmith in production - Essential for debugging issues you cannot reproduce locally
4. Add iteration guards - Prevent infinite loops with max_iterations checks
5. Log node entry/exit - Use decorators for consistent tracing
6. Hash state for loop detection - Compare state hashes to detect repetition
7. Validate state transitions - Add assertions to catch issues early
8. Use time-travel debugging - Replay from checkpoints to test fixes
9. Document your debugging process - Create runbooks for common issues
10. Test with edge cases - Many bugs only appear with unusual inputs
Next: Capstone Project - Production Research System