أنظمة RAG الإنتاجية
المراقبة وإدارة التكاليف
4 دقيقة للقراءة
أنظمة RAG الإنتاجية تحتاج مراقبة شاملة للحفاظ على الجودة والتحكم في التكاليف. هذا الدرس يغطي استراتيجيات المراقبة وتحسين التكاليف.
مجموعة مراقبة RAG
┌────────────────────────────────────────────────────────────────┐
│ مراقبة RAG │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ المقاييس │ │
│ │ • زمن الاستجابة (p50, p95, p99) │ │
│ │ • الإنتاجية (QPS) │ │
│ │ • معدلات الأخطاء │ │
│ │ • معدلات إصابة التخزين المؤقت │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ التتبعات │ │
│ │ • رحلة الطلب الكاملة │ │
│ │ • أزمنة استجابة المكونات │ │
│ │ • السياقات المسترجعة │ │
│ │ • مدخلات/مخرجات LLM │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ السجلات │ │
│ │ • الأخطاء والاستثناءات │ │
│ │ • إشارات الجودة │ │
│ │ • ملاحظات المستخدم │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
تكامل LangSmith
import os
from langsmith import Client
from langsmith.run_trees import RunTree
# الإعداد
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"
client = Client()
class TracedRAG:
"""خط أنابيب RAG مع تتبع LangSmith."""
async def query(self, question: str) -> dict:
# إنشاء تتبع جذري
with RunTree(
name="rag_query",
run_type="chain",
inputs={"question": question},
) as root:
# تتبع التضمين
with root.create_child(
name="embed_query",
run_type="embedding",
) as embed_run:
embedding = await self.embed(question)
embed_run.end(outputs={"dimensions": len(embedding)})
# تتبع الاسترجاع
with root.create_child(
name="retrieve",
run_type="retriever",
) as retrieve_run:
docs = await self.retrieve(embedding)
retrieve_run.end(outputs={
"num_docs": len(docs),
"docs": [d.page_content[:100] for d in docs],
})
# تتبع التوليد
with root.create_child(
name="generate",
run_type="llm",
) as llm_run:
response = await self.generate(question, docs)
llm_run.end(outputs={"response": response})
root.end(outputs={"answer": response})
return {"answer": response, "trace_id": root.id}
مقاييس مخصصة مع Prometheus
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# تعريف المقاييس
QUERY_COUNTER = Counter(
"rag_queries_total",
"إجمالي استعلامات RAG",
["status", "cache_hit"]
)
QUERY_LATENCY = Histogram(
"rag_query_latency_seconds",
"زمن استجابة استعلام RAG",
["component"],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
CONTEXT_SIZE = Histogram(
"rag_context_tokens",
"عدد رموز السياق",
buckets=[100, 500, 1000, 2000, 4000, 8000]
)
ACTIVE_QUERIES = Gauge(
"rag_active_queries",
"الاستعلامات قيد المعالجة حالياً"
)
class MetricsRAG:
"""RAG مع مقاييس Prometheus."""
async def query(self, question: str) -> str:
ACTIVE_QUERIES.inc()
cache_hit = "false"
try:
# تحقق من التخزين المؤقت
with QUERY_LATENCY.labels(component="cache_check").time():
cached = self.cache.get(question)
if cached:
cache_hit = "true"
QUERY_COUNTER.labels(status="success", cache_hit="true").inc()
return cached
# التضمين
with QUERY_LATENCY.labels(component="embedding").time():
embedding = await self.embed(question)
# الاسترجاع
with QUERY_LATENCY.labels(component="retrieval").time():
docs = await self.retrieve(embedding)
# تتبع حجم السياق
context_tokens = sum(len(d.page_content.split()) for d in docs)
CONTEXT_SIZE.observe(context_tokens)
# التوليد
with QUERY_LATENCY.labels(component="generation").time():
response = await self.generate(question, docs)
QUERY_COUNTER.labels(status="success", cache_hit=cache_hit).inc()
return response
except Exception as e:
QUERY_COUNTER.labels(status="error", cache_hit=cache_hit).inc()
raise
finally:
ACTIVE_QUERIES.dec()
# بدء خادم المقاييس
start_http_server(8000)
تتبع التكاليف
from dataclasses import dataclass
from datetime import datetime
from typing import Dict
@dataclass
class CostConfig:
"""تكوين تسعير API."""
# تسعير OpenAI (لكل 1M رمز)
embedding_cost_per_1m: float = 0.13 # text-embedding-3-small
gpt4_input_per_1m: float = 2.50
gpt4_output_per_1m: float = 10.00
gpt35_input_per_1m: float = 0.50
gpt35_output_per_1m: float = 1.50
class CostTracker:
"""تتبع وتقرير تكاليف API."""
def __init__(self, config: CostConfig = None):
self.config = config or CostConfig()
self.costs: Dict[str, float] = {
"embedding": 0.0,
"llm_input": 0.0,
"llm_output": 0.0,
}
self.usage: Dict[str, int] = {
"embedding_tokens": 0,
"llm_input_tokens": 0,
"llm_output_tokens": 0,
"queries": 0,
}
def track_embedding(self, tokens: int):
"""تتبع استخدام API التضمين."""
cost = (tokens / 1_000_000) * self.config.embedding_cost_per_1m
self.costs["embedding"] += cost
self.usage["embedding_tokens"] += tokens
def track_llm(self, input_tokens: int, output_tokens: int, model: str = "gpt-4"):
"""تتبع استخدام API LLM."""
if "gpt-4" in model:
input_cost = (input_tokens / 1_000_000) * self.config.gpt4_input_per_1m
output_cost = (output_tokens / 1_000_000) * self.config.gpt4_output_per_1m
else:
input_cost = (input_tokens / 1_000_000) * self.config.gpt35_input_per_1m
output_cost = (output_tokens / 1_000_000) * self.config.gpt35_output_per_1m
self.costs["llm_input"] += input_cost
self.costs["llm_output"] += output_cost
self.usage["llm_input_tokens"] += input_tokens
self.usage["llm_output_tokens"] += output_tokens
self.usage["queries"] += 1
def get_report(self) -> dict:
"""توليد تقرير التكاليف."""
total_cost = sum(self.costs.values())
cost_per_query = total_cost / max(self.usage["queries"], 1)
return {
"total_cost_usd": round(total_cost, 4),
"cost_per_query_usd": round(cost_per_query, 6),
"breakdown": {
"embedding": round(self.costs["embedding"], 4),
"llm_input": round(self.costs["llm_input"], 4),
"llm_output": round(self.costs["llm_output"], 4),
},
"usage": self.usage,
}
# الاستخدام
cost_tracker = CostTracker()
async def query_with_tracking(question: str) -> str:
# تتبع التضمين
embedding, embed_tokens = await embed_with_count(question)
cost_tracker.track_embedding(embed_tokens)
# الاسترجاع
docs = await retrieve(embedding)
# تتبع LLM
response, usage = await generate_with_usage(question, docs)
cost_tracker.track_llm(
input_tokens=usage["prompt_tokens"],
output_tokens=usage["completion_tokens"],
model="gpt-4"
)
return response
# الحصول على التقرير
print(cost_tracker.get_report())
# {
# "total_cost_usd": 0.0234,
# "cost_per_query_usd": 0.000234,
# "breakdown": {"embedding": 0.0001, "llm_input": 0.0083, "llm_output": 0.015},
# "usage": {"embedding_tokens": 850, "llm_input_tokens": 3320, "llm_output_tokens": 1500, "queries": 100}
# }
استراتيجيات تحسين التكاليف
class CostOptimizedRAG:
"""RAG مع استراتيجيات تحسين التكاليف."""
def __init__(self):
self.fast_llm = "gpt-4o-mini" # أرخص للاستعلامات البسيطة
self.quality_llm = "gpt-4o" # جودة أعلى للمعقدة
async def query(self, question: str) -> str:
# الاستراتيجية 1: استخدم التخزين المؤقت الدلالي (مجاني بعد أول استعلام)
cached = await self.semantic_cache.get(question)
if cached:
return cached
# الاسترجاع مع سياق محسن
docs = await self._retrieve_optimized(question)
# الاستراتيجية 2: التوجيه للنموذج المناسب
complexity = await self._assess_complexity(question)
if complexity == "simple":
# استخدم النموذج الأسرع والأرخص
response = await self.generate(question, docs, model=self.fast_llm)
else:
# استخدم النموذج الأعلى جودة
response = await self.generate(question, docs, model=self.quality_llm)
# تخزين مؤقت للمستقبل
self.semantic_cache.set(question, response)
return response
async def _retrieve_optimized(self, question: str) -> list:
"""الاسترجاع مع تحسين حجم السياق."""
# الحصول على مستندات أكثر مبدئياً
docs = await self.retrieve(question, k=10)
# إعادة الترتيب واختيار الأفضل ضمن ميزانية الرموز
reranked = await self.rerank(question, docs)
# الاستراتيجية 3: اقتطاع لميزانية الرموز
token_budget = 2000 # ~$0.005 لكل استعلام بدلاً من ~$0.02 لـ 8000
selected = []
current_tokens = 0
for doc in reranked:
doc_tokens = len(doc.page_content.split())
if current_tokens + doc_tokens <= token_budget:
selected.append(doc)
current_tokens += doc_tokens
else:
break
return selected
async def _assess_complexity(self, question: str) -> str:
"""تحديد تعقيد السؤال لتوجيه النموذج."""
# استدلالات بسيطة
if len(question.split()) < 10:
return "simple"
complex_indicators = [
"قارن", "حلل", "اشرح لماذا",
"الفرق بين", "إيجابيات وسلبيات"
]
if any(ind in question.lower() for ind in complex_indicators):
return "complex"
return "simple"
مقاييس لوحة المعلومات
المقاييس الرئيسية للعرض على لوحة المراقبة:
| الفئة | المقياس | عتبة التنبيه |
|---|---|---|
| زمن الاستجابة | P95 وقت الاستجابة | > 3 ثواني |
| زمن الاستجابة | وقت أول رمز | > 500ms |
| الجودة | درجة الأمانة | < 0.8 |
| الجودة | ملاحظات المستخدم (% إعجاب) | < 70% |
| التكلفة | التكلفة لكل استعلام | > $0.05 |
| التكلفة | الإنفاق اليومي | > الميزانية |
| الموثوقية | معدل الأخطاء | > 1% |
| الموثوقية | فتح قاطع الدائرة | أي |
| التخزين المؤقت | معدل إصابة التخزين المؤقت | < 20% |
قواعد التنبيه
# prometheus/alerts.yml
groups:
- name: rag_alerts
rules:
- alert: HighLatency
expr: histogram_quantile(0.95, rag_query_latency_seconds) > 3
for: 5m
labels:
severity: warning
annotations:
summary: "P95 زمن استجابة RAG فوق 3 ثواني"
- alert: HighErrorRate
expr: rate(rag_queries_total{status="error"}[5m]) / rate(rag_queries_total[5m]) > 0.01
for: 5m
labels:
severity: critical
annotations:
summary: "معدل أخطاء RAG فوق 1%"
- alert: LowCacheHitRate
expr: rate(rag_queries_total{cache_hit="true"}[1h]) / rate(rag_queries_total[1h]) < 0.1
for: 1h
labels:
severity: info
annotations:
summary: "معدل إصابة التخزين المؤقت تحت 10% - فكر في الضبط"
- alert: HighCost
expr: sum(increase(rag_cost_usd_total[24h])) > 100
labels:
severity: warning
annotations:
summary: "تكلفة RAG اليومية تتجاوز $100"
رؤية رئيسية: راقب كلاً من المقاييس التقنية (زمن الاستجابة، الأخطاء) ومقاييس الجودة (الأمانة، ملاحظات المستخدم). نظام سريع يعطي إجابات خاطئة أسوأ من نظام أبطأ ودقيق.
التالي، لنختم بالخطوات التالية ومسارات التعلم الموصى بها. :::