الدرس 3 من 23

تعمق في بنية RAG

خط أنابيب RAG

3 دقيقة للقراءة

نظام RAG الإنتاجي له خطي أنابيب رئيسيين: الاستيعاب (الفهرسة) والاسترجاع (الاستعلام). فهم كل مرحلة ضروري للتحسين.

نظرة عامة على خط الأنابيب

خط أنابيب الاستيعاب                   خط أنابيب الاسترجاع
┌─────────────────┐                   ┌─────────────────┐
│ المستندات الخام │                   │  استعلام المستخدم │
└────────┬────────┘                   └────────┬────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│  المعالجة المسبقة │                   │ تحويل الاستعلام │
└────────┬────────┘                   └────────┬────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│    التقطيع     │                   │    الاسترجاع    │
└────────┬────────┘                   └────────┬────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│    التضمين     │                   │  إعادة الترتيب  │
└────────┬────────┘                   └────────┬────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│ المخزن المتجه  │ ◄─────────────────│     التوليد     │
└─────────────────┘                   └─────────────────┘

خط أنابيب الاستيعاب

1. تحميل المستندات

from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    CSVLoader
)

def load_documents(file_path: str):
    """تحميل المستندات بناءً على نوع الملف."""
    loaders = {
        ".pdf": PyPDFLoader,
        ".md": UnstructuredMarkdownLoader,
        ".csv": CSVLoader,
    }

    ext = Path(file_path).suffix.lower()
    loader_class = loaders.get(ext)

    if not loader_class:
        raise ValueError(f"نوع ملف غير مدعوم: {ext}")

    return loader_class(file_path).load()

2. المعالجة المسبقة

تنظيف وتطبيع المحتوى:

def preprocess(text: str) -> str:
    """تنظيف النص للتضمين."""
    # إزالة المسافات الزائدة
    text = " ".join(text.split())

    # إزالة الأحرف الخاصة التي لا تضيف معنى
    text = re.sub(r'[^\w\s\.\,\?\!\-\:\;]', '', text)

    # تطبيع Unicode
    text = unicodedata.normalize('NFKC', text)

    return text.strip()

3. التقطيع

التقسيم إلى قطع ذات معنى (يُغطى بعمق في الوحدة 3):

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""]
)

chunks = splitter.split_documents(documents)

4. التضمين والتخزين

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

خط أنابيب الاسترجاع

1. تحويل الاستعلام

تحسين الاستعلام قبل الاسترجاع:

def transform_query(query: str, llm) -> list[str]:
    """توليد نسخ متعددة من الاستعلام."""
    prompt = f"""ولّد 3 نسخ مختلفة من استعلام البحث هذا
    لتحسين تغطية الاسترجاع:

    الأصلي: {query}

    أرجع الاستعلامات فقط، واحد في كل سطر."""

    response = llm.invoke(prompt)
    queries = [query] + response.content.strip().split("\n")
    return queries[:4]  # الأصلي + 3 تنويعات

2. الاسترجاع

def hybrid_retrieve(query: str, k: int = 10):
    """دمج البحث الدلالي والكلمات المفتاحية."""
    # البحث الدلالي
    semantic_results = vectorstore.similarity_search(query, k=k)

    # بحث الكلمات المفتاحية (BM25)
    keyword_results = bm25_retriever.get_relevant_documents(query)[:k]

    # دمج الترتيب التبادلي
    return fuse_results(semantic_results, keyword_results)

3. إعادة الترتيب

تسجيل وتصفية القطع المسترجعة:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank(query: str, documents: list, top_k: int = 4):
    """إعادة ترتيب المستندات حسب الصلة."""
    pairs = [(query, doc.page_content) for doc in documents]
    scores = reranker.predict(pairs)

    # ترتيب حسب الدرجة تنازلياً
    ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in ranked[:top_k]]

4. التوليد

def generate_response(query: str, context_docs: list):
    """توليد إجابة مع السياق المسترجع."""
    context = "\n\n".join([doc.page_content for doc in context_docs])

    prompt = f"""أجب على السؤال بناءً على السياق المقدم.
    إذا لم يحتوي السياق على الإجابة، قل ذلك.

    السياق:
    {context}

    السؤال: {query}

    الإجابة:"""

    return llm.invoke(prompt)

خط الأنابيب الكامل

class RAGPipeline:
    def __init__(self, vectorstore, llm, reranker):
        self.vectorstore = vectorstore
        self.llm = llm
        self.reranker = reranker

    def query(self, user_query: str) -> dict:
        # 1. تحويل الاستعلام
        queries = transform_query(user_query, self.llm)

        # 2. الاسترجاع (متعدد الاستعلامات)
        all_docs = []
        for q in queries:
            all_docs.extend(self.vectorstore.similarity_search(q, k=5))

        # 3. إزالة التكرار
        unique_docs = deduplicate(all_docs)

        # 4. إعادة الترتيب
        top_docs = self.reranker.rerank(user_query, unique_docs, top_k=4)

        # 5. التوليد
        answer = generate_response(user_query, top_docs)

        return {
            "answer": answer,
            "sources": [doc.metadata for doc in top_docs]
        }

مبدأ البنية: كل مرحلة في خط الأنابيب هي نقطة تحسين محتملة. ابدأ ببساطة، قِس الأداء، وحسّن الحلقة الأضعف.

التالي، لنفحص أنماط الفشل الشائعة التي تقوض جودة RAG. :::

اختبار

الوحدة 1: تعمق في بنية RAG

خذ الاختبار