Human-in-the-Loop & Production Patterns

Deployment Strategies


Taking a LangGraph application from development to production requires careful consideration of infrastructure, scaling, reliability, and operational concerns. This lesson covers the deployment landscape, from the managed LangGraph Platform to self-hosted solutions on Docker and Kubernetes.

Real-World Context

January 2026 Production Story: A fintech startup built a document processing pipeline with 5 specialized agents. Their journey: Started with LangGraph Platform for a 2-day MVP launch. After reaching 50,000 documents/day and needing specific compliance controls, they migrated to self-hosted Kubernetes with PostgresSaver. The migration took 2 weeks but gave them data residency compliance and 40% cost reduction at scale.

The Deployment Decision Framework:

Small team, fast launch → LangGraph Platform
Compliance requirements → Self-hosted
Scale economics matter → Self-hosted Kubernetes
Hybrid needs → LangGraph Platform + self-hosted workers

LangGraph Platform (Managed Cloud)

LangGraph Platform provides a fully managed deployment environment with zero infrastructure configuration, automatic scaling, built-in persistence, and integrated observability.

Platform Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    LangGraph Platform                            │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   API Gateway   │  │  Load Balancer  │  │  SSL/TLS Term   │  │
│  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘  │
│           │                    │                    │            │
│  ┌────────▼────────────────────▼────────────────────▼────────┐  │
│  │              Auto-Scaling Compute Layer                    │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │  │
│  │  │ Worker 1 │  │ Worker 2 │  │ Worker 3 │  │ Worker N │   │  │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │  │
│  └────────────────────────────┬──────────────────────────────┘  │
│                               │                                  │
│  ┌────────────────────────────▼──────────────────────────────┐  │
│  │              Managed Persistence Layer                     │  │
│  │  ┌─────────────────┐      ┌─────────────────────────────┐ │  │
│  │  │  Checkpoint DB  │      │  Blob Storage (attachments) │ │  │
│  │  └─────────────────┘      └─────────────────────────────┘ │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │           Integrated LangSmith Observability               │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Project Configuration

Create a langgraph.json file at your project root to configure the deployment:

{
    "graphs": {
        "research_agent": "./src/graphs/research_graph.py:app",
        "document_processor": "./src/graphs/document_graph.py:app",
        "customer_support": "./src/graphs/support_graph.py:app"
    },
    "env": ".env.production",
    "python_version": "3.11",
    "dependencies": [
        ".",
        "langchain-openai>=0.1.0",
        "langchain-anthropic>=0.1.0",
        "langgraph>=0.2.0",
        "httpx>=0.25.0",
        "pydantic>=2.0.0"
    ],
    "pip_config": {
        "extra_index_url": "https://my-private-pypi.example.com/simple/"
    }
}
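
Each entry under "graphs" points to a module path and the variable holding the graph. A minimal sketch of what an entry point such as ./src/graphs/research_graph.py might export is shown below; the state schema and node logic are illustrative placeholders, not the actual course graphs.

"""
Minimal example of a graph module that langgraph.json can reference as
"./src/graphs/research_graph.py:app". The state and node are placeholders.
"""
from typing import TypedDict

from langgraph.graph import StateGraph, START, END


class ResearchState(TypedDict):
    query: str
    answer: str


def research(state: ResearchState) -> dict:
    # Placeholder node - a real implementation would call an LLM and tools.
    return {"answer": f"Findings for: {state['query']}"}


builder = StateGraph(ResearchState)
builder.add_node("research", research)
builder.add_edge(START, "research")
builder.add_edge("research", END)

# The variable name after the colon in langgraph.json ("app") must match this.
app = builder.compile()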

Complete Deployment Workflow

A typical workflow is shown below. Exact subcommands and flags depend on your langgraph-cli version, so check langgraph --help for what your installed release supports.

# Install the LangGraph CLI
pip install langgraph-cli

# Validate configuration locally
langgraph validate

# Test locally before deployment
langgraph dev

# Deploy to LangGraph Platform
langgraph deploy --project my-research-agent

# Deploy with specific environment
langgraph deploy --project my-research-agent --env production

# Check deployment status
langgraph status --project my-research-agent

# View deployment logs
langgraph logs --project my-research-agent --follow

# Rollback to previous version if needed
langgraph rollback --project my-research-agent --version v1.2.3

Invoking Deployed Graphs

Once deployed, your graphs are accessible via a REST API:

"""
Client code for invoking LangGraph Platform deployments.
"""
import asyncio
import json
from typing import Any

import httpx


class LangGraphClient:
    """Client for interacting with LangGraph Platform deployments."""

    def __init__(self, project_url: str, api_key: str):
        self.base_url = project_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=300.0  # 5-minute timeout for long-running graphs
        )

    async def invoke(
        self,
        graph_name: str,
        input_data: dict[str, Any],
        thread_id: str,
        config: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """
        Invoke a graph synchronously and wait for the result.

        Args:
            graph_name: Name of the graph to invoke
            input_data: Input state for the graph
            thread_id: Thread ID for persistence
            config: Optional additional configuration

        Returns:
            Final state from graph execution
        """
        payload = {
            "input": input_data,
            "config": {
                "configurable": {"thread_id": thread_id},
                **(config or {})
            }
        }

        response = await self.client.post(
            f"/graphs/{graph_name}/invoke",
            json=payload
        )
        response.raise_for_status()
        return response.json()

    async def stream(
        self,
        graph_name: str,
        input_data: dict[str, Any],
        thread_id: str
    ):
        """
        Stream graph execution events.

        Yields events as they occur during graph execution.
        """
        payload = {
            "input": input_data,
            "config": {
                "configurable": {"thread_id": thread_id}
            }
        }

        async with self.client.stream(
            "POST",
            f"/graphs/{graph_name}/stream",
            json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[6:])
                    yield event

    async def get_state(
        self,
        graph_name: str,
        thread_id: str
    ) -> dict[str, Any]:
        """Get current state for a thread."""
        response = await self.client.get(
            f"/graphs/{graph_name}/state",
            params={"thread_id": thread_id}
        )
        response.raise_for_status()
        return response.json()

    async def update_state(
        self,
        graph_name: str,
        thread_id: str,
        values: dict[str, Any],
        as_node: str | None = None
    ) -> dict[str, Any]:
        """Update state for a thread (useful for human-in-the-loop)."""
        payload = {
            "values": values,
            "as_node": as_node
        }

        response = await self.client.post(
            f"/graphs/{graph_name}/state",
            params={"thread_id": thread_id},
            json=payload
        )
        response.raise_for_status()
        return response.json()

    async def resume(
        self,
        graph_name: str,
        thread_id: str,
        resume_data: Any
    ) -> dict[str, Any]:
        """
        Resume an interrupted graph with human input.

        A Command object is not JSON-serializable, so the resume value is
        sent as plain JSON and the server reconstructs Command(resume=...)
        before continuing the graph. The wire format here mirrors the
        /resume endpoint of the self-hosted FastAPI example later in this
        lesson; adjust it to match whatever resume endpoint your deployment exposes.
        """
        payload = {
            "thread_id": thread_id,
            "resume_data": resume_data
        }

        response = await self.client.post(
            f"/graphs/{graph_name}/resume",
            json=payload
        )
        response.raise_for_status()
        return response.json()


# Usage example
async def main():
    client = LangGraphClient(
        project_url="https://my-research-agent.langgraph.app",
        api_key="lgp_xxxxx"
    )

    # Invoke research graph
    result = await client.invoke(
        graph_name="research_agent",
        input_data={"query": "Latest developments in quantum computing"},
        thread_id="user-123-session-456"
    )

    print(f"Research Result: {result}")

    # Stream events for real-time updates
    async for event in client.stream(
        graph_name="research_agent",
        input_data={"query": "AI safety research 2026"},
        thread_id="user-123-session-789"
    ):
        print(f"Event: {event}")

Self-Hosted with FastAPI

For teams requiring full control over infrastructure, data residency, or cost optimization at scale, self-hosted deployment with FastAPI provides a robust foundation.

Production-Ready FastAPI Application

"""
Production FastAPI deployment for LangGraph applications.

Features:
- Connection pooling with asyncpg
- Graceful shutdown handling
- Health and readiness checks
- Request validation with Pydantic
- Structured logging
- Error handling and recovery
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Any
import asyncpg
import asyncio
import logging
import json
import os

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command

# Import your graph definitions
from src.graphs.research_graph import research_graph
from src.graphs.document_graph import document_graph

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='{"time": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}'
)
logger = logging.getLogger(__name__)

# Database configuration
DATABASE_URL = os.getenv("DATABASE_URL")
POOL_MIN_SIZE = int(os.getenv("POOL_MIN_SIZE", "5"))
POOL_MAX_SIZE = int(os.getenv("POOL_MAX_SIZE", "20"))

# Global connection pool
pool: asyncpg.Pool | None = None


async def init_database_pool() -> asyncpg.Pool:
    """Initialize the database connection pool with proper settings."""
    return await asyncpg.create_pool(
        DATABASE_URL,
        min_size=POOL_MIN_SIZE,
        max_size=POOL_MAX_SIZE,
        command_timeout=60,
        max_inactive_connection_lifetime=300,
    )


async def setup_checkpointer_tables(pool: asyncpg.Pool):
    """Ensure checkpoint tables exist in the database."""
    async with pool.acquire() as conn:
        # AsyncPostgresSaver will create tables automatically,
        # but we can verify connectivity here
        await conn.execute("SELECT 1")
        logger.info("Database connection verified")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan manager.

    Handles startup (connection pool creation) and shutdown (cleanup).
    """
    global pool

    # Startup
    logger.info("Starting application...")
    pool = await init_database_pool()
    await setup_checkpointer_tables(pool)
    logger.info(f"Database pool initialized with {POOL_MIN_SIZE}-{POOL_MAX_SIZE} connections")

    yield

    # Shutdown
    logger.info("Shutting down application...")
    if pool:
        await pool.close()
        logger.info("Database pool closed")


# Create FastAPI application
app = FastAPI(
    title="LangGraph Production API",
    version="1.0.0",
    lifespan=lifespan
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Request/Response Models
class InvokeRequest(BaseModel):
    """Request model for graph invocation."""
    input: dict[str, Any] = Field(..., description="Input state for the graph")
    thread_id: str = Field(..., description="Thread ID for persistence")
    config: dict[str, Any] | None = Field(None, description="Additional configuration")


class ResumeRequest(BaseModel):
    """Request model for resuming interrupted graphs."""
    thread_id: str = Field(..., description="Thread ID to resume")
    resume_data: Any = Field(..., description="Data to resume with")


class StateUpdateRequest(BaseModel):
    """Request model for state updates."""
    thread_id: str = Field(..., description="Thread ID to update")
    values: dict[str, Any] = Field(..., description="State values to update")
    as_node: str | None = Field(None, description="Node to attribute the update to")


class InvokeResponse(BaseModel):
    """Response model for graph invocation."""
    result: dict[str, Any]
    thread_id: str


class HealthResponse(BaseModel):
    """Health check response."""
    status: str
    version: str
    database: str


# Graph registry - maps graph names to uncompiled StateGraph builders
# (compiled per request with a checkpointer attached)
GRAPHS = {
    "research": research_graph,
    "document": document_graph,
}


def get_compiled_graph(graph_name: str, checkpointer):
    """Get a compiled graph with checkpointer attached."""
    if graph_name not in GRAPHS:
        raise HTTPException(
            status_code=404,
            detail=f"Graph '{graph_name}' not found. Available: {list(GRAPHS.keys())}"
        )
    return GRAPHS[graph_name].compile(checkpointer=checkpointer)


# Health Check Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """
    Basic health check - verifies the application is running.
    Used by load balancers for basic liveness probes.
    """
    return HealthResponse(
        status="healthy",
        version="1.0.0",
        database="not_checked"
    )


@app.get("/ready", response_model=HealthResponse)
async def readiness_check():
    """
    Readiness check - verifies all dependencies are available.
    Used by Kubernetes to determine if pod should receive traffic.
    """
    # Check database connectivity
    db_status = "error"
    try:
        async with pool.acquire() as conn:
            await conn.execute("SELECT 1")
            db_status = "connected"
    except Exception as e:
        logger.error(f"Database health check failed: {e}")
        raise HTTPException(
            status_code=503,
            detail=f"Database unavailable: {str(e)}"
        )

    return HealthResponse(
        status="ready",
        version="1.0.0",
        database=db_status
    )


# Graph Invocation Endpoints
@app.post("/graphs/{graph_name}/invoke", response_model=InvokeResponse)
async def invoke_graph(graph_name: str, request: InvokeRequest):
    """
    Invoke a graph synchronously.

    This endpoint waits for the graph to complete and returns the final state.
    For long-running graphs, consider using the streaming endpoint.
    """
    logger.info(f"Invoking graph '{graph_name}' for thread '{request.thread_id}'")

    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()  # Ensure tables exist

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {
                "configurable": {
                    "thread_id": request.thread_id,
                    **(request.config.get("configurable", {}) if request.config else {})
                },
                **(request.config or {})
            }

            result = await compiled_graph.ainvoke(request.input, config)

            logger.info(f"Graph '{graph_name}' completed for thread '{request.thread_id}'")

            return InvokeResponse(
                result=result,
                thread_id=request.thread_id
            )

    except Exception as e:
        logger.error(f"Graph invocation failed: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Graph execution failed: {str(e)}"
        )


@app.post("/graphs/{graph_name}/stream")
async def stream_graph(graph_name: str, request: InvokeRequest):
    """
    Stream graph execution events.

    Returns Server-Sent Events (SSE) for real-time updates during execution.
    """
    logger.info(f"Streaming graph '{graph_name}' for thread '{request.thread_id}'")

    async def event_generator():
        try:
            async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
                await checkpointer.setup()

                compiled_graph = get_compiled_graph(graph_name, checkpointer)

                config = {
                    "configurable": {"thread_id": request.thread_id}
                }

                async for event in compiled_graph.astream_events(
                    request.input,
                    config,
                    version="v2"
                ):
                    yield f"data: {json.dumps(event)}\n\n"

                yield "data: {\"event\": \"done\"}\n\n"

        except Exception as e:
            logger.error(f"Stream error: {e}", exc_info=True)
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


@app.post("/graphs/{graph_name}/resume", response_model=InvokeResponse)
async def resume_graph(graph_name: str, request: ResumeRequest):
    """
    Resume an interrupted graph execution.

    Use this after an interrupt() call to provide human input and continue execution.
    """
    logger.info(f"Resuming graph '{graph_name}' for thread '{request.thread_id}'")

    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {
                "configurable": {"thread_id": request.thread_id}
            }

            # Use Command to resume with data
            result = await compiled_graph.ainvoke(
                Command(resume=request.resume_data),
                config
            )

            logger.info(f"Graph '{graph_name}' resumed for thread '{request.thread_id}'")

            return InvokeResponse(
                result=result,
                thread_id=request.thread_id
            )

    except Exception as e:
        logger.error(f"Resume failed: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Resume failed: {str(e)}"
        )


# State Management Endpoints
@app.get("/graphs/{graph_name}/state")
async def get_graph_state(graph_name: str, thread_id: str):
    """Get the current state for a thread."""
    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {"configurable": {"thread_id": thread_id}}
            state = await compiled_graph.aget_state(config)

            return {
                "values": state.values,
                "next": state.next,
                "config": state.config,
                "created_at": state.created_at,
                "parent_config": state.parent_config
            }

    except Exception as e:
        logger.error(f"Get state failed: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get state: {str(e)}"
        )


@app.post("/graphs/{graph_name}/state")
async def update_graph_state(graph_name: str, request: StateUpdateRequest):
    """Update state for a thread."""
    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {"configurable": {"thread_id": request.thread_id}}

            await compiled_graph.aupdate_state(
                config,
                request.values,
                as_node=request.as_node
            )

            return {"status": "updated", "thread_id": request.thread_id}

    except Exception as e:
        logger.error(f"Update state failed: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update state: {str(e)}"
        )


@app.get("/graphs/{graph_name}/history")
async def get_graph_history(graph_name: str, thread_id: str, limit: int = 10):
    """Get state history for a thread."""
    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {"configurable": {"thread_id": thread_id}}

            history = []
            async for state in compiled_graph.aget_state_history(config):
                history.append({
                    "values": state.values,
                    "next": state.next,
                    "created_at": state.created_at,
                    "checkpoint_id": state.config.get("configurable", {}).get("checkpoint_id")
                })
                if len(history) >= limit:
                    break

            return {"history": history, "count": len(history)}

    except Exception as e:
        logger.error(f"Get history failed: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get history: {str(e)}"
        )


# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

Running the FastAPI Application

# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Production with multiple workers
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop

# With Gunicorn for more control
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000
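
Once the server is running, the endpoints defined above can be exercised directly. A quick smoke test with curl (the payloads are illustrative; your graphs expect their own input schemas):

# Invoke a graph synchronously
curl -X POST http://localhost:8000/graphs/research/invoke \
  -H "Content-Type: application/json" \
  -d '{"input": {"query": "quantum computing"}, "thread_id": "demo-thread-1"}'

# Inspect the current state for the thread
curl "http://localhost:8000/graphs/research/state?thread_id=demo-thread-1"

# Resume after an interrupt() with human input
curl -X POST http://localhost:8000/graphs/research/resume \
  -H "Content-Type: application/json" \
  -d '{"thread_id": "demo-thread-1", "resume_data": {"approved": true}}'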

Docker Deployment

Containerization provides consistency across environments and simplifies deployment orchestration.

Production Dockerfile

# Dockerfile
# Multi-stage build for smaller production image

# Build stage
FROM python:3.11-slim as builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt


# Production stage
FROM python:3.11-slim as production

WORKDIR /app

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/* \
    && useradd --create-home --shell /bin/bash appuser

# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Copy application code
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Expose port
EXPOSE 8000

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Requirements File

# requirements.txt
langgraph>=0.2.0
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-anthropic>=0.1.0
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
asyncpg>=0.29.0
pydantic>=2.5.0
httpx>=0.25.0
python-dotenv>=1.0.0
structlog>=24.0.0
prometheus-client>=0.19.0

Docker Compose for Full Stack

# docker-compose.yml
version: '3.8'

services:
  # LangGraph Application
  langgraph-app:
    build:
      context: .
      dockerfile: Dockerfile
      target: production
    expose:
      - "8000"  # reached through the nginx proxy; a fixed host-port mapping would conflict across replicas
    environment:
      - DATABASE_URL=postgresql://langgraph:langgraph_password@db:5432/langgraph
      - REDIS_URL=redis://redis:6379/0
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - LANGCHAIN_TRACING_V2=true
      - LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
      - LANGCHAIN_PROJECT=langgraph-production
      - POOL_MIN_SIZE=5
      - POOL_MAX_SIZE=20
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 512M
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    networks:
      - langgraph-network

  # PostgreSQL Database
  db:
    image: postgres:16-alpine
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro
    environment:
      - POSTGRES_DB=langgraph
      - POSTGRES_USER=langgraph
      - POSTGRES_PASSWORD=langgraph_password
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph"]
      interval: 10s
      timeout: 5s
      retries: 5
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1G
    networks:
      - langgraph-network

  # Redis for caching and rate limiting
  redis:
    image: redis:7-alpine
    volumes:
      - redisdata:/data
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    networks:
      - langgraph-network

  # Nginx Reverse Proxy
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - langgraph-app
    networks:
      - langgraph-network

volumes:
  pgdata:
  redisdata:

networks:
  langgraph-network:
    driver: bridge

Nginx Configuration for Load Balancing

# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream langgraph {
        least_conn;
        server langgraph-app:8000 weight=1;
        keepalive 32;
    }

    server {
        listen 80;
        listen 443 ssl;
        server_name api.example.com;

        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/key.pem;

        # Security headers
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header X-XSS-Protection "1; mode=block" always;

        # Proxy settings
        location / {
            proxy_pass http://langgraph;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Timeout for long-running graph executions
            proxy_read_timeout 300s;
            proxy_connect_timeout 60s;
            proxy_send_timeout 60s;
        }

        # SSE streaming support
        location /graphs/ {
            proxy_pass http://langgraph;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_buffering off;
            proxy_cache off;
            proxy_read_timeout 3600s;
        }

        # Health check endpoint
        location /health {
            proxy_pass http://langgraph;
            proxy_read_timeout 5s;
        }
    }
}

Docker Commands

# Build and start all services
docker-compose up -d --build

# View logs
docker-compose logs -f langgraph-app

# Scale application horizontally
docker-compose up -d --scale langgraph-app=5

# Rolling update (zero downtime)
docker-compose up -d --no-deps --build langgraph-app

# Stop all services
docker-compose down

# Stop and remove volumes (CAUTION: deletes data)
docker-compose down -v

Kubernetes Deployment

For enterprise-scale deployments, Kubernetes provides advanced orchestration, auto-scaling, and self-healing capabilities.

Complete Kubernetes Manifests

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: langgraph
  labels:
    name: langgraph
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: langgraph-config
  namespace: langgraph
data:
  POOL_MIN_SIZE: "5"
  POOL_MAX_SIZE: "20"
  LANGCHAIN_TRACING_V2: "true"
  LANGCHAIN_PROJECT: "langgraph-production"
  LOG_LEVEL: "INFO"
---
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
  name: langgraph-secrets
  namespace: langgraph
type: Opaque
stringData:
  DATABASE_URL: postgresql://langgraph:password@postgres-service:5432/langgraph
  OPENAI_API_KEY: sk-xxxxx
  ANTHROPIC_API_KEY: sk-ant-xxxxx
  LANGCHAIN_API_KEY: lsv2_xxxxx
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-agent
  namespace: langgraph
  labels:
    app: langgraph-agent
    version: v1
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: langgraph-agent
  template:
    metadata:
      labels:
        app: langgraph-agent
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: langgraph-sa
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
      - name: langgraph
        image: my-registry/langgraph-agent:v1.0.0
        imagePullPolicy: Always
        ports:
        - name: http
          containerPort: 8000
          protocol: TCP
        envFrom:
        - configMapRef:
            name: langgraph-config
        - secretRef:
            name: langgraph-secrets
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 10
          periodSeconds: 30
          timeoutSeconds: 5
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        startupProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
          failureThreshold: 30
        volumeMounts:
        - name: tmp
          mountPath: /tmp
      volumes:
      - name: tmp
        emptyDir: {}
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchLabels:
                  app: langgraph-agent
              topologyKey: kubernetes.io/hostname
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: ScheduleAnyway
        labelSelector:
          matchLabels:
            app: langgraph-agent
---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
  namespace: langgraph
  labels:
    app: langgraph-agent
spec:
  type: ClusterIP
  ports:
  - port: 80
    targetPort: http
    protocol: TCP
    name: http
  selector:
    app: langgraph-agent
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph-hpa
  namespace: langgraph
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph-agent
  minReplicas: 3
  maxReplicas: 50
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: http_requests_per_second
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
      - type: Pods
        value: 4
        periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 25
        periodSeconds: 60
---
# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: langgraph-pdb
  namespace: langgraph
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: langgraph-agent
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: langgraph-ingress
  namespace: langgraph
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-body-size: "50m"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - api.example.com
    secretName: langgraph-tls
  rules:
  - host: api.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: langgraph-service
            port:
              number: 80
---
# k8s/serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: langgraph-sa
  namespace: langgraph
---
# k8s/networkpolicy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: langgraph-network-policy
  namespace: langgraph
spec:
  podSelector:
    matchLabels:
      app: langgraph-agent
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8000
  egress:
  # PostgreSQL runs in the same namespace (see the StatefulSet below)
  - to:
    - podSelector:
        matchLabels:
          app: postgres
    ports:
    - protocol: TCP
      port: 5432
  # Allow DNS lookups (required once egress is restricted)
  - ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53
  # External APIs (OpenAI, Anthropic, LangSmith) over HTTPS
  - to:
    - ipBlock:
        cidr: 0.0.0.0/0
    ports:
    - protocol: TCP
      port: 443

PostgreSQL StatefulSet for Kubernetes

# k8s/postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
  namespace: langgraph
spec:
  serviceName: postgres-service
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:16-alpine
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_DB
          value: langgraph
        - name: POSTGRES_USER
          valueFrom:
            secretKeyRef:
              name: postgres-secrets
              key: username
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: postgres-secrets
              key: password
        - name: PGDATA
          value: /var/lib/postgresql/data/pgdata
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          exec:
            command:
            - pg_isready
            - -U
            - langgraph
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          exec:
            command:
            - pg_isready
            - -U
            - langgraph
          initialDelaySeconds: 5
          periodSeconds: 5
  volumeClaimTemplates:
  - metadata:
      name: postgres-storage
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: fast-ssd
      resources:
        requests:
          storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-service
  namespace: langgraph
spec:
  type: ClusterIP
  ports:
  - port: 5432
  selector:
    app: postgres

Kubernetes Deployment Commands

# Apply all manifests
kubectl apply -f k8s/

# Check deployment status
kubectl -n langgraph get pods -w

# View logs
kubectl -n langgraph logs -f deployment/langgraph-agent

# Scale manually
kubectl -n langgraph scale deployment/langgraph-agent --replicas=10

# Check HPA status
kubectl -n langgraph get hpa

# Rolling restart
kubectl -n langgraph rollout restart deployment/langgraph-agent

# View rollout history
kubectl -n langgraph rollout history deployment/langgraph-agent

# Rollback to previous version
kubectl -n langgraph rollout undo deployment/langgraph-agent

Production Monitoring and Alerting

Prometheus Metrics Endpoint

"""
Add Prometheus metrics to your FastAPI application.
"""
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request
from fastapi.responses import PlainTextResponse
import time

# Define metrics
REQUEST_COUNT = Counter(
    'langgraph_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'langgraph_request_latency_seconds',
    'Request latency in seconds',
    ['method', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)

GRAPH_EXECUTIONS = Counter(
    'langgraph_graph_executions_total',
    'Total graph executions',
    ['graph_name', 'status']
)

GRAPH_EXECUTION_TIME = Histogram(
    'langgraph_graph_execution_seconds',
    'Graph execution time in seconds',
    ['graph_name'],
    buckets=[1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0]
)

ACTIVE_THREADS = Gauge(
    'langgraph_active_threads',
    'Number of active workflow threads'
)

DB_POOL_SIZE = Gauge(
    'langgraph_db_pool_size',
    'Database connection pool size',
    ['state']
)


# Middleware for automatic metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    latency = time.time() - start_time

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(latency)

    return response


# Metrics endpoint
@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    # Update pool metrics
    if pool:
        DB_POOL_SIZE.labels(state="free").set(pool.get_idle_size())
        DB_POOL_SIZE.labels(state="used").set(pool.get_size() - pool.get_idle_size())

    return PlainTextResponse(
        generate_latest(),
        media_type="text/plain"
    )


# Instrument graph execution
async def instrumented_invoke(graph_name: str, *args, **kwargs):
    """Wrapper that adds metrics to graph invocation."""
    start_time = time.time()
    status = "success"

    try:
        result = await invoke_graph(graph_name, *args, **kwargs)
        return result
    except Exception as e:
        status = "error"
        raise
    finally:
        execution_time = time.time() - start_time
        GRAPH_EXECUTIONS.labels(graph_name=graph_name, status=status).inc()
        GRAPH_EXECUTION_TIME.labels(graph_name=graph_name).observe(execution_time)

Alerting Rules for Prometheus

# prometheus-rules.yaml
groups:
- name: langgraph-alerts
  rules:
  - alert: LangGraphHighErrorRate
    expr: |
      rate(langgraph_requests_total{status=~"5.."}[5m])
      / rate(langgraph_requests_total[5m]) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is above 5% for the last 5 minutes"

  - alert: LangGraphSlowResponses
    expr: |
      histogram_quantile(0.95, rate(langgraph_request_latency_seconds_bucket[5m])) > 30
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Slow response times"
      description: "95th percentile latency is above 30 seconds"

  - alert: LangGraphHighMemory
    expr: |
      container_memory_usage_bytes{container="langgraph"}
      / container_spec_memory_limit_bytes{container="langgraph"} > 0.9
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High memory usage"
      description: "Memory usage is above 90%"

  - alert: LangGraphDatabasePoolExhausted
    expr: |
      langgraph_db_pool_size{state="free"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Database connection pool exhausted"
      description: "No free database connections available"

  - alert: LangGraphPodCrashLooping
    expr: |
      rate(kube_pod_container_status_restarts_total{namespace="langgraph"}[15m]) > 0
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Pod crash looping"
      description: "Pod {{ $labels.pod }} is restarting frequently"

Deployment Decision Matrix

| Factor                   | LangGraph Platform | Self-Hosted Docker | Self-Hosted K8s |
|--------------------------|--------------------|--------------------|-----------------|
| Setup Time               | Hours              | Days               | Weeks           |
| Infrastructure Cost      | Usage-based        | Fixed + usage      | Fixed + usage   |
| Scaling                  | Automatic          | Manual/compose     | Auto (HPA)      |
| Data Residency           | Limited            | Full control       | Full control    |
| Compliance (SOC2, HIPAA) | Platform-dependent | Full control       | Full control    |
| Customization            | Limited            | Moderate           | Full            |
| Operational Overhead     | None               | Moderate           | High            |
| Best For                 | Startups, MVPs     | Small teams        | Enterprise      |

Interview Questions

Q: When would you choose LangGraph Platform vs self-hosted Kubernetes?

"LangGraph Platform for rapid deployment when time-to-market matters more than infrastructure control - startups, MVPs, small teams. Self-hosted Kubernetes when you need: data residency compliance (GDPR, HIPAA), specific security requirements, cost optimization at scale (beyond ~$10K/month platform costs), or custom infrastructure integrations. The transition usually happens when monthly spend exceeds $8-10K or compliance requirements mandate data control."

Q: How do you ensure zero-downtime deployments for LangGraph applications?

"Three key strategies: First, rolling deployments with proper readiness probes - Kubernetes only routes traffic to pods that pass health checks. Second, connection draining - set terminationGracePeriodSeconds to allow in-flight requests to complete (especially important for long-running graph executions). Third, checkpoint-based recovery - if a pod dies mid-execution, another pod can resume from the last checkpoint. PodDisruptionBudgets ensure minimum availability during voluntary disruptions."

Q: What's your database strategy for high-throughput LangGraph deployments?

"AsyncPostgresSaver with connection pooling is essential - use asyncpg with pool sizes tuned to your workload (typically 5-20 connections per pod). For very high throughput, consider read replicas for state queries. Partition checkpoint tables by date for efficient cleanup. Use pg_partman for automatic partition management. Monitor connection pool exhaustion and query latency. For extreme scale, consider sharding by thread_id across multiple databases."

Q: How do you handle secrets management in production LangGraph deployments?

"Never bake secrets into images. Use Kubernetes Secrets (encrypted at rest with KMS), or better, external secret managers like HashiCorp Vault, AWS Secrets Manager, or GCP Secret Manager. The External Secrets Operator syncs external secrets to Kubernetes. Rotate API keys regularly. Use workload identity where possible - pods assume cloud IAM roles instead of using static credentials."


Key Takeaways

| Deployment Option   | Best Use Case                             | Key Consideration                                  |
|---------------------|-------------------------------------------|----------------------------------------------------|
| LangGraph Platform  | Rapid deployment, managed infrastructure  | Zero operational overhead                          |
| FastAPI Self-Hosted | Full control, custom requirements         | Connection pooling, async patterns                 |
| Docker Compose      | Development, small production             | Health checks, resource limits                     |
| Kubernetes          | Enterprise scale, auto-scaling            | HPA, PDB, proper probes                            |
| Hybrid              | Complex requirements                      | Platform for simple flows, K8s for sensitive workloads |

Critical Production Checklist:

  • Async checkpointers (AsyncPostgresSaver) for non-blocking persistence
  • Connection pooling with proper sizing
  • Health and readiness probes
  • Graceful shutdown handling
  • Prometheus metrics for observability
  • PodDisruptionBudgets for availability
  • Network policies for security
  • Rolling deployments for zero downtime

Module 5 Complete - Ready for Module Quiz!
