Human-in-the-Loop & Production Patterns
LangSmith Observability
Production LangGraph applications need deep visibility into agent behavior. LangSmith provides native integration for tracing, monitoring, and debugging LangGraph workflows. This lesson covers production-grade observability patterns including custom annotations, feedback loops, and alerting.
Why Observability Matters
| Challenge | Solution |
|---|---|
| Mysterious failures | Full trace of every node, LLM call, and state transition |
| Performance issues | Latency breakdown by node, identify bottlenecks |
| Cost control | Token tracking per request, user, and feature |
| Quality assurance | User feedback linked to specific traces |
| Debugging | Replay any execution with exact inputs |
Real Scenario (January 2026): A customer service AI was failing mysteriously at 3 AM. LangSmith traces revealed that long conversations were exceeding the model's token limit. The root cause was found in 10 minutes, and the fix was to summarize the conversation once it reached 80% of the context window. Downtime dropped from 4 hours to 15 minutes.
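A minimal sketch of that fix, assuming tiktoken for token counting; the context window size, threshold, and helper name are illustrative, not the incident's actual code.
# Hypothetical sketch: compress older turns once the conversation nears the context limit.
import tiktoken

CONTEXT_WINDOW = 128_000      # model-dependent; illustrative value
SUMMARIZE_AT = 0.8            # trigger summarization at 80% of the window

def summarize_if_needed(messages: list[str], llm) -> list[str]:
    """Replace older turns with a summary when token usage passes the threshold."""
    enc = tiktoken.get_encoding("cl100k_base")
    used = sum(len(enc.encode(m)) for m in messages)
    if used < CONTEXT_WINDOW * SUMMARIZE_AT:
        return messages
    summary = llm.invoke(
        "Summarize this conversation, preserving key facts:\n" + "\n".join(messages[:-4])
    ).content
    return [f"Summary of earlier conversation: {summary}"] + messages[-4:]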
Automatic Tracing Setup
LangSmith traces LangGraph executions automatically when configured via environment variables.
import os
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# ============================================================
# ENABLE LANGSMITH TRACING
# ============================================================
# Set these BEFORE importing LangGraph/LangChain
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls_your_api_key_here"
os.environ["LANGCHAIN_PROJECT"] = "research-agent-production"
# Optional: Additional configuration
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" # Default
os.environ["LANGCHAIN_TAGS"] = "production,v2.1.0" # Tags for all runs
# ============================================================
# STATE AND GRAPH DEFINITION
# ============================================================
class ResearchState(TypedDict):
"""State for research workflow."""
query: str
documents: Annotated[list[str], operator.add]
analysis: str
iteration: int
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def search_documents(state: ResearchState) -> dict:
"""Search for relevant documents."""
query = state["query"]
# Simulated search - in production, connect to vector DB
docs = [f"Document about {query} - result {i}" for i in range(3)]
return {"documents": docs}
def analyze_documents(state: ResearchState) -> dict:
"""Analyze documents with LLM."""
docs = state["documents"]
prompt = f"""Analyze these documents and provide insights:
Documents:
{docs}
Provide a comprehensive analysis."""
response = llm.invoke(prompt)
return {"analysis": response.content}
# Build graph
builder = StateGraph(ResearchState)
builder.add_node("search", search_documents)
builder.add_node("analyze", analyze_documents)
builder.add_edge(START, "search")
builder.add_edge("search", "analyze")
builder.add_edge("analyze", END)
app = builder.compile()
# ============================================================
# EXECUTE WITH AUTOMATIC TRACING
# ============================================================
# Every invoke is fully traced
result = app.invoke({"query": "AI trends 2026", "documents": [], "iteration": 0})
# The trace shows:
# - Full graph execution timeline
# - Each node's inputs/outputs
# - LLM calls with prompts/completions
# - Token usage and latency
# - State snapshots at each step
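Beyond the global environment variables, each invocation can carry its own run name, tags, and metadata through the standard `config` argument; the tag and user ID below are illustrative values.
# Per-request tags and metadata are attached to the trace and filterable in LangSmith
result = app.invoke(
    {"query": "AI trends 2026", "documents": [], "iteration": 0},
    config={
        "run_name": "research_request",       # name shown on the trace
        "tags": ["beta_cohort"],              # illustrative tag
        "metadata": {"user_id": "user_123"},  # illustrative metadata
    },
)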
Custom Span Annotations
Add rich metadata to traces for better debugging and analysis.
from langsmith import traceable
from langsmith.run_trees import RunTree
import langsmith
# ============================================================
# BASIC @TRACEABLE DECORATOR
# ============================================================
@traceable(name="research_agent", tags=["agent", "research"])
def research_node(state: ResearchState) -> dict:
"""Research with custom tracing."""
    results = search_documents(state)["documents"]  # reuse the search node defined above
return {"documents": results}
@traceable(name="llm_analysis", run_type="llm")
def analyze_with_llm(documents: list[str]) -> str:
"""LLM call with proper categorization."""
# run_type="llm" categorizes this for cost tracking
return llm.invoke(f"Analyze: {documents}").content
@traceable(name="tool_search", run_type="tool")
def tool_search(query: str) -> list[str]:
"""Tool call with proper categorization."""
# run_type="tool" for tool/retrieval operations
return [f"Result for {query}"]
# ============================================================
# ADVANCED: CUSTOM METADATA AND CONTEXT
# ============================================================
@traceable(
name="advanced_research",
tags=["agent", "research", "v2"],
metadata={"version": "2.0", "model": "gpt-4o-mini"}
)
def advanced_research_node(state: ResearchState) -> dict:
"""Research with detailed metadata tracking."""
query = state["query"]
existing_docs = state.get("documents", [])
    # Attach runtime tags and metadata to the current run
    # (get_current_run_tree works inside any @traceable function)
    run_tree = langsmith.get_current_run_tree()
    if run_tree is not None:
        run_tree.tags = (run_tree.tags or []) + [f"query_length_{len(query)}"]
    # Search with context tracking (reuses the node defined earlier)
    results = search_documents(state)["documents"]
    # Log custom metrics onto the trace
    if run_tree is not None:
        run_tree.extra = run_tree.extra or {}
        run_tree.extra.setdefault("metadata", {}).update({
            "query": query,
            "doc_count_before": len(existing_docs),
            "doc_count_after": len(existing_docs) + len(results),
            "search_source": "vector_db",
            "iteration": state.get("iteration", 0),
        })
    return {"documents": results}
# ============================================================
# TRACING WITH ERROR CONTEXT
# ============================================================
@traceable(name="safe_llm_call", run_type="llm")
def safe_llm_call(prompt: str, max_retries: int = 3) -> str:
"""LLM call with error tracking."""
    run_tree = langsmith.get_current_run_tree()

    def _log(meta: dict) -> None:
        # Record attempt metadata on the current trace, if one is active
        if run_tree is not None:
            run_tree.extra = run_tree.extra or {}
            run_tree.extra.setdefault("metadata", {}).update(meta)

    for attempt in range(max_retries):
        try:
            response = llm.invoke(prompt)
            _log({"attempt": attempt + 1, "success": True})
            return response.content
        except Exception as e:
            _log({"attempt": attempt + 1, "error": str(e), "error_type": type(e).__name__})
            if attempt == max_retries - 1:
                raise
    return ""  # unreachable: the loop either returns or raises
# ============================================================
# TRACING ASYNC OPERATIONS
# ============================================================
@traceable(name="async_research", run_type="chain")
async def async_research_node(state: ResearchState) -> dict:
"""Async research with proper tracing."""
import asyncio
async def search_source(source: str) -> list[str]:
# Simulate async search
await asyncio.sleep(0.1)
return [f"Result from {source}"]
# Parallel searches are traced as child spans
results = await asyncio.gather(
search_source("web"),
search_source("database"),
search_source("archive")
)
all_docs = [doc for source_docs in results for doc in source_docs]
return {"documents": all_docs}
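Recent versions of the langsmith SDK also let callers of a `@traceable` function pass per-call overrides through a `langsmith_extra` keyword argument; a brief sketch with illustrative values (fall back to the decorator-level arguments shown above if your SDK version lacks it).
# Per-call tags/metadata supplied at the call site (values are illustrative)
docs = tool_search(
    "AI trends 2026",
    langsmith_extra={"tags": ["ad_hoc"], "metadata": {"request_id": "req_42"}},
)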
Production Monitoring Dashboard
Key metrics and queries for monitoring LangGraph in production.
from langsmith import Client
from datetime import datetime, timedelta
import json
client = Client()
# ============================================================
# KEY METRICS TO TRACK
# ============================================================
"""
1. LATENCY BY NODE
- p50, p95, p99 latency for each node
- Identify slow nodes and bottlenecks
- Set alerts for p95 > threshold
2. TOKEN USAGE
- Track by run, user, feature
- Cost allocation and budgeting
- Detect anomalies (sudden spikes)
3. ERROR RATES
- By node, by error type
- Automatic alerting on spikes
- Correlation with inputs
4. SUCCESS CRITERIA
- Custom evaluators for domain-specific quality
- Human feedback integration
- A/B test comparisons
5. THROUGHPUT
- Requests per minute by endpoint
- Queue depths for async processing
- Capacity planning
"""
# ============================================================
# PROGRAMMATIC DASHBOARD QUERIES
# ============================================================
def get_latency_metrics(project_name: str, hours: int = 24) -> dict:
"""Get latency metrics for a project."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours),
execution_order=1 # Top-level runs only
)
latencies = []
for run in runs:
if run.end_time and run.start_time:
latency = (run.end_time - run.start_time).total_seconds()
latencies.append(latency)
if not latencies:
return {"p50": 0, "p95": 0, "p99": 0}
latencies.sort()
n = len(latencies)
return {
"p50": latencies[int(n * 0.5)],
"p95": latencies[int(n * 0.95)],
"p99": latencies[int(n * 0.99)] if n > 100 else latencies[-1],
"count": n
}
def get_error_breakdown(project_name: str, hours: int = 24) -> dict:
"""Get error breakdown by type."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours),
error=True # Only failed runs
)
error_counts = {}
for run in runs:
error_type = run.error.split(":")[0] if run.error else "Unknown"
error_counts[error_type] = error_counts.get(error_type, 0) + 1
return error_counts
def get_token_usage(project_name: str, hours: int = 24) -> dict:
"""Get token usage summary."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours),
run_type="llm" # LLM runs only
)
total_tokens = 0
prompt_tokens = 0
completion_tokens = 0
    for run in runs:
        # LangSmith records token counts on the run when the LLM provider reports them
        total_tokens += run.total_tokens or 0
        prompt_tokens += run.prompt_tokens or 0
        completion_tokens += run.completion_tokens or 0
return {
"total_tokens": total_tokens,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"estimated_cost_usd": total_tokens * 0.00002 # Rough estimate
}
def get_node_performance(project_name: str, hours: int = 24) -> dict:
"""Get performance breakdown by node."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours)
)
node_stats = {}
for run in runs:
node_name = run.name
if node_name not in node_stats:
node_stats[node_name] = {
"count": 0,
"total_latency": 0,
"errors": 0
}
node_stats[node_name]["count"] += 1
if run.end_time and run.start_time:
latency = (run.end_time - run.start_time).total_seconds()
node_stats[node_name]["total_latency"] += latency
if run.error:
node_stats[node_name]["errors"] += 1
# Calculate averages
for node in node_stats:
count = node_stats[node]["count"]
node_stats[node]["avg_latency"] = (
node_stats[node]["total_latency"] / count if count > 0 else 0
)
node_stats[node]["error_rate"] = (
node_stats[node]["errors"] / count if count > 0 else 0
)
return node_stats
# ============================================================
# DASHBOARD QUERY EXAMPLES (LangSmith UI)
# ============================================================
"""
Illustrative LangSmith UI filter examples (pseudo-queries; adapt to the UI's actual filter syntax):
# Find slow production runs
runs where latency > 10s and tag = 'production'
# Error analysis by node
runs where error is not null group by name
# Token usage over time
token_usage sum by project last 7 days
# Failed runs with context
runs where status = 'error' and metadata.user_id exists
# High-cost runs
runs where token_usage.total > 5000 order by token_usage.total desc
# Specific user's runs
runs where metadata.user_id = 'user_123' last 24 hours
# Compare versions
runs where tag in ['v1', 'v2'] group by tags
"""
Feedback Collection and Evaluation
Link user feedback to traces for quality monitoring.
from langsmith import Client
from langsmith.evaluation import evaluate
from typing import Optional
import uuid
client = Client()
# ============================================================
# BASIC FEEDBACK COLLECTION
# ============================================================
def collect_user_feedback(
run_id: str,
score: float,
comment: str = "",
feedback_type: str = "user_rating"
) -> None:
"""Attach user feedback to a trace."""
client.create_feedback(
run_id=run_id,
key=feedback_type,
score=score, # 0.0 to 1.0
comment=comment
)
def collect_thumbs_feedback(run_id: str, thumbs_up: bool) -> None:
"""Simple thumbs up/down feedback."""
client.create_feedback(
run_id=run_id,
key="thumbs",
score=1.0 if thumbs_up else 0.0
)
def collect_categorical_feedback(
run_id: str,
category: str,
subcategory: Optional[str] = None
) -> None:
"""Categorical feedback for classification."""
client.create_feedback(
run_id=run_id,
key="category",
value=category,
comment=subcategory
)
# ============================================================
# CAPTURING RUN ID DURING EXECUTION
# ============================================================
def execute_with_feedback_capture(app, inputs: dict, config: dict) -> tuple:
"""Execute workflow and capture run ID for feedback."""
# Method 1: Using callbacks
from langchain_core.callbacks import BaseCallbackHandler
class RunIDCapture(BaseCallbackHandler):
def __init__(self):
self.run_id = None
def on_chain_start(self, serialized, inputs, run_id, **kwargs):
if self.run_id is None: # Capture top-level run ID
self.run_id = str(run_id)
callback = RunIDCapture()
    # Add the capture callback on top of the caller's config
    merged = {**config, "callbacks": list(config.get("callbacks", [])) + [callback]}
    result = app.invoke(inputs, config=merged)
return result, callback.run_id
# Method 2: Pre-assigning the run ID via config
def execute_with_run_id(app, inputs: dict) -> tuple:
    """Execute with a pre-generated run ID so it is known before the run starts."""
    run_id = uuid.uuid4()
    # RunnableConfig accepts an explicit run_id for the top-level run
    result = app.invoke(inputs, config={"run_id": run_id})
    return result, str(run_id)
# ============================================================
# FEEDBACK INTEGRATION IN PRODUCTION
# ============================================================
class FeedbackCollector:
"""Production feedback collection with batching."""
def __init__(self, project_name: str):
self.client = Client()
self.project_name = project_name
self.pending_feedback = []
self.batch_size = 10
def add_feedback(
self,
run_id: str,
feedback_type: str,
score: Optional[float] = None,
value: Optional[str] = None,
comment: str = ""
) -> None:
"""Add feedback to batch."""
self.pending_feedback.append({
"run_id": run_id,
"key": feedback_type,
"score": score,
"value": value,
"comment": comment
})
if len(self.pending_feedback) >= self.batch_size:
self.flush()
def flush(self) -> None:
"""Send all pending feedback."""
for feedback in self.pending_feedback:
try:
self.client.create_feedback(**feedback)
except Exception as e:
print(f"Failed to send feedback: {e}")
self.pending_feedback = []
def collect_detailed_feedback(
self,
run_id: str,
accuracy: float,
helpfulness: float,
relevance: float,
comment: str = ""
) -> None:
"""Collect multi-dimensional feedback."""
dimensions = {
"accuracy": accuracy,
"helpfulness": helpfulness,
"relevance": relevance
}
for dimension, score in dimensions.items():
self.add_feedback(
run_id=run_id,
feedback_type=dimension,
score=score
)
if comment:
self.add_feedback(
run_id=run_id,
feedback_type="comment",
value=comment
)
# ============================================================
# AUTOMATED EVALUATION
# ============================================================
def run_automated_evaluation(
project_name: str,
evaluator_name: str,
sample_size: int = 100
) -> dict:
"""Run automated evaluation on recent runs."""
# Define custom evaluator
def quality_evaluator(run, example=None):
"""Evaluate output quality."""
        output = (run.outputs or {}).get("analysis", "")
# Simple quality checks
score = 1.0
if len(output) < 100:
score -= 0.3 # Too short
if "error" in output.lower():
score -= 0.2 # Contains error mention
if not any(word in output.lower() for word in ["analysis", "insight", "finding"]):
score -= 0.2 # Missing key terms
return {
"key": "quality_score",
"score": max(0, score),
"comment": f"Length: {len(output)} chars"
}
# Get recent runs
runs = list(client.list_runs(
project_name=project_name,
execution_order=1,
limit=sample_size
))
# Run evaluation
results = []
for run in runs:
eval_result = quality_evaluator(run)
results.append(eval_result)
# Optionally attach evaluation as feedback
client.create_feedback(
run_id=str(run.id),
**eval_result
)
# Aggregate results
scores = [r["score"] for r in results]
return {
"mean_score": sum(scores) / len(scores) if scores else 0,
"min_score": min(scores) if scores else 0,
"max_score": max(scores) if scores else 0,
"evaluated_count": len(results)
}
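A usage sketch that ties execution and feedback together, reusing the graph compiled earlier and the helpers above; the rating is illustrative.
# Example: run the workflow, capture the run ID, then attach a thumbs-up
result, run_id = execute_with_feedback_capture(
    app,
    inputs={"query": "AI trends 2026", "documents": [], "iteration": 0},
    config={},
)
if run_id:
    collect_thumbs_feedback(run_id, thumbs_up=True)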
Alert Configuration
Set up production alerts for LangGraph workflows.
from langsmith import Client
from typing import Callable
import asyncio
from datetime import datetime, timedelta
# ============================================================
# ALERT DEFINITIONS (Pseudo-code for LangSmith UI configuration)
# ============================================================
"""
LANGSMITH ALERT EXAMPLES:
Alert: High Latency
- Condition: p95_latency > 30s
- Window: 5 minutes
- Action: Slack notification to #alerts-ai
Alert: Error Spike
- Condition: error_rate > 5%
- Window: 15 minutes
- Comparison: vs previous 24 hours baseline
- Action: PagerDuty incident
Alert: Cost Threshold
- Condition: daily_token_cost > $100
- Window: Rolling 24 hours
- Action: Email to team + Slack
Alert: Low Quality
- Condition: avg(user_rating) < 0.7
- Window: 1 hour
- Minimum samples: 10
- Action: Slack + create JIRA ticket
Alert: Checkpoint Bloat
- Condition: avg(checkpoint_size_mb) > 50
- Window: 1 hour
- Action: Slack to engineering
"""
# ============================================================
# CUSTOM ALERTING WITH LANGSMITH API
# ============================================================
class AlertMonitor:
"""Custom alert monitoring using LangSmith API."""
def __init__(self, project_name: str):
self.client = Client()
self.project_name = project_name
self.alert_handlers: dict[str, Callable] = {}
def register_alert(self, name: str, handler: Callable) -> None:
"""Register an alert handler."""
self.alert_handlers[name] = handler
async def check_latency_alert(
self,
threshold_seconds: float,
window_minutes: int = 5
) -> bool:
"""Check if latency exceeds threshold."""
runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=datetime.now() - timedelta(minutes=window_minutes),
execution_order=1
))
if not runs:
return False
latencies = []
for run in runs:
if run.end_time and run.start_time:
latency = (run.end_time - run.start_time).total_seconds()
latencies.append(latency)
if not latencies:
return False
latencies.sort()
p95_index = int(len(latencies) * 0.95)
p95_latency = latencies[p95_index] if p95_index < len(latencies) else latencies[-1]
if p95_latency > threshold_seconds:
if "high_latency" in self.alert_handlers:
await self.alert_handlers["high_latency"](
p95_latency=p95_latency,
threshold=threshold_seconds,
sample_size=len(latencies)
)
return True
return False
async def check_error_rate_alert(
self,
threshold_percent: float,
window_minutes: int = 15
) -> bool:
"""Check if error rate exceeds threshold."""
start_time = datetime.now() - timedelta(minutes=window_minutes)
all_runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=start_time,
execution_order=1
))
error_runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=start_time,
execution_order=1,
error=True
))
if not all_runs:
return False
error_rate = (len(error_runs) / len(all_runs)) * 100
if error_rate > threshold_percent:
if "error_spike" in self.alert_handlers:
await self.alert_handlers["error_spike"](
error_rate=error_rate,
threshold=threshold_percent,
error_count=len(error_runs),
total_count=len(all_runs)
)
return True
return False
async def check_quality_alert(
self,
min_score: float,
feedback_key: str = "user_rating",
window_hours: int = 1,
min_samples: int = 10
) -> bool:
"""Check if quality score drops below threshold."""
runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=datetime.now() - timedelta(hours=window_hours),
execution_order=1
))
scores = []
for run in runs:
feedbacks = list(self.client.list_feedback(run_ids=[str(run.id)]))
for feedback in feedbacks:
if feedback.key == feedback_key and feedback.score is not None:
scores.append(feedback.score)
if len(scores) < min_samples:
return False # Not enough data
avg_score = sum(scores) / len(scores)
if avg_score < min_score:
if "low_quality" in self.alert_handlers:
await self.alert_handlers["low_quality"](
avg_score=avg_score,
threshold=min_score,
sample_size=len(scores)
)
return True
return False
async def run_continuous_monitoring(self, interval_seconds: int = 60):
"""Run continuous monitoring loop."""
while True:
await self.check_latency_alert(threshold_seconds=30)
await self.check_error_rate_alert(threshold_percent=5)
await self.check_quality_alert(min_score=0.7)
await asyncio.sleep(interval_seconds)
# ============================================================
# INTEGRATION WITH NOTIFICATION SERVICES
# ============================================================
async def send_slack_alert(webhook_url: str, message: dict) -> None:
"""Send alert to Slack."""
import aiohttp
async with aiohttp.ClientSession() as session:
await session.post(webhook_url, json=message)
async def send_pagerduty_incident(
routing_key: str,
summary: str,
severity: str = "error"
) -> None:
"""Create PagerDuty incident."""
import aiohttp
payload = {
"routing_key": routing_key,
"event_action": "trigger",
"payload": {
"summary": summary,
"severity": severity,
"source": "langsmith-monitor"
}
}
async with aiohttp.ClientSession() as session:
await session.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
)
# Example usage
async def setup_production_alerts():
"""Set up production alerting."""
monitor = AlertMonitor("research-agent-production")
# Register Slack handler for latency
async def handle_high_latency(**kwargs):
await send_slack_alert(
webhook_url="https://hooks.slack.com/services/...",
message={
"text": f"High latency alert: p95={kwargs['p95_latency']:.2f}s "
f"(threshold: {kwargs['threshold']}s)"
}
)
monitor.register_alert("high_latency", handle_high_latency)
# Register PagerDuty handler for errors
async def handle_error_spike(**kwargs):
await send_pagerduty_incident(
routing_key="your-routing-key",
summary=f"Error spike: {kwargs['error_rate']:.1f}% "
f"({kwargs['error_count']}/{kwargs['total_count']} runs)",
severity="error"
)
monitor.register_alert("error_spike", handle_error_spike)
# Start monitoring
await monitor.run_continuous_monitoring()
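A minimal entry point for the monitoring loop; in production this would typically run as a separate long-lived service or a scheduled job.
# Run the alert monitor as a standalone process
if __name__ == "__main__":
    asyncio.run(setup_production_alerts())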
Interview Questions
Q: How does LangSmith integrate with LangGraph?
"Automatically when
LANGCHAIN_TRACING_V2is enabled. Every node execution, state update, and LLM call is traced without code changes. Custom annotations with@traceabledecorator add metadata and tags. Production dashboards show latency by node, token usage for cost tracking, and error rates. You can filter and analyze runs by any metadata field."
Q: What metrics matter for production LangGraph applications?
"Five key areas: (1) Latency p50/p95/p99 by node to identify bottlenecks, (2) Token usage per request/user/feature for cost control, (3) Error rates by node and error type for reliability, (4) Checkpoint sizes for storage planning, and (5) User feedback scores for quality. Custom evaluators add domain-specific quality metrics. All metrics should have alerts with appropriate thresholds."
Q: How do you implement feedback loops for quality improvement?
"Capture the run ID during execution using callbacks or run tree context. Expose a feedback endpoint in your API that calls
client.create_feedback()linking ratings to traces. Use multi-dimensional feedback (accuracy, helpfulness, relevance) for detailed analysis. Run automated evaluators on a schedule to detect quality regressions. Correlate feedback with input patterns to identify problem areas."
Q: How do you set up alerting for LangGraph in production?
"Use LangSmith's built-in alerting for common patterns: latency p95, error rate spikes, cost thresholds. For custom alerts, poll the LangSmith API periodically and check conditions programmatically. Integrate with Slack, PagerDuty, or OpsGenie for notifications. Set appropriate thresholds based on baseline performance--avoid alert fatigue by using proper windowing and minimum sample requirements."
Key Takeaways
- Automatic tracing via environment variables - no code changes needed
- @traceable decorator adds metadata, tags, and run types to traces
- Run types (llm, chain, tool) categorize operations for proper analysis
- Programmatic queries using LangSmith Client for custom dashboards
- Feedback collection links user ratings to specific traces
- Multi-dimensional feedback (accuracy, helpfulness, relevance) for detailed quality tracking
- Automated evaluation runs quality checks on production traffic
- Custom alerting using LangSmith API with notification integrations