Advanced
Production Monitoring
Detect data drift, track model performance, and trigger automatic retraining.
Production Monitor
# src/monitor.py
import numpy as np
from scipy import stats
from datetime import datetime
from collections import deque
import json
class ProductionMonitor:
    """Monitor live model traffic for feature drift and accuracy decay.

    Keeps a rolling window of served feature vectors and predictions.
    Drift is detected per feature with a two-sample Kolmogorov-Smirnov
    test against a fixed reference dataset; performance is plain accuracy
    over the labeled predictions currently in the window. Every detection
    also appends a record to ``self.alerts``.
    """

    def __init__(self, reference_data, window_size=1000):
        """
        Args:
            reference_data: 2-D array-like, shape (n_samples, n_features),
                drawn from the training distribution.
            window_size: number of recent observations retained for checks.
        """
        # Coerce to ndarray: self.reference[:, i] below would raise
        # TypeError for a plain list-of-lists reference.
        self.reference = np.asarray(reference_data)
        self.window = deque(maxlen=window_size)       # recent feature vectors
        self.predictions = deque(maxlen=window_size)  # recent prediction records
        self.alerts = []                              # accumulated alert dicts

    def log_prediction(self, features, prediction, actual=None):
        """Record one served prediction (and its ground-truth label, if known)."""
        self.window.append(features)
        self.predictions.append({
            "prediction": prediction, "actual": actual,
            "timestamp": datetime.now().isoformat()})

    def check_drift(self, feature_idx=None, threshold=0.05, min_samples=100):
        """KS-test each feature's window distribution against the reference.

        Args:
            feature_idx: single feature column to test, or None for all.
            threshold: p-value below which a feature counts as drifted.
            min_samples: minimum window size before testing (the KS test is
                unreliable on very small samples). Default preserves the
                original hard-coded 100.

        Returns:
            ``{"drift": bool, "features": {...}}`` with per-feature ks_stat,
            p_value and drifted flags, or ``{"drift": False, "reason": ...}``
            when the window is too small. Appends a drift alert on detection.
        """
        if len(self.window) < min_samples:
            return {"drift": False, "reason": "insufficient data"}
        current = np.array(list(self.window))
        indices = range(current.shape[1]) if feature_idx is None else [feature_idx]
        results = {}
        for i in indices:
            stat, p = stats.ks_2samp(self.reference[:, i], current[:, i])
            results[f"feature_{i}"] = {
                "ks_stat": round(float(stat), 4),
                "p_value": round(float(p), 4),
                # bool() cast: np.bool_ is not JSON-serializable, which
                # breaks json.dumps() on this report downstream.
                "drifted": bool(p < threshold),
            }
        any_drift = any(r["drifted"] for r in results.values())
        if any_drift:
            self.alerts.append({
                "type": "drift", "timestamp": datetime.now().isoformat(),
                "features": [k for k, v in results.items() if v["drifted"]]})
        return {"drift": any_drift, "features": results}

    def check_performance(self, min_accuracy=0.80, min_labels=50):
        """Compute accuracy over the labeled predictions in the window.

        Args:
            min_accuracy: accuracy below this counts as degraded.
            min_labels: minimum labeled samples required before judging.
                Default preserves the original hard-coded 50.

        Returns:
            ``{"degraded": bool, "accuracy": float, "samples": int}`` or
            ``{"degraded": False, "reason": ...}`` with too few labels.
            Appends a performance alert when degraded.
        """
        labeled = [p for p in self.predictions if p["actual"] is not None]
        if len(labeled) < min_labels:
            return {"degraded": False, "reason": "insufficient labels"}
        correct = sum(1 for p in labeled if p["prediction"] == p["actual"])
        accuracy = correct / len(labeled)
        degraded = accuracy < min_accuracy
        if degraded:
            self.alerts.append({
                "type": "performance", "accuracy": round(accuracy, 4),
                "timestamp": datetime.now().isoformat()})
        return {"degraded": degraded, "accuracy": round(accuracy, 4),
                "samples": len(labeled)}

    def should_retrain(self):
        """Combine drift and performance checks into one retrain decision."""
        drift = self.check_drift()
        perf = self.check_performance()
        # .get(): check_performance may return the "insufficient labels"
        # shape, which has no "degraded"-triggering information.
        retrain = drift["drift"] or perf.get("degraded", False)
        return {"retrain": retrain, "drift": drift, "performance": perf}

    def get_report(self):
        """Summarize monitor state: alerts raised and buffer fill levels."""
        return {"alerts": self.alerts,
                "window_size": len(self.window),
                "predictions": len(self.predictions)}
if __name__ == "__main__":
    # Demo: feed the monitor healthy traffic first, then a shifted
    # distribution that should trip both drift and retrain checks.
    reference = np.random.normal(0, 1, (1000, 4))
    monitor = ProductionMonitor(reference)
    # Simulate normal traffic
    for _ in range(500):
        monitor.log_prediction(np.random.normal(0, 1, 4), 1)
    # Simulate drift
    for _ in range(500):
        monitor.log_prediction(np.random.normal(2, 1.5, 4), 0)
    print(json.dumps(monitor.should_retrain(), indent=2))
Automated retraining: Connect the monitor to your CI/CD pipeline. When drift is detected, trigger a GitHub Actions workflow to retrain with the latest data.
Lilly Tech Systems