Real-Time Personalization
Static recommendations computed once per day miss the most valuable signal: what the user is doing right now. This lesson covers how to build personalization that adapts in real time to user sessions, balances exploration with exploitation, and provides a clean API for frontend consumption.
User Session Modeling
Session modeling captures the user's intent within a single browsing session. A user who just clicked on three running shoes has a very different intent than one who browsed kitchen appliances, even if their long-term profiles are identical.
import torch
import torch.nn as nn
class SessionEncoder(nn.Module):
    """Encodes a sequence of item interactions into a session embedding.

    A 2-layer GRU captures sequential patterns in user behavior; an additive
    attention layer pools the GRU outputs into one fixed-size vector.
    The session embedding is used to adjust ranking scores in real time.
    """

    def __init__(self, item_embedding_dim: int, hidden_dim: int = 128):
        """
        Args:
            item_embedding_dim: Dimensionality of the per-item input embeddings.
            hidden_dim: GRU hidden size; also the size of the returned embedding.
        """
        super().__init__()
        self.gru = nn.GRU(
            input_size=item_embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            dropout=0.1,  # applied between the two GRU layers
        )
        # Scores each timestep; scores are softmaxed into attention weights.
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 1)
        )

    def forward(self, item_embeddings: torch.Tensor, lengths: torch.Tensor) -> torch.Tensor:
        """
        Args:
            item_embeddings: (batch, max_seq_len, embed_dim) - items viewed in session
            lengths: (batch,) - actual sequence lengths (each >= 1)
        Returns:
            session_embedding: (batch, hidden_dim)
        """
        # Pack for variable-length sequences. pack_padded_sequence requires
        # lengths on the CPU (it raises for CUDA length tensors).
        packed = nn.utils.rnn.pack_padded_sequence(
            item_embeddings, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        gru_out, _ = self.gru(packed)
        gru_out, _ = nn.utils.rnn.pad_packed_sequence(gru_out, batch_first=True)
        # Attention over sequence positions
        attn_weights = self.attention(gru_out).squeeze(-1)  # (batch, seq_len)
        # Mask padding positions. The arange must be created on the same
        # device as the GRU output, otherwise the comparison fails on GPU.
        positions = torch.arange(gru_out.size(1), device=gru_out.device)
        mask = positions.unsqueeze(0) < lengths.to(gru_out.device).unsqueeze(1)
        attn_weights = attn_weights.masked_fill(~mask, float("-inf"))
        attn_weights = torch.softmax(attn_weights, dim=1)
        # Weighted sum of GRU outputs -> (batch, hidden_dim)
        session_embedding = torch.bmm(
            attn_weights.unsqueeze(1), gru_out
        ).squeeze(1)
        return session_embedding
class SessionAwareRanker(nn.Module):
    """Ranking model that incorporates real-time session context.

    Concatenates per-candidate ranking features with a session embedding
    produced by SessionEncoder, then scores the pair with a small MLP that
    ends in a sigmoid (scores in [0, 1]).
    """

    def __init__(self, feature_dim, session_dim=128):
        super().__init__()
        self.session_encoder = SessionEncoder(item_embedding_dim=128, hidden_dim=session_dim)
        mlp_layers = [
            nn.Linear(feature_dim + session_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        ]
        self.ranker = nn.Sequential(*mlp_layers)

    def forward(self, ranking_features, session_item_embeddings, session_lengths):
        # Summarize the live session, then score features + session jointly.
        session_context = self.session_encoder(session_item_embeddings, session_lengths)
        fused = torch.cat([ranking_features, session_context], dim=1)
        scores = self.ranker(fused)
        return scores.squeeze(-1)
Contextual Bandits for Exploration
Pure exploitation (always showing the highest-ranked items) creates filter bubbles and prevents you from discovering that a user might like a new category. Contextual bandits balance exploration (trying new items) with exploitation (showing what the model is confident about).
import numpy as np
from typing import Optional
class EpsilonGreedyBandit:
    """Simple epsilon-greedy exploration for recommendations.

    With probability epsilon, show a random item from candidates.
    With probability 1 - epsilon, show the top-ranked item. Epsilon is
    multiplied by `decay` after every select() call, so exploration
    tapers off over the bandit's lifetime.
    """

    def __init__(self, epsilon: float = 0.1, decay: float = 0.999):
        self.epsilon = epsilon
        self.decay = decay

    def select(self, ranked_items: list[dict], n: int = 10) -> list[dict]:
        pool = list(ranked_items)  # work on a copy; caller's list is untouched
        chosen: list[dict] = []
        while pool and len(chosen) < n:
            explore = np.random.random() < self.epsilon
            if explore:
                # Explore: sample from outside the top-5 when the pool is
                # large enough, otherwise from the whole remaining pool.
                if len(pool) > 5:
                    pick = np.random.randint(min(5, len(pool)), len(pool))
                else:
                    pick = np.random.randint(len(pool))
                chosen.append({**pool.pop(pick), "exploration": True})
            else:
                # Exploit: take the best-ranked remaining item.
                chosen.append({**pool.pop(0), "exploration": False})
        self.epsilon *= self.decay  # decay exploration over time
        return chosen
class ThompsonSamplingBandit:
    """Thompson Sampling for recommendation exploration.

    Maintains a Beta(alpha, beta) posterior over each item category's CTR,
    starting from a uniform Beta(1, 1) prior. Sampling from the posterior
    naturally explores categories with little data (wide posteriors) while
    exploiting categories with a proven CTR (narrow posteriors).
    """

    def __init__(self):
        # category_id -> {"alpha": clicks + 1, "beta": non-clicks + 1}
        self.category_stats: dict[str, dict[str, float]] = {}

    def update(self, category_id: str, clicked: bool) -> None:
        """Record one impression outcome for a category."""
        stats = self.category_stats.setdefault(
            category_id, {"alpha": 1.0, "beta": 1.0}
        )
        if clicked:
            stats["alpha"] += 1
        else:
            stats["beta"] += 1

    def sample_score(self, category_id: str) -> float:
        """Draw one CTR sample from the category posterior (prior if unseen)."""
        stats = self.category_stats.get(category_id, {"alpha": 1.0, "beta": 1.0})
        return np.random.beta(stats["alpha"], stats["beta"])

    def select(self, ranked_items: list[dict], n: int = 10) -> list[dict]:
        """Return the top-n items after blending the ranker score with a
        sampled exploration bonus (80/20 blend, as before).

        Unlike the previous version, the input list and its dicts are left
        unmodified; the returned dicts are copies carrying an extra
        "adjusted_score" key.
        """
        rescored = []
        for item in ranked_items:
            exploration_bonus = self.sample_score(item["category_id"])
            rescored.append(
                {**item, "adjusted_score": item["score"] * 0.8 + exploration_bonus * 0.2}
            )
        # Re-sort by adjusted score, best first.
        rescored.sort(key=lambda x: -x["adjusted_score"])
        return rescored[:n]
Online Learning: Updating Models in Real Time
Traditional recommendation systems retrain models daily or weekly. Online learning updates model parameters continuously as new interaction data arrives, keeping recommendations fresh.
import torch
from collections import deque
class OnlineLearningRanker:
    """Ranker with online learning via mini-batch SGD on streaming data.

    Collects recent interactions in a bounded buffer, then performs a
    gradient update every `update_interval` interactions, without a full
    retraining cycle. The oldest interactions are evicted automatically
    once the buffer is full (deque with maxlen).
    """

    # Minimum buffered interactions before any update is attempted.
    MIN_BUFFER = 100
    # Mini-batch size sampled (with replacement) from the buffer per update.
    BATCH_SIZE = 256

    def __init__(self, model, learning_rate=1e-4, buffer_size=10000,
                 update_interval=1000):
        """
        Args:
            model: torch.nn.Module mapping a (batch, feature_dim) tensor to
                per-example click probabilities in [0, 1].
            learning_rate: Adam step size for online updates.
            buffer_size: Max interactions retained (oldest evicted first).
            update_interval: Interactions between gradient updates.
        """
        self.model = model
        self.optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        self.buffer = deque(maxlen=buffer_size)
        self.update_interval = update_interval
        self.interactions_since_update = 0

    def log_interaction(self, features: torch.Tensor, label: float):
        """Log a user interaction (click=1, no-click=0).

        Args:
            features: 1-D feature tensor for the interaction; all logged
                tensors must share a shape so they can be stacked.
            label: 1.0 for a click, 0.0 for no click.
        """
        self.buffer.append((features, label))
        self.interactions_since_update += 1
        if self.interactions_since_update >= self.update_interval:
            self._update_model()
            self.interactions_since_update = 0

    def _update_model(self):
        """Perform one mini-batch gradient update on recent data."""
        if len(self.buffer) < self.MIN_BUFFER:
            return  # not enough data for a meaningful update yet
        # Sample a mini-batch (with replacement) from the buffer.
        # .tolist() gives plain ints, which is what deque indexing expects.
        indices = torch.randint(0, len(self.buffer), (self.BATCH_SIZE,)).tolist()
        batch_features = []
        batch_labels = []
        for idx in indices:
            features, label = self.buffer[idx]
            batch_features.append(features)
            batch_labels.append(label)
        features_tensor = torch.stack(batch_features)
        # BCE requires float targets; be explicit in case labels arrive as ints.
        labels_tensor = torch.tensor(batch_labels, dtype=torch.float32)
        # Gradient update
        self.model.train()
        self.optimizer.zero_grad()
        # reshape(-1) tolerates models that emit (batch, 1) instead of (batch,);
        # binary_cross_entropy raises on mismatched input/target shapes.
        predictions = self.model(features_tensor).reshape(-1)
        loss = torch.nn.functional.binary_cross_entropy(predictions, labels_tensor)
        loss.backward()
        # Gradient clipping to prevent catastrophic updates from noisy batches.
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        self.optimizer.step()
        self.model.eval()
        print(f"[OnlineLearning] Updated model, loss={loss.item():.4f}, "
              f"buffer_size={len(self.buffer)}")
Personalization API Design
The personalization API is the interface between your recommendation engine and the frontend. It needs to be fast, cacheable, and provide enough metadata for the UI to render recommendations effectively.
from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import Optional
import time
app = FastAPI()
class RecommendationRequest(BaseModel):
    """Request payload for the /v1/recommendations endpoint."""
    user_id: str
    page_type: str = "home"  # home, product, category, cart
    context_item_id: Optional[str] = None  # for "similar items" on product pages
    session_id: Optional[str] = None  # ties the request to live session features
    device: str = "mobile"
    limit: int = 20  # maximum number of items to return
class RecommendedItem(BaseModel):
    """One ranked recommendation with UI and debugging metadata."""
    item_id: str
    score: float  # final ranking score (rounded before transport)
    reason: str  # "Because you viewed Electronics" (for UI)
    source: str  # "collaborative", "trending", "similar" (for debugging)
    position: int  # 1-based display position in the returned list
    is_exploration: bool  # True when the bandit chose this item to explore
class RecommendationResponse(BaseModel):
    """Response envelope for the /v1/recommendations endpoint."""
    request_id: str  # unique id for joining serving logs to this response
    user_id: str
    items: list[RecommendedItem]
    model_version: str  # which ranking model produced the scores
    latency_ms: float  # server-side time spent producing this response
    experiment_group: str  # A/B test group
@app.post("/v1/recommendations", response_model=RecommendationResponse)
async def get_recommendations(req: RecommendationRequest):
    """Serve personalized recommendations for a single page view.

    Pipeline: experiment assignment -> candidate generation -> feature
    fetch -> ranking -> business rules -> bandit exploration -> explained,
    logged response.

    NOTE(review): `generate_request_id`, `get_experiment_assignment`,
    `candidate_generator`, `feature_service`, `ranker`, `business_rules`,
    `bandit`, `explain`, and `log_recommendation_event` are module-level
    collaborators defined elsewhere in this service — confirm they are all
    wired up before this route is registered.
    """
    start = time.monotonic()
    request_id = generate_request_id()
    # 1. Determine experiment group (deterministic per user)
    experiment = get_experiment_assignment(req.user_id)
    # 2. Generate candidates (parallel, with timeout)
    candidates = await candidate_generator.generate(
        user_id=req.user_id,
        context={"page_type": req.page_type, "item_id": req.context_item_id},
        timeout_ms=30
    )
    # 3. Fetch features (batch Redis lookup)
    features = await feature_service.get_batch_features(
        user_id=req.user_id,
        item_ids=[c.item_id for c in candidates],
        session_id=req.session_id
    )
    # 4. Rank candidates with the model version the experiment selected
    ranked = ranker.rank(features, model_version=experiment.model_version)
    # 5. Apply business rules and diversity
    final = business_rules.apply(ranked, req.page_type)
    # 6. Apply exploration (bandit may swap in lower-ranked items)
    final = bandit.select(final, n=req.limit)
    # 7. Generate explanations and position metadata for the UI
    items = [
        RecommendedItem(
            item_id=item["item_id"],
            score=round(item["score"], 4),
            reason=explain(item, req.user_id),
            source=item["source"],
            position=i + 1,  # positions are 1-based
            is_exploration=item.get("exploration", False)
        )
        for i, item in enumerate(final)
    ]
    latency_ms = (time.monotonic() - start) * 1000
    # 8. Log for training data collection (joins on request_id later)
    log_recommendation_event(request_id, req, items, experiment)
    return RecommendationResponse(
        request_id=request_id,
        user_id=req.user_id,
        items=items,
        model_version=experiment.model_version,
        latency_ms=round(latency_ms, 1),
        experiment_group=experiment.group_name
    )
A/B Testing Recommendations
A/B testing is essential for measuring the real-world impact of recommendation changes. Every recommendation system change should be tested before full rollout.
import hashlib
from dataclasses import dataclass
@dataclass
class Experiment:
    """A user's resolved A/B experiment assignment."""
    name: str  # experiment name, e.g. "ranking_v2"
    group_name: str  # group label reported in responses and metrics
    model_version: str  # ranking model to serve this user
    config: dict  # experiment-specific knobs, e.g. {"exploration_rate": 0.1}
class ExperimentManager:
    """Deterministic experiment assignment for A/B testing."""

    def __init__(self, experiments: list[dict]):
        """
        Args:
            experiments: ordered list of group configs, e.g. [
                {"name": "ranking_v2", "traffic": 0.1, "model": "dcn_v2",
                 "config": {"exploration_rate": 0.1}},
                {"name": "control", "traffic": 0.9, "model": "dcn_v1",
                 "config": {"exploration_rate": 0.05}},
            ]
            Traffic fractions should sum to 1.0; any leftover bucket falls
            through to the last group.
        Raises:
            ValueError: if the experiment list is empty.
        """
        if not experiments:
            raise ValueError("ExperimentManager requires at least one experiment")
        self.experiments = experiments
        # Pre-compute cumulative traffic boundaries for bucket lookup.
        cumulative = 0.0
        self.boundaries = []
        for exp in experiments:
            cumulative += exp["traffic"]
            self.boundaries.append(cumulative)

    @staticmethod
    def _to_experiment(exp: dict) -> Experiment:
        """Build an Experiment record from a raw config dict."""
        return Experiment(
            name=exp["name"],
            group_name=exp["name"],
            model_version=exp["model"],
            config=exp.get("config", {})
        )

    def assign(self, user_id: str) -> Experiment:
        """Deterministically assign user to experiment group.

        Same user always gets the same assignment (consistent hashing):
        md5 of the user id maps to one of 1000 buckets, and the bucket is
        matched against the cumulative traffic boundaries.
        """
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        bucket = (hash_value % 1000) / 1000  # 0.0 to 0.999
        for exp, boundary in zip(self.experiments, self.boundaries):
            if bucket < boundary:
                return self._to_experiment(exp)
        # Fallback to last group (covers traffic sums below 1.0 and
        # floating-point rounding at the final boundary).
        return self._to_experiment(self.experiments[-1])

    def analyze_results(self, metrics: dict) -> dict:
        """Compare metrics across experiment groups.

        Args:
            metrics: {group_name: {"ctr": [...], "revenue": [...], "sessions": int}}
                Must contain a "control" entry with a non-empty "ctr" list.
        Returns:
            {group_name: {"ctr_lift_pct", "p_value", "significant",
                          "sessions_control", "sessions_treatment"}}
        Raises:
            ValueError: if no usable control group is present (previously
                this surfaced as an opaque KeyError).
        """
        from scipy import stats
        control_metrics = metrics.get("control", {})
        if not control_metrics.get("ctr"):
            raise ValueError("analyze_results requires a 'control' group with CTR data")
        results = {}
        for group_name, group_metrics in metrics.items():
            if group_name == "control":
                continue
            # Two-sample t-test for the CTR difference vs control.
            t_stat, p_value = stats.ttest_ind(
                group_metrics["ctr"], control_metrics["ctr"]
            )
            # Relative lift in mean CTR, as a percentage of control.
            ctr_lift = (
                (np.mean(group_metrics["ctr"]) - np.mean(control_metrics["ctr"]))
                / np.mean(control_metrics["ctr"]) * 100
            )
            results[group_name] = {
                "ctr_lift_pct": round(ctr_lift, 2),
                "p_value": round(p_value, 4),
                "significant": p_value < 0.05,
                "sessions_control": control_metrics["sessions"],
                "sessions_treatment": group_metrics["sessions"],
            }
        return results
Key Takeaways
- Session modeling with GRUs or Transformers captures real-time user intent — what the user clicked 30 seconds ago is more predictive than their long-term profile.
- Contextual bandits (epsilon-greedy or Thompson Sampling) prevent filter bubbles by exploring items for which the ranker has not yet seen enough data to score confidently.
- Online learning keeps models fresh by performing mini-batch updates on streaming interaction data, avoiding the need for daily full retrains.
- Design your personalization API to return explanations ("Because you viewed...") and exploration flags alongside scores for better UX and debugging.
- A/B test every recommendation change for at least 2 weeks. Use consistent hashing for deterministic user assignment and check for novelty effects before declaring a winner.
What Is Next
In the next lesson, we will tackle diversity, fairness, and business rules — the post-processing layer that ensures recommendations are not just accurate but also diverse, fair, and aligned with business objectives.
Lilly Tech Systems