Testing & Capstone Project

Capstone: Production Research System

8 min read

This capstone project brings together all concepts from the course to build a production-ready multi-agent research system. You will implement state management with reducers, supervisor pattern coordination, checkpointing for persistence, human-in-the-loop approval, error recovery, and LangSmith observability.


Project Overview

The Production Research System is a multi-agent workflow that:

  1. Researches topics using multiple sources
  2. Analyzes gathered information for key insights
  3. Writes professional reports
  4. Reviews quality through a supervisor
  5. Approves via human-in-the-loop
  6. Persists state for long-running workflows
  7. Traces execution for debugging and monitoring

This system demonstrates real-world patterns used in production AI applications.


System Architecture

                      +------------------+
                      |    Supervisor    |
                      |   (Coordinator)  |
                      +--------+---------+
                               |
        +----------------------+----------------------+
        |                      |                      |
        v                      v                      v
+-------+--------+   +---------+----------+   +-------+--------+
|   Researcher   |   |      Analyzer      |   |     Writer     |
| (Gather Data)  |   | (Extract Insights) |   | (Draft Report) |
+----------------+   +--------------------+   +----------------+
        |                      |                      |
        +----------------------+----------------------+
                               |
                               v
                      +--------+---------+
                      |   Human Review   |
                      | (Approve/Revise) |
                      +--------+---------+
                               |
                               v
                      +--------+---------+
                      |     Finalize     |
                      |  (Add Metadata)  |
                      +------------------+

Complete Implementation

State Definition

"""
Production Research System - State Definition

This state schema demonstrates:
1. Input fields for configuration
2. Accumulated outputs using reducers
3. Worker output fields
4. Control fields for routing
5. Metadata for tracking

Key Design Decisions:
- Use Annotated with operator.add for list accumulation
- Use Optional for fields that may not exist initially
- Use Literal for type-safe routing
- Include iteration tracking for loop prevention
"""

from typing import TypedDict, Annotated, Literal, Optional
from langgraph.graph import StateGraph, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langsmith import traceable
import operator
import os
from datetime import datetime
import json


# Enable LangSmith tracing for production observability
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-research-system"


class ResearchDocument(TypedDict):
    """Structure for research documents."""
    source: str
    content: str
    query: str
    timestamp: str
    relevance_score: Optional[float]


class Message(TypedDict):
    """Structure for inter-agent messages."""
    from_agent: str
    to_agent: str
    message_type: str
    content: str
    timestamp: str


class ResearchState(TypedDict):
    """
    Complete state for the production research system.

    Categories:
    1. Input Configuration
    2. Accumulated Data (with reducers)
    3. Worker Outputs
    4. Control Flow
    5. Tracking/Metadata
    """

    # === Input Configuration ===
    query: str                          # Research query from user
    max_iterations: int                  # Safety limit for iterations
    quality_threshold: int               # Minimum quality score (0-100)
    require_human_approval: bool         # Whether human review is required

    # === Accumulated Data (Reducers) ===
    documents: Annotated[list[ResearchDocument], operator.add]
    messages: Annotated[list[Message], operator.add]
    errors: Annotated[list[dict], operator.add]

    # === Worker Outputs ===
    analysis: Optional[str]              # Analysis from analyzer
    report: Optional[str]                # Report from writer
    quality_score: Optional[int]         # Quality assessment (0-100)

    # === Control Flow ===
    next_worker: Literal[
        "researcher",
        "analyzer",
        "writer",
        "quality_check",
        "human_review",
        "finalize",
        "error_handler",
        "done"
    ]
    current_phase: Literal["research", "analysis", "writing", "review", "complete"]

    # === Tracking/Metadata ===
    iteration: int
    started_at: str
    completed_at: Optional[str]
    approved: bool
    approval_notes: Optional[str]
    feedback: Optional[str]
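
To make the reducer behavior concrete, here is a minimal standalone sketch (the Demo schema is hypothetical) of what LangGraph computes when it merges a node's partial update into an Annotated list field:

# Minimal illustration of the operator.add reducer: nodes return only the
# new items, and LangGraph appends them to the existing value instead of
# overwriting it.
import operator
from typing import Annotated, TypedDict

class Demo(TypedDict):
    items: Annotated[list[str], operator.add]

existing = ["doc-1"]                     # current state value
update = ["doc-2"]                       # partial update returned by a node
merged = operator.add(existing, update)  # what the reducer computes
assert merged == ["doc-1", "doc-2"]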

Worker Nodes

"""
Worker Nodes - The agents that perform actual work.

Each worker:
1. Is decorated with @traceable for LangSmith visibility
2. Receives complete state, returns partial updates
3. Adds messages for inter-agent communication
4. Handles errors gracefully
"""


@traceable(name="researcher", run_type="chain", tags=["worker", "llm-call"])
def researcher(state: ResearchState) -> dict:
    """
    Gather research documents based on query.

    In production, this would:
    - Call search APIs (Google, Bing, academic databases)
    - Scrape relevant websites
    - Query internal knowledge bases
    - Retrieve from vector stores
    """
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Use feedback if revision was requested
    query = state["query"]
    if state.get("feedback"):
        query = f"{query}\n\nAdditional focus based on feedback: {state['feedback']}"

    # Simulate research with LLM (in production, use real search APIs)
    prompt = f"""You are a research assistant. Find key facts and information about:

Query: {query}

Provide your findings as a JSON object with this structure:
{{
    "facts": [
        {{"fact": "...", "source": "...", "confidence": "high/medium/low"}},
        ...
    ],
    "summary": "Brief summary of findings"
}}

Include 3-5 key facts with sources."""

    try:
        response = llm.invoke(prompt)

        # Create research document
        doc: ResearchDocument = {
            "source": "llm_research",
            "content": response.content,
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "relevance_score": 0.85  # In production, calculate actual relevance
        }

        # Create status message
        message: Message = {
            "from_agent": "researcher",
            "to_agent": "supervisor",
            "message_type": "status",
            "content": f"Found research findings for: {query[:50]}...",
            "timestamp": datetime.now().isoformat()
        }

        return {
            "documents": [doc],
            "messages": [message],
            "iteration": state["iteration"] + 1,
            "current_phase": "research"
        }

    except Exception as e:
        # Log error for recovery
        error = {
            "node": "researcher",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        return {
            "errors": [error],
            "next_worker": "error_handler"
        }
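
The docstring above lists the real data sources a production researcher would use. As a hedged sketch, the simulated LLM call could be swapped for a live search tool; this assumes the langchain_community Tavily integration is installed and TAVILY_API_KEY is set:

# Hypothetical replacement for the simulated research step. Assumes
# `pip install langchain-community tavily-python` and TAVILY_API_KEY.
from langchain_community.tools.tavily_search import TavilySearchResults

def search_web(query: str) -> list[ResearchDocument]:
    tool = TavilySearchResults(max_results=3)
    results = tool.invoke(query)  # list of dicts with "url" and "content"
    return [
        {
            "source": item.get("url", "web"),
            "content": item.get("content", ""),
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "relevance_score": item.get("score"),  # None if not provided
        }
        for item in results
    ]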


@traceable(name="analyzer", run_type="chain", tags=["worker", "llm-call"])
def analyzer(state: ResearchState) -> dict:
    """
    Analyze gathered documents for insights.

    Analysis includes:
    - Key theme extraction
    - Evidence evaluation
    - Confidence assessment
    - Gap identification
    """
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Combine all document contents
    doc_contents = "\n\n---\n\n".join([
        f"Source: {doc['source']}\nContent: {doc['content']}"
        for doc in state["documents"]
    ])

    prompt = f"""Analyze these research findings and provide a structured analysis:

Research Documents:
{doc_contents}

Original Query: {state['query']}

Provide analysis in this format:

## Key Themes
- Theme 1: Description
- Theme 2: Description
...

## Supporting Evidence
For each theme, list the evidence from the documents.

## Confidence Assessment
- Overall confidence: HIGH/MEDIUM/LOW
- Reasoning: Why this confidence level

## Information Gaps
What additional research might be needed?

## Recommendations
Based on the analysis, what are the key takeaways?"""

    try:
        response = llm.invoke(prompt)

        # Check analysis quality
        analysis_length = len(response.content)
        quality_indicator = "detailed" if analysis_length > 500 else "brief"

        message: Message = {
            "from_agent": "analyzer",
            "to_agent": "supervisor",
            "message_type": "analysis_complete",
            "content": f"Analysis complete ({quality_indicator}, {analysis_length} chars)",
            "timestamp": datetime.now().isoformat()
        }

        return {
            "analysis": response.content,
            "messages": [message],
            "iteration": state["iteration"] + 1,
            "current_phase": "analysis"
        }

    except Exception as e:
        error = {
            "node": "analyzer",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        return {
            "errors": [error],
            "next_worker": "error_handler"
        }


@traceable(name="writer", run_type="chain", tags=["worker", "llm-call"])
def writer(state: ResearchState) -> dict:
    """
    Write professional research report.

    Report structure:
    - Executive summary
    - Key findings
    - Methodology
    - Detailed analysis
    - Conclusions
    - References
    """
    llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

    prompt = f"""Write a professional research report based on the following:

Query: {state['query']}

Analysis:
{state['analysis']}

Source Documents: {len(state['documents'])} documents analyzed

Write a complete report with this structure:

# Research Report: [Create appropriate title]

## Executive Summary
A 2-3 paragraph summary of key findings.

## Methodology
How the research was conducted.

## Key Findings
Detailed findings with supporting evidence.

## Analysis
In-depth analysis of the findings.

## Conclusions
Final conclusions and implications.

## Recommendations
Actionable recommendations based on findings.

## References
List sources used in this research.

Make the report professional, well-structured, and comprehensive."""

    try:
        response = llm.invoke(prompt)

        message: Message = {
            "from_agent": "writer",
            "to_agent": "supervisor",
            "message_type": "report_complete",
            "content": f"Report drafted ({len(response.content)} chars)",
            "timestamp": datetime.now().isoformat()
        }

        return {
            "report": response.content,
            "messages": [message],
            "iteration": state["iteration"] + 1,
            "current_phase": "writing"
        }

    except Exception as e:
        error = {
            "node": "writer",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        return {
            "errors": [error],
            "next_worker": "error_handler"
        }

Supervisor Node

"""
Supervisor Node - Coordinates workers and checks quality.

The supervisor:
1. Determines which worker should execute next
2. Checks quality of outputs
3. Decides when to request human review
4. Handles iteration limits
5. Routes to error handler when needed
"""


@traceable(name="supervisor", run_type="chain", tags=["supervisor"])
def supervisor(state: ResearchState) -> dict:
    """
    Central coordinator for the research workflow.

    Decision Logic:
    1. Check iteration limit (safety)
    2. Check for errors (recovery)
    3. Route based on current progress
    4. Assess quality before human review
    """

    # Safety: Check iteration limit
    if state["iteration"] >= state["max_iterations"]:
        message: Message = {
            "from_agent": "supervisor",
            "to_agent": "system",
            "message_type": "iteration_limit",
            "content": f"Max iterations ({state['max_iterations']}) reached",
            "timestamp": datetime.now().isoformat()
        }
        return {
            "next_worker": "done",
            "messages": [message],
            "completed_at": datetime.now().isoformat()
        }

    # Check for unhandled errors
    if state.get("errors"):
        recent_error = state["errors"][-1]
        if not recent_error.get("handled"):
            return {"next_worker": "error_handler"}

    # Decision logic based on current state

    # No documents yet -> need research
    if not state.get("documents") or len(state["documents"]) == 0:
        return {"next_worker": "researcher"}

    # Have documents but no analysis -> need analysis
    if not state.get("analysis"):
        return {"next_worker": "analyzer"}

    # Quality check on analysis
    if state.get("analysis"):
        analysis_length = len(state["analysis"])
        if analysis_length < 200:  # Too brief
            message: Message = {
                "from_agent": "supervisor",
                "to_agent": "analyzer",
                "message_type": "revision_request",
                "content": "Analysis needs more depth and detail",
                "timestamp": datetime.now().isoformat()
            }
            return {
                "next_worker": "analyzer",
                "feedback": "Please provide more detailed analysis with specific evidence",
                "messages": [message],
                "analysis": None  # Clear for re-analysis
            }

    # Have analysis but no report -> need writing
    if not state.get("report"):
        return {"next_worker": "writer"}

    # Quality check on report
    if state.get("report"):
        report_length = len(state["report"])
        if report_length < 500:  # Too brief
            message: Message = {
                "from_agent": "supervisor",
                "to_agent": "writer",
                "message_type": "revision_request",
                "content": "Report needs more content",
                "timestamp": datetime.now().isoformat()
            }
            return {
                "next_worker": "writer",
                "feedback": "Please expand the report with more detail",
                "messages": [message],
                "report": None  # Clear for rewriting
            }

    # Have report, check if human approval needed
    if state.get("require_human_approval", True) and not state.get("approved"):
        return {"next_worker": "human_review"}

    # Already approved or no approval needed -> finalize
    # (finalize adds metadata, sets completed_at, and ends the workflow)
    return {"next_worker": "finalize"}

Human Review Node

"""
Human Review Node - Implements human-in-the-loop pattern.

Uses LangGraph's interrupt() for native pause/resume.
The workflow pauses here and waits for human input.
"""


@traceable(name="human_review", run_type="chain", tags=["human-in-loop"])
def human_review(state: ResearchState) -> Command:
    """
    Pause for human approval of the research report.

    The interrupt() function:
    1. Saves current state to checkpoint
    2. Returns interrupt data to the caller
    3. Pauses execution until resume

    Resume options:
    - approve: Accept the report
    - revise: Request changes with feedback
    - reject: Reject and end workflow
    """

    # Prepare data for human review
    review_data = {
        "type": "report_approval",
        "query": state["query"],
        "report_preview": state["report"][:1000] if state["report"] else "",
        "report_length": len(state["report"]) if state["report"] else 0,
        "documents_count": len(state.get("documents", [])),
        "iterations": state["iteration"],
        "options": ["approve", "revise", "reject"],
        "instructions": "Please review the report and choose an action"
    }

    # This pauses execution and returns review_data to caller
    decision = interrupt(review_data)

    # Execution resumes here when human provides input
    action = decision.get("action", "reject")
    feedback = decision.get("feedback", "")
    notes = decision.get("notes", "")

    if action == "approve":
        message: Message = {
            "from_agent": "human",
            "to_agent": "supervisor",
            "message_type": "approval",
            "content": f"Report approved. Notes: {notes}",
            "timestamp": datetime.now().isoformat()
        }
        return Command(
            goto="finalize",
            update={
                "approved": True,
                "approval_notes": notes,
                "messages": [message],
                "current_phase": "review"
            }
        )

    elif action == "revise":
        message: Message = {
            "from_agent": "human",
            "to_agent": "supervisor",
            "message_type": "revision_request",
            "content": f"Revision requested: {feedback}",
            "timestamp": datetime.now().isoformat()
        }
        return Command(
            goto="supervisor",
            update={
                "feedback": feedback,
                "report": None,      # Clear report for rewriting
                "analysis": None,    # May need re-analysis based on feedback
                "messages": [message],
                "current_phase": "review"
            }
        )

    else:  # reject
        message: Message = {
            "from_agent": "human",
            "to_agent": "system",
            "message_type": "rejection",
            "content": f"Report rejected. Reason: {feedback}",
            "timestamp": datetime.now().isoformat()
        }
        return Command(
            goto=END,
            update={
                "approved": False,
                "approval_notes": f"Rejected: {feedback}",
                "messages": [message],
                "completed_at": datetime.now().isoformat(),
                "current_phase": "complete"
            }
        )

Finalize and Error Handler Nodes

"""
Finalize Node - Adds final touches to approved reports.
Error Handler Node - Implements error recovery logic.
"""


@traceable(name="finalize", run_type="chain", tags=["finalize"])
def finalize(state: ResearchState) -> dict:
    """
    Add final touches to the approved report.

    Finalization includes:
    - Adding metadata
    - Generating summary statistics
    - Adding timestamps
    - Formatting for output
    """

    # Build final report with metadata
    metadata = f"""
---
## Report Metadata

- **Generated**: {datetime.now().isoformat()}
- **Query**: {state['query']}
- **Documents Analyzed**: {len(state.get('documents', []))}
- **Iterations**: {state['iteration']}
- **Approved**: {state.get('approved', False)}
- **Approval Notes**: {state.get('approval_notes', 'N/A')}

*Generated by Production Research System*
*Powered by LangGraph*
---
"""

    final_report = f"{state['report']}\n\n{metadata}"

    message: Message = {
        "from_agent": "finalize",
        "to_agent": "system",
        "message_type": "complete",
        "content": "Research workflow completed successfully",
        "timestamp": datetime.now().isoformat()
    }

    return {
        "report": final_report,
        "messages": [message],
        "completed_at": datetime.now().isoformat(),
        "current_phase": "complete"
    }


@traceable(name="error_handler", run_type="chain", tags=["error-handling"])
def error_handler(state: ResearchState) -> dict:
    """
    Handle errors with retry logic.

    Error handling strategies:
    1. Retry the failed operation
    2. Skip to next step if possible
    3. Request human intervention for critical errors
    4. Log for later analysis
    """

    if not state.get("errors"):
        # No errors to handle, return to supervisor
        return {"next_worker": "supervisor"}

    recent_error = state["errors"][-1]
    error_count = len(state.get("errors", []))

    # Mark error as handled. Because `errors` uses an additive reducer,
    # returning an updated copy would append a duplicate entry, so the
    # existing record is mutated in place instead.
    recent_error["handled"] = True
    recent_error["handled_at"] = datetime.now().isoformat()

    message: Message = {
        "from_agent": "error_handler",
        "to_agent": "supervisor",
        "message_type": "error_handled",
        "content": f"Handled error from {recent_error.get('node', 'unknown')}: {recent_error.get('error', 'unknown')}",
        "timestamp": datetime.now().isoformat()
    }

    # If too many errors, escalate to human
    if error_count >= 3:
        escalation_message: Message = {
            "from_agent": "error_handler",
            "to_agent": "human",
            "message_type": "escalation",
            "content": f"Multiple errors ({error_count}) occurred. Human intervention recommended.",
            "timestamp": datetime.now().isoformat()
        }
        return {
            "messages": [message, escalation_message],
            "next_worker": "human_review"
        }

    # Otherwise, retry from supervisor
    return {
        "messages": [message],
        "next_worker": "supervisor"
    }

Graph Construction

"""
Graph Construction - Building the workflow.

This demonstrates:
1. Adding nodes
2. Conditional routing from supervisor
3. Edge definitions
4. Entry point configuration
"""


def route_to_worker(state: ResearchState) -> str:
    """Route from supervisor to appropriate worker."""
    return state.get("next_worker", "done")


def create_research_system() -> StateGraph:
    """
    Build the complete research system graph.

    Graph Structure:
    - Entry: supervisor
    - Supervisor routes to workers based on state
    - Workers return to supervisor
    - Human review can redirect or end
    - Finalize leads to END
    """

    graph = StateGraph(ResearchState)

    # Add all nodes
    graph.add_node("supervisor", supervisor)
    graph.add_node("researcher", researcher)
    graph.add_node("analyzer", analyzer)
    graph.add_node("writer", writer)
    graph.add_node("human_review", human_review)
    graph.add_node("finalize", finalize)
    graph.add_node("error_handler", error_handler)

    # Supervisor routes to workers using conditional edges
    graph.add_conditional_edges(
        "supervisor",
        route_to_worker,
        {
            "researcher": "researcher",
            "analyzer": "analyzer",
            "writer": "writer",
            "quality_check": "supervisor",  # Loop back for quality checks
            "human_review": "human_review",
            "finalize": "finalize",
            "error_handler": "error_handler",
            "done": END
        }
    )

    # Workers return to supervisor for next decision
    graph.add_edge("researcher", "supervisor")
    graph.add_edge("analyzer", "supervisor")
    graph.add_edge("writer", "supervisor")
    graph.add_edge("error_handler", "supervisor")

    # Finalize ends the workflow
    graph.add_edge("finalize", END)

    # Note: human_review uses Command to control flow,
    # so no explicit edge needed

    # Set entry point
    graph.set_entry_point("supervisor")

    return graph


def get_development_app():
    """
    Create app for development/testing with MemorySaver.

    Use this for:
    - Local development
    - Unit testing
    - Integration testing
    - Quick prototyping
    """
    graph = create_research_system()
    checkpointer = MemorySaver()
    return graph.compile(checkpointer=checkpointer)


def get_production_app():
    """
    Create production-ready app with PostgresSaver.

    Use this for:
    - Production deployments
    - Persistent state across restarts
    - Long-running workflows
    - Multi-process deployments

    Requires DATABASE_URL environment variable.
    """
    graph = create_research_system()

    # Use PostgresSaver for production persistence.
    # Note: depending on the installed langgraph-checkpoint-postgres
    # version, from_conn_string may return a context manager, and the
    # checkpointer needs a one-time setup() call to create its tables.
    checkpointer = PostgresSaver.from_conn_string(
        os.environ["DATABASE_URL"]
    )

    return graph.compile(checkpointer=checkpointer)
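
Once compiled, the wiring can be sanity-checked by rendering the graph. A quick sketch, assuming a recent LangGraph release where the compiled app exposes get_graph():

# Render the topology as Mermaid source for a quick visual check.
app = get_development_app()
print(app.get_graph().draw_mermaid())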

Usage Examples

"""
Usage Examples - Running the research system.
"""


def run_research_workflow(query: str, thread_id: Optional[str] = None):
    """
    Run a complete research workflow.

    Args:
        query: Research topic/question
        thread_id: Unique identifier for this workflow
    """
    import uuid

    # Use development app for this example
    app = get_development_app()

    # Generate thread ID if not provided
    thread_id = thread_id or f"research-{uuid.uuid4().hex[:8]}"
    config = {"configurable": {"thread_id": thread_id}}

    # Initial state
    initial_state = {
        "query": query,
        "max_iterations": 10,
        "quality_threshold": 70,
        "require_human_approval": True,
        "documents": [],
        "messages": [],
        "errors": [],
        "analysis": None,
        "report": None,
        "quality_score": None,
        "next_worker": "researcher",
        "current_phase": "research",
        "iteration": 0,
        "started_at": datetime.now().isoformat(),
        "completed_at": None,
        "approved": False,
        "approval_notes": None,
        "feedback": None
    }

    print(f"Starting research workflow: {thread_id}")
    print(f"Query: {query}")
    print("-" * 50)

    # Run until human review or completion
    result = app.invoke(initial_state, config)

    # Check if paused for human review. (Recent LangGraph versions also
    # surface the pending interrupt payload directly in the invoke result,
    # which is a more robust signal than this heuristic.)
    if not result.get("approved") and result.get("report"):
        print("\n=== PAUSED FOR HUMAN REVIEW ===")
        print(f"Report Preview:\n{result['report'][:500]}...")
        print(f"\nThread ID for resume: {thread_id}")
        return result, thread_id

    print("\n=== WORKFLOW COMPLETE ===")
    return result, thread_id


def resume_with_approval(thread_id: str, action: str = "approve", feedback: str = ""):
    """
    Resume a paused workflow with human decision.

    Args:
        thread_id: The workflow thread ID
        action: "approve", "revise", or "reject"
        feedback: Optional feedback for revisions
    """
    app = get_development_app()
    config = {"configurable": {"thread_id": thread_id}}

    # Resume with human decision
    result = app.invoke(
        Command(resume={
            "action": action,
            "feedback": feedback,
            "notes": f"Decision made at {datetime.now().isoformat()}"
        }),
        config
    )

    if action == "approve":
        print("=== REPORT APPROVED ===")
        print(result.get("report", "No report"))
    elif action == "revise":
        print("=== REVISION REQUESTED ===")
        print("Workflow will continue with feedback")
    else:
        print("=== REPORT REJECTED ===")

    return result
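
For long workflows it can help to observe progress node by node rather than blocking on a single invoke(). A hedged sketch using LangGraph's streaming API; initial_state is the same dict built in run_research_workflow above:

def stream_research_workflow(initial_state: dict, thread_id: str):
    """Sketch: print each node's partial update as the graph executes."""
    app = get_development_app()
    config = {"configurable": {"thread_id": thread_id}}

    # stream_mode="updates" yields one {node_name: partial_update} dict
    # per step until the graph finishes or hits an interrupt.
    for update in app.stream(initial_state, config, stream_mode="updates"):
        for node, payload in update.items():
            print(f"[{node}] updated keys: {sorted(payload or {})}")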


# Example execution
if __name__ == "__main__":
    # Run initial workflow
    result, thread_id = run_research_workflow(
        "Impact of Large Language Models on Software Development in 2026"
    )

    # If paused, simulate human approval
    if not result.get("approved") and result.get("report"):
        print("\n[Simulating human approval...]")
        final_result = resume_with_approval(thread_id, "approve")
        print("\nFinal Report:")
        print(final_result.get("report", "No report available"))

Testing the System

Unit Tests

"""
Unit Tests for Production Research System

Test individual components in isolation.
"""

import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime


class TestSupervisorLogic:
    """Test supervisor decision making."""

    def test_supervisor_routes_to_researcher_when_no_documents(self):
        """Supervisor should route to researcher when documents are empty."""
        state = {
            "query": "Test query",
            "documents": [],
            "analysis": None,
            "report": None,
            "iteration": 0,
            "max_iterations": 10,
            "errors": [],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "researcher"

    def test_supervisor_routes_to_analyzer_when_has_documents(self):
        """Supervisor should route to analyzer when documents exist but no analysis."""
        state = {
            "query": "Test query",
            "documents": [{"content": "test doc", "source": "test"}],
            "analysis": None,
            "report": None,
            "iteration": 1,
            "max_iterations": 10,
            "errors": [],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "analyzer"

    def test_supervisor_routes_to_writer_when_has_analysis(self):
        """Supervisor should route to writer when analysis exists but no report."""
        state = {
            "query": "Test query",
            "documents": [{"content": "test doc", "source": "test"}],
            "analysis": "A" * 250,  # Detailed analysis
            "report": None,
            "iteration": 2,
            "max_iterations": 10,
            "errors": [],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "writer"

    def test_supervisor_routes_to_human_review_when_has_report(self):
        """Supervisor should route to human review when report exists."""
        state = {
            "query": "Test query",
            "documents": [{"content": "test doc", "source": "test"}],
            "analysis": "A" * 250,
            "report": "R" * 600,  # Detailed report
            "iteration": 3,
            "max_iterations": 10,
            "errors": [],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "human_review"

    def test_supervisor_respects_max_iterations(self):
        """Supervisor should end workflow when max iterations reached."""
        state = {
            "query": "Test query",
            "documents": [],
            "analysis": None,
            "report": None,
            "iteration": 10,
            "max_iterations": 10,
            "errors": [],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "done"

    def test_supervisor_routes_to_error_handler_on_unhandled_error(self):
        """Supervisor should route to error handler when errors exist."""
        state = {
            "query": "Test query",
            "documents": [],
            "analysis": None,
            "report": None,
            "iteration": 1,
            "max_iterations": 10,
            "errors": [{"node": "researcher", "error": "API Error"}],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "error_handler"

    def test_supervisor_requests_more_detailed_analysis(self):
        """Supervisor should request revision when analysis is too brief."""
        state = {
            "query": "Test query",
            "documents": [{"content": "test doc", "source": "test"}],
            "analysis": "Brief",  # Too short
            "report": None,
            "iteration": 2,
            "max_iterations": 10,
            "errors": [],
            "approved": False,
            "require_human_approval": True
        }

        result = supervisor(state)
        assert result["next_worker"] == "analyzer"
        assert "feedback" in result


class TestResearcher:
    """Test researcher node."""

    @patch("langchain_openai.ChatOpenAI")
    def test_researcher_creates_documents(self, mock_llm_class):
        """Researcher should create document from LLM response."""
        # Mock LLM
        mock_llm = Mock()
        mock_response = Mock()
        mock_response.content = '{"facts": [{"fact": "Test fact"}], "summary": "Test"}'
        mock_llm.invoke.return_value = mock_response
        mock_llm_class.return_value = mock_llm

        state = {
            "query": "Test query",
            "documents": [],
            "iteration": 0,
            "feedback": None
        }

        result = researcher(state)

        assert len(result["documents"]) == 1
        assert result["documents"][0]["source"] == "llm_research"
        assert result["iteration"] == 1
        assert len(result["messages"]) == 1

    @patch("langchain_openai.ChatOpenAI")
    def test_researcher_handles_errors(self, mock_llm_class):
        """Researcher should handle LLM errors gracefully."""
        mock_llm = Mock()
        mock_llm.invoke.side_effect = Exception("API Error")
        mock_llm_class.return_value = mock_llm

        state = {
            "query": "Test query",
            "documents": [],
            "iteration": 0,
            "feedback": None
        }

        result = researcher(state)

        assert len(result["errors"]) == 1
        assert result["next_worker"] == "error_handler"


class TestErrorHandler:
    """Test error handler node."""

    def test_error_handler_marks_error_as_handled(self):
        """Error handler should mark errors as handled."""
        state = {
            "errors": [{"node": "researcher", "error": "Test error"}]
        }

        result = error_handler(state)

        assert result["next_worker"] == "supervisor"
        assert len(result["messages"]) == 1

    def test_error_handler_escalates_after_multiple_errors(self):
        """Error handler should escalate after 3+ errors."""
        state = {
            "errors": [
                {"node": "researcher", "error": "Error 1"},
                {"node": "analyzer", "error": "Error 2"},
                {"node": "writer", "error": "Error 3"}
            ]
        }

        result = error_handler(state)

        assert result["next_worker"] == "human_review"
        assert len(result["messages"]) == 2  # Handled + escalation

Integration Tests

"""
Integration Tests for Production Research System

Test the complete workflow with mocked LLM.
"""

import pytest
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from unittest.mock import patch, Mock


class TestResearchWorkflowIntegration:
    """Integration tests for complete workflow."""

    @pytest.fixture
    def mock_openai(self):
        """Mock ChatOpenAI for all tests."""
        with patch("langchain_openai.ChatOpenAI") as mock_class:
            mock_instance = Mock()

            # Create different responses for different prompts. Match the
            # most specific prompt first: the writer prompt also contains
            # the word "analyzed", so it must be checked before the
            # analyzer branch.
            def invoke_side_effect(prompt):
                response = Mock()
                text = prompt.lower()
                if "research assistant" in text:
                    response.content = '{"facts": [{"fact": "Test finding"}], "summary": "Research summary"}'
                elif "write a professional research report" in text:
                    response.content = "# Research Report\n\n## Executive Summary\n" + "R" * 500
                elif "analyze" in text:
                    response.content = "## Key Themes\n- Theme 1\n\n## Analysis\n" + "A" * 300
                else:
                    response.content = "Default response"
                return response

            mock_instance.invoke.side_effect = invoke_side_effect
            mock_class.return_value = mock_instance
            yield mock_instance

    @pytest.fixture
    def app(self, mock_openai):
        """Create compiled app with MemorySaver."""
        graph = create_research_system()
        return graph.compile(checkpointer=MemorySaver())

    def test_complete_workflow_with_approval(self, app):
        """Test complete workflow ending with human approval."""
        config = {"configurable": {"thread_id": "test-complete-workflow"}}

        initial_state = {
            "query": "Test research query",
            "max_iterations": 10,
            "quality_threshold": 70,
            "require_human_approval": True,
            "documents": [],
            "messages": [],
            "errors": [],
            "analysis": None,
            "report": None,
            "quality_score": None,
            "next_worker": "researcher",
            "current_phase": "research",
            "iteration": 0,
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "approved": False,
            "approval_notes": None,
            "feedback": None
        }

        # Run until human review
        result = app.invoke(initial_state, config)

        # Should have documents, analysis, and report
        assert len(result["documents"]) > 0
        assert result["analysis"] is not None
        assert result["report"] is not None
        assert result["approved"] is False  # Waiting for approval

        # Resume with approval
        final_result = app.invoke(
            Command(resume={"action": "approve", "notes": "Looks good!"}),
            config
        )

        assert final_result["approved"] is True
        assert "Generated by Production Research System" in final_result["report"]

    def test_workflow_with_revision_request(self, app):
        """Test workflow when human requests revision."""
        config = {"configurable": {"thread_id": "test-revision-workflow"}}

        initial_state = {
            "query": "Test research query",
            "max_iterations": 15,  # More iterations for revision
            "quality_threshold": 70,
            "require_human_approval": True,
            "documents": [],
            "messages": [],
            "errors": [],
            "analysis": None,
            "report": None,
            "quality_score": None,
            "next_worker": "researcher",
            "current_phase": "research",
            "iteration": 0,
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "approved": False,
            "approval_notes": None,
            "feedback": None
        }

        # Run until human review
        result = app.invoke(initial_state, config)
        initial_iteration = result["iteration"]

        # Request revision
        revised_result = app.invoke(
            Command(resume={"action": "revise", "feedback": "Add more details"}),
            config
        )

        # Should have more iterations after revision
        assert revised_result["iteration"] > initial_iteration

    def test_workflow_without_human_approval(self, app):
        """Test workflow that doesn't require human approval."""
        config = {"configurable": {"thread_id": "test-no-approval-workflow"}}

        initial_state = {
            "query": "Test research query",
            "max_iterations": 10,
            "quality_threshold": 70,
            "require_human_approval": False,  # No human approval needed
            "documents": [],
            "messages": [],
            "errors": [],
            "analysis": None,
            "report": None,
            "quality_score": None,
            "next_worker": "researcher",
            "current_phase": "research",
            "iteration": 0,
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "approved": False,
            "approval_notes": None,
            "feedback": None
        }

        # Should complete without pausing
        result = app.invoke(initial_state, config)

        assert result["completed_at"] is not None
        assert "Generated by Production Research System" in result["report"]

    def test_state_history_tracking(self, app):
        """Test that state history is properly tracked."""
        config = {"configurable": {"thread_id": "test-history-workflow"}}

        initial_state = {
            "query": "Test query",
            "max_iterations": 10,
            "quality_threshold": 70,
            "require_human_approval": False,
            "documents": [],
            "messages": [],
            "errors": [],
            "analysis": None,
            "report": None,
            "quality_score": None,
            "next_worker": "researcher",
            "current_phase": "research",
            "iteration": 0,
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "approved": False,
            "approval_notes": None,
            "feedback": None
        }

        app.invoke(initial_state, config)

        # Check state history
        history = list(app.get_state_history(config))

        assert len(history) > 0

        # Verify progression through nodes: each checkpoint's metadata
        # records the nodes that wrote updates under the "writes" key
        # (the "source" field only says whether a step came from input
        # or the loop, not which node ran).
        nodes_visited = [
            node
            for snapshot in history
            for node in ((snapshot.metadata or {}).get("writes") or {})
        ]

        # Should have visited supervisor multiple times
        assert nodes_visited.count("supervisor") >= 1

Deployment Checklist

Pre-Deployment Verification

## Pre-Deployment Checklist

### Code Quality
- [ ] All unit tests passing
- [ ] All integration tests passing
- [ ] Code review completed
- [ ] No hardcoded secrets or API keys
- [ ] Type hints complete and accurate

### Infrastructure
- [ ] PostgreSQL database provisioned
- [ ] DATABASE_URL environment variable set
- [ ] Connection pooling configured
- [ ] Database migrations applied

### Observability
- [ ] LANGCHAIN_TRACING_V2=true set
- [ ] LANGCHAIN_API_KEY configured
- [ ] LANGCHAIN_PROJECT set appropriately
- [ ] LangSmith project created

### Security
- [ ] API keys in secrets manager
- [ ] Rate limiting configured
- [ ] Input validation in place
- [ ] Error messages don't leak sensitive info

### Performance
- [ ] max_iterations set appropriately
- [ ] LLM model selection optimized for use case
- [ ] Timeout handling implemented
- [ ] Retry logic configured
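
A few of these checks can be automated at startup. A small sketch that fails fast when required configuration is missing (the variable list is an assumption based on the checklist above, plus OPENAI_API_KEY for the ChatOpenAI calls):

import os

REQUIRED_ENV = [
    "DATABASE_URL",
    "LANGCHAIN_API_KEY",
    "LANGCHAIN_PROJECT",
    "OPENAI_API_KEY",
]

def verify_environment() -> None:
    """Raise before serving traffic if any required variable is unset."""
    missing = [name for name in REQUIRED_ENV if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {missing}")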

Production Monitoring Setup

"""
Production Monitoring Configuration

Set up alerts and dashboards for the research system.
"""

from langsmith import Client
from datetime import datetime, timedelta


def setup_monitoring():
    """
    Configure LangSmith monitoring for production.

    Monitors:
    - Error rates
    - Latency percentiles
    - Token usage
    - Human review wait times
    """
    client = Client()

    # Example: Query recent runs for monitoring
    recent_runs = client.list_runs(
        project_name="production-research-system",
        start_time=datetime.now() - timedelta(hours=24)
    )

    metrics = {
        "total_runs": 0,
        "errors": 0,
        "avg_latency_ms": 0,
        "total_tokens": 0
    }

    latencies = []

    for run in recent_runs:
        metrics["total_runs"] += 1

        if run.error:
            metrics["errors"] += 1

        # LangSmith runs expose start/end timestamps; derive latency
        # from them rather than assuming a precomputed field.
        if run.end_time and run.start_time:
            latencies.append(
                (run.end_time - run.start_time).total_seconds() * 1000
            )

        if run.total_tokens:
            metrics["total_tokens"] += run.total_tokens

    if latencies:
        metrics["avg_latency_ms"] = sum(latencies) / len(latencies)
        metrics["p95_latency_ms"] = sorted(latencies)[int(len(latencies) * 0.95)]

    metrics["error_rate"] = (
        metrics["errors"] / metrics["total_runs"]
        if metrics["total_runs"] > 0
        else 0
    )

    return metrics


def create_alert_rules():
    """
    Define alerting rules for the research system.

    Returns alert configurations for:
    - High error rate
    - High latency
    - Stuck workflows
    """
    return {
        "high_error_rate": {
            "condition": "error_rate > 0.05",  # 5% error rate
            "severity": "critical",
            "action": "page_oncall"
        },
        "high_latency": {
            "condition": "p95_latency_ms > 60000",  # 60 seconds
            "severity": "warning",
            "action": "notify_slack"
        },
        "stuck_workflow": {
            "condition": "workflow_age > 3600",  # 1 hour without progress
            "severity": "warning",
            "action": "notify_slack"
        },
        "excessive_iterations": {
            "condition": "iteration > max_iterations * 0.8",
            "severity": "warning",
            "action": "log_warning"
        }
    }
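
These rule definitions are plain data; something has to interpret them. A hedged sketch of an evaluator (evaluate_alerts is hypothetical, and it eval()s the hardcoded condition strings purely for brevity):

def evaluate_alerts(metrics: dict, rules: dict) -> list[dict]:
    """Return the rules whose conditions hold for the current metrics."""
    fired = []
    for name, rule in rules.items():
        try:
            # Conditions reference metric names as variables, e.g.
            # "error_rate > 0.05". Only trusted, hardcoded strings here.
            if eval(rule["condition"], {"__builtins__": {}}, dict(metrics)):
                fired.append({"rule": name, **rule})
        except NameError:
            continue  # metric not in this snapshot (e.g. workflow_age)
    return fired

# Example usage:
# alerts = evaluate_alerts(setup_monitoring(), create_alert_rules())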

Interview Questions

Q1: Walk through the architecture of this research system.

Answer:

"This is a multi-agent supervisor pattern with five key components:

1. State Management

class ResearchState(TypedDict):
    # Accumulating data with reducers
    documents: Annotated[list[dict], operator.add]
    messages: Annotated[list[dict], operator.add]

    # Worker outputs
    analysis: Optional[str]
    report: Optional[str]

    # Control flow
    next_worker: Literal['researcher', 'analyzer', ...]

The state uses reducers for lists that accumulate across iterations.

2. Supervisor Pattern

The supervisor is the central coordinator:

  • Evaluates current state
  • Decides next worker
  • Performs quality checks
  • Handles iteration limits
  • Routes errors appropriately

3. Worker Nodes

Each worker is specialized:

  • Researcher: Gathers information using LLM (production would use search APIs)
  • Analyzer: Extracts insights from documents
  • Writer: Creates professional reports

Workers follow a consistent pattern:

  • Receive full state
  • Return partial updates
  • Add messages for tracking
  • Handle errors gracefully

4. Human-in-the-Loop

Uses LangGraph's native interrupt():

decision = interrupt({
    'type': 'report_approval',
    'report_preview': state['report'][:1000]
})
# Execution pauses here until resume

Human can approve, request revision, or reject.

5. Error Recovery

The error handler:

  • Logs errors for debugging
  • Marks errors as handled
  • Retries from supervisor
  • Escalates after multiple failures

The flow is: Supervisor -> Worker -> Supervisor -> ... -> Human Review -> Finalize -> END"


Q2: How does checkpointing enable pause/resume in this system?

Answer:

"Checkpointing is fundamental to the human-in-the-loop pattern:

How It Works:

  1. Every node execution saves a checkpoint:

# PostgresSaver persists state to database
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
app = graph.compile(checkpointer=checkpointer)

  2. interrupt() triggers a checkpoint save:

def human_review(state):
    decision = interrupt({...})  # State saved here
    # Execution pauses, waiting for resume

  3. Resume uses the thread_id to find the checkpoint:

config = {'configurable': {'thread_id': 'research-001'}}
app.invoke(Command(resume={'action': 'approve'}), config)

Why This Matters:

  • Persistence: Workflow survives server restarts
  • Async Human Input: Users can take hours to review
  • State Recovery: Can resume from any checkpoint
  • Time Travel: Can replay from earlier checkpoints

Production Pattern:

# Start workflow
result = app.invoke(initial_state, config)
# Returns immediately when hitting interrupt()

# Hours later, human approves
result = app.invoke(Command(resume={'action': 'approve'}), config)
# Continues from where it paused

The key insight is that checkpointing separates execution from storage - the graph logic doesn't change, only where state is persisted."


Q3: How would you scale this system for production?

Answer:

"Production scaling involves several layers:

1. Database Layer

# Connection pooling for PostgresSaver
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    conninfo=DATABASE_URL,
    min_size=5,
    max_size=20
)
checkpointer = PostgresSaver(pool)

2. Horizontal Scaling

Since state is in PostgreSQL, multiple workers can process different workflows:

# Kubernetes deployment
replicas: 5
resources:
  limits:
    cpu: '2'
    memory: '4Gi'

Each pod handles different thread_ids independently.

3. Rate Limiting LLM Calls

from tenacity import retry, wait_exponential

@retry(wait=wait_exponential(min=1, max=60))
def call_llm(prompt):
    return llm.invoke(prompt)

4. Queue-Based Processing

For high throughput, use a queue:

# Celery task
@celery_app.task
def process_research(query, thread_id):
    app.invoke(initial_state, config)

5. Caching

Cache repeated research queries:

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_research(query_hash):
    return researcher(state)

6. Monitoring

# Prometheus metrics
research_duration = Histogram('research_duration_seconds')
research_errors = Counter('research_errors_total')

The key principle is stateless workers + stateful storage - workers don't hold state, PostgreSQL does."


Q4: How do you test workflows with human-in-the-loop?

Answer:

"Testing HITL workflows requires simulating human decisions:

1. Unit Test the Human Review Node

def test_human_review_approve():
    state = {'report': 'Test report...', 'query': 'Test'}

    # The interrupt() will be called
    with pytest.raises(GraphInterrupt) as exc:
        human_review(state)

    # Verify interrupt data
    interrupt_data = exc.value.data
    assert interrupt_data['type'] == 'report_approval'

2. Integration Test with Resume

def test_complete_approval_flow():
    app = get_development_app()
    config = {'configurable': {'thread_id': 'test-approval'}}

    # Run until pause
    result = app.invoke(initial_state, config)
    assert result['approved'] is False

    # Resume with approval
    final = app.invoke(
        Command(resume={'action': 'approve'}),
        config
    )
    assert final['approved'] is True

3. Test All Decision Paths

@pytest.mark.parametrize('action,expected_approved', [
    ('approve', True),
    ('reject', False),
])
def test_human_decisions(action, expected_approved):
    # Run workflow
    result = app.invoke(initial_state, config)

    # Resume with decision
    final = app.invoke(
        Command(resume={'action': action}),
        config
    )

    assert final['approved'] == expected_approved

4. Test Revision Loop

def test_revision_loop():
    # Run to human review
    result = app.invoke(initial_state, config)
    iteration_before = result['iteration']

    # Request revision
    revised = app.invoke(
        Command(resume={'action': 'revise', 'feedback': 'More detail'}),
        config
    )

    # Should have done more work
    assert revised['iteration'] > iteration_before

The key is using Command(resume=...) to simulate human input in tests."


Key Takeaways

| Concept            | Implementation                 | Benefit                           |
|--------------------|--------------------------------|-----------------------------------|
| State Reducers     | Annotated[list, operator.add]  | Accumulate data across iterations |
| Supervisor Pattern | Central routing node           | Coordinate multiple workers       |
| Checkpointing      | PostgresSaver                  | Persist state for long workflows  |
| Human-in-the-Loop  | interrupt() + Command(resume=) | Native pause/resume               |
| Error Recovery     | Error handler node             | Graceful degradation              |
| Observability      | @traceable decorator           | LangSmith tracing                 |
| Testing            | MemorySaver + mocked LLM       | Fast, reliable tests              |

Course Completion

Congratulations on completing the LangGraph Deep Dive course! You now have the knowledge to build production-ready multi-agent systems with:

  • State Management: TypedDict schemas with reducers
  • Graph Patterns: Supervisor, parallel execution, subgraphs
  • Persistence: Checkpointing with PostgresSaver
  • Human Integration: Native pause/resume with interrupts
  • Deployment: Docker, Kubernetes, LangGraph Platform
  • Testing: Comprehensive test strategies
  • Debugging: Visualization and LangSmith tracing

Next Steps:

  1. Build your own multi-agent system
  2. Explore LangGraph Platform for managed deployment
  3. Contribute to the LangGraph community
  4. Stay updated with LangGraph releases

Complete the module quiz to earn your course completion certificate!
