Probability in Code
Probability is the language of uncertainty, and ML is fundamentally about modeling uncertainty. This lesson implements five probability algorithms from scratch: Monte Carlo estimation, sampling methods, Bayesian inference, Markov chains, and pseudorandom number generation.
Problem 1: Monte Carlo Estimation
Problem: Estimate the value of pi using Monte Carlo simulation. Generate random points in a unit square and count what fraction falls inside the unit circle. Also implement Monte Carlo integration for arbitrary functions.
import random
import math
def estimate_pi(n_samples=100000):
    """
    Estimate pi by Monte Carlo: draw points uniformly in the square
    [-1, 1]^2 and multiply the in-circle fraction by 4 (the square's area).

    Time: O(n), Space: O(1).
    Error shrinks as O(1/sqrt(n)): 100x more samples per extra digit.
    """
    hits = 0
    for _ in range(n_samples):
        px, py = random.uniform(-1, 1), random.uniform(-1, 1)
        if px * px + py * py <= 1:
            hits += 1
    return 4 * hits / n_samples
# Accuracy demo: the error should shrink roughly 3x for every 10x samples
for sample_count in (1000, 10000, 100000, 1000000):
    estimate = estimate_pi(sample_count)
    gap = abs(estimate - math.pi)
    print(f"n={sample_count:>8}: pi ≈ {estimate:.6f}, error = {gap:.6f}")
def monte_carlo_integrate(f, a, b, n_samples=100000):
    """
    Monte Carlo estimate of the integral of f over [a, b].

    Uses the identity integral = (b - a) * E[f(U)] with U ~ Uniform(a, b),
    approximating the expectation by the mean of n_samples evaluations.
    Time: O(n * cost(f)), Space: O(1).
    """
    draws = (f(random.uniform(a, b)) for _ in range(n_samples))
    return (b - a) * sum(draws) / n_samples
# Sanity checks against closed-form integrals
value = monte_carlo_integrate(math.sin, 0, math.pi)  # exact answer: 2
print(f"\nIntegral of sin(x) from 0 to pi: {value:.6f}")
print(f"Exact value: 2.000000")
value = monte_carlo_integrate(lambda x: x**2, 0, 1)  # exact answer: 1/3
print(f"Integral of x^2 from 0 to 1: {value:.6f}")
print(f"Exact value: {1/3:.6f}")
Problem 2: Sampling Methods
Problem: Implement rejection sampling and importance sampling. These methods allow you to sample from complex distributions when direct sampling is impossible.
def rejection_sampling(target_pdf, proposal_sampler, proposal_pdf,
                       M, n_samples=10000):
    """
    Rejection sampling: draw from target_pdf via an enveloping proposal.

    target_pdf: density to sample from (may be unnormalized)
    proposal_sampler: draws one sample from the proposal distribution
    proposal_pdf: density of the proposal distribution
    M: envelope constant with target_pdf(x) <= M * proposal_pdf(x) for all x
    n_samples: number of accepted samples to return

    Raises ValueError if M is too small (the envelope is violated), since
    that would silently bias the output distribution.

    Note: the expected acceptance rate is Z/M where Z is the target's
    normalizing constant; it equals 1/M only for a normalized target.
    Time: O(n * M / Z) expected, Space: O(n)
    """
    samples = []
    total_proposed = 0
    while len(samples) < n_samples:
        x = proposal_sampler()
        total_proposed += 1
        # Accept with probability target(x) / (M * proposal(x))
        acceptance_prob = target_pdf(x) / (M * proposal_pdf(x))
        if acceptance_prob > 1:
            raise ValueError("M too small: target_pdf exceeds M * proposal_pdf")
        if random.random() < acceptance_prob:
            samples.append(x)
    acceptance_rate = n_samples / total_proposed
    # 1/M is the theoretical rate only when target_pdf is normalized
    print(f"Acceptance rate: {acceptance_rate:.4f} "
          f"(1/M = {1/M:.4f}, theoretical only for a normalized target)")
    return samples
# Example: sample from Beta(2,5) using uniform proposal
def beta_pdf(x, a=2, b=5):
    """Beta(a, b) density without the normalizing constant; 0 outside (0, 1)."""
    if not 0 < x < 1:
        return 0.0
    return x**(a-1) * (1-x)**(b-1)
# Uniform proposal on (0, 1). The unnormalized Beta(2,5) pdf x*(1-x)^4
# peaks at x = 1/5 with value 0.2 * 0.8**4 = 0.08192, so a valid envelope
# constant only needs to be slightly above that. (The previous M = 1.3 was
# ~16x too loose, wasting ~97% of proposals.)
M = 0.082  # just above the true max 0.08192 of the unnormalized Beta(2,5) pdf
samples = rejection_sampling(
    target_pdf=beta_pdf,
    proposal_sampler=lambda: random.uniform(0, 1),
    proposal_pdf=lambda x: 1.0,
    M=M,
    n_samples=5000
)
mean = sum(samples) / len(samples)
print(f"Sample mean: {mean:.4f}, Expected (2/7): {2/7:.4f}")
def importance_sampling(f, target_pdf, proposal_sampler, proposal_pdf,
                        n_samples=10000):
    """
    Self-normalized importance sampling estimate of E_p[f(X)].

    Draws x_i from the proposal q, weights each by w_i = p(x_i) / q(x_i),
    and returns sum(w_i * f(x_i)) / sum(w_i). The self-normalization makes
    the estimator valid even when target_pdf is unnormalized.
    """
    numerator = 0.0
    denominator = 0.0
    for _ in range(n_samples):
        sample = proposal_sampler()
        weight = target_pdf(sample) / proposal_pdf(sample)
        numerator += f(sample) * weight
        denominator += weight
    return numerator / denominator
# E[X] for X ~ Beta(2, 5) is 2/7; estimate it with a uniform proposal
estimated_mean = importance_sampling(
    f=lambda x: x,
    target_pdf=beta_pdf,
    proposal_sampler=lambda: random.uniform(0, 1),
    proposal_pdf=lambda x: 1.0
)
print(f"\nImportance sampling E[X]: {estimated_mean:.4f}")
print(f"Expected (2/7): {2/7:.4f}")
Problem 3: Bayesian Inference
Problem: Implement Bayesian updating for a coin flip problem. Given a sequence of observations (heads/tails), compute the posterior distribution of the coin's bias using the Beta-Binomial conjugate prior.
def bayesian_coin(observations, prior_alpha=1, prior_beta=1):
    """
    Conjugate Bayesian update for a coin's heads-probability.

    With a Beta(prior_alpha, prior_beta) prior and Bernoulli observations
    (1 = heads, 0 = tails), the posterior is
    Beta(prior_alpha + #heads, prior_beta + #tails).

    Returns a dict with the posterior parameters, mean, mode (MAP),
    variance, and an approximate 95% credible interval (normal
    approximation, clipped to [0, 1]).
    Time: O(n), Space: O(1)
    """
    n_heads = sum(observations)
    n_tails = len(observations) - n_heads
    a = prior_alpha + n_heads
    b = prior_beta + n_tails
    s = a + b
    mean = a / s
    # The Beta mode formula only applies when both parameters exceed 1;
    # otherwise fall back to the posterior mean.
    mode = (a - 1) / (s - 2) if a > 1 and b > 1 else mean
    variance = (a * b) / (s**2 * (s + 1))
    half_width = 1.96 * math.sqrt(variance)  # normal-approx 95% interval
    return {
        'posterior_alpha': a,
        'posterior_beta': b,
        'mean': mean,
        'mode': mode,
        'variance': variance,
        'ci_95': (max(0, mean - half_width), min(1, mean + half_width))
    }
# Demo: 7 heads in 10 flips under a flat Beta(1, 1) prior
flips = [1, 1, 0, 1, 1, 1, 0, 1, 1, 0]  # 1=heads, 0=tails
summary = bayesian_coin(flips)
print("After 10 flips (7H, 3T) with uniform prior:")
print(f" Posterior: Beta({summary['posterior_alpha']}, {summary['posterior_beta']})")
print(f" Mean: {summary['mean']:.4f}")
print(f" Mode: {summary['mode']:.4f}")
print(f" 95% CI: ({summary['ci_95'][0]:.4f}, {summary['ci_95'][1]:.4f})")
# Same heads fraction, ten times the data: the interval tightens
summary_big = bayesian_coin([1]*70 + [0]*30)
print(f"\nAfter 100 flips (70H, 30T):")
print(f" Mean: {summary_big['mean']:.4f}")
print(f" 95% CI: ({summary_big['ci_95'][0]:.4f}, {summary_big['ci_95'][1]:.4f})")
print(" Notice: CI narrows with more data")
Problem 4: Markov Chains
Problem: Implement a Markov chain simulator and compute the stationary distribution. Also implement the Metropolis-Hastings algorithm for MCMC sampling from an arbitrary distribution.
def markov_chain_simulate(transition_matrix, initial_state, n_steps):
    """
    Simulate a discrete-state Markov chain for n_steps transitions.

    transition_matrix: row-stochastic matrix, transition_matrix[i][j] =
        P(next=j | current=i); rows should sum to 1.
    initial_state: index of the starting state.
    Returns the list of visited states, beginning with initial_state,
    of length n_steps + 1.

    Fix over the original: if floating-point rounding leaves the row's
    cumulative sum just below the random draw, the original appended no
    state that step and silently returned a short chain. We now fall back
    to the last state index in that case.
    Time: O(n_steps * n_states), Space: O(n_steps)
    """
    n_states = len(transition_matrix)
    states = [initial_state]
    for _ in range(n_steps):
        probs = transition_matrix[states[-1]]
        r = random.random()
        cumulative = 0.0
        next_state = n_states - 1  # fallback for fp rounding at the row's tail
        for j in range(n_states):
            cumulative += probs[j]
            if r < cumulative:
                next_state = j
                break
        states.append(next_state)
    return states
def stationary_distribution(transition_matrix, n_steps=100000, burn_in=1000):
    """
    Estimate the stationary distribution empirically.

    Runs the chain from state 0 for n_steps transitions, discards the
    first `burn_in` states as warm-up, and normalizes the visit counts
    of the remainder.

    burn_in is now a parameter (previously hard-coded at 1000); the
    default preserves the original behavior. Raises ValueError when
    n_steps < burn_in, where the original crashed with a confusing
    ZeroDivisionError on an empty post-burn-in sample.
    """
    if n_steps < burn_in:
        raise ValueError("n_steps must be at least burn_in")
    states = markov_chain_simulate(transition_matrix, 0, n_steps)
    counts = [0] * len(transition_matrix)
    for s in states[burn_in:]:
        counts[s] += 1
    total = sum(counts)
    return [c / total for c in counts]
# Weather model: Sunny(0), Cloudy(1), Rainy(2)
T = [
    [0.7, 0.2, 0.1],  # from Sunny
    [0.3, 0.4, 0.3],  # from Cloudy
    [0.2, 0.3, 0.5],  # from Rainy
]
# Long-run fraction of time spent in each weather state
dist = stationary_distribution(T)
print(f"Stationary distribution: Sunny={dist[0]:.3f}, "
      f"Cloudy={dist[1]:.3f}, Rainy={dist[2]:.3f}")
def metropolis_hastings(target_log_pdf, proposal_sampler, x0,
                        n_samples=10000, burn_in=1000):
    """
    Metropolis MCMC sampler (Metropolis-Hastings with a symmetric proposal).

    Samples from a density known only up to a normalizing constant.

    target_log_pdf: log of the (unnormalized) target density
    proposal_sampler: x -> proposed x'; must be symmetric in (x, x')
    x0: starting state
    n_samples: states returned after discarding `burn_in` warm-up steps

    Fixes over the original:
    - accept via `random.random() < exp(log_alpha)` with a short-circuit
      for log_alpha >= 0, instead of `log(random.random())`, which raised
      ValueError whenever random() returned exactly 0.0;
    - cache the current state's log-density instead of re-evaluating
      target_log_pdf(x) every iteration.
    Time: O((n_samples + burn_in) * cost(target)), Space: O(n_samples)
    """
    x = x0
    log_p_x = target_log_pdf(x)  # log-density of the current state
    samples = []
    accepted = 0
    for i in range(n_samples + burn_in):
        x_new = proposal_sampler(x)
        log_p_new = target_log_pdf(x_new)
        log_alpha = log_p_new - log_p_x
        # Accept with probability min(1, exp(log_alpha)); the short-circuit
        # also keeps exp() in (0, 1] so it cannot overflow.
        if log_alpha >= 0 or random.random() < math.exp(log_alpha):
            x = x_new
            log_p_x = log_p_new
            accepted += 1
        if i >= burn_in:
            samples.append(x)  # repeated entries encode rejections
    print(f"Acceptance rate: {accepted/(n_samples+burn_in):.4f}")
    return samples
# Validate MH by sampling N(3, 2^2) and checking the first two moments
def target_log_pdf(x):
    """Log-density of N(3, 2^2), constant term dropped."""
    mu, sigma = 3.0, 2.0
    return -0.5 * ((x - mu) / sigma) ** 2

def proposal(x):
    """Symmetric random-walk step around the current state."""
    return x + random.gauss(0, 1)

samples = metropolis_hastings(target_log_pdf, proposal, x0=0.0,
                              n_samples=10000)
sample_mean = sum(samples) / len(samples)
sample_var = sum((s - sample_mean)**2 for s in samples) / len(samples)
print(f"Sample mean: {sample_mean:.4f} (expected: 3.0)")
print(f"Sample std: {math.sqrt(sample_var):.4f} (expected: 2.0)")
Problem 5: Random Number Generation
Problem: Implement a pseudorandom number generator from scratch using a Linear Congruential Generator (LCG), and use it to generate samples from common distributions (uniform, normal, exponential) using the inverse transform method.
class LCG:
    """
    Linear Congruential Generator: x_{n+1} = (a * x_n + c) mod m.

    Uses the Numerical Recipes constants (a=1664525, c=1013904223,
    m=2^32). Also provides uniform, exponential (inverse transform),
    and normal (Box-Muller) variates built on the raw integer stream.
    """

    def __init__(self, seed=42):
        self.state = seed
        # Numerical Recipes parameters
        self.a = 1664525
        self.c = 1013904223
        self.m = 2**32

    def next_int(self):
        """Advance the recurrence and return the new integer state."""
        self.state = (self.a * self.state + self.c) % self.m
        return self.state

    def uniform(self, low=0.0, high=1.0):
        """Uniform float in [low, high), from the scaled integer state."""
        return low + (high - low) * self.next_int() / self.m

    def exponential(self, rate=1.0):
        """
        Exponential variate via inverse transform:
        F(x) = 1 - e^{-rate*x}  =>  F^{-1}(u) = -ln(1 - u) / rate.
        """
        u = self.uniform()
        return -math.log(1 - u) / rate

    def normal(self, mu=0.0, sigma=1.0):
        """
        Normal variate via Box-Muller:
        z = sqrt(-2 ln u1) * cos(2 pi u2) ~ N(0, 1), then shift/scale.
        """
        u1 = self.uniform()
        u2 = self.uniform()
        while u1 < 1e-15:  # re-draw to keep log(u1) finite
            u1 = self.uniform()
        radius = math.sqrt(-2 * math.log(u1))
        theta = 2 * math.pi * u2
        z = radius * math.cos(theta)
        return mu + sigma * z
# Smoke-test the LCG-backed samplers against their theoretical moments
rng = LCG(seed=12345)

uniform_draws = [rng.uniform() for _ in range(10000)]
print(f"Uniform: mean={sum(uniform_draws)/len(uniform_draws):.4f} (expect 0.5), "
      f"min={min(uniform_draws):.4f}, max={max(uniform_draws):.4f}")

normal_draws = [rng.normal(mu=5, sigma=2) for _ in range(10000)]
mean_n = sum(normal_draws) / len(normal_draws)
std_n = math.sqrt(sum((x - mean_n)**2 for x in normal_draws) / len(normal_draws))
print(f"Normal: mean={mean_n:.4f} (expect 5.0), std={std_n:.4f} (expect 2.0)")

# Exponential(rate) has mean 1/rate
exp_draws = [rng.exponential(rate=0.5) for _ in range(10000)]
mean_e = sum(exp_draws) / len(exp_draws)
print(f"Exponential: mean={mean_e:.4f} (expect {1/0.5:.1f})")
Key Takeaways
- Monte Carlo estimation approximates integrals and expectations using random sampling. Error decreases as O(1/sqrt(n)).
- Rejection sampling draws from a target distribution using a simpler proposal, but becomes inefficient in high dimensions.
- Importance sampling reweights samples from a proposal distribution, avoiding the waste of rejection sampling.
- Bayesian inference updates beliefs (prior to posterior) using observed data. Conjugate priors give closed-form posteriors.
- Metropolis-Hastings MCMC samples from complex distributions known only up to a normalizing constant.
- Pseudorandom number generators are deterministic algorithms that produce sequences that appear random. LCGs are the simplest form.
Lilly Tech Systems