أنظمة RAG الإنتاجية
تحسين الأداء
4 دقيقة للقراءة
أنظمة RAG الإنتاجية تحتاج زمن استجابة أقل من ثانية. هذا الدرس يغطي تقنيات تحسين كل مرحلة من خط الأنابيب.
تفصيل زمن الاستجابة
┌────────────────────────────────────────────────────────────────┐
│ زمن استجابة RAG النموذجي │
├────────────────────────────────────────────────────────────────┤
│ │
│ تضمين الاستعلام: 50-150ms ████░░░░░░ │
│ بحث المتجهات: 20-100ms ███░░░░░░░ │
│ إعادة الترتيب: 100-300ms ██████░░░░ │
│ توليد LLM: 500-2000ms ██████████████████████ │
│ │
│ الإجمالي: 670-2550ms │
│ │
│ الهدف للإنتاج: < 1000ms (بدون تدفق) │
│ الهدف مع التدفق: < 300ms لأول رمز │
│ │
└────────────────────────────────────────────────────────────────┘
التخزين المؤقت الدلالي
تخزين الاستجابات للاستعلامات المتشابهة دلالياً:
import hashlib
from typing import Optional
import numpy as np
class SemanticCache:
"""تخزين مؤقت لاستجابات RAG بناءً على التشابه الدلالي."""
def __init__(
self,
embedding_model,
similarity_threshold: float = 0.95,
max_entries: int = 10000,
ttl_seconds: int = 3600,
):
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self.max_entries = max_entries
self.ttl_seconds = ttl_seconds
# في الإنتاج، استخدم Redis أو مشابه
self.cache = {} # {query_hash: {embedding, response, timestamp}}
def get(self, query: str) -> Optional[str]:
"""تحقق من التخزين المؤقت لاستعلام متشابه دلالياً."""
query_embedding = self.embedding_model.embed(query)
for cached in self.cache.values():
# تحقق من TTL
if time.time() - cached["timestamp"] > self.ttl_seconds:
continue
# تحقق من التشابه الدلالي
similarity = self._cosine_similarity(
query_embedding,
cached["embedding"]
)
if similarity >= self.similarity_threshold:
return cached["response"]
return None
def set(self, query: str, response: str):
"""تخزين استجابة."""
if len(self.cache) >= self.max_entries:
self._evict_oldest()
query_hash = hashlib.md5(query.encode()).hexdigest()
self.cache[query_hash] = {
"embedding": self.embedding_model.embed(query),
"response": response,
"timestamp": time.time(),
}
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def _evict_oldest(self):
oldest = min(self.cache.items(), key=lambda x: x[1]["timestamp"])
del self.cache[oldest[0]]
# الاستخدام
cache = SemanticCache(embedding_model, similarity_threshold=0.95)
def query_rag(question: str) -> str:
# تحقق من التخزين المؤقت أولاً
cached = cache.get(question)
if cached:
return cached # ~5ms بدلاً من ~1500ms
# تشغيل خط الأنابيب الكامل
response = rag_pipeline.query(question)
# تخزين للمستقبل
cache.set(question, response)
return response
المعالجة غير المتزامنة والمتوازية
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncRAGPipeline:
"""خط أنابيب RAG مع عمليات غير متزامنة."""
def __init__(self, vectorstore, reranker, llm):
self.vectorstore = vectorstore
self.reranker = reranker
self.llm = llm
self.executor = ThreadPoolExecutor(max_workers=4)
async def query(self, question: str) -> str:
# متوازي: التضمين + بحث BM25
embedding_task = asyncio.create_task(
self._async_embed(question)
)
bm25_task = asyncio.create_task(
self._async_bm25_search(question)
)
# انتظار طريقتي الاسترجاع
query_embedding, bm25_results = await asyncio.gather(
embedding_task, bm25_task
)
# بحث المتجهات بالتضمين
vector_results = await self._async_vector_search(query_embedding)
# دمج وإعادة ترتيب
combined = self._merge_results(vector_results, bm25_results)
reranked = await self._async_rerank(question, combined)
# توليد الاستجابة
response = await self._async_generate(question, reranked[:5])
return response
async def _async_embed(self, text: str):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
self.embedding_model.embed,
text
)
async def _async_vector_search(self, embedding):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
lambda: self.vectorstore.search(embedding, k=20)
)
async def _async_rerank(self, query: str, docs: list):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
lambda: self.reranker.rerank(query, docs)
)
async def _async_generate(self, question: str, contexts: list):
# معظم عملاء LLM يدعمون async أصلياً
return await self.llm.agenerate(
prompt=self._format_prompt(question, contexts)
)
# الاستخدام
pipeline = AsyncRAGPipeline(vectorstore, reranker, llm)
response = await pipeline.query("كيف أعيد تعيين كلمة المرور؟")
تدفق الاستجابات
from typing import AsyncGenerator
class StreamingRAG:
"""RAG مع تدفق لوقت أسرع لأول رمز."""
async def query_stream(
self,
question: str
) -> AsyncGenerator[str, None]:
"""تدفق رموز الاستجابة أثناء توليدها."""
# الاسترجاع (لا يمكن تدفقه، افعله أولاً)
contexts = await self._retrieve(question)
# تدفق التوليد
prompt = self._format_prompt(question, contexts)
async for token in self.llm.astream(prompt):
yield token
async def _retrieve(self, question: str) -> list[str]:
# مسار استرجاع سريع
docs = await self.vectorstore.asearch(question, k=5)
return [doc.page_content for doc in docs]
# نقطة نهاية FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
rag = StreamingRAG()
@app.get("/query")
async def query_endpoint(question: str):
async def generate():
async for token in rag.query_stream(question):
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
التجميع للإنتاجية
import asyncio
from dataclasses import dataclass
from typing import List
@dataclass
class BatchRequest:
question: str
future: asyncio.Future
class BatchingRAG:
"""تجميع استعلامات متعددة للمعالجة الفعالة."""
def __init__(self, batch_size: int = 8, max_wait_ms: int = 50):
self.batch_size = batch_size
self.max_wait_ms = max_wait_ms
self.queue: List[BatchRequest] = []
self.lock = asyncio.Lock()
self._processing = False
async def query(self, question: str) -> str:
"""إضافة استعلام للدفعة وانتظار النتيجة."""
future = asyncio.Future()
request = BatchRequest(question=question, future=future)
async with self.lock:
self.queue.append(request)
if len(self.queue) >= self.batch_size:
await self._process_batch()
elif not self._processing:
# بدء مؤقت للدفعة الجزئية
asyncio.create_task(self._wait_and_process())
return await future
async def _wait_and_process(self):
"""انتظار المزيد من الطلبات أو انتهاء المهلة."""
self._processing = True
await asyncio.sleep(self.max_wait_ms / 1000)
async with self.lock:
if self.queue:
await self._process_batch()
self._processing = False
async def _process_batch(self):
"""معالجة كل الطلبات في قائمة الانتظار كدفعة."""
batch = self.queue
self.queue = []
# تضمين كل الأسئلة كدفعة
questions = [r.question for r in batch]
embeddings = self.embedding_model.embed_batch(questions)
# بحث متجهات كدفعة
all_results = self.vectorstore.batch_search(embeddings, k=5)
# توليد الاستجابات (يمكن تجميعها مع بعض LLMs)
for request, results in zip(batch, all_results):
response = await self._generate(request.question, results)
request.future.set_result(response)
قائمة فحص التحسين
| التحسين | التأثير | الجهد | متى تستخدم |
|---|---|---|---|
| التخزين المؤقت الدلالي | عالي | متوسط | استعلامات متكررة متشابهة |
| الاسترجاع غير المتزامن | متوسط | منخفض | دائماً |
| التدفق | تجربة مستخدم عالية | منخفض | التطبيقات الموجهة للمستخدم |
| التجميع | إنتاجية عالية | متوسط | QPS عالي |
| التضمينات المكممة | متوسط | منخفض | الفهارس الكبيرة |
| سياق أصغر | متوسط | منخفض | عندما LLM هو عنق الزجاجة |
قياس أداء خط الأنابيب
import time
import statistics
def benchmark_rag(pipeline, questions: list, runs: int = 3) -> dict:
"""قياس أداء خط أنابيب RAG."""
latencies = {
"total": [],
"embedding": [],
"retrieval": [],
"reranking": [],
"generation": [],
}
for _ in range(runs):
for q in questions:
times = {}
start = time.perf_counter()
embedding = pipeline.embed(q)
times["embedding"] = time.perf_counter() - start
start = time.perf_counter()
docs = pipeline.retrieve(embedding)
times["retrieval"] = time.perf_counter() - start
start = time.perf_counter()
reranked = pipeline.rerank(q, docs)
times["reranking"] = time.perf_counter() - start
start = time.perf_counter()
response = pipeline.generate(q, reranked)
times["generation"] = time.perf_counter() - start
times["total"] = sum(times.values())
for key, value in times.items():
latencies[key].append(value * 1000) # تحويل لـ ms
return {
key: {
"mean": statistics.mean(values),
"p50": statistics.median(values),
"p95": sorted(values)[int(len(values) * 0.95)],
"p99": sorted(values)[int(len(values) * 0.99)],
}
for key, values in latencies.items()
}
# تشغيل القياس
results = benchmark_rag(pipeline, test_questions, runs=3)
print(f"الإجمالي P95: {results['total']['p95']:.0f}ms")
print(f"التوليد P95: {results['generation']['p95']:.0f}ms")
رؤية رئيسية: توليد LLM عادةً 70-80% من إجمالي زمن الاستجابة. التدفق لا يقلل الوقت الإجمالي لكنه يحسن بشكل كبير زمن الاستجابة المدرك بإظهار التقدم فوراً.
التالي، لننفذ أنماط الموثوقية للإنتاج. :::