Step 5: REST API & Caching (Intermediate)

A recommendation model is only useful if it can serve predictions in real time. This lesson builds a production-ready REST API with FastAPI, adds a Redis caching layer for sub-millisecond response times on repeated queries, and defines clean response schemas with Pydantic.

Pydantic Response Schemas

Python - api/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional

class RecommendationItem(BaseModel):
    """A single recommended item.

    Used as the element type of ``RecommendationResponse.recommendations``
    and ``SimilarItemsResponse.similar_items``.
    """
    # Identifier of the recommended item.
    item_id: int
    # Title of the item.
    title: str
    # Required field (``...`` means there is no default).
    score: float = Field(..., description="Predicted relevance score")
    # Genre labels for the item; an empty list when none are supplied.
    genres: List[str] = []

class RecommendationResponse(BaseModel):
    """API response for recommendation requests.

    Returned by the ``/recommend/{user_id}`` endpoint.
    """
    # User the recommendations were generated for.
    user_id: int
    # Required field (``...`` means there is no default).
    model: str = Field(..., description="Model used: cf, content, ncf, hybrid")
    # Recommended items, in the order the endpoint produced them.
    recommendations: List[RecommendationItem]
    # True when the recommendations were served from the cache.
    cached: bool = False
    # Time spent handling the request, in milliseconds.
    latency_ms: float = 0.0

class SimilarItemsResponse(BaseModel):
    """API response for similar items query.

    Returned by the ``/similar/{item_id}`` endpoint.
    """
    # Identifier of the item the query was made for.
    item_id: int
    # Title of the queried item.
    title: str
    # Items judged similar to the queried item, each with its similarity score.
    similar_items: List[RecommendationItem]

class HealthResponse(BaseModel):
    """Health check response.

    Returned by the ``/health`` endpoint.
    """
    # Overall service status string (the endpoint reports "healthy").
    status: str
    # Names of the models currently loaded in the service.
    models_loaded: List[str]
    # Whether the Redis cache connection was established.
    redis_connected: bool

Redis Cache Layer

Python - api/cache.py
import json
import redis
from typing import Optional

class RecommendationCache:
    """Redis-backed cache for recommendation results.

    The cache is best-effort: when Redis cannot be reached at construction
    time the instance stays usable but every operation becomes a no-op
    (``get`` always misses). Redis and JSON errors raised by individual
    operations are swallowed so that a cache problem never fails a request.

    Attributes:
        ttl: Lifetime of a cached entry, in seconds.
        client: The ``redis.Redis`` client, or ``None`` when not connected.
        connected: ``True`` when the initial ``PING`` succeeded.
    """

    def __init__(self, host="localhost", port=6379, db=0, ttl=3600):
        """Create the client and verify the connection with a ``PING``.

        Args:
            host: Redis host name.
            port: Redis TCP port.
            db: Redis logical database index.
            ttl: Lifetime of cached entries in seconds (default: 1 hour).
        """
        self.ttl = ttl  # Cache TTL in seconds (1 hour)
        try:
            self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
            self.client.ping()
            self.connected = True
        except redis.RedisError:
            # RedisError is the base class of ConnectionError, TimeoutError and
            # the auth/response errors, so any failed handshake degrades to
            # "no cache" instead of crashing application startup.
            self.client = None
            self.connected = False
            print("WARNING: Redis not available, running without cache")

    def _make_key(self, user_id: int, model: str, n: int) -> str:
        """Build the Redis key for one (model, user, n) combination."""
        return f"rec:{model}:{user_id}:{n}"

    def get(self, user_id: int, model: str, n: int) -> Optional[list]:
        """Get cached recommendations.

        Returns:
            The decoded list, or ``None`` on a miss, when the cache is not
            connected, or when reading/decoding the entry fails.
        """
        if not self.connected:
            return None
        try:
            payload = self.client.get(self._make_key(user_id, model, n))
            return json.loads(payload) if payload else None
        except (redis.RedisError, TypeError, ValueError):
            # Redis failure or a corrupt entry (JSONDecodeError is a
            # ValueError): treat it as a miss.
            return None

    def set(self, user_id: int, model: str, n: int, recommendations: list):
        """Cache recommendation results for ``self.ttl`` seconds.

        Failures are ignored: an unserializable payload or a Redis error
        just means the entry is not cached.
        """
        if not self.connected:
            return
        try:
            key = self._make_key(user_id, model, n)
            self.client.setex(key, self.ttl, json.dumps(recommendations))
        except (redis.RedisError, TypeError, ValueError):
            pass

    def invalidate_user(self, user_id: int):
        """Invalidate all cached recommendations for a user.

        Matching keys are found with an incremental ``SCAN`` rather than
        ``KEYS``: ``KEYS`` walks the entire keyspace in a single blocking
        command, which stalls every other client on a large database.
        """
        if not self.connected:
            return
        try:
            pattern = f"rec:*:{user_id}:*"
            stale = list(self.client.scan_iter(match=pattern))
            if stale:
                self.client.delete(*stale)
        except redis.RedisError:
            pass

FastAPI Application

Python - api/main.py
import time
import pickle
from fastapi import FastAPI, HTTPException, Query
from contextlib import asynccontextmanager

from api.schemas import (
    RecommendationResponse, RecommendationItem,
    SimilarItemsResponse, HealthResponse
)
from api.cache import RecommendationCache

# Global state, filled in by lifespan() and read by the endpoint handlers.
# Loaded models keyed by short name ("cf", "content", "ncf").
models = {}
# RecommendationCache instance; stays None until lifespan() has run.
cache = None
# Processed dataset loaded from data/processed_data.pkl.
data = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Populate the module-level ``data``, ``models`` and ``cache`` globals.

    Everything before the ``yield`` runs at startup: the processed dataset
    and the two pickled models are read from disk, the NCF network is rebuilt
    and its weights restored on CPU, and the Redis cache is created. The code
    after the ``yield`` runs at shutdown.
    """
    global models, cache, data

    def _unpickle(path):
        # Read a single pickled artifact from disk.
        with open(path, "rb") as handle:
            return pickle.load(handle)

    # NOTE(review): pickle.load can execute arbitrary code; these files must
    # only ever come from a trusted training pipeline.
    data = _unpickle("data/processed_data.pkl")

    # Models trained in the previous lessons.
    models["cf"] = _unpickle("models/item_cf.pkl")
    models["content"] = _unpickle("models/content_based.pkl")

    # NCF: rebuild the architecture, then restore the saved weights on CPU.
    import torch
    from models.ncf_model import NCFModel
    n_users = len(data["user_map"])
    n_items = len(data["item_map"])
    network = NCFModel(n_users, n_items)
    weights = torch.load("models/ncf_best.pt", map_location="cpu")
    network.load_state_dict(weights)
    network.eval()
    models["ncf"] = network

    # Redis cache (falls back to "not connected" when Redis is unavailable).
    cache = RecommendationCache()

    print(f"Loaded models: {list(models.keys())}")
    print(f"Redis connected: {cache.connected}")

    yield

    # Shutdown
    print("Shutting down...")


# Application object served by uvicorn; `lifespan` is registered so that the
# models, data and cache are loaded when the app starts.
app = FastAPI(
    title="Recommendation Engine API",
    version="1.0.0",
    description="Production recommendation API with multiple models and Redis caching.",
    lifespan=lifespan
)


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report service status, the loaded model names and Redis connectivity."""
    loaded = list(models.keys())
    # `cache` is still None if the lifespan handler has not run yet.
    if cache:
        redis_ok = cache.connected
    else:
        redis_ok = False
    return HealthResponse(
        status="healthy",
        models_loaded=loaded,
        redis_connected=redis_ok,
    )


@app.get("/recommend/{user_id}", response_model=RecommendationResponse)
async def get_recommendations(
    user_id: int,
    model: str = Query("hybrid", enum=["cf", "content", "ncf", "hybrid"]),
    n: int = Query(10, ge=1, le=50)
):
    """Get top-N recommendations for a user.

    Args:
        user_id: External user id; must be a key of ``data["user_map"]``.
        model: Which model to use: ``cf``, ``content``, ``ncf`` or ``hybrid``.
        n: Number of recommendations to return (1-50).

    Returns:
        A ``RecommendationResponse``. ``cached`` is True when the result was
        served from Redis; ``latency_ms`` is the handler's own wall time.

    Raises:
        HTTPException: 422 for an unknown ``model``, 404 for an unknown user.
    """
    # perf_counter is monotonic, so the measured latency cannot go negative
    # or jump when the system clock is adjusted.
    start = time.perf_counter()

    # `enum=` on Query is only schema metadata and is not enforced at request
    # time, so reject unknown names here. Otherwise an arbitrary string would
    # be served by the fallback model and written into Redis key names.
    if model not in ("cf", "content", "ncf", "hybrid"):
        raise HTTPException(
            status_code=422,
            detail=f"Unknown model '{model}'"
        )

    # Validate user
    if user_id not in data["user_map"]:
        raise HTTPException(
            status_code=404,
            detail=f"User {user_id} not found"
        )

    # Check cache. Compare against None so that a cached *empty* list counts
    # as a hit instead of being recomputed on every request.
    cached_result = cache.get(user_id, model, n)
    if cached_result is not None:
        latency = (time.perf_counter() - start) * 1000
        return RecommendationResponse(
            user_id=user_id,
            model=model,
            recommendations=[RecommendationItem(**r) for r in cached_result],
            cached=True,
            latency_ms=round(latency, 2)
        )

    # Generate recommendations (models work on internal indices, not ids)
    user_idx = data["user_map"][user_id]
    recs = _generate_recommendations(user_idx, model, n)

    # Cache the result as plain dicts so it can be JSON-encoded
    cache_data = [r.dict() for r in recs]
    cache.set(user_id, model, n, cache_data)

    latency = (time.perf_counter() - start) * 1000
    return RecommendationResponse(
        user_id=user_id,
        model=model,
        recommendations=recs,
        cached=False,
        latency_ms=round(latency, 2)
    )


def _generate_recommendations(user_idx, model_name, n):
    """Generate recommendations using the specified model.

    Args:
        user_idx: Internal user index (a value of ``data["user_map"]``).
        model_name: Requested model name.
        n: Number of recommendations to ask the model for.

    Returns:
        A list of ``RecommendationItem``. It can be shorter than ``n`` when
        the model returns fewer results or when an item has no row in
        ``data["movies"]`` (such items are skipped).
    """
    if model_name == "ncf":
        import numpy as np
        # Items the user has already interacted with: the non-zero entries of
        # the user's row in the CF model's matrix.
        rated = np.where(
            models["cf"].matrix[user_idx] != 0
        )[0].tolist()
        # NOTE(review): ncf_recommend is neither defined nor imported in the
        # visible code — confirm it is brought into scope elsewhere.
        raw_recs = ncf_recommend(models["ncf"], user_idx, rated, n=n)
    else:
        # NOTE(review): "content" and "hybrid" are served by the CF model
        # too; only "ncf" has a dedicated code path.
        raw_recs = models["cf"].recommend(user_idx, n=n)

    # Convert (item index, score) pairs to response items
    movies_df = data["movies"]
    reverse_item_map = data["reverse_item_map"]
    items = []
    for item_idx, score in raw_recs:
        item_id = reverse_item_map[item_idx]
        titles = movies_df.loc[movies_df["item_id"] == item_id, "title"]
        if titles.empty:
            # No metadata row for this item: skip it rather than let
            # .iloc[0] raise IndexError and fail the whole request.
            continue
        items.append(RecommendationItem(
            item_id=item_id,
            title=titles.iloc[0],
            score=round(float(score), 3),
            genres=[]
        ))
    return items


@app.get("/similar/{item_id}", response_model=SimilarItemsResponse)
async def get_similar_items(
    item_id: int,
    n: int = Query(10, ge=1, le=50)
):
    """Get items similar to a given item.

    Args:
        item_id: External item id; must be a key of ``data["item_map"]``.
        n: Number of similar items to request from the content model (1-50).

    Returns:
        A ``SimilarItemsResponse`` with the queried item's title and its
        similar items. Similar items that have no row in ``data["movies"]``
        are skipped, so the list can be shorter than ``n``.

    Raises:
        HTTPException: 404 when the item is unknown or has no metadata row.
    """
    if item_id not in data["item_map"]:
        raise HTTPException(status_code=404, detail=f"Item {item_id} not found")

    movies_df = data["movies"]
    source_titles = movies_df.loc[movies_df["item_id"] == item_id, "title"]
    if source_titles.empty:
        # The item is mapped but has no metadata row; answer 404 instead of
        # letting .iloc[0] raise IndexError (which would become a 500).
        raise HTTPException(status_code=404, detail=f"Item {item_id} not found")

    item_idx = data["item_map"][item_id]
    similar = models["content"].get_similar_items(item_idx, n=n)

    items = []
    for idx, score in similar:
        # Fall back to the raw index when it has no reverse mapping.
        sim_id = data["reverse_item_map"].get(idx, idx)
        titles = movies_df.loc[movies_df["item_id"] == sim_id, "title"]
        if titles.empty:
            continue
        items.append(RecommendationItem(
            item_id=sim_id,
            title=titles.iloc[0],
            score=round(float(score), 3)
        ))

    return SimilarItemsResponse(
        item_id=item_id,
        title=source_titles.iloc[0],
        similar_items=items
    )

Run the API

Bash
# Start Redis (Docker)
docker run -d --name redis -p 6379:6379 redis:alpine

# Start the API server
uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload

# Test endpoints
curl http://localhost:8000/health
curl "http://localhost:8000/recommend/1?model=cf&n=5"
curl "http://localhost:8000/recommend/1?model=ncf&n=10"
curl "http://localhost:8000/similar/50?n=5"

Example API Response

JSON
{
  "user_id": 1,
  "model": "cf",
  "recommendations": [
    {"item_id": 50, "title": "Star Wars (1977)", "score": 4.823, "genres": []},
    {"item_id": 181, "title": "Return of the Jedi (1983)", "score": 4.712, "genres": []},
    {"item_id": 100, "title": "Fargo (1996)", "score": 4.689, "genres": []},
    {"item_id": 258, "title": "Contact (1997)", "score": 4.551, "genres": []},
    {"item_id": 294, "title": "Liar Liar (1997)", "score": 4.498, "genres": []}
  ],
  "cached": false,
  "latency_ms": 23.45
}
Performance: The first request for a given user/model/n combination generates fresh recommendations (20-50ms). Subsequent requests for the same combination hit Redis and return in under 1ms. Cached entries expire automatically after 1 hour (the TTL is configurable).

Next: Evaluation & A/B Testing

Measure how good your recommendations actually are with offline metrics and design online A/B experiments.

Step 6: Evaluation & A/B Testing →