CV Pipeline Architecture
Understand the core components of a production computer vision pipeline, learn when to choose batch vs real-time processing, decide between edge and cloud architectures, and study real-world use case designs for retail checkout, quality inspection, and surveillance systems.
What Is a Computer Vision Pipeline?
A computer vision pipeline is an end-to-end system that takes raw image or video input, processes it through a series of stages, and produces actionable results. Unlike a Jupyter notebook that runs inference on a single image, a production CV pipeline handles thousands of concurrent inputs, runs 24/7, recovers from failures, and delivers results within strict latency budgets.
Every production CV pipeline has five fundamental stages:
# The 5 stages of a production CV pipeline
# Each stage is independent, monitored, and retryable
# ┌───────────┐    ┌───────────────┐    ┌──────────────┐    ┌────────────────┐    ┌────────────┐
# │ 1. Ingest │───▶│ 2. Preprocess │───▶│ 3. Inference │───▶│ 4. Postprocess │───▶│ 5. Deliver │
# └───────────┘    └───────────────┘    └──────────────┘    └────────────────┘    └────────────┘
#
# 1. INGEST: Capture frames from cameras, upload APIs, S3 buckets, or video streams
# - RTSP/RTMP streams, HTTP uploads, S3 event triggers, edge camera SDKs
# - Handle connection drops, frame corruption, variable frame rates
#
# 2. PREPROCESS: Prepare images for model consumption
# - Resize to model input dimensions (e.g., 640x640 for YOLOv8)
# - Normalize pixel values (0-255 → 0-1 or ImageNet normalization)
# - Color space conversion (BGR→RGB), letterboxing, padding
# - Batch formation: group multiple images for GPU efficiency
#
# 3. INFERENCE: Run the neural network model
# - Object detection (YOLO, Faster R-CNN, DETR)
# - Classification (ResNet, EfficientNet, ViT)
# - Segmentation (Mask R-CNN, SAM, U-Net)
# - Embedding/similarity (CLIP, DINOv2)
#
# 4. POSTPROCESS: Convert raw model output to business results
# - Non-Maximum Suppression (NMS) to remove duplicate detections
# - Confidence thresholding (discard low-confidence predictions)
# - Object tracking across frames (assign IDs to detected objects)
# - Business logic (count people, measure defect size, classify events)
#
# 5. DELIVER: Send results to downstream systems
# - REST/gRPC API responses, webhook callbacks
# - Database writes (PostgreSQL, MongoDB, Elasticsearch)
# - Event streams (Kafka, Redis Pub/Sub)
# - Dashboard updates, alert triggers
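Stage 2 is the most uniform across pipelines, so it is worth seeing concretely. Below is a minimal sketch of YOLO-style letterbox preprocessing using only NumPy — production code would use `cv2.resize` with proper interpolation; the 640 input size and gray padding value 114 follow the Ultralytics convention mentioned above, and the nearest-neighbour resize is a simplification for illustration.

```python
import numpy as np

def letterbox(img: np.ndarray, size: int = 640) -> tuple[np.ndarray, float]:
    """Resize with preserved aspect ratio and pad to a square model input.

    Sketch of stage 2: resize, pad, BGR→RGB, normalize, HWC→CHW.
    Uses nearest-neighbour indexing so only NumPy is needed.
    """
    h, w = img.shape[:2]
    scale = size / max(h, w)
    new_h, new_w = int(round(h * scale)), int(round(w * scale))
    # Nearest-neighbour resize via integer index maps
    rows = (np.arange(new_h) / scale).astype(int).clip(0, h - 1)
    cols = (np.arange(new_w) / scale).astype(int).clip(0, w - 1)
    resized = img[rows[:, None], cols[None, :]]
    # Pad with gray (114) to size x size, centering the image
    canvas = np.full((size, size, img.shape[2]), 114, dtype=img.dtype)
    top, left = (size - new_h) // 2, (size - new_w) // 2
    canvas[top:top + new_h, left:left + new_w] = resized
    # BGR → RGB, scale 0-255 → 0-1, HWC → CHW for the model
    tensor = canvas[:, :, ::-1].astype(np.float32) / 255.0
    return tensor.transpose(2, 0, 1), scale
```

The returned `scale` is kept so stage 4 can map detected boxes back to original image coordinates.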
Batch vs Real-Time CV Processing
The most fundamental architecture decision is whether your CV pipeline processes images in batch or in real-time. This choice determines your entire infrastructure stack.
| Dimension | Batch Processing | Real-Time Processing |
|---|---|---|
| Latency | Minutes to hours | 50-500ms end-to-end |
| Use cases | Medical imaging analysis, satellite imagery, archival video analysis, product catalog tagging | Live surveillance, autonomous driving, retail checkout, robotic guidance |
| GPU utilization | High (large batches, fully saturated GPU) | Variable (depends on stream count and frame rate) |
| Scaling model | Queue-based: add GPU workers as queue grows | Stream-based: 1 GPU per N concurrent streams |
| Failure handling | Retry the batch, reprocess from queue | Drop frame and continue (cannot pause live stream) |
| Cost model | Pay per image processed, use spot/preemptible GPUs | Pay for always-on GPU capacity, harder to use spot |
# Batch CV pipeline: Process images from S3 with GPU workers
import boto3
import torch
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
@dataclass
class BatchConfig:
bucket: str = "cv-images"
prefix: str = "incoming/"
batch_size: int = 32 # Images per GPU batch
num_workers: int = 4 # Parallel download threads
    model_name: str = "yolov8x.pt"  # Ultralytics weight file; auto-downloads if missing
confidence_threshold: float = 0.5
class BatchCVPipeline:
def __init__(self, config: BatchConfig):
self.config = config
self.s3 = boto3.client("s3")
self.model = self._load_model()
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def _load_model(self):
from ultralytics import YOLO
model = YOLO(self.config.model_name)
return model
def process_batch(self, image_keys: list[str]) -> list[dict]:
"""Process a batch of S3 image keys through the CV pipeline"""
# Stage 1: Download images in parallel
images = self._download_parallel(image_keys)
# Stage 2-3: Preprocess + Inference (handled by YOLO internally)
results = self.model.predict(
images,
conf=self.config.confidence_threshold,
batch=self.config.batch_size,
device=self.device,
)
# Stage 4: Postprocess into structured results
detections = []
for key, result in zip(image_keys, results):
detections.append({
"image_key": key,
"objects": [
{
"class": result.names[int(box.cls)],
"confidence": float(box.conf),
"bbox": box.xyxy[0].tolist(),
}
for box in result.boxes
],
"inference_time_ms": result.speed["inference"],
})
return detections
def _download_parallel(self, keys: list[str]) -> list:
"""Download images from S3 in parallel threads"""
import cv2
import numpy as np
from io import BytesIO
def download_one(key):
obj = self.s3.get_object(Bucket=self.config.bucket, Key=key)
img_bytes = obj["Body"].read()
img_array = np.frombuffer(img_bytes, dtype=np.uint8)
return cv2.imdecode(img_array, cv2.IMREAD_COLOR)
with ThreadPoolExecutor(max_workers=self.config.num_workers) as pool:
return list(pool.map(download_one, keys))
    def _save_results(self, results: list[dict]):
        """Persist detections; swap in your own sink (database, S3, message queue)"""
        for result in results:
            print(result)

    def run(self):
        """Process all pending images in the S3 prefix"""
        # List all unprocessed images
        paginator = self.s3.get_paginator("list_objects_v2")
        all_keys = []
        for page in paginator.paginate(Bucket=self.config.bucket, Prefix=self.config.prefix):
            all_keys.extend(obj["Key"] for obj in page.get("Contents", []))
        # Process in batches
        for i in range(0, len(all_keys), self.config.batch_size):
            batch_keys = all_keys[i : i + self.config.batch_size]
            results = self.process_batch(batch_keys)
            self._save_results(results)
            print(f"Processed batch {i // self.config.batch_size + 1}: {len(batch_keys)} images")
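The real-time counterpart cannot queue work indefinitely: as the table above notes, a live stream drops frames and continues when inference falls behind. One common pattern is a single-slot buffer that always holds only the newest frame — a minimal sketch (the class name `LatestFrameBuffer` is illustrative, and it assumes a single producer thread):

```python
import queue

class LatestFrameBuffer:
    """Single-slot frame buffer for real-time pipelines.

    When inference falls behind the camera, the stale frame is
    discarded and only the newest frame is kept, so latency stays
    bounded instead of growing without limit.
    """
    def __init__(self):
        self._q = queue.Queue(maxsize=1)
        self.dropped = 0  # Counter worth exporting to monitoring

    def put(self, frame):
        try:
            self._q.put_nowait(frame)
        except queue.Full:
            # Inference is behind: drop the stale frame, keep the new one
            try:
                self._q.get_nowait()
                self.dropped += 1
            except queue.Empty:
                pass
            self._q.put_nowait(frame)

    def get(self, timeout: float = 1.0):
        return self._q.get(timeout=timeout)
```

The camera thread calls `put` at capture rate while the inference thread calls `get`; the `dropped` counter is a useful health metric for detecting sustained overload.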
Edge vs Cloud Processing
Where you run inference is as important as what model you run. Edge processing reduces latency and bandwidth costs but limits model size. Cloud processing gives you unlimited GPU power but adds network latency.
| Factor | Edge | Cloud | Hybrid |
|---|---|---|---|
| Latency | 5-50ms (no network) | 100-500ms (network + inference) | Fast local + rich cloud analysis |
| Bandwidth | Zero (process locally) | High (stream video to cloud) | Low (send only detections/crops) |
| Model size | Small (YOLOv8n, MobileNet) | Any size (GPT-4V, SAM, large YOLO) | Small at edge, large in cloud |
| Offline | Full operation without internet | Stops if network fails | Degrades gracefully |
| Cost at scale | Fixed hardware cost per location | Linear GPU cost per stream | Lower cloud cost (less data sent) |
| Best for | Factories, retail stores, vehicles | API services, batch analysis | Most production systems |
# Hybrid edge-cloud architecture pattern
# Edge: fast detection → Cloud: detailed analysis on detections only
# --- Edge device (Jetson/OpenVINO) ---
# (load_trt_model, load_model, and _send_to_cloud in these sketches are
#  placeholders for your TensorRT loader, model loader, and upload client)
class EdgeProcessor:
def __init__(self):
# Small, fast model for initial detection
self.detector = load_trt_model("yolov8n_int8.engine") # 2ms inference
self.cloud_url = "https://cv-api.company.com/analyze"
def process_frame(self, frame):
# Run fast detection locally (5ms total)
detections = self.detector.predict(frame, conf=0.3)
if len(detections) > 0:
# Crop detected regions and send to cloud for detailed analysis
for det in detections:
crop = frame[det.y1:det.y2, det.x1:det.x2]
# Send only the crop (10KB) instead of full frame (500KB)
self._send_to_cloud(crop, det.metadata)
# Return immediate local result (low latency)
return {"local_detections": len(detections), "sent_to_cloud": True}
return {"local_detections": 0, "sent_to_cloud": False}
# --- Cloud API ---
class CloudAnalyzer:
def __init__(self):
# Large, accurate model for detailed analysis
self.classifier = load_model("efficientnet_v2_l") # Detailed classification
self.segmenter = load_model("sam_vit_h") # Precise segmentation
def analyze_crop(self, crop_image, metadata):
# Detailed analysis on the cropped region (100ms, but only for detections)
classification = self.classifier.predict(crop_image)
segmentation = self.segmenter.predict(crop_image)
return {
"class": classification.top1,
"confidence": classification.score,
            "defect_area_pct": float(segmentation.mask.mean()) * 100,  # mask coverage as a percentage
"metadata": metadata,
}
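The bandwidth claim in the comments above (≈10 KB per crop vs ≈500 KB per full frame) can be sanity-checked with rough arithmetic. The function below is illustrative only — all figures are assumptions you would replace with measurements from your own cameras:

```python
def daily_bandwidth_gb(fps: float, hours: float, frame_kb: float,
                       crop_kb: float, detection_rate: float) -> dict:
    """Rough comparison: stream-everything vs hybrid (send crops only).

    detection_rate is the fraction of frames that contain a detection
    worth sending to the cloud. Uses decimal GB (1 GB = 1e6 KB).
    """
    frames = fps * 3600 * hours
    full_gb = frames * frame_kb / 1e6                       # every frame to cloud
    hybrid_gb = frames * detection_rate * crop_kb / 1e6     # only crops to cloud
    return {"cloud_gb": round(full_gb, 1), "hybrid_gb": round(hybrid_gb, 2)}
```

At an assumed 15 fps, 24 hours, and a 5% detection rate, streaming full frames costs hundreds of GB per camera per day while the hybrid design sends well under 1 GB — which is the economic argument for the pattern above.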
Use Case Architectures
Every CV pipeline is designed for a specific domain. Here are three production architectures that cover the most common use cases.
Retail Self-Checkout (Real-Time, Edge)
# Retail checkout: Identify products placed on scale
# Requirements: <200ms latency, offline capable, 99.5% accuracy
#
# Architecture:
# Camera (30fps) → Edge GPU (Jetson Orin) → Product DB → POS System
#
# Pipeline:
# 1. Camera captures product placement event (motion trigger)
# 2. Edge model detects product bounding box (YOLOv8s, 8ms)
# 3. Classification model identifies product (EfficientNet, 5ms)
# 4. Lookup product in local SQLite DB (1ms)
# 5. Send product + price to POS system via local API
#
# Key design decisions:
# - Edge-only (no cloud dependency for checkout)
# - Motion-triggered (don't process every frame)
# - Local product DB synced nightly from cloud
# - Fallback: manual barcode scan if confidence < 0.85
import sqlite3

class RetailCheckoutPipeline:
def __init__(self):
self.detector = load_trt_model("product_detector_v3.engine")
self.classifier = load_trt_model("product_classifier_v5.engine")
self.product_db = sqlite3.connect("products.db")
self.motion_detector = MotionDetector(threshold=0.15)
def process_frame(self, frame, scale_weight):
# Only process when motion detected AND weight changes
if not self.motion_detector.detect(frame) or scale_weight == 0:
return None
# Detect product region
detections = self.detector.predict(frame, conf=0.5)
if len(detections) == 0:
return {"status": "no_product_detected"}
# Classify the detected product
crop = self._crop_detection(frame, detections[0])
product = self.classifier.predict(crop)
if product.confidence < 0.85:
return {"status": "low_confidence", "suggestion": product.top1}
# Look up price
price = self._lookup_price(product.top1)
return {
"status": "identified",
"product": product.top1,
"confidence": product.confidence,
"price": price,
"weight": scale_weight,
}
Manufacturing Quality Inspection (Real-Time, Hybrid)
# Manufacturing: Detect defects on production line
# Requirements: <100ms per frame, zero missed defects, 24/7 operation
#
# Architecture:
# Line cameras → Edge GPU → Defect DB → Cloud retraining
#
# Pipeline:
# 1. Line-scan camera captures product at fixed position (trigger sensor)
# 2. Edge GPU runs defect detection (YOLOv8m, 12ms)
# 3. If defect detected: high-res capture + cloud analysis
# 4. Defect classification + severity scoring
# 5. PLC signal to reject defective product (GPIO)
# 6. All images (pass + fail) logged for retraining
import time

class QualityInspectionPipeline:
def __init__(self):
self.defect_detector = load_trt_model("defect_yolov8m.engine")
self.severity_model = load_trt_model("severity_classifier.engine")
self.plc = PLCConnection("192.168.1.100") # Production line controller
def inspect(self, frame, line_speed_mps: float):
start = time.perf_counter()
# Detect defects
        defects = self.defect_detector.predict(frame, conf=0.3)
        critical = []  # default so the summary below works when no defects are found
if len(defects) > 0:
# Classify severity for each defect
for defect in defects:
crop = self._crop_detection(frame, defect)
severity = self.severity_model.predict(crop)
defect.severity = severity.top1 # "critical", "major", "minor"
defect.severity_score = severity.score
# Reject if any critical defect found
critical = [d for d in defects if d.severity == "critical"]
if critical:
self.plc.trigger_reject() # GPIO signal to reject arm
# Log everything for retraining
self._log_inspection(frame, defects, "fail")
else:
self._log_inspection(frame, [], "pass")
elapsed_ms = (time.perf_counter() - start) * 1000
return {
"defects": len(defects),
"critical": len([d for d in defects if d.severity == "critical"]),
"inference_ms": elapsed_ms,
"action": "reject" if critical else "pass",
}
Surveillance and Security (Real-Time, Cloud/Hybrid)
# Surveillance: Monitor multiple camera feeds for security events
# Requirements: 100+ cameras, <2s alert latency, 24/7, person tracking
#
# Architecture:
# NVR/Cameras → RTSP → GPU Server → Event DB → Alert System
#
# Pipeline per camera:
# 1. RTSP stream decoded at 15fps (skip frames for efficiency)
# 2. Person/vehicle detection (YOLOv8m)
# 3. Multi-object tracking (DeepSORT with ReID)
# 4. Zone intrusion / tripwire / loitering detection
# 5. Event generation + alert dispatch
class SurveillancePipeline:
def __init__(self, camera_configs: list[dict]):
self.detector = load_trt_model("yolov8m_person_vehicle.engine")
self.tracker = DeepSORTTracker(max_age=30, min_hits=3)
self.cameras = {c["id"]: RTSPStream(c["url"]) for c in camera_configs}
self.zones = self._load_zones(camera_configs)
self.alert_manager = AlertManager()
def process_camera(self, camera_id: str, frame, frame_number: int):
# Skip frames for efficiency (process every 2nd frame at 15fps)
if frame_number % 2 != 0:
return None
# Detect persons and vehicles
detections = self.detector.predict(frame, conf=0.4, classes=[0, 2])
# Track objects across frames
tracks = self.tracker.update(detections, frame)
# Check zone violations
events = []
for track in tracks:
for zone in self.zones.get(camera_id, []):
if zone.type == "restricted" and zone.contains(track.center):
events.append({
"type": "zone_intrusion",
"camera": camera_id,
"track_id": track.id,
"zone": zone.name,
"timestamp": time.time(),
})
elif zone.type == "tripwire" and zone.crossed_by(track):
events.append({
"type": "tripwire_crossed",
"camera": camera_id,
"track_id": track.id,
"direction": zone.crossing_direction(track),
})
# Dispatch alerts for security events
for event in events:
self.alert_manager.dispatch(event, frame, tracks)
return {"tracks": len(tracks), "events": events}
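The `zone.contains(track.center)` call above needs a point-in-polygon test, since restricted zones are usually drawn as arbitrary polygons on the camera view. One standard implementation is ray casting — cast a ray from the point to the right and count edge crossings; an odd count means the point is inside. A minimal sketch:

```python
def point_in_polygon(x: float, y: float,
                     polygon: list[tuple[float, float]]) -> bool:
    """Ray-casting point-in-polygon test for zone-intrusion checks.

    polygon is a list of (x, y) vertices in order; edges wrap around
    from the last vertex back to the first.
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Edge straddles the horizontal line through the point?
        if (y1 > y) != (y2 > y):
            # X coordinate where the edge crosses that horizontal line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside
```

In practice the polygon vertices come from the zone configuration per camera, and `track.center` is typically the bottom-center of the bounding box (the object's footprint) rather than the box center.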
Choosing Your Architecture
Use this decision framework to pick the right architecture for your use case.
| Requirement | Architecture | Example |
|---|---|---|
| Latency <50ms, offline required | Edge-only (Jetson, OpenVINO) | Autonomous vehicles, factory robots |
| Latency <500ms, internet available | Cloud GPU (API-based) | Photo moderation, document OCR API |
| Low latency + detailed analysis | Hybrid (edge detect, cloud analyze) | Retail, quality inspection |
| Batch, no latency requirement | Cloud batch (spot GPUs) | Satellite imagery, medical imaging |
| 100+ cameras, central monitoring | On-prem GPU server or cloud GPU | Surveillance, traffic monitoring |
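One way to make the table concrete is to encode it as a first-pass heuristic. The function below is illustrative only — its name, parameters, and thresholds are assumptions, and real decisions also weigh cost, camera count, and model size:

```python
def choose_architecture(latency_ms: float, offline_required: bool,
                        batch_ok: bool, detailed_analysis: bool = False) -> str:
    """First-pass architecture pick from the decision table above."""
    if batch_ok:
        # No latency requirement: queue-driven batch on spot/preemptible GPUs
        return "cloud-batch"
    if latency_ms < 50 or offline_required:
        # Hard latency or offline needs force inference on-device; pair
        # with cloud analysis when detailed results are also required
        return "hybrid" if detailed_analysis else "edge"
    # Internet available and a few hundred ms of budget: API-based GPU
    return "cloud"
```

Treat the output as a starting point for the trade-off discussion, not a final answer.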
What Is Next
Now that you understand the architectural patterns, the next lesson dives into Image Processing Pipelines. You will build a complete pipeline with OpenCV and PyTorch that handles preprocessing, GPU-batched inference with YOLO, post-processing with NMS, and result delivery — with production code you can adapt for your use case.
Lilly Tech Systems