Intermediate

Analytics

Count objects, measure dwell time, generate heatmaps, define detection zones, and export analytics.

Analytics Engine

# src/analytics.py
import cv2, numpy as np, json, time
from collections import defaultdict

class AnalyticsEngine:
    """Accumulate tracking statistics over a sequence of video frames.

    Each call to ``update`` records the tracks seen in one frame: unique
    track ids (overall and per class), per-frame object counts, dwell time
    per track, a positional heatmap, and enter/exit events for every
    registered detection zone.
    """

    # Radius, in pixels, of the disc each sighting adds to the heatmap.
    HEAT_RADIUS = 20

    def __init__(self, frame_shape):
        """Create an empty engine for frames of the given size.

        Args:
            frame_shape: ``(height, width)`` of the frames being analysed.
        """
        self.height, self.width = frame_shape
        self.heatmap = np.zeros((self.height, self.width), dtype=np.float32)
        self.unique_tracks = set()
        self.class_counts = defaultdict(set)
        self.frame_counts = []
        self.first_seen = {}
        self.last_seen = {}
        self.dwell = {}
        self.zones = []
        # Filled-disc stamp, built once and slice-added for every sighting.
        r = self.HEAT_RADIUS
        yy, xx = np.ogrid[-r:r + 1, -r:r + 1]
        self._heat_kernel = (xx * xx + yy * yy <= r * r).astype(np.float32)

    def _add_heat(self, cx, cy):
        """Add one disc centred on integer pixel ``(cx, cy)`` to the heatmap.

        The disc is clipped against the frame borders. The caller guarantees
        that the centre itself lies inside the frame.
        """
        r = self.HEAT_RADIUS
        left, top = cx - r, cy - r
        x0, x1 = max(left, 0), min(cx + r + 1, self.width)
        y0, y1 = max(top, 0), min(cy + r + 1, self.height)
        self.heatmap[y0:y1, x0:x1] += self._heat_kernel[
            y0 - top:y1 - top, x0 - left:x1 - left
        ]

    def update(self, tracked):
        """Fold the tracks of one frame into the running statistics.

        Args:
            tracked: Iterable of dicts, each providing ``"track_id"``,
                ``"class_name"`` and ``"center"`` (an ``(x, y)`` pair).
        """
        now = time.time()
        self.frame_counts.append(len(tracked))
        for t in tracked:
            tid = t["track_id"]
            self.unique_tracks.add(tid)
            self.class_counts[t["class_name"]].add(tid)
            if tid not in self.first_seen:
                self.first_seen[tid] = now
            self.last_seen[tid] = now
            self.dwell[tid] = now - self.first_seen[tid]
            cx, cy = t["center"]
            if 0 <= cx < self.width and 0 <= cy < self.height:
                # Accumulate (rather than overwrite with a constant) so that
                # frequently visited pixels end up hotter than rarely visited
                # ones. int() tolerates float / NumPy scalar centres.
                self._add_heat(int(cx), int(cy))
            for z in self.zones:
                z.check(tid, cx, cy)

    def add_zone(self, name, points):
        """Register a polygonal detection zone.

        Args:
            name: Label used in summaries and on-frame annotations.
            points: Polygon vertices as ``(x, y)`` pairs.
        """
        self.zones.append(DetectionZone(name, points))

    def get_heatmap_overlay(self, frame, alpha=0.5):
        """Blend the colour-mapped heatmap onto ``frame``.

        Args:
            frame: Image to blend with; passed to ``cv2.addWeighted``
                together with the JET-coloured heatmap.
            alpha: Weight of the heatmap; the frame gets ``1 - alpha``.

        Returns:
            The blended image produced by ``cv2.addWeighted``.
        """
        norm = cv2.normalize(self.heatmap, None, 0, 255, cv2.NORM_MINMAX)
        colored = cv2.applyColorMap(norm.astype(np.uint8), cv2.COLORMAP_JET)
        return cv2.addWeighted(frame, 1 - alpha, colored, alpha, 0)

    def get_summary(self):
        """Return the aggregated statistics as a JSON-serialisable dict.

        Dwell values are the seconds between a track's first and most recent
        sighting. Averages fall back to 0 when nothing has been recorded.
        """
        avg_c = float(np.mean(self.frame_counts)) if self.frame_counts else 0
        avg_d = float(np.mean(list(self.dwell.values()))) if self.dwell else 0
        return {
            "total_unique": len(self.unique_tracks),
            "class_counts": {k: len(v) for k, v in self.class_counts.items()},
            "avg_per_frame": round(avg_c, 1),
            "avg_dwell_s": round(avg_d, 1),
            "max_dwell_s": round(max(self.dwell.values(), default=0), 1),
            "frames": len(self.frame_counts),
            "zones": [z.summary() for z in self.zones],
        }

    def export_json(self, path):
        """Write ``get_summary()`` to ``path`` as indented JSON."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.get_summary(), f, indent=2)

    def draw_zones(self, frame):
        """Return a copy of ``frame`` with every zone outlined and labelled.

        Each label shows the zone name and its current occupant count, drawn
        10 pixels above the zone's first vertex.
        """
        ann = frame.copy()
        for z in self.zones:
            pts = np.array(z.points, np.int32).reshape((-1, 1, 2))
            cv2.polylines(ann, [pts], True, (0, 255, 255), 2)
            # putText needs integer pixel coordinates; vertices may be floats.
            anchor = (int(z.points[0][0]), int(z.points[0][1]) - 10)
            cv2.putText(ann, f"{z.name}: {z.current}", anchor,
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        return ann


class DetectionZone:
    """Polygonal region that counts tracks entering and leaving it."""

    def __init__(self, name, points):
        """Store the zone outline and zero its counters.

        Args:
            name: Label reported by ``summary``.
            points: Polygon vertices as ``(x, y)`` pairs. Kept as given in
                ``points`` and as an int32 array in ``polygon``.
        """
        self.name, self.points = name, points
        self.polygon = np.array(points, dtype=np.int32)
        self.inside = set()
        self.entered = self.exited = self.current = 0

    def check(self, tid, x, y):
        """Update membership of track ``tid`` for position ``(x, y)``.

        A point on the polygon edge counts as inside. ``entered`` / ``exited``
        are bumped only when the track's membership changes.
        """
        location = cv2.pointPolygonTest(
            self.polygon, (float(x), float(y)), False)
        is_within = location >= 0
        was_within = tid in self.inside
        if is_within and not was_within:
            self.inside.add(tid)
            self.entered += 1
        elif was_within and not is_within:
            self.inside.discard(tid)
            self.exited += 1
        self.current = len(self.inside)

    def summary(self):
        """Return the zone name and its enter / exit / occupancy counters."""
        return dict(
            name=self.name,
            entered=self.entered,
            exited=self.exited,
            inside=self.current,
        )

def main():
    """Run live person analytics on camera 0 until the stream ends or 'q'.

    Opens the default camera, registers a centred rectangular zone covering
    the middle half of the frame in each dimension, shows annotated frames,
    and writes the final summary to ``report.json``.
    """
    # Imported here so that importing this module does not require the
    # detector / tracker modules.
    from detector import ObjectDetector
    from tracker import ObjectTracker

    detector = ObjectDetector(confidence=0.4, classes=[0])
    tracker = ObjectTracker()
    cap = cv2.VideoCapture(0)
    try:
        # The first frame is used only to learn the frame size.
        ret, frame = cap.read()
        if not ret or frame is None:
            raise SystemExit("Could not read a frame from camera 0")
        h, w = frame.shape[:2]
        analytics = AnalyticsEngine((h, w))
        analytics.add_zone(
            "Center",
            [(w // 4, h // 4), (3 * w // 4, h // 4),
             (3 * w // 4, 3 * h // 4), (w // 4, 3 * h // 4)],
        )
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            dets = detector.detect(frame)
            tracked = tracker.update(dets, frame.shape[:2])
            analytics.update(tracked)
            ann = detector.annotate_frame(frame, dets)
            ann = analytics.draw_zones(ann)
            cv2.imshow("Analytics", ann)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        analytics.export_json("report.json")
    finally:
        # Always free the camera and close windows, even if a frame fails.
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
💡
Zone tip: Define zones by clicking points on a reference frame. The Streamlit interface adds an interactive zone editor.