# RAG System Design
## Hybrid Retrieval Strategies
Pure vector search has limitations: it can miss exact keyword matches, rare terms, and named entities. Hybrid retrieval combines dense (semantic) and sparse (keyword) search to get the strengths of both, and it is a common topic in AI system design interviews.
### Why Hybrid Retrieval?
| Search Type | Strengths | Weaknesses |
|---|---|---|
| Dense (Vector) | Semantic understanding, synonyms | Misses exact matches, entities |
| Sparse (BM25) | Exact keywords, rare terms | No semantic understanding |
| Hybrid | Best of both | Slightly more complex |
### Example Problem
Query: "What are the side effects of aspirin?"
| Document | BM25 Score | Vector Score | Relevant? |
|---|---|---|---|
| "Aspirin may cause stomach bleeding..." | High | High | Yes |
| "Pain relievers can have adverse effects..." | Low | High | Yes |
| "Take aspirin with food..." | High | Medium | Maybe |
Hybrid search catches both keyword matches AND semantic matches.
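For intuition, here is how the weighted score combination used in the next section would treat the second document, with hypothetical normalized scores (the numbers are illustrative, not measured):

```python
# Hypothetical normalized scores for "Pain relievers can have adverse effects..."
bm25_score = 0.10   # weak keyword overlap: the word "aspirin" never appears
dense_score = 0.85  # semantically close to "side effects of aspirin"
alpha = 0.5         # equal weight for dense and sparse

hybrid_score = alpha * dense_score + (1 - alpha) * bm25_score
print(hybrid_score)  # 0.475 - the document still surfaces despite the weak keyword match
```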
### BM25 + Dense Retrieval

```python
from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    def __init__(self, documents, embeddings, embedding_model):
        self.documents = documents
        self.embeddings = embeddings
        self.embedding_model = embedding_model

        # Initialize BM25 over whitespace-tokenized documents
        tokenized_docs = [doc.split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    async def search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5  # Weight for dense vs. sparse
    ) -> list:
        # Sparse retrieval (BM25)
        tokenized_query = query.split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        bm25_scores = self._normalize(bm25_scores)

        # Dense retrieval (vector)
        query_embedding = await self.embedding_model.embed(query)
        dense_scores = np.array([
            self._cosine_similarity(query_embedding, emb)
            for emb in self.embeddings
        ])
        dense_scores = self._normalize(dense_scores)

        # Combine scores
        hybrid_scores = alpha * dense_scores + (1 - alpha) * bm25_scores

        # Get top-k
        top_indices = np.argsort(hybrid_scores)[::-1][:top_k]

        return [
            {
                "id": int(i),  # stable index, used for fusion and deduplication later
                "document": self.documents[i],
                "score": float(hybrid_scores[i]),
                "dense_score": float(dense_scores[i]),
                "sparse_score": float(bm25_scores[i])
            }
            for i in top_indices
        ]

    def _normalize(self, scores: np.ndarray) -> np.ndarray:
        min_s, max_s = scores.min(), scores.max()
        if max_s - min_s == 0:
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)

    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
```
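A minimal usage sketch, assuming a tiny in-memory corpus; `DummyEmbedder` is a toy stand-in for whatever embedding model you use, exposing the async `embed(text)` method the class above expects:

```python
import asyncio
import numpy as np

class DummyEmbedder:
    """Toy stand-in for a real embedding model (API client, local model, etc.)."""
    async def embed(self, text: str) -> np.ndarray:
        rng = np.random.default_rng(abs(hash(text)) % (2**32))
        return rng.random(384)

async def main():
    docs = [
        "Aspirin may cause stomach bleeding in some patients.",
        "Pain relievers can have adverse effects on the liver.",
        "Take aspirin with food to reduce irritation.",
    ]
    embedder = DummyEmbedder()
    embeddings = [await embedder.embed(doc) for doc in docs]

    retriever = HybridRetriever(docs, embeddings, embedder)
    results = await retriever.search("side effects of aspirin", top_k=2, alpha=0.5)
    for r in results:
        print(f'{r["score"]:.3f}  {r["document"]}')

asyncio.run(main())
```

In production the document embeddings would be precomputed with the same model at indexing time and stored in a vector database rather than a Python list.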
### Reciprocal Rank Fusion (RRF)

A more sophisticated way to combine results from multiple retrievers:

```python
class RRFRetriever:
    def __init__(self, retrievers: list, k: int = 60):
        self.retrievers = retrievers
        self.k = k  # RRF constant

    async def search(self, query: str, top_k: int = 10) -> list:
        # Get results from each retriever
        all_results = []
        for retriever in self.retrievers:
            results = await retriever.search(query, top_k=top_k * 2)
            all_results.append(results)

        # Calculate RRF scores
        doc_scores = {}
        for retriever_results in all_results:
            for rank, result in enumerate(retriever_results):
                doc_id = result["id"]
                # RRF formula: 1 / (k + rank), with 1-based ranks
                rrf_score = 1 / (self.k + rank + 1)

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = {
                        "document": result["document"],
                        "score": 0
                    }
                doc_scores[doc_id]["score"] += rrf_score

        # Sort by combined score
        ranked = sorted(
            doc_scores.values(),
            key=lambda x: x["score"],
            reverse=True
        )
        return ranked[:top_k]
```
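A sketch of how the fusion layer could be wired up; the retriever names below are assumptions, standing in for any objects with an async `search(query, top_k)` method that returns dicts with `id` and `document` keys:

```python
# Illustrative wiring: fuse a sparse and a dense retriever with RRF.
async def fused_search(query: str, bm25_retriever, dense_retriever) -> list:
    fusion = RRFRetriever(retrievers=[bm25_retriever, dense_retriever], k=60)
    return await fusion.search(query, top_k=10)

# Why the large constant k = 60 helps: a document ranked 1st by one retriever
# contributes 1 / (60 + 1) ~= 0.0164 and one ranked 3rd contributes
# 1 / (60 + 3) ~= 0.0159, so documents that appear in several rankings are
# rewarded without any single top rank dominating the fused score.
```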
### Reranking

After initial retrieval, rerank with a cross-encoder:

```python
from sentence_transformers import CrossEncoder

class RerankedRetriever:
    def __init__(self, base_retriever, reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.retriever = base_retriever
        self.reranker = CrossEncoder(reranker_model)

    async def search(
        self,
        query: str,
        initial_k: int = 50,
        final_k: int = 10
    ) -> list:
        # Step 1: Get initial candidates
        candidates = await self.retriever.search(query, top_k=initial_k)

        # Step 2: Prepare (query, document) pairs for reranking
        pairs = [(query, c["document"]) for c in candidates]

        # Step 3: Get reranker scores
        # Note: cross-encoder outputs are unbounded logits; normalize them if the
        # weighted combination below should be scale-consistent with the
        # 0-1 retrieval scores.
        rerank_scores = self.reranker.predict(pairs)

        # Step 4: Combine with original scores
        for i, candidate in enumerate(candidates):
            candidate["rerank_score"] = float(rerank_scores[i])
            # Weighted combination
            candidate["final_score"] = (
                0.3 * candidate["score"] +
                0.7 * candidate["rerank_score"]
            )

        # Step 5: Sort by final score
        candidates.sort(key=lambda x: x["final_score"], reverse=True)
        return candidates[:final_k]
```
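A usage sketch chaining the hybrid retriever from earlier through the reranker. The 50-to-10 funnel is the point of the design: the cross-encoder runs one forward pass per candidate, so it is only applied to a small shortlist rather than the whole corpus:

```python
# Illustrative pipeline: cheap hybrid retrieval first, expensive reranking second.
async def retrieve_and_rerank(query: str, hybrid_retriever) -> list:
    reranked = RerankedRetriever(base_retriever=hybrid_retriever)
    # Pull 50 candidates with the fast retriever, then let the cross-encoder
    # re-score only those 50 before returning the final 10.
    return await reranked.search(query, initial_k=50, final_k=10)
```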
### Query Expansion

Improve retrieval by expanding the original query into several phrasings:

```python
class QueryExpander:
    def __init__(self, llm):
        self.llm = llm

    async def expand(self, query: str) -> list:
        """Generate alternative phrasings of the query."""
        prompt = f"""Generate 3 alternative ways to ask this question.
Return only the questions, one per line.

Original question: {query}

Alternative questions:"""

        response = await self.llm.complete(prompt)
        alternatives = [line.strip() for line in response.strip().split("\n") if line.strip()]
        return [query] + alternatives[:3]

# Usage in retrieval
def deduplicate_by_id(results: list) -> list:
    """Keep the highest-scoring copy of each document id (minimal implementation)."""
    best = {}
    for r in results:
        doc_id = r["id"]
        if doc_id not in best or r["score"] > best[doc_id]["score"]:
            best[doc_id] = r
    return list(best.values())

async def search_with_expansion(query: str, retriever, expander) -> list:
    # Expand query
    expanded_queries = await expander.expand(query)

    # Search with all queries
    all_results = []
    for q in expanded_queries:
        results = await retriever.search(q, top_k=5)
        all_results.extend(results)

    # Deduplicate and rank
    unique_results = deduplicate_by_id(all_results)
    return sorted(unique_results, key=lambda x: x["score"], reverse=True)[:10]
```
### Choosing the Right Strategy
| Use Case | Recommended Strategy |
|---|---|
| General Q&A | Hybrid (BM25 + Dense) |
| Technical docs | Dense + BM25 with high keyword weight |
| Legal/Medical | Hybrid + Reranking |
| Multi-language | Dense only (embeddings handle translation) |
| Exact entity search | BM25 first, then dense |
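As a rough illustration of how these recommendations could map onto the `alpha` weight from the hybrid retriever above; the values are assumptions to tune against an evaluation set, not fixed recommendations:

```python
# Illustrative starting points for alpha (1.0 = dense only, 0.0 = BM25 only).
ALPHA_BY_USE_CASE = {
    "general_qa": 0.5,       # balanced hybrid
    "technical_docs": 0.3,   # lean on exact keywords (API names, error codes)
    "legal_medical": 0.5,    # balanced hybrid, followed by cross-encoder reranking
    "multi_language": 1.0,   # dense only; multilingual embeddings bridge languages
    "exact_entity": 0.2,     # BM25-heavy, or run BM25 first and fall back to dense
}
```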
Next, we'll learn how to scale RAG systems to handle millions of documents.