
Probability in Code

Probability is the language of uncertainty, and ML is fundamentally about modeling uncertainty. This lesson implements five probability algorithms from scratch: Monte Carlo estimation, sampling methods, Bayesian inference, Markov chains, and pseudorandom number generation.

Problem 1: Monte Carlo Estimation

Problem: Estimate the value of pi using Monte Carlo simulation. Generate random points in a unit square and count what fraction falls inside the unit circle. Also implement Monte Carlo integration for arbitrary functions.

import random
import math

def estimate_pi(n_samples=100000):
    """
    Estimate pi using Monte Carlo: ratio of points inside unit circle
    to total points in a 2x2 square, multiplied by 4.
    Time: O(n), Space: O(1)
    Error: O(1/sqrt(n)) - need 100x more samples for 10x more accuracy
    """
    inside = 0

    for _ in range(n_samples):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x*x + y*y <= 1:
            inside += 1

    pi_estimate = 4 * inside / n_samples
    return pi_estimate

# Test with increasing sample sizes
for n in [1000, 10000, 100000, 1000000]:
    pi_est = estimate_pi(n)
    error = abs(pi_est - math.pi)
    print(f"n={n:>8}: pi ≈ {pi_est:.6f}, error = {error:.6f}")

def monte_carlo_integrate(f, a, b, n_samples=100000):
    """
    Estimate integral of f from a to b using Monte Carlo.
    integral ≈ (b-a) * (1/n) * sum(f(x_i)) where x_i ~ Uniform(a,b)
    Time: O(n * cost(f)), Space: O(1)
    """
    total = 0.0
    for _ in range(n_samples):
        x = random.uniform(a, b)
        total += f(x)

    return (b - a) * total / n_samples

# Test: integral of sin(x) from 0 to pi = 2.0
result = monte_carlo_integrate(math.sin, 0, math.pi)
print(f"\nIntegral of sin(x) from 0 to pi: {result:.6f}")
print(f"Exact value: 2.000000")

# Test: integral of x^2 from 0 to 1 = 1/3
result = monte_carlo_integrate(lambda x: x**2, 0, 1)
print(f"Integral of x^2 from 0 to 1: {result:.6f}")
print(f"Exact value: {1/3:.6f}")
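The O(1/sqrt(n)) error claim can be checked from the samples themselves: the standard error of the Monte Carlo estimate is (b-a) * std(f(X)) / sqrt(n). A minimal sketch (the helper name `monte_carlo_with_error` is ours, not part of the lesson code):

```python
import math
import random

def monte_carlo_with_error(f, a, b, n_samples=100000):
    """Monte Carlo integral estimate plus its estimated standard error."""
    values = [f(random.uniform(a, b)) for _ in range(n_samples)]
    mean = sum(values) / n_samples
    # Sample variance of f(X); the standard error shrinks as 1/sqrt(n)
    var = sum((v - mean) ** 2 for v in values) / (n_samples - 1)
    estimate = (b - a) * mean
    std_error = (b - a) * math.sqrt(var / n_samples)
    return estimate, std_error

est, se = monte_carlo_with_error(math.sin, 0, math.pi)
print(f"Integral ≈ {est:.4f} ± {se:.4f}")
```

The reported standard error lets you judge how many samples a desired accuracy requires: halving the error needs four times the samples.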

Problem 2: Sampling Methods

Problem: Implement rejection sampling and importance sampling. These methods allow you to sample from complex distributions when direct sampling is impossible.

def rejection_sampling(target_pdf, proposal_sampler, proposal_pdf,
                        M, n_samples=10000):
    """
    Rejection sampling: sample from target distribution using a
    proposal distribution that envelopes it.

    target_pdf: the distribution we want to sample from
    proposal_sampler: function that generates samples from proposal
    proposal_pdf: density of the proposal distribution
    M: constant such that target_pdf(x) <= M * proposal_pdf(x) for all x

    Time: O(n * M) expected, Space: O(n)
    """
    samples = []
    total_proposed = 0

    while len(samples) < n_samples:
        # Sample from proposal
        x = proposal_sampler()
        total_proposed += 1

        # Accept with probability target(x) / (M * proposal(x))
        acceptance_prob = target_pdf(x) / (M * proposal_pdf(x))
        if random.random() < acceptance_prob:
            samples.append(x)

    acceptance_rate = n_samples / total_proposed
    # Theoretical rate: 1/M if target_pdf is normalized, Z/M if it is
    # unnormalized with normalizing constant Z
    print(f"Acceptance rate: {acceptance_rate:.4f}")

    return samples

# Example: sample from Beta(2,5) using uniform proposal
def beta_pdf(x, a=2, b=5):
    """Unnormalized Beta(a,b) pdf."""
    if 0 < x < 1:
        return x**(a-1) * (1-x)**(b-1)
    return 0.0

# Uniform proposal. The unnormalized Beta(2,5) pdf x*(1-x)**4 peaks at
# x = 1/5 with value 0.2 * 0.8**4 ≈ 0.0819, so M = 0.085 envelopes it tightly.
M = 0.085
samples = rejection_sampling(
    target_pdf=beta_pdf,
    proposal_sampler=lambda: random.uniform(0, 1),
    proposal_pdf=lambda x: 1.0,
    M=M,
    n_samples=5000
)
mean = sum(samples) / len(samples)
print(f"Sample mean: {mean:.4f}, Expected (2/7): {2/7:.4f}")

def importance_sampling(f, target_pdf, proposal_sampler, proposal_pdf,
                         n_samples=10000):
    """
    Importance sampling: estimate E_p[f(x)] using samples from q(x).
    E_p[f(x)] ≈ (1/n) * sum(f(x_i) * w(x_i))
    where w(x_i) = p(x_i) / q(x_i) are importance weights.
    """
    weighted_sum = 0.0
    weight_sum = 0.0

    for _ in range(n_samples):
        x = proposal_sampler()
        w = target_pdf(x) / proposal_pdf(x)  # importance weight
        weighted_sum += f(x) * w
        weight_sum += w

    # Self-normalized importance sampling
    return weighted_sum / weight_sum

# Estimate E[X] where X ~ Beta(2,5), using uniform proposal
mean_is = importance_sampling(
    f=lambda x: x,
    target_pdf=beta_pdf,
    proposal_sampler=lambda: random.uniform(0, 1),
    proposal_pdf=lambda x: 1.0
)
print(f"\nImportance sampling E[X]: {mean_is:.4f}")
print(f"Expected (2/7): {2/7:.4f}")
💡 Interview context: Rejection sampling is simple but wasteful in high-dimensional spaces (the acceptance rate drops exponentially with dimension). Importance sampling avoids rejection but can have high variance if the proposal is a poor match for the target. MCMC methods (next problem) are the practical choice for complex distributions.
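The exponential decay is easy to see directly: rejection-sample the uniform distribution on the unit ball using a hypercube proposal, and the acceptance rate is the ball-to-cube volume ratio, which collapses as the dimension grows. A small illustration (the helper name `sphere_acceptance_rate` is ours):

```python
import random

def sphere_acceptance_rate(dim, n_trials=20000):
    """Fraction of points drawn from [-1,1]^dim that land inside the unit ball."""
    inside = 0
    for _ in range(n_trials):
        point = [random.uniform(-1, 1) for _ in range(dim)]
        if sum(x * x for x in point) <= 1.0:
            inside += 1
    return inside / n_trials

for d in [2, 5, 10]:
    print(f"dim={d:>2}: acceptance rate ≈ {sphere_acceptance_rate(d):.4f}")
```

In 2D the rate is pi/4 ≈ 0.785; by 10 dimensions it is already around 0.0025, so almost every proposal is wasted.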

Problem 3: Bayesian Inference

Problem: Implement Bayesian updating for a coin flip problem. Given a sequence of observations (heads/tails), compute the posterior distribution of the coin's bias using the Beta-Binomial conjugate prior.

def bayesian_coin(observations, prior_alpha=1, prior_beta=1):
    """
    Bayesian inference for coin bias.
    Prior: Beta(alpha, beta)
    Likelihood: Binomial(n, p)
    Posterior: Beta(alpha + heads, beta + tails)  (conjugate)

    Time: O(n), Space: O(1)
    """
    heads = sum(observations)
    tails = len(observations) - heads

    # Posterior parameters
    post_alpha = prior_alpha + heads
    post_beta = prior_beta + tails

    # Posterior mean (point estimate)
    posterior_mean = post_alpha / (post_alpha + post_beta)

    # Posterior mode (MAP estimate)
    if post_alpha > 1 and post_beta > 1:
        posterior_mode = (post_alpha - 1) / (post_alpha + post_beta - 2)
    else:
        posterior_mode = posterior_mean

    # Posterior variance
    total = post_alpha + post_beta
    posterior_var = (post_alpha * post_beta) / (total**2 * (total + 1))

    # 95% credible interval (approximate using Normal)
    std = math.sqrt(posterior_var)
    ci_low = max(0, posterior_mean - 1.96 * std)
    ci_high = min(1, posterior_mean + 1.96 * std)

    return {
        'posterior_alpha': post_alpha,
        'posterior_beta': post_beta,
        'mean': posterior_mean,
        'mode': posterior_mode,
        'variance': posterior_var,
        'ci_95': (ci_low, ci_high)
    }

# Test: Observe 7 heads out of 10 flips with uniform prior
observations = [1, 1, 0, 1, 1, 1, 0, 1, 1, 0]  # 1=heads, 0=tails
result = bayesian_coin(observations)
print("After 10 flips (7H, 3T) with uniform prior:")
print(f"  Posterior: Beta({result['posterior_alpha']}, {result['posterior_beta']})")
print(f"  Mean: {result['mean']:.4f}")
print(f"  Mode: {result['mode']:.4f}")
print(f"  95% CI: ({result['ci_95'][0]:.4f}, {result['ci_95'][1]:.4f})")

# Now observe 70 heads out of 100
obs_100 = [1]*70 + [0]*30
result2 = bayesian_coin(obs_100)
print(f"\nAfter 100 flips (70H, 30T):")
print(f"  Mean: {result2['mean']:.4f}")
print(f"  95% CI: ({result2['ci_95'][0]:.4f}, {result2['ci_95'][1]:.4f})")
print("  Notice: CI narrows with more data")
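Conjugacy also means updates can be applied one observation at a time: using each posterior as the next prior gives exactly the same result as the batch computation above. A quick self-contained sketch:

```python
# Sequential updating: each posterior becomes the next prior.
# Starting from a uniform Beta(1,1) prior, processing flips one at a
# time must match the batch posterior Beta(1 + heads, 1 + tails).
observations = [1, 1, 0, 1, 1, 1, 0, 1, 1, 0]

alpha, beta = 1, 1  # uniform prior
for flip in observations:
    if flip == 1:
        alpha += 1  # observed heads
    else:
        beta += 1   # observed tails

heads = sum(observations)
tails = len(observations) - heads
assert (alpha, beta) == (1 + heads, 1 + tails)
print(f"Sequential posterior: Beta({alpha}, {beta})")  # Beta(8, 4)
```

This is why conjugate models work well in streaming settings: the posterior is summarized by two counters, regardless of how many observations have arrived.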

Problem 4: Markov Chains

Problem: Implement a Markov chain simulator and compute the stationary distribution. Also implement the Metropolis-Hastings algorithm for MCMC sampling from an arbitrary distribution.

def markov_chain_simulate(transition_matrix, initial_state, n_steps):
    """
    Simulate a discrete Markov chain.
    transition_matrix[i][j] = P(next=j | current=i)
    Time: O(n_steps * n_states), Space: O(n_steps)
    """
    n_states = len(transition_matrix)
    states = [initial_state]

    for _ in range(n_steps):
        current = states[-1]
        probs = transition_matrix[current]

        # Sample next state from the categorical distribution probs
        r = random.random()
        cumulative = 0.0
        for j in range(n_states):
            cumulative += probs[j]
            if r < cumulative:
                states.append(j)
                break
        else:
            # Guard against floating-point rounding leaving r >= sum(probs)
            states.append(n_states - 1)

    return states

def stationary_distribution(transition_matrix, n_steps=100000):
    """
    Estimate stationary distribution by running the chain
    and counting state frequencies.
    """
    states = markov_chain_simulate(transition_matrix, 0, n_steps)

    n_states = len(transition_matrix)
    counts = [0] * n_states
    # Skip burn-in period
    for s in states[1000:]:
        counts[s] += 1

    total = sum(counts)
    return [c / total for c in counts]

# Weather model: Sunny(0), Cloudy(1), Rainy(2)
T = [
    [0.7, 0.2, 0.1],  # Sunny -> ...
    [0.3, 0.4, 0.3],  # Cloudy -> ...
    [0.2, 0.3, 0.5],  # Rainy -> ...
]
dist = stationary_distribution(T)
print(f"Stationary distribution: Sunny={dist[0]:.3f}, "
      f"Cloudy={dist[1]:.3f}, Rainy={dist[2]:.3f}")
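The simulation estimate can be cross-checked exactly by power iteration: repeatedly multiplying a distribution vector by the transition matrix converges to the stationary distribution pi satisfying pi = pi * T. A minimal sketch (the helper name `stationary_power_iteration` is ours):

```python
def stationary_power_iteration(T, n_iters=100):
    """Compute the stationary distribution by repeated vector-matrix products."""
    n = len(T)
    pi = [1.0 / n] * n  # start from the uniform distribution
    for _ in range(n_iters):
        # pi <- pi * T (row vector times transition matrix)
        pi = [sum(pi[i] * T[i][j] for i in range(n)) for j in range(n)]
    return pi

T = [
    [0.7, 0.2, 0.1],
    [0.3, 0.4, 0.3],
    [0.2, 0.3, 0.5],
]
pi = stationary_power_iteration(T)
print([round(p, 4) for p in pi])  # exact answer: 21/46, 13/46, 12/46
```

The exact solution of pi = pi * T for this chain is (21/46, 13/46, 12/46) ≈ (0.457, 0.283, 0.261), which the frequency counts above should approximate.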

def metropolis_hastings(target_log_pdf, proposal_sampler, x0,
                         n_samples=10000, burn_in=1000):
    """
    Metropolis-Hastings MCMC sampler.
    Samples from a distribution known only up to a normalizing constant.
    Time: O(n_samples * cost(target)), Space: O(n_samples)
    """
    x = x0
    samples = []
    accepted = 0

    for i in range(n_samples + burn_in):
        # Propose new state
        x_new = proposal_sampler(x)

        # Acceptance ratio (in log space for numerical stability)
        log_alpha = target_log_pdf(x_new) - target_log_pdf(x)

        # Accept or reject. random.random() can return exactly 0.0, which
        # would break math.log; log(0) = -inf always accepts, so accept outright.
        u = random.random()
        if u == 0.0 or math.log(u) < log_alpha:
            x = x_new
            accepted += 1

        if i >= burn_in:
            samples.append(x)

    print(f"Acceptance rate: {accepted/(n_samples+burn_in):.4f}")
    return samples

# Sample from N(3, 2^2) using MH
def target_log_pdf(x):
    mu, sigma = 3.0, 2.0
    return -0.5 * ((x - mu) / sigma) ** 2

def proposal(x):
    return x + random.gauss(0, 1)  # random walk proposal

samples = metropolis_hastings(target_log_pdf, proposal, x0=0.0,
                                n_samples=10000)
mean = sum(samples) / len(samples)
var = sum((x - mean)**2 for x in samples) / len(samples)
print(f"Sample mean: {mean:.4f} (expected: 3.0)")
print(f"Sample std:  {math.sqrt(var):.4f} (expected: 2.0)")
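Unlike direct sampling, MCMC draws are correlated: each state depends on the previous one, so n MCMC samples carry less information than n independent samples. A self-contained sketch that measures the lag-1 autocorrelation of a small random-walk Metropolis chain (targeting N(0,1); the helper name `lag_autocorr` is ours):

```python
import math
import random

def lag_autocorr(xs, lag=1):
    """Lag-k sample autocorrelation of a sequence."""
    n = len(xs)
    mean = sum(xs) / n
    var = sum((x - mean) ** 2 for x in xs) / n
    cov = sum((xs[i] - mean) * (xs[i + lag] - mean) for i in range(n - lag)) / n
    return cov / var

# Small random-walk Metropolis chain targeting N(0, 1)
random.seed(0)
x, chain = 0.0, []
for _ in range(20000):
    x_new = x + random.gauss(0, 1)
    # log acceptance ratio for a standard normal target
    u = random.random()
    if u == 0.0 or math.log(u) < -0.5 * (x_new**2 - x**2):
        x = x_new
    chain.append(x)

print(f"Lag-1 autocorrelation: {lag_autocorr(chain):.3f}")  # well above 0
```

High autocorrelation means a small effective sample size; in practice people thin the chain or tune the proposal step size to trade acceptance rate against mixing speed.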

Problem 5: Random Number Generation

Problem: Implement a pseudorandom number generator from scratch using a Linear Congruential Generator (LCG), and use it to generate samples from common distributions: uniform directly, exponential via the inverse transform method, and normal via the Box-Muller transform (the normal CDF has no closed-form inverse, so inverse transform is impractical there).

class LCG:
    """
    Linear Congruential Generator.
    x_{n+1} = (a * x_n + c) mod m
    Parameters chosen for good statistical properties (Numerical Recipes).
    """
    def __init__(self, seed=42):
        self.state = seed
        self.a = 1664525
        self.c = 1013904223
        self.m = 2**32

    def next_int(self):
        """Generate next pseudorandom integer."""
        self.state = (self.a * self.state + self.c) % self.m
        return self.state

    def uniform(self, low=0.0, high=1.0):
        """Generate uniform random number in [low, high)."""
        return low + (high - low) * self.next_int() / self.m

    def exponential(self, rate=1.0):
        """
        Generate exponential random variable using inverse transform.
        CDF: F(x) = 1 - e^{-rate*x}
        Inverse: F^{-1}(u) = -ln(1-u) / rate
        """
        u = self.uniform()
        return -math.log(1 - u) / rate

    def normal(self, mu=0.0, sigma=1.0):
        """
        Generate normal random variable using Box-Muller transform.
        Given U1, U2 ~ Uniform(0,1):
        Z = sqrt(-2*ln(U1)) * cos(2*pi*U2) ~ N(0,1)
        """
        u1 = self.uniform()
        u2 = self.uniform()
        # Avoid log(0)
        while u1 < 1e-15:
            u1 = self.uniform()
        z = math.sqrt(-2 * math.log(u1)) * math.cos(2 * math.pi * u2)
        return mu + sigma * z

# Test our RNG
rng = LCG(seed=12345)

# Generate uniform samples
uniforms = [rng.uniform() for _ in range(10000)]
print(f"Uniform: mean={sum(uniforms)/len(uniforms):.4f} (expect 0.5), "
      f"min={min(uniforms):.4f}, max={max(uniforms):.4f}")

# Generate normal samples
normals = [rng.normal(mu=5, sigma=2) for _ in range(10000)]
mean_n = sum(normals) / len(normals)
std_n = math.sqrt(sum((x - mean_n)**2 for x in normals) / len(normals))
print(f"Normal: mean={mean_n:.4f} (expect 5.0), std={std_n:.4f} (expect 2.0)")

# Generate exponential samples
exponentials = [rng.exponential(rate=0.5) for _ in range(10000)]
mean_e = sum(exponentials) / len(exponentials)
print(f"Exponential: mean={mean_e:.4f} (expect {1/0.5:.1f})")
📝 Interview depth: If asked "how does random.random() work?", explain that Python's random module uses the Mersenne Twister (MT19937), whose period is 2^19937 - 1; newer libraries such as NumPy default to PCG64. An LCG is simpler but has a far shorter period and weaker statistical quality. For cryptographic purposes, none of these are suitable; you need a CSPRNG such as ChaCha20 or AES-CTR.
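Python already exposes a CSPRNG interface: the `secrets` module draws from the operating system's cryptographic randomness source. A short sketch of building a secure uniform float from it (the helper name `secure_uniform` is ours):

```python
import secrets

def secure_uniform():
    """Uniform float in [0, 1) from a cryptographically secure source."""
    # 53 random bits covers the full precision of a Python float in [0, 1)
    return secrets.randbits(53) / (1 << 53)

print(secure_uniform())       # unpredictable, and not reproducible by design
print(secrets.token_hex(16))  # 32 hex chars, e.g. for API keys or tokens
```

Note the trade-off: CSPRNGs cannot be seeded for reproducibility, which is exactly why simulations and ML experiments use seedable PRNGs instead.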

Key Takeaways

  • Monte Carlo estimation approximates integrals and expectations using random sampling. Error decreases as O(1/sqrt(n)).
  • Rejection sampling draws from a target distribution using a simpler proposal, but becomes inefficient in high dimensions.
  • Importance sampling reweights samples from a proposal distribution, avoiding the waste of rejection sampling.
  • Bayesian inference updates beliefs (prior to posterior) using observed data. Conjugate priors give closed-form posteriors.
  • Metropolis-Hastings MCMC samples from complex distributions known only up to a normalizing constant.
  • Pseudorandom number generators are deterministic algorithms that produce sequences that appear random. LCGs are the simplest form.