الدرس 22 من 23

أنظمة RAG الإنتاجية

المراقبة وإدارة التكاليف

4 دقيقة للقراءة

أنظمة RAG الإنتاجية تحتاج مراقبة شاملة للحفاظ على الجودة والتحكم في التكاليف. هذا الدرس يغطي استراتيجيات المراقبة وتحسين التكاليف.

مجموعة مراقبة RAG

┌────────────────────────────────────────────────────────────────┐
│                    مراقبة RAG                                   │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      المقاييس                           │   │
│  │  • زمن الاستجابة (p50, p95, p99)                       │   │
│  │  • الإنتاجية (QPS)                                      │   │
│  │  • معدلات الأخطاء                                       │   │
│  │  • معدلات إصابة التخزين المؤقت                         │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      التتبعات                           │   │
│  │  • رحلة الطلب الكاملة                                   │   │
│  │  • أزمنة استجابة المكونات                              │   │
│  │  • السياقات المسترجعة                                   │   │
│  │  • مدخلات/مخرجات LLM                                   │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                      السجلات                            │   │
│  │  • الأخطاء والاستثناءات                                 │   │
│  │  • إشارات الجودة                                        │   │
│  │  • ملاحظات المستخدم                                     │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└────────────────────────────────────────────────────────────────┘

تكامل LangSmith

import os
from langsmith import Client
from langsmith.run_trees import RunTree

# الإعداد
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"

client = Client()

class TracedRAG:
    """خط أنابيب RAG مع تتبع LangSmith."""

    async def query(self, question: str) -> dict:
        # إنشاء تتبع جذري
        with RunTree(
            name="rag_query",
            run_type="chain",
            inputs={"question": question},
        ) as root:

            # تتبع التضمين
            with root.create_child(
                name="embed_query",
                run_type="embedding",
            ) as embed_run:
                embedding = await self.embed(question)
                embed_run.end(outputs={"dimensions": len(embedding)})

            # تتبع الاسترجاع
            with root.create_child(
                name="retrieve",
                run_type="retriever",
            ) as retrieve_run:
                docs = await self.retrieve(embedding)
                retrieve_run.end(outputs={
                    "num_docs": len(docs),
                    "docs": [d.page_content[:100] for d in docs],
                })

            # تتبع التوليد
            with root.create_child(
                name="generate",
                run_type="llm",
            ) as llm_run:
                response = await self.generate(question, docs)
                llm_run.end(outputs={"response": response})

            root.end(outputs={"answer": response})

        return {"answer": response, "trace_id": root.id}

مقاييس مخصصة مع Prometheus

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# تعريف المقاييس
QUERY_COUNTER = Counter(
    "rag_queries_total",
    "إجمالي استعلامات RAG",
    ["status", "cache_hit"]
)

QUERY_LATENCY = Histogram(
    "rag_query_latency_seconds",
    "زمن استجابة استعلام RAG",
    ["component"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

CONTEXT_SIZE = Histogram(
    "rag_context_tokens",
    "عدد رموز السياق",
    buckets=[100, 500, 1000, 2000, 4000, 8000]
)

ACTIVE_QUERIES = Gauge(
    "rag_active_queries",
    "الاستعلامات قيد المعالجة حالياً"
)

class MetricsRAG:
    """RAG مع مقاييس Prometheus."""

    async def query(self, question: str) -> str:
        ACTIVE_QUERIES.inc()
        cache_hit = "false"

        try:
            # تحقق من التخزين المؤقت
            with QUERY_LATENCY.labels(component="cache_check").time():
                cached = self.cache.get(question)
                if cached:
                    cache_hit = "true"
                    QUERY_COUNTER.labels(status="success", cache_hit="true").inc()
                    return cached

            # التضمين
            with QUERY_LATENCY.labels(component="embedding").time():
                embedding = await self.embed(question)

            # الاسترجاع
            with QUERY_LATENCY.labels(component="retrieval").time():
                docs = await self.retrieve(embedding)

            # تتبع حجم السياق
            context_tokens = sum(len(d.page_content.split()) for d in docs)
            CONTEXT_SIZE.observe(context_tokens)

            # التوليد
            with QUERY_LATENCY.labels(component="generation").time():
                response = await self.generate(question, docs)

            QUERY_COUNTER.labels(status="success", cache_hit=cache_hit).inc()
            return response

        except Exception as e:
            QUERY_COUNTER.labels(status="error", cache_hit=cache_hit).inc()
            raise

        finally:
            ACTIVE_QUERIES.dec()

# بدء خادم المقاييس
start_http_server(8000)

تتبع التكاليف

from dataclasses import dataclass
from datetime import datetime
from typing import Dict

@dataclass
class CostConfig:
    """تكوين تسعير API."""
    # تسعير OpenAI (لكل 1M رمز)
    embedding_cost_per_1m: float = 0.13  # text-embedding-3-small
    gpt4_input_per_1m: float = 2.50
    gpt4_output_per_1m: float = 10.00
    gpt35_input_per_1m: float = 0.50
    gpt35_output_per_1m: float = 1.50

class CostTracker:
    """تتبع وتقرير تكاليف API."""

    def __init__(self, config: CostConfig = None):
        self.config = config or CostConfig()
        self.costs: Dict[str, float] = {
            "embedding": 0.0,
            "llm_input": 0.0,
            "llm_output": 0.0,
        }
        self.usage: Dict[str, int] = {
            "embedding_tokens": 0,
            "llm_input_tokens": 0,
            "llm_output_tokens": 0,
            "queries": 0,
        }

    def track_embedding(self, tokens: int):
        """تتبع استخدام API التضمين."""
        cost = (tokens / 1_000_000) * self.config.embedding_cost_per_1m
        self.costs["embedding"] += cost
        self.usage["embedding_tokens"] += tokens

    def track_llm(self, input_tokens: int, output_tokens: int, model: str = "gpt-4"):
        """تتبع استخدام API LLM."""
        if "gpt-4" in model:
            input_cost = (input_tokens / 1_000_000) * self.config.gpt4_input_per_1m
            output_cost = (output_tokens / 1_000_000) * self.config.gpt4_output_per_1m
        else:
            input_cost = (input_tokens / 1_000_000) * self.config.gpt35_input_per_1m
            output_cost = (output_tokens / 1_000_000) * self.config.gpt35_output_per_1m

        self.costs["llm_input"] += input_cost
        self.costs["llm_output"] += output_cost
        self.usage["llm_input_tokens"] += input_tokens
        self.usage["llm_output_tokens"] += output_tokens
        self.usage["queries"] += 1

    def get_report(self) -> dict:
        """توليد تقرير التكاليف."""
        total_cost = sum(self.costs.values())
        cost_per_query = total_cost / max(self.usage["queries"], 1)

        return {
            "total_cost_usd": round(total_cost, 4),
            "cost_per_query_usd": round(cost_per_query, 6),
            "breakdown": {
                "embedding": round(self.costs["embedding"], 4),
                "llm_input": round(self.costs["llm_input"], 4),
                "llm_output": round(self.costs["llm_output"], 4),
            },
            "usage": self.usage,
        }

# الاستخدام
cost_tracker = CostTracker()

async def query_with_tracking(question: str) -> str:
    # تتبع التضمين
    embedding, embed_tokens = await embed_with_count(question)
    cost_tracker.track_embedding(embed_tokens)

    # الاسترجاع
    docs = await retrieve(embedding)

    # تتبع LLM
    response, usage = await generate_with_usage(question, docs)
    cost_tracker.track_llm(
        input_tokens=usage["prompt_tokens"],
        output_tokens=usage["completion_tokens"],
        model="gpt-4"
    )

    return response

# الحصول على التقرير
print(cost_tracker.get_report())
# {
#     "total_cost_usd": 0.0234,
#     "cost_per_query_usd": 0.000234,
#     "breakdown": {"embedding": 0.0001, "llm_input": 0.0083, "llm_output": 0.015},
#     "usage": {"embedding_tokens": 850, "llm_input_tokens": 3320, "llm_output_tokens": 1500, "queries": 100}
# }

استراتيجيات تحسين التكاليف

class CostOptimizedRAG:
    """RAG مع استراتيجيات تحسين التكاليف."""

    def __init__(self):
        self.fast_llm = "gpt-4o-mini"  # أرخص للاستعلامات البسيطة
        self.quality_llm = "gpt-4o"     # جودة أعلى للمعقدة

    async def query(self, question: str) -> str:
        # الاستراتيجية 1: استخدم التخزين المؤقت الدلالي (مجاني بعد أول استعلام)
        cached = await self.semantic_cache.get(question)
        if cached:
            return cached

        # الاسترجاع مع سياق محسن
        docs = await self._retrieve_optimized(question)

        # الاستراتيجية 2: التوجيه للنموذج المناسب
        complexity = await self._assess_complexity(question)

        if complexity == "simple":
            # استخدم النموذج الأسرع والأرخص
            response = await self.generate(question, docs, model=self.fast_llm)
        else:
            # استخدم النموذج الأعلى جودة
            response = await self.generate(question, docs, model=self.quality_llm)

        # تخزين مؤقت للمستقبل
        self.semantic_cache.set(question, response)

        return response

    async def _retrieve_optimized(self, question: str) -> list:
        """الاسترجاع مع تحسين حجم السياق."""

        # الحصول على مستندات أكثر مبدئياً
        docs = await self.retrieve(question, k=10)

        # إعادة الترتيب واختيار الأفضل ضمن ميزانية الرموز
        reranked = await self.rerank(question, docs)

        # الاستراتيجية 3: اقتطاع لميزانية الرموز
        token_budget = 2000  # ~$0.005 لكل استعلام بدلاً من ~$0.02 لـ 8000
        selected = []
        current_tokens = 0

        for doc in reranked:
            doc_tokens = len(doc.page_content.split())
            if current_tokens + doc_tokens <= token_budget:
                selected.append(doc)
                current_tokens += doc_tokens
            else:
                break

        return selected

    async def _assess_complexity(self, question: str) -> str:
        """تحديد تعقيد السؤال لتوجيه النموذج."""

        # استدلالات بسيطة
        if len(question.split()) < 10:
            return "simple"

        complex_indicators = [
            "قارن", "حلل", "اشرح لماذا",
            "الفرق بين", "إيجابيات وسلبيات"
        ]

        if any(ind in question.lower() for ind in complex_indicators):
            return "complex"

        return "simple"

مقاييس لوحة المعلومات

المقاييس الرئيسية للعرض على لوحة المراقبة:

الفئة المقياس عتبة التنبيه
زمن الاستجابة P95 وقت الاستجابة > 3 ثواني
زمن الاستجابة وقت أول رمز > 500ms
الجودة درجة الأمانة < 0.8
الجودة ملاحظات المستخدم (% إعجاب) < 70%
التكلفة التكلفة لكل استعلام > $0.05
التكلفة الإنفاق اليومي > الميزانية
الموثوقية معدل الأخطاء > 1%
الموثوقية فتح قاطع الدائرة أي
التخزين المؤقت معدل إصابة التخزين المؤقت < 20%

قواعد التنبيه

# prometheus/alerts.yml
groups:
  - name: rag_alerts
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.95, rag_query_latency_seconds) > 3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 زمن استجابة RAG فوق 3 ثواني"

      - alert: HighErrorRate
        expr: rate(rag_queries_total{status="error"}[5m]) / rate(rag_queries_total[5m]) > 0.01
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "معدل أخطاء RAG فوق 1%"

      - alert: LowCacheHitRate
        expr: rate(rag_queries_total{cache_hit="true"}[1h]) / rate(rag_queries_total[1h]) < 0.1
        for: 1h
        labels:
          severity: info
        annotations:
          summary: "معدل إصابة التخزين المؤقت تحت 10% - فكر في الضبط"

      - alert: HighCost
        expr: sum(increase(rag_cost_usd_total[24h])) > 100
        labels:
          severity: warning
        annotations:
          summary: "تكلفة RAG اليومية تتجاوز $100"

رؤية رئيسية: راقب كلاً من المقاييس التقنية (زمن الاستجابة، الأخطاء) ومقاييس الجودة (الأمانة، ملاحظات المستخدم). نظام سريع يعطي إجابات خاطئة أسوأ من نظام أبطأ ودقيق.

التالي، لنختم بالخطوات التالية ومسارات التعلم الموصى بها. :::

اختبار

الوحدة 6: أنظمة RAG الإنتاجية

خذ الاختبار