Bayesian Methods in Code
Five problems that implement Bayesian inference from scratch. You will build a general-purpose Bayesian inference engine, conjugate models, a Naive Bayes classifier, Bayesian A/B testing, and Thompson sampling for multi-armed bandits. These are asked at quant firms, tech companies, and anywhere decisions are made under uncertainty.
Problem 1: Bayesian Inference Engine
Problem: Implement a discrete Bayesian inference engine that updates a prior distribution with observed evidence using Bayes' theorem. This is the core computation that all Bayesian methods build upon.
import math
# ---- CHALLENGE ----
def bayesian_update(prior, likelihood):
    """Update a prior distribution with observed evidence via Bayes' theorem.

    The observed evidence is encoded entirely in `likelihood`, so no separate
    `evidence` argument is needed — the original stub declared a third
    parameter that the reference solution and every call site omit.

    prior: dict mapping hypothesis -> P(hypothesis)
    likelihood: dict mapping hypothesis -> P(evidence | hypothesis)
    Return: posterior dict mapping hypothesis -> P(hypothesis | evidence)
    """
    pass
# ---- SOLUTION ----
def bayesian_update(prior, likelihood):
    """Discrete Bayesian inference.

    Bayes' Theorem: P(H|E) = P(E|H) * P(H) / P(E), where the marginal
    P(E) = sum over all hypotheses H_i of P(E|H_i) * P(H_i) acts as the
    normalizing constant.

    prior: dict mapping hypothesis -> P(hypothesis)
    likelihood: dict mapping hypothesis -> P(evidence | hypothesis)
    Returns the posterior dict mapping hypothesis -> P(hypothesis | evidence).
    Raises ValueError when every hypothesis gives the evidence probability
    zero, i.e. the model cannot explain the observation at all.
    """
    # Joint weight P(E|H) * P(H) for every hypothesis.
    joint = {h: p * likelihood[h] for h, p in prior.items()}
    # Marginal probability of the evidence (the normalizing constant).
    marginal = sum(joint.values())
    if marginal == 0:
        raise ValueError("Evidence is zero - impossible observation given the model")
    # Normalize the joint weights into a proper distribution.
    return {h: w / marginal for h, w in joint.items()}
def sequential_bayesian_update(prior, observations, likelihood_func):
    """Fold a stream of observations into the prior, one update at a time.

    After each observation the posterior becomes the prior for the next
    update. This is the power of Bayesian updating: n observations cost
    O(n) work and no earlier data ever needs to be reprocessed.

    prior: dict mapping hypothesis -> probability
    observations: iterable of observed values
    likelihood_func: callable (hypothesis, observation) -> P(obs | hypothesis)
    Returns the final posterior dict.
    """
    belief = dict(prior)
    for observation in observations:
        per_hypothesis = {h: likelihood_func(h, observation) for h in belief}
        belief = bayesian_update(belief, per_hypothesis)
    return belief
# ---- VERIFICATION ----
# Classic example: disease testing.
# P(disease) = 0.01, P(no disease) = 0.99
# Test sensitivity: P(positive | disease) = 0.95
# Test specificity: P(negative | no disease) = 0.90 -> P(positive | no disease) = 0.10
prior = {"disease": 0.01, "healthy": 0.99}
likelihood_positive = {"disease": 0.95, "healthy": 0.10}
posterior = bayesian_update(prior, likelihood_positive)
# Base-rate fallacy: P(disease | positive test) is only ~8.8% (NOT 95%!)
# because the disease is rare, so false positives dominate true positives.
assert abs(posterior["disease"] - 0.0876) < 0.001
assert abs(posterior["healthy"] - 0.9124) < 0.001
assert abs(sum(posterior.values()) - 1.0) < 1e-10

# Sequential: two positive tests in a row.
def test_likelihood(hypothesis, result):
    # P(result | hypothesis) for the same test characteristics as above.
    if result == "positive":
        return 0.95 if hypothesis == "disease" else 0.10
    else:
        return 0.05 if hypothesis == "disease" else 0.90

posterior_2 = sequential_bayesian_update(prior, ["positive", "positive"], test_likelihood)
# After two positive tests, P(disease) should be much higher.
assert posterior_2["disease"] > 0.45, f"Should be high: {posterior_2['disease']}"
print("All tests passed!")
Problem 2: Beta-Binomial Conjugate Model
Problem: Implement the Beta-Binomial conjugate model from scratch. The Beta distribution is the conjugate prior for the Binomial likelihood, meaning the posterior is also a Beta distribution with simple update rules. Implement the Beta PDF, posterior update, and credible intervals.
import math
import random
# ---- CHALLENGE ----
class BetaBinomial:
    """Beta-Binomial conjugate model for learning probabilities from data."""

    def __init__(self, alpha=1, beta=1):
        # Prior hyperparameters; Beta(1, 1) is the uniform (non-informative) prior.
        pass

    def update(self, successes, failures):
        # Conjugate update: posterior is Beta(alpha + successes, beta + failures).
        pass

    def mean(self):
        # Posterior mean: alpha / (alpha + beta).
        pass

    def pdf(self, x):
        # Beta density at x in (0, 1).
        pass
# ---- SOLUTION ----
def _log_beta_function(a, b):
"""Log of Beta function: B(a,b) = Gamma(a)*Gamma(b)/Gamma(a+b).
Using Stirling's approximation for log-gamma:
log(Gamma(x)) ~ (x-0.5)*log(x) - x + 0.5*log(2*pi) + 1/(12*x)
"""
def _log_gamma(x):
if x <= 0:
raise ValueError("x must be positive")
if x < 7:
# Shift to larger value for better approximation
shift = 7 - int(x)
log_correction = sum(math.log(x + i) for i in range(shift))
x = x + shift
result = (x - 0.5) * math.log(x) - x + 0.5 * math.log(2 * math.pi) + 1.0 / (12 * x)
return result - log_correction
return (x - 0.5) * math.log(x) - x + 0.5 * math.log(2 * math.pi) + 1.0 / (12 * x)
return _log_gamma(a) + _log_gamma(b) - _log_gamma(a + b)
class BetaBinomial:
    """Beta-Binomial conjugate model.

    Prior:      p ~ Beta(alpha, beta)
    Likelihood: X | p ~ Binomial(n, p)
    Posterior:  p | X ~ Beta(alpha + successes, beta + failures)

    The conjugate property means:
      - Prior is Beta(a, b)
      - After observing k successes in n trials
      - Posterior is Beta(a + k, b + n - k)
    This is the simplest and most important Bayesian model.
    """

    def __init__(self, alpha=1.0, beta=1.0):
        """Initialize with prior Beta(alpha, beta).

        Default alpha=beta=1 is the uniform (non-informative) prior.
        """
        self.alpha = alpha
        self.beta = beta

    def update(self, successes, failures):
        """Update posterior with observed data.

        Conjugate rule: posterior is Beta(alpha + successes, beta + failures).
        """
        self.alpha += successes
        self.beta += failures

    def mean(self):
        """Posterior mean: E[p] = alpha / (alpha + beta)."""
        return self.alpha / (self.alpha + self.beta)

    def mode(self):
        """Posterior mode (MAP estimate): (alpha-1) / (alpha+beta-2).

        Only defined when alpha > 1 and beta > 1; otherwise falls back to
        the posterior mean (the density has no interior peak in that case).
        """
        if self.alpha > 1 and self.beta > 1:
            return (self.alpha - 1) / (self.alpha + self.beta - 2)
        return self.mean()  # fallback when the mode is undefined

    def variance(self):
        """Posterior variance: alpha*beta / ((alpha+beta)^2 * (alpha+beta+1))."""
        ab = self.alpha + self.beta
        return (self.alpha * self.beta) / (ab ** 2 * (ab + 1))

    def pdf(self, x):
        """Beta PDF: f(x) = x^(a-1) * (1-x)^(b-1) / B(a,b).

        Computed in log-space for numerical stability; returns 0 outside
        the open interval (0, 1).
        """
        if x <= 0 or x >= 1:
            return 0.0
        log_pdf = ((self.alpha - 1) * math.log(x) +
                   (self.beta - 1) * math.log(1 - x) -
                   _log_beta_function(self.alpha, self.beta))
        return math.exp(log_pdf)

    def credible_interval(self, confidence=0.95, num_points=1000):
        """Compute an equal-tailed credible interval via grid approximation.

        NOTE: despite earlier documentation claiming the "narrowest"
        interval, this implementation trims alpha/2 probability mass from
        each tail (an equal-tailed interval, not the highest-posterior-
        density interval — the two differ for skewed posteriors).
        """
        alpha_level = 1 - confidence
        # Grid approximation: midpoint rule over num_points cells of width step.
        step = 1.0 / num_points
        grid = [(i + 0.5) * step for i in range(num_points)]
        densities = [self.pdf(x) * step for x in grid]
        # Scan from the left until alpha/2 mass is accumulated -> lower bound.
        cumsum = 0
        lower = 0
        for i, d in enumerate(densities):
            cumsum += d
            if cumsum >= alpha_level / 2:
                lower = grid[i]
                break
        # Scan from the right until alpha/2 mass is accumulated -> upper bound.
        cumsum = 0
        upper = 1
        for i in range(len(densities) - 1, -1, -1):
            cumsum += densities[i]
            if cumsum >= alpha_level / 2:
                upper = grid[i]
                break
        return lower, upper

    def sample(self, n=1):
        """Sample from the Beta posterior via the gamma-ratio method.

        Uses the identity: if X ~ Gamma(a, 1) and Y ~ Gamma(b, 1), then
        X / (X + Y) ~ Beta(a, b). Returns a list for n > 1 and a single
        float for n == 1.
        """
        samples = []
        for _ in range(n):
            # Gamma ratio: X/(X+Y) ~ Beta(alpha, beta)
            x = _sample_gamma(self.alpha)
            y = _sample_gamma(self.beta)
            samples.append(x / (x + y))
        return samples if n > 1 else samples[0]
def _sample_gamma(shape):
"""Sample from Gamma(shape, 1) using Marsaglia and Tsang's method."""
if shape < 1:
# Gamma(shape) = Gamma(shape+1) * U^(1/shape)
return _sample_gamma(shape + 1) * (random.random() ** (1.0 / shape))
d = shape - 1.0 / 3.0
c = 1.0 / math.sqrt(9.0 * d)
while True:
while True:
# Generate normal sample
u1 = random.random()
u2 = random.random()
while u1 == 0:
u1 = random.random()
x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
v = (1 + c * x) ** 3
if v > 0:
break
u = random.random()
if u < 1 - 0.0331 * x ** 4 or math.log(u) < 0.5 * x ** 2 + d * (1 - v + math.log(v)):
return d * v
# ---- VERIFICATION ----
random.seed(42)
# Start with the uniform prior, then observe 7 heads out of 10 coin flips.
model = BetaBinomial(alpha=1, beta=1)
model.update(successes=7, failures=3)
assert abs(model.mean() - 8/12) < 1e-10  # (1+7)/(1+7+1+3) = 8/12
assert model.alpha == 8 and model.beta == 4
# More data: 70 heads out of 100 total.
model2 = BetaBinomial(alpha=1, beta=1)
model2.update(successes=70, failures=30)
# With lots of data, the posterior mean should be close to the MLE (0.70).
assert abs(model2.mean() - 0.70) < 0.01
# The 95% credible interval should contain the true value.
lower, upper = model2.credible_interval(0.95)
assert lower < 0.70 < upper
# Sampling: the Monte Carlo mean should match the analytic posterior mean.
samples = model2.sample(n=5000)
sample_mean = sum(samples) / len(samples)
assert abs(sample_mean - model2.mean()) < 0.02
print("All tests passed!")
Problem 3: Naive Bayes Classifier from Scratch
Problem: Implement a Gaussian Naive Bayes classifier from scratch. Train it on continuous features, predict class labels, and compute class probabilities. This is asked at every data science interview.
import math
# ---- CHALLENGE ----
class NaiveBayes:
    """Gaussian Naive Bayes classifier from scratch."""

    def fit(self, X, y):
        # Learn per-class priors and per-feature Gaussian parameters.
        pass

    def predict(self, X):
        # Return the most probable class label for each feature vector.
        pass

    def predict_proba(self, X):
        # Return normalized class probabilities for each feature vector.
        pass
# ---- SOLUTION ----
class NaiveBayes:
    """Gaussian Naive Bayes classifier.

    Assumption: features are conditionally independent given the class:
        P(class | x1, x2, ..., xd) proportional to P(class) * prod(P(xi | class))
    For continuous features, each P(xi | class=c) is modeled as a Gaussian:
        P(xi | class=c) = N(xi; mu_c_i, sigma_c_i^2)

    Training: compute the mean and variance of each feature for each class.
    Prediction: compute the posterior for each class, return the argmax.
    All likelihood math is done in log-space to avoid floating-point
    underflow when multiplying many small probabilities.
    """

    def __init__(self):
        self.classes = []      # sorted unique class labels seen in fit()
        self.class_prior = {}  # class label -> P(class)
        self.means = {}        # class label -> list of per-feature means
        self.variances = {}    # class label -> list of per-feature variances

    def fit(self, X, y):
        """Train the model.

        X: list of equal-length feature vectors.
        y: list of class labels, one per row of X.
        Raises ValueError when the data is empty or X and y differ in
        length (the original silently crashed with IndexError /
        ZeroDivisionError or mispaired rows with labels).
        """
        if not X or not y:
            raise ValueError("X and y must be non-empty")
        if len(X) != len(y):
            raise ValueError("X and y must have the same length")
        n_samples = len(X)
        n_features = len(X[0])
        # Sorted unique classes -> deterministic iteration and tie-breaking.
        self.classes = sorted(set(y))
        for c in self.classes:
            # Rows belonging to class c.
            class_samples = [X[i] for i in range(n_samples) if y[i] == c]
            n_c = len(class_samples)
            # Prior probability P(class = c).
            self.class_prior[c] = n_c / n_samples
            # Per-feature Gaussian parameters.
            self.means[c] = []
            self.variances[c] = []
            for j in range(n_features):
                values = [sample[j] for sample in class_samples]
                mu = sum(values) / n_c
                # Population variance, floored to avoid a degenerate
                # zero-variance Gaussian (and log(0) at prediction time).
                var = sum((v - mu) ** 2 for v in values) / n_c
                var = max(var, 1e-9)  # smoothing
                self.means[c].append(mu)
                self.variances[c].append(var)

    def _log_gaussian_pdf(self, x, mu, var):
        """Log of the Gaussian PDF, for numerical stability."""
        return -0.5 * math.log(2 * math.pi * var) - 0.5 * ((x - mu) ** 2 / var)

    def _log_posterior(self, x):
        """Compute unnormalized log P(class | x) for all classes."""
        log_posteriors = {}
        for c in self.classes:
            # Start with the log prior.
            log_p = math.log(self.class_prior[c])
            # Add per-feature log likelihoods (independence assumption).
            for j in range(len(x)):
                log_p += self._log_gaussian_pdf(x[j], self.means[c][j], self.variances[c][j])
            log_posteriors[c] = log_p
        return log_posteriors

    def predict(self, X):
        """Predict class labels for a list of feature vectors."""
        predictions = []
        for x in X:
            log_post = self._log_posterior(x)
            best_class = max(log_post, key=log_post.get)
            predictions.append(best_class)
        return predictions

    def predict_proba(self, X):
        """Predict class probabilities using log-sum-exp for normalization."""
        all_probs = []
        for x in X:
            log_post = self._log_posterior(x)
            # Log-sum-exp: subtract the max so the largest term is exp(0)=1,
            # preventing overflow/underflow in the exponentials.
            max_log = max(log_post.values())
            log_sum = max_log + math.log(sum(
                math.exp(lp - max_log) for lp in log_post.values()
            ))
            probs = {c: math.exp(lp - log_sum) for c, lp in log_post.items()}
            all_probs.append(probs)
        return all_probs
# ---- VERIFICATION ----
# Simple 2D dataset: two well-separated clusters.
X_train = [
    [1.0, 2.0], [1.5, 1.8], [1.2, 2.2], [0.8, 1.9],  # class 0
    [5.0, 8.0], [5.5, 7.5], [5.2, 8.2], [4.8, 7.9]   # class 1
]
y_train = [0, 0, 0, 0, 1, 1, 1, 1]
model = NaiveBayes()
model.fit(X_train, y_train)
# Test prediction on a point from each cluster (plus one in between).
X_test = [[1.0, 2.0], [5.0, 8.0], [3.0, 5.0]]
predictions = model.predict(X_test)
assert predictions[0] == 0, "Should predict class 0"
assert predictions[1] == 1, "Should predict class 1"
# Probabilities must sum to 1 for every test point.
probs = model.predict_proba(X_test)
for p in probs:
    total = sum(p.values())
    assert abs(total - 1.0) < 1e-6, f"Probabilities should sum to 1, got {total}"
# Class 0 should have high probability for [1.0, 2.0].
assert probs[0][0] > 0.99
# Class 1 should have high probability for [5.0, 8.0].
assert probs[1][1] > 0.99
print("All tests passed!")
Problem 4: Bayesian A/B Testing
Problem: Implement Bayesian A/B testing from scratch. Instead of computing a p-value, compute the posterior probability that variant B is better than variant A. Use Beta-Binomial conjugate analysis and Monte Carlo estimation.
import math
import random
# ---- CHALLENGE ----
def bayesian_ab_test(a_successes, a_total, b_successes, b_total, num_samples=100000):
    """Bayesian A/B test: P(B > A), expected lift, risk analysis.

    a_successes / a_total: conversions and trials for variant A.
    b_successes / b_total: conversions and trials for variant B.
    num_samples: number of Monte Carlo draws from the two posteriors.
    Returns a dict of posterior summaries (win probabilities, lift, losses).
    """
    pass
# ---- SOLUTION ----
def _sample_beta(alpha, beta_param):
"""Sample from Beta(alpha, beta) using Gamma ratio method."""
def _gamma_sample(shape):
if shape < 1:
return _gamma_sample(shape + 1) * (random.random() ** (1.0 / shape))
d = shape - 1.0 / 3.0
c = 1.0 / math.sqrt(9.0 * d)
while True:
while True:
u1 = random.random()
u2 = random.random()
while u1 == 0:
u1 = random.random()
x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
v = (1 + c * x) ** 3
if v > 0:
break
u = random.random()
if u < 1 - 0.0331 * x ** 4 or math.log(u) < 0.5 * x ** 2 + d * (1 - v + math.log(v)):
return d * v
x = _gamma_sample(alpha)
y = _gamma_sample(beta_param)
return x / (x + y)
def bayesian_ab_test(a_successes, a_total, b_successes, b_total, num_samples=100000,
                     prior_alpha=1.0, prior_beta=1.0):
    """Bayesian A/B test using the Beta-Binomial conjugate model.

    Model:
      - p_A ~ Beta(prior_alpha + a_successes, prior_beta + a_failures)
      - p_B ~ Beta(prior_alpha + b_successes, prior_beta + b_failures)
    The defaults prior_alpha = prior_beta = 1 reproduce the original
    uniform-prior behavior; pass other values for an informative prior.

    Parameters:
      a_successes, a_total: conversions and trials for variant A.
      b_successes, b_total: conversions and trials for variant B.
      num_samples: number of Monte Carlo posterior draws.
      prior_alpha, prior_beta: Beta prior hyperparameters shared by A and B.

    Returns a dict with:
      1. p_b_better / p_a_better: P(B > A) and its complement
      2. expected_lift: E[(p_B - p_A) / p_A]
      3. expected_loss_choosing_b / _a: E[max(p_A - p_B, 0)] and the reverse
      plus observed rates and posterior means for both variants.

    Raises ValueError on non-positive totals, out-of-range success counts,
    or a non-positive num_samples (the original crashed with
    ZeroDivisionError on zero totals).

    Advantages over frequentist A/B testing:
      - Direct probability statement ("B is better with 95% probability")
      - No need for fixed sample sizes or stopping rules
      - Natural handling of multiple comparisons
      - Risk/loss quantification
    """
    # Validate inputs so failures are explicit instead of ZeroDivisionError
    # or a silently nonsensical posterior.
    if a_total <= 0 or b_total <= 0:
        raise ValueError("a_total and b_total must be positive")
    if not (0 <= a_successes <= a_total and 0 <= b_successes <= b_total):
        raise ValueError("success counts must lie between 0 and their totals")
    if num_samples <= 0:
        raise ValueError("num_samples must be positive")
    a_failures = a_total - a_successes
    b_failures = b_total - b_successes
    # Posterior parameters under the conjugate Beta prior.
    alpha_a = prior_alpha + a_successes
    beta_a = prior_beta + a_failures
    alpha_b = prior_alpha + b_successes
    beta_b = prior_beta + b_failures
    # Monte Carlo estimation over paired posterior draws.
    b_wins = 0
    lift_sum = 0.0
    loss_if_b_sum = 0.0  # loss from choosing B when A is better
    loss_if_a_sum = 0.0  # loss from choosing A when B is better
    for _ in range(num_samples):
        p_a = _sample_beta(alpha_a, beta_a)
        p_b = _sample_beta(alpha_b, beta_b)
        if p_b > p_a:
            b_wins += 1
        # Relative lift (p_a is almost surely positive, but guard anyway).
        if p_a > 0:
            lift_sum += (p_b - p_a) / p_a
        # Expected losses from each possible decision.
        loss_if_b_sum += max(p_a - p_b, 0)
        loss_if_a_sum += max(p_b - p_a, 0)
    return {
        "p_b_better": b_wins / num_samples,
        "p_a_better": 1 - b_wins / num_samples,
        "expected_lift": lift_sum / num_samples,
        "expected_loss_choosing_b": loss_if_b_sum / num_samples,
        "expected_loss_choosing_a": loss_if_a_sum / num_samples,
        "rate_a": a_successes / a_total,
        "rate_b": b_successes / b_total,
        "posterior_mean_a": alpha_a / (alpha_a + beta_a),
        "posterior_mean_b": alpha_b / (alpha_b + beta_b)
    }
# ---- VERIFICATION ----
random.seed(42)
# Clear winner: B has a higher conversion rate (13% vs 10%).
result = bayesian_ab_test(
    a_successes=100, a_total=1000,  # 10%
    b_successes=130, b_total=1000   # 13%
)
assert result["p_b_better"] > 0.95, f"B should be clearly better: {result['p_b_better']}"
assert result["expected_lift"] > 0.2, "Lift should be ~30%"
# Very close results: the test should be inconclusive.
result2 = bayesian_ab_test(
    a_successes=100, a_total=1000,
    b_successes=103, b_total=1000
)
assert 0.3 < result2["p_b_better"] < 0.7, "Should be inconclusive"
# B clearly worse than A.
result3 = bayesian_ab_test(
    a_successes=150, a_total=1000,
    b_successes=100, b_total=1000
)
assert result3["p_b_better"] < 0.01, "A should be clearly better"
print("All tests passed!")
Problem 5: Thompson Sampling for Multi-Armed Bandits
Problem: Implement Thompson sampling from scratch. Thompson sampling is a Bayesian approach to the exploration-exploitation tradeoff in multi-armed bandit problems. It is used in production at Google, Netflix, and Spotify for content recommendation and A/B testing.
import math
import random
# ---- CHALLENGE ----
class ThompsonSampling:
    """Thompson sampling for Bernoulli multi-armed bandits."""

    def __init__(self, num_arms):
        # One Beta posterior per arm, starting from the uniform Beta(1, 1).
        pass

    def select_arm(self):
        # Sample each arm's posterior; play the arm with the highest draw.
        pass

    def update(self, arm, reward):
        # Conjugate update of the chosen arm's Beta posterior.
        pass
# ---- SOLUTION ----
def _sample_beta_ts(alpha, beta_param):
"""Sample from Beta distribution for Thompson Sampling."""
def _gamma_sample(shape):
if shape < 1:
return _gamma_sample(shape + 1) * (random.random() ** (1.0 / shape))
d = shape - 1.0 / 3.0
c = 1.0 / math.sqrt(9.0 * d)
while True:
while True:
u1 = random.random()
u2 = random.random()
while u1 == 0:
u1 = random.random()
x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
v = (1 + c * x) ** 3
if v > 0:
break
u = random.random()
if u < 1 - 0.0331 * x ** 4 or math.log(u) < 0.5 * x ** 2 + d * (1 - v + math.log(v)):
return d * v
x = _gamma_sample(alpha)
y = _gamma_sample(beta_param)
return x / (x + y)
class ThompsonSampling:
    """Thompson Sampling for Bernoulli multi-armed bandits.

    Algorithm:
      1. Maintain a Beta(alpha_i, beta_i) posterior for each arm i.
      2. At each step:
         a. Sample theta_i ~ Beta(alpha_i, beta_i) for each arm.
         b. Select the arm with the highest sampled theta.
      3. After observing the reward, apply the conjugate update:
         reward = 1 -> alpha_i += 1 (success); reward = 0 -> beta_i += 1.

    Why Thompson Sampling works:
      - Naturally balances exploration (uncertain arms get wide samples)
        and exploitation (good arms have high posterior means).
      - As evidence accumulates, each posterior concentrates and the
        algorithm exploits the best arm.
    It is provably near-optimal in a Bayesian sense and often outperforms
    UCB and epsilon-greedy in practice.
    """

    def __init__(self, num_arms):
        """Create a bandit with `num_arms` arms under the uniform Beta(1, 1) prior."""
        self.num_arms = num_arms
        self.alpha = [1.0] * num_arms  # prior successes + 1 per arm
        self.beta = [1.0] * num_arms   # prior failures + 1 per arm
        self.counts = [0] * num_arms   # total pulls per arm
        self.rewards = [0] * num_arms  # total observed reward per arm

    def select_arm(self):
        """Sample every arm's posterior and return the index of the highest draw."""
        samples = [_sample_beta_ts(self.alpha[i], self.beta[i]) for i in range(self.num_arms)]
        return samples.index(max(samples))

    def update(self, arm, reward):
        """Update the posterior for `arm` after observing a binary reward.

        Raises ValueError for an out-of-range arm index or a reward other
        than 0 or 1 — a fractional reward would silently corrupt the
        Bernoulli Beta posterior, so it is rejected loudly instead.
        """
        if not 0 <= arm < self.num_arms:
            raise ValueError(f"arm must be in [0, {self.num_arms}), got {arm}")
        if reward not in (0, 1):
            raise ValueError(f"reward must be 0 or 1, got {reward}")
        self.counts[arm] += 1
        self.rewards[arm] += reward
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1

    def get_stats(self):
        """Return a per-arm list of pulls, wins, and posterior summaries."""
        stats = []
        for i in range(self.num_arms):
            stats.append({
                "arm": i,
                "pulls": self.counts[i],
                "wins": self.rewards[i],
                "posterior_mean": self.alpha[i] / (self.alpha[i] + self.beta[i]),
                "alpha": self.alpha[i],
                "beta": self.beta[i]
            })
        return stats
# ---- VERIFICATION ----
random.seed(42)
# 3-armed bandit with true success probabilities [0.1, 0.5, 0.3].
true_probs = [0.1, 0.5, 0.3]
ts = ThompsonSampling(num_arms=3)
total_reward = 0
for _ in range(2000):
    arm = ts.select_arm()
    reward = 1 if random.random() < true_probs[arm] else 0
    ts.update(arm, reward)
    total_reward += reward
stats = ts.get_stats()
# The best arm (index 1, prob=0.5) should be pulled the most.
assert stats[1]["pulls"] > stats[0]["pulls"], "Arm 1 should be pulled more than arm 0"
assert stats[1]["pulls"] > stats[2]["pulls"], "Arm 1 should be pulled more than arm 2"
# The posterior mean of the best arm should be close to its true probability.
assert abs(stats[1]["posterior_mean"] - 0.5) < 0.1
# Total reward should be close to optimal (0.5 * 2000 = 1000);
# Thompson sampling should achieve at least 70% of optimal.
assert total_reward > 700, f"Reward {total_reward} too low"
print(f"Total reward: {total_reward} / 2000 (optimal ~1000)")
print("All tests passed!")
Key Takeaways
- Bayes' theorem is the foundation: posterior is proportional to prior times likelihood
- Beta-Binomial is the most important conjugate model — know the update rules by heart
- Naive Bayes works surprisingly well in practice despite the independence assumption
- Bayesian A/B testing gives direct probability statements ("B is better with 95% probability")
- Thompson sampling is provably optimal and used in production at Google, Netflix, and Spotify
- Always use log-probabilities when multiplying many small numbers (Naive Bayes, sequential updates)
Lilly Tech Systems