Lesson 22 of 23

Production RAG Systems

Monitoring & Cost Management

4 min read

Production RAG systems need comprehensive monitoring to maintain quality and control costs. This lesson covers the observability stack (metrics, traces, and logs), cost tracking, and cost optimization strategies.

RAG Observability Stack

┌────────────────────────────────────────────────────────────────┐
│                    RAG Observability                            │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      METRICS                             │   │
│  │  • Latency (p50, p95, p99)                              │   │
│  │  • Throughput (QPS)                                      │   │
│  │  • Error rates                                           │   │
│  │  • Cache hit rates                                       │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      TRACES                              │   │
│  │  • Full request journey                                  │   │
│  │  • Component latencies                                   │   │
│  │  • Retrieved contexts                                    │   │
│  │  • LLM inputs/outputs                                    │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                       LOGS                               │   │
│  │  • Errors and exceptions                                 │   │
│  │  • Quality signals                                       │   │
│  │  • User feedback                                         │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└────────────────────────────────────────────────────────────────┘
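
The metrics and traces layers are covered by the LangSmith and Prometheus examples below. For the logs layer, structured (JSON) log lines make quality signals and user feedback easy to search and aggregate. A minimal sketch using Python's standard logging module (the event names and fields are illustrative):

import json
import logging

logger = logging.getLogger("rag")
logging.basicConfig(level=logging.INFO, format="%(message)s")

def log_event(event: str, **fields):
    """Emit one structured log line per event."""
    logger.info(json.dumps({"event": event, **fields}))

# Quality signals and user feedback (field names are illustrative)
log_event("rag_answer", question_id="q-123", num_docs=5, faithfulness=0.91)
log_event("user_feedback", question_id="q-123", thumbs_up=True)
log_event("rag_error", question_id="q-124", error="VectorStoreTimeout")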

LangSmith Integration

import os
from langsmith import Client
from langsmith.run_trees import RunTree

# Setup
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"

client = Client()  # optional: lets you query runs and attach feedback programmatically

class TracedRAG:
    """RAG pipeline with LangSmith tracing."""

    async def query(self, question: str) -> dict:
        # Create the root run and post it to LangSmith
        root = RunTree(
            name="rag_query",
            run_type="chain",
            inputs={"question": question},
        )
        root.post()

        # Trace embedding
        embed_run = root.create_child(
            name="embed_query",
            run_type="embedding",
        )
        embedding = await self.embed(question)
        embed_run.end(outputs={"dimensions": len(embedding)})
        embed_run.post()

        # Trace retrieval
        retrieve_run = root.create_child(
            name="retrieve",
            run_type="retriever",
        )
        docs = await self.retrieve(embedding)
        retrieve_run.end(outputs={
            "num_docs": len(docs),
            "docs": [d.page_content[:100] for d in docs],
        })
        retrieve_run.post()

        # Trace generation
        llm_run = root.create_child(
            name="generate",
            run_type="llm",
        )
        response = await self.generate(question, docs)
        llm_run.end(outputs={"response": response})
        llm_run.post()

        # Close the root run and push the final outputs
        root.end(outputs={"answer": response})
        root.patch()

        return {"answer": response, "trace_id": root.id}
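
Building the run tree by hand gives full control, but for most pipelines the @traceable decorator from the same SDK is enough: it records the parent/child structure automatically whenever traced functions call each other. A minimal sketch (the function bodies are placeholders to fill in):

from langsmith import traceable

@traceable(run_type="embedding", name="embed_query")
async def embed(question: str) -> list[float]:
    ...  # call your embedding model here

@traceable(run_type="retriever", name="retrieve")
async def retrieve(embedding: list[float]) -> list:
    ...  # query your vector store here

@traceable(run_type="llm", name="generate")
async def generate(question: str, docs: list) -> str:
    ...  # call your LLM here

@traceable(run_type="chain", name="rag_query")
async def rag_query(question: str) -> str:
    embedding = await embed(question)
    docs = await retrieve(embedding)
    return await generate(question, docs)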

Custom Metrics with Prometheus

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
QUERY_COUNTER = Counter(
    "rag_queries_total",
    "Total RAG queries",
    ["status", "cache_hit"]
)

QUERY_LATENCY = Histogram(
    "rag_query_latency_seconds",
    "RAG query latency",
    ["component"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

CONTEXT_SIZE = Histogram(
    "rag_context_tokens",
    "Number of context tokens",
    buckets=[100, 500, 1000, 2000, 4000, 8000]
)

ACTIVE_QUERIES = Gauge(
    "rag_active_queries",
    "Currently processing queries"
)

class MetricsRAG:
    """RAG with Prometheus metrics."""

    async def query(self, question: str) -> str:
        ACTIVE_QUERIES.inc()
        cache_hit = "false"

        try:
            # Check cache
            with QUERY_LATENCY.labels(component="cache_check").time():
                cached = self.cache.get(question)
                if cached:
                    cache_hit = "true"
                    QUERY_COUNTER.labels(status="success", cache_hit="true").inc()
                    return cached

            # Embedding
            with QUERY_LATENCY.labels(component="embedding").time():
                embedding = await self.embed(question)

            # Retrieval
            with QUERY_LATENCY.labels(component="retrieval").time():
                docs = await self.retrieve(embedding)

            # Track context size (whitespace split is a rough token approximation)
            context_tokens = sum(len(d.page_content.split()) for d in docs)
            CONTEXT_SIZE.observe(context_tokens)

            # Generation
            with QUERY_LATENCY.labels(component="generation").time():
                response = await self.generate(question, docs)

            QUERY_COUNTER.labels(status="success", cache_hit=cache_hit).inc()
            return response

        except Exception:
            QUERY_COUNTER.labels(status="error", cache_hit=cache_hit).inc()
            raise

        finally:
            ACTIVE_QUERIES.dec()

# Expose metrics at http://localhost:8000/metrics for Prometheus to scrape
start_http_server(8000)
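
Note that MetricsRAG (and the cost-tracking example below) estimates token counts by splitting on whitespace, which undercounts for most tokenizers. For billing-accurate numbers you can count with the tiktoken package instead; a small sketch (the model name is only an example):

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens with the tokenizer the OpenAI API bills against."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown model name: fall back to a general-purpose encoding
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

# Drop-in replacement for the whitespace approximation:
# context_tokens = sum(count_tokens(d.page_content) for d in docs)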

Cost Tracking

from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class CostConfig:
    """API pricing configuration."""
    # OpenAI pricing (per 1M tokens); check current rates before relying on these
    embedding_cost_per_1m: float = 0.13  # text-embedding-3-large
    gpt4_input_per_1m: float = 2.50      # gpt-4o input
    gpt4_output_per_1m: float = 10.00    # gpt-4o output
    gpt35_input_per_1m: float = 0.50     # gpt-3.5-turbo input
    gpt35_output_per_1m: float = 1.50    # gpt-3.5-turbo output

class CostTracker:
    """Track and report API costs."""

    def __init__(self, config: Optional[CostConfig] = None):
        self.config = config or CostConfig()
        self.costs: Dict[str, float] = {
            "embedding": 0.0,
            "llm_input": 0.0,
            "llm_output": 0.0,
        }
        self.usage: Dict[str, int] = {
            "embedding_tokens": 0,
            "llm_input_tokens": 0,
            "llm_output_tokens": 0,
            "queries": 0,
        }

    def track_embedding(self, tokens: int):
        """Track embedding API usage."""
        cost = (tokens / 1_000_000) * self.config.embedding_cost_per_1m
        self.costs["embedding"] += cost
        self.usage["embedding_tokens"] += tokens

    def track_llm(self, input_tokens: int, output_tokens: int, model: str = "gpt-4"):
        """Track LLM API usage."""
        if "gpt-4" in model:
            input_cost = (input_tokens / 1_000_000) * self.config.gpt4_input_per_1m
            output_cost = (output_tokens / 1_000_000) * self.config.gpt4_output_per_1m
        else:
            input_cost = (input_tokens / 1_000_000) * self.config.gpt35_input_per_1m
            output_cost = (output_tokens / 1_000_000) * self.config.gpt35_output_per_1m

        self.costs["llm_input"] += input_cost
        self.costs["llm_output"] += output_cost
        self.usage["llm_input_tokens"] += input_tokens
        self.usage["llm_output_tokens"] += output_tokens
        self.usage["queries"] += 1

    def get_report(self) -> dict:
        """Generate cost report."""
        total_cost = sum(self.costs.values())
        cost_per_query = total_cost / max(self.usage["queries"], 1)

        return {
            "total_cost_usd": round(total_cost, 4),
            "cost_per_query_usd": round(cost_per_query, 6),
            "breakdown": {
                "embedding": round(self.costs["embedding"], 4),
                "llm_input": round(self.costs["llm_input"], 4),
                "llm_output": round(self.costs["llm_output"], 4),
            },
            "usage": self.usage,
        }

# Usage
cost_tracker = CostTracker()

async def query_with_tracking(question: str) -> str:
    # Track embedding (embed_with_count / generate_with_usage are assumed
    # helpers that also return the API's reported token usage)
    embedding, embed_tokens = await embed_with_count(question)
    cost_tracker.track_embedding(embed_tokens)

    # Retrieve
    docs = await retrieve(embedding)

    # Track LLM
    response, usage = await generate_with_usage(question, docs)
    cost_tracker.track_llm(
        input_tokens=usage["prompt_tokens"],
        output_tokens=usage["completion_tokens"],
        model="gpt-4"
    )

    return response

# Get report
print(cost_tracker.get_report())
# {
#     "total_cost_usd": 0.0234,
#     "cost_per_query_usd": 0.000234,
#     "breakdown": {"embedding": 0.0001, "llm_input": 0.0083, "llm_output": 0.015},
#     "usage": {"embedding_tokens": 850, "llm_input_tokens": 3320, "llm_output_tokens": 1500, "queries": 100}
# }
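
The HighCost alert in the Alerting Rules section below assumes a rag_cost_usd_total counter exists in Prometheus. One way to provide it is to mirror every cost increment into a Counter; a minimal sketch that reuses prometheus_client from earlier (the metric and class names are illustrative):

from prometheus_client import Counter

# Exposed by prometheus_client as rag_cost_usd_total
COST_COUNTER = Counter(
    "rag_cost_usd",
    "Cumulative RAG API cost in USD",
    ["component"],
)

class PrometheusCostTracker(CostTracker):
    """CostTracker that also exports each cost increment to Prometheus."""

    def track_embedding(self, tokens: int):
        before = self.costs["embedding"]
        super().track_embedding(tokens)
        COST_COUNTER.labels(component="embedding").inc(self.costs["embedding"] - before)

    def track_llm(self, input_tokens: int, output_tokens: int, model: str = "gpt-4"):
        before_in = self.costs["llm_input"]
        before_out = self.costs["llm_output"]
        super().track_llm(input_tokens, output_tokens, model)
        COST_COUNTER.labels(component="llm_input").inc(self.costs["llm_input"] - before_in)
        COST_COUNTER.labels(component="llm_output").inc(self.costs["llm_output"] - before_out)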

Cost Optimization Strategies

class CostOptimizedRAG:
    """RAG with cost optimization strategies."""

    def __init__(self):
        self.fast_llm = "gpt-4o-mini"  # Cheaper model for simple queries
        self.quality_llm = "gpt-4o"    # Higher-quality model for complex queries

    async def query(self, question: str) -> str:
        # Strategy 1: Use semantic cache (free after first query)
        cached = await self.semantic_cache.get(question)
        if cached:
            return cached

        # Retrieve with optimized context
        docs = await self._retrieve_optimized(question)

        # Strategy 2: Route to appropriate model
        complexity = await self._assess_complexity(question)

        if complexity == "simple":
            # Use faster, cheaper model
            response = await self.generate(question, docs, model=self.fast_llm)
        else:
            # Use higher quality model
            response = await self.generate(question, docs, model=self.quality_llm)

        # Cache for future queries
        await self.semantic_cache.set(question, response)

        return response

    async def _retrieve_optimized(self, question: str) -> list:
        """Retrieve with context size optimization."""

        # Get more docs initially
        docs = await self.retrieve(question, k=10)

        # Rerank and select top with token budget
        reranked = await self.rerank(question, docs)

        # Strategy 3: Truncate to token budget
        token_budget = 2000  # ~$0.005 per query vs ~$0.02 for 8000
        selected = []
        current_tokens = 0

        for doc in reranked:
            doc_tokens = len(doc.page_content.split())
            if current_tokens + doc_tokens <= token_budget:
                selected.append(doc)
                current_tokens += doc_tokens
            else:
                break

        return selected

    async def _assess_complexity(self, question: str) -> str:
        """Determine question complexity for model routing."""

        # Simple heuristics
        if len(question.split()) < 10:
            return "simple"

        complex_indicators = [
            "compare", "analyze", "explain why",
            "difference between", "pros and cons"
        ]

        if any(ind in question.lower() for ind in complex_indicators):
            return "complex"

        return "simple"
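
The semantic cache referenced above isn't shown in this lesson; the idea is to reuse a previous answer whenever a new question embeds close enough to one already answered. A minimal in-memory sketch using cosine similarity (the 0.95 threshold and the embed_fn parameter are assumptions; production systems usually back this with a vector store):

import numpy as np

class SemanticCache:
    """In-memory semantic cache keyed by query embeddings."""

    def __init__(self, embed_fn, threshold: float = 0.95):
        self.embed_fn = embed_fn    # async callable: str -> list[float]
        self.threshold = threshold  # minimum cosine similarity for a hit
        self.entries: list[tuple[np.ndarray, str]] = []

    async def get(self, question: str) -> str | None:
        if not self.entries:
            return None
        query_vec = np.array(await self.embed_fn(question))
        for cached_vec, cached_answer in self.entries:
            similarity = np.dot(query_vec, cached_vec) / (
                np.linalg.norm(query_vec) * np.linalg.norm(cached_vec)
            )
            if similarity >= self.threshold:
                return cached_answer
        return None

    async def set(self, question: str, answer: str):
        query_vec = np.array(await self.embed_fn(question))
        self.entries.append((query_vec, answer))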

Dashboard Metrics

Key metrics to display on your monitoring dashboard (a sketch for exporting the quality signals follows the table):

Category    | Metric                        | Alert Threshold
------------|-------------------------------|----------------
Latency     | P95 response time             | > 3s
Latency     | Time to first token           | > 500ms
Quality     | Faithfulness score            | < 0.8
Quality     | User feedback (thumbs up %)   | < 70%
Cost        | Cost per query                | > $0.05
Cost        | Daily spend                   | > budget
Reliability | Error rate                    | > 1%
Reliability | Circuit breaker opens         | Any
Cache       | Cache hit rate                | < 20%
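
Latency, error, cost, and cache metrics come from the instrumentation shown earlier. The quality rows need an extra step, because faithfulness is typically scored offline on a sample of recent traces. A hedged sketch of exporting those signals to Prometheus (score_faithfulness is a placeholder for whatever evaluator you use, such as an LLM-as-judge or RAGAS):

from prometheus_client import Counter, Gauge

FAITHFULNESS = Gauge(
    "rag_faithfulness_score",
    "Mean faithfulness score over the latest evaluation sample",
)
FEEDBACK = Counter(
    "rag_feedback",  # exposed as rag_feedback_total
    "User feedback events",
    ["rating"],  # "up" or "down"
)

def record_feedback(thumbs_up: bool):
    FEEDBACK.labels(rating="up" if thumbs_up else "down").inc()

async def evaluate_sample(traces: list, score_faithfulness) -> float:
    """Score a sample of traced (question, contexts, answer) triples and export the mean."""
    scores = [await score_faithfulness(t) for t in traces]  # assumed async evaluator
    mean_score = sum(scores) / max(len(scores), 1)
    FAITHFULNESS.set(mean_score)
    return mean_score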

Alerting Rules

# prometheus/alerts.yml
groups:
  - name: rag_alerts
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.95, sum(rate(rag_query_latency_seconds_bucket[5m])) by (le, component)) > 3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RAG P95 latency above 3 seconds"

      - alert: HighErrorRate
        expr: sum(rate(rag_queries_total{status="error"}[5m])) / sum(rate(rag_queries_total[5m])) > 0.01
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "RAG error rate above 1%"

      - alert: LowCacheHitRate
        expr: sum(rate(rag_queries_total{cache_hit="true"}[1h])) / sum(rate(rag_queries_total[1h])) < 0.1
        for: 1h
        labels:
          severity: info
        annotations:
          summary: "Cache hit rate below 10% - consider tuning"

      - alert: HighCost
        # rag_cost_usd_total is the counter exported by the cost-tracking sketch above
        expr: sum(increase(rag_cost_usd_total[24h])) > 100
        labels:
          severity: warning
        annotations:
          summary: "Daily RAG cost exceeds $100"

Key Insight: Monitor both technical metrics (latency, errors) and quality metrics (faithfulness, user feedback). A fast system that gives wrong answers is worse than a slower accurate one.

Next, let's wrap up with next steps and recommended learning paths.
