# Retrieval & Ranking Pipeline
Production search uses multi-stage pipelines: a fast retrieval stage casts a wide net, then progressively more expensive rankers narrow down to the best results. This lesson covers the full pipeline with production code at each stage.
## Multi-Stage Retrieval Architecture
The core principle: use cheap models on many candidates, expensive models on few candidates.
| Stage | Input | Output | Latency | Model |
|---|---|---|---|---|
| L0: Candidate Generation | Full index (millions) | ~1,000 candidates | 10–20ms | Inverted index / ANN |
| L1: Lightweight Ranking | 1,000 candidates | ~100 candidates | 10–30ms | Feature-based scorer |
| L2: Neural Re-ranking | 100 candidates | ~20 candidates | 30–50ms | Cross-encoder |
| L3: Business Rules | 20 candidates | 10 results | <5ms | Rule engine |
```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SearchResult:
    doc_id: str
    score: float
    title: str
    snippet: str
    metadata: dict


class MultiStageRanker:
    """Production multi-stage retrieval and ranking pipeline."""

    def __init__(self, retriever, lightweight_ranker, neural_ranker, business_rules):
        self.retriever = retriever
        self.lightweight_ranker = lightweight_ranker
        self.neural_ranker = neural_ranker
        self.business_rules = business_rules

    def search(self, query: str, filters: Optional[dict] = None,
               top_k: int = 10) -> List[SearchResult]:
        # L0: candidate generation (BM25 + kNN, ~1,000 results, ~15 ms)
        candidates = self.retriever.retrieve(query, filters=filters, limit=1000)
        # L1: lightweight ranking with features (~100 results, ~20 ms)
        ranked = self.lightweight_ranker.rank(query, candidates, limit=100)
        # L2: neural re-ranking with cross-encoder (~20 results, ~40 ms)
        reranked = self.neural_ranker.rerank(query, ranked, limit=20)
        # L3: business rules (boost sponsored, filter blocked, ~2 ms)
        return self.business_rules.apply(reranked, limit=top_k)
```
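To make the funnel concrete, here is a toy end-to-end run with hypothetical stub stages. Every stage is just "score, sort, truncate" — the scoring lambdas are arbitrary stand-ins, not real models, and `StubStage` is an illustrative name, not part of any library:

```python
class StubStage:
    """Stand-in for one pipeline stage: score, sort, truncate."""

    def __init__(self, score_fn, limit):
        self.score_fn = score_fn
        self.limit = limit

    def __call__(self, query, candidates):
        ranked = sorted(candidates,
                        key=lambda doc: self.score_fn(query, doc),
                        reverse=True)
        return ranked[:self.limit]


corpus = [f"doc_{i:04d}" for i in range(5000)]  # fake document ids

pipeline = [
    StubStage(lambda q, d: hash((q, d)) % 1000, limit=1000),  # L0: wide net
    StubStage(lambda q, d: hash(d) % 500, limit=100),         # L1: cheap features
    StubStage(lambda q, d: hash(d[::-1]) % 100, limit=20),    # L2: "expensive" model
    StubStage(lambda q, d: 0, limit=10),                      # L3: final cut
]

candidates = corpus
for stage in pipeline:
    candidates = stage("wireless headphones", candidates)

print(len(candidates))  # 10
```

The shape is the point: each stage sees only what the previous stage kept, so the expensive scorers at L2 and L3 touch a tiny fraction of the corpus.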
## Hybrid Scoring: BM25 + Vector

### Reciprocal Rank Fusion (RRF)
```python
def reciprocal_rank_fusion(result_lists, k=60):
    """
    RRF merges ranked lists using position-based scoring:
    score = sum(1 / (k + rank)) across all lists.

    k=60 prevents top-ranked docs from dominating:
    rank 1 scores 1/61 ≈ 0.0164, rank 2 scores 1/62 ≈ 0.0161.
    A doc ranked #5 in both lists beats a doc ranked #1 in only one.
    """
    fused_scores = {}
    for result_list in result_lists:
        for rank, (doc_id, _score) in enumerate(result_list, start=1):
            fused_scores[doc_id] = fused_scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)


# Example:
bm25_results = [("doc_a", 12.5), ("doc_b", 11.2), ("doc_c", 9.8)]
vector_results = [("doc_c", 0.95), ("doc_d", 0.91), ("doc_a", 0.87)]
fused = reciprocal_rank_fusion([bm25_results, vector_results])
# doc_a and doc_c score highest (appear in both lists)
```
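Working the example above by hand: doc_a sits at rank 1 in the BM25 list and rank 3 in the vector list, doc_c at rank 3 and rank 1, so their fused scores tie — and both beat doc_b, which appears in only one list:

```python
k = 60
doc_a = 1 / (k + 1) + 1 / (k + 3)  # rank 1 (BM25) + rank 3 (vector)
doc_c = 1 / (k + 3) + 1 / (k + 1)  # rank 3 (BM25) + rank 1 (vector)
doc_b = 1 / (k + 2)                # rank 2 (BM25 only)
# doc_a == doc_c ≈ 0.0323; doc_b ≈ 0.0161 — single-list docs fall behind
```

Note that the raw BM25 and cosine scores never enter the calculation; RRF uses only positions, which is why it needs no score normalization or tuning.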
### Linear Combination with Score Normalization
```python
def linear_combination(bm25_results, vector_results, alpha=0.5):
    """
    Combine BM25 and vector scores with min-max normalization.

    alpha > 0.5: favor keyword matching (product search, SKU lookup)
    alpha < 0.5: favor semantic matching (document search, Q&A)
    """
    def normalize(results):
        if not results:
            return {}
        scores = [s for _, s in results]
        min_s, max_s = min(scores), max(scores)
        r = max_s - min_s if max_s != min_s else 1.0
        return {doc_id: (score - min_s) / r for doc_id, score in results}

    bm25_norm = normalize(bm25_results)
    vector_norm = normalize(vector_results)
    all_docs = set(bm25_norm) | set(vector_norm)
    combined = {}
    for doc_id in all_docs:
        combined[doc_id] = (alpha * bm25_norm.get(doc_id, 0.0) +
                            (1 - alpha) * vector_norm.get(doc_id, 0.0))
    return sorted(combined.items(), key=lambda x: x[1], reverse=True)
```
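One subtlety of min-max normalization, shown here on the hypothetical BM25 scores from the RRF example: the worst candidate in each list is always mapped to 0.0, so it contributes nothing to the blend no matter how good its raw score was:

```python
bm25_results = [("doc_a", 12.5), ("doc_b", 11.2), ("doc_c", 9.8)]

lo = min(s for _, s in bm25_results)  # 9.8
hi = max(s for _, s in bm25_results)  # 12.5
norm = {d: (s - lo) / (hi - lo) for d, s in bm25_results}
# {"doc_a": 1.0, "doc_b": ~0.519, "doc_c": 0.0}
```

This is one reason to evaluate alternatives such as z-score normalization when candidate lists are short or score distributions have outliers.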
## Cross-Encoder Re-ranking
Cross-encoders process the query and document together through a single transformer, producing more accurate relevance scores than bi-encoders, which embed each side independently. They are too slow for first-stage retrieval but ideal for re-ranking the top 50–100 candidates.
```python
from sentence_transformers import CrossEncoder


class CrossEncoderReranker:
    """Re-rank candidates using a cross-encoder model."""

    def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-12-v2"):
        self.model = CrossEncoder(model_name, max_length=512)

    def rerank(self, query: str, candidates: list, top_k: int = 10) -> list:
        if not candidates:
            return []
        # Create query-document pairs
        pairs = [(query, f"{c.title}. {c.snippet}") for c in candidates]
        # Score all pairs in a single batch (GPU-accelerated)
        scores = self.model.predict(pairs, batch_size=32)
        # Sort by cross-encoder score
        scored = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
        results = []
        for candidate, score in scored[:top_k]:
            candidate.score = float(score)
            results.append(candidate)
        return results


# Performance: ~40 ms for 100 candidates on GPU, ~200 ms on CPU
# Accuracy: typically +10–20% NDCG over BM25 alone
```
## Learning-to-Rank (LTR)
LTR trains a model on labeled search data to combine hundreds of features into a single relevance score. This powers search at Google, Amazon, and LinkedIn.
```python
import lightgbm as lgb
import numpy as np


class LearningToRank:
    """LambdaMART learning-to-rank model using LightGBM."""

    FEATURES = [
        "bm25_score",           # BM25 relevance score
        "vector_similarity",    # Cosine similarity from embeddings
        "title_match_ratio",    # % of query terms in title
        "doc_freshness",        # Days since last update (normalized)
        "click_through_rate",   # Historical CTR for this doc
        "avg_dwell_time",       # Average time users spend on doc
        "doc_authority_score",  # PageRank-like authority metric
        "num_exact_matches",    # Count of exact phrase matches
    ]

    def __init__(self):
        self.model = None

    def train(self, features: np.ndarray, labels: np.ndarray,
              query_groups: np.ndarray):
        """
        Train a LambdaMART ranking model.

        labels: graded relevance (0=irrelevant, 1=partial, 2=relevant, 3=perfect)
        query_groups: number of candidates per query, in query order
        """
        train_data = lgb.Dataset(features, label=labels, group=query_groups)
        params = {
            "objective": "lambdarank",
            "metric": "ndcg",
            "ndcg_eval_at": [5, 10],
            "num_leaves": 63,
            "learning_rate": 0.05,
            "min_data_in_leaf": 50,
            "feature_fraction": 0.8,
        }
        self.model = lgb.train(
            params, train_data, num_boost_round=500,
            # In production, pass a held-out validation set here so
            # early stopping measures generalization, not training fit.
            valid_sets=[train_data],
            callbacks=[lgb.early_stopping(50)],
        )

    def predict(self, features: np.ndarray) -> np.ndarray:
        return self.model.predict(features)
```
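Since the model optimizes NDCG, it helps to see what that metric actually computes. Below is a minimal sketch of NDCG@k using the exponential-gain form (gain = 2^rel − 1) common in LambdaMART-style rankers; `relevance` is a list of graded labels in the order the model ranked the documents:

```python
import numpy as np


def ndcg_at_k(relevance, k):
    """NDCG@k: DCG of the predicted order divided by the DCG of the
    ideal (descending-relevance) order. Gain is 2^rel - 1; the
    discount is 1 / log2(position + 1)."""
    rel = np.asarray(relevance, dtype=float)[:k]
    discounts = 1.0 / np.log2(np.arange(2, rel.size + 2))
    dcg = float(np.sum((2.0**rel - 1.0) * discounts))
    ideal = np.sort(np.asarray(relevance, dtype=float))[::-1][:k]
    idcg = float(np.sum((2.0**ideal - 1.0) * discounts))
    return dcg / idcg if idcg > 0 else 0.0


ndcg_at_k([3, 2, 1, 0], k=4)  # 1.0 — already in ideal order
ndcg_at_k([0, 1, 2, 3], k=4)  # < 1.0 — best doc buried at the bottom
```

The log-discount is why misordering the top positions hurts far more than misordering the tail, which in turn is why re-ranking only the head of the candidate list recovers most of the available quality.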
## Key Takeaways
- Production search uses multi-stage ranking: cheap models on many candidates, expensive models on few.
- RRF is the simplest hybrid fusion and requires no tuning. Linear combination needs evaluation data to tune alpha.
- Cross-encoders improve NDCG by 10–20% over BM25 alone. Apply them to the top 50–100 candidates.
- LambdaMART LTR combines hundreds of features. It requires click data to train effectively.
- Always add a business rules layer as the final stage for sponsored results, policies, and freshness.
## What's Next
In the next lesson, we build the query understanding layer — interpreting what users mean before retrieval begins. You will learn intent detection, spell correction, entity recognition, and LLM-powered query rewriting.
Lilly Tech Systems