Intermediate

K-Means & KNN

Clustering and nearest neighbor algorithms are interview staples. K-means tests your understanding of iterative optimization and initialization strategies. KNN tests distance metric implementation and efficient neighbor search. We also implement DBSCAN for density-based clustering.

K-Means Clustering

K-means partitions data into K clusters by iteratively assigning points to the nearest centroid and recomputing centroids as cluster means.

import numpy as np

class KMeans:
    """K-Means clustering from scratch (Lloyd's algorithm).

    Alternates between assigning each point to its nearest centroid and
    moving each centroid to the mean of its assigned points, until the
    centroids stop moving (within ``tol``) or ``max_iterations`` is hit.

    Parameters:
        n_clusters: number of clusters K
        max_iterations: cap on assignment/update iterations
        tol: convergence threshold on total squared centroid movement
        init: 'kmeans++' (spread initial centroids apart) or 'random'
        random_state: seed for reproducible runs
    """

    def __init__(self, n_clusters=3, max_iterations=100, tol=1e-6,
                 init='kmeans++', random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iterations
        self.tol = tol
        self.init = init
        self.random_state = random_state
        self.centroids = None   # (n_clusters, n_features) after fit
        self.labels = None      # (n_samples,) cluster index per point
        self.inertia_ = None    # Sum of squared distances to nearest centroid

    def _init_centroids_random(self, X, rng):
        """Random initialization: pick K distinct data points."""
        idx = rng.choice(X.shape[0], size=self.n_clusters, replace=False)
        return X[idx].copy()

    def _init_centroids_kmeanspp(self, X, rng):
        """K-means++ initialization: spread initial centroids apart.

        1. Choose the first centroid uniformly at random.
        2. Each subsequent centroid is a data point chosen with probability
           proportional to its squared distance from the nearest centroid
           already picked.
        """
        n_samples = X.shape[0]
        centroids = np.zeros((self.n_clusters, X.shape[1]))

        # First centroid: uniform random data point
        centroids[0] = X[rng.randint(n_samples)]

        for k in range(1, self.n_clusters):
            # Squared distance from each point to its nearest chosen centroid
            distances = np.min([
                np.sum((X - centroids[j]) ** 2, axis=1)
                for j in range(k)
            ], axis=0)

            # Sample next centroid with probability proportional to D(x)^2
            probs = distances / distances.sum()
            centroids[k] = X[rng.choice(n_samples, p=probs)]

        return centroids

    def _assign(self, X):
        """Return the index of the nearest centroid for each row of X."""
        # distances shape: (n_samples, n_clusters)
        distances = np.sqrt(np.sum(
            (X[:, np.newaxis] - self.centroids[np.newaxis]) ** 2,
            axis=2
        ))
        return np.argmin(distances, axis=1)

    def fit(self, X):
        """Cluster X of shape (n_samples, n_features); returns self."""
        # One RNG for *all* randomness (init and empty-cluster resets),
        # so runs are reproducible under random_state.  The previous
        # version used the global np.random for empty-cluster resets,
        # which broke seeded reproducibility.
        rng = np.random.RandomState(self.random_state)

        if self.init == 'kmeans++':
            self.centroids = self._init_centroids_kmeanspp(X, rng)
        else:
            self.centroids = self._init_centroids_random(X, rng)

        for _ in range(self.max_iter):
            # ---- Assignment step ----
            self.labels = self._assign(X)

            # ---- Update step ----
            new_centroids = np.zeros_like(self.centroids)
            for k in range(self.n_clusters):
                cluster_points = X[self.labels == k]
                if len(cluster_points) > 0:
                    new_centroids[k] = cluster_points.mean(axis=0)
                else:
                    # Empty cluster: reseed it at a random data point
                    new_centroids[k] = X[rng.randint(X.shape[0])]

            # Converged when centroids barely moved
            centroid_shift = np.sum((new_centroids - self.centroids) ** 2)
            self.centroids = new_centroids
            if centroid_shift < self.tol:
                break

        # Recompute the assignment so labels and inertia correspond to the
        # FINAL centroids (inside the loop, labels lag one update behind).
        self.labels = self._assign(X)
        self.inertia_ = sum(
            np.sum((X[self.labels == k] - self.centroids[k]) ** 2)
            for k in range(self.n_clusters)
        )

        return self

    def predict(self, X):
        """Assign new points to the nearest fitted centroid."""
        return self._assign(X)


# ---- Test ----
np.random.seed(42)
# Three well-separated Gaussian blobs (RNG draws in the same order
# as the original: one randn(100, 2) call per blob, in sequence).
blob_offsets = (np.array([5, 5]), np.array([-5, -5]), np.array([5, -5]))
X = np.vstack([np.random.randn(100, 2) + off for off in blob_offsets])

kmeans = KMeans(n_clusters=3, init='kmeans++', random_state=42)
kmeans.fit(X)
print(f"Centroids:\n{kmeans.centroids}")
print(f"Inertia: {kmeans.inertia_:.2f}")
print(f"Cluster sizes: {[np.sum(kmeans.labels == k) for k in range(3)]}")
💡
Interview tip: Always implement K-means++ initialization, not random. K-means++ gives much better results by spreading initial centroids apart. The key insight: each new centroid is chosen with probability proportional to D(x)^2, the squared distance from the nearest existing centroid.

K-Nearest Neighbors (KNN)

KNN is a non-parametric algorithm: no training step, just store the data and classify new points by majority vote of their K nearest neighbors.

class KNearestNeighbors:
    """K-Nearest Neighbors classifier from scratch.

    A lazy learner: fit() merely stores the training set, and every
    query point is labeled at prediction time by a (possibly
    distance-weighted) vote among its k closest training points.
    """

    def __init__(self, n_neighbors=5, metric='euclidean', weighted=False):
        self.k = n_neighbors      # neighbors consulted per query
        self.metric = metric      # 'euclidean', 'manhattan', or 'cosine'
        self.weighted = weighted  # True -> votes weighted by 1/distance
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        """Store copies of the training data; no computation happens here."""
        self.X_train = X.copy()
        self.y_train = y.copy()
        return self

    def _compute_distances(self, X):
        """Pairwise distances between query rows and training rows.

        Returns an array of shape (n_test, n_train).
        """
        if self.metric == 'euclidean':
            # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, clamped at zero so
            # float round-off can never feed a negative into sqrt.
            test_sq = np.sum(X ** 2, axis=1, keepdims=True)     # (n_test, 1)
            train_sq = np.sum(self.X_train ** 2, axis=1)        # (n_train,)
            gram = X @ self.X_train.T                           # (n_test, n_train)
            return np.sqrt(np.maximum(test_sq + train_sq - 2 * gram, 0))

        if self.metric == 'manhattan':
            # Sum of absolute coordinate differences
            diff = np.abs(X[:, np.newaxis] - self.X_train[np.newaxis])
            return np.sum(diff, axis=2)

        if self.metric == 'cosine':
            # 1 - cosine similarity of the L2-normalized rows
            queries = X / np.linalg.norm(X, axis=1, keepdims=True)
            refs = self.X_train / np.linalg.norm(
                self.X_train, axis=1, keepdims=True
            )
            return 1 - queries @ refs.T

        raise ValueError(f"Unknown metric: {self.metric}")

    def predict(self, X):
        """Label each row of X by a vote among its k nearest neighbors."""
        dists = self._compute_distances(X)
        out = np.zeros(X.shape[0], dtype=self.y_train.dtype)

        for i, row in enumerate(dists):
            nearest = np.argsort(row)[:self.k]
            neighbor_labels = self.y_train[nearest]

            if self.weighted:
                # Inverse-distance weighting; epsilon avoids dividing by
                # zero when a query coincides with a training point.
                weights = 1.0 / (row[nearest] + 1e-10)
                classes = np.unique(neighbor_labels)
                tallies = np.array([
                    np.sum(weights[neighbor_labels == cls])
                    for cls in classes
                ])
                out[i] = classes[np.argmax(tallies)]
            else:
                # Plain majority vote
                classes, counts = np.unique(neighbor_labels,
                                            return_counts=True)
                out[i] = classes[np.argmax(counts)]

        return out

    def accuracy(self, X, y):
        """Fraction of rows of X whose prediction matches y."""
        return np.mean(self.predict(X) == y)


# ---- Test ----
np.random.seed(42)

def _two_blobs(per_class):
    """Two Gaussian blobs at (2,2) and (-2,-2) with labels 0/1.

    Draws randn(per_class, 2) twice, in order, so the RNG stream
    matches the original inline construction exactly.
    """
    a = np.random.randn(per_class, 2) + [2, 2]
    b = np.random.randn(per_class, 2) + [-2, -2]
    return np.vstack([a, b]), np.array([0] * per_class + [1] * per_class)

X_train, y_train = _two_blobs(50)
X_test, y_test = _two_blobs(20)

knn = KNearestNeighbors(n_neighbors=5, metric='euclidean').fit(X_train, y_train)
print(f"KNN accuracy: {knn.accuracy(X_test, y_test):.4f}")

DBSCAN

DBSCAN is a density-based clustering algorithm that can find arbitrarily shaped clusters and identify noise points. It does not require specifying the number of clusters.

class DBSCAN:
    """DBSCAN clustering from scratch.

    Density-based: clusters grow outward from "core" points (points with
    at least min_samples neighbors within eps); points reachable from a
    core point join its cluster, everything else is labeled -1 (noise).
    Unlike K-means, the number of clusters is discovered, not specified,
    and clusters may be arbitrarily shaped.

    Parameters:
        eps: maximum distance between two points to be neighbors
        min_samples: minimum points in eps-neighborhood to be a core point
    """

    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
        self.labels = None  # per-point cluster id after fit; -1 = noise

    def fit(self, X):
        """Cluster X of shape (n_samples, n_features); returns self."""
        n_samples = X.shape[0]
        self.labels = np.full(n_samples, -1)  # -1 = unvisited/noise

        # Precompute the full pairwise distance matrix.  O(n^2) memory —
        # fine at interview scale; a KD-tree would scale better.
        distances = np.sqrt(np.sum(
            (X[:, np.newaxis] - X[np.newaxis]) ** 2, axis=2
        ))

        # eps-neighborhood of each point (includes the point itself,
        # since distances[i][i] == 0).
        neighborhoods = [
            np.where(distances[i] <= self.eps)[0]
            for i in range(n_samples)
        ]

        # Core points are those with dense neighborhoods
        core_mask = np.array([
            len(neighbors) >= self.min_samples
            for neighbors in neighborhoods
        ])

        cluster_id = 0
        for i in range(n_samples):
            if self.labels[i] != -1:  # Already claimed by a cluster
                continue
            if not core_mask[i]:      # Border/noise points cannot seed
                continue

            # Start new cluster from this core point
            self._expand_cluster(
                i, neighborhoods, core_mask, cluster_id
            )
            cluster_id += 1

        return self

    def _expand_cluster(self, seed_idx, neighborhoods, core_mask, cluster_id):
        """Expand cluster from a core point using BFS.

        Uses a list plus a read index as the FIFO queue: same visit
        order as list.pop(0), but O(1) per dequeue instead of O(n),
        keeping the BFS linear in cluster size.
        """
        queue = [seed_idx]
        head = 0
        self.labels[seed_idx] = cluster_id

        while head < len(queue):
            current = queue[head]
            head += 1

            for neighbor in neighborhoods[current]:
                if self.labels[neighbor] == -1:  # Unvisited (or noise)
                    self.labels[neighbor] = cluster_id

                    # Only core points propagate the cluster further;
                    # border points join but do not expand.
                    if core_mask[neighbor]:
                        queue.append(neighbor)


# ---- Test ----
np.random.seed(42)
# Two concentric noisy rings plus uniform background noise (RNG draws
# in the original order: randn, randn, then uniform).
theta = np.linspace(0, 2*np.pi, 100)
unit_ring = np.column_stack([np.cos(theta), np.sin(theta)])
X_circle1 = unit_ring * 2 + np.random.randn(100, 2) * 0.1
X_circle2 = unit_ring * 5 + np.random.randn(100, 2) * 0.1
X_noise = np.random.uniform(-7, 7, (20, 2))
X_db = np.vstack([X_circle1, X_circle2, X_noise])

dbscan = DBSCAN(eps=0.5, min_samples=5)
dbscan.fit(X_db)
found = set(dbscan.labels)
n_clusters = len(found) - (1 if -1 in found else 0)
n_noise = np.sum(dbscan.labels == -1)
print(f"DBSCAN found {n_clusters} clusters and {n_noise} noise points")
Common interview mistake: In the KNN distance computation, computing np.sqrt(negative_number) due to floating-point errors in the ||a||^2 + ||b||^2 - 2*a.b formula. Always wrap with np.maximum(..., 0) before taking the square root.

Key Takeaways

💡
  • K-means alternates between assignment (nearest centroid) and update (recompute means)
  • K-means++ initialization selects centroids proportional to D(x)^2 for better convergence
  • KNN is a lazy learner: no training, just store data and query at prediction time
  • Vectorized Euclidean distance: ||a-b||^2 = ||a||^2 + ||b||^2 - 2*a.b
  • DBSCAN finds arbitrary-shaped clusters and requires eps and min_samples, not K
  • Always handle empty clusters in K-means (reinitialize centroid to random point)