Intermediate
K-Means & KNN
Clustering and nearest neighbor algorithms are interview staples. K-means tests your understanding of iterative optimization and initialization strategies. KNN tests distance metric implementation and efficient neighbor search. We also implement DBSCAN for density-based clustering.
K-Means Clustering
K-means partitions data into K clusters by iteratively assigning points to the nearest centroid and recomputing centroids as cluster means.
import numpy as np
class KMeans:
    """K-Means clustering from scratch.

    Iteratively alternates between assigning every point to its nearest
    centroid and moving each centroid to the mean of its assigned points
    (Lloyd's algorithm).

    Parameters:
        n_clusters: number of clusters K
        max_iterations: cap on assignment/update iterations
        tol: convergence threshold on total squared centroid movement
        init: 'kmeans++' (default) for spread-out seeding; any other
            value falls back to picking K random data points
        random_state: seed making initialization AND empty-cluster
            reseeding reproducible

    Attributes set by fit():
        centroids: (n_clusters, n_features) final cluster centers
        labels: (n_samples,) nearest-final-centroid index per point
        inertia_: sum of squared distances of points to their centroid
    """

    def __init__(self, n_clusters=3, max_iterations=100, tol=1e-6,
                 init='kmeans++', random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iterations
        self.tol = tol
        self.init = init
        self.random_state = random_state
        self.centroids = None
        self.labels = None
        self.inertia_ = None  # Sum of squared distances to nearest centroid

    def _init_centroids_random(self, X, rng):
        """Random initialization: pick K distinct data points."""
        idx = rng.choice(X.shape[0], size=self.n_clusters, replace=False)
        return X[idx].copy()

    def _init_centroids_kmeanspp(self, X, rng):
        """K-means++ initialization: spread initial centroids apart.

        1. Choose the first centroid uniformly at random.
        2. Choose each subsequent centroid with probability proportional
           to the squared distance from the nearest existing centroid.
        """
        n_samples = X.shape[0]
        centroids = np.zeros((self.n_clusters, X.shape[1]))
        # First centroid: uniform random data point
        centroids[0] = X[rng.randint(n_samples)]
        for k in range(1, self.n_clusters):
            # Squared distance from each point to its nearest chosen centroid
            distances = np.min([
                np.sum((X - centroids[j]) ** 2, axis=1)
                for j in range(k)
            ], axis=0)
            total = distances.sum()
            if total == 0:
                # Degenerate case: every point coincides with an existing
                # centroid. Fall back to a uniform draw instead of
                # dividing by zero (which would yield NaN probabilities).
                idx = rng.randint(n_samples)
            else:
                # D(x)^2-weighted draw for the next centroid
                idx = rng.choice(n_samples, p=distances / total)
            centroids[k] = X[idx]
        return centroids

    def _distances_to_centroids(self, X):
        """Euclidean distances, shape (n_samples, n_clusters)."""
        return np.sqrt(np.sum(
            (X[:, np.newaxis] - self.centroids[np.newaxis]) ** 2,
            axis=2
        ))

    def fit(self, X):
        """Cluster X; returns self."""
        # One seeded RNG drives both initialization and empty-cluster
        # reseeding, so a fixed random_state fully determines the result.
        # (Previously reseeding used the unseeded global np.random, which
        # silently broke reproducibility.)
        rng = np.random.RandomState(self.random_state)
        if self.init == 'kmeans++':
            self.centroids = self._init_centroids_kmeanspp(X, rng)
        else:
            self.centroids = self._init_centroids_random(X, rng)
        for _ in range(self.max_iter):
            # ---- Assignment step ----
            labels = np.argmin(self._distances_to_centroids(X), axis=1)
            # ---- Update step ----
            new_centroids = np.zeros_like(self.centroids)
            for k in range(self.n_clusters):
                cluster_points = X[labels == k]
                if len(cluster_points) > 0:
                    new_centroids[k] = cluster_points.mean(axis=0)
                else:
                    # Empty cluster: reseed to a random data point
                    new_centroids[k] = X[rng.randint(X.shape[0])]
            # Converged once centroids barely move
            centroid_shift = np.sum((new_centroids - self.centroids) ** 2)
            self.centroids = new_centroids
            if centroid_shift < self.tol:
                break
        # Final assignment pass so labels and inertia reflect the FINAL
        # centroids (labels from inside the loop are one update stale).
        self.labels = np.argmin(self._distances_to_centroids(X), axis=1)
        self.inertia_ = sum(
            np.sum((X[self.labels == k] - self.centroids[k]) ** 2)
            for k in range(self.n_clusters)
        )
        return self

    def predict(self, X):
        """Assign each row of X to the nearest learned centroid."""
        return np.argmin(self._distances_to_centroids(X), axis=1)
# ---- Test ----
np.random.seed(42)
# Three Gaussian blobs, 100 points each, centered on distinct corners.
centers = [np.array([5, 5]), np.array([-5, -5]), np.array([5, -5])]
X = np.vstack([np.random.randn(100, 2) + c for c in centers])
kmeans = KMeans(n_clusters=3, init='kmeans++', random_state=42)
kmeans.fit(X)
print(f"Centroids:\n{kmeans.centroids}")
print(f"Inertia: {kmeans.inertia_:.2f}")
print(f"Cluster sizes: {[np.sum(kmeans.labels == k) for k in range(3)]}")
Interview tip: Always implement K-means++ initialization, not random. K-means++ gives much better results by spreading initial centroids apart. The key insight: each new centroid is chosen with probability proportional to D(x)^2, the squared distance from the nearest existing centroid.
K-Nearest Neighbors (KNN)
KNN is a non-parametric algorithm: no training step, just store the data and classify new points by majority vote of their K nearest neighbors.
class KNearestNeighbors:
"""K-Nearest Neighbors Classifier from scratch."""
def __init__(self, n_neighbors=5, metric='euclidean', weighted=False):
self.k = n_neighbors
self.metric = metric
self.weighted = weighted
self.X_train = None
self.y_train = None
def fit(self, X, y):
"""KNN is lazy: just store the training data."""
self.X_train = X.copy()
self.y_train = y.copy()
return self
def _compute_distances(self, X):
"""Compute pairwise distances between X and training data.
Returns shape: (n_test, n_train)
"""
if self.metric == 'euclidean':
# Efficient vectorized Euclidean distance
# ||a - b||^2 = ||a||^2 + ||b||^2 - 2*a.b
X_sq = np.sum(X ** 2, axis=1, keepdims=True) # (n_test, 1)
train_sq = np.sum(self.X_train ** 2, axis=1) # (n_train,)
cross = X @ self.X_train.T # (n_test, n_train)
distances = np.sqrt(np.maximum(X_sq + train_sq - 2 * cross, 0))
elif self.metric == 'manhattan':
# Manhattan: sum of absolute differences
distances = np.sum(
np.abs(X[:, np.newaxis] - self.X_train[np.newaxis]),
axis=2
)
elif self.metric == 'cosine':
# Cosine distance = 1 - cosine_similarity
X_norm = X / np.linalg.norm(X, axis=1, keepdims=True)
train_norm = self.X_train / np.linalg.norm(
self.X_train, axis=1, keepdims=True
)
similarity = X_norm @ train_norm.T
distances = 1 - similarity
else:
raise ValueError(f"Unknown metric: {self.metric}")
return distances
def predict(self, X):
distances = self._compute_distances(X)
predictions = np.zeros(X.shape[0], dtype=self.y_train.dtype)
for i in range(X.shape[0]):
# Find K nearest neighbors
k_nearest_idx = np.argsort(distances[i])[:self.k]
k_nearest_labels = self.y_train[k_nearest_idx]
if self.weighted:
# Distance-weighted voting
k_distances = distances[i][k_nearest_idx]
# Add epsilon to avoid division by zero
weights = 1.0 / (k_distances + 1e-10)
classes = np.unique(k_nearest_labels)
weighted_votes = np.zeros(len(classes))
for j, cls in enumerate(classes):
weighted_votes[j] = np.sum(
weights[k_nearest_labels == cls]
)
predictions[i] = classes[np.argmax(weighted_votes)]
else:
# Simple majority vote
values, counts = np.unique(k_nearest_labels,
return_counts=True)
predictions[i] = values[np.argmax(counts)]
return predictions
def accuracy(self, X, y):
return np.mean(self.predict(X) == y)
# ---- Test ----
np.random.seed(42)
# Two well-separated Gaussian classes: 50 train / 20 test points each.
offsets = (np.array([2, 2]), np.array([-2, -2]))
X_train = np.vstack([np.random.randn(50, 2) + off for off in offsets])
y_train = np.repeat([0, 1], 50)
X_test = np.vstack([np.random.randn(20, 2) + off for off in offsets])
y_test = np.repeat([0, 1], 20)
knn = KNearestNeighbors(n_neighbors=5, metric='euclidean')
knn.fit(X_train, y_train)
print(f"KNN accuracy: {knn.accuracy(X_test, y_test):.4f}")
DBSCAN
DBSCAN is a density-based clustering algorithm that can find arbitrarily shaped clusters and identify noise points. It does not require specifying the number of clusters.
class DBSCAN:
    """Density-based clustering (DBSCAN) from scratch.

    Assigns each point a cluster id (0, 1, ...) or -1 for noise. The
    number of clusters is discovered from the data rather than supplied.

    Parameters:
        eps: maximum distance between two points to be neighbors
        min_samples: minimum points in eps-neighborhood to be a core point
    """

    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
        self.labels = None  # filled by fit(); -1 marks unvisited/noise

    def fit(self, X):
        """Cluster X, storing per-point cluster ids in self.labels."""
        n = X.shape[0]
        self.labels = np.full(n, -1)  # -1 = unvisited/noise
        # Full pairwise Euclidean distance matrix (O(n^2) memory)
        gaps = X[:, np.newaxis] - X[np.newaxis]
        dist = np.sqrt((gaps ** 2).sum(axis=2))
        # eps-neighborhood of every point (each point neighbors itself)
        nbrs = [np.where(row <= self.eps)[0] for row in dist]
        # Core points are those with dense neighborhoods
        is_core = np.array([nb.size >= self.min_samples for nb in nbrs])
        next_id = 0
        for i in range(n):
            # Grow a new cluster only from an unclaimed core point;
            # unclaimed non-core points stay noise unless a later
            # cluster absorbs them as border points.
            if self.labels[i] == -1 and is_core[i]:
                self._expand_cluster(i, nbrs, is_core, next_id)
                next_id += 1
        return self

    def _expand_cluster(self, seed_idx, neighborhoods, core_mask, cluster_id):
        """Breadth-first growth of one cluster from a core seed point."""
        frontier = [seed_idx]
        self.labels[seed_idx] = cluster_id
        pos = 0
        # Walk the frontier with an index instead of pop(0): identical
        # visiting order, without the O(n) list shifts.
        while pos < len(frontier):
            point = frontier[pos]
            pos += 1
            for nb in neighborhoods[point]:
                if self.labels[nb] == -1:  # unvisited (or noise -> border)
                    self.labels[nb] = cluster_id
                    # Only core neighbors spread the cluster further
                    if core_mask[nb]:
                        frontier.append(nb)
# ---- Test ----
np.random.seed(42)
# Two concentric noisy rings plus uniform background noise
theta = np.linspace(0, 2*np.pi, 100)
ring = np.column_stack([np.cos(theta), np.sin(theta)])
X_circle1 = ring * 2 + np.random.randn(100, 2) * 0.1
X_circle2 = ring * 5 + np.random.randn(100, 2) * 0.1
X_noise = np.random.uniform(-7, 7, (20, 2))
X_db = np.vstack([X_circle1, X_circle2, X_noise])
dbscan = DBSCAN(eps=0.5, min_samples=5)
dbscan.fit(X_db)
found = set(dbscan.labels)
n_clusters = len(found) - (1 if -1 in found else 0)
n_noise = np.sum(dbscan.labels == -1)
print(f"DBSCAN found {n_clusters} clusters and {n_noise} noise points")
Common interview mistake: In the KNN distance computation, computing
np.sqrt(negative_number) due to floating-point errors in the ||a||^2 + ||b||^2 - 2*a.b formula: cancellation can make the expression slightly negative. Always wrap it with np.maximum(..., 0) before taking the square root.

Key Takeaways
- K-means alternates between assignment (nearest centroid) and update (recompute means)
- K-means++ initialization selects centroids proportional to D(x)^2 for better convergence
- KNN is a lazy learner: no training, just store data and query at prediction time
- Vectorized Euclidean distance: ||a-b||^2 = ||a||^2 + ||b||^2 - 2*a.b
- DBSCAN finds arbitrary-shaped clusters and requires eps and min_samples, not K
- Always handle empty clusters in K-means (reinitialize centroid to random point)
Lilly Tech Systems