Testing & Capstone Project
Testing LangGraph Workflows
Testing LangGraph applications requires a systematic approach that covers individual nodes, state transitions, graph execution, and the complex interactions between agents. This lesson provides comprehensive testing strategies for building reliable, production-ready workflow systems.
Real-World Testing Impact
January 2026 Case Study: A fintech company's trading agent made an incorrect decision that cost $47,000. Root-cause analysis revealed an untested edge case: when the document count exceeded 50, the analyzer node failed silently and returned an empty analysis, which the supervisor misinterpreted as "no issues found." After implementing the testing strategies in this lesson, the team achieved zero production incidents over the following 8 months.
Testing ROI Metrics:
Before comprehensive testing:
- 3-5 production incidents per month
- 2-4 hours average incident recovery
- 15% of deployments caused issues
After implementation:
- 0 production incidents in 8 months
- Bugs caught in CI/CD before deployment
- Deployment confidence increased to 99%
Testing Strategy Pyramid for LangGraph
┌─────────────────┐
│    E2E Tests    │  ← Few, expensive, real LLMs
│  (Production)   │
├─────────────────┤
│   Integration   │  ← More, mocked LLMs
│  (Full Graph)   │
├─────────────────┤
│   Unit Tests    │  ← Many, fast, no LLMs
│     (Nodes)     │
└─────────────────┘
Test Distribution Target:
- Unit Tests: 70% (fast, isolated, deterministic)
- Integration Tests: 25% (mocked dependencies, full flows)
- E2E Tests: 5% (real APIs, smoke tests only)
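One lightweight way to enforce this split is with pytest markers, so regular CI runs only the cheap layers and real-API tests run on a schedule. The sketch below assumes markers named unit, integration, and e2e and uses placeholder tests; adapt the names to your project.
"""
Illustrative marker setup for the testing pyramid (an assumption, not part of
the project code in this lesson). Register the markers in pyproject.toml under
[tool.pytest.ini_options], e.g.:
    markers = ["unit: fast, no LLMs", "integration: mocked LLMs", "e2e: real APIs"]
"""
import pytest

@pytest.mark.unit
def test_routing_logic_is_pure():
    """Unit layer: no network, no LLM -- runs on every commit."""
    ...

@pytest.mark.integration
def test_full_graph_with_mocked_llms():
    """Integration layer: full graph execution with mocked dependencies."""
    ...

@pytest.mark.e2e
def test_smoke_against_real_apis():
    """E2E layer: real LLM calls -- run nightly or pre-release only."""
    ...
Regular CI can then run pytest -m "not e2e" so the expensive layer never blocks day-to-day deployments, while pytest -m e2e covers the smoke tests on a schedule.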
Unit Testing Individual Nodes
Unit tests verify that individual nodes produce correct state updates given specific inputs. These tests should be fast, deterministic, and isolated from external dependencies.
Testing Node Functions
"""
Unit tests for LangGraph node functions.
Each node is a pure function: State -> State update
Test by providing mock state and verifying returned updates.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
from typing import Any
# Import your nodes
from src.nodes.researcher import researcher_node
from src.nodes.analyzer import analyzer_node
from src.nodes.writer import writer_node
from src.nodes.supervisor import supervisor_node
class TestResearcherNode:
"""Test suite for researcher node functionality."""
def test_researcher_returns_documents(self):
"""Verify researcher node adds documents to state."""
# Arrange - create initial state
initial_state = {
"query": "AI trends in 2026",
"documents": [],
"iteration": 0,
"max_iterations": 10
}
# Act - execute node
result = researcher_node(initial_state)
# Assert - verify state updates
assert "documents" in result, "Researcher must return documents"
assert isinstance(result["documents"], list)
assert len(result["documents"]) > 0, "Must find at least one document"
assert result["iteration"] == 1, "Must increment iteration"
def test_researcher_incorporates_feedback(self):
"""Verify researcher uses feedback to refine search."""
state = {
"query": "Machine learning applications",
"documents": [],
"iteration": 1,
"feedback": "Focus specifically on healthcare applications",
"max_iterations": 10
}
result = researcher_node(state)
# Verify feedback was considered
# (In real tests, you'd mock the LLM and verify the prompt)
assert len(result["documents"]) > 0
def test_researcher_handles_empty_query(self):
"""Verify researcher handles edge case of empty query."""
state = {
"query": "",
"documents": [],
"iteration": 0,
"max_iterations": 10
}
# Should either return empty docs or raise appropriate error
with pytest.raises(ValueError, match="Query cannot be empty"):
researcher_node(state)
def test_researcher_documents_have_required_fields(self):
"""Verify returned documents have proper structure."""
state = {
"query": "Test query",
"documents": [],
"iteration": 0,
"max_iterations": 10
}
result = researcher_node(state)
for doc in result["documents"]:
assert "source" in doc, "Document must have source"
assert "content" in doc, "Document must have content"
assert "timestamp" in doc, "Document must have timestamp"
class TestAnalyzerNode:
"""Test suite for analyzer node functionality."""
def test_analyzer_produces_analysis(self):
"""Verify analyzer creates analysis from documents."""
state = {
"query": "AI trends",
"documents": [
{"source": "arxiv", "content": "LLMs are transforming NLP"},
{"source": "news", "content": "GPT-4 adoption growing rapidly"}
],
"analysis": None,
"iteration": 1
}
result = analyzer_node(state)
assert "analysis" in result
assert result["analysis"] is not None
assert len(result["analysis"]) > 100, "Analysis should be substantial"
assert result["iteration"] == 2
def test_analyzer_handles_empty_documents(self):
"""Verify analyzer handles no documents gracefully."""
state = {
"query": "Test",
"documents": [],
"analysis": None,
"iteration": 0
}
result = analyzer_node(state)
# Should indicate no analysis possible
assert "analysis" in result
assert "insufficient data" in result["analysis"].lower() or result["analysis"] is None
def test_analyzer_with_large_document_set(self):
"""Verify analyzer handles many documents without issues."""
large_doc_set = [
{"source": f"source_{i}", "content": f"Content about topic {i}"}
for i in range(100)
]
state = {
"query": "Comprehensive research",
"documents": large_doc_set,
"analysis": None,
"iteration": 5
}
result = analyzer_node(state)
assert result["analysis"] is not None
# Should not just be empty or crash
assert len(result["analysis"]) > 50
class TestSupervisorNode:
"""Test suite for supervisor routing logic."""
def test_supervisor_routes_to_researcher_when_no_documents(self):
"""Supervisor should request research when no documents exist."""
state = {
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 10
}
result = supervisor_node(state)
assert result["next_worker"] == "researcher"
def test_supervisor_routes_to_analyzer_when_documents_exist(self):
"""Supervisor should request analysis when documents are ready."""
state = {
"documents": [{"content": "test doc"}],
"analysis": None,
"report": None,
"iteration": 1,
"max_iterations": 10
}
result = supervisor_node(state)
assert result["next_worker"] == "analyzer"
def test_supervisor_routes_to_writer_when_analysis_ready(self):
"""Supervisor should request report when analysis is complete."""
state = {
"documents": [{"content": "test doc"}],
"analysis": "This is a comprehensive analysis of the findings...",
"report": None,
"iteration": 2,
"max_iterations": 10
}
result = supervisor_node(state)
assert result["next_worker"] == "writer"
def test_supervisor_routes_to_done_when_complete(self):
"""Supervisor should finish when report is approved."""
state = {
"documents": [{"content": "test doc"}],
"analysis": "Analysis content",
"report": "Final report content",
"approved": True,
"iteration": 4,
"max_iterations": 10
}
result = supervisor_node(state)
assert result["next_worker"] == "done"
def test_supervisor_respects_iteration_limit(self):
"""Supervisor should stop at max iterations."""
state = {
"documents": [],
"analysis": None,
"report": None,
"iteration": 10,
"max_iterations": 10
}
result = supervisor_node(state)
assert result["next_worker"] == "done"
def test_supervisor_quality_check_requests_revision(self):
"""Supervisor should request revision for low-quality analysis."""
state = {
"documents": [{"content": "doc"}],
"analysis": "Too short", # Below quality threshold
"report": None,
"iteration": 2,
"max_iterations": 10
}
result = supervisor_node(state)
# Should route back for improvement
assert result["next_worker"] == "analyzer"
assert "feedback" in result
Testing with Mocked Dependencies
"""
Mock external dependencies for deterministic testing.
LLMs, APIs, and databases should be mocked to ensure:
- Fast test execution
- Deterministic results
- No external API costs during testing
"""
import pytest
from unittest.mock import Mock, patch, AsyncMock

from src.nodes.analyzer import analyzer_node
from src.nodes.researcher import researcher_node
@pytest.fixture
def mock_openai_llm():
"""
Fixture providing a mocked OpenAI LLM.
Returns predictable responses for testing node logic.
"""
with patch("src.nodes.analyzer.ChatOpenAI") as mock_class:
mock_instance = Mock()
# Configure the mock to return structured response
mock_response = Mock()
mock_response.content = """
## Analysis Summary
### Key Themes
1. Artificial Intelligence is advancing rapidly
2. LLMs are becoming mainstream
3. Automation is transforming industries
### Supporting Evidence
- Multiple sources confirm AI adoption growth
- Investment in AI increased 40% year-over-year
### Confidence Level: High
"""
mock_instance.invoke.return_value = mock_response
mock_class.return_value = mock_instance
yield mock_instance
def test_analyzer_with_mocked_llm(mock_openai_llm):
"""Test analyzer node using mocked LLM."""
state = {
"query": "AI industry trends",
"documents": [
{"source": "report", "content": "AI adoption is growing"},
{"source": "news", "content": "Companies investing in automation"}
],
"analysis": None,
"iteration": 1
}
result = analyzer_node(state)
# Verify LLM was called
mock_openai_llm.invoke.assert_called_once()
# Verify response was processed correctly
assert "Analysis Summary" in result["analysis"]
assert "Key Themes" in result["analysis"]
@pytest.fixture
def mock_search_api():
"""Fixture for mocked document search API."""
with patch("src.nodes.researcher.search_documents") as mock:
mock.return_value = [
{
"source": "arxiv",
"content": "Research paper on transformer architectures",
"url": "https://arxiv.org/paper/123",
"timestamp": "2026-01-01"
},
{
"source": "news",
"content": "OpenAI announces new model capabilities",
"url": "https://news.example.com/article",
"timestamp": "2026-01-02"
},
{
"source": "blog",
"content": "Best practices for LLM applications",
"url": "https://blog.example.com/post",
"timestamp": "2026-01-03"
}
]
yield mock
def test_researcher_with_mocked_api(mock_search_api):
"""Test researcher node with mocked search API."""
state = {
"query": "transformer architectures",
"documents": [],
"iteration": 0,
"max_iterations": 10
}
result = researcher_node(state)
# Verify API was called with correct query
mock_search_api.assert_called_once()
call_args = mock_search_api.call_args
assert "transformer" in str(call_args).lower()
# Verify documents were properly added
assert len(result["documents"]) == 3
assert result["documents"][0]["source"] == "arxiv"
@pytest.fixture
def mock_llm_with_errors():
"""Fixture that simulates LLM API errors."""
with patch("src.nodes.analyzer.ChatOpenAI") as mock_class:
mock_instance = Mock()
# Simulate rate limit error on first call, success on retry
mock_instance.invoke.side_effect = [
Exception("Rate limit exceeded"),
Mock(content="Analysis after retry")
]
mock_class.return_value = mock_instance
yield mock_instance
def test_analyzer_retries_on_error(mock_llm_with_errors):
"""Test that analyzer retries failed LLM calls."""
state = {
"query": "test",
"documents": [{"content": "doc"}],
"analysis": None,
"iteration": 1
}
result = analyzer_node(state)
# Should have retried and succeeded
assert mock_llm_with_errors.invoke.call_count == 2
assert result["analysis"] == "Analysis after retry"
Integration Testing Full Graphs
Integration tests verify that the complete graph executes correctly, with all nodes working together and state flowing properly between them.
Testing Complete Graph Execution
"""
Integration tests for complete LangGraph workflows.
These tests verify:
- Graph compiles correctly
- State flows between nodes
- Conditional edges route properly
- Final state is correct
"""
import pytest
from langgraph.checkpoint.memory import MemorySaver
from src.graphs.research_graph import create_research_graph
class TestResearchGraphIntegration:
"""Integration test suite for the research graph."""
@pytest.fixture
def compiled_graph(self):
"""Create a compiled graph with in-memory checkpointing."""
graph = create_research_graph()
checkpointer = MemorySaver()
return graph.compile(checkpointer=checkpointer)
def test_full_graph_execution(self, compiled_graph):
"""Test complete graph flow from start to finish."""
initial_input = {
"query": "Summarize recent developments in AI safety",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 10,
"approved": False,
"messages": []
}
config = {"configurable": {"thread_id": "integration-test-1"}}
result = compiled_graph.invoke(initial_input, config)
# Verify final state has expected fields
assert result["report"] is not None, "Should produce a report"
assert len(result["documents"]) > 0, "Should gather documents"
assert result["analysis"] is not None, "Should produce analysis"
assert result["iteration"] > 0, "Should have iterated"
def test_graph_stops_at_human_review(self, compiled_graph):
"""Test graph pauses at human review checkpoint."""
initial_input = {
"query": "Research topic requiring approval",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 10,
"approved": False,
"messages": []
}
config = {"configurable": {"thread_id": "integration-test-2"}}
result = compiled_graph.invoke(initial_input, config)
# Graph should pause for human review
state = compiled_graph.get_state(config)
assert "human_review" in state.next or result.get("approved") is False
def test_graph_respects_iteration_limit(self, compiled_graph):
"""Test graph terminates at max iterations."""
initial_input = {
"query": "Complex query that might loop",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 3, # Low limit for testing
"approved": False,
"messages": []
}
config = {"configurable": {"thread_id": "integration-test-3"}}
result = compiled_graph.invoke(initial_input, config)
assert result["iteration"] <= 3, "Should respect iteration limit"
def test_state_accumulation_with_reducer(self, compiled_graph):
"""Test that documents accumulate correctly via reducer."""
config = {"configurable": {"thread_id": "accumulation-test"}}
# First invocation
result1 = compiled_graph.invoke({
"query": "Topic A",
"documents": [],
"messages": [],
"iteration": 0,
"max_iterations": 5,
"analysis": None,
"report": None,
"approved": False
}, config)
initial_doc_count = len(result1["documents"])
assert initial_doc_count > 0
# The checkpointed state should match what the last invocation returned
state = compiled_graph.get_state(config)
assert len(state.values["documents"]) == initial_doc_count
def test_graph_handles_error_recovery(self, compiled_graph):
"""Test graph recovers from node errors."""
# This would use specific error injection
config = {"configurable": {"thread_id": "error-test"}}
initial_input = {
"query": "Test with potential error",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 10,
"approved": False,
"messages": [],
"simulate_error": True # Custom flag for testing
}
# Should complete without raising exception
result = compiled_graph.invoke(initial_input, config)
# Verify the error was handled rather than propagated: if the graph records
# recoveries in messages, an "error_recovered" entry should be present
error_messages = [m for m in result.get("messages", []) if m.get("type") == "error_recovered"]
assert result is not None, "Graph should complete despite the injected error"
class TestStateTransitions:
"""Test state transitions in the graph."""
@pytest.fixture
def graph_with_checkpointer(self):
graph = create_research_graph()
return graph.compile(checkpointer=MemorySaver())
def test_all_conditional_edge_paths(self, graph_with_checkpointer):
"""Verify all conditional routing paths work correctly."""
from src.nodes.supervisor import supervisor_node
# Path 1: No documents -> researcher
state1 = {
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 10
}
result1 = supervisor_node(state1)
assert result1["next_worker"] == "researcher"
# Path 2: Has documents, no analysis -> analyzer
state2 = {
"documents": [{"content": "doc"}],
"analysis": None,
"report": None,
"iteration": 1,
"max_iterations": 10
}
result2 = supervisor_node(state2)
assert result2["next_worker"] == "analyzer"
# Path 3: Has analysis, no report -> writer
state3 = {
"documents": [{"content": "doc"}],
"analysis": "Detailed analysis that meets quality threshold with enough content",
"report": None,
"iteration": 2,
"max_iterations": 10
}
result3 = supervisor_node(state3)
assert result3["next_worker"] == "writer"
# Path 4: Has report, not approved -> human_review
state4 = {
"documents": [{"content": "doc"}],
"analysis": "Analysis",
"report": "Report content",
"approved": False,
"iteration": 3,
"max_iterations": 10
}
result4 = supervisor_node(state4)
assert result4["next_worker"] == "human_review"
# Path 5: Everything complete -> done
state5 = {
"documents": [{"content": "doc"}],
"analysis": "Analysis",
"report": "Report",
"approved": True,
"iteration": 4,
"max_iterations": 10
}
result5 = supervisor_node(state5)
assert result5["next_worker"] == "done"
def test_state_history_tracking(self, graph_with_checkpointer):
"""Verify state history is properly recorded."""
config = {"configurable": {"thread_id": "history-test"}}
graph_with_checkpointer.invoke({
"query": "Test query",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 5,
"approved": False,
"messages": []
}, config)
# Check state history
history = list(graph_with_checkpointer.get_state_history(config))
assert len(history) > 1, "Should have multiple checkpoints"
# Verify history is in reverse chronological order
for i, state in enumerate(history[:-1]):
assert state.created_at >= history[i + 1].created_at
Testing Human-in-the-Loop Flows
"""
Tests for human-in-the-loop interrupt and resume functionality.
"""
import pytest
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
class TestHumanInTheLoop:
"""Test human approval workflows."""
@pytest.fixture
def approval_graph(self):
"""Create graph that requires human approval."""
from src.graphs.approval_graph import create_approval_graph
graph = create_approval_graph()
return graph.compile(checkpointer=MemorySaver())
def test_interrupt_pauses_execution(self, approval_graph):
"""Verify interrupt() pauses graph for human input."""
config = {"configurable": {"thread_id": "interrupt-test"}}
result = approval_graph.invoke({
"document": "Contract requiring review",
"approved": False,
"review_notes": None
}, config)
# Get state to check if paused
state = approval_graph.get_state(config)
# Should be paused at human_review node
assert len(state.next) > 0, "Graph should be paused"
assert state.tasks[0].interrupts, "Should have interrupt data"
def test_resume_with_approval(self, approval_graph):
"""Test resuming after human approves."""
config = {"configurable": {"thread_id": "resume-approve-test"}}
# Initial invocation - will pause for review
approval_graph.invoke({
"document": "Contract for review",
"approved": False,
"review_notes": None
}, config)
# Resume with approval
result = approval_graph.invoke(
Command(resume={"action": "approve", "notes": "Looks good!"}),
config
)
assert result["approved"] is True
assert "Looks good!" in result.get("review_notes", "")
def test_resume_with_rejection(self, approval_graph):
"""Test resuming after human rejects."""
config = {"configurable": {"thread_id": "resume-reject-test"}}
approval_graph.invoke({
"document": "Contract for review",
"approved": False,
"review_notes": None
}, config)
# Resume with rejection
result = approval_graph.invoke(
Command(resume={"action": "reject", "reason": "Terms unacceptable"}),
config
)
assert result["approved"] is False
assert "unacceptable" in result.get("rejection_reason", "").lower()
def test_resume_with_revision_request(self, approval_graph):
"""Test resuming with request for changes."""
config = {"configurable": {"thread_id": "resume-revise-test"}}
approval_graph.invoke({
"document": "Contract needing changes",
"approved": False,
"review_notes": None
}, config)
# Resume with revision request
result = approval_graph.invoke(
Command(resume={
"action": "request_changes",
"feedback": "Please clarify section 3"
}),
config
)
# Should route back for revision
state = approval_graph.get_state(config)
# Verify it went back to revision node or has feedback
assert "clarify section 3" in result.get("feedback", "")
Testing Async Graphs
"""
Tests for asynchronous LangGraph execution.
"""
import pytest
import asyncio
from langgraph.checkpoint.memory import MemorySaver
class TestAsyncGraphExecution:
"""Test suite for async graph operations."""
@pytest.fixture
def async_graph(self):
from src.graphs.async_research_graph import create_async_research_graph
graph = create_async_research_graph()
return graph.compile(checkpointer=MemorySaver())
@pytest.mark.asyncio
async def test_async_invoke(self, async_graph):
"""Test async graph invocation."""
config = {"configurable": {"thread_id": "async-test-1"}}
result = await async_graph.ainvoke({
"query": "Async research topic",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 5
}, config)
assert result["report"] is not None
@pytest.mark.asyncio
async def test_async_stream(self, async_graph):
"""Test async streaming of graph events."""
config = {"configurable": {"thread_id": "async-stream-test"}}
events = []
async for event in async_graph.astream({
"query": "Streaming test",
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 5
}, config, stream_mode="updates"):
events.append(event)
assert len(events) > 0, "Should receive stream events"
# Verify events contain node updates
node_names = [list(e.keys())[0] for e in events]
assert "supervisor" in node_names
@pytest.mark.asyncio
async def test_concurrent_graph_executions(self, async_graph):
"""Test multiple concurrent graph executions."""
async def run_graph(thread_id: str, query: str):
config = {"configurable": {"thread_id": thread_id}}
return await async_graph.ainvoke({
"query": query,
"documents": [],
"analysis": None,
"report": None,
"iteration": 0,
"max_iterations": 3
}, config)
# Run 5 concurrent executions
tasks = [
run_graph(f"concurrent-{i}", f"Query {i}")
for i in range(5)
]
results = await asyncio.gather(*tasks)
# All should complete successfully
assert len(results) == 5
for result in results:
assert result["report"] is not None
Test Configuration and Fixtures
"""
conftest.py - Shared pytest fixtures for LangGraph testing.
"""
import pytest
import os
from unittest.mock import patch, Mock
from langgraph.checkpoint.memory import MemorySaver
@pytest.fixture(scope="session")
def mock_all_llms():
"""
Session-scoped fixture to mock all LLM calls.
Provides fast, deterministic tests without API costs.
"""
responses = {
"research": "Found relevant documents about the topic",
"analyze": "Analysis shows three key themes emerging",
"write": "# Research Report\n\nExecutive summary of findings..."
}
def create_mock_response(content):
mock = Mock()
mock.content = content
return mock
with patch("langchain_openai.ChatOpenAI") as mock_openai, \
patch("langchain_anthropic.ChatAnthropic") as mock_anthropic:
for mock_class in [mock_openai, mock_anthropic]:
instance = Mock()
instance.invoke.return_value = create_mock_response(responses["analyze"])
mock_class.return_value = instance
yield {"openai": mock_openai, "anthropic": mock_anthropic}
@pytest.fixture
def memory_checkpointer():
"""Provide fresh in-memory checkpointer for each test."""
return MemorySaver()
@pytest.fixture
def test_config():
"""Generate unique test configuration."""
import uuid
return {
"configurable": {
"thread_id": f"test-{uuid.uuid4().hex[:8]}"
}
}
@pytest.fixture
def sample_documents():
"""Sample documents for testing."""
return [
{
"source": "arxiv",
"content": "Research paper on neural networks and their applications",
"url": "https://arxiv.org/abs/1234",
"timestamp": "2026-01-01T00:00:00Z"
},
{
"source": "news",
"content": "Breaking: New AI breakthrough announced",
"url": "https://news.example.com/ai-breakthrough",
"timestamp": "2026-01-02T00:00:00Z"
},
{
"source": "blog",
"content": "Practical guide to implementing LLM applications",
"url": "https://blog.example.com/llm-guide",
"timestamp": "2026-01-03T00:00:00Z"
}
]
@pytest.fixture
def sample_analysis():
"""Sample analysis for testing."""
return """
## Analysis Summary
### Key Themes
1. Neural network architectures are evolving
2. Practical applications are expanding
3. Industry adoption is accelerating
### Supporting Evidence
- Multiple academic papers confirm trends
- News coverage indicates market interest
### Confidence Level: High
"""
# Async fixtures
@pytest.fixture
def event_loop():
"""Create event loop for async tests."""
import asyncio
loop = asyncio.new_event_loop()
yield loop
loop.close()
Interview Questions
Q: How do you approach testing LangGraph nodes that call external LLMs?
"Mock the LLM at the class level using pytest fixtures. Create deterministic mock responses that cover success cases, edge cases, and error scenarios. This ensures fast, repeatable tests without API costs. For critical paths, also maintain a small set of integration tests with real LLMs in CI/CD."
Q: What's your strategy for testing supervisor routing logic?
"Test every conditional path independently. Create state fixtures representing each decision point: empty documents, has documents, has analysis, quality thresholds, iteration limits. Verify the supervisor returns correct next_worker for each state. Use parameterized tests to cover all routing combinations systematically."
Q: How do you test human-in-the-loop flows in LangGraph?
"Use MemorySaver checkpointer to enable state inspection. Invoke graph, verify it pauses at interrupt with correct context. Then invoke again with Command(resume=data) to continue. Test all human response paths: approve, reject, request changes. Verify state updates correctly after each response type."
Q: What test coverage metrics do you target for production LangGraph apps?
"Aim for 80%+ line coverage with focus on critical paths: supervisor routing logic (100%), error handlers (100%), state reducers (100%). Integration test coverage on all graph paths. E2E smoke tests for critical user journeys. More important than coverage percentage: test quality and edge case coverage."
Key Takeaways
| Testing Layer | Focus | Tools | Speed |
|---|---|---|---|
| Unit Tests | Individual nodes, pure functions | pytest, Mock | Fast (ms) |
| Integration | Full graph flows, state transitions | MemorySaver, fixtures | Medium (seconds) |
| E2E Tests | Real APIs, production scenarios | Staging env, real LLMs | Slow (minutes) |
Critical Testing Patterns:
- Mock LLMs and external APIs for unit tests
- Use MemorySaver for integration testing
- Test all supervisor routing paths explicitly
- Verify human-in-the-loop interrupt/resume cycles
- Test state accumulation with reducers
- Cover error recovery paths
Next: Debugging and Visualization techniques