Human-in-the-Loop & Production Patterns
Human-in-the-Loop Patterns
Production AI systems rarely operate fully autonomously. From legal contract review to financial approvals to content moderation, human oversight is essential. LangGraph provides native interrupt and resume patterns that enable clean pause-resume workflows without polling or external queues.
Why Human-in-the-Loop?
| Use Case | Requirement |
|---|---|
| High-stakes decisions | Financial approvals, legal documents, medical recommendations |
| Quality assurance | Content review, translation verification, data validation |
| Compliance | Audit trails, approval chains, regulatory requirements |
| Learning | Human corrections improve future agent behavior |
| Edge cases | Agent handles 95% automatically, humans handle exceptions |
Real Scenario (January 2026): A legal AI drafting contracts needed attorney approval before sending. Native interrupt pattern: graph pauses, attorney reviews in dashboard, approves or requests changes, graph resumes. Zero polling, zero external queues.
The interrupt() Function
LangGraph 1.0.5 provides native interrupt support. When interrupt() is called, execution pauses and control returns to the caller.
```python
from typing import TypedDict

from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI

# ============================================================
# STATE DEFINITION
# ============================================================

class ContractState(TypedDict):
    """State for contract drafting workflow."""
    # Input
    client_name: str
    contract_type: str
    terms: dict
    # Draft
    draft: str
    draft_version: int
    # Review
    review_decision: str
    reviewer_feedback: str
    approved: bool
    # Metadata
    review_history: list[dict]


# ============================================================
# NODE IMPLEMENTATIONS
# ============================================================

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def draft_contract(state: ContractState) -> dict:
    """Generate or revise contract draft."""
    version = state.get("draft_version", 0) + 1
    feedback = state.get("reviewer_feedback", "")

    if feedback:
        # Incorporate feedback into revision
        prompt = f"""Revise this contract based on reviewer feedback.

Current draft:
{state['draft']}

Reviewer feedback:
{feedback}

Provide the revised contract."""
    else:
        # Initial draft
        prompt = f"""Draft a {state['contract_type']} contract for {state['client_name']}.

Terms:
{state['terms']}

Provide a professional contract draft."""

    response = llm.invoke(prompt)
    return {
        "draft": response.content,
        "draft_version": version,
        "reviewer_feedback": ""  # Clear feedback after addressing
    }


def human_review(state: ContractState) -> Command:
    """
    Pause for human review using native interrupt.

    The interrupt() call:
    1. Checkpoints current state
    2. Returns control to caller with interrupt payload
    3. Waits for Command(resume=data) to continue
    """
    # Create interrupt payload for the reviewer
    review_request = interrupt({
        "type": "contract_review",
        "contract_id": f"contract-{state['client_name'].lower().replace(' ', '-')}",
        "draft": state["draft"],
        "version": state["draft_version"],
        "client": state["client_name"],
        "contract_type": state["contract_type"],
        "options": ["approve", "reject", "request_changes"],
        "instructions": "Please review this contract draft and provide your decision."
    })

    # When resumed, review_request contains the human's response
    action = review_request.get("action", "reject")
    feedback = review_request.get("feedback", "")

    # Record review in history
    review_record = {
        "version": state["draft_version"],
        "action": action,
        "feedback": feedback,
        "reviewer": review_request.get("reviewer", "unknown")
    }
    history = state.get("review_history", []) + [review_record]

    if action == "approve":
        return Command(
            goto="finalize_contract",
            update={
                "approved": True,
                "review_decision": "approved",
                "review_history": history
            }
        )
    elif action == "request_changes":
        return Command(
            goto="draft_contract",
            update={
                "reviewer_feedback": feedback,
                "review_decision": "changes_requested",
                "review_history": history
            }
        )
    else:  # reject
        return Command(
            goto=END,
            update={
                "approved": False,
                "review_decision": "rejected",
                "reviewer_feedback": feedback,
                "review_history": history
            }
        )


def finalize_contract(state: ContractState) -> dict:
    """Finalize approved contract with signatures and metadata."""
    final_draft = f"""
{state['draft']}

---
APPROVAL RECORD
Client: {state['client_name']}
Contract Type: {state['contract_type']}
Version: {state['draft_version']}
Status: APPROVED
Review History: {len(state.get('review_history', []))} reviews
---
"""
    return {"draft": final_draft}


# ============================================================
# BUILD GRAPH
# ============================================================

def build_contract_workflow():
    """Build the contract review workflow."""
    builder = StateGraph(ContractState)

    # Add nodes
    builder.add_node("draft_contract", draft_contract)
    builder.add_node("human_review", human_review)
    builder.add_node("finalize_contract", finalize_contract)

    # Define flow
    builder.add_edge(START, "draft_contract")
    builder.add_edge("draft_contract", "human_review")
    # human_review uses Command to route
    builder.add_edge("finalize_contract", END)

    return builder


# Usage
workflow = build_contract_workflow()

# PostgresSaver.from_conn_string() returns a context manager, not a saver;
# enter it to obtain the checkpointer (keep it open for the app's lifetime).
with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
) as checkpointer:
    checkpointer.setup()  # Creates checkpoint tables on first run
    app = workflow.compile(checkpointer=checkpointer)
```
Resuming After Interrupt
When a graph hits an interrupt, it returns immediately with interrupt data. The client handles the human interaction, then resumes.
```python
from langgraph.types import Command

# ============================================================
# INITIAL INVOCATION - WILL PAUSE AT INTERRUPT
# ============================================================

config = {"configurable": {"thread_id": "contract-acme-2026-001"}}

# Start the workflow
result = app.invoke(
    {
        "client_name": "ACME Corporation",
        "contract_type": "service_agreement",
        "terms": {
            "duration": "12 months",
            "value": "$50,000",
            "payment_terms": "Net 30"
        },
        "draft": "",
        "draft_version": 0,
        "approved": False,
        "review_history": []
    },
    config
)

# The interrupt payload surfaces under the "__interrupt__" key of the result
print(result["__interrupt__"])
# Output (abbreviated):
# [Interrupt(value={
#     "type": "contract_review",
#     "draft": "...",
#     "version": 1,
#     "options": ["approve", "reject", "request_changes"],
#     ...
# })]

# ============================================================
# HUMAN REVIEWS (outside the graph)
# ============================================================
# ... time passes, human reviews in UI ...
# Human decides: request_changes with feedback

# ============================================================
# RESUME WITH HUMAN DECISION
# ============================================================

resume_result = app.invoke(
    Command(resume={
        "action": "request_changes",
        "feedback": "Please add a termination clause and clarify payment penalties.",
        "reviewer": "jane.attorney@firm.com"
    }),
    config  # Same thread_id!
)

# Graph continues: draft_contract revises based on feedback,
# then human_review triggers again

# Resume again with approval
final_result = app.invoke(
    Command(resume={
        "action": "approve",
        "reviewer": "jane.attorney@firm.com"
    }),
    config
)

# Graph completes: finalize_contract runs, workflow ends
print(final_result["approved"])       # True
print(final_result["draft_version"])  # 2 (after revision)
```
Multiple Approval Stages
Complex workflows often require multiple human touchpoints with different reviewers.
```python
from typing import TypedDict, Literal

from langgraph.types import interrupt, Command
from langgraph.graph import END
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # same model as above


class MultiStageState(TypedDict):
    """State for multi-stage approval workflow."""
    document: str
    document_type: str
    # Stage tracking
    current_stage: Literal["draft", "manager", "legal", "executive", "complete"]
    stages_completed: list[str]
    # Approvals
    manager_approval: bool
    legal_approval: bool
    executive_approval: bool
    # Feedback from each stage
    stage_feedback: dict


def manager_review(state: MultiStageState) -> Command:
    """First stage: Manager review."""
    decision = interrupt({
        "stage": "manager_review",
        "reviewer_role": "Department Manager",
        "document": state["document"],
        "document_type": state["document_type"],
        "required_action": "Review for business accuracy and completeness",
        "options": ["approve", "reject", "request_changes"],
        "escalate_option": True  # Can skip to executive
    })

    action = decision.get("action")
    feedback = decision.get("feedback", "")

    completed = state.get("stages_completed", []) + ["manager"]
    # Copy rather than mutate the checkpointed dict
    stage_feedback = {**state.get("stage_feedback", {}), "manager": feedback}

    if action == "approve":
        return Command(
            goto="legal_review",
            update={
                "manager_approval": True,
                "current_stage": "legal",
                "stages_completed": completed,
                "stage_feedback": stage_feedback
            }
        )
    elif action == "escalate":
        # Skip legal, go directly to executive
        return Command(
            goto="executive_review",
            update={
                "manager_approval": True,
                "current_stage": "executive",
                "stages_completed": completed,
                "stage_feedback": stage_feedback
            }
        )
    elif action == "request_changes":
        return Command(
            goto="revise_document",
            update={
                "current_stage": "draft",
                "stage_feedback": stage_feedback
            }
        )
    else:  # reject
        return Command(
            goto=END,
            update={
                "manager_approval": False,
                "stage_feedback": stage_feedback
            }
        )


def legal_review(state: MultiStageState) -> Command:
    """Second stage: Legal review."""
    decision = interrupt({
        "stage": "legal_review",
        "reviewer_role": "Legal Counsel",
        "document": state["document"],
        "document_type": state["document_type"],
        "required_action": "Review for legal compliance and risk",
        "previous_approvals": state.get("stages_completed", []),
        "options": ["approve", "reject", "request_changes"]
    })

    action = decision.get("action")
    feedback = decision.get("feedback", "")

    completed = state.get("stages_completed", []) + ["legal"]
    stage_feedback = {**state.get("stage_feedback", {}), "legal": feedback}

    if action == "approve":
        return Command(
            goto="executive_review",
            update={
                "legal_approval": True,
                "current_stage": "executive",
                "stages_completed": completed,
                "stage_feedback": stage_feedback
            }
        )
    elif action == "request_changes":
        return Command(
            goto="revise_document",
            update={
                "current_stage": "draft",
                "stage_feedback": stage_feedback
            }
        )
    else:
        return Command(
            goto=END,
            update={
                "legal_approval": False,
                "stage_feedback": stage_feedback
            }
        )


def executive_review(state: MultiStageState) -> Command:
    """Final stage: Executive approval."""
    decision = interrupt({
        "stage": "executive_review",
        "reviewer_role": "Executive",
        "document": state["document"],
        "document_type": state["document_type"],
        "required_action": "Final approval",
        "previous_approvals": state.get("stages_completed", []),
        "options": ["approve", "reject"]
    })

    action = decision.get("action")
    completed = state.get("stages_completed", []) + ["executive"]

    if action == "approve":
        return Command(
            goto="finalize",
            update={
                "executive_approval": True,
                "current_stage": "complete",
                "stages_completed": completed
            }
        )
    else:
        return Command(
            goto=END,
            update={
                "executive_approval": False,
                "stages_completed": completed
            }
        )


def revise_document(state: MultiStageState) -> dict:
    """Revise document based on feedback from any stage."""
    all_feedback = state.get("stage_feedback", {})

    prompt = f"""Revise this document based on feedback:

Document:
{state['document']}

Feedback from stages:
{all_feedback}

Provide the revised document."""

    response = llm.invoke(prompt)
    return {
        "document": response.content,
        "current_stage": "manager"  # Start approval process again
    }


def finalize(state: MultiStageState) -> dict:
    """Finalize fully approved document."""
    return {
        "document": f"{state['document']}\n\n[APPROVED BY ALL STAGES]"
    }
```
Timeout and Escalation Patterns
Production systems need to handle cases where humans don't respond in time.
```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import TypedDict

from langgraph.types import interrupt, Command


class TimeoutState(TypedDict):
    """State with timeout tracking."""
    task: str
    result: str
    # Timeout tracking
    interrupt_timestamp: str
    timeout_hours: int
    escalated: bool
    escalation_reason: str


def record_pause_time(state: TimeoutState) -> dict:
    """Run this node immediately before the review node.

    A node's code re-executes from the top when the graph resumes, so a
    local timestamp captured next to interrupt() would be reset on resume.
    Persist the pause time in state instead.
    """
    return {"interrupt_timestamp": datetime.now(timezone.utc).isoformat()}


def review_with_timeout(state: TimeoutState) -> Command:
    """
    Human review with automatic escalation on timeout.

    Note: Timeout is checked on resume, not during the wait.
    An external scheduler should trigger resume after the deadline.
    """
    timeout_hours = state.get("timeout_hours", 24)
    interrupt_time = datetime.fromisoformat(state["interrupt_timestamp"])

    decision = interrupt({
        "type": "review_with_timeout",
        "task": state["task"],
        "current_result": state["result"],
        "deadline": (interrupt_time + timedelta(hours=timeout_hours)).isoformat(),
        "escalation_contact": "senior.manager@company.com",
        "instructions": f"Please review within {timeout_hours} hours or task will escalate."
    })

    # Check whether we resumed after the deadline
    # (or whether the monitor forced a timeout resume)
    elapsed = datetime.now(timezone.utc) - interrupt_time
    if decision.get("action") == "timeout" or elapsed > timedelta(hours=timeout_hours):
        # Timeout occurred - escalate
        return Command(
            goto="escalate",
            update={
                "escalated": True,
                "escalation_reason": f"Review timeout after {elapsed.total_seconds() / 3600:.1f} hours"
            }
        )

    # Normal response within timeout
    return Command(
        goto="continue_workflow",
        update={
            "result": decision.get("result", state["result"]),
            "escalated": False
        }
    )


def escalate(state: TimeoutState) -> dict:
    """Handle escalated tasks."""
    # Notify senior manager, create ticket, etc.
    return {
        "result": f"ESCALATED: {state['task']} - {state['escalation_reason']}"
    }


# ============================================================
# EXTERNAL TIMEOUT HANDLER
# ============================================================

async def timeout_monitor(app, thread_id: str, timeout_hours: int = 24):
    """
    External service that triggers resume on timeout.
    Run this as a background task or scheduled job.
    """
    await asyncio.sleep(timeout_hours * 3600)

    # Check if still waiting (not already resumed)
    state = app.get_state({"configurable": {"thread_id": thread_id}})
    if state.next:  # Still has pending nodes (waiting at interrupt)
        # Force resume with timeout indicator
        await app.ainvoke(
            Command(resume={
                "action": "timeout",
                "auto_escalate": True
            }),
            {"configurable": {"thread_id": thread_id}}
        )
```
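The deadline check itself reduces to a pure function over the stored timestamp, which also makes the timeout logic unit-testable in isolation. A small sketch (the helper name is mine):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def is_past_deadline(interrupt_iso: str, timeout_hours: int,
                     now: Optional[datetime] = None) -> bool:
    """True if more than timeout_hours have passed since the stored pause time."""
    started = datetime.fromisoformat(interrupt_iso)
    now = now or datetime.now(timezone.utc)
    return now - started > timedelta(hours=timeout_hours)

# Example: a 24-hour window, checked 30 hours and 2 hours after the pause
start = datetime(2026, 1, 5, 9, 0, tzinfo=timezone.utc)
print(is_past_deadline(start.isoformat(), 24, now=start + timedelta(hours=30)))  # True
print(is_past_deadline(start.isoformat(), 24, now=start + timedelta(hours=2)))   # False
```

Injecting `now` as a parameter keeps the function deterministic under test; production callers simply omit it.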
Streaming with Interrupts
For real-time UI updates, combine streaming with interrupt handling.
```python
from langgraph.types import interrupt, Command, StreamWriter


async def streaming_review(
    state: ContractState,
    writer: StreamWriter
) -> Command:
    """Stream progress updates while waiting for human review."""
    # Stream status update (surfaces under stream_mode="custom")
    writer({"status": "awaiting_review", "draft": state["draft"]})

    # Interrupt for human input
    decision = interrupt({
        "type": "review",
        "draft": state["draft"]
    })

    # Stream decision received
    writer({"status": "review_complete", "decision": decision["action"]})

    if decision["action"] == "approve":
        return Command(goto="finalize")
    else:
        return Command(goto="revise")


# Client-side streaming handler
async def handle_workflow_stream(app, config, initial_state):
    """Handle streaming workflow with interrupts.

    With stream_mode="updates", each event is a dict keyed by node name;
    writer() payloads would arrive via stream_mode="custom" instead.
    """
    async for event in app.astream(initial_state, config, stream_mode="updates"):
        if "__interrupt__" in event:
            # Workflow paused - show review UI
            payload = event["__interrupt__"][0].value
            print(f"Review needed: {payload}")

            # Get human decision (from UI, API, etc. - placeholder helper)
            decision = await get_human_decision(payload)

            # Resume workflow
            async for resume_event in app.astream(
                Command(resume=decision),
                config
            ):
                print(f"Resume event: {resume_event}")
        else:
            print(f"Update: {event}")
```
Production Pattern: Approval Dashboard Backend
Complete backend for a human approval dashboard.
```python
import uuid
from datetime import datetime, timedelta
from typing import TypedDict, Optional

from langgraph.types import interrupt, Command

# NOTE: db, notify_user, execute_approved_action, handle_rejection, and
# workflow_app are application-specific placeholders, not LangGraph APIs.

# ============================================================
# DATABASE MODELS (pseudo-code)
# ============================================================

class ApprovalRequest:
    """Database model for pending approvals."""
    id: str
    thread_id: str
    request_type: str
    payload: dict
    created_at: datetime
    deadline: datetime
    assigned_to: str
    status: str  # pending, approved, rejected, expired


# ============================================================
# APPROVAL WORKFLOW
# ============================================================

class ApprovalState(TypedDict):
    """State for approval workflow."""
    request_id: str
    request_type: str
    payload: dict
    assigned_to: str
    deadline_hours: int
    # Results
    approved: bool
    decision_notes: str
    decided_by: str
    decided_at: str


def create_approval_request(state: ApprovalState) -> dict:
    """Create approval request record in database."""
    request_id = str(uuid.uuid4())
    deadline = datetime.now() + timedelta(hours=state["deadline_hours"])

    # Save to database
    db.save(ApprovalRequest(
        id=request_id,
        thread_id=state.get("thread_id"),
        request_type=state["request_type"],
        payload=state["payload"],
        created_at=datetime.now(),
        deadline=deadline,
        assigned_to=state["assigned_to"],
        status="pending"
    ))

    # Send notification
    notify_user(
        user=state["assigned_to"],
        message=f"New approval request: {state['request_type']}",
        link=f"/approvals/{request_id}"
    )

    return {"request_id": request_id}


def await_decision(state: ApprovalState) -> Command:
    """Interrupt and wait for human decision."""
    decision = interrupt({
        "request_id": state["request_id"],
        "type": state["request_type"],
        "payload": state["payload"],
        "assigned_to": state["assigned_to"],
        "actions": ["approve", "reject"]
    })

    # Update database record
    db.update(
        ApprovalRequest,
        id=state["request_id"],
        status="approved" if decision["approved"] else "rejected"
    )

    return Command(
        goto="process_decision",
        update={
            "approved": decision["approved"],
            "decision_notes": decision.get("notes", ""),
            "decided_by": decision.get("user"),
            "decided_at": datetime.now().isoformat()
        }
    )


def process_decision(state: ApprovalState) -> dict:
    """Process the approval decision."""
    if state["approved"]:
        # Execute approved action
        execute_approved_action(state["request_type"], state["payload"])
        return {"status": "completed"}
    else:
        # Handle rejection
        handle_rejection(state["request_type"], state["payload"], state["decision_notes"])
        return {"status": "rejected"}


# ============================================================
# API ENDPOINTS
# ============================================================

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app_api = FastAPI()


class ApprovalDecision(BaseModel):
    approved: bool
    notes: Optional[str] = ""


@app_api.get("/approvals/pending")
async def get_pending_approvals(user: str):
    """Get all pending approvals for a user."""
    approvals = db.query(
        ApprovalRequest,
        assigned_to=user,
        status="pending"
    )
    return [
        {
            "id": a.id,
            "type": a.request_type,
            "payload": a.payload,
            "created_at": a.created_at.isoformat(),
            "deadline": a.deadline.isoformat()
        }
        for a in approvals
    ]


@app_api.post("/approvals/{request_id}/decide")
async def submit_decision(request_id: str, decision: ApprovalDecision, user: str):
    """Submit approval decision."""
    # Get the approval request
    approval = db.get(ApprovalRequest, id=request_id)
    if not approval:
        raise HTTPException(404, "Approval request not found")
    if approval.status != "pending":
        raise HTTPException(400, f"Approval already {approval.status}")

    # Resume the workflow
    config = {"configurable": {"thread_id": approval.thread_id}}
    try:
        result = workflow_app.invoke(
            Command(resume={
                "approved": decision.approved,
                "notes": decision.notes,
                "user": user
            }),
            config
        )
        return {"status": "success", "result": result}
    except Exception as e:
        raise HTTPException(500, str(e))


@app_api.get("/approvals/{request_id}/status")
async def get_approval_status(request_id: str):
    """Get status of an approval request."""
    approval = db.get(ApprovalRequest, id=request_id)
    if not approval:
        raise HTTPException(404, "Approval request not found")
    return {
        "id": approval.id,
        "status": approval.status,
        "created_at": approval.created_at.isoformat(),
        "deadline": approval.deadline.isoformat()
    }
```
Interview Questions
Q: How do you implement human-in-the-loop in LangGraph?
"Use the native `interrupt()` function. When executed, it pauses the graph and returns control to the caller with the interrupt payload. The graph state is automatically checkpointed. Later, resume with `Command(resume=data)` to continue execution from where it paused. The human's response is available as the return value of `interrupt()`."
Q: Why use interrupt() over simple polling or external queues?
"Interrupt is native to LangGraph's execution model. It properly checkpoints state at the exact pause point, integrates with LangGraph Platform's task queue, supports clean timeout and escalation patterns, and enables true pause-resume semantics. Unlike polling, there's no busy-waiting. Unlike external queues, state consistency is guaranteed because the checkpointer handles it."
Q: How do you handle timeouts in human-in-the-loop workflows?
"Record the pause timestamp in state and include a deadline in the interrupt payload. On resume, compare the current time to the deadline; if exceeded, route to an escalation node. An external scheduler or cron job can trigger automatic resume after timeout by invoking `Command(resume=timeout_indicator)`. This separation keeps timeout logic explicit and testable."
Q: How do you implement multi-stage approvals?
"Chain multiple interrupt points with different roles. Each interrupt specifies the reviewer role and required action. Use state to track completed stages. Routing logic after each interrupt decides whether to proceed to next stage, request revisions, or reject. The graph structure naturally supports complex approval chains including skip-levels and parallel approvals."
Key Takeaways
- `interrupt()` pauses execution and returns control to the caller with a payload
- `Command(resume=data)` continues execution from the pause point with the human response
- Checkpointing required - `interrupt()` only works with a configured checkpointer
- Same `thread_id` - resume must use the same `thread_id` as the initial invoke
- Multiple stages - chain interrupts for complex approval workflows
- Timeout handling - store the pause timestamp in state and check it on resume; use an external scheduler for auto-escalation
- Streaming compatible - combine with `astream` for real-time UI updates
- Database integration - store approval requests for dashboard/API access