Human-in-the-Loop & Production Patterns

Human-in-the-Loop Patterns


Production AI systems rarely operate fully autonomously. From legal contract review to financial approvals to content moderation, human oversight is essential. LangGraph provides native interrupt and resume patterns that enable clean pause-resume workflows without polling or external queues.


Why Human-in-the-Loop?

  • High-stakes decisions: financial approvals, legal documents, medical recommendations
  • Quality assurance: content review, translation verification, data validation
  • Compliance: audit trails, approval chains, regulatory requirements
  • Learning: human corrections improve future agent behavior
  • Edge cases: the agent handles ~95% of cases automatically; humans handle exceptions

Real Scenario (January 2026): A legal AI drafting contracts needed attorney approval before sending. Native interrupt pattern: graph pauses, attorney reviews in dashboard, approves or requests changes, graph resumes. Zero polling, zero external queues.


The interrupt() Function

LangGraph 1.0.5 provides native interrupt support. When interrupt() is called, execution pauses and control returns to the caller.

from typing import TypedDict, Literal
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI


# ============================================================
# STATE DEFINITION
# ============================================================

class ContractState(TypedDict):
    """State for contract drafting workflow."""
    # Input
    client_name: str
    contract_type: str
    terms: dict

    # Draft
    draft: str
    draft_version: int

    # Review
    review_decision: str
    reviewer_feedback: str
    approved: bool

    # Metadata
    review_history: list[dict]


# ============================================================
# NODE IMPLEMENTATIONS
# ============================================================

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def draft_contract(state: ContractState) -> dict:
    """Generate or revise contract draft."""
    version = state.get("draft_version", 0) + 1
    feedback = state.get("reviewer_feedback", "")

    if feedback:
        # Incorporate feedback into revision
        prompt = f"""Revise this contract based on reviewer feedback.

Current draft:
{state['draft']}

Reviewer feedback:
{feedback}

Provide the revised contract."""
    else:
        # Initial draft
        prompt = f"""Draft a {state['contract_type']} contract for {state['client_name']}.

Terms:
{state['terms']}

Provide a professional contract draft."""

    response = llm.invoke(prompt)

    return {
        "draft": response.content,
        "draft_version": version,
        "reviewer_feedback": ""  # Clear feedback after addressing
    }


def human_review(state: ContractState) -> Command:
    """
    Pause for human review using native interrupt.

    The interrupt() call:
    1. Checkpoints current state
    2. Returns control to caller with interrupt payload
    3. Waits for Command(resume=data) to continue
    """
    # Create interrupt payload for the reviewer
    review_request = interrupt({
        "type": "contract_review",
        "contract_id": f"contract-{state['client_name'].lower().replace(' ', '-')}",
        "draft": state["draft"],
        "version": state["draft_version"],
        "client": state["client_name"],
        "contract_type": state["contract_type"],
        "options": ["approve", "reject", "request_changes"],
        "instructions": "Please review this contract draft and provide your decision."
    })

    # When resumed, review_request contains the human's response
    action = review_request.get("action", "reject")
    feedback = review_request.get("feedback", "")

    # Record review in history
    review_record = {
        "version": state["draft_version"],
        "action": action,
        "feedback": feedback,
        "reviewer": review_request.get("reviewer", "unknown")
    }

    history = state.get("review_history", []) + [review_record]

    if action == "approve":
        return Command(
            goto="finalize_contract",
            update={
                "approved": True,
                "review_decision": "approved",
                "review_history": history
            }
        )
    elif action == "request_changes":
        return Command(
            goto="draft_contract",
            update={
                "reviewer_feedback": feedback,
                "review_decision": "changes_requested",
                "review_history": history
            }
        )
    else:  # reject
        return Command(
            goto=END,
            update={
                "approved": False,
                "review_decision": "rejected",
                "reviewer_feedback": feedback,
                "review_history": history
            }
        )


def finalize_contract(state: ContractState) -> dict:
    """Finalize approved contract with signatures and metadata."""
    final_draft = f"""
{state['draft']}

---
APPROVAL RECORD
Client: {state['client_name']}
Contract Type: {state['contract_type']}
Version: {state['draft_version']}
Status: APPROVED
Review History: {len(state.get('review_history', []))} reviews
---
"""
    return {"draft": final_draft}


# ============================================================
# BUILD GRAPH
# ============================================================

def build_contract_workflow():
    """Build the contract review workflow."""
    builder = StateGraph(ContractState)

    # Add nodes
    builder.add_node("draft_contract", draft_contract)
    builder.add_node("human_review", human_review)
    builder.add_node("finalize_contract", finalize_contract)

    # Define flow
    builder.add_edge(START, "draft_contract")
    builder.add_edge("draft_contract", "human_review")
    # human_review uses Command to route
    builder.add_edge("finalize_contract", END)

    return builder


# Usage: from_conn_string returns a context manager; call setup() once
# to create the checkpoint tables before first use
workflow = build_contract_workflow()
with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
) as checkpointer:
    checkpointer.setup()
    app = workflow.compile(checkpointer=checkpointer)

Resuming After Interrupt

When a graph hits an interrupt, it returns immediately with interrupt data. The client handles the human interaction, then resumes.

from langgraph.types import Command

# ============================================================
# INITIAL INVOCATION - WILL PAUSE AT INTERRUPT
# ============================================================

config = {"configurable": {"thread_id": "contract-acme-2026-001"}}

# Start the workflow
result = app.invoke(
    {
        "client_name": "ACME Corporation",
        "contract_type": "service_agreement",
        "terms": {
            "duration": "12 months",
            "value": "$50,000",
            "payment_terms": "Net 30"
        },
        "draft": "",
        "draft_version": 0,
        "approved": False,
        "review_history": []
    },
    config
)

# The run pauses at the interrupt; the payload is returned under
# the "__interrupt__" key as a list of Interrupt objects
print(result["__interrupt__"][0].value)
# Output:
# {
#     "type": "contract_review",
#     "draft": "...",
#     "version": 1,
#     "options": ["approve", "reject", "request_changes"],
#     ...
# }


# ============================================================
# HUMAN REVIEWS (outside the graph)
# ============================================================

# ... time passes, human reviews in UI ...
# Human decides: request_changes with feedback


# ============================================================
# RESUME WITH HUMAN DECISION
# ============================================================

resume_result = app.invoke(
    Command(resume={
        "action": "request_changes",
        "feedback": "Please add a termination clause and clarify payment penalties.",
        "reviewer": "jane.attorney@firm.com"
    }),
    config  # Same thread_id!
)

# Graph continues: draft_contract revises based on feedback,
# then human_review triggers again

# Resume again with approval
final_result = app.invoke(
    Command(resume={
        "action": "approve",
        "reviewer": "jane.attorney@firm.com"
    }),
    config
)

# Graph completes: finalize_contract runs, workflow ends
print(final_result["approved"])  # True
print(final_result["draft_version"])  # 2 (after revision)
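When driving this loop from client code, a small helper (illustrative, assuming the `__interrupt__` result key used by recent LangGraph releases) makes it easy to tell a paused run from a finished one:

```python
def pending_interrupt(result: dict):
    """Return the first interrupt payload if the run paused, else None."""
    interrupts = result.get("__interrupt__") or []
    return interrupts[0].value if interrupts else None
```

Loop on invoke/resume until `pending_interrupt` returns None, then read the final state.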

Multiple Approval Stages

Complex workflows often require multiple human touchpoints with different reviewers.

from typing import TypedDict, Literal
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END


class MultiStageState(TypedDict):
    """State for multi-stage approval workflow."""
    document: str
    document_type: str

    # Stage tracking
    current_stage: Literal["draft", "manager", "legal", "executive", "complete"]
    stages_completed: list[str]

    # Approvals
    manager_approval: bool
    legal_approval: bool
    executive_approval: bool

    # Feedback from each stage
    stage_feedback: dict


def manager_review(state: MultiStageState) -> Command:
    """First stage: Manager review."""
    decision = interrupt({
        "stage": "manager_review",
        "reviewer_role": "Department Manager",
        "document": state["document"],
        "document_type": state["document_type"],
        "required_action": "Review for business accuracy and completeness",
        "options": ["approve", "reject", "request_changes"],
        "escalate_option": True  # Can skip to executive
    })

    action = decision.get("action")
    feedback = decision.get("feedback", "")

    completed = state.get("stages_completed", []) + ["manager"]
    stage_feedback = state.get("stage_feedback", {})
    stage_feedback["manager"] = feedback

    if action == "approve":
        return Command(
            goto="legal_review",
            update={
                "manager_approval": True,
                "current_stage": "legal",
                "stages_completed": completed,
                "stage_feedback": stage_feedback
            }
        )
    elif action == "escalate":
        # Skip legal, go directly to executive
        return Command(
            goto="executive_review",
            update={
                "manager_approval": True,
                "current_stage": "executive",
                "stages_completed": completed,
                "stage_feedback": stage_feedback
            }
        )
    elif action == "request_changes":
        return Command(
            goto="revise_document",
            update={
                "current_stage": "draft",
                "stage_feedback": stage_feedback
            }
        )
    else:  # reject
        return Command(
            goto=END,
            update={
                "manager_approval": False,
                "stage_feedback": stage_feedback
            }
        )


def legal_review(state: MultiStageState) -> Command:
    """Second stage: Legal review."""
    decision = interrupt({
        "stage": "legal_review",
        "reviewer_role": "Legal Counsel",
        "document": state["document"],
        "document_type": state["document_type"],
        "required_action": "Review for legal compliance and risk",
        "previous_approvals": state.get("stages_completed", []),
        "options": ["approve", "reject", "request_changes"]
    })

    action = decision.get("action")
    feedback = decision.get("feedback", "")

    completed = state.get("stages_completed", []) + ["legal"]
    stage_feedback = state.get("stage_feedback", {})
    stage_feedback["legal"] = feedback

    if action == "approve":
        return Command(
            goto="executive_review",
            update={
                "legal_approval": True,
                "current_stage": "executive",
                "stages_completed": completed,
                "stage_feedback": stage_feedback
            }
        )
    elif action == "request_changes":
        return Command(
            goto="revise_document",
            update={
                "current_stage": "draft",
                "stage_feedback": stage_feedback
            }
        )
    else:
        return Command(
            goto=END,
            update={
                "legal_approval": False,
                "stage_feedback": stage_feedback
            }
        )


def executive_review(state: MultiStageState) -> Command:
    """Final stage: Executive approval."""
    decision = interrupt({
        "stage": "executive_review",
        "reviewer_role": "Executive",
        "document": state["document"],
        "document_type": state["document_type"],
        "required_action": "Final approval",
        "previous_approvals": state.get("stages_completed", []),
        "options": ["approve", "reject"]
    })

    action = decision.get("action")

    completed = state.get("stages_completed", []) + ["executive"]

    if action == "approve":
        return Command(
            goto="finalize",
            update={
                "executive_approval": True,
                "current_stage": "complete",
                "stages_completed": completed
            }
        )
    else:
        return Command(
            goto=END,
            update={
                "executive_approval": False,
                "stages_completed": completed
            }
        )


def revise_document(state: MultiStageState) -> dict:
    """Revise document based on feedback from any stage."""
    all_feedback = state.get("stage_feedback", {})

    prompt = f"""Revise this document based on feedback:

Document:
{state['document']}

Feedback from stages:
{all_feedback}

Provide the revised document."""

    response = llm.invoke(prompt)

    return {
        "document": response.content,
        "current_stage": "manager"  # Start approval process again
    }


def finalize(state: MultiStageState) -> dict:
    """Finalize fully approved document."""
    return {
        "document": f"{state['document']}\n\n[APPROVED BY ALL STAGES]"
    }
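The stage-to-stage routing above is easy to unit-test in isolation. A pure-Python sketch of the same decision table (function and constant names are illustrative, not part of LangGraph):

```python
# Ordered approval chain; "escalate" skips straight to the final stage
STAGES = ["manager", "legal", "executive"]


def next_stage(current: str, action: str) -> str:
    """Compute the next approval stage from a reviewer's action."""
    if action == "reject":
        return "end"
    if action == "request_changes":
        return "draft"          # back to revision, then restart the chain
    if action == "escalate":
        return STAGES[-1]       # skip intermediate reviewers
    # approve: advance to the following stage, or finish
    idx = STAGES.index(current)
    return STAGES[idx + 1] if idx + 1 < len(STAGES) else "complete"
```

Keeping the routing table in one place like this makes skip-level rules auditable without running the graph.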

Timeout and Escalation Patterns

Production systems need to handle cases where humans don't respond in time.

from datetime import datetime, timedelta
from typing import TypedDict
from langgraph.types import interrupt, Command


class TimeoutState(TypedDict):
    """State with timeout tracking."""
    task: str
    result: str

    # Timeout tracking
    interrupt_timestamp: str
    timeout_hours: int
    escalated: bool
    escalation_reason: str


def record_interrupt_time(state: TimeoutState) -> dict:
    """Record the review start time in state (runs once, before the interrupt node)."""
    return {"interrupt_timestamp": datetime.now().isoformat()}


def review_with_timeout(state: TimeoutState) -> Command:
    """
    Human review with automatic escalation on timeout.

    Note: code before interrupt() re-executes when the node resumes,
    so the start time must be read from state (written by a prior
    node), not computed here. An external scheduler triggers the
    resume once the deadline passes.
    """
    timeout_hours = state.get("timeout_hours", 24)
    interrupt_time = datetime.fromisoformat(state["interrupt_timestamp"])
    deadline = interrupt_time + timedelta(hours=timeout_hours)

    decision = interrupt({
        "type": "review_with_timeout",
        "task": state["task"],
        "current_result": state["result"],
        "deadline": deadline.isoformat(),
        "escalation_contact": "senior.manager@company.com",
        "instructions": f"Please review within {timeout_hours} hours or task will escalate."
    })

    # Escalate if the scheduler forced the resume, or the deadline passed
    elapsed = datetime.now() - interrupt_time

    if decision.get("action") == "timeout" or elapsed > timedelta(hours=timeout_hours):
        return Command(
            goto="escalate",
            update={
                "escalated": True,
                "escalation_reason": f"Review timeout after {elapsed.total_seconds() / 3600:.1f} hours"
            }
        )

    # Normal response within the deadline
    return Command(
        goto="continue_workflow",
        update={
            "result": decision.get("result", state["result"]),
            "escalated": False
        }
    )


def escalate(state: TimeoutState) -> dict:
    """Handle escalated tasks."""
    # Notify senior manager, create ticket, etc.
    return {
        "result": f"ESCALATED: {state['task']} - {state['escalation_reason']}"
    }


# ============================================================
# EXTERNAL TIMEOUT HANDLER
# ============================================================

async def timeout_monitor(app, thread_id: str, timeout_hours: int = 24):
    """
    External service that triggers resume on timeout.
    Run this as a background task or scheduled job.
    """
    import asyncio

    await asyncio.sleep(timeout_hours * 3600)

    # Check if still waiting (not already resumed)
    state = app.get_state({"configurable": {"thread_id": thread_id}})

    if state.next:  # Still has pending nodes (waiting at interrupt)
        # Force resume with timeout indicator
        app.invoke(
            Command(resume={
                "action": "timeout",
                "auto_escalate": True
            }),
            {"configurable": {"thread_id": thread_id}}
        )
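On the graph side, distinguishing a scheduler-forced resume from a human one can be a one-liner (assuming the resume payload shape used by timeout_monitor above):

```python
def is_timeout_resume(resume_payload: dict) -> bool:
    """True when the resume came from the timeout scheduler, not a human."""
    return resume_payload.get("action") == "timeout" or bool(resume_payload.get("auto_escalate"))
```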

Streaming with Interrupts

For real-time UI updates, combine streaming with interrupt handling.

from langgraph.types import interrupt, Command, StreamWriter


async def streaming_review(
    state: ContractState,
    writer: StreamWriter
) -> Command:
    """Stream progress updates while waiting for human review."""
    # Stream status update
    writer({"status": "awaiting_review", "draft": state["draft"]})

    # Interrupt for human input
    decision = interrupt({
        "type": "review",
        "draft": state["draft"]
    })

    # Stream decision received
    writer({"status": "review_complete", "decision": decision["action"]})

    if decision["action"] == "approve":
        return Command(goto="finalize")
    else:
        return Command(goto="revise")


# Client-side streaming handler
async def handle_workflow_stream(app, config, initial_state):
    """Handle streaming workflow with interrupts."""
    # "custom" carries writer() payloads; "updates" surfaces interrupts
    async for mode, event in app.astream(
        initial_state, config, stream_mode=["custom", "updates"]
    ):
        if mode == "custom" and "status" in event:
            print(f"Status: {event['status']}")

        if mode == "updates" and "__interrupt__" in event:
            # Workflow paused - show review UI
            payload = event["__interrupt__"][0].value
            print(f"Review needed: {payload}")

            # Get human decision (from UI, API, etc.)
            decision = await get_human_decision(payload)

            # Resume workflow
            async for resume_event in app.astream(
                Command(resume=decision), config
            ):
                print(f"Resume event: {resume_event}")

Production Pattern: Approval Dashboard Backend

Complete backend for a human approval dashboard.

from typing import TypedDict, Optional
from datetime import datetime, timedelta
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
import uuid


# ============================================================
# DATABASE MODELS (pseudo-code; `db` and `notify_user` used below
# stand in for your persistence and notification layers)
# ============================================================

class ApprovalRequest:
    """Database model for pending approvals."""
    id: str
    thread_id: str
    request_type: str
    payload: dict
    created_at: datetime
    deadline: datetime
    assigned_to: str
    status: str  # pending, approved, rejected, expired


# ============================================================
# APPROVAL WORKFLOW
# ============================================================

class ApprovalState(TypedDict):
    """State for approval workflow."""
    request_id: str
    request_type: str
    payload: dict
    assigned_to: str
    deadline_hours: int

    # Results
    approved: bool
    decision_notes: str
    decided_by: str
    decided_at: str


def create_approval_request(state: ApprovalState) -> dict:
    """Create approval request record in database."""
    request_id = str(uuid.uuid4())
    deadline = datetime.now() + timedelta(hours=state["deadline_hours"])

    # Save to database
    db.save(ApprovalRequest(
        id=request_id,
        thread_id=state.get("thread_id"),
        request_type=state["request_type"],
        payload=state["payload"],
        created_at=datetime.now(),
        deadline=deadline,
        assigned_to=state["assigned_to"],
        status="pending"
    ))

    # Send notification
    notify_user(
        user=state["assigned_to"],
        message=f"New approval request: {state['request_type']}",
        link=f"/approvals/{request_id}"
    )

    return {"request_id": request_id}


def await_decision(state: ApprovalState) -> Command:
    """Interrupt and wait for human decision."""
    decision = interrupt({
        "request_id": state["request_id"],
        "type": state["request_type"],
        "payload": state["payload"],
        "assigned_to": state["assigned_to"],
        "actions": ["approve", "reject"]
    })

    # Update database record
    db.update(
        ApprovalRequest,
        id=state["request_id"],
        status="approved" if decision["approved"] else "rejected"
    )

    return Command(
        goto="process_decision",
        update={
            "approved": decision["approved"],
            "decision_notes": decision.get("notes", ""),
            "decided_by": decision.get("user"),
            "decided_at": datetime.now().isoformat()
        }
    )


def process_decision(state: ApprovalState) -> dict:
    """Process the approval decision."""
    if state["approved"]:
        # Execute approved action
        execute_approved_action(state["request_type"], state["payload"])
        return {"status": "completed"}
    else:
        # Handle rejection
        handle_rejection(state["request_type"], state["payload"], state["decision_notes"])
        return {"status": "rejected"}
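The "already decided" guard in the API below can be backed by an explicit transition table; a minimal sketch (names are illustrative):

```python
# Approval records leave "pending" exactly once, then become immutable
VALID_TRANSITIONS = {
    "pending": {"approved", "rejected", "expired"},
    "approved": set(),
    "rejected": set(),
    "expired": set(),
}


def can_transition(current: str, new: str) -> bool:
    """True if an ApprovalRequest may move from current to new status."""
    return new in VALID_TRANSITIONS.get(current, set())
```

Rejecting invalid transitions at the data layer prevents double-resuming a thread from a stale dashboard tab.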


# ============================================================
# API ENDPOINTS
# ============================================================

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app_api = FastAPI()


class ApprovalDecision(BaseModel):
    approved: bool
    notes: Optional[str] = ""


@app_api.get("/approvals/pending")
async def get_pending_approvals(user: str):
    """Get all pending approvals for a user."""
    approvals = db.query(
        ApprovalRequest,
        assigned_to=user,
        status="pending"
    )
    return [
        {
            "id": a.id,
            "type": a.request_type,
            "payload": a.payload,
            "created_at": a.created_at.isoformat(),
            "deadline": a.deadline.isoformat()
        }
        for a in approvals
    ]


@app_api.post("/approvals/{request_id}/decide")
async def submit_decision(request_id: str, decision: ApprovalDecision, user: str):
    """Submit approval decision."""
    # Get the approval request
    approval = db.get(ApprovalRequest, id=request_id)
    if not approval:
        raise HTTPException(404, "Approval request not found")

    if approval.status != "pending":
        raise HTTPException(400, f"Approval already {approval.status}")

    # Resume the paused graph (workflow_app is the compiled approval workflow)
    config = {"configurable": {"thread_id": approval.thread_id}}

    try:
        result = workflow_app.invoke(
            Command(resume={
                "approved": decision.approved,
                "notes": decision.notes,
                "user": user
            }),
            config
        )
        return {"status": "success", "result": result}
    except Exception as e:
        raise HTTPException(500, str(e))


@app_api.get("/approvals/{request_id}/status")
async def get_approval_status(request_id: str):
    """Get status of an approval request."""
    approval = db.get(ApprovalRequest, id=request_id)
    if not approval:
        raise HTTPException(404, "Approval request not found")

    return {
        "id": approval.id,
        "status": approval.status,
        "created_at": approval.created_at.isoformat(),
        "deadline": approval.deadline.isoformat()
    }

Interview Questions

Q: How do you implement human-in-the-loop in LangGraph?

"Use the native interrupt() function. When executed, it pauses the graph and returns control to the caller with the interrupt payload. The graph state is automatically checkpointed. Later, resume with Command(resume=data) to continue execution from where it paused. The human's response is available as the return value of interrupt()."

Q: Why use interrupt() over simple polling or external queues?

"Interrupt is native to LangGraph's execution model. It properly checkpoints state at the exact pause point, integrates with LangGraph Platform's task queue, supports clean timeout and escalation patterns, and enables true pause-resume semantics. Unlike polling, there's no busy-waiting. Unlike external queues, state consistency is guaranteed because the checkpointer handles it."

Q: How do you handle timeouts in human-in-the-loop workflows?

"Record the interrupt timestamp in state before pausing and include a deadline in the interrupt payload. On resume, compare the current time to the deadline. If exceeded, route to an escalation node. An external scheduler or cron job can trigger automatic resume after timeout by invoking Command(resume=timeout_indicator). This separation keeps timeout logic explicit and testable."

Q: How do you implement multi-stage approvals?

"Chain multiple interrupt points with different roles. Each interrupt specifies the reviewer role and required action. Use state to track completed stages. Routing logic after each interrupt decides whether to proceed to next stage, request revisions, or reject. The graph structure naturally supports complex approval chains including skip-levels and parallel approvals."


Key Takeaways

  • interrupt() pauses execution and returns control to caller with payload
  • Command(resume=data) continues execution from pause point with human response
  • Checkpointing required - interrupt only works with a configured checkpointer
  • Same thread_id - resume must use the same thread_id as the initial invoke
  • Multiple stages - chain interrupts for complex approval workflows
  • Timeout handling - check elapsed time on resume, use external scheduler for auto-escalation
  • Streaming compatible - combine with astream for real-time UI updates
  • Database integration - store approval requests for dashboard/API access

