أنظمة RAG الإنتاجية
الموثوقية ومعالجة الأخطاء
4 دقيقة للقراءة
الأنظمة الإنتاجية تفشل. هذا الدرس يغطي أنماطاً للتعامل مع الفشل برشاقة والحفاظ على توفر الخدمة.
نقاط الفشل الشائعة
┌────────────────────────────────────────────────────────────────┐
│ نقاط فشل RAG │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ API │────▶│ مخزن │────▶│ API │ │
│ │ التضمين │ │ المتجهات │ │ LLM │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ • حدود المعدل • الاتصال • حدود المعدل │
│ • المهلات • مهلة الاستعلام • المهلات │
│ • أخطاء API • مشاكل الفهرس • تجاوز السياق │
│ • تغييرات النموذج • السعة • تصفية المحتوى │
│ │
└────────────────────────────────────────────────────────────────┘
إعادة المحاولة مع التراجع الأسي
import asyncio
import random
from functools import wraps
from typing import Type, Tuple
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,),
):
"""مزخرف لإعادة المحاولة مع التراجع الأسي."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt == max_retries:
raise
# حساب التأخير
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
# إضافة تذبذب لمنع القطيع الهادر
if jitter:
delay = delay * (0.5 + random.random())
print(f"المحاولة {attempt + 1} فشلت: {e}. إعادة المحاولة خلال {delay:.1f}s")
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
# الاستخدام
class RAGPipeline:
@retry_with_backoff(
max_retries=3,
retryable_exceptions=(RateLimitError, TimeoutError, ConnectionError)
)
async def embed(self, text: str) -> list[float]:
return await self.embedding_client.embed(text)
@retry_with_backoff(
max_retries=2,
base_delay=2.0,
retryable_exceptions=(RateLimitError, TimeoutError)
)
async def generate(self, prompt: str) -> str:
return await self.llm_client.generate(prompt)
نمط قاطع الدائرة
import time
from enum import Enum
from dataclasses import dataclass
class CircuitState(Enum):
CLOSED = "closed" # تشغيل عادي
OPEN = "open" # فاشل، رفض الطلبات
HALF_OPEN = "half_open" # اختبار إذا تعافى
@dataclass
class CircuitBreaker:
"""قاطع دائرة لاستدعاءات الخدمات الخارجية."""
failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_max_calls: int = 3
def __post_init__(self):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
def can_execute(self) -> bool:
"""تحقق إذا كان يجب أن يستمر الطلب."""
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# تحقق إذا مرت مهلة التعافي
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
return True
return False
if self.state == CircuitState.HALF_OPEN:
return self.half_open_calls < self.half_open_max_calls
return False
def record_success(self):
"""تسجيل استدعاء ناجح."""
if self.state == CircuitState.HALF_OPEN:
self.half_open_calls += 1
if self.half_open_calls >= self.half_open_max_calls:
# تعافى
self.state = CircuitState.CLOSED
self.failure_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
def record_failure(self):
"""تسجيل استدعاء فاشل."""
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
# فشل أثناء اختبار التعافي
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# الاستخدام
class ResilientRAG:
def __init__(self):
self.embedding_breaker = CircuitBreaker(failure_threshold=5)
self.llm_breaker = CircuitBreaker(failure_threshold=3)
async def query(self, question: str) -> str:
# تحقق من قواطع الدائرة
if not self.embedding_breaker.can_execute():
return self._fallback_response("خدمة التضمين غير متوفرة")
if not self.llm_breaker.can_execute():
return self._fallback_response("خدمة LLM غير متوفرة")
try:
embedding = await self.embed(question)
self.embedding_breaker.record_success()
contexts = await self.retrieve(embedding)
response = await self.generate(question, contexts)
self.llm_breaker.record_success()
return response
except EmbeddingError as e:
self.embedding_breaker.record_failure()
return self._fallback_response(str(e))
except LLMError as e:
self.llm_breaker.record_failure()
return self._fallback_response(str(e))
استراتيجيات الاحتياط
from typing import Optional, Callable
class FallbackRAG:
"""RAG مع استراتيجيات احتياط متعددة."""
def __init__(
self,
primary_llm,
fallback_llm,
cached_responses: dict = None,
):
self.primary_llm = primary_llm
self.fallback_llm = fallback_llm
self.cached_responses = cached_responses or {}
async def query(self, question: str) -> dict:
"""استعلام مع سلسلة احتياط."""
# جرب المسار الأساسي
try:
contexts = await self._retrieve(question)
response = await self._generate_with_primary(question, contexts)
return {
"answer": response,
"source": "primary",
"contexts": contexts,
}
except PrimaryLLMError:
pass # المرور للاحتياط
# الاحتياط 1: استخدم LLM بديل
try:
response = await self._generate_with_fallback(question, contexts)
return {
"answer": response,
"source": "fallback_llm",
"contexts": contexts,
}
except FallbackLLMError:
pass # المرور للتخزين المؤقت
# الاحتياط 2: تحقق من التخزين المؤقت الدلالي لإجابات مشابهة
cached = self._find_similar_cached(question)
if cached:
return {
"answer": cached["answer"],
"source": "cache",
"contexts": cached.get("contexts", []),
"warning": "استجابة من التخزين المؤقت - قد لا تكون حالية",
}
# الاحتياط 3: إرجاع خطأ مفيد
return {
"answer": self._graceful_error_message(question),
"source": "error",
"contexts": [],
}
def _graceful_error_message(self, question: str) -> str:
"""توليد رسالة خطأ مفيدة."""
return (
"لا أستطيع حالياً معالجة سؤالك بسبب "
"مشكلة خدمة مؤقتة. يرجى المحاولة مرة أخرى بعد لحظات. "
"إذا كنت بحاجة لمساعدة فورية، يرجى التواصل مع الدعم."
)
async def _generate_with_primary(self, question: str, contexts: list) -> str:
"""التوليد بـ LLM الأساسي (مثل GPT-4)."""
return await self.primary_llm.generate(
self._format_prompt(question, contexts),
timeout=30,
)
async def _generate_with_fallback(self, question: str, contexts: list) -> str:
"""التوليد بـ LLM الاحتياطي (مثل GPT-3.5 أو نموذج محلي)."""
return await self.fallback_llm.generate(
self._format_prompt(question, contexts),
timeout=60, # مهلة أكثر سخاءً للاحتياط
)
التدهور الرشيق
class DegradableRAG:
"""RAG يتدهور برشاقة تحت الحمل."""
def __init__(self):
self.load_level = "normal" # normal, high, critical
async def query(self, question: str, priority: str = "normal") -> dict:
"""استعلام مع تدهور مدرك للحمل."""
# تحقق من مستوى الحمل الحالي
self.load_level = await self._check_load()
if self.load_level == "critical":
# معالجة طلبات الأولوية العالية فقط
if priority != "high":
return self._queue_response(question)
# معالجة أدنى
return await self._minimal_query(question)
if self.load_level == "high":
# تخطي إعادة الترتيب، استخدم سياقات أقل
return await self._reduced_query(question)
# معالجة عادية
return await self._full_query(question)
async def _full_query(self, question: str) -> dict:
"""خط أنابيب كامل بكل الميزات."""
embedding = await self.embed(question)
docs = await self.retrieve(embedding, k=20)
reranked = await self.rerank(question, docs)
response = await self.generate(question, reranked[:5])
return {"answer": response, "mode": "full"}
async def _reduced_query(self, question: str) -> dict:
"""خط أنابيب مخفف - تخطي إعادة الترتيب."""
embedding = await self.embed(question)
docs = await self.retrieve(embedding, k=5) # مستندات أقل
response = await self.generate(question, docs)
return {"answer": response, "mode": "reduced"}
async def _minimal_query(self, question: str) -> dict:
"""خط أنابيب أدنى - تخزين مؤقت دلالي أو استرجاع بسيط."""
# جرب التخزين المؤقت أولاً
cached = self.cache.get(question)
if cached:
return {"answer": cached, "mode": "cached"}
# بحث كلمات مفتاحية بسيط + نموذج سريع
docs = await self.keyword_search(question, k=3)
response = await self.fast_llm.generate(question, docs)
return {"answer": response, "mode": "minimal"}
def _queue_response(self, question: str) -> dict:
"""وضع الطلب في قائمة انتظار للمعالجة لاحقاً."""
ticket_id = self.queue.add(question)
return {
"answer": f"النظام تحت حمل عالي. طلبك تم وضعه في قائمة الانتظار. التذكرة: {ticket_id}",
"mode": "queued",
"ticket_id": ticket_id,
}
فحوصات الصحة
from fastapi import FastAPI, Response
from datetime import datetime
app = FastAPI()
class HealthChecker:
"""فحص صحة لمكونات RAG."""
async def check_all(self) -> dict:
"""تشغيل كل فحوصات الصحة."""
checks = {
"embedding_api": await self._check_embedding(),
"vector_store": await self._check_vectorstore(),
"llm_api": await self._check_llm(),
"cache": await self._check_cache(),
}
overall = all(c["healthy"] for c in checks.values())
return {
"healthy": overall,
"timestamp": datetime.utcnow().isoformat(),
"checks": checks,
}
async def _check_embedding(self) -> dict:
try:
start = time.time()
await self.embedding_client.embed("فحص صحة")
latency = (time.time() - start) * 1000
return {"healthy": True, "latency_ms": latency}
except Exception as e:
return {"healthy": False, "error": str(e)}
async def _check_vectorstore(self) -> dict:
try:
start = time.time()
await self.vectorstore.search([0.0] * 1536, k=1)
latency = (time.time() - start) * 1000
return {"healthy": True, "latency_ms": latency}
except Exception as e:
return {"healthy": False, "error": str(e)}
async def _check_llm(self) -> dict:
try:
start = time.time()
await self.llm.generate("قل 'موافق'", max_tokens=5)
latency = (time.time() - start) * 1000
return {"healthy": True, "latency_ms": latency}
except Exception as e:
return {"healthy": False, "error": str(e)}
health_checker = HealthChecker()
@app.get("/health")
async def health_endpoint():
result = await health_checker.check_all()
status_code = 200 if result["healthy"] else 503
return Response(content=json.dumps(result), status_code=status_code)
رؤية رئيسية: صمم للفشل من البداية. المستخدمون يفضلون استجابة متدهورة مع شفافية ("أستخدم بيانات مخزنة مؤقتاً") على خطأ غامض أو مهلة.
التالي، لنغطي المراقبة وإدارة التكاليف. :::