RAG Architecture Deep Dive

The RAG Pipeline

A production RAG system has two main pipelines: ingestion (indexing) and retrieval (querying). Understanding each stage is essential for optimization.

Pipeline Overview

INGESTION PIPELINE                    RETRIEVAL PIPELINE
┌─────────────────┐                   ┌─────────────────┐
│   Raw Documents │                   │   User Query    │
└────────┬────────┘                   └────────┬────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│   Preprocessing │                   │ Query Transform │
└────────┬────────┘                   └────────┬────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│    Chunking     │              ┌───►│    Retrieval    │
└────────┬────────┘              │    └────────┬────────┘
         │                       │             │
         ▼                       │             ▼
┌─────────────────┐              │    ┌─────────────────┐
│   Embedding     │              │    │    Reranking    │
└────────┬────────┘              │    └────────┬────────┘
         │                       │             │
         ▼                       │             ▼
┌─────────────────┐              │    ┌─────────────────┐
│  Vector Store   │──────────────┘    │   Generation    │
└─────────────────┘                   └─────────────────┘

Ingestion Pipeline

1. Document Loading

from pathlib import Path

from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    CSVLoader,
)

def load_documents(file_path: str):
    """Load documents based on file type."""
    loaders = {
        ".pdf": PyPDFLoader,
        ".md": UnstructuredMarkdownLoader,
        ".csv": CSVLoader,
    }

    ext = Path(file_path).suffix.lower()
    loader_class = loaders.get(ext)

    if not loader_class:
        raise ValueError(f"Unsupported file type: {ext}")

    return loader_class(file_path).load()
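
For example, ingesting a single PDF (the file path here is purely illustrative):

# Hypothetical path, for illustration only
documents = load_documents("reports/q3_summary.pdf")
print(f"Loaded {len(documents)} document(s)")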

2. Preprocessing

Clean and normalize content:

import re
import unicodedata

def preprocess(text: str) -> str:
    """Clean text for embedding."""
    # Remove excessive whitespace
    text = " ".join(text.split())

    # Remove special characters that don't add meaning
    text = re.sub(r'[^\w\s\.\,\?\!\-\:\;]', '', text)

    # Normalize unicode
    text = unicodedata.normalize('NFKC', text)

    return text.strip()

3. Chunking

Split into meaningful segments (covered in depth in Module 3):

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""]
)

chunks = splitter.split_documents(documents)
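
Each chunk keeps the metadata of its parent document, which is what lets the pipeline cite sources later on:

# Chunk count and metadata carried over from the source documents
print(len(chunks))
print(chunks[0].metadata)  # e.g. {"source": "...", "page": 0} for a PDF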

4. Embedding & Storage

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)
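
On later runs the persisted index can be reopened instead of rebuilt. A minimal sketch, assuming the same embedding model and persist directory as above:

# Reopen the existing Chroma index without re-embedding anything
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 10})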

Retrieval Pipeline

1. Query Transformation

Improve query before retrieval:

def transform_query(query: str, llm) -> list[str]:
    """Generate multiple query variations."""
    prompt = f"""Generate 3 different versions of this search query
    to improve retrieval coverage:

    Original: {query}

    Return only the queries, one per line."""

    response = llm.invoke(prompt)
    variations = [q.strip() for q in response.content.split("\n") if q.strip()]
    return [query] + variations[:3]  # Original + up to 3 variations

2. Retrieval

def hybrid_retrieve(query: str, k: int = 10):
    """Combine semantic and keyword search."""
    # Semantic search
    semantic_results = vectorstore.similarity_search(query, k=k)

    # Keyword search (BM25)
    keyword_results = bm25_retriever.get_relevant_documents(query)[:k]

    # Reciprocal Rank Fusion
    return fuse_results(semantic_results, keyword_results)
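
The snippet above assumes a bm25_retriever and a fuse_results helper that aren't defined in this lesson. One possible sketch, using BM25Retriever from langchain_community (it requires the rank_bm25 package) and Reciprocal Rank Fusion with the conventional k = 60 constant:

from langchain_community.retrievers import BM25Retriever

# Keyword retriever built over the same chunks as the vector store
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 10

def fuse_results(*result_lists, k: int = 60):
    """Merge ranked result lists with Reciprocal Rank Fusion."""
    scores = {}
    docs_by_key = {}
    for results in result_lists:
        for rank, doc in enumerate(results):
            key = doc.page_content  # use chunk text as a simple identity key
            docs_by_key[key] = doc
            scores[key] = scores.get(key, 0.0) + 1.0 / (k + rank + 1)

    ranked = sorted(scores, key=scores.get, reverse=True)
    return [docs_by_key[key] for key in ranked]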

3. Reranking

Score and filter retrieved chunks:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank(query: str, documents: list, top_k: int = 4):
    """Rerank documents by relevance."""
    pairs = [(query, doc.page_content) for doc in documents]
    scores = reranker.predict(pairs)

    # Sort by score descending
    ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in ranked[:top_k]]

4. Generation

def generate_response(query: str, context_docs: list):
    """Generate answer with retrieved context."""
    context = "\n\n".join([doc.page_content for doc in context_docs])

    prompt = f"""Answer the question based on the context provided.
    If the context doesn't contain the answer, say so.

    Context:
    {context}

    Question: {query}

    Answer:"""

    # Chat models return a message object; surface just the text
    return llm.invoke(prompt).content

Complete Pipeline

class RAGPipeline:
    def __init__(self, vectorstore, llm, reranker):
        self.vectorstore = vectorstore
        self.llm = llm
        self.reranker = reranker

    def query(self, user_query: str) -> dict:
        # 1. Transform query
        queries = transform_query(user_query, self.llm)

        # 2. Retrieve (multi-query)
        all_docs = []
        for q in queries:
            all_docs.extend(self.vectorstore.similarity_search(q, k=5))

        # 3. Deduplicate
        unique_docs = deduplicate(all_docs)

        # 4. Rerank with the injected cross-encoder
        pairs = [(user_query, doc.page_content) for doc in unique_docs]
        scores = self.reranker.predict(pairs)
        ranked = sorted(zip(unique_docs, scores), key=lambda x: x[1], reverse=True)
        top_docs = [doc for doc, _ in ranked[:4]]

        # 5. Generate
        answer = generate_response(user_query, top_docs)

        return {
            "answer": answer,
            "sources": [doc.metadata for doc in top_docs]
        }
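
The query method also relies on a deduplicate helper that isn't shown above. A minimal sketch that keys on chunk content, followed by a hypothetical end-to-end call wiring up the pieces defined earlier:

def deduplicate(docs):
    """Drop chunks that were retrieved by more than one query variation."""
    seen = set()
    unique = []
    for doc in docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique.append(doc)
    return unique

# Hypothetical usage with the components built earlier in this lesson
# (reranker is the CrossEncoder from the Reranking step)
pipeline = RAGPipeline(vectorstore=vectorstore, llm=llm, reranker=reranker)
result = pipeline.query("What does the refund policy cover?")
print(result["answer"])
print(result["sources"])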

Architecture Principle: Each pipeline stage is a potential optimization point. Start simple, measure performance, and optimize the weakest link.

Next, let's examine common failure modes that undermine RAG quality.
