تصميم نظام RAG
توسيع أنظمة RAG
4 دقيقة للقراءة
توسيع RAG من نموذج أولي إلى الإنتاج يتطلب دراسة متأنية للفهرسة والاسترجاع والبنية التحتية. يغطي هذا الدرس أنماط التعامل مع ملايين المستندات.
تحديات التوسع
| التحدي | عند 10K مستند | عند 10M مستند |
|---|---|---|
| وقت الفهرسة | دقائق | أيام |
| زمن استجابة الاستعلام | < 100 مللي ثانية | ثوانٍ بدون تحسين |
| التخزين | جيجابايت | تيرابايت |
| التكلفة | $10/شهر | $1000+/شهر |
استراتيجيات التجزئة
حسب المجموعة/مساحة الاسم
class ShardedVectorStore:
def __init__(self, base_client):
self.client = base_client
self.shards = {}
def get_shard(self, category: str):
"""التوجيه إلى التجزئة المناسبة بناءً على الفئة."""
if category not in self.shards:
self.shards[category] = self.client.collection(f"docs_{category}")
return self.shards[category]
async def insert(self, doc: dict):
category = doc["metadata"]["category"]
shard = self.get_shard(category)
await shard.insert(doc)
async def search(self, query_embedding, category: str = None, top_k: int = 10):
if category:
# البحث في تجزئة واحدة
shard = self.get_shard(category)
return await shard.search(query_embedding, top_k=top_k)
else:
# البحث في جميع التجزئات والدمج
all_results = []
for shard in self.shards.values():
results = await shard.search(query_embedding, top_k=top_k)
all_results.extend(results)
# الترتيب وإرجاع أفضل k عبر جميع التجزئات
all_results.sort(key=lambda x: x["score"], reverse=True)
return all_results[:top_k]
حسب الفترة الزمنية
class TimeBasedSharding:
def __init__(self, client):
self.client = client
def get_shard_name(self, timestamp: datetime) -> str:
"""التجزئة حسب الشهر."""
return f"docs_{timestamp.year}_{timestamp.month:02d}"
async def search_date_range(
self,
query_embedding,
start_date: datetime,
end_date: datetime,
top_k: int = 10
):
# تحديد التجزئات للاستعلام
shards = self._get_shards_in_range(start_date, end_date)
# الاستعلام عن التجزئات ذات الصلة بالتوازي
tasks = [
self.client.collection(shard).search(query_embedding, top_k=top_k)
for shard in shards
]
results = await asyncio.gather(*tasks)
# الدمج والإرجاع
merged = [r for batch in results for r in batch]
merged.sort(key=lambda x: x["score"], reverse=True)
return merged[:top_k]
تحسين الفهرس
فهارس الجار الأقرب التقريبي (ANN)
# أنواع فهارس pgvector
CREATE INDEX ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100); # جيد لـ < 1M متجه
# لمجموعات بيانات أكبر
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64); # أفضل لـ > 1M متجه
مقارنة الفهارس:
| نوع الفهرس | وقت البناء | سرعة الاستعلام | الذاكرة | الأفضل لـ |
|---|---|---|---|---|
| Flat | سريع | بطيء (O(n)) | منخفضة | < 100K |
| IVF | متوسط | سريع | متوسطة | 100K-10M |
| HNSW | بطيء | الأسرع | عالية | > 1M |
الفهرسة التزايدية
لا تعد فهرسة كل شيء عند تغير المستندات:
class IncrementalIndexer:
def __init__(self, vector_store, embedding_model):
self.store = vector_store
self.embedder = embedding_model
self.pending_updates = []
self.batch_size = 100
async def add_document(self, doc: dict):
"""إضافة مستند للمعالجة الدفعية."""
embedding = await self.embedder.embed(doc["content"])
self.pending_updates.append({
"id": doc["id"],
"embedding": embedding,
"metadata": doc["metadata"]
})
if len(self.pending_updates) >= self.batch_size:
await self._flush()
async def update_document(self, doc_id: str, new_content: str):
"""تحديث مستند موجود."""
# حذف الإصدار القديم
await self.store.delete(doc_id)
# إضافة الإصدار الجديد
embedding = await self.embedder.embed(new_content)
await self.store.upsert({
"id": doc_id,
"embedding": embedding
})
async def _flush(self):
"""إدراج دفعة التحديثات المعلقة."""
if self.pending_updates:
await self.store.upsert_batch(self.pending_updates)
self.pending_updates = []
توجيه الاستعلام
توجيه الاستعلامات إلى مسارات الاسترجاع المثلى:
class SmartQueryRouter:
def __init__(self, retrievers: dict):
self.retrievers = retrievers
# "fast" - فهرس صغير، مخزن مؤقتاً
# "full" - فهرس كامل
# "archive" - بيانات تاريخية
async def route_and_search(self, query: str, metadata_filter: dict = None):
# تحديد المسار الأمثل
if self._is_recent_query(metadata_filter):
retriever = self.retrievers["fast"]
elif self._needs_archive(metadata_filter):
retriever = self.retrievers["archive"]
else:
retriever = self.retrievers["full"]
return await retriever.search(query, filter=metadata_filter)
def _is_recent_query(self, filter: dict) -> bool:
if filter and "date" in filter:
return filter["date"] > datetime.now() - timedelta(days=30)
return False
التخزين المؤقت لـ RAG
class RAGCache:
def __init__(self, redis_client, ttl=3600):
self.redis = redis_client
self.ttl = ttl
async def get_or_compute(
self,
query: str,
retriever,
generator
) -> dict:
# التحقق من ذاكرة الاسترجاع المؤقتة
cache_key = f"rag:{hash(query)}"
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)
# حساب نتيجة جديدة
retrieved = await retriever.search(query)
response = await generator.generate(query, retrieved)
result = {
"answer": response,
"sources": [r["metadata"] for r in retrieved]
}
# تخزين النتيجة مؤقتاً
await self.redis.setex(cache_key, self.ttl, json.dumps(result))
return result
معايير الأداء
المقاييس المستهدفة لـ RAG الإنتاجي:
| المقياس | النموذج الأولي | الإنتاج | المؤسسات |
|---|---|---|---|
| حجم الفهرس | 10K مستند | 1M مستند | 100M مستند |
| زمن استجابة الاستعلام (p50) | 500 مللي ثانية | 200 مللي ثانية | 100 مللي ثانية |
| زمن استجابة الاستعلام (p99) | 2 ثانية | 500 مللي ثانية | 300 مللي ثانية |
| الإنتاجية | 10 QPS | 100 QPS | 1000 QPS |
| التوفر | 95% | 99.9% | 99.99% |
الآن لننتقل إلى تصميم أنظمة الوكلاء المتعددين—موضوع مقابلة حاسم آخر. :::