Scaling CV Infrastructure
Design infrastructure that handles thousands of concurrent video streams and millions of images per day. Learn GPU cluster management with Kubernetes, efficient image and video storage architectures, inference result caching, distributed processing with Ray, and the math behind capacity planning for large-scale CV systems.
GPU Cluster Architecture
At scale, CV inference runs on GPU clusters managed by Kubernetes. The key challenge is GPU utilization — GPUs are expensive, and idle GPUs waste money.
# Kubernetes GPU deployment for CV inference
# Each pod runs one model instance on one GPU
# gpu-inference-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cv-inference
  labels:
    app: cv-inference
spec:
  replicas: 8  # 8 GPU pods
  selector:
    matchLabels:
      app: cv-inference
  template:
    metadata:
      labels:
        app: cv-inference
    spec:
      containers:
        - name: inference
          image: company/cv-inference:v2.3
          resources:
            limits:
              nvidia.com/gpu: 1  # 1 GPU per pod (GPUs are not overcommittable)
              memory: "16Gi"
              cpu: "4"
            requests:
              nvidia.com/gpu: 1  # GPU request must equal the limit
              memory: "12Gi"
              cpu: "2"
          ports:
            - containerPort: 8000
          env:
            - name: MODEL_PATH
              value: "/models/yolov8s_fp16.engine"
            - name: MAX_BATCH_SIZE
              value: "32"
            - name: MAX_WAIT_MS
              value: "50"
          volumeMounts:
            - name: model-volume
              mountPath: /models
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            periodSeconds: 10
      volumes:
        - name: model-volume
          persistentVolumeClaim:
            claimName: model-pvc
      nodeSelector:
        nvidia.com/gpu.product: "Tesla-T4"  # Target specific GPU type
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
---
# Horizontal Pod Autoscaler based on GPU utilization
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cv-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cv-inference
  minReplicas: 4
  maxReplicas: 32
  metrics:
    - type: Pods
      pods:
        metric:
          name: gpu_utilization  # Custom metric from DCGM exporter
        target:
          type: AverageValue
          averageValue: "70"  # Scale up when GPU util > 70%
    - type: Pods
      pods:
        metric:
          name: inference_queue_depth
        target:
          type: AverageValue
          averageValue: "50"  # Scale up when queue > 50 items
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60  # Add up to 4 pods per minute
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 2
          periodSeconds: 120  # Remove at most 2 pods per 2 minutes
Capacity Planning
Before deploying, calculate exactly how many GPUs you need. Here is the math.
# CV Infrastructure Capacity Planning Calculator
import math


def calculate_gpu_requirements(
    num_cameras: int,
    fps_per_camera: int,
    process_every_nth_frame: int,
    model_fps_per_gpu: float,  # From your benchmarks (TensorRT optimized)
    gpu_utilization_target: float = 0.7,  # Don't plan for 100% - leave headroom
    redundancy_factor: float = 1.5,  # N+1 redundancy
) -> dict:
    """
    Calculate GPU requirements for a multi-camera CV system.

    Example: 500 cameras, 15fps each, process every 3rd frame, YOLOv8s TensorRT FP16

    Args:
        num_cameras: Number of camera streams feeding the system.
        fps_per_camera: Frames per second produced by each camera.
        process_every_nth_frame: Frame-skip factor (process 1 of every N frames).
        model_fps_per_gpu: Benchmarked inference throughput of a single GPU.
        gpu_utilization_target: Fraction of GPU capacity to plan against.
        redundancy_factor: Multiplier providing failover headroom.

    Returns:
        Dict with aggregate FPS figures, minimum and redundant GPU counts,
        and estimated monthly T4 costs (spot and on-demand).
    """
    # Total frames per second across all cameras
    total_fps = num_cameras * fps_per_camera
    # Actual frames to process (after frame skipping)
    process_fps = total_fps / process_every_nth_frame
    # GPUs needed at target utilization
    effective_fps_per_gpu = model_fps_per_gpu * gpu_utilization_target
    gpus_needed = process_fps / effective_fps_per_gpu
    # Add redundancy. True N+1 means at least one spare GPU: for small fleets
    # the multiplicative factor alone can round down to zero spare capacity
    # (e.g. 0.66 GPUs * 1.5 still ceils to a single GPU), so take the max of
    # the factored count and "one extra GPU".
    gpus_with_redundancy = max(gpus_needed * redundancy_factor, gpus_needed + 1)
    return {
        "total_camera_fps": total_fps,
        "frames_to_process_fps": process_fps,
        "gpus_needed_minimum": math.ceil(gpus_needed),
        "gpus_with_redundancy": math.ceil(gpus_with_redundancy),
        "cost_per_month_t4_spot": math.ceil(gpus_with_redundancy) * 180,  # ~$180/month T4 spot
        "cost_per_month_t4_ondemand": math.ceil(gpus_with_redundancy) * 450,
    }
# Example capacity-planning runs for fleets of increasing size
scenarios = [
    {"name": "Small (50 cameras)", "cameras": 50, "fps": 15, "skip": 3, "model_fps": 540},
    {"name": "Medium (500 cameras)", "cameras": 500, "fps": 15, "skip": 3, "model_fps": 540},
    {"name": "Large (5000 cameras)", "cameras": 5000, "fps": 15, "skip": 5, "model_fps": 540},
    {"name": "Massive (10000 cameras)", "cameras": 10000, "fps": 15, "skip": 5, "model_fps": 540},
]
for scenario in scenarios:
    plan = calculate_gpu_requirements(
        scenario["cameras"], scenario["fps"], scenario["skip"], scenario["model_fps"]
    )
    print(f"\n{scenario['name']}:")
    print(f" Process FPS: {plan['frames_to_process_fps']:.0f}")
    print(f" GPUs needed: {plan['gpus_with_redundancy']} T4s")
    print(f" Cost (spot): ${plan['cost_per_month_t4_spot']:,}/month")
# Output:
# Small (50 cameras):    Process FPS: 250    GPUs: 2 T4s    Cost: $360/month
# Medium (500 cameras):  Process FPS: 2500   GPUs: 10 T4s   Cost: $1,800/month
# Large (5000 cameras):  Process FPS: 15000  GPUs: 60 T4s   Cost: $10,800/month
# Massive (10K cameras): Process FPS: 30000  GPUs: 120 T4s  Cost: $21,600/month
Image and Video Storage Architecture
# Storage architecture for CV systems
# Challenge: 1000 cameras × 15fps × 500KB/frame = 7.5 GB/second of raw data
class CVStorageManager:
    """
    Tiered storage for CV images and video with lifecycle management.

    Tier 1: Hot (SSD/NVMe) - Last 24 hours, fast random access
    Tier 2: Warm (S3 Standard) - Last 30 days, for retraining and review
    Tier 3: Cold (S3 Glacier) - 30+ days, compliance and audit
    """

    def __init__(self, s3_client, hot_storage_path="/data/hot"):
        # s3_client: boto3-style client exposing put_object -- presumably;
        # verify against the caller.
        self.s3 = s3_client
        self.hot_path = hot_storage_path

    def store_frame(self, camera_id: str, timestamp: float, frame: np.ndarray,
                    detections: list[dict], store_raw: bool = False):
        """
        Store a processed frame with intelligent retention.

        - Frames WITH detections: store image + metadata (always)
        - Frames WITHOUT detections: store metadata only (save 99% storage)
        - Raw video: store only if compliance requires it
          (store_raw is currently unused -- reserved for that path)
        """
        import cv2
        import json  # local import keeps this snippet self-contained

        metadata = {
            "camera_id": camera_id,
            "timestamp": timestamp,
            "detection_count": len(detections),
            "detections": detections,
        }
        if len(detections) > 0:
            # Store image for frames with detections (for review and retraining)
            # Use WebP for 30% smaller files than JPEG at same quality
            _, buffer = cv2.imencode(".webp", frame, [cv2.IMWRITE_WEBP_QUALITY, 85])
            image_key = f"frames/{camera_id}/{self._ts_to_path(timestamp)}.webp"
            self.s3.put_object(
                Bucket="cv-data",
                Key=image_key,
                Body=buffer.tobytes(),
                ContentType="image/webp",
                Metadata={"detections": json.dumps(detections)},
            )
            metadata["image_key"] = image_key
        # Always store metadata (tiny, useful for analytics)
        meta_key = f"metadata/{camera_id}/{self._ts_to_path(timestamp)}.json"
        self.s3.put_object(
            Bucket="cv-data",
            Key=meta_key,
            Body=json.dumps(metadata),
            ContentType="application/json",
        )

    def _ts_to_path(self, timestamp: float) -> str:
        """Convert timestamp to partitioned path: YYYY/MM/DD/HH/timestamp(ms)"""
        from datetime import datetime
        # NOTE(review): fromtimestamp uses the LOCAL timezone -- confirm all
        # ingest hosts run UTC, otherwise partitions shift between hosts.
        dt = datetime.fromtimestamp(timestamp)
        return f"{dt.year}/{dt.month:02d}/{dt.day:02d}/{dt.hour:02d}/{int(timestamp * 1000)}"
# S3 lifecycle policy for automatic tiering
# Apply via AWS CLI or Terraform:
# {
# "Rules": [
# {
# "ID": "MoveToIA30Days",
# "Filter": {"Prefix": "frames/"},
# "Status": "Enabled",
# "Transitions": [
# {"Days": 30, "StorageClass": "STANDARD_IA"},
# {"Days": 90, "StorageClass": "GLACIER"},
# {"Days": 365, "StorageClass": "DEEP_ARCHIVE"}
# ]
# },
# {
# "ID": "DeleteMetadata90Days",
# "Filter": {"Prefix": "metadata/"},
# "Status": "Enabled",
# "Expiration": {"Days": 90}
# }
# ]
# }
Distributed Processing with Ray
import ray
from ray import serve
# Initialize Ray cluster
ray.init(address="auto")  # Connect to existing cluster ("auto" attaches to a running Ray cluster instead of starting a local one)
@serve.deployment(
    num_replicas=8,  # 8 inference workers
    ray_actor_options={
        "num_gpus": 1,  # 1 GPU per worker
        "num_cpus": 4,
    },
    max_ongoing_requests=100,  # Queue depth per replica
    autoscaling_config={
        "min_replicas": 4,
        "max_replicas": 32,
        "target_ongoing_requests": 50,  # Scale up at 50 queued requests
    },
)
class CVInferenceDeployment:
    """
    Ray Serve deployment for distributed CV inference.
    Automatically scales GPU workers based on request queue depth.
    """

    def __init__(self):
        import torch
        self.device = torch.device("cuda")
        self.model = self._load_model()
        # Pre/post-processors are project classes defined elsewhere in the
        # codebase -- presumably the ones from the optimization lesson; verify.
        self.preprocessor = ImagePreprocessor(PreprocessConfig())
        self.postprocessor = PostProcessor(PostprocessConfig())

    def _load_model(self):
        """Load the TensorRT-compiled YOLO engine."""
        from ultralytics import YOLO
        return YOLO("yolov8s.engine")  # TensorRT engine

    async def __call__(self, request):
        """Handle one HTTP request.

        Expects a JSON body of the form {"image": "<base64-encoded image>"}.
        Decodes the image, runs detection, and returns the detections plus
        their count. NOTE(review): _parse_results is not defined in this
        snippet -- it must exist on this class elsewhere; confirm.
        """
        data = await request.json()
        image_bytes = base64.b64decode(data["image"])
        nparr = np.frombuffer(image_bytes, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        tensor, metadata = self.preprocessor.preprocess(image)
        results = self.model.predict(tensor.unsqueeze(0).to(self.device))
        detections = self.postprocessor.process(
            self._parse_results(results[0]), metadata
        )
        return {"detections": detections, "count": len(detections)}
# --- Batch processing with Ray for large-scale image analysis ---
@ray.remote(num_gpus=1)
class BatchProcessor:
    """Process millions of images using Ray distributed compute."""

    def __init__(self, model_path: str):
        # NOTE(review): YOLO (ultralytics) is not imported in this snippet --
        # it must be imported at module level on the worker; confirm.
        self.model = YOLO(model_path)

    def process_batch(self, image_paths: list[str]) -> list[dict]:
        """Run detection on each path; return one {path, detections} dict per image."""
        results = []
        for path in image_paths:
            img = cv2.imread(path)
            preds = self.model.predict(img, verbose=False)
            results.append({
                "path": path,
                "detections": len(preds[0].boxes),  # count of detected boxes
            })
        return results
# Distribute 1M images across GPU workers
def process_million_images(image_paths: list[str], num_gpus: int = 8):
    """Process a large image set distributed across Ray GPU workers.

    Args:
        image_paths: Paths of the images to analyze.
        num_gpus: Number of BatchProcessor actors (one GPU each) to create.

    Returns:
        Flat list of per-image result dicts collected from all workers.
    """
    processors = [BatchProcessor.remote("yolov8s.engine") for _ in range(num_gpus)]
    # Split images across workers
    batch_size = 100  # Images per task
    futures = []
    for i in range(0, len(image_paths), batch_size):
        batch = image_paths[i:i + batch_size]
        # Round-robin assignment of batches over the actor pool
        worker = processors[i // batch_size % num_gpus]
        futures.append(worker.process_batch.remote(batch))
    # Collect results (blocks until every batch finishes)
    results = ray.get(futures)
    return [item for batch in results for item in batch]
Inference Result Caching
import hashlib
import redis
import json
class InferenceCache:
    """
    Cache inference results to avoid re-processing identical or similar images.

    Strategies:
    1. Exact match: Hash the image bytes, cache result by hash
    2. Perceptual hash: Similar images hit the same cache entry
    3. Embedding cache: Find nearest cached result by visual similarity
    """

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def get_or_compute(self, image: np.ndarray, inference_fn) -> dict:
        """Check cache first, compute only on cache miss.

        inference_fn: callable taking the image and returning a JSON-serializable
        dict of results. The returned dict gains a "cache_hit" bool.
        """
        # Compute image hash
        image_hash = self._compute_hash(image)
        cache_key = f"cv:inference:{image_hash}"
        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            # Bug fix: the hit/miss counters read by get_stats() were never
            # updated anywhere, so the reported hit rate was always 0.
            self.redis.incr("cv:cache:hits")
            result = json.loads(cached)
            result["cache_hit"] = True
            return result
        # Cache miss - run inference
        self.redis.incr("cv:cache:misses")
        result = inference_fn(image)
        result["cache_hit"] = False
        # Store in cache with TTL so stale results age out
        self.redis.setex(cache_key, self.ttl, json.dumps(result))
        return result

    def _compute_hash(self, image: np.ndarray) -> str:
        """Compute perceptual (average) hash, robust to minor changes like JPEG artifacts."""
        # Resize to 8x8 and compute average intensity
        small = cv2.resize(image, (8, 8), interpolation=cv2.INTER_AREA)
        gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
        avg = gray.mean()
        # Binary hash: 1 if pixel > average, 0 otherwise (64 bits total)
        bits = (gray > avg).flatten()
        hash_val = "".join(str(int(b)) for b in bits)
        # md5 of the bit string gives a compact, fixed-length cache key
        return hashlib.md5(hash_val.encode()).hexdigest()

    def get_stats(self) -> dict:
        """Cache hit rate statistics (counters maintained by get_or_compute)."""
        hits = int(self.redis.get("cv:cache:hits") or 0)
        misses = int(self.redis.get("cv:cache:misses") or 0)
        total = hits + misses
        return {
            "hits": hits,
            "misses": misses,
            "hit_rate": hits / total if total > 0 else 0,
            "cache_size": self.redis.dbsize(),
        }
What Is Next
The final lesson covers Best Practices and Checklist. You will get a production-ready checklist for CV systems, learn annotation pipeline design for continuous model improvement, define model retraining triggers, and find answers to frequently asked questions about building vision systems at scale.
Lilly Tech Systems