الدرس 21 من 23

أنظمة RAG الإنتاجية

الموثوقية ومعالجة الأخطاء

4 دقيقة للقراءة

الأنظمة الإنتاجية تفشل. هذا الدرس يغطي أنماطاً للتعامل مع الفشل برشاقة والحفاظ على توفر الخدمة.

نقاط الفشل الشائعة

┌────────────────────────────────────────────────────────────────┐
│                    نقاط فشل RAG                                │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│  │    API      │────▶│   مخزن     │────▶│    API      │       │
│  │  التضمين   │     │  المتجهات  │     │    LLM      │       │
│  └─────────────┘     └─────────────┘     └─────────────┘       │
│         │                   │                   │               │
│         ▼                   ▼                   ▼               │
│    • حدود المعدل      • الاتصال          • حدود المعدل        │
│    • المهلات          • مهلة الاستعلام   • المهلات            │
│    • أخطاء API       • مشاكل الفهرس    • تجاوز السياق        │
│    • تغييرات النموذج  • السعة           • تصفية المحتوى       │
│                                                                 │
└────────────────────────────────────────────────────────────────┘

إعادة المحاولة مع التراجع الأسي

import asyncio
import random
from functools import wraps
from typing import Type, Tuple

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,),
):
    """مزخرف لإعادة المحاولة مع التراجع الأسي."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)

                except retryable_exceptions as e:
                    last_exception = e

                    if attempt == max_retries:
                        raise

                    # حساب التأخير
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )

                    # إضافة تذبذب لمنع القطيع الهادر
                    if jitter:
                        delay = delay * (0.5 + random.random())

                    print(f"المحاولة {attempt + 1} فشلت: {e}. إعادة المحاولة خلال {delay:.1f}s")
                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

# الاستخدام
class RAGPipeline:

    @retry_with_backoff(
        max_retries=3,
        retryable_exceptions=(RateLimitError, TimeoutError, ConnectionError)
    )
    async def embed(self, text: str) -> list[float]:
        return await self.embedding_client.embed(text)

    @retry_with_backoff(
        max_retries=2,
        base_delay=2.0,
        retryable_exceptions=(RateLimitError, TimeoutError)
    )
    async def generate(self, prompt: str) -> str:
        return await self.llm_client.generate(prompt)

نمط قاطع الدائرة

import time
from enum import Enum
from dataclasses import dataclass

class CircuitState(Enum):
    CLOSED = "closed"      # تشغيل عادي
    OPEN = "open"          # فاشل، رفض الطلبات
    HALF_OPEN = "half_open"  # اختبار إذا تعافى

@dataclass
class CircuitBreaker:
    """قاطع دائرة لاستدعاءات الخدمات الخارجية."""

    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3

    def __post_init__(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.half_open_calls = 0

    def can_execute(self) -> bool:
        """تحقق إذا كان يجب أن يستمر الطلب."""

        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # تحقق إذا مرت مهلة التعافي
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.half_open_max_calls

        return False

    def record_success(self):
        """تسجيل استدعاء ناجح."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                # تعافى
                self.state = CircuitState.CLOSED
                self.failure_count = 0

        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def record_failure(self):
        """تسجيل استدعاء فاشل."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            # فشل أثناء اختبار التعافي
            self.state = CircuitState.OPEN

        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# الاستخدام
class ResilientRAG:

    def __init__(self):
        self.embedding_breaker = CircuitBreaker(failure_threshold=5)
        self.llm_breaker = CircuitBreaker(failure_threshold=3)

    async def query(self, question: str) -> str:
        # تحقق من قواطع الدائرة
        if not self.embedding_breaker.can_execute():
            return self._fallback_response("خدمة التضمين غير متوفرة")

        if not self.llm_breaker.can_execute():
            return self._fallback_response("خدمة LLM غير متوفرة")

        try:
            embedding = await self.embed(question)
            self.embedding_breaker.record_success()

            contexts = await self.retrieve(embedding)

            response = await self.generate(question, contexts)
            self.llm_breaker.record_success()

            return response

        except EmbeddingError as e:
            self.embedding_breaker.record_failure()
            return self._fallback_response(str(e))

        except LLMError as e:
            self.llm_breaker.record_failure()
            return self._fallback_response(str(e))

استراتيجيات الاحتياط

from typing import Optional, Callable

class FallbackRAG:
    """RAG مع استراتيجيات احتياط متعددة."""

    def __init__(
        self,
        primary_llm,
        fallback_llm,
        cached_responses: dict = None,
    ):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm
        self.cached_responses = cached_responses or {}

    async def query(self, question: str) -> dict:
        """استعلام مع سلسلة احتياط."""

        # جرب المسار الأساسي
        try:
            contexts = await self._retrieve(question)
            response = await self._generate_with_primary(question, contexts)

            return {
                "answer": response,
                "source": "primary",
                "contexts": contexts,
            }

        except PrimaryLLMError:
            pass  # المرور للاحتياط

        # الاحتياط 1: استخدم LLM بديل
        try:
            response = await self._generate_with_fallback(question, contexts)

            return {
                "answer": response,
                "source": "fallback_llm",
                "contexts": contexts,
            }

        except FallbackLLMError:
            pass  # المرور للتخزين المؤقت

        # الاحتياط 2: تحقق من التخزين المؤقت الدلالي لإجابات مشابهة
        cached = self._find_similar_cached(question)
        if cached:
            return {
                "answer": cached["answer"],
                "source": "cache",
                "contexts": cached.get("contexts", []),
                "warning": "استجابة من التخزين المؤقت - قد لا تكون حالية",
            }

        # الاحتياط 3: إرجاع خطأ مفيد
        return {
            "answer": self._graceful_error_message(question),
            "source": "error",
            "contexts": [],
        }

    def _graceful_error_message(self, question: str) -> str:
        """توليد رسالة خطأ مفيدة."""
        return (
            "لا أستطيع حالياً معالجة سؤالك بسبب "
            "مشكلة خدمة مؤقتة. يرجى المحاولة مرة أخرى بعد لحظات. "
            "إذا كنت بحاجة لمساعدة فورية، يرجى التواصل مع الدعم."
        )

    async def _generate_with_primary(self, question: str, contexts: list) -> str:
        """التوليد بـ LLM الأساسي (مثل GPT-4)."""
        return await self.primary_llm.generate(
            self._format_prompt(question, contexts),
            timeout=30,
        )

    async def _generate_with_fallback(self, question: str, contexts: list) -> str:
        """التوليد بـ LLM الاحتياطي (مثل GPT-3.5 أو نموذج محلي)."""
        return await self.fallback_llm.generate(
            self._format_prompt(question, contexts),
            timeout=60,  # مهلة أكثر سخاءً للاحتياط
        )

التدهور الرشيق

class DegradableRAG:
    """RAG يتدهور برشاقة تحت الحمل."""

    def __init__(self):
        self.load_level = "normal"  # normal, high, critical

    async def query(self, question: str, priority: str = "normal") -> dict:
        """استعلام مع تدهور مدرك للحمل."""

        # تحقق من مستوى الحمل الحالي
        self.load_level = await self._check_load()

        if self.load_level == "critical":
            # معالجة طلبات الأولوية العالية فقط
            if priority != "high":
                return self._queue_response(question)

            # معالجة أدنى
            return await self._minimal_query(question)

        if self.load_level == "high":
            # تخطي إعادة الترتيب، استخدم سياقات أقل
            return await self._reduced_query(question)

        # معالجة عادية
        return await self._full_query(question)

    async def _full_query(self, question: str) -> dict:
        """خط أنابيب كامل بكل الميزات."""
        embedding = await self.embed(question)
        docs = await self.retrieve(embedding, k=20)
        reranked = await self.rerank(question, docs)
        response = await self.generate(question, reranked[:5])

        return {"answer": response, "mode": "full"}

    async def _reduced_query(self, question: str) -> dict:
        """خط أنابيب مخفف - تخطي إعادة الترتيب."""
        embedding = await self.embed(question)
        docs = await self.retrieve(embedding, k=5)  # مستندات أقل
        response = await self.generate(question, docs)

        return {"answer": response, "mode": "reduced"}

    async def _minimal_query(self, question: str) -> dict:
        """خط أنابيب أدنى - تخزين مؤقت دلالي أو استرجاع بسيط."""
        # جرب التخزين المؤقت أولاً
        cached = self.cache.get(question)
        if cached:
            return {"answer": cached, "mode": "cached"}

        # بحث كلمات مفتاحية بسيط + نموذج سريع
        docs = await self.keyword_search(question, k=3)
        response = await self.fast_llm.generate(question, docs)

        return {"answer": response, "mode": "minimal"}

    def _queue_response(self, question: str) -> dict:
        """وضع الطلب في قائمة انتظار للمعالجة لاحقاً."""
        ticket_id = self.queue.add(question)

        return {
            "answer": f"النظام تحت حمل عالي. طلبك تم وضعه في قائمة الانتظار. التذكرة: {ticket_id}",
            "mode": "queued",
            "ticket_id": ticket_id,
        }

فحوصات الصحة

from fastapi import FastAPI, Response
from datetime import datetime

app = FastAPI()

class HealthChecker:
    """فحص صحة لمكونات RAG."""

    async def check_all(self) -> dict:
        """تشغيل كل فحوصات الصحة."""

        checks = {
            "embedding_api": await self._check_embedding(),
            "vector_store": await self._check_vectorstore(),
            "llm_api": await self._check_llm(),
            "cache": await self._check_cache(),
        }

        overall = all(c["healthy"] for c in checks.values())

        return {
            "healthy": overall,
            "timestamp": datetime.utcnow().isoformat(),
            "checks": checks,
        }

    async def _check_embedding(self) -> dict:
        try:
            start = time.time()
            await self.embedding_client.embed("فحص صحة")
            latency = (time.time() - start) * 1000

            return {"healthy": True, "latency_ms": latency}
        except Exception as e:
            return {"healthy": False, "error": str(e)}

    async def _check_vectorstore(self) -> dict:
        try:
            start = time.time()
            await self.vectorstore.search([0.0] * 1536, k=1)
            latency = (time.time() - start) * 1000

            return {"healthy": True, "latency_ms": latency}
        except Exception as e:
            return {"healthy": False, "error": str(e)}

    async def _check_llm(self) -> dict:
        try:
            start = time.time()
            await self.llm.generate("قل 'موافق'", max_tokens=5)
            latency = (time.time() - start) * 1000

            return {"healthy": True, "latency_ms": latency}
        except Exception as e:
            return {"healthy": False, "error": str(e)}

health_checker = HealthChecker()

@app.get("/health")
async def health_endpoint():
    result = await health_checker.check_all()
    status_code = 200 if result["healthy"] else 503
    return Response(content=json.dumps(result), status_code=status_code)

رؤية رئيسية: صمم للفشل من البداية. المستخدمون يفضلون استجابة متدهورة مع شفافية ("أستخدم بيانات مخزنة مؤقتاً") على خطأ غامض أو مهلة.

التالي، لنغطي المراقبة وإدارة التكاليف. :::

اختبار

الوحدة 6: أنظمة RAG الإنتاجية

خذ الاختبار