Advanced

Batch Processing Pipeline

In this step, you will build an asynchronous processing pipeline that handles hundreds of documents with queuing, progress tracking, retry logic, and error handling.

Why Batch Processing?

Processing one document takes 5-30 seconds. For 100 documents sequentially, that is 8-50 minutes. A batch pipeline with async processing and parallelism reduces this dramatically.

Step 1: Job Queue and Status Tracking

# app/pipeline/queue.py
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class JobStatus(str, Enum):
    """Lifecycle states of a processing job.

    Subclasses ``str`` so members compare equal to their plain string
    values and serialize directly into JSON responses.
    """

    PENDING = "pending"        # queued, not yet picked up by a worker
    PROCESSING = "processing"  # currently being worked on
    COMPLETED = "completed"    # finished successfully, result available
    FAILED = "failed"          # exhausted retries, error recorded


@dataclass
class ProcessingJob:
    """A single document-processing job tracked by the queue.

    Timestamps are timezone-aware UTC ISO-8601 strings, so they are
    unambiguous across hosts and sort lexicographically (which
    ``JobQueue.get_all_jobs`` relies on).
    """

    # Unique identifier; also the key used in the queue's job registry.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    filename: str = ""
    file_path: str = ""
    status: JobStatus = JobStatus.PENDING
    # Fraction complete in [0.0, 1.0], reported through the status API.
    progress: float = 0.0
    result: Optional[dict] = None
    error: Optional[str] = None
    # datetime.utcnow() is deprecated (Python 3.12+) and returns naive
    # datetimes; use an aware UTC timestamp instead.
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    completed_at: Optional[str] = None
    retries: int = 0
    max_retries: int = 3


class JobQueue:
    """In-memory job queue with status tracking.

    Jobs live in a dict keyed by job id; pending work is handed to
    workers through an ``asyncio.Queue`` of job ids. All state is
    process-local and lost on restart (swap for Celery + Redis in
    production).
    """

    def __init__(self):
        self.jobs: dict[str, ProcessingJob] = {}
        self._queue: asyncio.Queue = asyncio.Queue()

    async def add_job(self, filename: str, file_path: str) -> ProcessingJob:
        """Create a job, register it, and enqueue its id for a worker."""
        job = ProcessingJob(filename=filename, file_path=file_path)
        self.jobs[job.id] = job
        await self._queue.put(job.id)
        return job

    def get_job(self, job_id: str) -> Optional[ProcessingJob]:
        """Return the job for ``job_id``, or None if unknown."""
        return self.jobs.get(job_id)

    def update_job(self, job_id, status=None, progress=None, result=None, error=None):
        """Apply any non-None fields to the job; silently ignore unknown ids.

        Stamps ``completed_at`` when the job reaches a terminal state.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return
        # Explicit None checks: 0.0 progress and empty strings are valid
        # values that a bare truthiness test would wrongly skip.
        if status is not None:
            job.status = status
        if progress is not None:
            job.progress = progress
        if result is not None:
            job.result = result
        if error is not None:
            job.error = error
        if status in (JobStatus.COMPLETED, JobStatus.FAILED):
            # utcnow() is deprecated; record an aware UTC timestamp.
            job.completed_at = datetime.now(timezone.utc).isoformat()

    def get_all_jobs(self):
        """All jobs, newest first (ISO-8601 strings sort lexicographically)."""
        return sorted(self.jobs.values(), key=lambda j: j.created_at, reverse=True)

    async def get_next_job_id(self):
        """Block until a job id is available, then return it."""
        return await self._queue.get()


# Module-level singleton shared by the API layer and the workers.
job_queue = JobQueue()

Step 2: Document Processing Pipeline

# app/pipeline/processor.py
import asyncio
import json
import logging
from pathlib import Path
from app.config import get_settings
from app.extraction.pdf_extractor import PDFExtractor
from app.extraction.table_extractor import TableExtractor
from app.vision.vision_analyzer import VisionAnalyzer
from app.vision.router import needs_vision_analysis
from app.structuring.extractor import StructuredExtractor
from app.pipeline.queue import JobQueue, JobStatus

logger = logging.getLogger(__name__)
settings = get_settings()


class DocumentProcessor:
    """Runs queued jobs through extraction -> structuring -> persistence."""

    def __init__(self, queue: JobQueue):
        self.queue = queue
        self.pdf_extractor = PDFExtractor()
        self.table_extractor = TableExtractor()
        self.vision = VisionAnalyzer()
        self.structured = StructuredExtractor()

    async def process_document(self, job_id: str) -> dict:
        """Process one job end to end, updating progress at each stage.

        On failure the job is re-queued until ``max_retries`` is
        exhausted, then marked FAILED. Always re-raises so the caller
        can observe the error.

        NOTE(review): the extractor calls below appear synchronous and
        would block the event loop while running — confirm, and consider
        ``asyncio.to_thread`` if concurrent request latency matters.
        """
        job = self.queue.get_job(job_id)
        if not job:
            raise ValueError(f"Job not found: {job_id}")

        self.queue.update_job(job_id, status=JobStatus.PROCESSING, progress=0.1)
        try:
            use_vision = needs_vision_analysis(job.file_path)
            self.queue.update_job(job_id, progress=0.2)

            if use_vision:
                vision_result = self.vision.analyze_image(job.file_path)
                raw_text = vision_result["text"]
                result = {"method": "vision", "vision_output": vision_result}
            else:
                pdf = self.pdf_extractor.extract(job.file_path)
                raw_text = "\n\n".join(p.text for p in pdf.pages)
                self.queue.update_job(job_id, progress=0.4)
                tables = self.table_extractor.extract_tables(job.file_path)
                result = {
                    "method": "text",
                    "pages": [{"page": p.page_number, "text": p.text} for p in pdf.pages],
                    "tables": [{"headers": t.headers, "rows": t.rows} for t in tables],
                }

            self.queue.update_job(job_id, progress=0.6)
            doc_type = self.structured.detect_document_type(raw_text)
            structured = self.structured.extract(raw_text, doc_type)
            result["structured_data"] = structured["data"]
            result["document_type"] = doc_type

            self.queue.update_job(job_id, progress=0.8)
            # Persist the full result so it survives restarts of the
            # in-memory queue.
            out = Path(settings.results_dir) / f"{job_id}.json"
            with open(out, "w") as f:
                json.dump(result, f, indent=2, default=str)

            self.queue.update_job(job_id, status=JobStatus.COMPLETED, progress=1.0, result=result)
            return result
        except Exception as e:
            # logger.exception captures the traceback; lazy %-args avoid
            # building the message when the level is disabled.
            logger.exception("Job %s failed", job_id)
            job.retries += 1
            if job.retries < job.max_retries:
                self.queue.update_job(
                    job_id,
                    status=JobStatus.PENDING,
                    progress=0.0,
                    error=f"Retry {job.retries}: {e}",
                )
                # HACK: reaches into JobQueue's private asyncio.Queue to
                # requeue; a public requeue() method would be cleaner.
                await self.queue._queue.put(job_id)
            else:
                self.queue.update_job(job_id, status=JobStatus.FAILED, error=str(e))
            raise

    async def worker(self):
        """Run forever: pull the next job id and process it.

        Exceptions are logged and swallowed so one bad document cannot
        kill the worker task.
        """
        while True:
            job_id = await self.queue.get_next_job_id()
            try:
                await self.process_document(job_id)
            except Exception:
                logger.exception("Worker error on job %s", job_id)
            # Brief yield between jobs.
            await asyncio.sleep(0.1)

Step 3: Batch Upload API

# Add to app/main.py
import asyncio
from app.pipeline.queue import job_queue
from app.pipeline.processor import DocumentProcessor

processor = DocumentProcessor(job_queue)

# Keep strong references to the worker tasks: the event loop holds only
# weak references to Tasks, so an unreferenced task can be garbage-
# collected and silently vanish mid-run.
_worker_tasks: list = []

NUM_WORKERS = 3  # concurrent document workers


@app.on_event("startup")  # NOTE(review): FastAPI deprecates on_event in favor of lifespan handlers
async def start_workers():
    """Spawn the background worker tasks when the app starts."""
    for _ in range(NUM_WORKERS):
        _worker_tasks.append(asyncio.create_task(processor.worker()))

@app.post("/api/batch")
async def batch_upload(files: list[UploadFile] = File(...)):
    """Accept multiple uploads, persist each, and enqueue one job per file.

    Returns immediately with job ids; processing happens in the
    background workers. Poll /api/jobs/{job_id} for progress.
    """
    jobs = []
    for file in files:
        content = await file.read()
        # Client-supplied filenames are untrusted: strip any directory
        # components so a name like "../../etc/cron.d/x" cannot escape
        # the upload directory.
        safe_name = Path(file.filename or "upload").name
        path = Path(settings.upload_dir) / safe_name
        with open(path, "wb") as f:
            f.write(content)
        job = await job_queue.add_job(safe_name, str(path))
        jobs.append({"job_id": job.id, "filename": job.filename, "status": job.status.value})
    return {"jobs": jobs, "total": len(jobs)}

@app.get("/api/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the current status snapshot for a single job, or 404."""
    job = job_queue.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job.id,
        "filename": job.filename,
        "status": job.status.value,
        "progress": job.progress,
        "error": job.error,
    }

@app.get("/api/jobs")
async def list_jobs():
    """List every known job, newest first, as compact status summaries."""
    summaries = [
        {
            "job_id": job.id,
            "filename": job.filename,
            "status": job.status.value,
            "progress": job.progress,
        }
        for job in job_queue.get_all_jobs()
    ]
    return {"jobs": summaries}
💡 Production upgrade: Replace the in-memory queue with Celery + Redis so jobs persist across restarts. The same process_document method works as a Celery task with minimal changes.

Key Takeaways

  • An async job queue decouples upload from processing for immediate API responses.
  • Progress tracking at each pipeline stage provides real-time status updates.
  • Automatic retry with configurable limits handles transient failures gracefully.
  • Multiple concurrent workers process documents in parallel for faster throughput.