Human-in-the-Loop & Production Patterns

LangSmith Observability

Production LangGraph applications need deep visibility into agent behavior. LangSmith provides native integration for tracing, monitoring, and debugging LangGraph workflows. This lesson covers production-grade observability patterns including custom annotations, feedback loops, and alerting.


Why Observability Matters

| Challenge | Solution |
|---|---|
| Mysterious failures | Full trace of every node, LLM call, and state transition |
| Performance issues | Latency breakdown by node to identify bottlenecks |
| Cost control | Token tracking per request, user, and feature |
| Quality assurance | User feedback linked to specific traces |
| Debugging | Replay any execution with exact inputs |

Real Scenario (January 2026): A customer service AI was failing mysteriously at 3 AM. LangSmith traces showed that long conversations were exceeding the token limit, and the root cause was found in 10 minutes. The fix was to summarize the conversation once it reaches 80% of the context window. Downtime dropped from 4 hours to 15 minutes.
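
The fix in the scenario above can be sketched as a simple budget check. This is a minimal illustration under stated assumptions, not the team's actual code: `CONTEXT_WINDOW_TOKENS`, `estimate_tokens`, and `needs_summarization` are hypothetical names, and the 4-characters-per-token estimate is a rough stand-in for a real tokenizer.

```python
CONTEXT_WINDOW_TOKENS = 128_000  # assumed model limit; adjust for your model
SUMMARIZE_AT = 0.80              # trigger threshold taken from the scenario


def estimate_tokens(text: str) -> int:
    """Rough token estimate (~4 characters per token); use a real tokenizer in production."""
    return max(1, len(text) // 4)


def needs_summarization(
    messages: list[str],
    window: int = CONTEXT_WINDOW_TOKENS,
    threshold: float = SUMMARIZE_AT,
) -> bool:
    """Return True once the conversation uses at least `threshold` of the window."""
    used = sum(estimate_tokens(message) for message in messages)
    return used >= window * threshold
```

A node that builds the prompt could call `needs_summarization(history)` before each LLM call and replace older messages with a summary when it returns True.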


Automatic Tracing Setup

LangSmith traces LangGraph executions automatically when configured via environment variables.

import os
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

# ============================================================
# ENABLE LANGSMITH TRACING
# ============================================================

# Set these before the first traced call. In production, export them in the
# environment (or a secrets manager) rather than hard-coding them here.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls_your_api_key_here"
os.environ["LANGCHAIN_PROJECT"] = "research-agent-production"

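# Optional: Additional configuration
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"  # Default
# Tags are attached per run, e.g. app.invoke(inputs, config={"tags": ["production", "v2.1.0"]}).
# LANGCHAIN_TAGS does not appear to be a variable LangSmith reads, so it is not set here.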


# ============================================================
# STATE AND GRAPH DEFINITION
# ============================================================

class ResearchState(TypedDict):
    """State for research workflow."""
    query: str
    documents: Annotated[list[str], operator.add]
    analysis: str
    iteration: int


llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def search_documents(state: ResearchState) -> dict:
    """Search for relevant documents."""
    query = state["query"]
    # Simulated search - in production, connect to vector DB
    docs = [f"Document about {query} - result {i}" for i in range(3)]
    return {"documents": docs}


def analyze_documents(state: ResearchState) -> dict:
    """Analyze documents with LLM."""
    docs = state["documents"]

    prompt = f"""Analyze these documents and provide insights:

Documents:
{docs}

Provide a comprehensive analysis."""

    response = llm.invoke(prompt)
    return {"analysis": response.content}


# Build graph
builder = StateGraph(ResearchState)
builder.add_node("search", search_documents)
builder.add_node("analyze", analyze_documents)
builder.add_edge(START, "search")
builder.add_edge("search", "analyze")
builder.add_edge("analyze", END)

app = builder.compile()

# ============================================================
# EXECUTE WITH AUTOMATIC TRACING
# ============================================================

# Every invoke is fully traced
result = app.invoke({"query": "AI trends 2026", "documents": [], "iteration": 0})

# The trace shows:
# - Full graph execution timeline
# - Each node's inputs/outputs
# - LLM calls with prompts/completions
# - Token usage and latency
# - State snapshots at each step
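
Because tracing is driven by environment variables, a missing or misspelled variable usually means traces never arrive while the application keeps running. A small startup check can catch this early. The sketch below uses only the standard library; `REQUIRED_VARS`, `missing_tracing_vars`, and `tracing_enabled` are hypothetical names, and treating all three variables as required is an assumption about your deployment.

```python
import os

# Variables this lesson sets before running the graph (assumed required here)
REQUIRED_VARS = ("LANGCHAIN_TRACING_V2", "LANGCHAIN_API_KEY", "LANGCHAIN_PROJECT")


def missing_tracing_vars(env=None) -> list[str]:
    """Return the names of tracing variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def tracing_enabled(env=None) -> bool:
    """True only if every variable is set and tracing is switched on."""
    env = os.environ if env is None else env
    if missing_tracing_vars(env):
        return False
    return env["LANGCHAIN_TRACING_V2"].lower() == "true"
```

Calling `missing_tracing_vars()` at startup and logging the result makes a misconfigured deployment visible before the first request.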

Custom Span Annotations

Add rich metadata to traces for better debugging and analysis.

from langsmith import traceable
from langsmith.run_trees import RunTree
import langsmith


# ============================================================
# BASIC @TRACEABLE DECORATOR
# ============================================================

@traceable(name="research_agent", tags=["agent", "research"])
def research_node(state: ResearchState) -> dict:
    """Research with custom tracing."""
    # search_documents expects the full state and returns a partial state update
    results = search_documents(state)["documents"]
    return {"documents": results}


@traceable(name="llm_analysis", run_type="llm")
def analyze_with_llm(documents: list[str]) -> str:
    """LLM call with proper categorization."""
    # run_type="llm" categorizes this for cost tracking
    return llm.invoke(f"Analyze: {documents}").content


@traceable(name="tool_search", run_type="tool")
def tool_search(query: str) -> list[str]:
    """Tool call with proper categorization."""
    # run_type="tool" for tool/retrieval operations
    return [f"Result for {query}"]


# ============================================================
# ADVANCED: CUSTOM METADATA AND CONTEXT
# ============================================================

@traceable(
    name="advanced_research",
    tags=["agent", "research", "v2"],
    metadata={"version": "2.0", "model": "gpt-4o-mini"}
)
def advanced_research_node(state: ResearchState) -> dict:
    """Research with detailed metadata tracking."""
    query = state["query"]
    existing_docs = state.get("documents", [])

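    # Attach runtime tags and metadata to the current run.
    # get_current_run_tree() returns None when tracing is disabled.
    run_tree = langsmith.get_current_run_tree()
    if run_tree is not None:
        run_tree.tags.extend([f"query_length_{len(query)}"])

    # search_documents expects the full state and returns a partial state update
    results = search_documents(state)["documents"]

    # Record custom metrics as run metadata
    if run_tree is not None:
        run_tree.metadata.update({
            "query": query,
            "doc_count_before": len(existing_docs),
            "doc_count_after": len(existing_docs) + len(results),
            "search_source": "vector_db",
            "iteration": state.get("iteration", 0)
        })

    return {"documents": results}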


# ============================================================
# TRACING WITH ERROR CONTEXT
# ============================================================

@traceable(name="safe_llm_call", run_type="llm")
def safe_llm_call(prompt: str, max_retries: int = 3) -> str:
    """LLM call with error tracking."""
    run_tree = langsmith.get_current_run_tree()  # None when tracing is disabled
    for attempt in range(max_retries):
        try:
            response = llm.invoke(prompt)
            if run_tree is not None:
                run_tree.metadata.update({
                    "attempt": attempt + 1,
                    "success": True
                })
            return response.content
        except Exception as e:
            if run_tree is not None:
                run_tree.metadata.update({
                    "attempt": attempt + 1,
                    "error": str(e),
                    "error_type": type(e).__name__
                })
            if attempt == max_retries - 1:
                raise

    return ""  # Only reached when max_retries <= 0


# ============================================================
# TRACING ASYNC OPERATIONS
# ============================================================

@traceable(name="async_research", run_type="chain")
async def async_research_node(state: ResearchState) -> dict:
    """Async research with proper tracing."""
    import asyncio

    @traceable(name="search_source", run_type="tool")
    async def search_source(source: str) -> list[str]:
        # Simulate async search
        await asyncio.sleep(0.1)
        return [f"Result from {source}"]

    # Each decorated search call is traced as a child span of this run
    results = await asyncio.gather(
        search_source("web"),
        search_source("database"),
        search_source("archive")
    )

    all_docs = [doc for source_docs in results for doc in source_docs]

    return {"documents": all_docs}
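
To make concrete what a span annotation records, here is a toy stand-in for a tracing decorator. It is not how `@traceable` is implemented; `toy_traceable` and `SPANS` are hypothetical names that only illustrate the idea of capturing a name, tags, latency, and any error around a function call.

```python
import functools
import time

SPANS: list[dict] = []  # hypothetical in-memory sink; a real tracer ships spans to a backend


def toy_traceable(name: str, tags=None):
    """Record name, tags, latency, and error type for each call of the wrapped function."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            span = {"name": name, "tags": list(tags or []), "error": None}
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                span["error"] = type(exc).__name__
                raise  # re-raise so callers still see the failure
            finally:
                # Runs on both success and failure, so every call yields a span
                span["latency_s"] = time.perf_counter() - start
                SPANS.append(span)
        return wrapper
    return decorator


@toy_traceable(name="tool_search", tags=["tool"])
def search(query: str) -> list[str]:
    return [f"Result for {query}"]


@toy_traceable(name="always_fails")
def broken() -> None:
    raise ValueError("boom")
```

The `finally` clause is the design point: a span is recorded whether the call succeeds or raises, which is what lets a trace show failed steps alongside successful ones.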

Production Monitoring Dashboard

Key metrics and queries for monitoring LangGraph in production.

from langsmith import Client
from datetime import datetime, timedelta
import json


client = Client()


# ============================================================
# KEY METRICS TO TRACK
# ============================================================

"""
1. LATENCY BY NODE
   - p50, p95, p99 latency for each node
   - Identify slow nodes and bottlenecks
   - Set alerts for p95 > threshold

2. TOKEN USAGE
   - Track by run, user, feature
   - Cost allocation and budgeting
   - Detect anomalies (sudden spikes)

3. ERROR RATES
   - By node, by error type
   - Automatic alerting on spikes
   - Correlation with inputs

4. SUCCESS CRITERIA
   - Custom evaluators for domain-specific quality
   - Human feedback integration
   - A/B test comparisons

5. THROUGHPUT
   - Requests per minute by endpoint
   - Queue depths for async processing
   - Capacity planning
"""


# ============================================================
# PROGRAMMATIC DASHBOARD QUERIES
# ============================================================

def get_latency_metrics(project_name: str, hours: int = 24) -> dict:
    """Get latency metrics for a project."""
    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(hours=hours),
        is_root=True  # Top-level runs only (replaces the deprecated execution_order=1)
    )

    latencies = []
    for run in runs:
        if run.end_time and run.start_time:
            latency = (run.end_time - run.start_time).total_seconds()
            latencies.append(latency)

    if not latencies:
        return {"p50": 0, "p95": 0, "p99": 0, "count": 0}

    latencies.sort()
    n = len(latencies)

    return {
        "p50": latencies[int(n * 0.5)],
        "p95": latencies[int(n * 0.95)],
        "p99": latencies[int(n * 0.99)] if n > 100 else latencies[-1],
        "count": n
    }


def get_error_breakdown(project_name: str, hours: int = 24) -> dict:
    """Get error breakdown by type."""
    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(hours=hours),
        error=True  # Only failed runs
    )

    error_counts = {}
    for run in runs:
        error_type = run.error.split(":")[0] if run.error else "Unknown"
        error_counts[error_type] = error_counts.get(error_type, 0) + 1

    return error_counts


def get_token_usage(project_name: str, hours: int = 24) -> dict:
    """Get token usage summary."""
    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(hours=hours),
        run_type="llm"  # LLM runs only
    )

    total_tokens = 0
    prompt_tokens = 0
    completion_tokens = 0

    for run in runs:
        # Run objects expose token counts directly; each may be None
        total_tokens += run.total_tokens or 0
        prompt_tokens += run.prompt_tokens or 0
        completion_tokens += run.completion_tokens or 0

    return {
        "total_tokens": total_tokens,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "estimated_cost_usd": total_tokens * 0.00002  # Rough estimate
    }


def get_node_performance(project_name: str, hours: int = 24) -> dict:
    """Get performance breakdown by node."""
    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(hours=hours)
    )

    node_stats = {}

    for run in runs:
        node_name = run.name
        if node_name not in node_stats:
            node_stats[node_name] = {
                "count": 0,
                "total_latency": 0,
                "errors": 0
            }

        node_stats[node_name]["count"] += 1

        if run.end_time and run.start_time:
            latency = (run.end_time - run.start_time).total_seconds()
            node_stats[node_name]["total_latency"] += latency

        if run.error:
            node_stats[node_name]["errors"] += 1

    # Calculate averages
    for node in node_stats:
        count = node_stats[node]["count"]
        node_stats[node]["avg_latency"] = (
            node_stats[node]["total_latency"] / count if count > 0 else 0
        )
        node_stats[node]["error_rate"] = (
            node_stats[node]["errors"] / count if count > 0 else 0
        )

    return node_stats


# ============================================================
# DASHBOARD QUERY EXAMPLES (LangSmith UI)
# ============================================================

"""
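Illustrative filter ideas (pseudo-queries, not literal LangSmith syntax):

# Find slow production runs
runs where latency > 10s and tag = 'production'

# Error analysis by node
runs where error is not null group by name

# Token usage over time
token_usage sum by project last 7 days

# Failed runs with context
runs where status = 'error' and metadata.user_id exists

# High-cost runs
runs where token_usage.total > 5000 order by token_usage.total desc

# Specific user's runs
runs where metadata.user_id = 'user_123' last 24 hours

# Compare versions
runs where tag in ['v1', 'v2'] group by tags

In LangSmith's filter syntax, conditions are written as functions, for example:

and(gt(latency, "10s"), has(tags, "production"))
gt(total_tokens, 5000)
and(eq(metadata_key, "user_id"), eq(metadata_value, "user_123"))

Grouping and aggregation are handled in dashboards or in client code rather than
in the filter string. Check the LangSmith filter documentation for exact operators.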
"""

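The percentile lookups in `get_latency_metrics` index directly into the sorted list, which is easy to get subtly wrong on small samples. The following is a small sketch of the nearest-rank method using only the standard library; `nearest_rank_percentile` and `latency_summary` are hypothetical helper names, not part of the LangSmith SDK.

```python
import math


def nearest_rank_percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest value with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def latency_summary(latencies: list[float]) -> dict:
    """Summarize latencies; returns zeros for an empty sample instead of raising."""
    if not latencies:
        return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "count": 0}
    return {
        "p50": nearest_rank_percentile(latencies, 50),
        "p95": nearest_rank_percentile(latencies, 95),
        "p99": nearest_rank_percentile(latencies, 99),
        "count": len(latencies),
    }
```

With very few samples, p95 and p99 collapse to the maximum, so it is worth reporting `count` alongside the percentiles and treating small windows with caution.
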
Feedback Collection and Evaluation

Link user feedback to traces for quality monitoring.

from langsmith import Client
from langsmith.evaluation import evaluate
from typing import Optional
import uuid


client = Client()


# ============================================================
# BASIC FEEDBACK COLLECTION
# ============================================================

def collect_user_feedback(
    run_id: str,
    score: float,
    comment: str = "",
    feedback_type: str = "user_rating"
) -> None:
    """Attach user feedback to a trace."""
    client.create_feedback(
        run_id=run_id,
        key=feedback_type,
        score=score,  # 0.0 to 1.0
        comment=comment
    )


def collect_thumbs_feedback(run_id: str, thumbs_up: bool) -> None:
    """Simple thumbs up/down feedback."""
    client.create_feedback(
        run_id=run_id,
        key="thumbs",
        score=1.0 if thumbs_up else 0.0
    )


def collect_categorical_feedback(
    run_id: str,
    category: str,
    subcategory: Optional[str] = None
) -> None:
    """Categorical feedback for classification."""
    client.create_feedback(
        run_id=run_id,
        key="category",
        value=category,
        comment=subcategory
    )


# ============================================================
# CAPTURING RUN ID DURING EXECUTION
# ============================================================

def execute_with_feedback_capture(app, inputs: dict, config: dict) -> tuple:
    """Execute workflow and capture run ID for feedback."""
    # Method 1: Using callbacks
    from langchain_core.callbacks import BaseCallbackHandler

    class RunIDCapture(BaseCallbackHandler):
        def __init__(self):
            self.run_id = None

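        def on_chain_start(self, serialized, inputs, *, run_id, parent_run_id=None, **kwargs):
            if parent_run_id is None:  # The root run has no parent
                self.run_id = str(run_id)

    callback = RunIDCapture()
    # Merge the capture callback into the caller's config instead of discarding it
    # (assumes config["callbacks"], if present, is a list)
    merged_config = {**config, "callbacks": [*config.get("callbacks", []), callback]}
    result = app.invoke(inputs, config=merged_config)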

    return result, callback.run_id


# Method 2: Using an explicit parent trace
def execute_with_run_tree(app, inputs: dict) -> tuple:
    """Execute inside an explicit LangSmith trace to capture its run ID."""
    from langsmith import trace

    # trace() is a context manager that yields a RunTree and posts it on exit.
    # The graph invocation is expected to nest under this parent run.
    with trace(name="research_workflow", run_type="chain", inputs=inputs) as run_tree:
        result = app.invoke(inputs)
        run_tree.end(outputs=result)

    return result, str(run_tree.id)


# ============================================================
# FEEDBACK INTEGRATION IN PRODUCTION
# ============================================================

class FeedbackCollector:
    """Production feedback collection with batching."""

    def __init__(self, project_name: str):
        self.client = Client()
        self.project_name = project_name
        self.pending_feedback = []
        self.batch_size = 10

    def add_feedback(
        self,
        run_id: str,
        feedback_type: str,
        score: Optional[float] = None,
        value: Optional[str] = None,
        comment: str = ""
    ) -> None:
        """Add feedback to batch."""
        self.pending_feedback.append({
            "run_id": run_id,
            "key": feedback_type,
            "score": score,
            "value": value,
            "comment": comment
        })

        if len(self.pending_feedback) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Send all pending feedback."""
        for feedback in self.pending_feedback:
            try:
                self.client.create_feedback(**feedback)
            except Exception as e:
                print(f"Failed to send feedback: {e}")

        self.pending_feedback = []

    def collect_detailed_feedback(
        self,
        run_id: str,
        accuracy: float,
        helpfulness: float,
        relevance: float,
        comment: str = ""
    ) -> None:
        """Collect multi-dimensional feedback."""
        dimensions = {
            "accuracy": accuracy,
            "helpfulness": helpfulness,
            "relevance": relevance
        }

        for dimension, score in dimensions.items():
            self.add_feedback(
                run_id=run_id,
                feedback_type=dimension,
                score=score
            )

        if comment:
            self.add_feedback(
                run_id=run_id,
                feedback_type="comment",
                value=comment
            )


# ============================================================
# AUTOMATED EVALUATION
# ============================================================

def run_automated_evaluation(
    project_name: str,
    evaluator_name: str,
    sample_size: int = 100
) -> dict:
    """Run automated evaluation on recent runs."""
    # Define custom evaluator
    def quality_evaluator(run, example=None):
        """Evaluate output quality."""
        # run.outputs is None for runs that failed or have not finished
        output = (run.outputs or {}).get("analysis", "")

        # Simple quality checks
        score = 1.0

        if len(output) < 100:
            score -= 0.3  # Too short

        if "error" in output.lower():
            score -= 0.2  # Contains error mention

        if not any(word in output.lower() for word in ["analysis", "insight", "finding"]):
            score -= 0.2  # Missing key terms

        return {
            "key": "quality_score",
            "score": max(0, score),
            "comment": f"Length: {len(output)} chars"
        }

    # Get recent runs
    runs = list(client.list_runs(
        project_name=project_name,
        is_root=True,  # Top-level runs only
        limit=sample_size
    ))

    # Run evaluation
    results = []
    for run in runs:
        eval_result = quality_evaluator(run)
        results.append(eval_result)

        # Optionally attach evaluation as feedback
        client.create_feedback(
            run_id=str(run.id),
            **eval_result
        )

    # Aggregate results
    scores = [r["score"] for r in results]
    return {
        "mean_score": sum(scores) / len(scores) if scores else 0,
        "min_score": min(scores) if scores else 0,
        "max_score": max(scores) if scores else 0,
        "evaluated_count": len(results)
    }
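
Feedback scores in the examples above use a 0.0 to 1.0 scale, while many interfaces collect 1 to 5 stars. A minimal sketch of the conversion, and of averaging multi-dimensional feedback per key, follows. `stars_to_score` and `aggregate_dimensions` are hypothetical helper names, and the linear mapping is one reasonable choice rather than a LangSmith requirement.

```python
from collections import defaultdict


def stars_to_score(stars: int, max_stars: int = 5) -> float:
    """Map a 1..max_stars rating linearly onto 0.0..1.0."""
    if max_stars < 2:
        raise ValueError("max_stars must be at least 2")
    if not 1 <= stars <= max_stars:
        raise ValueError(f"stars must be between 1 and {max_stars}")
    return (stars - 1) / (max_stars - 1)


def aggregate_dimensions(feedback: list[dict]) -> dict[str, float]:
    """Average score per feedback key, skipping entries without a numeric score."""
    scores_by_key: dict[str, list[float]] = defaultdict(list)
    for item in feedback:
        if item.get("score") is not None:
            scores_by_key[item["key"]].append(item["score"])
    return {key: sum(scores) / len(scores) for key, scores in scores_by_key.items()}
```

The dictionaries passed to `aggregate_dimensions` mirror the shape queued by `FeedbackCollector.add_feedback`, so free-text comments (which carry no score) are ignored in the averages.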

Alert Configuration

Set up production alerts for LangGraph workflows.

from langsmith import Client
from typing import Callable
import asyncio
from datetime import datetime, timedelta


# ============================================================
# ALERT DEFINITIONS (Pseudo-code for LangSmith UI configuration)
# ============================================================

"""
LANGSMITH ALERT EXAMPLES:

Alert: High Latency
- Condition: p95_latency > 30s
- Window: 5 minutes
- Action: Slack notification to #alerts-ai

Alert: Error Spike
- Condition: error_rate > 5%
- Window: 15 minutes
- Comparison: vs previous 24 hours baseline
- Action: PagerDuty incident

Alert: Cost Threshold
- Condition: daily_token_cost > $100
- Window: Rolling 24 hours
- Action: Email to team + Slack

Alert: Low Quality
- Condition: avg(user_rating) < 0.7
- Window: 1 hour
- Minimum samples: 10
- Action: Slack + create JIRA ticket

Alert: Checkpoint Bloat
- Condition: avg(checkpoint_size_mb) > 50
- Window: 1 hour
- Action: Slack to engineering
"""


# ============================================================
# CUSTOM ALERTING WITH LANGSMITH API
# ============================================================

class AlertMonitor:
    """Custom alert monitoring using LangSmith API."""

    def __init__(self, project_name: str):
        self.client = Client()
        self.project_name = project_name
        self.alert_handlers: dict[str, Callable] = {}

    def register_alert(self, name: str, handler: Callable) -> None:
        """Register an alert handler."""
        self.alert_handlers[name] = handler

    async def check_latency_alert(
        self,
        threshold_seconds: float,
        window_minutes: int = 5
    ) -> bool:
        """Check if latency exceeds threshold."""
        runs = list(self.client.list_runs(
            project_name=self.project_name,
            start_time=datetime.now() - timedelta(minutes=window_minutes),
            is_root=True  # Top-level runs only
        ))

        if not runs:
            return False

        latencies = []
        for run in runs:
            if run.end_time and run.start_time:
                latency = (run.end_time - run.start_time).total_seconds()
                latencies.append(latency)

        if not latencies:
            return False

        latencies.sort()
        p95_index = int(len(latencies) * 0.95)
        p95_latency = latencies[p95_index] if p95_index < len(latencies) else latencies[-1]

        if p95_latency > threshold_seconds:
            if "high_latency" in self.alert_handlers:
                await self.alert_handlers["high_latency"](
                    p95_latency=p95_latency,
                    threshold=threshold_seconds,
                    sample_size=len(latencies)
                )
            return True

        return False

    async def check_error_rate_alert(
        self,
        threshold_percent: float,
        window_minutes: int = 15
    ) -> bool:
        """Check if error rate exceeds threshold."""
        start_time = datetime.now() - timedelta(minutes=window_minutes)

        all_runs = list(self.client.list_runs(
            project_name=self.project_name,
            start_time=start_time,
            is_root=True  # Top-level runs only
        ))

        error_runs = list(self.client.list_runs(
            project_name=self.project_name,
            start_time=start_time,
            is_root=True,
            error=True
        ))

        if not all_runs:
            return False

        error_rate = (len(error_runs) / len(all_runs)) * 100

        if error_rate > threshold_percent:
            if "error_spike" in self.alert_handlers:
                await self.alert_handlers["error_spike"](
                    error_rate=error_rate,
                    threshold=threshold_percent,
                    error_count=len(error_runs),
                    total_count=len(all_runs)
                )
            return True

        return False

    async def check_quality_alert(
        self,
        min_score: float,
        feedback_key: str = "user_rating",
        window_hours: int = 1,
        min_samples: int = 10
    ) -> bool:
        """Check if quality score drops below threshold."""
        runs = list(self.client.list_runs(
            project_name=self.project_name,
            start_time=datetime.now() - timedelta(hours=window_hours),
            is_root=True  # Top-level runs only
        ))

        scores = []
        for run in runs:
            feedbacks = list(self.client.list_feedback(run_ids=[str(run.id)]))
            for feedback in feedbacks:
                if feedback.key == feedback_key and feedback.score is not None:
                    scores.append(feedback.score)

        if len(scores) < min_samples:
            return False  # Not enough data

        avg_score = sum(scores) / len(scores)

        if avg_score < min_score:
            if "low_quality" in self.alert_handlers:
                await self.alert_handlers["low_quality"](
                    avg_score=avg_score,
                    threshold=min_score,
                    sample_size=len(scores)
                )
            return True

        return False

    async def run_continuous_monitoring(self, interval_seconds: int = 60):
        """Run continuous monitoring loop."""
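        while True:
            try:
                await self.check_latency_alert(threshold_seconds=30)
                await self.check_error_rate_alert(threshold_percent=5)
                await self.check_quality_alert(min_score=0.7)
            except Exception as e:
                # Keep the monitor alive if one polling cycle fails
                print(f"Monitoring cycle failed: {e}")

            await asyncio.sleep(interval_seconds)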


# ============================================================
# INTEGRATION WITH NOTIFICATION SERVICES
# ============================================================

async def send_slack_alert(webhook_url: str, message: dict) -> None:
    """Send alert to Slack."""
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.post(webhook_url, json=message) as response:
            response.raise_for_status()  # Surface failed deliveries


async def send_pagerduty_incident(
    routing_key: str,
    summary: str,
    severity: str = "error"
) -> None:
    """Create PagerDuty incident."""
    import aiohttp

    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "payload": {
            "summary": summary,
            "severity": severity,
            "source": "langsmith-monitor"
        }
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload
        ) as response:
            response.raise_for_status()  # Surface failed deliveries


# Example usage
async def setup_production_alerts():
    """Set up production alerting."""
    monitor = AlertMonitor("research-agent-production")

    # Register Slack handler for latency
    async def handle_high_latency(**kwargs):
        await send_slack_alert(
            webhook_url="https://hooks.slack.com/services/...",
            message={
                "text": f"High latency alert: p95={kwargs['p95_latency']:.2f}s "
                        f"(threshold: {kwargs['threshold']}s)"
            }
        )

    monitor.register_alert("high_latency", handle_high_latency)

    # Register PagerDuty handler for errors
    async def handle_error_spike(**kwargs):
        await send_pagerduty_incident(
            routing_key="your-routing-key",
            summary=f"Error spike: {kwargs['error_rate']:.1f}% "
                    f"({kwargs['error_count']}/{kwargs['total_count']} runs)",
            severity="error"
        )

    monitor.register_alert("error_spike", handle_error_spike)

    # Start monitoring
    await monitor.run_continuous_monitoring()
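
A polling loop like the one above re-fires the same alert on every cycle while a condition persists. One common mitigation is a per-alert cooldown. The sketch below is an assumption about how you might add it, not part of LangSmith; `AlertCooldown` is a hypothetical class, and the clock is injectable only to make the behavior easy to test.

```python
import time
from typing import Callable


class AlertCooldown:
    """Suppress repeats of the same alert until a cooldown period has passed."""

    def __init__(
        self,
        cooldown_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._last_fired: dict[str, float] = {}

    def should_fire(self, alert_name: str) -> bool:
        """Return True and record the time if the alert may fire now."""
        now = self._clock()
        last = self._last_fired.get(alert_name)
        if last is not None and now - last < self.cooldown_seconds:
            return False  # still cooling down; stay quiet
        self._last_fired[alert_name] = now
        return True
```

Inside a check such as `check_latency_alert`, the handler call could be guarded with `if cooldown.should_fire("high_latency"):` so that a sustained incident produces one notification per cooldown window instead of one per poll.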

Interview Questions

Q: How does LangSmith integrate with LangGraph?

"Automatically when LANGCHAIN_TRACING_V2 is enabled. Every node execution, state update, and LLM call is traced without code changes. Custom annotations with @traceable decorator add metadata and tags. Production dashboards show latency by node, token usage for cost tracking, and error rates. You can filter and analyze runs by any metadata field."

Q: What metrics matter for production LangGraph applications?

"Five key areas: (1) Latency p50/p95/p99 by node to identify bottlenecks, (2) Token usage per request/user/feature for cost control, (3) Error rates by node and error type for reliability, (4) Checkpoint sizes for storage planning, and (5) User feedback scores for quality. Custom evaluators add domain-specific quality metrics. All metrics should have alerts with appropriate thresholds."

Q: How do you implement feedback loops for quality improvement?

"Capture the run ID during execution using callbacks or run tree context. Expose a feedback endpoint in your API that calls client.create_feedback() linking ratings to traces. Use multi-dimensional feedback (accuracy, helpfulness, relevance) for detailed analysis. Run automated evaluators on a schedule to detect quality regressions. Correlate feedback with input patterns to identify problem areas."

Q: How do you set up alerting for LangGraph in production?

"Use LangSmith's built-in alerting for common patterns: latency p95, error rate spikes, cost thresholds. For custom alerts, poll the LangSmith API periodically and check conditions programmatically. Integrate with Slack, PagerDuty, or OpsGenie for notifications. Set appropriate thresholds based on baseline performance--avoid alert fatigue by using proper windowing and minimum sample requirements."


Key Takeaways

  • Automatic tracing via environment variables - no code changes needed
  • @traceable decorator adds metadata, tags, and run types to traces
  • Run types (llm, chain, tool) categorize operations for proper analysis
  • Programmatic queries using LangSmith Client for custom dashboards
  • Feedback collection links user ratings to specific traces
  • Multi-dimensional feedback (accuracy, helpfulness, relevance) for detailed quality tracking
  • Automated evaluation runs quality checks on production traffic
  • Custom alerting using LangSmith API with notification integrations
