Beginner

CV Pipeline Architecture

Understand the core components of a production computer vision pipeline, learn when to choose batch vs real-time processing, decide between edge and cloud architectures, and study real-world use case designs for retail checkout, quality inspection, and surveillance systems.

What Is a Computer Vision Pipeline?

A computer vision pipeline is an end-to-end system that takes raw image or video input, processes it through a series of stages, and produces actionable results. Unlike a Jupyter notebook that runs inference on a single image, a production CV pipeline handles thousands of concurrent inputs, runs 24/7, recovers from failures, and delivers results within strict latency budgets.

Every production CV pipeline has five fundamental stages:

# The 5 stages of a production CV pipeline
# Each stage is independent, monitored, and retryable

# ┌─────────────┐    ┌──────────────┐    ┌─────────────┐    ┌──────────────┐    ┌─────────────┐
# │  1. Ingest   │───▶│ 2. Preprocess │───▶│ 3. Inference │───▶│ 4. Postprocess│───▶│ 5. Deliver  │
# └─────────────┘    └──────────────┘    └─────────────┘    └──────────────┘    └─────────────┘
#
# 1. INGEST: Capture frames from cameras, upload APIs, S3 buckets, or video streams
#    - RTSP/RTMP streams, HTTP uploads, S3 event triggers, edge camera SDKs
#    - Handle connection drops, frame corruption, variable frame rates
#
# 2. PREPROCESS: Prepare images for model consumption
#    - Resize to model input dimensions (e.g., 640x640 for YOLOv8)
#    - Normalize pixel values (0-255 → 0-1 or ImageNet normalization)
#    - Color space conversion (BGR→RGB), letterboxing, padding
#    - Batch formation: group multiple images for GPU efficiency
#
# 3. INFERENCE: Run the neural network model
#    - Object detection (YOLO, Faster R-CNN, DETR)
#    - Classification (ResNet, EfficientNet, ViT)
#    - Segmentation (Mask R-CNN, SAM, U-Net)
#    - Embedding/similarity (CLIP, DINOv2)
#
# 4. POSTPROCESS: Convert raw model output to business results
#    - Non-Maximum Suppression (NMS) to remove duplicate detections
#    - Confidence thresholding (discard low-confidence predictions)
#    - Object tracking across frames (assign IDs to detected objects)
#    - Business logic (count people, measure defect size, classify events)
#
# 5. DELIVER: Send results to downstream systems
#    - REST/gRPC API responses, webhook callbacks
#    - Database writes (PostgreSQL, MongoDB, Elasticsearch)
#    - Event streams (Kafka, Redis Pub/Sub)
#    - Dashboard updates, alert triggers

Batch vs Real-Time CV Processing

The most fundamental architecture decision is whether your CV pipeline processes images in batch or in real-time. This choice determines your entire infrastructure stack.

DimensionBatch ProcessingReal-Time Processing
Latency Minutes to hours 50-500ms end-to-end
Use cases Medical imaging analysis, satellite imagery, archival video analysis, product catalog tagging Live surveillance, autonomous driving, retail checkout, robotic guidance
GPU utilization High (large batches, fully saturated GPU) Variable (depends on stream count and frame rate)
Scaling model Queue-based: add GPU workers as queue grows Stream-based: 1 GPU per N concurrent streams
Failure handling Retry the batch, reprocess from queue Drop frame and continue (cannot pause live stream)
Cost model Pay per image processed, use spot/preemptible GPUs Pay for always-on GPU capacity, harder to use spot
# Batch CV pipeline: Process images from S3 with GPU workers
import boto3
import torch
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

@dataclass
class BatchConfig:
    bucket: str = "cv-images"
    prefix: str = "incoming/"
    batch_size: int = 32          # Images per GPU batch
    num_workers: int = 4          # Parallel download threads
    model_name: str = "yolov8x"
    confidence_threshold: float = 0.5

class BatchCVPipeline:
    def __init__(self, config: BatchConfig):
        self.config = config
        self.s3 = boto3.client("s3")
        self.model = self._load_model()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _load_model(self):
        from ultralytics import YOLO
        model = YOLO(self.config.model_name)
        return model

    def process_batch(self, image_keys: list[str]) -> list[dict]:
        """Process a batch of S3 image keys through the CV pipeline"""
        # Stage 1: Download images in parallel
        images = self._download_parallel(image_keys)

        # Stage 2-3: Preprocess + Inference (handled by YOLO internally)
        results = self.model.predict(
            images,
            conf=self.config.confidence_threshold,
            batch=self.config.batch_size,
            device=self.device,
        )

        # Stage 4: Postprocess into structured results
        detections = []
        for key, result in zip(image_keys, results):
            detections.append({
                "image_key": key,
                "objects": [
                    {
                        "class": result.names[int(box.cls)],
                        "confidence": float(box.conf),
                        "bbox": box.xyxy[0].tolist(),
                    }
                    for box in result.boxes
                ],
                "inference_time_ms": result.speed["inference"],
            })

        return detections

    def _download_parallel(self, keys: list[str]) -> list:
        """Download images from S3 in parallel threads"""
        import cv2
        import numpy as np
        from io import BytesIO

        def download_one(key):
            obj = self.s3.get_object(Bucket=self.config.bucket, Key=key)
            img_bytes = obj["Body"].read()
            img_array = np.frombuffer(img_bytes, dtype=np.uint8)
            return cv2.imdecode(img_array, cv2.IMREAD_COLOR)

        with ThreadPoolExecutor(max_workers=self.config.num_workers) as pool:
            return list(pool.map(download_one, keys))

    def run(self):
        """Process all pending images in the S3 prefix"""
        # List all unprocessed images
        paginator = self.s3.get_paginator("list_objects_v2")
        all_keys = []
        for page in paginator.paginate(Bucket=self.config.bucket, Prefix=self.config.prefix):
            all_keys.extend(obj["Key"] for obj in page.get("Contents", []))

        # Process in batches
        for i in range(0, len(all_keys), self.config.batch_size):
            batch_keys = all_keys[i : i + self.config.batch_size]
            results = self.process_batch(batch_keys)
            self._save_results(results)
            print(f"Processed batch {i // self.config.batch_size + 1}: {len(batch_keys)} images")
💡
Start with batch, upgrade to real-time: Most teams overestimate their latency requirements. If your use case can tolerate 5-second delays, batch processing with a fast queue is simpler, cheaper, and easier to debug than a streaming architecture. Start batch, measure actual latency needs, and upgrade to real-time only when the business requires it.

Edge vs Cloud Processing

Where you run inference is as important as what model you run. Edge processing reduces latency and bandwidth costs but limits model size. Cloud processing gives you unlimited GPU power but adds network latency.

FactorEdgeCloudHybrid
Latency 5-50ms (no network) 100-500ms (network + inference) Fast local + rich cloud analysis
Bandwidth Zero (process locally) High (stream video to cloud) Low (send only detections/crops)
Model size Small (YOLOv8n, MobileNet) Any size (GPT-4V, SAM, large YOLO) Small at edge, large in cloud
Offline Full operation without internet Stops if network fails Degrades gracefully
Cost at scale Fixed hardware cost per location Linear GPU cost per stream Lower cloud cost (less data sent)
Best for Factories, retail stores, vehicles API services, batch analysis Most production systems
# Hybrid edge-cloud architecture pattern
# Edge: fast detection → Cloud: detailed analysis on detections only

# --- Edge device (Jetson/OpenVINO) ---
class EdgeProcessor:
    def __init__(self):
        # Small, fast model for initial detection
        self.detector = load_trt_model("yolov8n_int8.engine")  # 2ms inference
        self.cloud_url = "https://cv-api.company.com/analyze"

    def process_frame(self, frame):
        # Run fast detection locally (5ms total)
        detections = self.detector.predict(frame, conf=0.3)

        if len(detections) > 0:
            # Crop detected regions and send to cloud for detailed analysis
            for det in detections:
                crop = frame[det.y1:det.y2, det.x1:det.x2]
                # Send only the crop (10KB) instead of full frame (500KB)
                self._send_to_cloud(crop, det.metadata)

            # Return immediate local result (low latency)
            return {"local_detections": len(detections), "sent_to_cloud": True}

        return {"local_detections": 0, "sent_to_cloud": False}

# --- Cloud API ---
class CloudAnalyzer:
    def __init__(self):
        # Large, accurate model for detailed analysis
        self.classifier = load_model("efficientnet_v2_l")  # Detailed classification
        self.segmenter = load_model("sam_vit_h")            # Precise segmentation

    def analyze_crop(self, crop_image, metadata):
        # Detailed analysis on the cropped region (100ms, but only for detections)
        classification = self.classifier.predict(crop_image)
        segmentation = self.segmenter.predict(crop_image)
        return {
            "class": classification.top1,
            "confidence": classification.score,
            "defect_area_pct": segmentation.mask.mean(),
            "metadata": metadata,
        }

Use Case Architectures

Every CV pipeline is designed for a specific domain. Here are four production architectures that cover the most common use cases.

Retail Self-Checkout (Real-Time, Edge)

# Retail checkout: Identify products placed on scale
# Requirements: <200ms latency, offline capable, 99.5% accuracy
#
# Architecture:
# Camera (30fps) → Edge GPU (Jetson Orin) → Product DB → POS System
#
# Pipeline:
# 1. Camera captures product placement event (motion trigger)
# 2. Edge model detects product bounding box (YOLOv8s, 8ms)
# 3. Classification model identifies product (EfficientNet, 5ms)
# 4. Lookup product in local SQLite DB (1ms)
# 5. Send product + price to POS system via local API
#
# Key design decisions:
# - Edge-only (no cloud dependency for checkout)
# - Motion-triggered (don't process every frame)
# - Local product DB synced nightly from cloud
# - Fallback: manual barcode scan if confidence < 0.85

class RetailCheckoutPipeline:
    def __init__(self):
        self.detector = load_trt_model("product_detector_v3.engine")
        self.classifier = load_trt_model("product_classifier_v5.engine")
        self.product_db = sqlite3.connect("products.db")
        self.motion_detector = MotionDetector(threshold=0.15)

    def process_frame(self, frame, scale_weight):
        # Only process when motion detected AND weight changes
        if not self.motion_detector.detect(frame) or scale_weight == 0:
            return None

        # Detect product region
        detections = self.detector.predict(frame, conf=0.5)
        if len(detections) == 0:
            return {"status": "no_product_detected"}

        # Classify the detected product
        crop = self._crop_detection(frame, detections[0])
        product = self.classifier.predict(crop)

        if product.confidence < 0.85:
            return {"status": "low_confidence", "suggestion": product.top1}

        # Look up price
        price = self._lookup_price(product.top1)
        return {
            "status": "identified",
            "product": product.top1,
            "confidence": product.confidence,
            "price": price,
            "weight": scale_weight,
        }

Manufacturing Quality Inspection (Real-Time, Hybrid)

# Manufacturing: Detect defects on production line
# Requirements: <100ms per frame, zero missed defects, 24/7 operation
#
# Architecture:
# Line cameras → Edge GPU → Defect DB → Cloud retraining
#
# Pipeline:
# 1. Line-scan camera captures product at fixed position (trigger sensor)
# 2. Edge GPU runs defect detection (YOLOv8m, 12ms)
# 3. If defect detected: high-res capture + cloud analysis
# 4. Defect classification + severity scoring
# 5. PLC signal to reject defective product (GPIO)
# 6. All images (pass + fail) logged for retraining

class QualityInspectionPipeline:
    def __init__(self):
        self.defect_detector = load_trt_model("defect_yolov8m.engine")
        self.severity_model = load_trt_model("severity_classifier.engine")
        self.plc = PLCConnection("192.168.1.100")  # Production line controller

    def inspect(self, frame, line_speed_mps: float):
        start = time.perf_counter()

        # Detect defects
        defects = self.defect_detector.predict(frame, conf=0.3)

        if len(defects) > 0:
            # Classify severity for each defect
            for defect in defects:
                crop = self._crop_detection(frame, defect)
                severity = self.severity_model.predict(crop)
                defect.severity = severity.top1  # "critical", "major", "minor"
                defect.severity_score = severity.score

            # Reject if any critical defect found
            critical = [d for d in defects if d.severity == "critical"]
            if critical:
                self.plc.trigger_reject()  # GPIO signal to reject arm

            # Log everything for retraining
            self._log_inspection(frame, defects, "fail")
        else:
            self._log_inspection(frame, [], "pass")

        elapsed_ms = (time.perf_counter() - start) * 1000
        return {
            "defects": len(defects),
            "critical": len([d for d in defects if d.severity == "critical"]),
            "inference_ms": elapsed_ms,
            "action": "reject" if critical else "pass",
        }

Surveillance and Security (Real-Time, Cloud/Hybrid)

# Surveillance: Monitor multiple camera feeds for security events
# Requirements: 100+ cameras, <2s alert latency, 24/7, person tracking
#
# Architecture:
# NVR/Cameras → RTSP → GPU Server → Event DB → Alert System
#
# Pipeline per camera:
# 1. RTSP stream decoded at 15fps (skip frames for efficiency)
# 2. Person/vehicle detection (YOLOv8m)
# 3. Multi-object tracking (DeepSORT with ReID)
# 4. Zone intrusion / tripwire / loitering detection
# 5. Event generation + alert dispatch

class SurveillancePipeline:
    def __init__(self, camera_configs: list[dict]):
        self.detector = load_trt_model("yolov8m_person_vehicle.engine")
        self.tracker = DeepSORTTracker(max_age=30, min_hits=3)
        self.cameras = {c["id"]: RTSPStream(c["url"]) for c in camera_configs}
        self.zones = self._load_zones(camera_configs)
        self.alert_manager = AlertManager()

    def process_camera(self, camera_id: str, frame, frame_number: int):
        # Skip frames for efficiency (process every 2nd frame at 15fps)
        if frame_number % 2 != 0:
            return None

        # Detect persons and vehicles
        detections = self.detector.predict(frame, conf=0.4, classes=[0, 2])

        # Track objects across frames
        tracks = self.tracker.update(detections, frame)

        # Check zone violations
        events = []
        for track in tracks:
            for zone in self.zones.get(camera_id, []):
                if zone.type == "restricted" and zone.contains(track.center):
                    events.append({
                        "type": "zone_intrusion",
                        "camera": camera_id,
                        "track_id": track.id,
                        "zone": zone.name,
                        "timestamp": time.time(),
                    })
                elif zone.type == "tripwire" and zone.crossed_by(track):
                    events.append({
                        "type": "tripwire_crossed",
                        "camera": camera_id,
                        "track_id": track.id,
                        "direction": zone.crossing_direction(track),
                    })

        # Dispatch alerts for security events
        for event in events:
            self.alert_manager.dispatch(event, frame, tracks)

        return {"tracks": len(tracks), "events": events}
💡
GPU planning rule of thumb: A single NVIDIA T4 GPU can handle approximately 8-12 camera streams at 15fps with YOLOv8m. An A10G handles about 20-30 streams. An A100 handles 50-80 streams. These numbers assume optimized TensorRT models. Without optimization, divide by 3-4x. Always benchmark with your actual model and frame rate before planning hardware.

Choosing Your Architecture

Use this decision framework to pick the right architecture for your use case.

RequirementArchitectureExample
Latency <50ms, offline required Edge-only (Jetson, OpenVINO) Autonomous vehicles, factory robots
Latency <500ms, internet available Cloud GPU (API-based) Photo moderation, document OCR API
Low latency + detailed analysis Hybrid (edge detect, cloud analyze) Retail, quality inspection
Batch, no latency requirement Cloud batch (spot GPUs) Satellite imagery, medical imaging
100+ cameras, central monitoring On-prem GPU server or cloud GPU Surveillance, traffic monitoring

What Is Next

Now that you understand the architectural patterns, the next lesson dives into Image Processing Pipelines. You will build a complete pipeline with OpenCV and PyTorch that handles preprocessing, GPU-batched inference with YOLO, post-processing with NMS, and result delivery — with production code you can adapt for your use case.