Model Performance Monitoring Intermediate

Data drift tells you the inputs changed. Performance monitoring tells you if those changes actually hurt your model. This lesson covers how to track model accuracy in production — including the hard problem of delayed ground truth — with production-ready code for metrics tracking, degradation detection, and A/B test monitoring.

The Ground Truth Delay Problem

The hardest part of ML monitoring: you often can't compute accuracy in real time because ground truth arrives late.

Use CaseGround Truth SourceDelayStrategy
Fraud detectionChargeback reports30-90 daysUse proxy metrics (block rate, manual review rate)
Search rankingClick/engagement dataMinutes-hoursOnline metrics (CTR, dwell time)
Content moderationHuman review queueHours-daysSample-based evaluation
Medical diagnosisFollow-up outcomesWeeks-monthsPhysician override rates, proxy tests
RecommendationPurchase/conversionMinutes-daysEngagement proxies + delayed conversion

Production Metrics Tracker

# Production model performance tracker
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import json

class ModelPerformanceTracker:
    """Track model performance metrics in production.
    Handles delayed ground truth, windowed metrics, and alerts.

    Usage:
        tracker = ModelPerformanceTracker("fraud-detector-v2")

        # Log predictions as they happen
        tracker.log_prediction("req-123", prediction=1, score=0.92,
                               features={"amount": 500})

        # Log ground truth when it arrives (possibly days later)
        tracker.log_ground_truth("req-123", actual=1)

        # Compute metrics
        metrics = tracker.compute_metrics(window_hours=24)
    """

    def __init__(self, model_name: str, model_version: str = "latest"):
        self.model_name = model_name
        self.model_version = model_version
        self.predictions: Dict[str, dict] = {}  # request_id -> prediction
        self.ground_truths: Dict[str, dict] = {}  # request_id -> ground truth
        self.metrics_history: List[dict] = []

    def log_prediction(self, request_id: str, prediction: int,
                        score: float, features: dict = None,
                        timestamp: datetime = None):
        """Log a model prediction. Call this at inference time."""
        self.predictions[request_id] = {
            "prediction": prediction,
            "score": score,
            "features": features or {},
            "timestamp": (timestamp or datetime.utcnow()).isoformat(),
            "model_version": self.model_version
        }

    def log_ground_truth(self, request_id: str, actual: int,
                          timestamp: datetime = None):
        """Log ground truth when it becomes available."""
        self.ground_truths[request_id] = {
            "actual": actual,
            "timestamp": (timestamp or datetime.utcnow()).isoformat()
        }

    def compute_metrics(self, window_hours: int = 24) -> dict:
        """Compute classification metrics for predictions that
        have ground truth within the time window."""
        cutoff = datetime.utcnow() - timedelta(hours=window_hours)

        # Match predictions with ground truth
        y_true, y_pred, y_scores = [], [], []
        matched_count = 0

        for req_id, pred in self.predictions.items():
            pred_time = datetime.fromisoformat(pred["timestamp"])
            if pred_time < cutoff:
                continue

            if req_id in self.ground_truths:
                matched_count += 1
                y_true.append(self.ground_truths[req_id]["actual"])
                y_pred.append(pred["prediction"])
                y_scores.append(pred["score"])

        if len(y_true) == 0:
            return {
                "status": "no_matched_data",
                "total_predictions": len(self.predictions),
                "ground_truth_coverage": 0.0
            }

        y_true = np.array(y_true)
        y_pred = np.array(y_pred)
        y_scores = np.array(y_scores)

        # Compute metrics
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fp = np.sum((y_pred == 1) & (y_true == 0))
        fn = np.sum((y_pred == 0) & (y_true == 1))
        tn = np.sum((y_pred == 0) & (y_true == 0))

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = (2 * precision * recall / (precision + recall)
               if (precision + recall) > 0 else 0)
        accuracy = (tp + tn) / len(y_true)

        metrics = {
            "model": self.model_name,
            "version": self.model_version,
            "window_hours": window_hours,
            "timestamp": datetime.utcnow().isoformat(),
            "sample_size": len(y_true),
            "ground_truth_coverage": round(matched_count / max(1, len(self.predictions)), 4),
            "accuracy": round(accuracy, 4),
            "precision": round(precision, 4),
            "recall": round(recall, 4),
            "f1_score": round(f1, 4),
            "confusion_matrix": {
                "tp": int(tp), "fp": int(fp),
                "fn": int(fn), "tn": int(tn)
            },
            "prediction_positive_rate": round(np.mean(y_pred), 4),
            "actual_positive_rate": round(np.mean(y_true), 4),
            "mean_score": round(np.mean(y_scores), 4),
            "score_std": round(np.std(y_scores), 4)
        }

        self.metrics_history.append(metrics)
        return metrics


# Example usage
tracker = ModelPerformanceTracker("fraud-detector", "v2.3")

# Simulate logging predictions
for i in range(100):
    is_fraud = np.random.random() < 0.1  # 10% fraud rate
    score = np.random.beta(8, 2) if is_fraud else np.random.beta(2, 8)
    prediction = 1 if score > 0.5 else 0
    tracker.log_prediction(f"req-{i}", prediction, round(score, 3))
    tracker.log_ground_truth(f"req-{i}", int(is_fraud))

metrics = tracker.compute_metrics(window_hours=24)
print(json.dumps(metrics, indent=2))

Performance Degradation Detection

# Detect model performance degradation over time
import numpy as np
from typing import List

class DegradationDetector:
    """Detect gradual or sudden model performance drops.

    Two detection modes:
    1. Threshold-based: alert when metric drops below absolute threshold
    2. Trend-based: alert when metric shows sustained downward trend
    """

    def __init__(self, baseline_metrics: dict):
        """Initialize with baseline metrics from validation/shadow period."""
        self.baseline = baseline_metrics
        self.history: List[dict] = []

    def check_absolute_degradation(self, current_metrics: dict,
                                     thresholds: dict = None) -> dict:
        """Check if current metrics dropped below thresholds."""
        if thresholds is None:
            # Default: alert if metric drops more than 5% from baseline
            thresholds = {
                k: v * 0.95 for k, v in self.baseline.items()
                if isinstance(v, (int, float))
            }

        alerts = {}
        for metric, threshold in thresholds.items():
            current_val = current_metrics.get(metric)
            if current_val is not None and current_val < threshold:
                baseline_val = self.baseline.get(metric, threshold)
                drop_pct = ((baseline_val - current_val) / baseline_val * 100
                            if baseline_val > 0 else 0)
                alerts[metric] = {
                    "current": round(current_val, 4),
                    "threshold": round(threshold, 4),
                    "baseline": round(baseline_val, 4),
                    "drop_percent": round(drop_pct, 2)
                }

        return {
            "degradation_detected": len(alerts) > 0,
            "alerts": alerts,
            "severity": "critical" if len(alerts) >= 2 else
                        "warning" if len(alerts) == 1 else "ok"
        }

    def check_trend_degradation(self, metric_name: str,
                                 recent_values: List[float],
                                 window: int = 7,
                                 min_slope: float = -0.01) -> dict:
        """Detect sustained downward trend using linear regression."""
        if len(recent_values) < window:
            return {"status": "insufficient_data"}

        values = np.array(recent_values[-window:])
        x = np.arange(len(values))

        # Simple linear regression
        slope = np.polyfit(x, values, 1)[0]

        return {
            "metric": metric_name,
            "trend": "declining" if slope < min_slope else
                     "improving" if slope > abs(min_slope) else "stable",
            "slope_per_period": round(slope, 6),
            "latest_value": round(values[-1], 4),
            "period_start_value": round(values[0], 4),
            "alert": slope < min_slope,
            "projected_next": round(values[-1] + slope, 4)
        }


# Example: detect degradation
baseline = {"accuracy": 0.95, "precision": 0.90, "recall": 0.88, "f1_score": 0.89}
detector = DegradationDetector(baseline)

# Current metrics show a drop
current = {"accuracy": 0.91, "precision": 0.85, "recall": 0.82, "f1_score": 0.83}
result = detector.check_absolute_degradation(current)
print(f"Degradation: {result['degradation_detected']}, Severity: {result['severity']}")

A/B Test Monitoring for Models

# A/B test monitoring for model comparisons
from scipy import stats
import numpy as np

class ModelABTestMonitor:
    """Monitor A/B tests between model versions.

    Tracks statistical significance, guardrail metrics,
    and provides automated recommendations.
    """

    def __init__(self, control_name: str, treatment_name: str,
                 min_sample_size: int = 1000,
                 significance_level: float = 0.05):
        self.control_name = control_name
        self.treatment_name = treatment_name
        self.min_sample_size = min_sample_size
        self.significance_level = significance_level

    def evaluate(self, control_metric: list,
                 treatment_metric: list,
                 metric_name: str = "conversion_rate",
                 higher_is_better: bool = True) -> dict:
        """Evaluate A/B test results with statistical rigor."""
        control = np.array(control_metric)
        treatment = np.array(treatment_metric)

        # Sample size check
        if len(control) < self.min_sample_size or \
           len(treatment) < self.min_sample_size:
            return {
                "status": "insufficient_sample",
                "control_n": len(control),
                "treatment_n": len(treatment),
                "required_n": self.min_sample_size,
                "recommendation": "Continue collecting data"
            }

        # Statistical test
        t_stat, p_value = stats.ttest_ind(control, treatment)

        control_mean = np.mean(control)
        treatment_mean = np.mean(treatment)
        lift = ((treatment_mean - control_mean) / control_mean * 100
                if control_mean != 0 else 0)

        is_significant = p_value < self.significance_level
        is_positive = (treatment_mean > control_mean) == higher_is_better

        # Determine recommendation
        if is_significant and is_positive:
            recommendation = f"Ship {self.treatment_name} - statistically significant improvement"
        elif is_significant and not is_positive:
            recommendation = f"Revert to {self.control_name} - treatment is significantly worse"
        else:
            recommendation = "Continue test - results not yet significant"

        return {
            "metric": metric_name,
            "control": {
                "name": self.control_name,
                "mean": round(control_mean, 6),
                "std": round(np.std(control), 6),
                "n": len(control)
            },
            "treatment": {
                "name": self.treatment_name,
                "mean": round(treatment_mean, 6),
                "std": round(np.std(treatment), 6),
                "n": len(treatment)
            },
            "lift_percent": round(lift, 3),
            "p_value": round(p_value, 6),
            "is_significant": is_significant,
            "recommendation": recommendation
        }

    def check_guardrails(self, control_latencies: list,
                          treatment_latencies: list,
                          max_p99_increase_ms: float = 50) -> dict:
        """Check guardrail metrics to ensure treatment doesn't
        degrade critical non-target metrics."""
        control_p99 = np.percentile(control_latencies, 99)
        treatment_p99 = np.percentile(treatment_latencies, 99)
        p99_increase = treatment_p99 - control_p99

        return {
            "guardrail": "latency_p99",
            "control_p99_ms": round(control_p99, 1),
            "treatment_p99_ms": round(treatment_p99, 1),
            "increase_ms": round(p99_increase, 1),
            "threshold_ms": max_p99_increase_ms,
            "passed": p99_increase <= max_p99_increase_ms,
            "action": "OK" if p99_increase <= max_p99_increase_ms
                      else "BLOCK - latency guardrail violated"
        }


# Example: compare model v2.3 vs v2.4
monitor = ModelABTestMonitor("fraud-v2.3", "fraud-v2.4")

# Simulated conversion data
np.random.seed(42)
control_conversions = np.random.binomial(1, 0.12, 5000)
treatment_conversions = np.random.binomial(1, 0.135, 5000)

result = monitor.evaluate(
    control_conversions.tolist(),
    treatment_conversions.tolist(),
    metric_name="fraud_catch_rate"
)
print(f"Lift: {result['lift_percent']}%, p-value: {result['p_value']}")
print(f"Recommendation: {result['recommendation']}")

Proxy Metrics: When Ground Truth is Delayed

Use proxy metrics to bridge the ground truth gap:
  • Fraud detection: Manual review rate, rule trigger rate, user dispute rate
  • Recommendation: Click-through rate, add-to-cart rate, time-on-page
  • Content moderation: User appeal rate, re-report rate after action
  • Search ranking: Mean reciprocal rank, abandonment rate, reformulation rate
Proxy metrics aren't perfect, but they give you signal within minutes rather than waiting days or weeks for ground truth.
Watch Out: Goodhart's Law
"When a measure becomes a target, it ceases to be a good measure." If you optimize purely for proxy metrics, you risk gaming them. Always validate proxy metrics against ground truth periodically to ensure the correlation still holds.