Testing & Capstone Project
Capstone: Production Research System
This capstone project brings together all concepts from the course to build a production-ready multi-agent research system. You will implement state management with reducers, supervisor pattern coordination, checkpointing for persistence, human-in-the-loop approval, error recovery, and LangSmith observability.
Project Overview
The Production Research System is a multi-agent workflow that:
- Researches topics using multiple sources
- Analyzes gathered information for key insights
- Writes professional reports
- Reviews quality through a supervisor
- Approves via human-in-the-loop
- Persists state for long-running workflows
- Traces execution for debugging and monitoring
This system demonstrates real-world patterns used in production AI applications.
System Architecture
                        +------------------+
                        |    Supervisor    |
                        |  (Coordinator)   |
                        +--------+---------+
                                 |
            +--------------------+--------------------+
            |                    |                    |
            v                    v                    v
   +----------------+  +--------------------+  +----------------+
   |   Researcher   |  |      Analyzer      |  |     Writer     |
   | (Gather Data)  |  | (Extract Insights) |  | (Draft Report) |
   +----------------+  +--------------------+  +----------------+
            |                    |                    |
            +--------------------+--------------------+
                                 |
                                 v
                        +------------------+
                        |   Human Review   |
                        | (Approve/Revise) |
                        +------------------+
                                 |
                                 v
                        +------------------+
                        |     Finalize     |
                        |  (Add Metadata)  |
                        +------------------+
Complete Implementation
State Definition
"""
Production Research System - State Definition
This state schema demonstrates:
1. Input fields for configuration
2. Accumulated outputs using reducers
3. Worker output fields
4. Control fields for routing
5. Metadata for tracking
Key Design Decisions:
- Use Annotated with operator.add for list accumulation
- Use Optional for fields that may not exist initially
- Use Literal for type-safe routing
- Include iteration tracking for loop prevention
"""
from typing import TypedDict, Annotated, Literal, Optional
from langgraph.graph import StateGraph, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langsmith import traceable
import operator
import os
from datetime import datetime
import json
# Enable LangSmith tracing for production observability
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-research-system"
class ResearchDocument(TypedDict):
"""Structure for research documents."""
source: str
content: str
query: str
timestamp: str
relevance_score: Optional[float]
class Message(TypedDict):
"""Structure for inter-agent messages."""
from_agent: str
to_agent: str
message_type: str
content: str
timestamp: str
class ResearchState(TypedDict):
"""
Complete state for the production research system.
Categories:
1. Input Configuration
2. Accumulated Data (with reducers)
3. Worker Outputs
4. Control Flow
5. Tracking/Metadata
"""
# === Input Configuration ===
query: str # Research query from user
max_iterations: int # Safety limit for iterations
quality_threshold: int # Minimum quality score (0-100)
require_human_approval: bool # Whether human review is required
# === Accumulated Data (Reducers) ===
documents: Annotated[list[ResearchDocument], operator.add]
messages: Annotated[list[Message], operator.add]
errors: Annotated[list[dict], operator.add]
# === Worker Outputs ===
analysis: Optional[str] # Analysis from analyzer
report: Optional[str] # Report from writer
quality_score: Optional[int] # Quality assessment (0-100)
# === Control Flow ===
next_worker: Literal[
"researcher",
"analyzer",
"writer",
"quality_check",
"human_review",
"finalize",
"error_handler",
"done"
]
current_phase: Literal["research", "analysis", "writing", "review", "complete"]
# === Tracking/Metadata ===
iteration: int
started_at: str
completed_at: Optional[str]
approved: bool
approval_notes: Optional[str]
feedback: Optional[str]
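To make the reducer semantics above concrete, here is a small standalone illustration (not part of the system itself) of what LangGraph does with a list field declared as Annotated[..., operator.add]: a node's partial update is appended to the existing channel value rather than replacing it.
# Standalone illustration of the reducer behavior declared above.
existing_docs = [{"source": "web", "content": "first finding"}]
node_update = [{"source": "llm_research", "content": "second finding"}]

merged = operator.add(existing_docs, node_update)  # what the reducer applies
assert len(merged) == 2  # lists accumulate instead of being overwritten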
Worker Nodes
"""
Worker Nodes - The agents that perform actual work.
Each worker:
1. Is decorated with @traceable for LangSmith visibility
2. Receives complete state, returns partial updates
3. Adds messages for inter-agent communication
4. Handles errors gracefully
"""
@traceable(name="researcher", run_type="chain", tags=["worker", "llm-call"])
def researcher(state: ResearchState) -> dict:
"""
Gather research documents based on query.
In production, this would:
- Call search APIs (Google, Bing, academic databases)
- Scrape relevant websites
- Query internal knowledge bases
- Retrieve from vector stores
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Use feedback if revision was requested
query = state["query"]
if state.get("feedback"):
query = f"{query}\n\nAdditional focus based on feedback: {state['feedback']}"
# Simulate research with LLM (in production, use real search APIs)
prompt = f"""You are a research assistant. Find key facts and information about:
Query: {query}
Provide your findings as a JSON object with this structure:
{{
"facts": [
{{"fact": "...", "source": "...", "confidence": "high/medium/low"}},
...
],
"summary": "Brief summary of findings"
}}
Include 3-5 key facts with sources."""
try:
response = llm.invoke(prompt)
# Create research document
doc: ResearchDocument = {
"source": "llm_research",
"content": response.content,
"query": query,
"timestamp": datetime.now().isoformat(),
"relevance_score": 0.85 # In production, calculate actual relevance
}
# Create status message
message: Message = {
"from_agent": "researcher",
"to_agent": "supervisor",
"message_type": "status",
"content": f"Found research findings for: {query[:50]}...",
"timestamp": datetime.now().isoformat()
}
return {
"documents": [doc],
"messages": [message],
"iteration": state["iteration"] + 1,
"current_phase": "research"
}
except Exception as e:
# Log error for recovery
error = {
"node": "researcher",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
return {
"errors": [error],
"next_worker": "error_handler"
}
@traceable(name="analyzer", run_type="chain", tags=["worker", "llm-call"])
def analyzer(state: ResearchState) -> dict:
"""
Analyze gathered documents for insights.
Analysis includes:
- Key theme extraction
- Evidence evaluation
- Confidence assessment
- Gap identification
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Combine all document contents
doc_contents = "\n\n---\n\n".join([
f"Source: {doc['source']}\nContent: {doc['content']}"
for doc in state["documents"]
])
prompt = f"""Analyze these research findings and provide a structured analysis:
Research Documents:
{doc_contents}
Original Query: {state['query']}
Provide analysis in this format:
## Key Themes
- Theme 1: Description
- Theme 2: Description
...
## Supporting Evidence
For each theme, list the evidence from the documents.
## Confidence Assessment
- Overall confidence: HIGH/MEDIUM/LOW
- Reasoning: Why this confidence level
## Information Gaps
What additional research might be needed?
## Recommendations
Based on the analysis, what are the key takeaways?"""
try:
response = llm.invoke(prompt)
# Check analysis quality
analysis_length = len(response.content)
quality_indicator = "detailed" if analysis_length > 500 else "brief"
message: Message = {
"from_agent": "analyzer",
"to_agent": "supervisor",
"message_type": "analysis_complete",
"content": f"Analysis complete ({quality_indicator}, {analysis_length} chars)",
"timestamp": datetime.now().isoformat()
}
return {
"analysis": response.content,
"messages": [message],
"iteration": state["iteration"] + 1,
"current_phase": "analysis"
}
except Exception as e:
error = {
"node": "analyzer",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
return {
"errors": [error],
"next_worker": "error_handler"
}
@traceable(name="writer", run_type="chain", tags=["worker", "llm-call"])
def writer(state: ResearchState) -> dict:
"""
Write professional research report.
Report structure:
- Executive summary
- Key findings
- Methodology
- Detailed analysis
- Conclusions
- References
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
prompt = f"""Write a professional research report based on the following:
Query: {state['query']}
Analysis:
{state['analysis']}
Source Documents: {len(state['documents'])} documents analyzed
Write a complete report with this structure:
# Research Report: [Create appropriate title]
## Executive Summary
A 2-3 paragraph summary of key findings.
## Methodology
How the research was conducted.
## Key Findings
Detailed findings with supporting evidence.
## Analysis
In-depth analysis of the findings.
## Conclusions
Final conclusions and implications.
## Recommendations
Actionable recommendations based on findings.
## References
List sources used in this research.
Make the report professional, well-structured, and comprehensive."""
try:
response = llm.invoke(prompt)
message: Message = {
"from_agent": "writer",
"to_agent": "supervisor",
"message_type": "report_complete",
"content": f"Report drafted ({len(response.content)} chars)",
"timestamp": datetime.now().isoformat()
}
return {
"report": response.content,
"messages": [message],
"iteration": state["iteration"] + 1,
"current_phase": "writing"
}
except Exception as e:
error = {
"node": "writer",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
return {
"errors": [error],
"next_worker": "error_handler"
}
Supervisor Node
"""
Supervisor Node - Coordinates workers and checks quality.
The supervisor:
1. Determines which worker should execute next
2. Checks quality of outputs
3. Decides when to request human review
4. Handles iteration limits
5. Routes to error handler when needed
"""
@traceable(name="supervisor", run_type="chain", tags=["supervisor"])
def supervisor(state: ResearchState) -> dict:
"""
Central coordinator for the research workflow.
Decision Logic:
1. Check iteration limit (safety)
2. Check for errors (recovery)
3. Route based on current progress
4. Assess quality before human review
"""
# Safety: Check iteration limit
if state["iteration"] >= state["max_iterations"]:
message: Message = {
"from_agent": "supervisor",
"to_agent": "system",
"message_type": "iteration_limit",
"content": f"Max iterations ({state['max_iterations']}) reached",
"timestamp": datetime.now().isoformat()
}
return {
"next_worker": "done",
"messages": [message],
"completed_at": datetime.now().isoformat()
}
# Check for recent errors
if state.get("errors") and len(state["errors"]) > 0:
recent_error = state["errors"][-1]
if recent_error.get("handled") != True:
return {"next_worker": "error_handler"}
# Decision logic based on current state
# No documents yet -> need research
if not state.get("documents") or len(state["documents"]) == 0:
return {"next_worker": "researcher"}
# Have documents but no analysis -> need analysis
if not state.get("analysis"):
return {"next_worker": "analyzer"}
# Quality check on analysis
if state.get("analysis"):
analysis_length = len(state["analysis"])
if analysis_length < 200: # Too brief
message: Message = {
"from_agent": "supervisor",
"to_agent": "analyzer",
"message_type": "revision_request",
"content": "Analysis needs more depth and detail",
"timestamp": datetime.now().isoformat()
}
return {
"next_worker": "analyzer",
"feedback": "Please provide more detailed analysis with specific evidence",
"messages": [message],
"analysis": None # Clear for re-analysis
}
# Have analysis but no report -> need writing
if not state.get("report"):
return {"next_worker": "writer"}
# Quality check on report
if state.get("report"):
report_length = len(state["report"])
if report_length < 500: # Too brief
message: Message = {
"from_agent": "supervisor",
"to_agent": "writer",
"message_type": "revision_request",
"content": "Report needs more content",
"timestamp": datetime.now().isoformat()
}
return {
"next_worker": "writer",
"feedback": "Please expand the report with more detail",
"messages": [message],
"report": None # Clear for rewriting
}
# Have report, check if human approval needed
if state.get("require_human_approval", True) and not state.get("approved"):
return {"next_worker": "human_review"}
# Already approved or no approval needed -> finalize
if state.get("approved") or not state.get("require_human_approval", True):
return {"next_worker": "finalize"}
# Default: done
return {
"next_worker": "done",
"completed_at": datetime.now().isoformat()
}
Human Review Node
"""
Human Review Node - Implements human-in-the-loop pattern.
Uses LangGraph's interrupt() for native pause/resume.
The workflow pauses here and waits for human input.
"""
@traceable(name="human_review", run_type="chain", tags=["human-in-loop"])
def human_review(state: ResearchState) -> Command:
"""
Pause for human approval of the research report.
The interrupt() function:
1. Saves current state to checkpoint
2. Returns interrupt data to the caller
3. Pauses execution until resume
Resume options:
- approve: Accept the report
- revise: Request changes with feedback
- reject: Reject and end workflow
"""
# Prepare data for human review
review_data = {
"type": "report_approval",
"query": state["query"],
"report_preview": state["report"][:1000] if state["report"] else "",
"report_length": len(state["report"]) if state["report"] else 0,
"documents_count": len(state.get("documents", [])),
"iterations": state["iteration"],
"options": ["approve", "revise", "reject"],
"instructions": "Please review the report and choose an action"
}
# This pauses execution and returns review_data to caller
decision = interrupt(review_data)
# Execution resumes here when human provides input
action = decision.get("action", "reject")
feedback = decision.get("feedback", "")
notes = decision.get("notes", "")
if action == "approve":
message: Message = {
"from_agent": "human",
"to_agent": "supervisor",
"message_type": "approval",
"content": f"Report approved. Notes: {notes}",
"timestamp": datetime.now().isoformat()
}
return Command(
goto="finalize",
update={
"approved": True,
"approval_notes": notes,
"messages": [message],
"current_phase": "review"
}
)
elif action == "revise":
message: Message = {
"from_agent": "human",
"to_agent": "supervisor",
"message_type": "revision_request",
"content": f"Revision requested: {feedback}",
"timestamp": datetime.now().isoformat()
}
return Command(
goto="supervisor",
update={
"feedback": feedback,
"report": None, # Clear report for rewriting
"analysis": None, # May need re-analysis based on feedback
"messages": [message],
"current_phase": "review"
}
)
else: # reject
message: Message = {
"from_agent": "human",
"to_agent": "system",
"message_type": "rejection",
"content": f"Report rejected. Reason: {feedback}",
"timestamp": datetime.now().isoformat()
}
return Command(
goto=END,
update={
"approved": False,
"approval_notes": f"Rejected: {feedback}",
"messages": [message],
"completed_at": datetime.now().isoformat(),
"current_phase": "complete"
}
)
Finalize and Error Handler Nodes
"""
Finalize Node - Adds final touches to approved reports.
Error Handler Node - Implements error recovery logic.
"""
@traceable(name="finalize", run_type="chain", tags=["finalize"])
def finalize(state: ResearchState) -> dict:
"""
Add final touches to the approved report.
Finalization includes:
- Adding metadata
- Generating summary statistics
- Adding timestamps
- Formatting for output
"""
# Build final report with metadata
metadata = f"""
---
## Report Metadata
- **Generated**: {datetime.now().isoformat()}
- **Query**: {state['query']}
- **Documents Analyzed**: {len(state.get('documents', []))}
- **Iterations**: {state['iteration']}
- **Approved**: {state.get('approved', False)}
- **Approval Notes**: {state.get('approval_notes', 'N/A')}
*Generated by Production Research System*
*Powered by LangGraph*
---
"""
final_report = f"{state['report']}\n\n{metadata}"
message: Message = {
"from_agent": "finalize",
"to_agent": "system",
"message_type": "complete",
"content": "Research workflow completed successfully",
"timestamp": datetime.now().isoformat()
}
return {
"report": final_report,
"messages": [message],
"completed_at": datetime.now().isoformat(),
"current_phase": "complete"
}
@traceable(name="error_handler", run_type="chain", tags=["error-handling"])
def error_handler(state: ResearchState) -> dict:
"""
Handle errors with retry logic.
Error handling strategies:
1. Retry the failed operation
2. Skip to next step if possible
3. Request human intervention for critical errors
4. Log for later analysis
"""
if not state.get("errors"):
# No errors to handle, return to supervisor
return {"next_worker": "supervisor"}
recent_error = state["errors"][-1]
error_count = len(state.get("errors", []))
# Mark error as handled (mutated in place: with the operator.add reducer,
# returning the error again in "errors" would append a duplicate entry)
recent_error["handled"] = True
recent_error["handled_at"] = datetime.now().isoformat()
message: Message = {
"from_agent": "error_handler",
"to_agent": "supervisor",
"message_type": "error_handled",
"content": f"Handled error from {recent_error.get('node', 'unknown')}: {recent_error.get('error', 'unknown')}",
"timestamp": datetime.now().isoformat()
}
# If too many errors, escalate to human
if error_count >= 3:
escalation_message: Message = {
"from_agent": "error_handler",
"to_agent": "human",
"message_type": "escalation",
"content": f"Multiple errors ({error_count}) occurred. Human intervention recommended.",
"timestamp": datetime.now().isoformat()
}
return {
"messages": [message, escalation_message],
"next_worker": "human_review"
}
# Otherwise, retry from supervisor
return {
"messages": [message],
"next_worker": "supervisor"
}
Graph Construction
"""
Graph Construction - Building the workflow.
This demonstrates:
1. Adding nodes
2. Conditional routing from supervisor
3. Edge definitions
4. Entry point configuration
"""
def route_to_worker(state: ResearchState) -> str:
"""Route from supervisor to appropriate worker."""
return state.get("next_worker", "done")
def create_research_system() -> StateGraph:
"""
Build the complete research system graph.
Graph Structure:
- Entry: supervisor
- Supervisor routes to workers based on state
- Workers return to supervisor
- Human review can redirect or end
- Finalize leads to END
"""
graph = StateGraph(ResearchState)
# Add all nodes
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("analyzer", analyzer)
graph.add_node("writer", writer)
graph.add_node("human_review", human_review)
graph.add_node("finalize", finalize)
graph.add_node("error_handler", error_handler)
# Supervisor routes to workers using conditional edges
graph.add_conditional_edges(
"supervisor",
route_to_worker,
{
"researcher": "researcher",
"analyzer": "analyzer",
"writer": "writer",
"quality_check": "supervisor", # Loop back for quality checks
"human_review": "human_review",
"finalize": "finalize",
"error_handler": "error_handler",
"done": END
}
)
# Workers return to supervisor for next decision
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyzer", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("error_handler", "supervisor")
# Finalize ends the workflow
graph.add_edge("finalize", END)
# Note: human_review uses Command to control flow,
# so no explicit edge needed
# Set entry point
graph.set_entry_point("supervisor")
return graph
# Cache the development app at module level: MemorySaver lives in process
# memory, so running and resuming must use the same compiled app instance or
# the checkpoint created at interrupt() will not be found on resume.
_development_app = None
def get_development_app():
"""
Create (or reuse) the app for development/testing with MemorySaver.
Use this for:
- Local development
- Unit testing
- Integration testing
- Quick prototyping
"""
global _development_app
if _development_app is None:
graph = create_research_system()
_development_app = graph.compile(checkpointer=MemorySaver())
return _development_app
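When debugging the routing, it can also help to render the compiled graph. The helper below is a hedged sketch; get_graph() and draw_mermaid() are available on compiled graphs in recent LangGraph releases, but verify against the version you use.
def print_workflow_diagram():
    """Hedged sketch: render the compiled workflow as a Mermaid diagram."""
    app = get_development_app()
    print(app.get_graph().draw_mermaid())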
def get_production_app():
"""
Create production-ready app with PostgresSaver.
Use this for:
- Production deployments
- Persistent state across restarts
- Long-running workflows
- Multi-process deployments
Requires DATABASE_URL environment variable.
"""
graph = create_research_system()
# Use PostgresSaver for production persistence
checkpointer = PostgresSaver.from_conn_string(
os.environ["DATABASE_URL"]
)
return graph.compile(checkpointer=checkpointer)
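Note that depending on the installed version of langgraph-checkpoint-postgres, from_conn_string() may behave as a context manager, and the checkpoint tables may need to be created once with setup() before first use. The helper below is a hedged sketch of that one-time step; verify the exact API against the version you deploy.
def setup_checkpoint_tables():
    """Hedged sketch: one-time creation of the PostgresSaver checkpoint tables."""
    with PostgresSaver.from_conn_string(os.environ["DATABASE_URL"]) as checkpointer:
        checkpointer.setup()  # creates the tables PostgresSaver writes to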
Usage Examples
"""
Usage Examples - Running the research system.
"""
def run_research_workflow(query: str, thread_id: str = None):
"""
Run a complete research workflow.
Args:
query: Research topic/question
thread_id: Unique identifier for this workflow
"""
import uuid
# Use development app for this example
app = get_development_app()
# Generate thread ID if not provided
thread_id = thread_id or f"research-{uuid.uuid4().hex[:8]}"
config = {"configurable": {"thread_id": thread_id}}
# Initial state
initial_state = {
"query": query,
"max_iterations": 10,
"quality_threshold": 70,
"require_human_approval": True,
"documents": [],
"messages": [],
"errors": [],
"analysis": None,
"report": None,
"quality_score": None,
"next_worker": "researcher",
"current_phase": "research",
"iteration": 0,
"started_at": datetime.now().isoformat(),
"completed_at": None,
"approved": False,
"approval_notes": None,
"feedback": None
}
print(f"Starting research workflow: {thread_id}")
print(f"Query: {query}")
print("-" * 50)
# Run until human review or completion
result = app.invoke(initial_state, config)
# Check if paused for human review
if not result.get("approved") and result.get("report"):
print("\n=== PAUSED FOR HUMAN REVIEW ===")
print(f"Report Preview:\n{result['report'][:500]}...")
print(f"\nThread ID for resume: {thread_id}")
return result, thread_id
print("\n=== WORKFLOW COMPLETE ===")
return result, thread_id
def resume_with_approval(thread_id: str, action: str = "approve", feedback: str = ""):
"""
Resume a paused workflow with human decision.
Args:
thread_id: The workflow thread ID
action: "approve", "revise", or "reject"
feedback: Optional feedback for revisions
"""
app = get_development_app()
config = {"configurable": {"thread_id": thread_id}}
# Resume with human decision
result = app.invoke(
Command(resume={
"action": action,
"feedback": feedback,
"notes": f"Decision made at {datetime.now().isoformat()}"
}),
config
)
if action == "approve":
print("=== REPORT APPROVED ===")
print(result.get("report", "No report"))
elif action == "revise":
print("=== REVISION REQUESTED ===")
print("Workflow will continue with feedback")
else:
print("=== REPORT REJECTED ===")
return result
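Between starting a workflow and resuming it, you may want to confirm that a thread really is paused. The helper below is a hedged sketch: StateSnapshot.next lists the nodes that have not yet executed, and a paused thread typically shows the interrupted node there (behavior may vary slightly across LangGraph versions).
def is_awaiting_review(thread_id: str) -> bool:
    """Hedged sketch: True if the thread is currently paused at human_review."""
    app = get_development_app()
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = app.get_state(config)
    return "human_review" in (snapshot.next or ())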
# Example execution
if __name__ == "__main__":
# Run initial workflow
result, thread_id = run_research_workflow(
"Impact of Large Language Models on Software Development in 2026"
)
# If paused, simulate human approval
if not result.get("approved") and result.get("report"):
print("\n[Simulating human approval...]")
final_result = resume_with_approval(thread_id, "approve")
print("\nFinal Report:")
print(final_result.get("report", "No report available"))
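If you prefer to watch progress node by node instead of waiting for invoke() to return, compiled graphs also support streaming. The function below is a hedged sketch using the "updates" stream mode, which yields one dictionary per executed node in recent LangGraph releases; it expects the same initial state and config shape as run_research_workflow().
def stream_research_workflow(initial_state: dict, thread_id: str) -> None:
    """Hedged sketch: print each node's state update as the graph executes."""
    app = get_development_app()
    config = {"configurable": {"thread_id": thread_id}}
    for update in app.stream(initial_state, config, stream_mode="updates"):
        for node_name, node_output in update.items():
            updated_keys = sorted(node_output) if isinstance(node_output, dict) else []
            print(f"[{node_name}] updated: {updated_keys}")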
Testing the System
Unit Tests
"""
Unit Tests for Production Research System
Test individual components in isolation.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
class TestSupervisorLogic:
"""Test supervisor decision making."""
def test_supervisor_routes_to_researcher_when_no_documents(self):
"""Supervisor should route to researcher when documents are empty."""
state = {
"query": "Test query",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 10,
"errors": [],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "researcher"
def test_supervisor_routes_to_analyzer_when_has_documents(self):
"""Supervisor should route to analyzer when documents exist but no analysis."""
state = {
"query": "Test query",
"documents": [{"content": "test doc", "source": "test"}],
"analysis": None,
"report": None,
"iteration": 1,
"max_iterations": 10,
"errors": [],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "analyzer"
def test_supervisor_routes_to_writer_when_has_analysis(self):
"""Supervisor should route to writer when analysis exists but no report."""
state = {
"query": "Test query",
"documents": [{"content": "test doc", "source": "test"}],
"analysis": "A" * 250, # Detailed analysis
"report": None,
"iteration": 2,
"max_iterations": 10,
"errors": [],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "writer"
def test_supervisor_routes_to_human_review_when_has_report(self):
"""Supervisor should route to human review when report exists."""
state = {
"query": "Test query",
"documents": [{"content": "test doc", "source": "test"}],
"analysis": "A" * 250,
"report": "R" * 600, # Detailed report
"iteration": 3,
"max_iterations": 10,
"errors": [],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "human_review"
def test_supervisor_respects_max_iterations(self):
"""Supervisor should end workflow when max iterations reached."""
state = {
"query": "Test query",
"documents": [],
"analysis": None,
"report": None,
"iteration": 10,
"max_iterations": 10,
"errors": [],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "done"
def test_supervisor_routes_to_error_handler_on_unhandled_error(self):
"""Supervisor should route to error handler when errors exist."""
state = {
"query": "Test query",
"documents": [],
"analysis": None,
"report": None,
"iteration": 1,
"max_iterations": 10,
"errors": [{"node": "researcher", "error": "API Error"}],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "error_handler"
def test_supervisor_requests_more_detailed_analysis(self):
"""Supervisor should request revision when analysis is too brief."""
state = {
"query": "Test query",
"documents": [{"content": "test doc", "source": "test"}],
"analysis": "Brief", # Too short
"report": None,
"iteration": 2,
"max_iterations": 10,
"errors": [],
"approved": False,
"require_human_approval": True
}
result = supervisor(state)
assert result["next_worker"] == "analyzer"
assert "feedback" in result
class TestResearcher:
"""Test researcher node."""
# NOTE: patch() must target the name where it is looked up. Because the workers
# do `from langchain_openai import ChatOpenAI`, patch the module that defines
# researcher (e.g. "your_module.ChatOpenAI", where your_module is a placeholder
# for your file name); patching "langchain_openai.ChatOpenAI" after that import
# will not replace the name the worker actually calls.
@patch("langchain_openai.ChatOpenAI")
def test_researcher_creates_documents(self, mock_llm_class):
"""Researcher should create document from LLM response."""
# Mock LLM
mock_llm = Mock()
mock_response = Mock()
mock_response.content = '{"facts": [{"fact": "Test fact"}], "summary": "Test"}'
mock_llm.invoke.return_value = mock_response
mock_llm_class.return_value = mock_llm
state = {
"query": "Test query",
"documents": [],
"iteration": 0,
"feedback": None
}
result = researcher(state)
assert len(result["documents"]) == 1
assert result["documents"][0]["source"] == "llm_research"
assert result["iteration"] == 1
assert len(result["messages"]) == 1
@patch("langchain_openai.ChatOpenAI")
def test_researcher_handles_errors(self, mock_llm_class):
"""Researcher should handle LLM errors gracefully."""
mock_llm = Mock()
mock_llm.invoke.side_effect = Exception("API Error")
mock_llm_class.return_value = mock_llm
state = {
"query": "Test query",
"documents": [],
"iteration": 0,
"feedback": None
}
result = researcher(state)
assert len(result["errors"]) == 1
assert result["next_worker"] == "error_handler"
class TestErrorHandler:
"""Test error handler node."""
def test_error_handler_marks_error_as_handled(self):
"""Error handler should mark errors as handled."""
state = {
"errors": [{"node": "researcher", "error": "Test error"}]
}
result = error_handler(state)
assert result["next_worker"] == "supervisor"
assert len(result["messages"]) == 1
def test_error_handler_escalates_after_multiple_errors(self):
"""Error handler should escalate after 3+ errors."""
state = {
"errors": [
{"node": "researcher", "error": "Error 1"},
{"node": "analyzer", "error": "Error 2"},
{"node": "writer", "error": "Error 3"}
]
}
result = error_handler(state)
assert result["next_worker"] == "human_review"
assert len(result["messages"]) == 2 # Handled + escalation
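As one more example (not part of the original suite), the finalize node is also straightforward to test in isolation because it is a pure function of state:
class TestFinalize:
    """Additional example test for the finalize node (assumed, not original)."""

    def test_finalize_appends_metadata(self):
        state = {
            "query": "Test query",
            "report": "# Report body",
            "documents": [{"source": "test", "content": "doc"}],
            "iteration": 3,
            "approved": True,
            "approval_notes": "Looks good",
        }
        result = finalize(state)
        assert result["report"].startswith("# Report body")
        assert "Report Metadata" in result["report"]
        assert result["current_phase"] == "complete"
        assert result["completed_at"] is not None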
Integration Tests
"""
Integration Tests for Production Research System
Test the complete workflow with mocked LLM.
"""
import pytest
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from unittest.mock import patch, Mock
class TestResearchWorkflowIntegration:
"""Integration tests for complete workflow."""
@pytest.fixture
def mock_openai(self):
"""Mock ChatOpenAI for all tests."""
with patch("langchain_openai.ChatOpenAI") as mock_class:
mock_instance = Mock()
# Create different responses for different prompts
def invoke_side_effect(prompt):
response = Mock()
text = prompt.lower()
# Match on distinctive phrases from each worker's prompt; a bare
# "analyze" check would also match the writer prompt ("documents analyzed")
if "research assistant" in text:
response.content = '{"facts": [{"fact": "Test finding"}], "summary": "Research summary"}'
elif "write a professional research report" in text:
response.content = "# Research Report\n\n## Executive Summary\n" + "R" * 500
elif "analyze these research findings" in text:
response.content = "## Key Themes\n- Theme 1\n\n## Analysis\n" + "A" * 300
else:
response.content = "Default response"
return response
mock_instance.invoke.side_effect = invoke_side_effect
mock_class.return_value = mock_instance
yield mock_instance
@pytest.fixture
def app(self, mock_openai):
"""Create compiled app with MemorySaver."""
graph = create_research_system()
return graph.compile(checkpointer=MemorySaver())
def test_complete_workflow_with_approval(self, app):
"""Test complete workflow ending with human approval."""
config = {"configurable": {"thread_id": "test-complete-workflow"}}
initial_state = {
"query": "Test research query",
"max_iterations": 10,
"quality_threshold": 70,
"require_human_approval": True,
"documents": [],
"messages": [],
"errors": [],
"analysis": None,
"report": None,
"quality_score": None,
"next_worker": "researcher",
"current_phase": "research",
"iteration": 0,
"started_at": datetime.now().isoformat(),
"completed_at": None,
"approved": False,
"approval_notes": None,
"feedback": None
}
# Run until human review
result = app.invoke(initial_state, config)
# Should have documents, analysis, and report
assert len(result["documents"]) > 0
assert result["analysis"] is not None
assert result["report"] is not None
assert result["approved"] is False # Waiting for approval
# Resume with approval
final_result = app.invoke(
Command(resume={"action": "approve", "notes": "Looks good!"}),
config
)
assert final_result["approved"] is True
assert "Generated by Production Research System" in final_result["report"]
def test_workflow_with_revision_request(self, app):
"""Test workflow when human requests revision."""
config = {"configurable": {"thread_id": "test-revision-workflow"}}
initial_state = {
"query": "Test research query",
"max_iterations": 15, # More iterations for revision
"quality_threshold": 70,
"require_human_approval": True,
"documents": [],
"messages": [],
"errors": [],
"analysis": None,
"report": None,
"quality_score": None,
"next_worker": "researcher",
"current_phase": "research",
"iteration": 0,
"started_at": datetime.now().isoformat(),
"completed_at": None,
"approved": False,
"approval_notes": None,
"feedback": None
}
# Run until human review
result = app.invoke(initial_state, config)
initial_iteration = result["iteration"]
# Request revision
revised_result = app.invoke(
Command(resume={"action": "revise", "feedback": "Add more details"}),
config
)
# Should have more iterations after revision
assert revised_result["iteration"] > initial_iteration
def test_workflow_without_human_approval(self, app):
"""Test workflow that doesn't require human approval."""
config = {"configurable": {"thread_id": "test-no-approval-workflow"}}
initial_state = {
"query": "Test research query",
"max_iterations": 10,
"quality_threshold": 70,
"require_human_approval": False, # No human approval needed
"documents": [],
"messages": [],
"errors": [],
"analysis": None,
"report": None,
"quality_score": None,
"next_worker": "researcher",
"current_phase": "research",
"iteration": 0,
"started_at": datetime.now().isoformat(),
"completed_at": None,
"approved": False,
"approval_notes": None,
"feedback": None
}
# Should complete without pausing
result = app.invoke(initial_state, config)
assert result["completed_at"] is not None
assert "Generated by Production Research System" in result["report"]
def test_state_history_tracking(self, app):
"""Test that state history is properly tracked."""
config = {"configurable": {"thread_id": "test-history-workflow"}}
initial_state = {
"query": "Test query",
"max_iterations": 10,
"quality_threshold": 70,
"require_human_approval": False,
"documents": [],
"messages": [],
"errors": [],
"analysis": None,
"report": None,
"quality_score": None,
"next_worker": "researcher",
"current_phase": "research",
"iteration": 0,
"started_at": datetime.now().isoformat(),
"completed_at": None,
"approved": False,
"approval_notes": None,
"feedback": None
}
app.invoke(initial_state, config)
# Check state history
history = list(app.get_state_history(config))
assert len(history) > 0
# Verify progression through nodes: checkpoint metadata records the writes of
# the node(s) that produced each checkpoint; the "source" field only indicates
# whether a checkpoint came from input or a loop step
nodes_visited = [
node
for snapshot in history
for node in ((snapshot.metadata or {}).get("writes") or {})
]
# Should have visited supervisor multiple times
assert nodes_visited.count("supervisor") >= 1
Deployment Checklist
Pre-Deployment Verification
## Pre-Deployment Checklist
### Code Quality
- [ ] All unit tests passing
- [ ] All integration tests passing
- [ ] Code review completed
- [ ] No hardcoded secrets or API keys
- [ ] Type hints complete and accurate
### Infrastructure
- [ ] PostgreSQL database provisioned
- [ ] DATABASE_URL environment variable set
- [ ] Connection pooling configured
- [ ] Database migrations applied
### Observability
- [ ] LANGCHAIN_TRACING_V2=true set
- [ ] LANGCHAIN_API_KEY configured
- [ ] LANGCHAIN_PROJECT set appropriately
- [ ] LangSmith project created
### Security
- [ ] API keys in secrets manager
- [ ] Rate limiting configured
- [ ] Input validation in place
- [ ] Error messages don't leak sensitive info
### Performance
- [ ] max_iterations set appropriately
- [ ] LLM model selection optimized for use case
- [ ] Timeout handling implemented
- [ ] Retry logic configured
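Several of the checklist items above are plain environment configuration, which you can verify programmatically before starting the service. The snippet below is a hedged sketch of such a pre-flight check; the variable names mirror the checklist and should be adjusted to your deployment.
import os

# Names taken from the checklist above; adjust for your environment.
REQUIRED_ENV_VARS = ["DATABASE_URL", "OPENAI_API_KEY", "LANGCHAIN_API_KEY"]

def verify_deployment_config() -> list[str]:
    """Return the names of missing or misconfigured settings (empty means ready)."""
    problems = [name for name in REQUIRED_ENV_VARS if not os.environ.get(name)]
    if os.environ.get("LANGCHAIN_TRACING_V2") != "true":
        problems.append("LANGCHAIN_TRACING_V2 (expected 'true')")
    return problems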
Production Monitoring Setup
"""
Production Monitoring Configuration
Set up alerts and dashboards for the research system.
"""
from langsmith import Client
from datetime import datetime, timedelta
def setup_monitoring():
"""
Configure LangSmith monitoring for production.
Monitors:
- Error rates
- Latency percentiles
- Token usage
- Human review wait times
"""
client = Client()
# Example: Query recent runs for monitoring
recent_runs = client.list_runs(
project_name="production-research-system",
start_time=datetime.now() - timedelta(hours=24)
)
metrics = {
"total_runs": 0,
"errors": 0,
"avg_latency_ms": 0,
"total_tokens": 0
}
latencies = []
for run in recent_runs:
metrics["total_runs"] += 1
if run.error:
metrics["errors"] += 1
if run.latency_ms:
latencies.append(run.latency_ms)
if run.total_tokens:
metrics["total_tokens"] += run.total_tokens
if latencies:
metrics["avg_latency_ms"] = sum(latencies) / len(latencies)
metrics["p95_latency_ms"] = sorted(latencies)[int(len(latencies) * 0.95)]
metrics["error_rate"] = (
metrics["errors"] / metrics["total_runs"]
if metrics["total_runs"] > 0
else 0
)
return metrics
def create_alert_rules():
"""
Define alerting rules for the research system.
Returns alert configurations for:
- High error rate
- High latency
- Stuck workflows
"""
return {
"high_error_rate": {
"condition": "error_rate > 0.05", # 5% error rate
"severity": "critical",
"action": "page_oncall"
},
"high_latency": {
"condition": "p95_latency_ms > 60000", # 60 seconds
"severity": "warning",
"action": "notify_slack"
},
"stuck_workflow": {
"condition": "workflow_age > 3600", # 1 hour without progress
"severity": "warning",
"action": "notify_slack"
},
"excessive_iterations": {
"condition": "iteration > max_iterations * 0.8",
"severity": "warning",
"action": "log_warning"
}
}
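To connect the two helpers above, a thin evaluation step can compare the collected metrics against the alert rules. evaluate_alerts() below is a hypothetical helper, not a LangSmith feature; its thresholds simply mirror the conditions returned by create_alert_rules().
def evaluate_alerts(metrics: dict) -> list[str]:
    """Hypothetical helper: names of alert rules that would fire for these metrics."""
    fired = []
    if metrics.get("error_rate", 0) > 0.05:       # mirrors high_error_rate
        fired.append("high_error_rate")
    if metrics.get("p95_latency_ms", 0) > 60000:  # mirrors high_latency
        fired.append("high_latency")
    return fired

# Example usage
current_metrics = setup_monitoring()
for alert_name in evaluate_alerts(current_metrics):
    print(f"ALERT fired: {alert_name}")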
Interview Questions
Q1: Walk through the architecture of this research system.
Answer:
"This is a multi-agent supervisor pattern with five key components:
1. State Management
class ResearchState(TypedDict):
# Accumulating data with reducers
documents: Annotated[list[dict], operator.add]
messages: Annotated[list[dict], operator.add]
# Worker outputs
analysis: Optional[str]
report: Optional[str]
# Control flow
next_worker: Literal['researcher', 'analyzer', ...]
The state uses reducers for lists that accumulate across iterations.
2. Supervisor Pattern
The supervisor is the central coordinator:
- Evaluates current state
- Decides next worker
- Performs quality checks
- Handles iteration limits
- Routes errors appropriately
3. Worker Nodes
Each worker is specialized:
- Researcher: Gathers information using LLM (production would use search APIs)
- Analyzer: Extracts insights from documents
- Writer: Creates professional reports
Workers follow a consistent pattern:
- Receive full state
- Return partial updates
- Add messages for tracking
- Handle errors gracefully
4. Human-in-the-Loop
Uses LangGraph's native interrupt():
decision = interrupt({
'type': 'report_approval',
'report_preview': state['report'][:1000]
})
# Execution pauses here until resume
Human can approve, request revision, or reject.
5. Error Recovery
The error handler:
- Logs errors for debugging
- Marks errors as handled
- Retries from supervisor
- Escalates after multiple failures
The flow is: Supervisor -> Worker -> Supervisor -> ... -> Human Review -> Finalize -> END"
Q2: How does checkpointing enable pause/resume in this system?
Answer:
"Checkpointing is fundamental to the human-in-the-loop pattern:
How It Works:
- Every node execution saves a checkpoint:
# PostgresSaver persists state to database
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
app = graph.compile(checkpointer=checkpointer)
- interrupt() triggers checkpoint save:
def human_review(state):
decision = interrupt({...}) # State saved here
# Execution pauses, waiting for resume
- Resume uses thread_id to find checkpoint:
config = {'configurable': {'thread_id': 'research-001'}}
app.invoke(Command(resume={'action': 'approve'}), config)
Why This Matters:
- Persistence: Workflow survives server restarts
- Async Human Input: Users can take hours to review
- State Recovery: Can resume from any checkpoint
- Time Travel: Can replay from earlier checkpoints
Production Pattern:
# Start workflow
result = app.invoke(initial_state, config)
# Returns immediately when hitting interrupt()
# Hours later, human approves
result = app.invoke(Command(resume={'action': 'approve'}), config)
# Continues from where it paused
The key insight is that checkpointing separates execution from storage - the graph logic doesn't change, only where state is persisted."
Q3: How would you scale this system for production?
Answer:
"Production scaling involves several layers:
1. Database Layer
# Connection pooling for PostgresSaver
from psycopg_pool import ConnectionPool
pool = ConnectionPool(
conninfo=DATABASE_URL,
min_size=5,
max_size=20
)
checkpointer = PostgresSaver(pool)
2. Horizontal Scaling
Since state is in PostgreSQL, multiple workers can process different workflows:
# Kubernetes deployment
replicas: 5
resources:
limits:
cpu: '2'
memory: '4Gi'
Each pod handles different thread_ids independently.
3. Rate Limiting LLM Calls
from tenacity import retry, wait_exponential
@retry(wait=wait_exponential(min=1, max=60))
def call_llm(prompt):
return llm.invoke(prompt)
4. Queue-Based Processing
For high throughput, use a queue:
# Celery task
@celery_app.task
def process_research(query, thread_id):
app.invoke(initial_state, config)
5. Caching
Cache repeated research queries:
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_research(query: str) -> dict:
# Re-run the researcher only for queries that have not been seen before
return researcher({"query": query, "iteration": 0, "feedback": None})
6. Monitoring
# Prometheus metrics (prometheus_client requires a metric name and a help string)
research_duration = Histogram('research_duration_seconds', 'Research workflow duration')
research_errors = Counter('research_errors_total', 'Total research workflow errors')
The key principle is stateless workers + stateful storage - workers don't hold state, PostgreSQL does."
Q4: How do you test workflows with human-in-the-loop?
Answer:
"Testing HITL workflows requires simulating human decisions:
1. Unit Test the Human Review Node
def test_human_review_approve():
state = {'report': 'Test report...', 'query': 'Test'}
# The interrupt() will be called
with pytest.raises(GraphInterrupt) as exc:
human_review(state)
# Verify interrupt data
interrupt_data = exc.value.data
assert interrupt_data['type'] == 'report_approval'
2. Integration Test with Resume
def test_complete_approval_flow():
app = get_development_app()
config = {'configurable': {'thread_id': 'test-approval'}}
# Run until pause
result = app.invoke(initial_state, config)
assert result['approved'] is False
# Resume with approval
final = app.invoke(
Command(resume={'action': 'approve'}),
config
)
assert final['approved'] is True
3. Test All Decision Paths
@pytest.mark.parametrize('action,expected_approved', [
('approve', True),
('reject', False),
])
def test_human_decisions(action, expected_approved):
# Run workflow
result = app.invoke(initial_state, config)
# Resume with decision
final = app.invoke(
Command(resume={'action': action}),
config
)
assert final['approved'] == expected_approved
4. Test Revision Loop
def test_revision_loop():
# Run to human review
result = app.invoke(initial_state, config)
iteration_before = result['iteration']
# Request revision
revised = app.invoke(
Command(resume={'action': 'revise', 'feedback': 'More detail'}),
config
)
# Should have done more work
assert revised['iteration'] > iteration_before
The key is using Command(resume=...) to simulate human input in tests."
Key Takeaways
| Concept | Implementation | Benefit |
|---|---|---|
| State Reducers | Annotated[list, operator.add] | Accumulate data across iterations |
| Supervisor Pattern | Central routing node | Coordinate multiple workers |
| Checkpointing | PostgresSaver | Persist state for long workflows |
| Human-in-the-Loop | interrupt() + Command(resume=...) | Native pause/resume |
| Error Recovery | Error handler node | Graceful degradation |
| Observability | @traceable decorator | LangSmith tracing |
| Testing | MemorySaver + mocked LLM | Fast, reliable tests |
Course Completion
Congratulations on completing the LangGraph Deep Dive course! You now have the knowledge to build production-ready multi-agent systems with:
- State Management: TypedDict schemas with reducers
- Graph Patterns: Supervisor, parallel execution, subgraphs
- Persistence: Checkpointing with PostgresSaver
- Human Integration: Native pause/resume with interrupts
- Deployment: Docker, Kubernetes, LangGraph Platform
- Testing: Comprehensive test strategies
- Debugging: Visualization and LangSmith tracing
Next Steps:
- Build your own multi-agent system
- Explore LangGraph Platform for managed deployment
- Contribute to the LangGraph community
- Stay updated with LangGraph releases
Complete the module quiz to earn your course completion certificate!