Production RAG Systems
Monitoring & Cost Management
4 min read
Production RAG systems need comprehensive monitoring to maintain answer quality and keep costs under control. This lesson covers the observability stack (traces, metrics, logs), cost tracking, cost optimization strategies, and alerting.
RAG Observability Stack
┌──────────────────────────────────────────────┐
│              RAG Observability               │
├──────────────────────────────────────────────┤
│                                              │
│  ┌────────────────────────────────────────┐  │
│  │ METRICS                                │  │
│  │  • Latency (p50, p95, p99)             │  │
│  │  • Throughput (QPS)                    │  │
│  │  • Error rates                         │  │
│  │  • Cache hit rates                     │  │
│  └────────────────────────────────────────┘  │
│                                              │
│  ┌────────────────────────────────────────┐  │
│  │ TRACES                                 │  │
│  │  • Full request journey                │  │
│  │  • Component latencies                 │  │
│  │  • Retrieved contexts                  │  │
│  │  • LLM inputs/outputs                  │  │
│  └────────────────────────────────────────┘  │
│                                              │
│  ┌────────────────────────────────────────┐  │
│  │ LOGS                                   │  │
│  │  • Errors and exceptions               │  │
│  │  • Quality signals                     │  │
│  │  • User feedback                       │  │
│  └────────────────────────────────────────┘  │
│                                              │
└──────────────────────────────────────────────┘
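The sections below cover the metrics and traces layers in depth. For the logs layer, a common lightweight approach is to emit one structured (JSON) log line per answered query, so quality signals and user feedback can be filtered and aggregated later. A minimal sketch using only the standard library (the field names are illustrative, not a fixed schema):
import json
import logging

logger = logging.getLogger("rag")
logging.basicConfig(level=logging.INFO, format="%(message)s")

def log_answer_event(query: str, answer: str, faithfulness: float | None, feedback: str | None):
    """Emit one JSON log line per answered query for later filtering/aggregation."""
    logger.info(json.dumps({
        "event": "rag_answer",
        "query": query,
        "answer_chars": len(answer),
        "faithfulness": faithfulness,  # e.g. from an async evaluator, if available
        "feedback": feedback,          # "thumbs_up" / "thumbs_down" / None
    }))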
LangSmith Integration
import os
from langsmith import Client
from langsmith.run_trees import RunTree
# Setup
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"
client = Client()  # optional: handy later for logging feedback or querying runs
class TracedRAG:
    """RAG pipeline with LangSmith tracing via explicit RunTree calls."""

    async def query(self, question: str) -> dict:
        # Root run for the full request
        root = RunTree(
            name="rag_query",
            run_type="chain",
            inputs={"question": question},
        )
        root.post()

        # Trace embedding
        embed_run = root.create_child(name="embed_query", run_type="embedding")
        embed_run.post()
        embedding = await self.embed(question)
        embed_run.end(outputs={"dimensions": len(embedding)})
        embed_run.patch()

        # Trace retrieval
        retrieve_run = root.create_child(name="retrieve", run_type="retriever")
        retrieve_run.post()
        docs = await self.retrieve(embedding)
        retrieve_run.end(outputs={
            "num_docs": len(docs),
            "docs": [d.page_content[:100] for d in docs],
        })
        retrieve_run.patch()

        # Trace generation
        llm_run = root.create_child(name="generate", run_type="llm")
        llm_run.post()
        response = await self.generate(question, docs)
        llm_run.end(outputs={"response": response})
        llm_run.patch()

        # Close the root run; keep the trace id for later lookup or feedback
        root.end(outputs={"answer": response})
        root.patch()
        return {"answer": response, "trace_id": root.id}
Custom Metrics with Prometheus
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Define metrics
QUERY_COUNTER = Counter(
"rag_queries_total",
"Total RAG queries",
["status", "cache_hit"]
)
QUERY_LATENCY = Histogram(
"rag_query_latency_seconds",
"RAG query latency",
["component"],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
CONTEXT_SIZE = Histogram(
"rag_context_tokens",
"Number of context tokens",
buckets=[100, 500, 1000, 2000, 4000, 8000]
)
ACTIVE_QUERIES = Gauge(
"rag_active_queries",
"Currently processing queries"
)
class MetricsRAG:
"""RAG with Prometheus metrics."""
async def query(self, question: str) -> str:
ACTIVE_QUERIES.inc()
cache_hit = "false"
try:
# Check cache
with QUERY_LATENCY.labels(component="cache_check").time():
cached = self.cache.get(question)
if cached:
cache_hit = "true"
QUERY_COUNTER.labels(status="success", cache_hit="true").inc()
return cached
# Embedding
with QUERY_LATENCY.labels(component="embedding").time():
embedding = await self.embed(question)
# Retrieval
with QUERY_LATENCY.labels(component="retrieval").time():
docs = await self.retrieve(embedding)
# Track context size
context_tokens = sum(len(d.page_content.split()) for d in docs)
CONTEXT_SIZE.observe(context_tokens)
# Generation
with QUERY_LATENCY.labels(component="generation").time():
response = await self.generate(question, docs)
QUERY_COUNTER.labels(status="success", cache_hit=cache_hit).inc()
return response
except Exception as e:
QUERY_COUNTER.labels(status="error", cache_hit=cache_hit).inc()
raise
finally:
ACTIVE_QUERIES.dec()
# Start metrics server
start_http_server(8000)
Cost Tracking
from dataclasses import dataclass
from datetime import datetime
from typing import Dict
@dataclass
class CostConfig:
"""API pricing configuration."""
    # OpenAI pricing (USD per 1M tokens); rates change, so verify against the current price list
    embedding_cost_per_1m: float = 0.13   # text-embedding-3-large
    gpt4_input_per_1m: float = 2.50       # gpt-4o
    gpt4_output_per_1m: float = 10.00     # gpt-4o
    gpt35_input_per_1m: float = 0.50      # gpt-3.5-turbo
    gpt35_output_per_1m: float = 1.50     # gpt-3.5-turbo
class CostTracker:
"""Track and report API costs."""
def __init__(self, config: CostConfig = None):
self.config = config or CostConfig()
self.costs: Dict[str, float] = {
"embedding": 0.0,
"llm_input": 0.0,
"llm_output": 0.0,
}
self.usage: Dict[str, int] = {
"embedding_tokens": 0,
"llm_input_tokens": 0,
"llm_output_tokens": 0,
"queries": 0,
}
def track_embedding(self, tokens: int):
"""Track embedding API usage."""
cost = (tokens / 1_000_000) * self.config.embedding_cost_per_1m
self.costs["embedding"] += cost
self.usage["embedding_tokens"] += tokens
def track_llm(self, input_tokens: int, output_tokens: int, model: str = "gpt-4"):
"""Track LLM API usage."""
if "gpt-4" in model:
input_cost = (input_tokens / 1_000_000) * self.config.gpt4_input_per_1m
output_cost = (output_tokens / 1_000_000) * self.config.gpt4_output_per_1m
else:
input_cost = (input_tokens / 1_000_000) * self.config.gpt35_input_per_1m
output_cost = (output_tokens / 1_000_000) * self.config.gpt35_output_per_1m
self.costs["llm_input"] += input_cost
self.costs["llm_output"] += output_cost
self.usage["llm_input_tokens"] += input_tokens
self.usage["llm_output_tokens"] += output_tokens
self.usage["queries"] += 1
def get_report(self) -> dict:
"""Generate cost report."""
total_cost = sum(self.costs.values())
cost_per_query = total_cost / max(self.usage["queries"], 1)
return {
"total_cost_usd": round(total_cost, 4),
"cost_per_query_usd": round(cost_per_query, 6),
"breakdown": {
"embedding": round(self.costs["embedding"], 4),
"llm_input": round(self.costs["llm_input"], 4),
"llm_output": round(self.costs["llm_output"], 4),
},
"usage": self.usage,
}
# Usage
cost_tracker = CostTracker()
async def query_with_tracking(question: str) -> str:
# Track embedding
embedding, embed_tokens = await embed_with_count(question)
cost_tracker.track_embedding(embed_tokens)
# Retrieve
docs = await retrieve(embedding)
# Track LLM
response, usage = await generate_with_usage(question, docs)
cost_tracker.track_llm(
input_tokens=usage["prompt_tokens"],
output_tokens=usage["completion_tokens"],
model="gpt-4"
)
return response
# Get report
print(cost_tracker.get_report())
# {
# "total_cost_usd": 0.0234,
# "cost_per_query_usd": 0.000234,
# "breakdown": {"embedding": 0.0001, "llm_input": 0.0083, "llm_output": 0.015},
# "usage": {"embedding_tokens": 850, "llm_input_tokens": 3320, "llm_output_tokens": 1500, "queries": 100}
# }
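The embed_with_count and generate_with_usage helpers above are assumed rather than shown; with the OpenAI Python SDK, token counts can be read from each response's usage field. A rough sketch (model names and prompt construction are illustrative):
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()

async def embed_with_count(text: str) -> tuple[list[float], int]:
    resp = await openai_client.embeddings.create(
        model="text-embedding-3-large", input=text
    )
    return resp.data[0].embedding, resp.usage.total_tokens

async def generate_with_usage(question: str, docs: list) -> tuple[str, dict]:
    context = "\n\n".join(d.page_content for d in docs)
    resp = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Answer using only this context:\n{context}"},
            {"role": "user", "content": question},
        ],
    )
    return resp.choices[0].message.content, {
        "prompt_tokens": resp.usage.prompt_tokens,
        "completion_tokens": resp.usage.completion_tokens,
    }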
Cost Optimization Strategies
class CostOptimizedRAG:
"""RAG with cost optimization strategies."""
def __init__(self):
self.fast_llm = "gpt-4o-mini" # Cheaper for simple queries
self.quality_llm = "gpt-4o" # Higher quality for complex
async def query(self, question: str) -> str:
# Strategy 1: Use semantic cache (free after first query)
cached = await self.semantic_cache.get(question)
if cached:
return cached
# Retrieve with optimized context
docs = await self._retrieve_optimized(question)
# Strategy 2: Route to appropriate model
complexity = await self._assess_complexity(question)
if complexity == "simple":
# Use faster, cheaper model
response = await self.generate(question, docs, model=self.fast_llm)
else:
# Use higher quality model
response = await self.generate(question, docs, model=self.quality_llm)
# Cache for future
        await self.semantic_cache.set(question, response)  # cache writes are async, like the read above
return response
async def _retrieve_optimized(self, question: str) -> list:
"""Retrieve with context size optimization."""
# Get more docs initially
docs = await self.retrieve(question, k=10)
# Rerank and select top with token budget
reranked = await self.rerank(question, docs)
# Strategy 3: Truncate to token budget
token_budget = 2000 # ~$0.005 per query vs ~$0.02 for 8000
selected = []
current_tokens = 0
for doc in reranked:
doc_tokens = len(doc.page_content.split())
if current_tokens + doc_tokens <= token_budget:
selected.append(doc)
current_tokens += doc_tokens
else:
break
return selected
async def _assess_complexity(self, question: str) -> str:
"""Determine question complexity for model routing."""
# Simple heuristics
if len(question.split()) < 10:
return "simple"
complex_indicators = [
"compare", "analyze", "explain why",
"difference between", "pros and cons"
]
if any(ind in question.lower() for ind in complex_indicators):
return "complex"
return "simple"
Dashboard Metrics
Key metrics to display on your monitoring dashboard:
| Category | Metric | Alert Threshold |
|---|---|---|
| Latency | P95 response time | > 3s |
| Latency | Time to first token | > 500ms |
| Quality | Faithfulness score | < 0.8 |
| Quality | User feedback (thumbs up %) | < 70% |
| Cost | Cost per query | > $0.05 |
| Cost | Daily spend | > budget |
| Reliability | Error rate | > 1% |
| Reliability | Circuit breaker opens | Any |
| Cache | Cache hit rate | < 20% |
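The latency, error, and cache rows map directly onto the Prometheus metrics defined earlier; the quality rows need their own instrumentation. A minimal sketch, assuming answers are scored by a faithfulness evaluator (run async or offline) and that the UI reports thumbs-up/down feedback:
from prometheus_client import Counter, Histogram

FAITHFULNESS = Histogram(
    "rag_faithfulness_score",
    "Faithfulness score per answer (0-1)",
    buckets=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 1.0],
)
FEEDBACK = Counter(
    "rag_user_feedback",  # exposed as rag_user_feedback_total
    "User feedback events",
    ["rating"],  # "up" or "down"
)

def record_quality(faithfulness_score: float, rating: str | None = None):
    """Record quality signals so they can be dashboarded and alerted on."""
    FAITHFULNESS.observe(faithfulness_score)
    if rating in ("up", "down"):
        FEEDBACK.labels(rating=rating).inc()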
Alerting Rules
# prometheus/alerts.yml
groups:
  - name: rag_alerts
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.95, sum by (le) (rate(rag_query_latency_seconds_bucket[5m]))) > 3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RAG P95 latency above 3 seconds"

      - alert: HighErrorRate
        expr: sum(rate(rag_queries_total{status="error"}[5m])) / sum(rate(rag_queries_total[5m])) > 0.01
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "RAG error rate above 1%"

      - alert: LowCacheHitRate
        expr: sum(rate(rag_queries_total{cache_hit="true"}[1h])) / sum(rate(rag_queries_total[1h])) < 0.1
        for: 1h
        labels:
          severity: info
        annotations:
          summary: "Cache hit rate below 10% - consider tuning"

      - alert: HighCost
        expr: sum(increase(rag_cost_usd_total[24h])) > 100
        labels:
          severity: warning
        annotations:
          summary: "Daily RAG cost exceeds $100"
Key Insight: Monitor both technical metrics (latency, errors) and quality metrics (faithfulness, user feedback). A fast system that gives wrong answers is worse than a slower but accurate one.
In the final lesson, we'll wrap up with recommended next steps and learning paths.