الدرس 20 من 23

أنظمة RAG الإنتاجية

تحسين الأداء

4 دقيقة للقراءة

أنظمة RAG الإنتاجية تحتاج زمن استجابة أقل من ثانية. هذا الدرس يغطي تقنيات تحسين كل مرحلة من خط الأنابيب.

تفصيل زمن الاستجابة

┌────────────────────────────────────────────────────────────────┐
│                    زمن استجابة RAG النموذجي                     │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  تضمين الاستعلام:     50-150ms   ████░░░░░░                     │
│  بحث المتجهات:        20-100ms   ███░░░░░░░                     │
│  إعادة الترتيب:      100-300ms   ██████░░░░                     │
│  توليد LLM:        500-2000ms   ██████████████████████         │
│                                                                 │
│  الإجمالي:          670-2550ms                                  │
│                                                                 │
│  الهدف للإنتاج: < 1000ms (بدون تدفق)                           │
│  الهدف مع التدفق: < 300ms لأول رمز                             │
│                                                                 │
└────────────────────────────────────────────────────────────────┘

التخزين المؤقت الدلالي

تخزين الاستجابات للاستعلامات المتشابهة دلالياً:

import hashlib
from typing import Optional
import numpy as np

class SemanticCache:
    """تخزين مؤقت لاستجابات RAG بناءً على التشابه الدلالي."""

    def __init__(
        self,
        embedding_model,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000,
        ttl_seconds: int = 3600,
    ):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds

        # في الإنتاج، استخدم Redis أو مشابه
        self.cache = {}  # {query_hash: {embedding, response, timestamp}}

    def get(self, query: str) -> Optional[str]:
        """تحقق من التخزين المؤقت لاستعلام متشابه دلالياً."""

        query_embedding = self.embedding_model.embed(query)

        for cached in self.cache.values():
            # تحقق من TTL
            if time.time() - cached["timestamp"] > self.ttl_seconds:
                continue

            # تحقق من التشابه الدلالي
            similarity = self._cosine_similarity(
                query_embedding,
                cached["embedding"]
            )

            if similarity >= self.similarity_threshold:
                return cached["response"]

        return None

    def set(self, query: str, response: str):
        """تخزين استجابة."""

        if len(self.cache) >= self.max_entries:
            self._evict_oldest()

        query_hash = hashlib.md5(query.encode()).hexdigest()
        self.cache[query_hash] = {
            "embedding": self.embedding_model.embed(query),
            "response": response,
            "timestamp": time.time(),
        }

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def _evict_oldest(self):
        oldest = min(self.cache.items(), key=lambda x: x[1]["timestamp"])
        del self.cache[oldest[0]]

# الاستخدام
cache = SemanticCache(embedding_model, similarity_threshold=0.95)

def query_rag(question: str) -> str:
    # تحقق من التخزين المؤقت أولاً
    cached = cache.get(question)
    if cached:
        return cached  # ~5ms بدلاً من ~1500ms

    # تشغيل خط الأنابيب الكامل
    response = rag_pipeline.query(question)

    # تخزين للمستقبل
    cache.set(question, response)

    return response

المعالجة غير المتزامنة والمتوازية

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncRAGPipeline:
    """خط أنابيب RAG مع عمليات غير متزامنة."""

    def __init__(self, vectorstore, reranker, llm):
        self.vectorstore = vectorstore
        self.reranker = reranker
        self.llm = llm
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def query(self, question: str) -> str:
        # متوازي: التضمين + بحث BM25
        embedding_task = asyncio.create_task(
            self._async_embed(question)
        )
        bm25_task = asyncio.create_task(
            self._async_bm25_search(question)
        )

        # انتظار طريقتي الاسترجاع
        query_embedding, bm25_results = await asyncio.gather(
            embedding_task, bm25_task
        )

        # بحث المتجهات بالتضمين
        vector_results = await self._async_vector_search(query_embedding)

        # دمج وإعادة ترتيب
        combined = self._merge_results(vector_results, bm25_results)
        reranked = await self._async_rerank(question, combined)

        # توليد الاستجابة
        response = await self._async_generate(question, reranked[:5])

        return response

    async def _async_embed(self, text: str):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            self.embedding_model.embed,
            text
        )

    async def _async_vector_search(self, embedding):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.vectorstore.search(embedding, k=20)
        )

    async def _async_rerank(self, query: str, docs: list):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.reranker.rerank(query, docs)
        )

    async def _async_generate(self, question: str, contexts: list):
        # معظم عملاء LLM يدعمون async أصلياً
        return await self.llm.agenerate(
            prompt=self._format_prompt(question, contexts)
        )

# الاستخدام
pipeline = AsyncRAGPipeline(vectorstore, reranker, llm)
response = await pipeline.query("كيف أعيد تعيين كلمة المرور؟")

تدفق الاستجابات

from typing import AsyncGenerator

class StreamingRAG:
    """RAG مع تدفق لوقت أسرع لأول رمز."""

    async def query_stream(
        self,
        question: str
    ) -> AsyncGenerator[str, None]:
        """تدفق رموز الاستجابة أثناء توليدها."""

        # الاسترجاع (لا يمكن تدفقه، افعله أولاً)
        contexts = await self._retrieve(question)

        # تدفق التوليد
        prompt = self._format_prompt(question, contexts)

        async for token in self.llm.astream(prompt):
            yield token

    async def _retrieve(self, question: str) -> list[str]:
        # مسار استرجاع سريع
        docs = await self.vectorstore.asearch(question, k=5)
        return [doc.page_content for doc in docs]

# نقطة نهاية FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
rag = StreamingRAG()

@app.get("/query")
async def query_endpoint(question: str):
    async def generate():
        async for token in rag.query_stream(question):
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

التجميع للإنتاجية

import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class BatchRequest:
    question: str
    future: asyncio.Future

class BatchingRAG:
    """تجميع استعلامات متعددة للمعالجة الفعالة."""

    def __init__(self, batch_size: int = 8, max_wait_ms: int = 50):
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: List[BatchRequest] = []
        self.lock = asyncio.Lock()
        self._processing = False

    async def query(self, question: str) -> str:
        """إضافة استعلام للدفعة وانتظار النتيجة."""

        future = asyncio.Future()
        request = BatchRequest(question=question, future=future)

        async with self.lock:
            self.queue.append(request)

            if len(self.queue) >= self.batch_size:
                await self._process_batch()
            elif not self._processing:
                # بدء مؤقت للدفعة الجزئية
                asyncio.create_task(self._wait_and_process())

        return await future

    async def _wait_and_process(self):
        """انتظار المزيد من الطلبات أو انتهاء المهلة."""
        self._processing = True
        await asyncio.sleep(self.max_wait_ms / 1000)

        async with self.lock:
            if self.queue:
                await self._process_batch()
            self._processing = False

    async def _process_batch(self):
        """معالجة كل الطلبات في قائمة الانتظار كدفعة."""

        batch = self.queue
        self.queue = []

        # تضمين كل الأسئلة كدفعة
        questions = [r.question for r in batch]
        embeddings = self.embedding_model.embed_batch(questions)

        # بحث متجهات كدفعة
        all_results = self.vectorstore.batch_search(embeddings, k=5)

        # توليد الاستجابات (يمكن تجميعها مع بعض LLMs)
        for request, results in zip(batch, all_results):
            response = await self._generate(request.question, results)
            request.future.set_result(response)

قائمة فحص التحسين

التحسين التأثير الجهد متى تستخدم
التخزين المؤقت الدلالي عالي متوسط استعلامات متكررة متشابهة
الاسترجاع غير المتزامن متوسط منخفض دائماً
التدفق تجربة مستخدم عالية منخفض التطبيقات الموجهة للمستخدم
التجميع إنتاجية عالية متوسط QPS عالي
التضمينات المكممة متوسط منخفض الفهارس الكبيرة
سياق أصغر متوسط منخفض عندما LLM هو عنق الزجاجة

قياس أداء خط الأنابيب

import time
import statistics

def benchmark_rag(pipeline, questions: list, runs: int = 3) -> dict:
    """قياس أداء خط أنابيب RAG."""

    latencies = {
        "total": [],
        "embedding": [],
        "retrieval": [],
        "reranking": [],
        "generation": [],
    }

    for _ in range(runs):
        for q in questions:
            times = {}

            start = time.perf_counter()
            embedding = pipeline.embed(q)
            times["embedding"] = time.perf_counter() - start

            start = time.perf_counter()
            docs = pipeline.retrieve(embedding)
            times["retrieval"] = time.perf_counter() - start

            start = time.perf_counter()
            reranked = pipeline.rerank(q, docs)
            times["reranking"] = time.perf_counter() - start

            start = time.perf_counter()
            response = pipeline.generate(q, reranked)
            times["generation"] = time.perf_counter() - start

            times["total"] = sum(times.values())

            for key, value in times.items():
                latencies[key].append(value * 1000)  # تحويل لـ ms

    return {
        key: {
            "mean": statistics.mean(values),
            "p50": statistics.median(values),
            "p95": sorted(values)[int(len(values) * 0.95)],
            "p99": sorted(values)[int(len(values) * 0.99)],
        }
        for key, values in latencies.items()
    }

# تشغيل القياس
results = benchmark_rag(pipeline, test_questions, runs=3)
print(f"الإجمالي P95: {results['total']['p95']:.0f}ms")
print(f"التوليد P95: {results['generation']['p95']:.0f}ms")

رؤية رئيسية: توليد LLM عادةً 70-80% من إجمالي زمن الاستجابة. التدفق لا يقلل الوقت الإجمالي لكنه يحسن بشكل كبير زمن الاستجابة المدرك بإظهار التقدم فوراً.

التالي، لننفذ أنماط الموثوقية للإنتاج. :::

اختبار

الوحدة 6: أنظمة RAG الإنتاجية

خذ الاختبار