Advanced

Performance & Vectorization

Five challenges that test your ability to eliminate Python loops and write high-performance NumPy code. These patterns separate candidates who merely use NumPy from those who truly understand how to make it fast.

Challenge 1: Replace Loops with Vectorized Ops

Problem: Rewrite each of the following loop-based implementations as a single vectorized NumPy expression. Each has a one-line or two-line vectorized solution.

import numpy as np

# ---- LOOP VERSIONS (rewrite these) ----

# Problem A: Running sum
def running_sum_loop(x):
    result = np.zeros_like(x)
    total = 0
    for i in range(len(x)):
        total += x[i]
        result[i] = total
    return result

# Problem B: Pairwise products
def pairwise_products_loop(a, b):
    M, N = len(a), len(b)
    result = np.zeros((M, N))
    for i in range(M):
        for j in range(N):
            result[i, j] = a[i] * b[j]
    return result

# Problem C: Window mean (moving average)
def window_mean_loop(x, w):
    n = len(x) - w + 1
    result = np.zeros(n)
    for i in range(n):
        result[i] = x[i:i+w].mean()
    return result

# Problem D: Conditional replacement
def replace_negatives_loop(X):
    result = X.copy()
    for i in range(X.shape[0]):
        for j in range(X.shape[1]):
            if result[i, j] < 0:
                result[i, j] = 0
    return result

# Problem E: Row-wise argmax one-hot
def argmax_onehot_loop(X):
    N, C = X.shape
    result = np.zeros_like(X)
    for i in range(N):
        j = 0
        for k in range(1, C):
            if X[i, k] > X[i, j]:
                j = k
        result[i, j] = 1
    return result

# ---- VECTORIZED SOLUTIONS ----

# Solution A: Running sum -> cumsum
def running_sum_vec(x):
    return np.cumsum(x)

# Solution B: Pairwise products -> outer product
def pairwise_products_vec(a, b):
    return np.outer(a, b)
    # Alternative: a[:, None] * b[None, :]

# Solution C: Window mean -> stride tricks or cumsum trick
def window_mean_vec(x, w):
    # Cumsum trick: mean of x[i:i+w] = (cumsum[i+w] - cumsum[i]) / w
    cs = np.cumsum(x)
    cs = np.insert(cs, 0, 0)  # prepend 0
    return (cs[w:] - cs[:-w]) / w

# Solution D: Conditional replacement -> np.maximum
def replace_negatives_vec(X):
    return np.maximum(X, 0)
    # This is ReLU!

# Solution E: Row-wise argmax one-hot
def argmax_onehot_vec(X):
    result = np.zeros_like(X)
    result[np.arange(X.shape[0]), np.argmax(X, axis=1)] = 1
    return result

# ---- VERIFICATION ----
np.random.seed(42)

# A
x = np.random.randn(1000)
assert np.allclose(running_sum_loop(x), running_sum_vec(x))

# B
a, b = np.random.randn(50), np.random.randn(60)
assert np.allclose(pairwise_products_loop(a, b), pairwise_products_vec(a, b))

# C
x = np.random.randn(100)
assert np.allclose(window_mean_loop(x, 5), window_mean_vec(x, 5))

# D
X = np.random.randn(100, 50)
assert np.allclose(replace_negatives_loop(X), replace_negatives_vec(X))

# E
X = np.random.randn(100, 10)
assert np.allclose(argmax_onehot_loop(X), argmax_onehot_vec(X))

print("Challenge 1 passed! All 5 vectorizations correct.")

Challenge 2: Einsum Mastery

Problem: Rewrite each operation using np.einsum. Einsum is the Swiss Army knife of NumPy — it can express any tensor contraction in a single readable call. Mastering it is a strong interview signal.

💡
Einsum notation: Repeated indices are summed over. Free indices appear in the output. Example: 'ij,jk->ik' means sum over j, producing matrix multiply.
import numpy as np

# ---- CHALLENGE: Rewrite each using np.einsum ----

np.random.seed(42)
A = np.random.randn(3, 4)
B = np.random.randn(4, 5)
v = np.random.randn(4)
X = np.random.randn(8, 3, 4)   # batch of matrices
Y = np.random.randn(8, 4, 5)   # batch of matrices

# ---- SOLUTIONS ----

# 1. Matrix multiply: A @ B
result1 = np.einsum('ij,jk->ik', A, B)
assert np.allclose(result1, A @ B)

# 2. Dot product: a . b
a, b = np.random.randn(5), np.random.randn(5)
result2 = np.einsum('i,i->', a, b)
assert np.allclose(result2, np.dot(a, b))

# 3. Outer product: a outer b
result3 = np.einsum('i,j->ij', a, b)
assert np.allclose(result3, np.outer(a, b))

# 4. Trace: sum of diagonal elements
result4 = np.einsum('ii->', np.random.randn(4, 4).astype(float))
M = np.random.randn(4, 4)
assert np.allclose(np.einsum('ii->', M), np.trace(M))

# 5. Batch matrix multiply: X[b] @ Y[b] for each b
result5 = np.einsum('bij,bjk->bik', X, Y)
expected5 = np.array([X[i] @ Y[i] for i in range(8)])
assert np.allclose(result5, expected5)

# 6. Matrix transpose
result6 = np.einsum('ij->ji', A)
assert np.allclose(result6, A.T)

# 7. Row-wise dot product: sum(A * B, axis=1) for matching-dimension matrices
C = np.random.randn(10, 5)
D = np.random.randn(10, 5)
result7 = np.einsum('ij,ij->i', C, D)
assert np.allclose(result7, (C * D).sum(axis=1))

# 8. Bilinear form: x^T M x
x = np.random.randn(4)
M = np.random.randn(4, 4)
result8 = np.einsum('i,ij,j->', x, M, x)
assert np.allclose(result8, x @ M @ x)

# 9. Attention scores: Q @ K^T / sqrt(d) for batched multi-head attention
B, H, T, D = 2, 4, 8, 16   # batch, heads, seq_len, dim
Q = np.random.randn(B, H, T, D)
K = np.random.randn(B, H, T, D)
scores = np.einsum('bhid,bhjd->bhij', Q, K) / np.sqrt(D)
expected_scores = np.zeros((B, H, T, T))
for bi in range(B):
    for hi in range(H):
        expected_scores[bi, hi] = Q[bi, hi] @ K[bi, hi].T / np.sqrt(D)
assert np.allclose(scores, expected_scores)

print("Challenge 2 passed! All 9 einsum operations correct.")

Challenge 3: Memory-Efficient Operations

Problem: Implement operations that minimize memory usage. Key concepts: views vs. copies, in-place operations, chunked processing, and the out parameter.

import numpy as np

# ---- CHALLENGE ----
def memory_efficient_normalize(X):
    """Normalize X in-place without creating unnecessary copies."""
    pass

def chunked_pairwise_distance(A, B, chunk_size=1000):
    """Compute pairwise distances in chunks to limit memory usage."""
    pass

# ---- SOLUTION ----
def memory_efficient_normalize(X):
    """In-place z-score normalization."""
    # Compute stats (these are small: shape (D,))
    mean = X.mean(axis=0)
    std = X.std(axis=0)
    std[std == 0] = 1.0

    # In-place operations: modify X directly
    X -= mean            # in-place subtract (no new array)
    X /= std             # in-place divide (no new array)
    return X  # same object, modified

def chunked_pairwise_distance(A, B, chunk_size=1000):
    """Process distance computation in chunks.
    For A (M, D) and B (N, D), avoids creating (M, N, D) intermediate.
    """
    M, D = A.shape
    N = B.shape[0]
    distances = np.zeros((M, N))

    # Pre-compute B squared norms (reused across chunks)
    B_sq = np.sum(B ** 2, axis=1)  # (N,)

    for start in range(0, M, chunk_size):
        end = min(start + chunk_size, M)
        A_chunk = A[start:end]                    # (chunk, D) - this is a VIEW
        A_sq = np.sum(A_chunk ** 2, axis=1, keepdims=True)  # (chunk, 1)
        cross = A_chunk @ B.T                      # (chunk, N)

        dist_sq = A_sq + B_sq - 2 * cross          # (chunk, N)
        np.maximum(dist_sq, 0, out=dist_sq)        # in-place clip
        np.sqrt(dist_sq, out=dist_sq)              # in-place sqrt

        distances[start:end] = dist_sq

    return distances

# ---- VERIFICATION ----
# Test in-place normalization
X = np.random.randn(100, 10).copy()  # ensure it owns its data
X_id = id(X)
result = memory_efficient_normalize(X)
assert id(result) == X_id, "Should return the same array (in-place)"
assert np.allclose(X.mean(axis=0), 0, atol=1e-10), "Mean should be ~0"

# Test chunked distance
A = np.random.randn(500, 64)
B = np.random.randn(300, 64)

dist_chunked = chunked_pairwise_distance(A, B, chunk_size=100)
# Compare with full computation
A_sq = np.sum(A ** 2, axis=1, keepdims=True)
B_sq = np.sum(B ** 2, axis=1, keepdims=True)
dist_full = np.sqrt(np.maximum(A_sq + B_sq.T - 2 * A @ B.T, 0))
assert np.allclose(dist_chunked, dist_full, atol=1e-6), "Chunked should match full"

# View vs. copy awareness
X = np.arange(12).reshape(3, 4)
row_view = X[0]           # VIEW: shares memory
row_copy = X[0].copy()    # COPY: independent
assert np.shares_memory(X, row_view), "Slicing creates views"
assert not np.shares_memory(X, row_copy), "copy() creates independent array"

transpose_view = X.T      # VIEW
assert np.shares_memory(X, transpose_view), "Transpose is a view"

fancy_copy = X[[0, 2]]    # COPY (fancy indexing always copies)
assert not np.shares_memory(X, fancy_copy), "Fancy indexing creates copies"
print("Challenge 3 passed!")

Challenge 4: Broadcasting Tricks

Problem: Use broadcasting to solve these problems without loops or explicit tiling. Each solution should be 1-3 lines.

import numpy as np

# ---- CHALLENGES AND SOLUTIONS ----

np.random.seed(42)

# Problem A: Create a multiplication table (10x10)
# Expected: result[i,j] = (i+1) * (j+1)
r = np.arange(1, 11)
mult_table = r[:, None] * r[None, :]  # (10, 1) * (1, 10) -> (10, 10)
assert mult_table[0, 0] == 1 and mult_table[9, 9] == 100
assert mult_table[2, 3] == 12  # 3 * 4

# Problem B: Subtract row mean from each element (centering)
X = np.random.randn(5, 4)
X_centered = X - X.mean(axis=1, keepdims=True)
assert np.allclose(X_centered.mean(axis=1), 0)

# Problem C: Divide each row by its L2 norm
norms = np.linalg.norm(X, axis=1, keepdims=True)  # (5, 1)
X_normalized = X / norms                            # (5, 4) / (5, 1)
assert np.allclose(np.linalg.norm(X_normalized, axis=1), 1.0)

# Problem D: Create a distance matrix between 1D points
points = np.array([1.0, 3.0, 5.0, 7.0])
dist_1d = np.abs(points[:, None] - points[None, :])  # (4, 1) - (1, 4) -> (4, 4)
assert dist_1d[0, 3] == 6.0 and dist_1d[1, 2] == 2.0

# Problem E: Apply different thresholds to each feature
X = np.random.randn(100, 5)
thresholds = np.array([0.0, 0.5, -0.5, 1.0, -1.0])  # one per feature
# Boolean mask: (100, 5) > (5,) broadcasts correctly
mask = X > thresholds  # (100, 5) boolean array
assert mask.shape == (100, 5)

# Problem F: Softmax temperature scaling
logits = np.random.randn(10, 5)
temperatures = np.array([0.1, 0.5, 1.0, 2.0, 5.0])  # one per column
scaled = logits / temperatures[None, :]  # (10, 5) / (1, 5)
assert scaled.shape == (10, 5)

# Problem G: Weighted average of multiple feature matrices
matrices = np.random.randn(3, 100, 64)  # 3 feature matrices
weights = np.array([0.5, 0.3, 0.2])     # weights for each
# (3, 1, 1) * (3, 100, 64) -> sum over axis 0 -> (100, 64)
weighted_avg = (weights[:, None, None] * matrices).sum(axis=0)
assert weighted_avg.shape == (100, 64)

print("Challenge 4 passed! All 7 broadcasting tricks correct.")

Challenge 5: Advanced Indexing Patterns

Problem: Use advanced indexing to solve these common ML operations without loops. These patterns appear frequently in loss computation, attention mechanisms, and data augmentation.

import numpy as np

np.random.seed(42)

# ---- Problem A: Gather operation (used in embedding lookup) ----
# Given embedding table E (V, D) and indices idx (N,), fetch embeddings
V, D, N = 10000, 128, 64
E = np.random.randn(V, D)           # vocabulary embeddings
idx = np.random.randint(0, V, N)    # token indices
embeddings = E[idx]                  # (N, D) - fancy indexing
assert embeddings.shape == (N, D)
assert np.array_equal(embeddings[0], E[idx[0]])

# ---- Problem B: Scatter operation (used in gradient accumulation) ----
# Accumulate gradients back to embedding table
grad_output = np.random.randn(N, D)  # gradient from loss
grad_E = np.zeros_like(E)
np.add.at(grad_E, idx, grad_output)  # scatter-add (handles duplicates!)
# If idx has duplicates, simple assignment E[idx] = grad would lose gradients

# ---- Problem C: Diagonal blocks extraction ----
# Extract k-th diagonal from a matrix
M = np.random.randn(5, 5)
main_diag = np.diag(M)            # [M[0,0], M[1,1], ...]
super_diag = np.diag(M, k=1)     # [M[0,1], M[1,2], ...]
sub_diag = np.diag(M, k=-1)      # [M[1,0], M[2,1], ...]
assert len(main_diag) == 5
assert len(super_diag) == 4

# ---- Problem D: Batch diagonal extraction ----
# For a batch of matrices (B, N, N), extract diagonals -> (B, N)
batch_matrices = np.random.randn(8, 4, 4)
# Use advanced indexing: batch_matrices[b, i, i] for each b
B, N_dim = batch_matrices.shape[0], batch_matrices.shape[1]
b_idx = np.arange(B)[:, None]         # (B, 1)
n_idx = np.arange(N_dim)[None, :]     # (1, N)
batch_diags = batch_matrices[b_idx, n_idx, n_idx]  # (B, N)
assert batch_diags.shape == (B, N_dim)
# Verify
for b in range(B):
    assert np.array_equal(batch_diags[b], np.diag(batch_matrices[b]))

# ---- Problem E: Top-k per row with values ----
# For each row, get the k largest values and their indices
X = np.random.randn(100, 50)
k = 5
# Step 1: get indices of k largest per row
top_k_idx = np.argpartition(X, -k, axis=1)[:, -k:]  # (100, k)
# Step 2: get the actual values
row_idx = np.arange(100)[:, None]  # (100, 1)
top_k_vals = X[row_idx, top_k_idx]                    # (100, k)

# Step 3: sort within the top-k (descending)
sort_idx = np.argsort(-top_k_vals, axis=1)
top_k_idx_sorted = np.take_along_axis(top_k_idx, sort_idx, axis=1)
top_k_vals_sorted = np.take_along_axis(top_k_vals, sort_idx, axis=1)

# Verify: first element of each row should be the row max
assert np.allclose(top_k_vals_sorted[:, 0], X.max(axis=1))

# ---- Problem F: Masked fill (attention mask) ----
# Set specific positions to -inf (used in causal attention)
seq_len = 8
# Create causal mask: upper triangle is True (positions to mask)
causal_mask = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
attention_scores = np.random.randn(seq_len, seq_len)
# Apply mask: set future positions to -inf
attention_scores[causal_mask] = -np.inf
# After softmax, future positions will be 0
assert np.all(attention_scores[0, 1:] == -np.inf), "Future positions should be -inf"
assert attention_scores[0, 0] != -np.inf, "Current position should not be masked"

print("Challenge 5 passed! All 6 advanced indexing patterns correct.")

Key Takeaways

💡
  • Every Python for-loop over array elements has a vectorized NumPy equivalent — find it
  • np.einsum can express any tensor operation: matrix multiply, batch operations, traces, contractions
  • Know which operations return views (slicing, reshape, transpose) vs. copies (fancy indexing, boolean indexing)
  • Use out= parameter and in-place operators (+=, *=) to reduce memory allocations
  • For large pairwise computations, process in chunks to avoid creating enormous intermediate arrays
  • np.add.at is the scatter-add operation needed for gradient accumulation with duplicate indices