RAG Architecture Deep Dive
The RAG Pipeline
3 min read
A production RAG system has two main pipelines: ingestion (indexing) and retrieval (querying). Understanding each stage is essential for optimization.
Pipeline Overview
     INGESTION PIPELINE                     RETRIEVAL PIPELINE

    ┌─────────────────┐                    ┌─────────────────┐
    │  Raw Documents  │                    │   User Query    │
    └────────┬────────┘                    └────────┬────────┘
             │                                      │
             ▼                                      ▼
    ┌─────────────────┐                    ┌─────────────────┐
    │  Preprocessing  │                    │ Query Transform │
    └────────┬────────┘                    └────────┬────────┘
             │                                      │
             ▼                                      ▼
    ┌─────────────────┐                    ┌─────────────────┐
    │    Chunking     │           ┌───────►│    Retrieval    │
    └────────┬────────┘           │        └────────┬────────┘
             │                    │                 │
             ▼                    │                 ▼
    ┌─────────────────┐           │        ┌─────────────────┐
    │    Embedding    │           │        │    Reranking    │
    └────────┬────────┘           │        └────────┬────────┘
             │                    │                 │
             ▼                    │                 ▼
    ┌─────────────────┐           │        ┌─────────────────┐
    │  Vector Store   │───────────┘        │   Generation    │
    └─────────────────┘                    └─────────────────┘
Ingestion Pipeline
1. Document Loading
from pathlib import Path

from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    CSVLoader,
)

def load_documents(file_path: str):
    """Load documents based on file type."""
    loaders = {
        ".pdf": PyPDFLoader,
        ".md": UnstructuredMarkdownLoader,
        ".csv": CSVLoader,
    }
    ext = Path(file_path).suffix.lower()
    loader_class = loaders.get(ext)
    if not loader_class:
        raise ValueError(f"Unsupported file type: {ext}")
    return loader_class(file_path).load()
2. Preprocessing
Clean and normalize content:
import re
import unicodedata

def preprocess(text: str) -> str:
    """Clean text for embedding."""
    # Collapse excessive whitespace
    text = " ".join(text.split())
    # Remove special characters that don't add meaning
    text = re.sub(r'[^\w\s\.\,\?\!\-\:\;]', '', text)
    # Normalize unicode (e.g. full-width characters, ligatures)
    text = unicodedata.normalize('NFKC', text)
    return text.strip()
3. Chunking
Split into meaningful segments (covered in depth in Module 3):
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""],
)
chunks = splitter.split_documents(documents)
4. Embedding & Storage
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db",
)
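Because the index is persisted to disk, the query-time process can reopen it without re-embedding anything. A minimal sketch, assuming the same embedding model and the ./chroma_db directory created above:

# Reopen the persisted index at query time
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})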
Retrieval Pipeline
1. Query Transformation
Improve the query before retrieval:
def transform_query(query: str, llm) -> list[str]:
    """Generate multiple query variations."""
    prompt = f"""Generate 3 different versions of this search query
to improve retrieval coverage:

Original: {query}

Return only the queries, one per line."""
    response = llm.invoke(prompt)
    # Keep the original query plus up to 3 non-empty variations
    variations = [q.strip() for q in response.content.strip().split("\n") if q.strip()]
    return [query] + variations[:3]
2. Retrieval
def hybrid_retrieve(query: str, k: int = 10):
    """Combine semantic and keyword search."""
    # Semantic search
    semantic_results = vectorstore.similarity_search(query, k=k)
    # Keyword search (BM25)
    keyword_results = bm25_retriever.get_relevant_documents(query)[:k]
    # Reciprocal Rank Fusion
    return fuse_results(semantic_results, keyword_results)
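The snippet above assumes a bm25_retriever and a fuse_results helper that the lesson does not define. Here is one minimal sketch of both, using LangChain's BM25Retriever over the same chunks and a basic Reciprocal Rank Fusion; the fuse_results name and the c=60 constant are illustrative choices, not a fixed API:

from langchain_community.retrievers import BM25Retriever

# Keyword index built over the same chunks that went into the vector store
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 10

def fuse_results(*result_lists, c: int = 60, top_k: int = 10):
    """Merge ranked result lists with Reciprocal Rank Fusion (RRF)."""
    scores = {}
    docs_by_key = {}
    for results in result_lists:
        for rank, doc in enumerate(results):
            key = doc.page_content  # use content as a simple dedup key
            docs_by_key[key] = doc
            scores[key] = scores.get(key, 0.0) + 1.0 / (c + rank + 1)
    fused = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [docs_by_key[key] for key, _ in fused[:top_k]]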
3. Reranking
Score and filter retrieved chunks:
from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank(query: str, documents: list, top_k: int = 4):
    """Rerank documents by relevance."""
    pairs = [(query, doc.page_content) for doc in documents]
    scores = reranker.predict(pairs)
    # Sort by score descending
    ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in ranked[:top_k]]
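To filter as well as truncate, you can drop chunks that fall below a score cutoff. A hedged variant: min_score is a hypothetical parameter, and because cross-encoder scores are uncalibrated logits, the threshold needs tuning per model:

def rerank_with_threshold(query: str, documents: list,
                          top_k: int = 4, min_score: float = 0.0):
    """Rerank, then keep only the top_k chunks that clear the score cutoff."""
    pairs = [(query, doc.page_content) for doc in documents]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in ranked[:top_k] if score >= min_score]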
4. Generation
def generate_response(query: str, context_docs: list):
    """Generate answer with retrieved context."""
    context = "\n\n".join([doc.page_content for doc in context_docs])
    prompt = f"""Answer the question based on the context provided.
If the context doesn't contain the answer, say so.

Context:
{context}

Question: {query}

Answer:"""
    return llm.invoke(prompt)
Complete Pipeline
class RAGPipeline:
    def __init__(self, vectorstore, llm, reranker):
        self.vectorstore = vectorstore
        self.llm = llm
        self.reranker = reranker  # the CrossEncoder from the reranking step

    def query(self, user_query: str) -> dict:
        # 1. Transform the query into multiple variations
        queries = transform_query(user_query, self.llm)
        # 2. Retrieve candidates for each variation (multi-query)
        all_docs = []
        for q in queries:
            all_docs.extend(self.vectorstore.similarity_search(q, k=5))
        # 3. Deduplicate overlapping results
        unique_docs = deduplicate(all_docs)
        # 4. Rerank with the cross-encoder and keep the best chunks
        pairs = [(user_query, doc.page_content) for doc in unique_docs]
        scores = self.reranker.predict(pairs)
        ranked = sorted(zip(unique_docs, scores), key=lambda x: x[1], reverse=True)
        top_docs = [doc for doc, _ in ranked[:4]]
        # 5. Generate the final answer from the top chunks
        answer = generate_response(user_query, top_docs)
        return {
            "answer": answer,
            "sources": [doc.metadata for doc in top_docs],
        }
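The class leans on the helpers defined earlier plus a deduplicate function that the lesson does not show. One minimal sketch, keyed on page content, followed by a usage example; it assumes vectorstore, llm, and reranker are the objects built above (the lesson never constructs llm explicitly, so any LangChain chat model works), and the query string is hypothetical:

def deduplicate(docs: list) -> list:
    """Drop duplicate chunks retrieved by multiple query variations."""
    seen = set()
    unique = []
    for doc in docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique.append(doc)
    return unique

# Wiring it together
pipeline = RAGPipeline(vectorstore=vectorstore, llm=llm, reranker=reranker)
result = pipeline.query("What is our refund policy?")
print(result["answer"])
print(result["sources"])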
Architecture Principle: Each pipeline stage is a potential optimization point. Start simple, measure performance, and optimize the weakest link.
Next, let's examine common failure modes that undermine RAG quality.