Step 4: Neural Collaborative Filtering (Advanced)
Neural Collaborative Filtering (NCF) replaces the fixed dot product of traditional matrix factorization with a learned neural network. This lesson implements a two-tower architecture in PyTorch: separate user and item embedding towers that feed into shared dense layers for rating prediction.
NCF Architecture
Architecture
User ID -----> [User Embedding]      [Item Embedding] <----- Item ID
                      |                     |
                      v                     v
                 [Dense 128]           [Dense 128]
                      |                     |
                      +---------+ +---------+
                                | |
                                v v
                           [Concatenate]
                                 |
                            [Dense 256]
                            [Dense 128]
                            [Dense 64]
                                 |
                            [Output: 1] (predicted rating)
PyTorch Dataset
Python
import warnings

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


class RatingDataset(Dataset):
    """PyTorch Dataset yielding ``(user_index, item_index, rating)`` triples.

    Raw ``user_id`` / ``item_id`` values are translated to embedding indices
    through ``user_map`` / ``item_map``. Rows whose user or item has no entry
    in its map cannot be looked up in an embedding table, so they are dropped
    and a warning reports how many were removed.

    Args:
        ratings_df: DataFrame with ``user_id``, ``item_id`` and ``rating``
            columns.
        user_map: Mapping from raw user id to integer index.
        item_map: Mapping from raw item id to integer index.
    """

    def __init__(self, ratings_df, user_map, item_map):
        user_idx = ratings_df["user_id"].map(user_map)
        item_idx = ratings_df["item_id"].map(item_map)

        # Series.map produces NaN for ids missing from the mapping; NaN cannot
        # be converted to a valid integer index, so filter those rows first.
        known = (user_idx.notna() & item_idx.notna()).to_numpy()
        n_unknown = int((~known).sum())
        if n_unknown:
            warnings.warn(
                f"Dropping {n_unknown} rating(s) whose user_id or item_id "
                "is missing from user_map/item_map."
            )

        # astype() copies, so the tensors never alias the DataFrame's buffers.
        self.users = torch.from_numpy(
            user_idx.to_numpy()[known].astype(np.int64)
        )
        self.items = torch.from_numpy(
            item_idx.to_numpy()[known].astype(np.int64)
        )
        self.ratings = torch.from_numpy(
            ratings_df["rating"].to_numpy()[known].astype(np.float32)
        )

    def __len__(self):
        return len(self.ratings)

    def __getitem__(self, idx):
        return self.users[idx], self.items[idx], self.ratings[idx]


# Create data loaders. Only the training loader is shuffled.
train_dataset = RatingDataset(train_df, user_map, item_map)
test_dataset = RatingDataset(test_df, user_map, item_map)

train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)
NCF Model Definition
Python
class NCFModel(nn.Module):
    """Neural Collaborative Filtering with a two-tower architecture.

    Each id is looked up in its own embedding table and projected by its own
    ``Linear -> ReLU -> Dropout`` tower. The two tower outputs are
    concatenated and passed through a stack of
    ``Linear -> ReLU -> BatchNorm1d -> Dropout`` blocks, followed by a final
    ``Linear`` layer that emits one predicted rating per (user, item) pair.

    Args:
        n_users: Number of rows in the user embedding table.
        n_items: Number of rows in the item embedding table.
        embed_dim: Size of each embedding vector.
        hidden_dims: Widths of the shared dense layers; defaults to
            ``[256, 128, 64]``.
        tower_dim: Output width of each tower's projection layer.
        dropout: Dropout probability used in the towers and shared layers.
    """

    def __init__(self, n_users, n_items, embed_dim=64, hidden_dims=None,
                 tower_dim=128, dropout=0.2):
        super().__init__()

        if hidden_dims is None:
            hidden_dims = [256, 128, 64]

        # Embedding layers (the "two towers")
        self.user_embedding = nn.Embedding(n_users, embed_dim)
        self.item_embedding = nn.Embedding(n_items, embed_dim)

        # User and item projection layers
        self.user_fc = nn.Sequential(
            nn.Linear(embed_dim, tower_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
        )
        self.item_fc = nn.Sequential(
            nn.Linear(embed_dim, tower_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
        )

        # Shared dense layers after concatenation. The first layer's input
        # width is derived from the tower width instead of being hard-coded.
        layers = []
        input_dim = 2 * tower_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(dropout),
            ])
            input_dim = hidden_dim

        # Output layer (single rating prediction)
        layers.append(nn.Linear(input_dim, 1))
        self.fc_layers = nn.Sequential(*layers)

        self._init_weights()

    def _init_weights(self):
        """Initialise embeddings with N(0, 0.01) and Linear layers with Xavier."""
        for m in self.modules():
            if isinstance(m, nn.Embedding):
                nn.init.normal_(m.weight, std=0.01)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, user_ids, item_ids):
        """Predict one rating per (user, item) pair.

        Args:
            user_ids: 1-D LongTensor of user indices, shape ``(batch,)``.
            item_ids: 1-D LongTensor of item indices, shape ``(batch,)``.

        Returns:
            Tensor of shape ``(batch,)`` with the predicted ratings.
        """
        # Embedding lookup
        user_emb = self.user_embedding(user_ids)
        item_emb = self.item_embedding(item_ids)

        # Tower projections
        user_out = self.user_fc(user_emb)
        item_out = self.item_fc(item_emb)

        # Concatenate and pass through shared layers
        concat = torch.cat([user_out, item_out], dim=1)
        output = self.fc_layers(concat)

        # Squeeze only the trailing feature axis: a bare squeeze() would also
        # drop the batch axis for a batch of one and return a 0-d tensor.
        return output.squeeze(-1)


# Instantiate
n_users = len(user_map)
n_items = len(item_map)
model = NCFModel(n_users, n_items, embed_dim=64)
print(model)
print(f"Parameters: {sum(p.numel() for p in model.parameters()):,}")
Training Loop
Python
def train_ncf(model, train_loader, test_loader, epochs=20, lr=0.001,
              checkpoint_path="models/ncf_best.pt"):
    """Train the NCF model with LR scheduling and early stopping.

    Each epoch runs one pass over ``train_loader`` (MSE loss, Adam, gradient
    norm clipped to 1.0) and then one evaluation pass over ``test_loader``.
    The evaluation MSE drives ``ReduceLROnPlateau``, best-checkpoint saving
    and early stopping after 5 epochs without improvement.

    NOTE: ``test_loader`` is used as the validation set here, so the reported
    test error also influences model selection.

    Args:
        model: Module called as ``model(users, items)`` returning one
            prediction per pair.
        train_loader: DataLoader yielding ``(users, items, ratings)`` batches.
        test_loader: DataLoader used for the per-epoch evaluation pass.
        epochs: Maximum number of epochs.
        lr: Adam learning rate.
        checkpoint_path: Where the best ``state_dict`` is written.

    Returns:
        The model, on the selected device, restored to the best weights seen
        during this run (or left as-is when no checkpoint was written).
    """
    import os

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="min", patience=3, factor=0.5
    )
    criterion = nn.MSELoss()

    # torch.save does not create missing parent directories.
    checkpoint_dir = os.path.dirname(checkpoint_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    best_val_loss = float("inf")
    patience_counter = 0
    max_patience = 5
    checkpoint_saved = False

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        for users, items, ratings in train_loader:
            users = users.to(device)
            items = items.to(device)
            ratings = ratings.to(device)

            optimizer.zero_grad()
            # Flatten so predictions always line up with the 1-D ratings,
            # even if the model returns a 0-d tensor for a batch of one.
            predictions = model(users, items).reshape(-1)
            loss = criterion(predictions, ratings)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            # Weight by batch size so the epoch figure is a per-sample mean.
            train_loss += loss.item() * len(ratings)
        train_loss /= len(train_loader.dataset)

        # Validation phase
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for users, items, ratings in test_loader:
                users = users.to(device)
                items = items.to(device)
                ratings = ratings.to(device)

                predictions = model(users, items).reshape(-1)
                loss = criterion(predictions, ratings)
                val_loss += loss.item() * len(ratings)
        val_loss /= len(test_loader.dataset)
        val_rmse = np.sqrt(val_loss)

        scheduler.step(val_loss)

        print(f"Epoch {epoch+1:2d}/{epochs} "
              f"Train MSE: {train_loss:.4f} "
              f"Val MSE: {val_loss:.4f} "
              f"Val RMSE: {val_rmse:.4f}")

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
        else:
            patience_counter += 1
            if patience_counter >= max_patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Load best model. Skip when nothing was saved in this run (e.g.
    # epochs=0 or a loss that never improved) so a stale file from an earlier
    # run is never loaded by accident.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    return model


# Train
model = train_ncf(model, train_loader, test_loader, epochs=20, lr=0.001)
Inference: Generate Recommendations
Python
def ncf_recommend(model, user_idx, rated_items, n=10, n_items=1682):
    """Generate top-N recommendations using a trained NCF model.

    Scores every item index in ``range(n_items)`` that is not in
    ``rated_items`` for the given user and returns the highest-scoring ones.

    Args:
        model: Trained module called as ``model(user_ids, item_ids)``.
        user_idx: Integer index of the user.
        rated_items: Iterable of item indices to exclude from the candidates.
        n: Number of recommendations to return.
        n_items: Total number of item indices. It must not exceed the size of
            the model's item embedding table, so pass the real item count
            when it differs from the default.

    Returns:
        List of ``(item_idx, score)`` tuples sorted by descending score;
        empty when every item has already been rated.
    """
    device = next(model.parameters()).device
    model.eval()

    # All candidate items (not yet rated), in ascending index order so that
    # ties are broken deterministically.
    already_rated = set(rated_items)
    candidates = [i for i in range(n_items) if i not in already_rated]
    if not candidates:
        return []

    # Batch predict
    item_tensor = torch.tensor(candidates, dtype=torch.long, device=device)
    user_tensor = torch.full_like(item_tensor, int(user_idx))

    with torch.no_grad():
        # reshape(-1) keeps the scores 1-D even when there is exactly one
        # candidate and the model squeezes its output to a 0-d tensor.
        scores = model(user_tensor, item_tensor).reshape(-1).cpu().numpy()

    # Sort by predicted score
    ranked = sorted(zip(candidates, scores), key=lambda pair: pair[1],
                    reverse=True)
    return ranked[:n]


# Get recommendations
user_idx = 0
rated = np.where(train_matrix.toarray()[user_idx] != 0)[0].tolist()
# Pass the real item count so candidate indices match the embedding table.
recs = ncf_recommend(model, user_idx, rated, n=10, n_items=len(item_map))

print("NCF Recommendations:")
for item_idx, score in recs:
    item_id = reverse_item_map[item_idx]
    title = movies[movies["item_id"] == item_id]["title"].values[0]
    print(f" {title}: {score:.2f}")
GPU Optional: MovieLens 100K is small enough that training completes in minutes on CPU. For larger datasets (MovieLens 25M, or production-scale), GPU acceleration becomes essential. The code automatically detects and uses CUDA if available.
Next: REST API & Caching
Serve your trained models via FastAPI with Redis caching for production-ready recommendation serving.
Step 5: REST API & Caching →