Human-in-the-Loop & Production Patterns
Deployment Strategies
Taking a LangGraph application from development to production requires careful attention to infrastructure, scaling, reliability, and operations. This lesson covers the deployment landscape, from the managed LangGraph Platform to self-hosted deployments with FastAPI, Docker, and Kubernetes.
Real-World Context
January 2026 Production Story: A fintech startup built a document processing pipeline with 5 specialized agents. They started on LangGraph Platform and launched an MVP in two days. After reaching 50,000 documents/day and needing specific compliance controls, they migrated to self-hosted Kubernetes with PostgresSaver. The migration took two weeks but delivered data residency compliance and a 40% cost reduction at scale.
The Deployment Decision Framework:
Small team, fast launch → LangGraph Platform
Compliance requirements → Self-hosted
Scale economics matter → Self-hosted Kubernetes
Hybrid needs → LangGraph Platform + self-hosted workers
LangGraph Platform (Managed Cloud)
LangGraph Platform provides a fully managed deployment environment with zero infrastructure configuration, automatic scaling, built-in persistence, and integrated observability.
Platform Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ LangGraph Platform │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ API Gateway │ │ Load Balancer │ │ SSL/TLS Term │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ┌────────▼────────────────────▼────────────────────▼────────┐ │
│ │ Auto-Scaling Compute Layer │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ Worker N │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────▼──────────────────────────────┐ │
│ │ Managed Persistence Layer │ │
│ │ ┌─────────────────┐ ┌─────────────────────────────┐ │ │
│ │ │ Checkpoint DB │ │ Blob Storage (attachments) │ │ │
│ │ └─────────────────┘ └─────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Integrated LangSmith Observability │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Project Configuration
Create a langgraph.json file at your project root to configure the deployment:
{
"graphs": {
"research_agent": "./src/graphs/research_graph.py:app",
"document_processor": "./src/graphs/document_graph.py:app",
"customer_support": "./src/graphs/support_graph.py:app"
},
"env": ".env.production",
"python_version": "3.11",
"dependencies": [
".",
"langchain-openai>=0.1.0",
"langchain-anthropic>=0.1.0",
"langgraph>=0.2.0",
"httpx>=0.25.0",
"pydantic>=2.0.0"
],
"pip_config": {
"extra_index_url": "https://my-private-pypi.example.com/simple/"
}
}
Complete Deployment Workflow
# Install the LangGraph CLI
pip install langgraph-cli
# Validate configuration locally
langgraph validate
# Test locally before deployment
langgraph dev
# Deploy to LangGraph Platform
langgraph deploy --project my-research-agent
# Deploy with specific environment
langgraph deploy --project my-research-agent --env production
# Check deployment status
langgraph status --project my-research-agent
# View deployment logs
langgraph logs --project my-research-agent --follow
# Rollback to previous version if needed
langgraph rollback --project my-research-agent --version v1.2.3
Invoking Deployed Graphs
Once deployed, your graphs are accessible via a REST API:
"""
Client code for invoking LangGraph Platform deployments.
"""
import json
import httpx
from typing import Any
class LangGraphClient:
"""Client for interacting with LangGraph Platform deployments."""
def __init__(self, project_url: str, api_key: str):
self.base_url = project_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.headers,
timeout=300.0 # 5-minute timeout for long-running graphs
)
async def invoke(
self,
graph_name: str,
input_data: dict[str, Any],
thread_id: str,
config: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
Invoke a graph synchronously and wait for the result.
Args:
graph_name: Name of the graph to invoke
input_data: Input state for the graph
thread_id: Thread ID for persistence
config: Optional additional configuration
Returns:
Final state from graph execution
"""
payload = {
"input": input_data,
"config": {
"configurable": {"thread_id": thread_id},
**(config or {})
}
}
response = await self.client.post(
f"/graphs/{graph_name}/invoke",
json=payload
)
response.raise_for_status()
return response.json()
async def stream(
self,
graph_name: str,
input_data: dict[str, Any],
thread_id: str
):
"""
Stream graph execution events.
Yields events as they occur during graph execution.
"""
payload = {
"input": input_data,
"config": {
"configurable": {"thread_id": thread_id}
}
}
async with self.client.stream(
"POST",
f"/graphs/{graph_name}/stream",
json=payload
) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[6:])
                    yield event
async def get_state(
self,
graph_name: str,
thread_id: str
) -> dict[str, Any]:
"""Get current state for a thread."""
response = await self.client.get(
f"/graphs/{graph_name}/state",
params={"thread_id": thread_id}
)
response.raise_for_status()
return response.json()
async def update_state(
self,
graph_name: str,
thread_id: str,
values: dict[str, Any],
as_node: str | None = None
) -> dict[str, Any]:
"""Update state for a thread (useful for human-in-the-loop)."""
payload = {
"values": values,
"as_node": as_node
}
response = await self.client.post(
f"/graphs/{graph_name}/state",
params={"thread_id": thread_id},
json=payload
)
response.raise_for_status()
return response.json()
    async def resume(
        self,
        graph_name: str,
        thread_id: str,
        resume_data: Any
    ) -> dict[str, Any]:
        """Resume an interrupted graph with human input."""
        # A Command object is not JSON-serializable, so the resume value is sent
        # as plain JSON; the server wraps it in Command(resume=...) before invoking.
        payload = {
            "command": {"resume": resume_data},
            "config": {
                "configurable": {"thread_id": thread_id}
            }
        }
        response = await self.client.post(
            f"/graphs/{graph_name}/invoke",
            json=payload
        )
        response.raise_for_status()
        return response.json()
# Usage example
async def main():
client = LangGraphClient(
project_url="https://my-research-agent.langgraph.app",
api_key="lgp_xxxxx"
)
# Invoke research graph
result = await client.invoke(
graph_name="research_agent",
input_data={"query": "Latest developments in quantum computing"},
thread_id="user-123-session-456"
)
print(f"Research Result: {result}")
# Stream events for real-time updates
async for event in client.stream(
graph_name="research_agent",
input_data={"query": "AI safety research 2026"},
thread_id="user-123-session-789"
):
print(f"Event: {event}")
Self-Hosted with FastAPI
For teams requiring full control over infrastructure, data residency, or cost optimization at scale, self-hosted deployment with FastAPI provides a robust foundation.
Production-Ready FastAPI Application
"""
Production FastAPI deployment for LangGraph applications.
Features:
- Connection pooling with asyncpg
- Graceful shutdown handling
- Health and readiness checks
- Request validation with Pydantic
- Structured logging
- Error handling and recovery
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Any
import asyncpg
import asyncio
import logging
import json
import os
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command
# Import your graph definitions
from src.graphs.research_graph import research_graph
from src.graphs.document_graph import document_graph
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format='{"time": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}'
)
logger = logging.getLogger(__name__)
# Database configuration
DATABASE_URL = os.getenv("DATABASE_URL")
POOL_MIN_SIZE = int(os.getenv("POOL_MIN_SIZE", "5"))
POOL_MAX_SIZE = int(os.getenv("POOL_MAX_SIZE", "20"))
# Global connection pool
pool: asyncpg.Pool | None = None
async def init_database_pool() -> asyncpg.Pool:
"""Initialize the database connection pool with proper settings."""
return await asyncpg.create_pool(
DATABASE_URL,
min_size=POOL_MIN_SIZE,
max_size=POOL_MAX_SIZE,
command_timeout=60,
max_inactive_connection_lifetime=300,
)
async def setup_checkpointer_tables(pool: asyncpg.Pool):
"""Ensure checkpoint tables exist in the database."""
async with pool.acquire() as conn:
# AsyncPostgresSaver will create tables automatically,
# but we can verify connectivity here
await conn.execute("SELECT 1")
logger.info("Database connection verified")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan manager.
Handles startup (connection pool creation) and shutdown (cleanup).
"""
global pool
# Startup
logger.info("Starting application...")
pool = await init_database_pool()
await setup_checkpointer_tables(pool)
logger.info(f"Database pool initialized with {POOL_MIN_SIZE}-{POOL_MAX_SIZE} connections")
yield
# Shutdown
logger.info("Shutting down application...")
if pool:
await pool.close()
logger.info("Database pool closed")
# Create FastAPI application
app = FastAPI(
title="LangGraph Production API",
version="1.0.0",
lifespan=lifespan
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Request/Response Models
class InvokeRequest(BaseModel):
"""Request model for graph invocation."""
input: dict[str, Any] = Field(..., description="Input state for the graph")
thread_id: str = Field(..., description="Thread ID for persistence")
config: dict[str, Any] | None = Field(None, description="Additional configuration")
class ResumeRequest(BaseModel):
"""Request model for resuming interrupted graphs."""
thread_id: str = Field(..., description="Thread ID to resume")
resume_data: Any = Field(..., description="Data to resume with")
class StateUpdateRequest(BaseModel):
"""Request model for state updates."""
thread_id: str = Field(..., description="Thread ID to update")
values: dict[str, Any] = Field(..., description="State values to update")
as_node: str | None = Field(None, description="Node to attribute the update to")
class InvokeResponse(BaseModel):
"""Response model for graph invocation."""
result: dict[str, Any]
thread_id: str
class HealthResponse(BaseModel):
"""Health check response."""
status: str
version: str
database: str
# Graph registry - maps names to compiled graphs
GRAPHS = {
"research": research_graph,
"document": document_graph,
}
def get_compiled_graph(graph_name: str, checkpointer):
"""Get a compiled graph with checkpointer attached."""
if graph_name not in GRAPHS:
raise HTTPException(
status_code=404,
detail=f"Graph '{graph_name}' not found. Available: {list(GRAPHS.keys())}"
)
return GRAPHS[graph_name].compile(checkpointer=checkpointer)
# Health Check Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""
Basic health check - verifies the application is running.
Used by load balancers for basic liveness probes.
"""
return HealthResponse(
status="healthy",
version="1.0.0",
database="not_checked"
)
@app.get("/ready", response_model=HealthResponse)
async def readiness_check():
"""
Readiness check - verifies all dependencies are available.
Used by Kubernetes to determine if pod should receive traffic.
"""
# Check database connectivity
db_status = "error"
try:
async with pool.acquire() as conn:
await conn.execute("SELECT 1")
db_status = "connected"
except Exception as e:
logger.error(f"Database health check failed: {e}")
raise HTTPException(
status_code=503,
detail=f"Database unavailable: {str(e)}"
)
return HealthResponse(
status="ready",
version="1.0.0",
database=db_status
)
# Graph Invocation Endpoints
@app.post("/graphs/{graph_name}/invoke", response_model=InvokeResponse)
async def invoke_graph(graph_name: str, request: InvokeRequest):
"""
Invoke a graph synchronously.
This endpoint waits for the graph to complete and returns the final state.
For long-running graphs, consider using the streaming endpoint.
"""
logger.info(f"Invoking graph '{graph_name}' for thread '{request.thread_id}'")
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup() # Ensure tables exist
compiled_graph = get_compiled_graph(graph_name, checkpointer)
            # Merge caller config, but make sure thread_id always survives the merge
            config = {
                **(request.config or {}),
                "configurable": {
                    "thread_id": request.thread_id,
                    **(request.config.get("configurable", {}) if request.config else {})
                }
            }
result = await compiled_graph.ainvoke(request.input, config)
logger.info(f"Graph '{graph_name}' completed for thread '{request.thread_id}'")
return InvokeResponse(
result=result,
thread_id=request.thread_id
)
except Exception as e:
logger.error(f"Graph invocation failed: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Graph execution failed: {str(e)}"
)
@app.post("/graphs/{graph_name}/stream")
async def stream_graph(graph_name: str, request: InvokeRequest):
"""
Stream graph execution events.
Returns Server-Sent Events (SSE) for real-time updates during execution.
"""
logger.info(f"Streaming graph '{graph_name}' for thread '{request.thread_id}'")
async def event_generator():
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {
"configurable": {"thread_id": request.thread_id}
}
async for event in compiled_graph.astream_events(
request.input,
config,
version="v2"
):
yield f"data: {json.dumps(event)}\n\n"
yield "data: {\"event\": \"done\"}\n\n"
except Exception as e:
logger.error(f"Stream error: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.post("/graphs/{graph_name}/resume", response_model=InvokeResponse)
async def resume_graph(graph_name: str, request: ResumeRequest):
"""
Resume an interrupted graph execution.
Use this after an interrupt() call to provide human input and continue execution.
"""
logger.info(f"Resuming graph '{graph_name}' for thread '{request.thread_id}'")
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {
"configurable": {"thread_id": request.thread_id}
}
# Use Command to resume with data
result = await compiled_graph.ainvoke(
Command(resume=request.resume_data),
config
)
logger.info(f"Graph '{graph_name}' resumed for thread '{request.thread_id}'")
return InvokeResponse(
result=result,
thread_id=request.thread_id
)
except Exception as e:
logger.error(f"Resume failed: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Resume failed: {str(e)}"
)
# State Management Endpoints
@app.get("/graphs/{graph_name}/state")
async def get_graph_state(graph_name: str, thread_id: str):
"""Get the current state for a thread."""
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {"configurable": {"thread_id": thread_id}}
state = await compiled_graph.aget_state(config)
return {
"values": state.values,
"next": state.next,
"config": state.config,
"created_at": state.created_at,
"parent_config": state.parent_config
}
except Exception as e:
logger.error(f"Get state failed: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to get state: {str(e)}"
)
@app.post("/graphs/{graph_name}/state")
async def update_graph_state(graph_name: str, request: StateUpdateRequest):
"""Update state for a thread."""
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {"configurable": {"thread_id": request.thread_id}}
await compiled_graph.aupdate_state(
config,
request.values,
as_node=request.as_node
)
return {"status": "updated", "thread_id": request.thread_id}
except Exception as e:
logger.error(f"Update state failed: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to update state: {str(e)}"
)
@app.get("/graphs/{graph_name}/history")
async def get_graph_history(graph_name: str, thread_id: str, limit: int = 10):
"""Get state history for a thread."""
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {"configurable": {"thread_id": thread_id}}
history = []
async for state in compiled_graph.aget_state_history(config):
history.append({
"values": state.values,
"next": state.next,
"created_at": state.created_at,
"checkpoint_id": state.config.get("configurable", {}).get("checkpoint_id")
})
if len(history) >= limit:
break
return {"history": history, "count": len(history)}
except Exception as e:
logger.error(f"Get history failed: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to get history: {str(e)}"
)
# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
Running the FastAPI Application
# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# Production with multiple workers
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop
# With Gunicorn for more control
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000
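With Gunicorn it is often cleaner to keep worker and timeout settings in a gunicorn.conf.py under version control rather than on the command line. The values below are illustrative starting points, not tuned recommendations.
# gunicorn.conf.py (illustrative defaults - tune for your workload)
bind = "0.0.0.0:8000"
workers = 4                                     # roughly one per CPU core; adjust after load testing
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 300                                   # long-running graph executions need generous timeouts
graceful_timeout = 30                           # let in-flight requests finish on shutdown
keepalive = 5
accesslog = "-"                                 # log to stdout for container log collection

# Run with: gunicorn main:app -c gunicorn.conf.py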
Docker Deployment
Containerization provides consistency across environments and simplifies deployment orchestration.
Production Dockerfile
# Dockerfile
# Multi-stage build for smaller production image
# Build stage
FROM python:3.11-slim as builder
WORKDIR /app
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Production stage
FROM python:3.11-slim as production
WORKDIR /app
# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 \
curl \
&& rm -rf /var/lib/apt/lists/* \
&& useradd --create-home --shell /bin/bash appuser
# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copy application code
COPY . .
# Switch to non-root user
USER appuser
# Health check
HEALTHCHECK \
CMD curl -f http://localhost:8000/health || exit 1
# Expose port
EXPOSE 8000
# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Requirements File
# requirements.txt
langgraph>=0.2.0
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-anthropic>=0.1.0
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
asyncpg>=0.29.0
pydantic>=2.5.0
httpx>=0.25.0
python-dotenv>=1.0.0
structlog>=24.0.0
Docker Compose for Full Stack
# docker-compose.yml
version: '3.8'
services:
# LangGraph Application
langgraph-app:
build:
context: .
dockerfile: Dockerfile
target: production
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://langgraph:langgraph_password@db:5432/langgraph
- REDIS_URL=redis://redis:6379/0
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- LANGCHAIN_TRACING_V2=true
- LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
- LANGCHAIN_PROJECT=langgraph-production
- POOL_MIN_SIZE=5
- POOL_MAX_SIZE=20
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
deploy:
replicas: 3
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- langgraph-network
# PostgreSQL Database
db:
image: postgres:16-alpine
volumes:
- pgdata:/var/lib/postgresql/data
- ./init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro
environment:
- POSTGRES_DB=langgraph
- POSTGRES_USER=langgraph
- POSTGRES_PASSWORD=langgraph_password
healthcheck:
test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph"]
interval: 10s
timeout: 5s
retries: 5
deploy:
resources:
limits:
cpus: '1'
memory: 1G
networks:
- langgraph-network
# Redis for caching and rate limiting
redis:
image: redis:7-alpine
volumes:
- redisdata:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M
networks:
- langgraph-network
# Nginx Reverse Proxy
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- langgraph-app
networks:
- langgraph-network
volumes:
pgdata:
redisdata:
networks:
langgraph-network:
driver: bridge
Nginx Configuration for Load Balancing
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream langgraph {
least_conn;
server langgraph-app:8000 weight=1;
keepalive 32;
}
server {
listen 80;
listen 443 ssl;
server_name api.example.com;
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
# Security headers
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
# Proxy settings
location / {
proxy_pass http://langgraph;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeout for long-running graph executions
proxy_read_timeout 300s;
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
}
# SSE streaming support
location /graphs/ {
proxy_pass http://langgraph;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
}
# Health check endpoint
location /health {
proxy_pass http://langgraph;
proxy_read_timeout 5s;
}
}
}
Docker Commands
# Build and start all services
docker-compose up -d --build
# View logs
docker-compose logs -f langgraph-app
# Scale application horizontally
docker-compose up -d --scale langgraph-app=5
# Rolling update (zero downtime)
docker-compose up -d --no-deps --build langgraph-app
# Stop all services
docker-compose down
# Stop and remove volumes (CAUTION: deletes data)
docker-compose down -v
Kubernetes Deployment
For enterprise-scale deployments, Kubernetes provides advanced orchestration, auto-scaling, and self-healing capabilities.
Complete Kubernetes Manifests
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: langgraph
labels:
name: langgraph
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: langgraph-config
namespace: langgraph
data:
POOL_MIN_SIZE: "5"
POOL_MAX_SIZE: "20"
LANGCHAIN_TRACING_V2: "true"
LANGCHAIN_PROJECT: "langgraph-production"
LOG_LEVEL: "INFO"
---
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
name: langgraph-secrets
namespace: langgraph
type: Opaque
stringData:
DATABASE_URL: postgresql://langgraph:password@postgres-service:5432/langgraph
OPENAI_API_KEY: sk-xxxxx
ANTHROPIC_API_KEY: sk-ant-xxxxx
LANGCHAIN_API_KEY: lsv2_xxxxx
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: langgraph-agent
namespace: langgraph
labels:
app: langgraph-agent
version: v1
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: langgraph-agent
template:
metadata:
labels:
app: langgraph-agent
version: v1
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8000"
prometheus.io/path: "/metrics"
spec:
serviceAccountName: langgraph-sa
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
containers:
- name: langgraph
image: my-registry/langgraph-agent:v1.0.0
imagePullPolicy: Always
ports:
- name: http
containerPort: 8000
protocol: TCP
envFrom:
- configMapRef:
name: langgraph-config
- secretRef:
name: langgraph-secrets
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 10
periodSeconds: 30
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /ready
port: http
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
startupProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 5
periodSeconds: 5
failureThreshold: 30
volumeMounts:
- name: tmp
mountPath: /tmp
volumes:
- name: tmp
emptyDir: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: langgraph-agent
topologyKey: kubernetes.io/hostname
topologySpreadConstraints:
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: ScheduleAnyway
labelSelector:
matchLabels:
app: langgraph-agent
---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
name: langgraph-service
namespace: langgraph
labels:
app: langgraph-agent
spec:
type: ClusterIP
ports:
- port: 80
targetPort: http
protocol: TCP
name: http
selector:
app: langgraph-agent
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: langgraph-hpa
namespace: langgraph
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: langgraph-agent
minReplicas: 3
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "100"
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60
- type: Pods
value: 4
periodSeconds: 60
selectPolicy: Max
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 25
periodSeconds: 60
---
# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: langgraph-pdb
namespace: langgraph
spec:
minAvailable: 2
selector:
matchLabels:
app: langgraph-agent
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: langgraph-ingress
namespace: langgraph
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-body-size: "50m"
spec:
tls:
- hosts:
- api.example.com
secretName: langgraph-tls
rules:
- host: api.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: langgraph-service
port:
number: 80
---
# k8s/serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: langgraph-sa
namespace: langgraph
---
# k8s/networkpolicy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: langgraph-network-policy
namespace: langgraph
spec:
podSelector:
matchLabels:
app: langgraph-agent
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: ingress-nginx
ports:
- protocol: TCP
port: 8000
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          name: database
    ports:
    - protocol: TCP
      port: 5432
  - to:
    - ipBlock:
        cidr: 0.0.0.0/0
    ports:
    - protocol: TCP
      port: 443  # External APIs (OpenAI, Anthropic, LangSmith)
  - ports:  # DNS resolution, required to reach external APIs by hostname
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53
PostgreSQL StatefulSet for Kubernetes
# k8s/postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres
namespace: langgraph
spec:
serviceName: postgres-service
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:16-alpine
ports:
- containerPort: 5432
env:
- name: POSTGRES_DB
value: langgraph
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: postgres-secrets
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secrets
key: password
- name: PGDATA
value: /var/lib/postgresql/data/pgdata
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "4Gi"
cpu: "2000m"
livenessProbe:
exec:
command:
- pg_isready
- -U
- langgraph
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command:
- pg_isready
- -U
- langgraph
initialDelaySeconds: 5
periodSeconds: 5
volumeClaimTemplates:
- metadata:
name: postgres-storage
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: fast-ssd
resources:
requests:
storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
name: postgres-service
namespace: langgraph
spec:
type: ClusterIP
ports:
- port: 5432
selector:
app: postgres
Kubernetes Deployment Commands
# Apply all manifests
kubectl apply -f k8s/
# Check deployment status
kubectl -n langgraph get pods -w
# View logs
kubectl -n langgraph logs -f deployment/langgraph-agent
# Scale manually
kubectl -n langgraph scale deployment/langgraph-agent --replicas=10
# Check HPA status
kubectl -n langgraph get hpa
# Rolling restart
kubectl -n langgraph rollout restart deployment/langgraph-agent
# View rollout history
kubectl -n langgraph rollout history deployment/langgraph-agent
# Rollback to previous version
kubectl -n langgraph rollout undo deployment/langgraph-agent
Production Monitoring and Alerting
Prometheus Metrics Endpoint
"""
Add Prometheus metrics to your FastAPI application.
"""
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request
from fastapi.responses import PlainTextResponse
import time
# Define metrics
REQUEST_COUNT = Counter(
'langgraph_requests_total',
'Total number of requests',
['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'langgraph_request_latency_seconds',
'Request latency in seconds',
['method', 'endpoint'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)
GRAPH_EXECUTIONS = Counter(
'langgraph_graph_executions_total',
'Total graph executions',
['graph_name', 'status']
)
GRAPH_EXECUTION_TIME = Histogram(
'langgraph_graph_execution_seconds',
'Graph execution time in seconds',
['graph_name'],
buckets=[1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0]
)
ACTIVE_THREADS = Gauge(
'langgraph_active_threads',
'Number of active workflow threads'
)
DB_POOL_SIZE = Gauge(
'langgraph_db_pool_size',
'Database connection pool size',
['state']
)
# Middleware for automatic metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
latency = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.url.path
).observe(latency)
return response
# Metrics endpoint
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
# Update pool metrics
if pool:
DB_POOL_SIZE.labels(state="free").set(pool.get_idle_size())
DB_POOL_SIZE.labels(state="used").set(pool.get_size() - pool.get_idle_size())
return PlainTextResponse(
generate_latest(),
media_type="text/plain"
)
# Instrument graph execution
async def instrumented_invoke(graph_name: str, *args, **kwargs):
"""Wrapper that adds metrics to graph invocation."""
start_time = time.time()
status = "success"
try:
result = await invoke_graph(graph_name, *args, **kwargs)
return result
except Exception as e:
status = "error"
raise
finally:
execution_time = time.time() - start_time
GRAPH_EXECUTIONS.labels(graph_name=graph_name, status=status).inc()
GRAPH_EXECUTION_TIME.labels(graph_name=graph_name).observe(execution_time)
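If your cluster runs the Prometheus Operator, a ServiceMonitor is the usual way to have Prometheus scrape the /metrics endpoint above via the langgraph-service defined earlier. This is a sketch assuming the operator's monitoring.coreos.com/v1 CRDs are installed; the release label must match your Prometheus instance's selector.
# k8s/servicemonitor.yaml (assumes the Prometheus Operator CRDs are installed)
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: langgraph-metrics
  namespace: langgraph
  labels:
    release: prometheus   # assumption: matches your Prometheus serviceMonitorSelector
spec:
  selector:
    matchLabels:
      app: langgraph-agent
  endpoints:
  - port: http      # named port on langgraph-service
    path: /metrics
    interval: 30s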
Alerting Rules for Prometheus
# prometheus-rules.yaml
groups:
- name: langgraph-alerts
rules:
- alert: LangGraphHighErrorRate
expr: |
rate(langgraph_requests_total{status=~"5.."}[5m])
/ rate(langgraph_requests_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is above 5% for the last 5 minutes"
- alert: LangGraphSlowResponses
expr: |
histogram_quantile(0.95, rate(langgraph_request_latency_seconds_bucket[5m])) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "Slow response times"
description: "95th percentile latency is above 30 seconds"
- alert: LangGraphHighMemory
expr: |
container_memory_usage_bytes{container="langgraph"}
/ container_spec_memory_limit_bytes{container="langgraph"} > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage"
description: "Memory usage is above 90%"
- alert: LangGraphDatabasePoolExhausted
expr: |
langgraph_db_pool_size{state="free"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Database connection pool exhausted"
description: "No free database connections available"
- alert: LangGraphPodCrashLooping
expr: |
rate(kube_pod_container_status_restarts_total{namespace="langgraph"}[15m]) > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Pod crash looping"
description: "Pod {{ $labels.pod }} is restarting frequently"
Deployment Decision Matrix
| Factor | LangGraph Platform | Self-Hosted Docker | Self-Hosted K8s |
|---|---|---|---|
| Setup Time | Hours | Days | Weeks |
| Infrastructure Cost | Usage-based | Fixed + usage | Fixed + usage |
| Scaling | Automatic | Manual/compose | Auto (HPA) |
| Data Residency | Limited | Full control | Full control |
| Compliance (SOC2, HIPAA) | Platform-dependent | Full control | Full control |
| Customization | Limited | Moderate | Full |
| Operational Overhead | None | Moderate | High |
| Best For | Startups, MVPs | Small teams | Enterprise |
Interview Questions
Q: When would you choose LangGraph Platform vs self-hosted Kubernetes?
"LangGraph Platform for rapid deployment when time-to-market matters more than infrastructure control - startups, MVPs, small teams. Self-hosted Kubernetes when you need: data residency compliance (GDPR, HIPAA), specific security requirements, cost optimization at scale (beyond ~$10K/month platform costs), or custom infrastructure integrations. The transition usually happens when monthly spend exceeds $8-10K or compliance requirements mandate data control."
Q: How do you ensure zero-downtime deployments for LangGraph applications?
"Three key strategies: First, rolling deployments with proper readiness probes - Kubernetes only routes traffic to pods that pass health checks. Second, connection draining - set terminationGracePeriodSeconds to allow in-flight requests to complete (especially important for long-running graph executions). Third, checkpoint-based recovery - if a pod dies mid-execution, another pod can resume from the last checkpoint. PodDisruptionBudgets ensure minimum availability during voluntary disruptions."
Q: What's your database strategy for high-throughput LangGraph deployments?
"AsyncPostgresSaver with connection pooling is essential - use asyncpg with pool sizes tuned to your workload (typically 5-20 connections per pod). For very high throughput, consider read replicas for state queries. Partition checkpoint tables by date for efficient cleanup. Use pg_partman for automatic partition management. Monitor connection pool exhaustion and query latency. For extreme scale, consider sharding by thread_id across multiple databases."
Q: How do you handle secrets management in production LangGraph deployments?
"Never bake secrets into images. Use Kubernetes Secrets (encrypted at rest with KMS), or better, external secret managers like HashiCorp Vault, AWS Secrets Manager, or GCP Secret Manager. The External Secrets Operator syncs external secrets to Kubernetes. Rotate API keys regularly. Use workload identity where possible - pods assume cloud IAM roles instead of using static credentials."
Key Takeaways
| Deployment Option | Best Use Case | Key Consideration |
|---|---|---|
| LangGraph Platform | Rapid deployment, managed infrastructure | Zero operational overhead |
| FastAPI Self-Hosted | Full control, custom requirements | Connection pooling, async patterns |
| Docker Compose | Development, small production | Health checks, resource limits |
| Kubernetes | Enterprise scale, auto-scaling | HPA, PDB, proper probes |
| Hybrid | Complex requirements | Platform for standard flows, K8s for sensitive workloads |
Critical Production Checklist:
- Async checkpointers (AsyncPostgresSaver) for non-blocking persistence
- Connection pooling with proper sizing
- Health and readiness probes
- Graceful shutdown handling
- Prometheus metrics for observability
- PodDisruptionBudgets for availability
- Network policies for security
- Rolling deployments for zero downtime
Module 5 Complete - Ready for Module Quiz!