Intermediate

Real-Time Personalization

Static recommendations computed once per day miss the most valuable signal: what the user is doing right now. This lesson covers how to build personalization that adapts in real time to user sessions, balances exploration with exploitation, and provides a clean API for frontend consumption.

User Session Modeling

Session modeling captures the user's intent within a single browsing session. A user who just clicked on three running shoes has a very different intent than one who browsed kitchen appliances, even if their long-term profiles are identical.

import torch
import torch.nn as nn

class SessionEncoder(nn.Module):
    """Encodes a sequence of item interactions into a session embedding.

    Uses a GRU to capture sequential patterns in user behavior.
    The session embedding is used to adjust ranking scores in real time.
    """
    def __init__(self, item_embedding_dim: int, hidden_dim: int = 128):
        super().__init__()
        self.gru = nn.GRU(
            input_size=item_embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            dropout=0.1
        )
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 1)
        )

    def forward(self, item_embeddings, lengths):
        """
        Args:
            item_embeddings: (batch, max_seq_len, embed_dim) - items viewed in session
            lengths: (batch,) - actual sequence lengths
        Returns:
            session_embedding: (batch, hidden_dim)
        """
        # Pack for variable-length sequences (pack_padded_sequence requires
        # the lengths tensor to live on the CPU)
        packed = nn.utils.rnn.pack_padded_sequence(
            item_embeddings, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        gru_out, _ = self.gru(packed)
        gru_out, _ = nn.utils.rnn.pad_packed_sequence(gru_out, batch_first=True)

        # Attention over sequence positions
        attn_weights = self.attention(gru_out).squeeze(-1)  # (batch, seq_len)

        # Mask padding positions (build the mask on the same device as the GRU output)
        mask = (torch.arange(gru_out.size(1), device=gru_out.device).unsqueeze(0)
                < lengths.to(gru_out.device).unsqueeze(1))
        attn_weights = attn_weights.masked_fill(~mask, float("-inf"))
        attn_weights = torch.softmax(attn_weights, dim=1)

        # Weighted sum of GRU outputs
        session_embedding = torch.bmm(
            attn_weights.unsqueeze(1), gru_out
        ).squeeze(1)

        return session_embedding


class SessionAwareRanker(nn.Module):
    """Ranking model that incorporates real-time session context."""

    def __init__(self, feature_dim, session_dim=128):
        super().__init__()
        self.session_encoder = SessionEncoder(item_embedding_dim=128, hidden_dim=session_dim)
        self.ranker = nn.Sequential(
            nn.Linear(feature_dim + session_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, ranking_features, session_item_embeddings, session_lengths):
        session_emb = self.session_encoder(session_item_embeddings, session_lengths)
        combined = torch.cat([ranking_features, session_emb], dim=1)
        return self.ranker(combined).squeeze(-1)
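
The masking-plus-softmax step is the part that trips people up, so here it is in isolation on toy tensors (the random values stand in for real GRU outputs and attention scores):

```python
import torch

torch.manual_seed(0)
gru_out = torch.randn(2, 4, 8)   # stand-in for GRU outputs: (batch, seq_len, hidden)
lengths = torch.tensor([4, 2])   # the second session has only 2 real steps
attn_scores = torch.randn(2, 4)  # stand-in for the attention MLP's raw scores

# Positions beyond each session's true length get -inf, so softmax zeroes them out.
mask = torch.arange(gru_out.size(1)).unsqueeze(0) < lengths.unsqueeze(1)
attn_weights = torch.softmax(attn_scores.masked_fill(~mask, float("-inf")), dim=1)

# The weighted sum pools the sequence into one fixed-size session embedding.
session_embedding = torch.bmm(attn_weights.unsqueeze(1), gru_out).squeeze(1)

print(attn_weights[1])            # last two weights are exactly 0
print(session_embedding.shape)    # torch.Size([2, 8])
```

Because masked positions receive zero weight, padding can never leak into the session embedding, regardless of how the batch is padded.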

Contextual Bandits for Exploration

Pure exploitation (always showing the highest-ranked items) creates filter bubbles and prevents you from discovering that a user might like a new category. Contextual bandits balance exploration (trying new items) with exploitation (showing what the model is confident about).

import numpy as np

class EpsilonGreedyBandit:
    """Simple epsilon-greedy exploration for recommendations.

    With probability epsilon, show a random item from candidates.
    With probability 1-epsilon, show the top-ranked item.
    """
    def __init__(self, epsilon: float = 0.1, decay: float = 0.999):
        self.epsilon = epsilon
        self.decay = decay

    def select(self, ranked_items: list[dict], n: int = 10) -> list[dict]:
        selected = []
        remaining = list(ranked_items)

        for _ in range(n):
            if not remaining:
                break
            if np.random.random() < self.epsilon:
                # Explore: pick a random item from outside the top 5
                if len(remaining) > 5:
                    idx = np.random.randint(5, len(remaining))
                else:
                    idx = np.random.randint(len(remaining))
                selected.append({**remaining.pop(idx), "exploration": True})
            else:
                # Exploit: pick the top-ranked item
                selected.append({**remaining.pop(0), "exploration": False})

        self.epsilon *= self.decay  # Decay exploration over time
        return selected


class ThompsonSamplingBandit:
    """Thompson Sampling for recommendation exploration.

    Maintains a Beta distribution for each item category's CTR.
    Samples from the posterior to decide what to explore.
    """
    def __init__(self):
        # Track (successes, failures) per category
        self.category_stats = {}

    def update(self, category_id: str, clicked: bool):
        if category_id not in self.category_stats:
            self.category_stats[category_id] = {"alpha": 1.0, "beta": 1.0}
        if clicked:
            self.category_stats[category_id]["alpha"] += 1
        else:
            self.category_stats[category_id]["beta"] += 1

    def sample_score(self, category_id: str) -> float:
        stats = self.category_stats.get(category_id, {"alpha": 1.0, "beta": 1.0})
        return np.random.beta(stats["alpha"], stats["beta"])

    def select(self, ranked_items: list[dict], n: int = 10) -> list[dict]:
        # Add Thompson sampling bonus to each item's score
        for item in ranked_items:
            exploration_bonus = self.sample_score(item["category_id"])
            item["adjusted_score"] = item["score"] * 0.8 + exploration_bonus * 0.2

        # Re-sort by adjusted score
        ranked_items.sort(key=lambda x: -x["adjusted_score"])
        return ranked_items[:n]
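
The reason Thompson Sampling self-regulates is visible in the posteriors themselves: a category with little data produces wildly varying samples (lots of exploration), while a well-observed category produces tight samples near its true CTR. A small illustration:

```python
import numpy as np

rng = np.random.default_rng(42)

# A brand-new category: prior Beta(1, 1) -- samples are nearly uniform on [0, 1].
new_samples = rng.beta(1.0, 1.0, size=10_000)

# A well-observed category: 50 clicks, 950 skips -> posterior Beta(51, 951).
seen_samples = rng.beta(51.0, 951.0, size=10_000)

print(round(new_samples.std(), 3))    # wide spread, roughly 0.29
print(round(seen_samples.std(), 3))   # tight spread, roughly 0.007
```

So new categories naturally receive large, occasionally ranking-topping exploration bonuses, and that bonus shrinks automatically as clicks and skips accumulate.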
💡
Apply at work: Start with epsilon-greedy (epsilon=0.1) because it is simple to implement and explain. Log which items were shown due to exploration vs exploitation. After 2 weeks, analyze whether explored items got clicked — if exploration is finding good items the ranker missed, keep it. If not, reduce epsilon. Graduate to Thompson Sampling when you have per-category CTR data.
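
The analysis suggested in the tip above can be as simple as grouping logged impressions by the exploration flag (a hypothetical sketch; the log rows here are made up):

```python
# Each logged impression records whether it came from exploration and
# whether the user clicked it.
impressions = [
    {"exploration": True,  "clicked": True},
    {"exploration": True,  "clicked": False},
    {"exploration": False, "clicked": True},
    {"exploration": False, "clicked": True},
    {"exploration": False, "clicked": False},
]

def ctr(rows):
    """Click-through rate of a set of impressions."""
    return sum(r["clicked"] for r in rows) / len(rows) if rows else 0.0

explored = [r for r in impressions if r["exploration"]]
exploited = [r for r in impressions if not r["exploration"]]

print(f"explore CTR: {ctr(explored):.2f}")    # 0.50
print(f"exploit CTR: {ctr(exploited):.2f}")   # 0.67
```

If explore CTR is in the same ballpark as exploit CTR, exploration is surfacing items the ranker undervalues and is worth keeping.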

Online Learning: Updating Models in Real Time

Traditional recommendation systems retrain models daily or weekly. Online learning updates model parameters continuously as new interaction data arrives, keeping recommendations fresh.

import torch
from collections import deque

class OnlineLearningRanker:
    """Ranker with online learning via mini-batch SGD on streaming data.

    Collects recent interactions in a buffer, then performs gradient updates
    periodically without a full retraining cycle.
    """
    def __init__(self, model, learning_rate=1e-4, buffer_size=10000,
                 update_interval=1000):
        self.model = model
        self.optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        self.buffer = deque(maxlen=buffer_size)
        self.update_interval = update_interval
        self.interactions_since_update = 0

    def log_interaction(self, features: torch.Tensor, label: float):
        """Log a user interaction (click=1, no-click=0) with its feature tensor."""
        self.buffer.append((features, label))
        self.interactions_since_update += 1

        if self.interactions_since_update >= self.update_interval:
            self._update_model()
            self.interactions_since_update = 0

    def _update_model(self):
        """Perform a mini-batch gradient update on recent data."""
        if len(self.buffer) < 100:
            return

        # Sample a mini-batch from the buffer (with replacement)
        indices = torch.randint(0, len(self.buffer), (256,)).tolist()
        batch_features = []
        batch_labels = []
        for idx in indices:
            features, label = self.buffer[idx]
            batch_features.append(features)
            batch_labels.append(label)

        features_tensor = torch.stack(batch_features)
        labels_tensor = torch.tensor(batch_labels)

        # Gradient update
        self.model.train()
        self.optimizer.zero_grad()
        predictions = self.model(features_tensor)
        loss = torch.nn.functional.binary_cross_entropy(predictions, labels_tensor)
        loss.backward()

        # Gradient clipping to prevent catastrophic updates
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        self.optimizer.step()
        self.model.eval()

        print(f"[OnlineLearning] Updated model, loss={loss.item():.4f}, "
              f"buffer_size={len(self.buffer)}")

Personalization API Design

The personalization API is the interface between your recommendation engine and the frontend. It needs to be fast, cacheable, and provide enough metadata for the UI to render recommendations effectively.

from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import Optional
import time

app = FastAPI()

# NOTE: candidate_generator, feature_service, ranker, business_rules, bandit,
# and helpers such as generate_request_id() are application-provided services
# assumed to be wired up elsewhere.

class RecommendationRequest(BaseModel):
    user_id: str
    page_type: str = "home"          # home, product, category, cart
    context_item_id: Optional[str] = None  # for "similar items" on product pages
    session_id: Optional[str] = None
    device: str = "mobile"
    limit: int = 20

class RecommendedItem(BaseModel):
    item_id: str
    score: float
    reason: str          # "Because you viewed Electronics" (for UI)
    source: str          # "collaborative", "trending", "similar" (for debugging)
    position: int
    is_exploration: bool

class RecommendationResponse(BaseModel):
    request_id: str
    user_id: str
    items: list[RecommendedItem]
    model_version: str
    latency_ms: float
    experiment_group: str  # A/B test group

@app.post("/v1/recommendations", response_model=RecommendationResponse)
async def get_recommendations(req: RecommendationRequest):
    start = time.monotonic()
    request_id = generate_request_id()

    # 1. Determine experiment group
    experiment = get_experiment_assignment(req.user_id)

    # 2. Generate candidates (parallel, with timeout)
    candidates = await candidate_generator.generate(
        user_id=req.user_id,
        context={"page_type": req.page_type, "item_id": req.context_item_id},
        timeout_ms=30
    )

    # 3. Fetch features (batch Redis lookup)
    features = await feature_service.get_batch_features(
        user_id=req.user_id,
        item_ids=[c.item_id for c in candidates],
        session_id=req.session_id
    )

    # 4. Rank candidates
    ranked = ranker.rank(features, model_version=experiment.model_version)

    # 5. Apply business rules and diversity
    final = business_rules.apply(ranked, req.page_type)

    # 6. Apply exploration
    final = bandit.select(final, n=req.limit)

    # 7. Generate explanations
    items = [
        RecommendedItem(
            item_id=item["item_id"],
            score=round(item["score"], 4),
            reason=explain(item, req.user_id),
            source=item["source"],
            position=i + 1,
            is_exploration=item.get("exploration", False)
        )
        for i, item in enumerate(final)
    ]

    latency_ms = (time.monotonic() - start) * 1000

    # 8. Log for training data collection
    log_recommendation_event(request_id, req, items, experiment)

    return RecommendationResponse(
        request_id=request_id,
        user_id=req.user_id,
        items=items,
        model_version=experiment.model_version,
        latency_ms=round(latency_ms, 1),
        experiment_group=experiment.group_name
    )
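
The endpoint above recomputes everything per request. Personalized responses can only be cached briefly (the session signal goes stale fast), but even a few seconds of TTL absorbs rapid page reloads. A hypothetical sketch; the key structure and TTL are assumptions, not part of the endpoint above:

```python
import time

class TTLCache:
    """Minimal in-process cache with per-entry expiry."""

    def __init__(self, ttl_seconds: float = 5.0):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # lazily evict expired entries
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)

# Key by (user_id, page_type) so each surface caches independently.
cache = TTLCache(ttl_seconds=5.0)
cache.set(("user_42", "home"), {"items": ["sku_1", "sku_2"]})
print(cache.get(("user_42", "home")))   # {'items': ['sku_1', 'sku_2']}
print(cache.get(("user_42", "cart")))   # None (never cached)
```

In production you would typically use Redis with an expiry rather than an in-process dict, but the idea is the same.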

A/B Testing Recommendations

A/B testing is essential for measuring the real-world impact of recommendation changes. Every recommendation system change should be tested before full rollout.

import hashlib
import numpy as np
from dataclasses import dataclass

@dataclass
class Experiment:
    name: str
    group_name: str
    model_version: str
    config: dict

class ExperimentManager:
    """Deterministic experiment assignment for A/B testing."""

    def __init__(self, experiments: list[dict]):
        """
        experiments: [
            {"name": "ranking_v2", "traffic": 0.1, "model": "dcn_v2",
             "config": {"exploration_rate": 0.1}},
            {"name": "control", "traffic": 0.9, "model": "dcn_v1",
             "config": {"exploration_rate": 0.05}},
        ]
        """
        self.experiments = experiments
        # Pre-compute traffic boundaries
        cumulative = 0
        self.boundaries = []
        for exp in experiments:
            cumulative += exp["traffic"]
            self.boundaries.append(cumulative)

    def assign(self, user_id: str) -> Experiment:
        """Deterministically assign user to experiment group.

        Same user always gets the same assignment (consistent hashing).
        """
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        bucket = (hash_value % 1000) / 1000  # 0.000 to 0.999

        for i, boundary in enumerate(self.boundaries):
            if bucket < boundary:
                return self._to_experiment(self.experiments[i])

        # Fallback to last group (guards against float rounding in boundaries)
        return self._to_experiment(self.experiments[-1])

    @staticmethod
    def _to_experiment(exp: dict) -> Experiment:
        return Experiment(
            name=exp["name"],
            group_name=exp["name"],
            model_version=exp["model"],
            config=exp.get("config", {})
        )

    def analyze_results(self, metrics: dict) -> dict:
        """Compare metrics across experiment groups.

        Args:
            metrics: {group_name: {"ctr": [...], "revenue": [...], "sessions": int}}
        """
        from scipy import stats

        results = {}
        control_metrics = metrics.get("control", {})

        for group_name, group_metrics in metrics.items():
            if group_name == "control":
                continue

            # T-test for CTR difference
            t_stat, p_value = stats.ttest_ind(
                group_metrics["ctr"], control_metrics["ctr"]
            )

            ctr_lift = (
                (np.mean(group_metrics["ctr"]) - np.mean(control_metrics["ctr"]))
                / np.mean(control_metrics["ctr"]) * 100
            )

            results[group_name] = {
                "ctr_lift_pct": round(ctr_lift, 2),
                "p_value": round(p_value, 4),
                "significant": p_value < 0.05,
                "sessions_control": control_metrics["sessions"],
                "sessions_treatment": group_metrics["sessions"],
            }

        return results
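
The assignment logic boils down to hashing each user into one of 1000 fixed buckets. A self-contained sketch (the group names and 10/90 split are illustrative) showing both determinism and that realized traffic shares come out close to the configuration:

```python
import hashlib

def bucket(user_id: str) -> float:
    """Map a user_id to a stable value in [0.0, 1.0)."""
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return (hash_value % 1000) / 1000

# Cumulative traffic boundaries: 10% treatment, 90% control.
boundaries = [("ranking_v2", 0.1), ("control", 1.0)]

def assign(user_id: str) -> str:
    b = bucket(user_id)
    for name, upper in boundaries:
        if b < upper:
            return name
    return boundaries[-1][0]

# Same user, same group -- every time, on every server.
assert assign("user_123") == assign("user_123")

# Realized treatment share over many users lands near the configured 10%.
share = sum(assign(f"user_{i}") == "ranking_v2" for i in range(10_000)) / 10_000
print(round(share, 2))   # close to the configured 0.10
```

Because the assignment is a pure function of `user_id`, no assignment store is needed and every service that can hash can agree on the group.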
📝
Production reality: Never deploy a recommendation model change without A/B testing. Even changes that improve offline metrics (NDCG, recall) can hurt online metrics (CTR, revenue) due to distributional shift, latency differences, or unexpected user behavior. Run experiments for at least 2 weeks to account for day-of-week effects, and always check for novelty effects (initial bump that fades).
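
As a back-of-envelope companion to the two-week rule, the standard two-proportion sample-size approximation shows why small CTR lifts require long runs. A sketch (real experimentation platforms compute this for you; the pooled-variance form used here is one common approximation):

```python
from statistics import NormalDist

def sessions_per_group(baseline_ctr: float, relative_lift: float,
                       alpha: float = 0.05, power: float = 0.80) -> int:
    """Approximate sessions per group to detect a relative CTR lift.

    Uses the pooled two-proportion formula with a two-sided alpha.
    """
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)   # ~1.96 for alpha=0.05
    z_beta = NormalDist().inv_cdf(power)            # ~0.84 for power=0.80
    p1 = baseline_ctr
    p2 = baseline_ctr * (1 + relative_lift)
    p_bar = (p1 + p2) / 2
    n = (z_alpha + z_beta) ** 2 * 2 * p_bar * (1 - p_bar) / (p2 - p1) ** 2
    return int(n) + 1

# Detecting a 5% relative lift on a 2% baseline CTR takes serious traffic.
print(sessions_per_group(baseline_ctr=0.02, relative_lift=0.05))
```

If your daily traffic cannot reach the required sample size within a reasonable window, test a bolder change or accept a coarser minimum detectable effect rather than peeking early.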

Key Takeaways

  • Session modeling with GRUs or Transformers captures real-time user intent — what the user clicked 30 seconds ago is more predictive than their long-term profile.
  • Contextual bandits (epsilon-greedy or Thompson Sampling) prevent filter bubbles by exploring items the ranker has not seen enough data to score confidently.
  • Online learning keeps models fresh by performing mini-batch updates on streaming interaction data, avoiding the need for daily full retrains.
  • Design your personalization API to return explanations ("Because you viewed...") and exploration flags alongside scores for better UX and debugging.
  • A/B test every recommendation change for at least 2 weeks. Use consistent hashing for deterministic user assignment and check for novelty effects before declaring a winner.

What Is Next

In the next lesson, we will tackle diversity, fairness, and business rules — the post-processing layer that ensures recommendations are not just accurate but also diverse, fair, and aligned with business objectives.