PyTorch Interview Challenges
8 advanced PyTorch coding challenges that test your ability to write production-quality deep learning code. These are the exact patterns asked in ML engineer interviews at companies using PyTorch.
Challenge 1: Custom Dataset Class
Implement a custom Dataset class with __len__, __getitem__, and data type conversions. Support train/val split.
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import pandas as pd
class TabularDataset(Dataset):
    """Custom dataset for tabular data with mixed types.

    Handles numeric features (as float tensors) and a target column.
    Categorical features should be pre-encoded before passing.
    """

    def __init__(self, csv_path=None, dataframe=None,
                 feature_cols=None, target_col=None):
        """
        Args:
            csv_path: Path to CSV file (mutually exclusive with dataframe).
            dataframe: Pandas DataFrame (mutually exclusive with csv_path).
            feature_cols: List of feature column names.
            target_col: Name of the target column.

        Raises:
            ValueError: If neither csv_path nor dataframe is provided.
        """
        if dataframe is not None:
            df = dataframe
        elif csv_path is not None:
            df = pd.read_csv(csv_path)
        else:
            # Fail fast with a clear message instead of letting
            # pd.read_csv(None) raise a confusing TypeError.
            raise ValueError("Provide either csv_path or dataframe.")
        # float32 is the conventional dtype for model inputs.
        self.features = torch.tensor(
            df[feature_cols].values, dtype=torch.float32
        )
        # long (int64) is required by nn.CrossEntropyLoss targets.
        self.targets = torch.tensor(
            df[target_col].values, dtype=torch.long  # For classification
        )

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, idx):
        return self.features[idx], self.targets[idx]
# Sanity-check the dataset on synthetic data: 100 rows, 3 numeric
# features, 3-class integer labels.
columns = {name: np.random.randn(100) for name in ('feat1', 'feat2', 'feat3')}
columns['label'] = np.random.randint(0, 3, 100)
df = pd.DataFrame(columns)
dataset = TabularDataset(
    dataframe=df,
    feature_cols=['feat1', 'feat2', 'feat3'],
    target_col='label',
)
# 80/20 train/validation split.
n_train = int(0.8 * len(dataset))
train_ds, val_ds = random_split(dataset, [n_train, len(dataset) - n_train])
# Shuffle only the training loader.
train_loader = DataLoader(train_ds, batch_size=16, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=16, shuffle=False)
# Pull one batch to verify shapes.
batch_x, batch_y = next(iter(train_loader))
print(f"Batch features shape: {batch_x.shape}")  # (16, 3)
print(f"Batch labels shape: {batch_y.shape}")  # (16,)
print(f"Train size: {len(train_ds)}, Val size: {len(val_ds)}")
Interviewer focus: Correct dtype conversion (float32 for features, long for classification targets), using random_split for train/val, and shuffle=True only for training. Mention that for large datasets, you would load lazily in __getitem__ instead of loading all data in __init__.
Challenge 2: Custom Neural Network Layer
Implement a simplified self-attention layer as an nn.Module. It should compute attention weights, apply them to values, and include learnable parameters. Handle masking for variable-length sequences.
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class SimpleSelfAttention(nn.Module):
    """Simplified single-head self-attention layer.

    Args:
        embed_dim: Dimension of input embeddings.
        dropout: Dropout rate for attention weights.
    """

    def __init__(self, embed_dim, dropout=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        # One learnable linear projection each for queries, keys, values.
        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)
        self.value = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.scale = math.sqrt(embed_dim)

    def forward(self, x, mask=None):
        """Run scaled dot-product self-attention over x.

        Args:
            x: Input tensor of shape (batch, seq_len, embed_dim).
            mask: Optional boolean mask of shape (batch, seq_len),
                True = position to MASK (ignore).

        Returns:
            Output tensor of shape (batch, seq_len, embed_dim) and
            attention weights of shape (batch, seq_len, seq_len).
        """
        q = self.query(x)
        k = self.key(x)
        v = self.value(x)
        # Scaled dot-product scores: (batch, seq_len, seq_len).
        scores = torch.bmm(q, k.transpose(1, 2)) / self.scale
        if mask is not None:
            # Broadcast (batch, seq_len) over the query axis and force
            # masked key positions to -inf so softmax zeroes them out.
            scores = scores.masked_fill(mask.unsqueeze(1), float('-inf'))
        # Normalize over keys, then regularize the weights.
        attn_weights = self.dropout(F.softmax(scores, dim=-1))
        # Attention-weighted sum of values.
        return torch.bmm(attn_weights, v), attn_weights
# Quick check on a toy batch containing padded sequences.
batch, seq_len, embed_dim = 2, 5, 8
x = torch.randn(batch, seq_len, embed_dim)
# Build the padding mask from true sequence lengths: True = ignore.
lengths = torch.tensor([3, 4])
mask = torch.arange(seq_len).unsqueeze(0) >= lengths.unsqueeze(1)
attn = SimpleSelfAttention(embed_dim)
output, weights = attn(x, mask)
print(f"Output shape: {output.shape}")  # (2, 5, 8)
print(f"Attention shape: {weights.shape}")  # (2, 5, 5)
print(f"Row sums (should be ~1): {weights[0, 0].sum().item():.4f}")
Interviewer focus: Correct scaling by sqrt(d), proper masking with masked_fill before softmax (not after), and understanding that -inf becomes 0 after softmax. This is the foundation of Transformer architectures.
Challenge 3: Complete Training Loop
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split
import copy
def train_model(model, train_loader, val_loader, epochs=50,
                lr=1e-3, patience=5, grad_clip=1.0):
    """Complete training loop with best practices.

    Args:
        model: nn.Module to train.
        train_loader: Training DataLoader.
        val_loader: Validation DataLoader.
        epochs: Maximum number of epochs.
        lr: Initial learning rate.
        patience: Early stopping patience.
        grad_clip: Max gradient norm for clipping.

    Returns:
        Dict with trained model, history, and best score.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=3
    )
    criterion = nn.CrossEntropyLoss()

    best_val_loss = float('inf')
    best_state = None
    stale_epochs = 0
    history = {'train_loss': [], 'val_loss': [], 'val_acc': [], 'lr': []}

    def run_train_epoch():
        # One optimization pass; returns mean per-sample training loss.
        model.train()
        running = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            loss = criterion(model(xb), yb)
            loss.backward()
            # Clip the global gradient norm to avoid exploding gradients.
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            optimizer.step()
            running += loss.item() * len(yb)
        return running / len(train_loader.dataset)

    def run_validation():
        # Evaluation pass: no gradients, dropout off, running BN stats.
        model.eval()
        running, hits, seen = 0.0, 0, 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                logits = model(xb)
                running += criterion(logits, yb).item() * len(yb)
                hits += logits.max(1)[1].eq(yb).sum().item()
                seen += len(yb)
        return running / len(val_loader.dataset), hits / seen

    for epoch in range(epochs):
        train_loss = run_train_epoch()
        val_loss, val_acc = run_validation()
        # Reduce LR when the validation loss plateaus.
        scheduler.step(val_loss)
        current_lr = optimizer.param_groups[0]['lr']
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['lr'].append(current_lr)
        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"Val Acc: {val_acc:.4f} | "
              f"LR: {current_lr:.6f}")
        # Early stopping: checkpoint on improvement, stop after `patience`
        # epochs without one.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state = copy.deepcopy(model.state_dict())
            stale_epochs = 0
        else:
            stale_epochs += 1
            if stale_epochs >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Restore the best checkpoint before returning.
    model.load_state_dict(best_state)
    return {
        'model': model,
        'history': history,
        'best_val_loss': best_val_loss
    }
# Test
class SimpleNet(nn.Module):
    """Small MLP classifier: in_features -> 64 -> 32 -> num_classes."""

    def __init__(self, in_features, num_classes):
        super().__init__()
        layers = [
            nn.Linear(in_features, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # Returns raw logits; CrossEntropyLoss applies log-softmax itself.
        return self.net(x)
# Synthetic classification data: 500 samples, 10 features, 3 classes.
X = torch.randn(500, 10)
y = torch.randint(0, 3, (500,))
train_ds, val_ds = random_split(TensorDataset(X, y), [400, 100])
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=32, shuffle=False)
# Fit the MLP with early stopping.
result = train_model(
    SimpleNet(in_features=10, num_classes=3),
    train_loader, val_loader, epochs=30, patience=5,
)
Interviewer focus: The five critical elements: (1) model.train() / model.eval() switching, (2) torch.no_grad() during validation, (3) optimizer.zero_grad() before backward, (4) gradient clipping, (5) early stopping with model checkpointing using deepcopy. Missing any of these is a red flag.
Challenge 4: Custom Loss Function
import torch
import torch.nn as nn
import torch.nn.functional as F
class FocalLoss(nn.Module):
    """Focal Loss for imbalanced classification.

    FL(p_t) = -alpha_t * (1 - p_t)^gamma * log(p_t)

    With gamma=0 this reduces to standard cross-entropy; larger gamma
    shifts the loss toward hard (low-confidence) examples.

    Args:
        alpha: Class weights tensor of shape (num_classes,), or None.
        gamma: Focusing parameter (default 2.0).
        reduction: 'mean', 'sum', or 'none'.
    """

    def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, logits, targets):
        """Compute the focal loss.

        Args:
            logits: Raw model output, shape (batch, num_classes).
            targets: Ground truth labels, shape (batch,).

        Returns:
            Scalar loss for 'mean'/'sum' reduction, else per-sample losses.
        """
        # Per-sample cross-entropy; exp(-ce) recovers p_t, the model's
        # probability for the true class.
        per_sample_ce = F.cross_entropy(logits, targets, reduction='none')
        p_t = torch.exp(-per_sample_ce)
        # Focal modulation: down-weight easy examples (high p_t).
        modulation = (1 - p_t) ** self.gamma
        if self.alpha is not None:
            # Per-sample class weight gathered from alpha.
            modulation = modulation * self.alpha[targets]
        loss = modulation * per_sample_ce
        if self.reduction == 'sum':
            return loss.sum()
        if self.reduction == 'mean':
            return loss.mean()
        return loss
# Compare focal loss against plain cross-entropy on an imbalanced batch.
torch.manual_seed(42)
logits = torch.randn(8, 3)  # 8 samples, 3 classes
targets = torch.tensor([0, 0, 0, 0, 0, 1, 1, 2])  # class 0 dominates
ce_loss = F.cross_entropy(logits, targets)
fl_loss = FocalLoss(gamma=2.0)(logits, targets)
print(f"Cross-Entropy Loss: {ce_loss.item():.4f}")
print(f"Focal Loss (gamma=2): {fl_loss.item():.4f}")
# Same comparison with explicit per-class weights.
alpha = torch.tensor([0.25, 0.5, 0.25])  # Upweight minority class
fl_weighted = FocalLoss(alpha=alpha, gamma=2.0)(logits, targets)
print(f"Weighted Focal Loss: {fl_weighted.item():.4f}")
Interviewer focus: Understanding the mathematical intuition (easy examples with high p_t get down-weighted), using F.cross_entropy with reduction='none' as a building block, and proper handling of class weights. Focal loss is used in object detection (RetinaNet) and any imbalanced problem.
Challenge 5: Custom Learning Rate Scheduler
import torch
import torch.nn as nn
import math
class WarmupCosineScheduler:
    """Learning rate scheduler: linear warmup + cosine annealing.

    Args:
        optimizer: PyTorch optimizer.
        warmup_steps: Number of warmup steps.
        total_steps: Total training steps.
        max_lr: Peak learning rate (after warmup).
        min_lr: Minimum learning rate (at end of cosine decay).
    """

    def __init__(self, optimizer, warmup_steps, total_steps,
                 max_lr=1e-3, min_lr=1e-6):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.max_lr = max_lr
        self.min_lr = min_lr
        self.current_step = 0

    def get_lr(self):
        """Return the LR for the current step (does not advance state)."""
        if self.current_step < self.warmup_steps:
            # Linear warmup: 0 -> max_lr
            return self.max_lr * self.current_step / self.warmup_steps
        # Cosine annealing: max_lr -> min_lr. Clamp progress at 1.0 so
        # that stepping past total_steps holds the LR at min_lr instead
        # of climbing back up the cosine curve.
        progress = (self.current_step - self.warmup_steps) / \
            max(1, self.total_steps - self.warmup_steps)
        progress = min(progress, 1.0)
        return self.min_lr + 0.5 * (self.max_lr - self.min_lr) * \
            (1 + math.cos(math.pi * progress))

    def step(self):
        """Apply the current LR to all param groups, then advance.

        Returns:
            The learning rate that was applied.
        """
        lr = self.get_lr()
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr
        self.current_step += 1
        return lr
# Drive the scheduler through a full run and sample the LR curve.
model = nn.Linear(10, 2)
optimizer = torch.optim.Adam(model.parameters(), lr=0)
scheduler = WarmupCosineScheduler(
    optimizer, warmup_steps=100, total_steps=1000,
    max_lr=1e-3, min_lr=1e-6
)
lrs = [scheduler.step() for _ in range(1000)]
# Expected: ~0 at start, ~0.0005 mid-warmup, ~0.001 at the peak,
# ~0.0005 mid-decay, ~0.000001 near the end.
for k in (0, 50, 100, 500, 999):
    print(f"Step {k}: LR = {lrs[k]:.8f}")
Interviewer focus: Understanding the warmup + cosine schedule (used in BERT, GPT, Vision Transformers). The warmup prevents early training instability with large LR. Cosine decay is smoother than step decay and often yields better final performance.
Challenge 6: Implement Multi-Head Attention
Implement multi-head attention from scratch (without nn.MultiheadAttention). Split queries, keys, and values into multiple heads, compute attention per head, then concatenate and project.
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class MultiHeadAttention(nn.Module):
    """Multi-Head Attention from scratch.

    Args:
        embed_dim: Total embedding dimension.
        num_heads: Number of attention heads.
        dropout: Dropout rate.
    """

    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0, \
            "embed_dim must be divisible by num_heads"
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        # A single fused projection produces Q, K and V in one matmul.
        self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply multi-head self-attention.

        Args:
            x: Input of shape (batch, seq_len, embed_dim).
            mask: Optional boolean mask of shape (batch, seq_len);
                True marks key positions to ignore.

        Returns:
            Output of shape (batch, seq_len, embed_dim).
        """
        batch, seq_len, _ = x.shape
        # Fused projection, split into per-head Q/K/V tensors, each of
        # shape (batch, num_heads, seq_len, head_dim).
        fused = self.qkv_proj(x)
        fused = fused.reshape(batch, seq_len, 3, self.num_heads, self.head_dim)
        q, k, v = fused.permute(2, 0, 3, 1, 4).unbind(0)
        # Scaled dot-product scores: (batch, num_heads, seq_len, seq_len).
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.head_dim)
        if mask is not None:
            # Broadcast over heads and query positions; -inf -> weight 0.
            scores = scores.masked_fill(
                mask.unsqueeze(1).unsqueeze(2), float('-inf')
            )
        weights = self.dropout(F.softmax(scores, dim=-1))
        # Weighted values, then heads re-joined: (batch, seq_len, embed_dim).
        merged = (weights @ v).transpose(1, 2).contiguous()
        merged = merged.reshape(batch, seq_len, self.embed_dim)
        return self.out_proj(merged)
# Shape check on random input, with and without padding mask.
batch, seq_len, embed_dim, num_heads = 2, 10, 64, 8
x = torch.randn(batch, seq_len, embed_dim)
mha = MultiHeadAttention(embed_dim, num_heads)
out = mha(x)
print(f"Input shape: {x.shape}")  # (2, 10, 64)
print(f"Output shape: {out.shape}")  # (2, 10, 64)
# Mask the last 3 positions of the first sequence.
mask = torch.zeros(batch, seq_len, dtype=torch.bool)
mask[0, 7:] = True
masked_out = mha(x, mask)
print(f"Masked output shape: {masked_out.shape}")  # (2, 10, 64)
Interviewer focus: The combined QKV projection for efficiency, correct reshaping for multi-head splitting (reshape + permute), proper masking before softmax, and the final concatenation + projection. This is the core building block of Transformers.
Challenge 7: Model Debugging — Find the Bug
# BUGGY CODE - 6 bugs to find
import torch
import torch.nn as nn
class BuggyModel(nn.Module):
    """3-layer MLP with BatchNorm and Dropout (the bugs in this challenge
    live in the training loop below, not in the model itself)."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 3)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.bn = nn.BatchNorm1d(64)

    def forward(self, x):
        # fc1 -> BatchNorm -> ReLU -> Dropout
        hidden = self.dropout(self.relu(self.bn(self.fc1(x))))
        hidden = self.relu(self.fc2(hidden))
        # Final layer emits raw logits (no activation).
        return self.fc3(hidden)
# BUGGY TRAINING LOOP
# model = BuggyModel()
# optimizer = torch.optim.SGD(model.parameters(), lr=10) # BUG 1: LR too high
# criterion = nn.CrossEntropyLoss()
#
# for epoch in range(100):
# for x, y in train_loader:
# outputs = model(x)
# loss = criterion(outputs, y)
# loss.backward()
# optimizer.step() # BUG 2: Missing optimizer.zero_grad()
#
# # Validation
# val_loss = 0
# for x, y in val_loader: # BUG 3: Missing model.eval() and torch.no_grad()
# outputs = model(x)
# loss = criterion(outputs, y)
# val_loss += loss.item()
#
# # BUG 4: Not switching back to model.train() after validation
# # BUG 5: Dropout is active during validation (model.eval() fixes this)
# # BUG 6: BatchNorm uses batch stats during validation instead of running stats
# FIXED CODE
model = BuggyModel()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)  # FIX 1: sane LR
criterion = nn.CrossEntropyLoss()
for epoch in range(100):
    # FIX 4: back to train mode at the top of every epoch.
    model.train()
    for x, y in train_loader:
        # FIX 2: clear stale gradients before each backward pass.
        optimizer.zero_grad()
        loss = criterion(model(x), y)
        loss.backward()
        optimizer.step()
    # FIX 3/5/6: eval mode disables dropout and uses running BN stats.
    model.eval()
    val_loss = 0
    with torch.no_grad():  # FIX 3: no autograd bookkeeping during eval
        for x, y in val_loader:
            val_loss += criterion(model(x), y).item()
    print(f"Epoch {epoch+1}: Val Loss = {val_loss/len(val_loader):.4f}")
Interviewer focus: Bug-finding questions test your debugging experience. The most critical bugs: missing zero_grad() (causes gradient accumulation), missing model.eval() (affects dropout and batchnorm), and missing torch.no_grad() (wastes memory and compute).
Challenge 8: Transfer Learning Pipeline
import torch
import torch.nn as nn
# Simulate a pre-trained backbone (in real code: torchvision.models.resnet18)
class MockResNet(nn.Module):
    """Simulates a pre-trained ResNet backbone."""

    def __init__(self):
        super().__init__()
        # "Convolutional" feature extractor, faked with linear layers.
        self.features = nn.Sequential(
            nn.Linear(512, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        # Original 1000-way ImageNet classification head.
        self.fc = nn.Linear(128, 1000)

    def forward(self, x):
        # Backbone first, then the classification head.
        return self.fc(self.features(x))
def setup_transfer_learning(pretrained_model, num_classes, freeze=True):
    """Set up model for transfer learning.

    Args:
        pretrained_model: Pre-trained model.
        num_classes: Number of target classes.
        freeze: Whether to freeze backbone initially.

    Returns:
        Modified model ready for fine-tuning.
    """
    # Step 1: optionally freeze the backbone so only the new head trains.
    if freeze:
        for backbone_param in pretrained_model.features.parameters():
            backbone_param.requires_grad = False
    # Step 2: swap the old head for a fresh one sized for our classes.
    # (Done after freezing, so the new head's params still require grad.)
    head_in = pretrained_model.fc.in_features
    pretrained_model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(head_in, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    return pretrained_model
def get_optimizer_groups(model, head_lr=1e-3, backbone_lr=1e-5):
    """Create parameter groups with different learning rates.

    The new head gets a higher LR, the pre-trained backbone gets a lower LR.
    """
    head_params = list(model.fc.parameters())
    head_ids = {id(p) for p in head_params}
    # Every trainable parameter outside the head counts as backbone.
    backbone_params = [
        p for p in model.parameters()
        if id(p) not in head_ids and p.requires_grad
    ]
    return [
        {'params': backbone_params, 'lr': backbone_lr},
        {'params': head_params, 'lr': head_lr},
    ]
# Phase 1: backbone frozen, only the new head trains.
model = setup_transfer_learning(MockResNet(), num_classes=5, freeze=True)
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f"Phase 1 - Trainable: {trainable}/{total} parameters")
# Optimize only the unfrozen (head) parameters.
optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=1e-3
)
# ... train for a few epochs ...
# Phase 2: unfreeze the backbone and fine-tune with differential LRs.
for param in model.features.parameters():
    param.requires_grad = True
optimizer = torch.optim.Adam(
    get_optimizer_groups(model, head_lr=1e-4, backbone_lr=1e-5)
)
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Phase 2 - Trainable: {trainable}/{total} parameters")
print(f"Backbone LR: {optimizer.param_groups[0]['lr']}")
print(f"Head LR: {optimizer.param_groups[1]['lr']}")
Interviewer focus: The two-phase training strategy (freeze then unfreeze), differential learning rates for backbone vs head, and using filter(lambda p: p.requires_grad, ...) to only optimize unfrozen parameters. This is standard practice for fine-tuning pre-trained models.
Lilly Tech Systems