Advanced

Production Monitoring

Detect data drift, track model performance, and trigger automatic retraining.

Production Monitor

# src/monitor.py
import numpy as np
from scipy import stats
from datetime import datetime
from collections import deque
import json

class ProductionMonitor:
    """Track live predictions against a reference dataset.

    Detects per-feature distribution drift (two-sample Kolmogorov-Smirnov
    test against the reference data) and accuracy degradation over a
    sliding window of recent traffic, recording an alert for each finding.
    """

    def __init__(self, reference_data, window_size=1000):
        # Coerce to an ndarray so column indexing (reference[:, i]) works
        # even when callers pass a plain list of rows.
        self.reference = np.asarray(reference_data, dtype=float)
        self.window = deque(maxlen=window_size)       # recent feature vectors
        self.predictions = deque(maxlen=window_size)  # recent prediction records
        self.alerts = []                              # accumulated alert dicts

    def log_prediction(self, features, prediction, actual=None):
        """Record one served prediction and, when available, its true label.

        Args:
            features: feature vector (or scalar) the model was given.
            prediction: the model's output.
            actual: ground-truth label, or None if not yet known.
        """
        self.window.append(features)
        self.predictions.append({
            "prediction": prediction, "actual": actual,
            "timestamp": datetime.now().isoformat()})

    def check_drift(self, feature_idx=None, threshold=0.05):
        """Compare the live window to the reference data, feature by feature.

        Args:
            feature_idx: a single column index to test, or None for all columns.
            threshold: p-value below which a feature counts as drifted.

        Returns:
            {"drift": bool, "features": {...}} with per-feature KS statistics,
            or {"drift": False, "reason": "insufficient data"} when fewer than
            100 samples have been logged.
        """
        if len(self.window) < 100:
            return {"drift": False, "reason": "insufficient data"}

        current = np.asarray(list(self.window), dtype=float)
        # Accept scalar feature streams by treating them as one column.
        if current.ndim == 1:
            current = current.reshape(-1, 1)
        reference = self.reference
        if reference.ndim == 1:
            reference = reference.reshape(-1, 1)

        results = {}
        indices = range(current.shape[1]) if feature_idx is None else [feature_idx]
        for i in indices:
            stat, p = stats.ks_2samp(reference[:, i], current[:, i])
            results[f"feature_{i}"] = {
                "ks_stat": round(float(stat), 4),
                "p_value": round(float(p), 4),
                # bool(...) is required: scipy may return numpy scalars and
                # numpy.bool_ is not JSON serializable (breaks json.dumps).
                "drifted": bool(p < threshold)
            }

        any_drift = any(r["drifted"] for r in results.values())
        if any_drift:
            self.alerts.append({
                "type": "drift", "timestamp": datetime.now().isoformat(),
                "features": [k for k, v in results.items() if v["drifted"]]})

        return {"drift": any_drift, "features": results}

    def check_performance(self, min_accuracy=0.80):
        """Compute accuracy over labeled predictions in the window.

        Args:
            min_accuracy: accuracy below which performance counts as degraded.

        Returns:
            {"degraded": bool, "accuracy": float, "samples": int}, or an
            "insufficient labels" marker when fewer than 50 labels exist.
        """
        labeled = [p for p in self.predictions if p["actual"] is not None]
        if len(labeled) < 50:
            return {"degraded": False, "reason": "insufficient labels"}

        correct = sum(1 for p in labeled if p["prediction"] == p["actual"])
        accuracy = correct / len(labeled)
        degraded = accuracy < min_accuracy

        if degraded:
            self.alerts.append({
                "type": "performance", "accuracy": round(accuracy, 4),
                "timestamp": datetime.now().isoformat()})

        return {"degraded": degraded, "accuracy": round(accuracy, 4),
                "samples": len(labeled)}

    def should_retrain(self):
        """Combine drift and performance checks into a retrain decision.

        NOTE: this re-runs both checks, so it may append fresh alerts
        each time it is called.
        """
        drift = self.check_drift()
        perf = self.check_performance()
        retrain = drift["drift"] or perf.get("degraded", False)
        return {"retrain": retrain, "drift": drift, "performance": perf}

    def get_report(self):
        """Summarize monitor state: alerts plus current window/label counts."""
        return {"alerts": self.alerts,
                "window_size": len(self.window),
                "predictions": len(self.predictions)}

if __name__ == "__main__":
    # Demo: healthy traffic followed by a shifted stream.
    reference_sample = np.random.normal(0, 1, (1000, 4))
    monitor = ProductionMonitor(reference_sample)

    # Phase 1: requests drawn from the same distribution as the reference.
    for _ in range(500):
        monitor.log_prediction(np.random.normal(0, 1, 4), 1)

    # Phase 2: shifted mean and wider spread to exercise the drift check.
    for _ in range(500):
        monitor.log_prediction(np.random.normal(2, 1.5, 4), 0)

    print(json.dumps(monitor.should_retrain(), indent=2))
💡 Automated retraining: connect the monitor to your CI/CD pipeline. When drift is detected, trigger a GitHub Actions workflow that retrains the model on the latest data.