Model Performance Monitoring — Intermediate
Data drift tells you the inputs changed. Performance monitoring tells you if those changes actually hurt your model. This lesson covers how to track model accuracy in production — including the hard problem of delayed ground truth — with production-ready code for metrics tracking, degradation detection, and A/B test monitoring.
The Ground Truth Delay Problem
The hardest part of ML monitoring: you often can't compute accuracy in real time because ground truth arrives late.
| Use Case | Ground Truth Source | Delay | Strategy |
|---|---|---|---|
| Fraud detection | Chargeback reports | 30-90 days | Use proxy metrics (block rate, manual review rate) |
| Search ranking | Click/engagement data | Minutes-hours | Online metrics (CTR, dwell time) |
| Content moderation | Human review queue | Hours-days | Sample-based evaluation |
| Medical diagnosis | Follow-up outcomes | Weeks-months | Physician override rates, proxy tests |
| Recommendation | Purchase/conversion | Minutes-days | Engagement proxies + delayed conversion |
Production Metrics Tracker
# Production model performance tracker
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import json
class ModelPerformanceTracker:
    """Track model performance metrics in production.

    Predictions and ground truth are logged independently and joined by
    request_id, which handles the delayed-ground-truth problem: labels
    may arrive days after the prediction was served.

    Usage:
        tracker = ModelPerformanceTracker("fraud-detector-v2")

        # Log predictions as they happen
        tracker.log_prediction("req-123", prediction=1, score=0.92,
                               features={"amount": 500})

        # Log ground truth when it arrives (possibly days later)
        tracker.log_ground_truth("req-123", actual=1)

        # Compute metrics
        metrics = tracker.compute_metrics(window_hours=24)
    """

    def __init__(self, model_name: str, model_version: str = "latest"):
        self.model_name = model_name
        self.model_version = model_version
        # request_id -> prediction record (the join key for ground truth)
        self.predictions: Dict[str, dict] = {}
        # request_id -> ground-truth record
        self.ground_truths: Dict[str, dict] = {}
        # Append-only log of every snapshot returned by compute_metrics().
        self.metrics_history: List[dict] = []

    def log_prediction(self, request_id: str, prediction: int,
                       score: float, features: dict = None,
                       timestamp: datetime = None):
        """Log a model prediction. Call this at inference time.

        Args:
            request_id: unique id used later to join with ground truth.
            prediction: predicted class label (0/1).
            score: model score for the positive class.
            features: optional feature snapshot for later debugging.
            timestamp: prediction time; defaults to current UTC.
        """
        self.predictions[request_id] = {
            "prediction": prediction,
            "score": score,
            "features": features or {},
            # Naive-UTC ISO timestamps throughout; every comparison in this
            # class uses the same convention so they remain consistent.
            "timestamp": (timestamp or datetime.utcnow()).isoformat(),
            "model_version": self.model_version
        }

    def log_ground_truth(self, request_id: str, actual: int,
                         timestamp: datetime = None):
        """Log ground truth for a request when it becomes available."""
        self.ground_truths[request_id] = {
            "actual": actual,
            "timestamp": (timestamp or datetime.utcnow()).isoformat()
        }

    def compute_metrics(self, window_hours: int = 24) -> dict:
        """Compute binary-classification metrics for the time window.

        Only predictions made within the last `window_hours` that also have
        ground truth logged are scored.

        Fix vs. previous version: ground_truth_coverage is now
        matched-in-window / predictions-in-window. It previously divided
        by ALL predictions ever logged, which understated coverage as soon
        as older predictions aged out of the window.

        Returns:
            A metrics snapshot dict, or a ``{"status": "no_matched_data"}``
            dict when no windowed prediction has ground truth yet.
        """
        cutoff = datetime.utcnow() - timedelta(hours=window_hours)

        # Join predictions with ground truth, restricted to the window.
        y_true, y_pred, y_scores = [], [], []
        windowed_count = 0  # predictions inside the window, labeled or not
        for req_id, pred in self.predictions.items():
            pred_time = datetime.fromisoformat(pred["timestamp"])
            if pred_time < cutoff:
                continue
            windowed_count += 1
            truth = self.ground_truths.get(req_id)
            if truth is not None:
                y_true.append(truth["actual"])
                y_pred.append(pred["prediction"])
                y_scores.append(pred["score"])

        if not y_true:
            return {
                "status": "no_matched_data",
                "total_predictions": len(self.predictions),
                "ground_truth_coverage": 0.0
            }

        y_true = np.array(y_true)
        y_pred = np.array(y_pred)
        y_scores = np.array(y_scores)

        # Confusion-matrix cells for the positive class (label 1).
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fp = np.sum((y_pred == 1) & (y_true == 0))
        fn = np.sum((y_pred == 0) & (y_true == 1))
        tn = np.sum((y_pred == 0) & (y_true == 0))

        # Guard every ratio against a zero denominator.
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = (2 * precision * recall / (precision + recall)
              if (precision + recall) > 0 else 0)
        accuracy = (tp + tn) / len(y_true)

        metrics = {
            "model": self.model_name,
            "version": self.model_version,
            "window_hours": window_hours,
            "timestamp": datetime.utcnow().isoformat(),
            "sample_size": len(y_true),
            # Share of windowed predictions that already have a label.
            "ground_truth_coverage": round(len(y_true) / windowed_count, 4),
            "accuracy": round(accuracy, 4),
            "precision": round(precision, 4),
            "recall": round(recall, 4),
            "f1_score": round(f1, 4),
            "confusion_matrix": {
                "tp": int(tp), "fp": int(fp),
                "fn": int(fn), "tn": int(tn)
            },
            # Divergence between predicted and actual positive rates is an
            # early calibration signal, usable before labels fully arrive.
            "prediction_positive_rate": round(np.mean(y_pred), 4),
            "actual_positive_rate": round(np.mean(y_true), 4),
            "mean_score": round(np.mean(y_scores), 4),
            "score_std": round(np.std(y_scores), 4)
        }
        self.metrics_history.append(metrics)
        return metrics
# Example usage
tracker = ModelPerformanceTracker("fraud-detector", "v2.3")

# Simulate a stream of scored requests, each with immediate ground truth.
for idx in range(100):
    fraud_label = np.random.random() < 0.1  # 10% fraud rate
    model_score = np.random.beta(8, 2) if fraud_label else np.random.beta(2, 8)
    tracker.log_prediction(f"req-{idx}", 1 if model_score > 0.5 else 0,
                           round(model_score, 3))
    tracker.log_ground_truth(f"req-{idx}", int(fraud_label))

metrics = tracker.compute_metrics(window_hours=24)
print(json.dumps(metrics, indent=2))
Performance Degradation Detection
# Detect model performance degradation over time
import numpy as np
from typing import List
class DegradationDetector:
    """Detect gradual or sudden model performance drops.

    Two detection modes:
      1. Threshold-based: alert when a metric falls below an absolute threshold.
      2. Trend-based: alert when a metric shows a sustained downward trend.
    """

    def __init__(self, baseline_metrics: dict):
        """Initialize with baseline metrics from a validation/shadow period."""
        self.baseline = baseline_metrics
        self.history: List[dict] = []

    def check_absolute_degradation(self, current_metrics: dict,
                                   thresholds: dict = None) -> dict:
        """Report every metric whose current value fell below its threshold."""
        if thresholds is None:
            # Default thresholds: 95% of each numeric baseline value,
            # i.e. alert on a drop of more than 5% from baseline.
            thresholds = {
                name: base * 0.95
                for name, base in self.baseline.items()
                if isinstance(base, (int, float))
            }

        alerts = {}
        for name, limit in thresholds.items():
            value = current_metrics.get(name)
            if value is None or value >= limit:
                continue  # metric missing or still within tolerance
            base = self.baseline.get(name, limit)
            drop = (base - value) / base * 100 if base > 0 else 0
            alerts[name] = {
                "current": round(value, 4),
                "threshold": round(limit, 4),
                "baseline": round(base, 4),
                "drop_percent": round(drop, 2)
            }

        # Severity scales with how many metrics breached at once.
        if len(alerts) >= 2:
            severity = "critical"
        elif len(alerts) == 1:
            severity = "warning"
        else:
            severity = "ok"

        return {
            "degradation_detected": len(alerts) > 0,
            "alerts": alerts,
            "severity": severity
        }

    def check_trend_degradation(self, metric_name: str,
                                recent_values: List[float],
                                window: int = 7,
                                min_slope: float = -0.01) -> dict:
        """Detect a sustained downward trend via a degree-1 least-squares fit."""
        if len(recent_values) < window:
            return {"status": "insufficient_data"}

        tail = np.array(recent_values[-window:])
        # Slope of the best-fit line over the last `window` observations.
        slope = np.polyfit(np.arange(len(tail)), tail, 1)[0]

        if slope < min_slope:
            trend = "declining"
        elif slope > abs(min_slope):
            trend = "improving"
        else:
            trend = "stable"

        return {
            "metric": metric_name,
            "trend": trend,
            "slope_per_period": round(slope, 6),
            "latest_value": round(tail[-1], 4),
            "period_start_value": round(tail[0], 4),
            "alert": slope < min_slope,
            # Naive one-step linear extrapolation of the fitted trend.
            "projected_next": round(tail[-1] + slope, 4)
        }
# Example: detect degradation
baseline = {"accuracy": 0.95, "precision": 0.90, "recall": 0.88, "f1_score": 0.89}
deg_detector = DegradationDetector(baseline)

# Current production metrics have slipped relative to the baseline.
current = {"accuracy": 0.91, "precision": 0.85, "recall": 0.82, "f1_score": 0.83}
report = deg_detector.check_absolute_degradation(current)
print(f"Degradation: {report['degradation_detected']}, Severity: {report['severity']}")
A/B Test Monitoring for Models
# A/B test monitoring for model comparisons
from scipy import stats
import numpy as np
class ModelABTestMonitor:
    """Monitor A/B tests between model versions.

    Tracks statistical significance, guardrail metrics,
    and provides automated recommendations.
    """

    def __init__(self, control_name: str, treatment_name: str,
                 min_sample_size: int = 1000,
                 significance_level: float = 0.05):
        self.control_name = control_name
        self.treatment_name = treatment_name
        self.min_sample_size = min_sample_size  # per-arm minimum before testing
        self.significance_level = significance_level  # alpha for the two-sided test

    def evaluate(self, control_metric: list,
                 treatment_metric: list,
                 metric_name: str = "conversion_rate",
                 higher_is_better: bool = True) -> dict:
        """Evaluate A/B test results with statistical rigor.

        Args:
            control_metric: per-observation metric values for the control arm.
            treatment_metric: per-observation metric values for the treatment arm.
            metric_name: label echoed back in the report.
            higher_is_better: direction in which the metric improves.

        Returns:
            An ``insufficient_sample`` status dict when either arm is too
            small, otherwise a full report with per-arm stats, lift,
            p-value, and an automated recommendation.
        """
        control = np.array(control_metric)
        treatment = np.array(treatment_metric)

        # Underpowered tests produce noisy p-values — refuse to conclude.
        if len(control) < self.min_sample_size or \
           len(treatment) < self.min_sample_size:
            return {
                "status": "insufficient_sample",
                "control_n": len(control),
                "treatment_n": len(treatment),
                "required_n": self.min_sample_size,
                "recommendation": "Continue collecting data"
            }

        # Welch's t-test (equal_var=False): does not assume the two arms
        # share a variance, which is generally false when their underlying
        # rates differ. (Previously used the pooled-variance Student test.)
        t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=False)

        control_mean = np.mean(control)
        treatment_mean = np.mean(treatment)
        lift = ((treatment_mean - control_mean) / control_mean * 100
                if control_mean != 0 else 0)

        is_significant = p_value < self.significance_level
        # "Positive" means the observed change points in the good direction.
        is_positive = (treatment_mean > control_mean) == higher_is_better

        # Determine recommendation
        if is_significant and is_positive:
            recommendation = f"Ship {self.treatment_name} - statistically significant improvement"
        elif is_significant and not is_positive:
            recommendation = f"Revert to {self.control_name} - treatment is significantly worse"
        else:
            recommendation = "Continue test - results not yet significant"

        return {
            "metric": metric_name,
            "control": {
                "name": self.control_name,
                "mean": round(control_mean, 6),
                "std": round(np.std(control), 6),
                "n": len(control)
            },
            "treatment": {
                "name": self.treatment_name,
                "mean": round(treatment_mean, 6),
                "std": round(np.std(treatment), 6),
                "n": len(treatment)
            },
            "lift_percent": round(lift, 3),
            "p_value": round(p_value, 6),
            # Newly surfaced (was computed and discarded before).
            "t_statistic": round(float(t_stat), 6),
            "is_significant": is_significant,
            "recommendation": recommendation
        }

    def check_guardrails(self, control_latencies: list,
                         treatment_latencies: list,
                         max_p99_increase_ms: float = 50) -> dict:
        """Check guardrail metrics to ensure the treatment does not degrade
        critical non-target metrics (here: tail latency).

        Args:
            control_latencies: per-request latencies (ms) for control.
            treatment_latencies: per-request latencies (ms) for treatment.
            max_p99_increase_ms: maximum tolerated p99 regression.
        """
        control_p99 = np.percentile(control_latencies, 99)
        treatment_p99 = np.percentile(treatment_latencies, 99)
        p99_increase = treatment_p99 - control_p99

        return {
            "guardrail": "latency_p99",
            "control_p99_ms": round(control_p99, 1),
            "treatment_p99_ms": round(treatment_p99, 1),
            "increase_ms": round(p99_increase, 1),
            "threshold_ms": max_p99_increase_ms,
            "passed": p99_increase <= max_p99_increase_ms,
            "action": "OK" if p99_increase <= max_p99_increase_ms
            else "BLOCK - latency guardrail violated"
        }
# Example: compare model v2.3 vs v2.4
ab_monitor = ModelABTestMonitor("fraud-v2.3", "fraud-v2.4")

# Simulated per-request outcomes for each arm (1 = fraud caught).
np.random.seed(42)
arm_control = np.random.binomial(1, 0.12, 5000)
arm_treatment = np.random.binomial(1, 0.135, 5000)

result = ab_monitor.evaluate(
    arm_control.tolist(),
    arm_treatment.tolist(),
    metric_name="fraud_catch_rate"
)
print(f"Lift: {result['lift_percent']}%, p-value: {result['p_value']}")
print(f"Recommendation: {result['recommendation']}")
Proxy Metrics: When Ground Truth is Delayed
Use proxy metrics to bridge the ground truth gap:
- Fraud detection: Manual review rate, rule trigger rate, user dispute rate
- Recommendation: Click-through rate, add-to-cart rate, time-on-page
- Content moderation: User appeal rate, re-report rate after action
- Search ranking: Mean reciprocal rank, abandonment rate, reformulation rate
Watch Out: Goodhart's Law
"When a measure becomes a target, it ceases to be a good measure." If you optimize purely for proxy metrics, you risk gaming them. Always validate proxy metrics against ground truth periodically to ensure the correlation still holds.
Lilly Tech Systems