Step 5: REST API & Caching (Intermediate)
A recommendation model is only useful if it can serve predictions in real time. This lesson builds a production-ready REST API with FastAPI, adds a Redis caching layer for sub-millisecond response times on repeated queries, and defines clean response schemas with Pydantic.
Pydantic Response Schemas
Python - api/schemas.py
# Response schemas for the recommendation API.
# NOTE: Pydantic exposes class docstrings and field descriptions in the
# generated JSON/OpenAPI schema, so they are part of the public contract.
from pydantic import BaseModel, Field
from typing import List, Optional


class RecommendationItem(BaseModel):
    """A single recommended item."""
    item_id: int
    title: str
    # Required field (no default); higher presumably means more relevant.
    score: float = Field(..., description="Predicted relevance score")
    # Optional; defaults to an empty list when the caller supplies no genres.
    genres: List[str] = []


class RecommendationResponse(BaseModel):
    """API response for recommendation requests."""
    user_id: int
    # Required; the description lists the model names the API accepts.
    model: str = Field(..., description="Model used: cf, content, ncf, hybrid")
    recommendations: List[RecommendationItem]
    # Defaults to False; presumably set to True when served from the cache.
    cached: bool = False
    # Defaults to 0.0; the name suggests request latency in milliseconds.
    latency_ms: float = 0.0


class SimilarItemsResponse(BaseModel):
    """API response for similar items query."""
    # Identifier and title of the item the query was made for.
    item_id: int
    title: str
    similar_items: List[RecommendationItem]


class HealthResponse(BaseModel):
    """Health check response."""
    status: str
    # Names of the models that are currently loaded.
    models_loaded: List[str]
    redis_connected: bool
Redis Cache Layer
Python - api/cache.py
import json
import logging
from typing import Optional

import redis

logger = logging.getLogger(__name__)


class RecommendationCache:
    """Redis-backed cache for recommendation results.

    Every operation is best-effort: if Redis is unreachable at construction
    time the cache disables itself, and any later Redis/serialization error
    is logged and treated as a cache miss (reads) or a no-op (writes), so a
    cache problem never fails the request that triggered it.
    """

    def __init__(self, host="localhost", port=6379, db=0, ttl=3600):
        """Connect to Redis and verify the connection with a PING.

        Args:
            host: Redis host name.
            port: Redis port.
            db: Redis database index.
            ttl: Lifetime of each cached entry, in seconds.
        """
        self.ttl = ttl  # Cache TTL in seconds (1 hour by default)
        try:
            # decode_responses=True makes Redis return str instead of bytes,
            # which is what json.loads() in get() expects.
            self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
            self.client.ping()
            self.connected = True
        except redis.ConnectionError:
            self.client = None
            self.connected = False
            print("WARNING: Redis not available, running without cache")

    def _make_key(self, user_id: int, model: str, n: int) -> str:
        """Build the cache key for one (model, user, list-size) combination."""
        return f"rec:{model}:{user_id}:{n}"

    def get(self, user_id: int, model: str, n: int) -> Optional[list]:
        """Get cached recommendations. Returns None on miss.

        Also returns None when the cache is disabled or when the lookup or
        JSON decoding fails; such failures are logged, not raised.
        """
        if not self.connected:
            return None
        key = self._make_key(user_id, model, n)
        try:
            data = self.client.get(key)
            return json.loads(data) if data else None
        except Exception:
            # Best-effort: a broken cache must look like a miss to the caller.
            logger.warning("Cache read failed for key %s", key, exc_info=True)
            return None

    def set(self, user_id: int, model: str, n: int, recommendations: list):
        """Cache recommendation results.

        ``recommendations`` must be JSON-serializable. The entry expires
        after ``self.ttl`` seconds. Failures are logged, not raised.
        """
        if not self.connected:
            return
        key = self._make_key(user_id, model, n)
        try:
            self.client.setex(key, self.ttl, json.dumps(recommendations))
        except Exception:
            logger.warning("Cache write failed for key %s", key, exc_info=True)

    def invalidate_user(self, user_id: int):
        """Invalidate all cached recommendations for a user.

        Matches every model and list size stored for ``user_id``. Failures
        are logged, not raised.
        """
        if not self.connected:
            return
        pattern = f"rec:*:{user_id}:*"
        try:
            # SCAN walks the keyspace incrementally; KEYS would block the
            # Redis server while it examines every key in one command.
            keys = list(self.client.scan_iter(match=pattern))
            if keys:
                self.client.delete(*keys)
        except Exception:
            logger.warning(
                "Cache invalidation failed for user %s", user_id, exc_info=True
            )
FastAPI Application
Python - api/main.py
import time
import pickle
from fastapi import FastAPI, HTTPException, Query
from contextlib import asynccontextmanager
from api.schemas import (
    RecommendationResponse, RecommendationItem,
    SimilarItemsResponse, HealthResponse
)
from api.cache import RecommendationCache

# Global state, populated once by lifespan() at startup.
models = {}
cache = None
data = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load models and data on startup.

    Fills the module-level ``data``, ``models`` and ``cache`` globals before
    the application starts serving, then yields control to FastAPI.
    """
    global models, cache, data

    # SECURITY: pickle.load executes arbitrary code embedded in the file.
    # Only load artifacts produced by your own training pipeline.

    # Load processed data
    with open("data/processed_data.pkl", "rb") as f:
        data = pickle.load(f)

    # Load models (trained in previous lessons)
    with open("models/item_cf.pkl", "rb") as f:
        models["cf"] = pickle.load(f)
    with open("models/content_based.pkl", "rb") as f:
        models["content"] = pickle.load(f)

    # Load NCF model (imported lazily so torch is only needed at startup)
    import torch
    from models.ncf_model import NCFModel
    ncf = NCFModel(len(data["user_map"]), len(data["item_map"]))
    ncf.load_state_dict(torch.load("models/ncf_best.pt", map_location="cpu"))
    ncf.eval()
    models["ncf"] = ncf

    # Initialize Redis cache (disables itself if Redis is unreachable)
    cache = RecommendationCache()

    print(f"Loaded models: {list(models.keys())}")
    print(f"Redis connected: {cache.connected}")

    yield

    # Cleanup
    print("Shutting down...")


app = FastAPI(
    title="Recommendation Engine API",
    version="1.0.0",
    description="Production recommendation API with multiple models and Redis caching.",
    lifespan=lifespan
)


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report which models are loaded and whether Redis is reachable."""
    return HealthResponse(
        status="healthy",
        models_loaded=list(models.keys()),
        redis_connected=cache.connected if cache else False
    )


@app.get("/recommend/{user_id}", response_model=RecommendationResponse)
async def get_recommendations(
    user_id: int,
    model: str = Query("hybrid", enum=["cf", "content", "ncf", "hybrid"]),
    n: int = Query(10, ge=1, le=50)
):
    """Get top-N recommendations for a user.

    Serves from the cache when an entry exists for (user_id, model, n);
    otherwise generates the list, caches it and returns it.

    Raises:
        HTTPException: 404 when ``user_id`` is not in the user map.
    """
    # perf_counter is monotonic, so the measured latency cannot go negative
    # or jump when the system clock is adjusted.
    start = time.perf_counter()

    # Validate user
    if user_id not in data["user_map"]:
        raise HTTPException(
            status_code=404,
            detail=f"User {user_id} not found"
        )

    # Check cache. Compare against None so that a cached *empty* list still
    # counts as a hit instead of being regenerated on every request.
    cached_result = cache.get(user_id, model, n)
    if cached_result is not None:
        latency = (time.perf_counter() - start) * 1000
        return RecommendationResponse(
            user_id=user_id,
            model=model,
            recommendations=[RecommendationItem(**r) for r in cached_result],
            cached=True,
            latency_ms=round(latency, 2)
        )

    # Generate recommendations
    user_idx = data["user_map"][user_id]
    recs = _generate_recommendations(user_idx, model, n)

    # Cache the result as plain dicts so it can be JSON-serialized.
    # NOTE(review): .dict() is the Pydantic v1 spelling (deprecated in v2 in
    # favour of .model_dump()); confirm which version the project pins.
    cache_data = [r.dict() for r in recs]
    cache.set(user_id, model, n, cache_data)

    latency = (time.perf_counter() - start) * 1000
    return RecommendationResponse(
        user_id=user_id,
        model=model,
        recommendations=recs,
        cached=False,
        latency_ms=round(latency, 2)
    )


def _generate_recommendations(user_idx, model_name, n):
    """Generate recommendations using the specified model.

    Args:
        user_idx: Internal (matrix) index of the user.
        model_name: One of "cf", "content", "ncf", "hybrid".
        n: Number of items to return.

    Returns:
        A list of RecommendationItem, in the order the model produced them.
    """
    if model_name == "ncf":
        import numpy as np
        # Items the user has already rated, taken from the CF model's matrix.
        rated = np.where(
            models["cf"].matrix[user_idx] != 0
        )[0].tolist()
        # NOTE(review): ncf_recommend is neither imported nor defined in this
        # module, so this branch raises NameError as written. Import it from
        # wherever the NCF lesson defines it.
        raw_recs = ncf_recommend(models["ncf"], user_idx, rated, n=n)
    else:
        # "cf" -- and, for now, "content" and "hybrid" as well: neither has a
        # dedicated code path here, so both fall back to collaborative
        # filtering.
        raw_recs = models["cf"].recommend(user_idx, n=n)

    # Convert to response items
    movies_df = data["movies"]
    items = []
    for item_idx, score in raw_recs:
        item_id = data["reverse_item_map"][item_idx]
        # .iloc[0] raises IndexError if the item is missing from movies_df.
        movie = movies_df[movies_df["item_id"] == item_id].iloc[0]
        items.append(RecommendationItem(
            item_id=item_id,
            title=movie["title"],
            score=round(float(score), 3),
            genres=[]
        ))
    return items


@app.get("/similar/{item_id}", response_model=SimilarItemsResponse)
async def get_similar_items(
    item_id: int,
    n: int = Query(10, ge=1, le=50)
):
    """Get items similar to a given item.

    Raises:
        HTTPException: 404 when ``item_id`` is not in the item map.
    """
    if item_id not in data["item_map"]:
        raise HTTPException(status_code=404, detail=f"Item {item_id} not found")

    item_idx = data["item_map"][item_id]
    similar = models["content"].get_similar_items(item_idx, n=n)

    movies_df = data["movies"]
    source_movie = movies_df[movies_df["item_id"] == item_id].iloc[0]

    items = []
    for idx, score in similar:
        # Falls back to the raw index when it has no reverse mapping.
        sim_id = data["reverse_item_map"].get(idx, idx)
        movie = movies_df[movies_df["item_id"] == sim_id].iloc[0]
        items.append(RecommendationItem(
            item_id=sim_id,
            title=movie["title"],
            score=round(float(score), 3)
        ))

    return SimilarItemsResponse(
        item_id=item_id,
        title=source_movie["title"],
        similar_items=items
    )
Run the API
Bash
# Start Redis (Docker) docker run -d --name redis -p 6379:6379 redis:alpine # Start the API server uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload # Test endpoints curl http://localhost:8000/health curl "http://localhost:8000/recommend/1?model=cf&n=5" curl "http://localhost:8000/recommend/1?model=ncf&n=10" curl "http://localhost:8000/similar/50?n=5"
Example API Response
JSON
{
"user_id": 1,
"model": "cf",
"recommendations": [
{"item_id": 50, "title": "Star Wars (1977)", "score": 4.823, "genres": []},
{"item_id": 181, "title": "Return of the Jedi (1983)", "score": 4.712, "genres": []},
{"item_id": 100, "title": "Fargo (1996)", "score": 4.689, "genres": []},
{"item_id": 258, "title": "Contact (1997)", "score": 4.551, "genres": []},
{"item_id": 294, "title": "Liar Liar (1997)", "score": 4.498, "genres": []}
],
"cached": false,
"latency_ms": 23.45
}
Performance: The first request for a given user/model/n combination generates fresh recommendations (20-50ms). Subsequent requests for the same combination hit Redis and return in under 1ms. Each cached entry expires automatically after 1 hour (the TTL is configurable).
Next: Evaluation & A/B Testing
Measure how good your recommendations actually are with offline metrics and design online A/B experiments.
Step 6: Evaluation & A/B Testing →