الدرس 12 من 23

تصميم نظام RAG

توسيع أنظمة RAG

4 دقيقة للقراءة

توسيع RAG من نموذج أولي إلى الإنتاج يتطلب دراسة متأنية للفهرسة والاسترجاع والبنية التحتية. يغطي هذا الدرس أنماط التعامل مع ملايين المستندات.

تحديات التوسع

التحدي عند 10K مستند عند 10M مستند
وقت الفهرسة دقائق أيام
زمن استجابة الاستعلام < 100 مللي ثانية ثوانٍ بدون تحسين
التخزين جيجابايت تيرابايت
التكلفة $10/شهر $1000+/شهر

استراتيجيات التجزئة

حسب المجموعة/مساحة الاسم

class ShardedVectorStore:
    def __init__(self, base_client):
        self.client = base_client
        self.shards = {}

    def get_shard(self, category: str):
        """التوجيه إلى التجزئة المناسبة بناءً على الفئة."""
        if category not in self.shards:
            self.shards[category] = self.client.collection(f"docs_{category}")
        return self.shards[category]

    async def insert(self, doc: dict):
        category = doc["metadata"]["category"]
        shard = self.get_shard(category)
        await shard.insert(doc)

    async def search(self, query_embedding, category: str = None, top_k: int = 10):
        if category:
            # البحث في تجزئة واحدة
            shard = self.get_shard(category)
            return await shard.search(query_embedding, top_k=top_k)
        else:
            # البحث في جميع التجزئات والدمج
            all_results = []
            for shard in self.shards.values():
                results = await shard.search(query_embedding, top_k=top_k)
                all_results.extend(results)

            # الترتيب وإرجاع أفضل k عبر جميع التجزئات
            all_results.sort(key=lambda x: x["score"], reverse=True)
            return all_results[:top_k]

حسب الفترة الزمنية

class TimeBasedSharding:
    def __init__(self, client):
        self.client = client

    def get_shard_name(self, timestamp: datetime) -> str:
        """التجزئة حسب الشهر."""
        return f"docs_{timestamp.year}_{timestamp.month:02d}"

    async def search_date_range(
        self,
        query_embedding,
        start_date: datetime,
        end_date: datetime,
        top_k: int = 10
    ):
        # تحديد التجزئات للاستعلام
        shards = self._get_shards_in_range(start_date, end_date)

        # الاستعلام عن التجزئات ذات الصلة بالتوازي
        tasks = [
            self.client.collection(shard).search(query_embedding, top_k=top_k)
            for shard in shards
        ]
        results = await asyncio.gather(*tasks)

        # الدمج والإرجاع
        merged = [r for batch in results for r in batch]
        merged.sort(key=lambda x: x["score"], reverse=True)
        return merged[:top_k]

تحسين الفهرس

فهارس الجار الأقرب التقريبي (ANN)

# أنواع فهارس pgvector
CREATE INDEX ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);  # جيد لـ < 1M متجه

# لمجموعات بيانات أكبر
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);  # أفضل لـ > 1M متجه

مقارنة الفهارس:

نوع الفهرس وقت البناء سرعة الاستعلام الذاكرة الأفضل لـ
Flat سريع بطيء (O(n)) منخفضة < 100K
IVF متوسط سريع متوسطة 100K-10M
HNSW بطيء الأسرع عالية > 1M

الفهرسة التزايدية

لا تعد فهرسة كل شيء عند تغير المستندات:

class IncrementalIndexer:
    def __init__(self, vector_store, embedding_model):
        self.store = vector_store
        self.embedder = embedding_model
        self.pending_updates = []
        self.batch_size = 100

    async def add_document(self, doc: dict):
        """إضافة مستند للمعالجة الدفعية."""
        embedding = await self.embedder.embed(doc["content"])
        self.pending_updates.append({
            "id": doc["id"],
            "embedding": embedding,
            "metadata": doc["metadata"]
        })

        if len(self.pending_updates) >= self.batch_size:
            await self._flush()

    async def update_document(self, doc_id: str, new_content: str):
        """تحديث مستند موجود."""
        # حذف الإصدار القديم
        await self.store.delete(doc_id)

        # إضافة الإصدار الجديد
        embedding = await self.embedder.embed(new_content)
        await self.store.upsert({
            "id": doc_id,
            "embedding": embedding
        })

    async def _flush(self):
        """إدراج دفعة التحديثات المعلقة."""
        if self.pending_updates:
            await self.store.upsert_batch(self.pending_updates)
            self.pending_updates = []

توجيه الاستعلام

توجيه الاستعلامات إلى مسارات الاسترجاع المثلى:

class SmartQueryRouter:
    def __init__(self, retrievers: dict):
        self.retrievers = retrievers
        # "fast" - فهرس صغير، مخزن مؤقتاً
        # "full" - فهرس كامل
        # "archive" - بيانات تاريخية

    async def route_and_search(self, query: str, metadata_filter: dict = None):
        # تحديد المسار الأمثل
        if self._is_recent_query(metadata_filter):
            retriever = self.retrievers["fast"]
        elif self._needs_archive(metadata_filter):
            retriever = self.retrievers["archive"]
        else:
            retriever = self.retrievers["full"]

        return await retriever.search(query, filter=metadata_filter)

    def _is_recent_query(self, filter: dict) -> bool:
        if filter and "date" in filter:
            return filter["date"] > datetime.now() - timedelta(days=30)
        return False

التخزين المؤقت لـ RAG

class RAGCache:
    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    async def get_or_compute(
        self,
        query: str,
        retriever,
        generator
    ) -> dict:
        # التحقق من ذاكرة الاسترجاع المؤقتة
        cache_key = f"rag:{hash(query)}"
        cached = await self.redis.get(cache_key)

        if cached:
            return json.loads(cached)

        # حساب نتيجة جديدة
        retrieved = await retriever.search(query)
        response = await generator.generate(query, retrieved)

        result = {
            "answer": response,
            "sources": [r["metadata"] for r in retrieved]
        }

        # تخزين النتيجة مؤقتاً
        await self.redis.setex(cache_key, self.ttl, json.dumps(result))

        return result

معايير الأداء

المقاييس المستهدفة لـ RAG الإنتاجي:

المقياس النموذج الأولي الإنتاج المؤسسات
حجم الفهرس 10K مستند 1M مستند 100M مستند
زمن استجابة الاستعلام (p50) 500 مللي ثانية 200 مللي ثانية 100 مللي ثانية
زمن استجابة الاستعلام (p99) 2 ثانية 500 مللي ثانية 300 مللي ثانية
الإنتاجية 10 QPS 100 QPS 1000 QPS
التوفر 95% 99.9% 99.99%

الآن لننتقل إلى تصميم أنظمة الوكلاء المتعددين—موضوع مقابلة حاسم آخر. :::

اختبار

الوحدة 3: تصميم نظام RAG

خذ الاختبار