Advanced

Scaling CV Infrastructure

Design infrastructure that handles thousands of concurrent video streams and millions of images per day. Learn GPU cluster management with Kubernetes, efficient image and video storage architectures, inference result caching, distributed processing with Ray, and the math behind capacity planning for large-scale CV systems.

GPU Cluster Architecture

At scale, CV inference runs on GPU clusters managed by Kubernetes. The key challenge is GPU utilization — GPUs are expensive, and idle GPUs waste money.

# Kubernetes GPU deployment for CV inference
# Each pod runs one model instance on one GPU

# gpu-inference-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cv-inference
  labels:
    app: cv-inference
spec:
  replicas: 8                    # 8 GPU pods
  selector:
    matchLabels:
      app: cv-inference
  template:
    metadata:
      labels:
        app: cv-inference
    spec:
      containers:
      - name: inference
        image: company/cv-inference:v2.3
        resources:
          limits:
            nvidia.com/gpu: 1    # 1 GPU per pod
            memory: "16Gi"
            cpu: "4"
          requests:
            nvidia.com/gpu: 1
            memory: "12Gi"
            cpu: "2"
        ports:
        - containerPort: 8000
        env:
        - name: MODEL_PATH
          value: "/models/yolov8s_fp16.engine"
        - name: MAX_BATCH_SIZE
          value: "32"
        - name: MAX_WAIT_MS
          value: "50"
        volumeMounts:
        - name: model-volume
          mountPath: /models
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          periodSeconds: 10
      volumes:
      - name: model-volume
        persistentVolumeClaim:
          claimName: model-pvc
      nodeSelector:
        nvidia.com/gpu.product: "Tesla-T4"    # Target specific GPU type
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule

---
# Horizontal Pod Autoscaler based on GPU utilization
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cv-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cv-inference
  minReplicas: 4
  maxReplicas: 32
  metrics:
  - type: Pods
    pods:
      metric:
        name: gpu_utilization       # Custom metric from DCGM exporter
      target:
        type: AverageValue
        averageValue: "70"          # Scale up when GPU util > 70%
  - type: Pods
    pods:
      metric:
        name: inference_queue_depth
      target:
        type: AverageValue
        averageValue: "50"          # Scale up when queue > 50 items
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Pods
        value: 4
        periodSeconds: 60          # Add up to 4 pods per minute
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Pods
        value: 2
        periodSeconds: 120         # Remove at most 2 pods per 2 minutes

Capacity Planning

Before deploying, calculate exactly how many GPUs you need. Here is the math.

# CV Infrastructure Capacity Planning Calculator

def calculate_gpu_requirements(
    num_cameras: int,
    fps_per_camera: int,
    process_every_nth_frame: int,
    model_fps_per_gpu: float,       # From your benchmarks (TensorRT optimized)
    gpu_utilization_target: float = 0.7,  # Don't plan for 100% - leave headroom
    redundancy_factor: float = 1.5,       # N+1 redundancy
) -> dict:
    """
    Calculate GPU requirements for a multi-camera CV system.

    Example: 500 cameras, 15fps each, process every 3rd frame, YOLOv8s TensorRT FP16
    """
    # Total frames per second across all cameras
    total_fps = num_cameras * fps_per_camera

    # Actual frames to process (after frame skipping)
    process_fps = total_fps / process_every_nth_frame

    # GPUs needed at target utilization
    effective_fps_per_gpu = model_fps_per_gpu * gpu_utilization_target
    gpus_needed = process_fps / effective_fps_per_gpu

    # Add redundancy
    gpus_with_redundancy = gpus_needed * redundancy_factor

    import math
    result = {
        "total_camera_fps": total_fps,
        "frames_to_process_fps": process_fps,
        "gpus_needed_minimum": math.ceil(gpus_needed),
        "gpus_with_redundancy": math.ceil(gpus_with_redundancy),
        "cost_per_month_t4_spot": math.ceil(gpus_with_redundancy) * 180,  # ~$180/month T4 spot
        "cost_per_month_t4_ondemand": math.ceil(gpus_with_redundancy) * 450,
    }
    return result

# Example calculations
scenarios = [
    {"name": "Small (50 cameras)", "cameras": 50, "fps": 15, "skip": 3, "model_fps": 540},
    {"name": "Medium (500 cameras)", "cameras": 500, "fps": 15, "skip": 3, "model_fps": 540},
    {"name": "Large (5000 cameras)", "cameras": 5000, "fps": 15, "skip": 5, "model_fps": 540},
    {"name": "Massive (10000 cameras)", "cameras": 10000, "fps": 15, "skip": 5, "model_fps": 540},
]

for s in scenarios:
    result = calculate_gpu_requirements(s["cameras"], s["fps"], s["skip"], s["model_fps"])
    print(f"\n{s['name']}:")
    print(f"  Process FPS: {result['frames_to_process_fps']:.0f}")
    print(f"  GPUs needed: {result['gpus_with_redundancy']} T4s")
    print(f"  Cost (spot): ${result['cost_per_month_t4_spot']:,}/month")

# Output:
# Small (50 cameras):   Process FPS: 250    GPUs: 2 T4s    Cost: $360/month
# Medium (500 cameras): Process FPS: 2500   GPUs: 10 T4s   Cost: $1,800/month
# Large (5000 cameras): Process FPS: 15000  GPUs: 60 T4s   Cost: $10,800/month
# Massive (10K cameras):Process FPS: 30000  GPUs: 120 T4s  Cost: $21,600/month
💡
Cost optimization lever: The single biggest cost reduction comes from frame skipping. Processing every 5th frame instead of every frame reduces GPU cost by 5x with minimal impact on detection quality for most surveillance and monitoring use cases. Combined with adaptive frame selection (process more frames when objects are present), you can often reduce cost by 10-20x compared to processing every frame.

Image and Video Storage Architecture

# Storage architecture for CV systems
# Challenge: 1000 cameras × 15fps × 500KB/frame = 7.5 GB/second of raw data

class CVStorageManager:
    """
    Tiered storage for CV images and video with lifecycle management.

    Tier 1: Hot (SSD/NVMe)    - Last 24 hours, fast random access
    Tier 2: Warm (S3 Standard) - Last 30 days, for retraining and review
    Tier 3: Cold (S3 Glacier)  - 30+ days, compliance and audit
    """

    def __init__(self, s3_client, hot_storage_path="/data/hot"):
        self.s3 = s3_client
        self.hot_path = hot_storage_path

    def store_frame(self, camera_id: str, timestamp: float, frame: np.ndarray,
                    detections: list[dict], store_raw: bool = False):
        """
        Store a processed frame with intelligent retention.

        - Frames WITH detections: store image + metadata (always)
        - Frames WITHOUT detections: store metadata only (save 99% storage)
        - Raw video: store only if compliance requires it
        """
        import cv2
        metadata = {
            "camera_id": camera_id,
            "timestamp": timestamp,
            "detection_count": len(detections),
            "detections": detections,
        }

        if len(detections) > 0:
            # Store image for frames with detections (for review and retraining)
            # Use WebP for 30% smaller files than JPEG at same quality
            _, buffer = cv2.imencode(".webp", frame, [cv2.IMWRITE_WEBP_QUALITY, 85])
            image_key = f"frames/{camera_id}/{self._ts_to_path(timestamp)}.webp"

            self.s3.put_object(
                Bucket="cv-data",
                Key=image_key,
                Body=buffer.tobytes(),
                ContentType="image/webp",
                Metadata={"detections": json.dumps(detections)},
            )
            metadata["image_key"] = image_key

        # Always store metadata (tiny, useful for analytics)
        meta_key = f"metadata/{camera_id}/{self._ts_to_path(timestamp)}.json"
        self.s3.put_object(
            Bucket="cv-data",
            Key=meta_key,
            Body=json.dumps(metadata),
            ContentType="application/json",
        )

    def _ts_to_path(self, timestamp: float) -> str:
        """Convert timestamp to partitioned path: YYYY/MM/DD/HH/timestamp"""
        from datetime import datetime
        dt = datetime.fromtimestamp(timestamp)
        return f"{dt.year}/{dt.month:02d}/{dt.day:02d}/{dt.hour:02d}/{int(timestamp * 1000)}"


# S3 lifecycle policy for automatic tiering
# Apply via AWS CLI or Terraform:
# {
#   "Rules": [
#     {
#       "ID": "MoveToIA30Days",
#       "Filter": {"Prefix": "frames/"},
#       "Status": "Enabled",
#       "Transitions": [
#         {"Days": 30, "StorageClass": "STANDARD_IA"},
#         {"Days": 90, "StorageClass": "GLACIER"},
#         {"Days": 365, "StorageClass": "DEEP_ARCHIVE"}
#       ]
#     },
#     {
#       "ID": "DeleteMetadata90Days",
#       "Filter": {"Prefix": "metadata/"},
#       "Status": "Enabled",
#       "Expiration": {"Days": 90}
#     }
#   ]
# }

Distributed Processing with Ray

import ray
from ray import serve

# Initialize Ray cluster
ray.init(address="auto")  # Connect to existing cluster

@serve.deployment(
    num_replicas=8,              # 8 inference workers
    ray_actor_options={
        "num_gpus": 1,           # 1 GPU per worker
        "num_cpus": 4,
    },
    max_ongoing_requests=100,    # Queue depth per replica
    autoscaling_config={
        "min_replicas": 4,
        "max_replicas": 32,
        "target_ongoing_requests": 50,  # Scale up at 50 queued requests
    },
)
class CVInferenceDeployment:
    """
    Ray Serve deployment for distributed CV inference.
    Automatically scales GPU workers based on request queue depth.
    """

    def __init__(self):
        import torch
        self.device = torch.device("cuda")
        self.model = self._load_model()
        self.preprocessor = ImagePreprocessor(PreprocessConfig())
        self.postprocessor = PostProcessor(PostprocessConfig())

    def _load_model(self):
        from ultralytics import YOLO
        return YOLO("yolov8s.engine")  # TensorRT engine

    async def __call__(self, request):
        data = await request.json()
        image_bytes = base64.b64decode(data["image"])
        nparr = np.frombuffer(image_bytes, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        tensor, metadata = self.preprocessor.preprocess(image)
        results = self.model.predict(tensor.unsqueeze(0).to(self.device))
        detections = self.postprocessor.process(
            self._parse_results(results[0]), metadata
        )

        return {"detections": detections, "count": len(detections)}


# --- Batch processing with Ray for large-scale image analysis ---
@ray.remote(num_gpus=1)
class BatchProcessor:
    """Process millions of images using Ray distributed compute"""

    def __init__(self, model_path: str):
        self.model = YOLO(model_path)

    def process_batch(self, image_paths: list[str]) -> list[dict]:
        results = []
        for path in image_paths:
            img = cv2.imread(path)
            preds = self.model.predict(img, verbose=False)
            results.append({
                "path": path,
                "detections": len(preds[0].boxes),
            })
        return results

# Distribute 1M images across GPU workers
def process_million_images(image_paths: list[str], num_gpus: int = 8):
    """Process 1M images distributed across GPU workers"""
    processors = [BatchProcessor.remote("yolov8s.engine") for _ in range(num_gpus)]

    # Split images across workers
    batch_size = 100  # Images per task
    futures = []
    for i in range(0, len(image_paths), batch_size):
        batch = image_paths[i:i + batch_size]
        worker = processors[i // batch_size % num_gpus]
        futures.append(worker.process_batch.remote(batch))

    # Collect results
    results = ray.get(futures)
    return [item for batch in results for item in batch]

Inference Result Caching

import hashlib
import redis
import json

class InferenceCache:
    """
    Cache inference results to avoid re-processing identical or similar images.

    Strategies:
    1. Exact match: Hash the image bytes, cache result by hash
    2. Perceptual hash: Similar images hit the same cache entry
    3. Embedding cache: Find nearest cached result by visual similarity
    """

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def get_or_compute(self, image: np.ndarray, inference_fn) -> dict:
        """Check cache first, compute only on cache miss"""
        # Compute image hash
        image_hash = self._compute_hash(image)
        cache_key = f"cv:inference:{image_hash}"

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            result = json.loads(cached)
            result["cache_hit"] = True
            return result

        # Cache miss - run inference
        result = inference_fn(image)
        result["cache_hit"] = False

        # Store in cache
        self.redis.setex(cache_key, self.ttl, json.dumps(result))
        return result

    def _compute_hash(self, image: np.ndarray) -> str:
        """Compute perceptual hash (robust to minor changes like JPEG artifacts)"""
        # Resize to 8x8 and compute average
        small = cv2.resize(image, (8, 8), interpolation=cv2.INTER_AREA)
        gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
        avg = gray.mean()
        # Binary hash: 1 if pixel > average, 0 otherwise
        bits = (gray > avg).flatten()
        hash_val = "".join(str(int(b)) for b in bits)
        return hashlib.md5(hash_val.encode()).hexdigest()

    def get_stats(self) -> dict:
        """Cache hit rate statistics"""
        hits = int(self.redis.get("cv:cache:hits") or 0)
        misses = int(self.redis.get("cv:cache:misses") or 0)
        total = hits + misses
        return {
            "hits": hits,
            "misses": misses,
            "hit_rate": hits / total if total > 0 else 0,
            "cache_size": self.redis.dbsize(),
        }
💡
Caching works best for batch and API workloads where the same or similar images are processed repeatedly (e.g., product catalog scanning, document OCR). For live video streams, caching has limited value since every frame is different. However, you can still cache common sub-problems like "is this a known face" or "has this product been classified before" using embedding similarity.

What Is Next

The final lesson covers Best Practices and Checklist. You will get a production-ready checklist for CV systems, learn annotation pipeline design for continuous model improvement, define model retraining triggers, and find answers to frequently asked questions about building vision systems at scale.