Intermediate

Image Processing Pipeline

Build a production-ready image processing pipeline from scratch. Learn preprocessing techniques (resize, normalize, augment), run GPU-batched inference with YOLO, ResNet, and CLIP, implement post-processing with Non-Maximum Suppression and confidence filtering, and write the complete OpenCV + PyTorch pipeline code.

Stage 1: Image Preprocessing

Preprocessing transforms raw camera or upload images into the exact tensor format your model expects. Getting this wrong is the most common source of production CV bugs — a mismatch between training preprocessing and serving preprocessing silently degrades accuracy.

import cv2
import numpy as np
import torch
from dataclasses import dataclass
from typing import Tuple

@dataclass
class PreprocessConfig:
    target_size: Tuple[int, int] = (640, 640)    # Model input size (W, H)
    normalize_mean: Tuple[float, ...] = (0.485, 0.456, 0.406)  # ImageNet
    normalize_std: Tuple[float, ...] = (0.229, 0.224, 0.225)   # ImageNet
    letterbox: bool = True      # Preserve aspect ratio with padding
    pad_color: int = 114        # Gray padding for letterbox

class ImagePreprocessor:
    """Production image preprocessor with letterboxing and normalization"""

    def __init__(self, config: PreprocessConfig):
        self.config = config

    def preprocess(self, image: np.ndarray) -> Tuple[torch.Tensor, dict]:
        """
        Preprocess a single image for model inference.
        Returns (tensor, metadata) where metadata contains info needed
        to map predictions back to original image coordinates.
        """
        original_h, original_w = image.shape[:2]
        target_w, target_h = self.config.target_size

        # Step 1: BGR to RGB (OpenCV loads BGR, models expect RGB)
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Step 2: Resize with letterboxing (preserve aspect ratio)
        if self.config.letterbox:
            image_resized, scale, pad = self._letterbox(
                image_rgb, target_w, target_h
            )
        else:
            image_resized = cv2.resize(image_rgb, (target_w, target_h))
            scale = (target_w / original_w, target_h / original_h)
            pad = (0, 0)

        # Step 3: Normalize to [0, 1] then apply ImageNet normalization
        image_float = image_resized.astype(np.float32) / 255.0
        mean = np.array(self.config.normalize_mean, dtype=np.float32)
        std = np.array(self.config.normalize_std, dtype=np.float32)
        image_normalized = (image_float - mean) / std

        # Step 4: HWC to CHW (OpenCV is HWC, PyTorch expects CHW)
        image_chw = np.transpose(image_normalized, (2, 0, 1))

        # Step 5: Convert to tensor
        tensor = torch.from_numpy(image_chw).float()

        metadata = {
            "original_size": (original_w, original_h),
            "scale": scale,
            "pad": pad,
        }
        return tensor, metadata

    def preprocess_batch(self, images: list[np.ndarray]) -> Tuple[torch.Tensor, list]:
        """Preprocess a batch of images into a single tensor"""
        tensors, metadatas = [], []
        for img in images:
            tensor, meta = self.preprocess(img)
            tensors.append(tensor)
            metadatas.append(meta)

        # Stack into batch tensor: (B, C, H, W)
        batch_tensor = torch.stack(tensors)
        return batch_tensor, metadatas

    def _letterbox(self, image, target_w, target_h):
        """Resize preserving aspect ratio with gray padding"""
        h, w = image.shape[:2]
        scale = min(target_w / w, target_h / h)
        new_w, new_h = int(w * scale), int(h * scale)

        resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

        # Create padded image
        padded = np.full((target_h, target_w, 3), self.config.pad_color, dtype=np.uint8)
        pad_x = (target_w - new_w) // 2
        pad_y = (target_h - new_h) // 2
        padded[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = resized

        return padded, (scale, scale), (pad_x, pad_y)
💡
Always letterbox, never stretch: Stretching images to fit model input dimensions changes the aspect ratio, which distorts objects and degrades detection accuracy. Letterboxing preserves the original proportions by adding gray padding. This is especially critical for object detection where bounding box aspect ratios must be preserved.

Stage 2: Model Inference with GPU Batching

GPU batching is the single most impactful optimization for throughput. A single image on an A100 might take 8ms, but a batch of 32 images takes only 30ms total — a 8x throughput improvement.

import torch
import torchvision
from ultralytics import YOLO
import time

class InferenceEngine:
    """Production inference engine with batching and multiple model support"""

    def __init__(self, model_type: str = "yolov8", device: str = "cuda:0"):
        self.device = torch.device(device)
        self.model_type = model_type
        self.model = self._load_model(model_type)

    def _load_model(self, model_type: str):
        if model_type == "yolov8":
            model = YOLO("yolov8x.pt")
            model.to(self.device)
            return model
        elif model_type == "resnet":
            model = torchvision.models.resnet50(weights="IMAGENET1K_V2")
            model.eval().to(self.device)
            return model
        elif model_type == "clip":
            import open_clip
            model, _, preprocess = open_clip.create_model_and_transforms(
                "ViT-B-32", pretrained="laion2b_s34b_b79k"
            )
            model.eval().to(self.device)
            return model

    @torch.no_grad()
    def detect_batch(self, batch_tensor: torch.Tensor) -> list[dict]:
        """
        Run object detection on a batch of preprocessed images.
        Input: (B, 3, 640, 640) tensor
        Output: list of detection dicts per image
        """
        batch_tensor = batch_tensor.to(self.device)

        start = time.perf_counter()
        results = self.model.predict(batch_tensor, verbose=False)
        inference_ms = (time.perf_counter() - start) * 1000

        detections = []
        for result in results:
            image_dets = []
            for box in result.boxes:
                image_dets.append({
                    "class_id": int(box.cls),
                    "class_name": result.names[int(box.cls)],
                    "confidence": float(box.conf),
                    "bbox_xyxy": box.xyxy[0].cpu().tolist(),
                    "bbox_xywh": box.xywh[0].cpu().tolist(),
                })
            detections.append(image_dets)

        return detections, inference_ms

    @torch.no_grad()
    def classify_batch(self, batch_tensor: torch.Tensor) -> list[dict]:
        """
        Run classification on a batch of preprocessed images.
        Input: (B, 3, 224, 224) tensor
        Output: list of classification results per image
        """
        batch_tensor = batch_tensor.to(self.device)
        logits = self.model(batch_tensor)
        probs = torch.softmax(logits, dim=1)
        top5_probs, top5_indices = torch.topk(probs, 5, dim=1)

        results = []
        for i in range(batch_tensor.shape[0]):
            results.append({
                "top1_class": int(top5_indices[i, 0]),
                "top1_confidence": float(top5_probs[i, 0]),
                "top5": [
                    {"class": int(top5_indices[i, j]), "confidence": float(top5_probs[i, j])}
                    for j in range(5)
                ],
            })
        return results

    @torch.no_grad()
    def embed_batch(self, batch_tensor: torch.Tensor) -> np.ndarray:
        """
        Generate embeddings for a batch of images (e.g., CLIP).
        Input: (B, 3, 224, 224) tensor
        Output: (B, embedding_dim) numpy array
        """
        batch_tensor = batch_tensor.to(self.device)
        embeddings = self.model.encode_image(batch_tensor)
        embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)  # L2 normalize
        return embeddings.cpu().numpy()

Dynamic Batching for Variable Load

import asyncio
from collections import deque

class DynamicBatcher:
    """
    Collects incoming inference requests and batches them dynamically.
    Fires a batch when either max_batch_size is reached or max_wait_ms expires.
    This is the same pattern used by NVIDIA Triton Inference Server.
    """

    def __init__(self, engine: InferenceEngine, max_batch_size: int = 32, max_wait_ms: float = 50):
        self.engine = engine
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: deque = deque()
        self._batch_task = None

    async def predict(self, image_tensor: torch.Tensor, metadata: dict) -> dict:
        """Submit a single image for batched inference. Returns when batch completes."""
        future = asyncio.get_event_loop().create_future()
        self.queue.append((image_tensor, metadata, future))

        # Start batch timer if this is the first item
        if len(self.queue) == 1:
            self._batch_task = asyncio.create_task(self._wait_and_process())

        # If batch is full, process immediately
        if len(self.queue) >= self.max_batch_size:
            if self._batch_task:
                self._batch_task.cancel()
            await self._process_batch()

        return await future

    async def _wait_and_process(self):
        """Wait for max_wait_ms then process whatever is in the queue"""
        await asyncio.sleep(self.max_wait_ms / 1000)
        await self._process_batch()

    async def _process_batch(self):
        """Process all queued items as a single GPU batch"""
        if not self.queue:
            return

        # Collect all queued items
        items = []
        while self.queue and len(items) < self.max_batch_size:
            items.append(self.queue.popleft())

        tensors = [item[0] for item in items]
        metadatas = [item[1] for item in items]
        futures = [item[2] for item in items]

        # Run batched inference
        batch_tensor = torch.stack(tensors)
        detections, inference_ms = self.engine.detect_batch(batch_tensor)

        # Resolve individual futures
        for i, future in enumerate(futures):
            future.set_result({
                "detections": detections[i],
                "metadata": metadatas[i],
                "batch_size": len(items),
                "inference_ms": inference_ms,
            })

Stage 3: Post-Processing

Raw model output requires significant post-processing before it becomes useful. Non-Maximum Suppression (NMS) removes duplicate detections, confidence filtering discards uncertain predictions, and coordinate mapping translates predictions back to original image space.

import numpy as np
from dataclasses import dataclass

@dataclass
class PostprocessConfig:
    confidence_threshold: float = 0.5
    nms_iou_threshold: float = 0.45
    max_detections: int = 100
    min_box_area: int = 100        # Filter tiny noise detections

class PostProcessor:
    """Production post-processor for object detection results"""

    def __init__(self, config: PostprocessConfig):
        self.config = config

    def process(self, raw_detections: list[dict], metadata: dict) -> list[dict]:
        """
        Post-process raw model detections:
        1. Filter by confidence
        2. Apply NMS to remove duplicates
        3. Map coordinates back to original image space
        4. Filter tiny/invalid boxes
        """
        if not raw_detections:
            return []

        # Step 1: Confidence filtering
        dets = [d for d in raw_detections if d["confidence"] >= self.config.confidence_threshold]

        # Step 2: NMS (Non-Maximum Suppression)
        if len(dets) > 1:
            dets = self._nms(dets)

        # Step 3: Map back to original coordinates
        dets = self._map_to_original(dets, metadata)

        # Step 4: Filter invalid boxes
        dets = self._filter_invalid(dets, metadata["original_size"])

        # Step 5: Limit max detections
        dets = sorted(dets, key=lambda d: d["confidence"], reverse=True)
        dets = dets[:self.config.max_detections]

        return dets

    def _nms(self, detections: list[dict]) -> list[dict]:
        """Apply Non-Maximum Suppression to remove overlapping detections"""
        if not detections:
            return []

        boxes = np.array([d["bbox_xyxy"] for d in detections])
        scores = np.array([d["confidence"] for d in detections])

        # Compute areas
        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
        areas = (x2 - x1) * (y2 - y1)

        # Sort by confidence (descending)
        order = scores.argsort()[::-1]
        keep = []

        while order.size > 0:
            i = order[0]
            keep.append(i)

            # Compute IoU with remaining boxes
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(0, xx2 - xx1)
            h = np.maximum(0, yy2 - yy1)
            intersection = w * h
            iou = intersection / (areas[i] + areas[order[1:]] - intersection)

            # Keep boxes with IoU below threshold
            inds = np.where(iou <= self.config.nms_iou_threshold)[0]
            order = order[inds + 1]

        return [detections[i] for i in keep]

    def _map_to_original(self, detections: list[dict], metadata: dict) -> list[dict]:
        """Map detection coordinates from model space back to original image space"""
        scale_x, scale_y = metadata["scale"]
        pad_x, pad_y = metadata["pad"]

        for det in detections:
            x1, y1, x2, y2 = det["bbox_xyxy"]
            # Remove padding offset, then un-scale
            det["bbox_xyxy"] = [
                (x1 - pad_x) / scale_x,
                (y1 - pad_y) / scale_y,
                (x2 - pad_x) / scale_x,
                (y2 - pad_y) / scale_y,
            ]
        return detections

    def _filter_invalid(self, detections: list[dict], original_size) -> list[dict]:
        """Remove detections that are too small or outside image bounds"""
        w, h = original_size
        valid = []
        for det in detections:
            x1, y1, x2, y2 = det["bbox_xyxy"]
            # Clip to image bounds
            x1 = max(0, min(x1, w))
            y1 = max(0, min(y1, h))
            x2 = max(0, min(x2, w))
            y2 = max(0, min(y2, h))
            det["bbox_xyxy"] = [x1, y1, x2, y2]

            # Filter tiny boxes
            area = (x2 - x1) * (y2 - y1)
            if area >= self.config.min_box_area:
                valid.append(det)
        return valid

Complete Production Pipeline

Here is the complete pipeline that ties all three stages together into a production-ready FastAPI service.

from fastapi import FastAPI, UploadFile, File
from typing import List
import cv2
import numpy as np

app = FastAPI(title="CV Inference API")

# Initialize pipeline components
preprocessor = ImagePreprocessor(PreprocessConfig(target_size=(640, 640)))
engine = InferenceEngine(model_type="yolov8", device="cuda:0")
postprocessor = PostProcessor(PostprocessConfig(confidence_threshold=0.5))
batcher = DynamicBatcher(engine, max_batch_size=16, max_wait_ms=30)

@app.post("/detect")
async def detect_objects(file: UploadFile = File(...)):
    """Single image object detection endpoint"""
    # Read uploaded image
    contents = await file.read()
    nparr = np.frombuffer(contents, np.uint8)
    image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    if image is None:
        return {"error": "Invalid image file"}

    # Preprocess
    tensor, metadata = preprocessor.preprocess(image)

    # Inference (with dynamic batching)
    result = await batcher.predict(tensor, metadata)

    # Postprocess
    detections = postprocessor.process(result["detections"], metadata)

    return {
        "detections": detections,
        "count": len(detections),
        "inference_ms": result["inference_ms"],
        "batch_size": result["batch_size"],
        "image_size": {"width": image.shape[1], "height": image.shape[0]},
    }

@app.post("/detect/batch")
async def detect_objects_batch(files: List[UploadFile] = File(...)):
    """Batch image object detection endpoint"""
    images = []
    for file in files:
        contents = await file.read()
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is not None:
            images.append(img)

    # Preprocess batch
    batch_tensor, metadatas = preprocessor.preprocess_batch(images)

    # Inference
    all_detections, inference_ms = engine.detect_batch(batch_tensor)

    # Postprocess each image
    results = []
    for i, (dets, meta) in enumerate(zip(all_detections, metadatas)):
        processed = postprocessor.process(dets, meta)
        results.append({
            "image_index": i,
            "detections": processed,
            "count": len(processed),
        })

    return {
        "results": results,
        "total_images": len(images),
        "inference_ms": inference_ms,
    }

@app.get("/health")
async def health():
    """Health check - verify model is loaded and GPU is available"""
    import torch
    return {
        "status": "healthy",
        "gpu_available": torch.cuda.is_available(),
        "gpu_name": torch.cuda.get_device_name(0) if torch.cuda.is_available() else None,
        "model_loaded": engine.model is not None,
    }

# Run: uvicorn pipeline:app --host 0.0.0.0 --port 8000 --workers 1
# Note: Use 1 worker per GPU. Multiple workers = multiple model copies = OOM
💡
One worker per GPU: A common mistake is running multiple uvicorn workers thinking it improves throughput. Each worker loads a full copy of the model into GPU memory. With a 500MB model, 4 workers = 2GB of GPU memory wasted on duplicate models. Instead, use 1 worker with async batching — the GPU does the parallelism for you via batch inference.

Performance Benchmarks

Real throughput numbers you can expect from this pipeline on common GPU hardware:

GPUModelBatch SizeFPSLatency (p99)
T4 (16GB)YOLOv8n (640)1638045ms
T4 (16GB)YOLOv8x (640)845180ms
A10G (24GB)YOLOv8n (640)3272048ms
A10G (24GB)YOLOv8x (640)1695170ms
A100 (80GB)YOLOv8n (640)64140050ms
A100 (80GB)YOLOv8x (640)32210155ms
💡
These numbers are with PyTorch. With TensorRT optimization (covered in Lesson 4), expect 2-4x improvement in both FPS and latency. The T4 + YOLOv8n + TensorRT combination delivers 800+ FPS, making it the best cost-per-inference option for most production workloads.

What Is Next

The next lesson covers Video Processing Architecture. You will learn frame extraction strategies, keyframe detection, hardware-accelerated video decoding with FFmpeg and NVIDIA NVDEC, object tracking with SORT and DeepSORT for temporal analysis, and how to build a streaming video pipeline.