الدرس 12 من 23
تصميم نظام RAG

توسيع أنظمة RAG

4 دقيقة للقراءة

توسيع RAG من نموذج أولي إلى الإنتاج يتطلب دراسة متأنية للفهرسة والاسترجاع والبنية التحتية. يغطي هذا الدرس أنماط التعامل مع ملايين المستندات.

تحديات التوسع

التحديعند 10K مستندعند 10M مستند
وقت الفهرسةدقائقأيام
زمن استجابة الاستعلام< 100 مللي ثانيةثوانٍ بدون تحسين
التخزينجيجابايتتيرابايت
التكلفة$10/شهر$1000+/شهر

استراتيجيات التجزئة

حسب المجموعة/مساحة الاسم

class ShardedVectorStore:
    def __init__(self, base_client):
        self.client = base_client
        self.shards = {}

    def get_shard(self, category: str):
        """التوجيه إلى التجزئة المناسبة بناءً على الفئة."""
        if category not in self.shards:
            self.shards[category] = self.client.collection(f"docs_{category}")
        return self.shards[category]

    async def insert(self, doc: dict):
        category = doc["metadata"]["category"]
        shard = self.get_shard(category)
        await shard.insert(doc)

    async def search(self, query_embedding, category: str = None, top_k: int = 10):
        if category:
            # البحث في تجزئة واحدة
            shard = self.get_shard(category)
            return await shard.search(query_embedding, top_k=top_k)
        else:
            # البحث في جميع التجزئات والدمج
            all_results = []
            for shard in self.shards.values():
                results = await shard.search(query_embedding, top_k=top_k)
                all_results.extend(results)

            # الترتيب وإرجاع أفضل k عبر جميع التجزئات
            all_results.sort(key=lambda x: x["score"], reverse=True)
            return all_results[:top_k]

حسب الفترة الزمنية

class TimeBasedSharding:
    def __init__(self, client):
        self.client = client

    def get_shard_name(self, timestamp: datetime) -> str:
        """التجزئة حسب الشهر."""
        return f"docs_{timestamp.year}_{timestamp.month:02d}"

    async def search_date_range(
        self,
        query_embedding,
        start_date: datetime,
        end_date: datetime,
        top_k: int = 10
    ):
        # تحديد التجزئات للاستعلام
        shards = self._get_shards_in_range(start_date, end_date)

        # الاستعلام عن التجزئات ذات الصلة بالتوازي
        tasks = [
            self.client.collection(shard).search(query_embedding, top_k=top_k)
            for shard in shards
        ]
        results = await asyncio.gather(*tasks)

        # الدمج والإرجاع
        merged = [r for batch in results for r in batch]
        merged.sort(key=lambda x: x["score"], reverse=True)
        return merged[:top_k]

تحسين الفهرس

فهارس الجار الأقرب التقريبي (ANN)

# أنواع فهارس pgvector
CREATE INDEX ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);  # جيد لـ < 1M متجه

# لمجموعات بيانات أكبر
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);  # أفضل لـ > 1M متجه

مقارنة الفهارس:

نوع الفهرسوقت البناءسرعة الاستعلامالذاكرةالأفضل لـ
Flatسريعبطيء (O(n))منخفضة< 100K
IVFمتوسطسريعمتوسطة100K-10M
HNSWبطيءالأسرععالية> 1M

الفهرسة التزايدية

لا تعد فهرسة كل شيء عند تغير المستندات:

class IncrementalIndexer:
    def __init__(self, vector_store, embedding_model):
        self.store = vector_store
        self.embedder = embedding_model
        self.pending_updates = []
        self.batch_size = 100

    async def add_document(self, doc: dict):
        """إضافة مستند للمعالجة الدفعية."""
        embedding = await self.embedder.embed(doc["content"])
        self.pending_updates.append({
            "id": doc["id"],
            "embedding": embedding,
            "metadata": doc["metadata"]
        })

        if len(self.pending_updates) >= self.batch_size:
            await self._flush()

    async def update_document(self, doc_id: str, new_content: str):
        """تحديث مستند موجود."""
        # حذف الإصدار القديم
        await self.store.delete(doc_id)

        # إضافة الإصدار الجديد
        embedding = await self.embedder.embed(new_content)
        await self.store.upsert({
            "id": doc_id,
            "embedding": embedding
        })

    async def _flush(self):
        """إدراج دفعة التحديثات المعلقة."""
        if self.pending_updates:
            await self.store.upsert_batch(self.pending_updates)
            self.pending_updates = []

توجيه الاستعلام

توجيه الاستعلامات إلى مسارات الاسترجاع المثلى:

class SmartQueryRouter:
    def __init__(self, retrievers: dict):
        self.retrievers = retrievers
        # "fast" - فهرس صغير، مخزن مؤقتاً
        # "full" - فهرس كامل
        # "archive" - بيانات تاريخية

    async def route_and_search(self, query: str, metadata_filter: dict = None):
        # تحديد المسار الأمثل
        if self._is_recent_query(metadata_filter):
            retriever = self.retrievers["fast"]
        elif self._needs_archive(metadata_filter):
            retriever = self.retrievers["archive"]
        else:
            retriever = self.retrievers["full"]

        return await retriever.search(query, filter=metadata_filter)

    def _is_recent_query(self, filter: dict) -> bool:
        if filter and "date" in filter:
            return filter["date"] > datetime.now() - timedelta(days=30)
        return False

التخزين المؤقت لـ RAG

class RAGCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    async def get_or_compute(
        self,
        query: str,
        retriever,
        generator
    ) -> dict:
        # التحقق من ذاكرة الاسترجاع المؤقتة
        cache_key = f"rag:{hash(query)}"
        cached = await self.redis.get(cache_key)

        if cached:
            return json.loads(cached)

        # حساب نتيجة جديدة
        retrieved = await retriever.search(query)
        response = await generator.generate(query, retrieved)

        result = {
            "answer": response,
            "sources": [r["metadata"] for r in retrieved]
        }

        # تخزين النتيجة مؤقتاً
        await self.redis.setex(cache_key, self.ttl, json.dumps(result))

        return result

معايير الأداء

المقاييس المستهدفة لـ RAG الإنتاجي:

المقياسالنموذج الأوليالإنتاجالمؤسسات
حجم الفهرس10K مستند1M مستند100M مستند
زمن استجابة الاستعلام (p50)500 مللي ثانية200 مللي ثانية100 مللي ثانية
زمن استجابة الاستعلام (p99)2 ثانية500 مللي ثانية300 مللي ثانية
الإنتاجية10 QPS100 QPS1000 QPS
التوفر95%99.9%99.99%

الآن لننتقل إلى تصميم أنظمة الوكلاء المتعددين—موضوع مقابلة حاسم آخر. :::

مراجعة سريعة: كيف تجد هذا الدرس؟

اختبار

الوحدة 3: تصميم نظام RAG

خذ الاختبار