Intermediate
Analytics
Count objects, measure dwell time, generate heatmaps, define detection zones, and export analytics.
Analytics Engine
# src/analytics.py
import cv2, numpy as np, json, time
from collections import defaultdict
class AnalyticsEngine:
def __init__(self, frame_shape):
self.height, self.width = frame_shape
self.heatmap = np.zeros((self.height, self.width), dtype=np.float32)
self.unique_tracks = set()
self.class_counts = defaultdict(set)
self.frame_counts = []
self.first_seen = {}
self.last_seen = {}
self.dwell = {}
self.zones = []
def update(self, tracked):
now = time.time()
self.frame_counts.append(len(tracked))
for t in tracked:
tid = t["track_id"]
self.unique_tracks.add(tid)
self.class_counts[t["class_name"]].add(tid)
if tid not in self.first_seen:
self.first_seen[tid] = now
self.last_seen[tid] = now
self.dwell[tid] = now - self.first_seen[tid]
cx, cy = t["center"]
if 0 <= cx < self.width and 0 <= cy < self.height:
cv2.circle(self.heatmap, (cx, cy), 20, 1, -1)
for z in self.zones:
z.check(tid, cx, cy)
def add_zone(self, name, points):
self.zones.append(DetectionZone(name, points))
def get_heatmap_overlay(self, frame, alpha=0.5):
norm = cv2.normalize(self.heatmap, None, 0, 255, cv2.NORM_MINMAX)
colored = cv2.applyColorMap(norm.astype(np.uint8), cv2.COLORMAP_JET)
return cv2.addWeighted(frame, 1-alpha, colored, alpha, 0)
def get_summary(self):
avg_c = float(np.mean(self.frame_counts)) if self.frame_counts else 0
avg_d = float(np.mean(list(self.dwell.values()))) if self.dwell else 0
return {
"total_unique": len(self.unique_tracks),
"class_counts": {k: len(v) for k,v in self.class_counts.items()},
"avg_per_frame": round(avg_c, 1),
"avg_dwell_s": round(avg_d, 1),
"max_dwell_s": round(max(self.dwell.values(), default=0), 1),
"frames": len(self.frame_counts),
"zones": [z.summary() for z in self.zones]
}
def export_json(self, path):
with open(path, "w") as f:
json.dump(self.get_summary(), f, indent=2)
def draw_zones(self, frame):
ann = frame.copy()
for z in self.zones:
pts = np.array(z.points, np.int32).reshape((-1,1,2))
cv2.polylines(ann, [pts], True, (0,255,255), 2)
cv2.putText(ann, f"{z.name}: {z.current}",
(z.points[0][0], z.points[0][1]-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
return ann
class DetectionZone:
def __init__(self, name, points):
self.name = name
self.points = points
self.polygon = np.array(points, dtype=np.int32)
self.inside = set()
self.entered = 0
self.exited = 0
self.current = 0
def check(self, tid, x, y):
inside = cv2.pointPolygonTest(
self.polygon, (float(x), float(y)), False) >= 0
if inside and tid not in self.inside:
self.inside.add(tid)
self.entered += 1
elif not inside and tid in self.inside:
self.inside.discard(tid)
self.exited += 1
self.current = len(self.inside)
def summary(self):
return {"name":self.name, "entered":self.entered,
"exited":self.exited, "inside":self.current}
if __name__ == "__main__":
from detector import ObjectDetector
from tracker import ObjectTracker
detector = ObjectDetector(confidence=0.4, classes=[0])
tracker = ObjectTracker()
cap = cv2.VideoCapture(0)
ret, frame = cap.read()
analytics = AnalyticsEngine(frame.shape[:2])
h, w = frame.shape[:2]
analytics.add_zone("Center",
[(w//4,h//4),(3*w//4,h//4),(3*w//4,3*h//4),(w//4,3*h//4)])
while True:
ret, frame = cap.read()
if not ret: break
dets = detector.detect(frame)
tracked = tracker.update(dets, frame.shape[:2])
analytics.update(tracked)
ann = detector.annotate_frame(frame, dets)
ann = analytics.draw_zones(ann)
cv2.imshow("Analytics", ann)
if cv2.waitKey(1) & 0xFF == ord("q"): break
analytics.export_json("report.json")
cap.release()
cv2.destroyAllWindows()
Zone tip: Define zones by clicking points on a reference frame. The Streamlit interface adds an interactive zone editor.
Lilly Tech Systems