Advanced
Batch Processing Pipeline
In this step, you will build an asynchronous processing pipeline that handles hundreds of documents with queuing, progress tracking, retry logic, and error handling.
Why Batch Processing?
Processing one document takes 5-30 seconds. For 100 documents sequentially, that is 8-50 minutes. A batch pipeline with async processing and parallelism reduces this dramatically.
Step 1: Job Queue and Status Tracking
# app/pipeline/queue.py
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class JobStatus(str, Enum):
    """Lifecycle states for a ProcessingJob.

    Inherits from str so members compare equal to their plain string
    values and serialize naturally (e.g. in JSON responses).
    """
    PENDING = "pending"        # queued, not yet picked up by a worker
    PROCESSING = "processing"  # a worker is actively running the pipeline
    COMPLETED = "completed"    # finished successfully; result is populated
    FAILED = "failed"          # retries exhausted; error is populated
@dataclass
class ProcessingJob:
    """State record for one document moving through the batch pipeline.

    Timestamps are ISO-8601 strings. Fix: uses timezone-aware
    ``datetime.now(timezone.utc)`` instead of the deprecated, naive
    ``datetime.utcnow()`` (deprecated since Python 3.12).
    """
    id: str = field(default_factory=lambda: str(uuid.uuid4()))  # external handle used by the API
    filename: str = ""
    file_path: str = ""
    status: JobStatus = JobStatus.PENDING
    progress: float = 0.0            # 0.0 .. 1.0, updated at each pipeline stage
    result: Optional[dict] = None    # pipeline output once completed
    error: Optional[str] = None      # last failure message, if any
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    completed_at: Optional[str] = None  # stamped when the job reaches a terminal state
    retries: int = 0                 # failed attempts so far
    max_retries: int = 3             # give up after this many failures
class JobQueue:
    """In-memory job queue with status tracking.

    Jobs live in a dict keyed by job id; worker ordering comes from an
    asyncio.Queue of ids. State is process-local — a restart loses
    everything (see the Celery/Redis note in the surrounding text).
    """

    def __init__(self) -> None:
        self.jobs: dict[str, ProcessingJob] = {}
        self._queue: asyncio.Queue = asyncio.Queue()

    async def add_job(self, filename: str, file_path: str) -> ProcessingJob:
        """Register a new job and enqueue its id for the workers."""
        job = ProcessingJob(filename=filename, file_path=file_path)
        self.jobs[job.id] = job
        await self._queue.put(job.id)
        return job

    async def requeue(self, job_id: str) -> None:
        """Put an existing job id back on the queue (e.g. for retries).

        Public wrapper so callers don't have to reach into the private
        ``_queue`` attribute.
        """
        await self._queue.put(job_id)

    def get_job(self, job_id: str) -> Optional[ProcessingJob]:
        return self.jobs.get(job_id)

    def update_job(
        self,
        job_id: str,
        status: Optional[JobStatus] = None,
        progress: Optional[float] = None,
        result: Optional[dict] = None,
        error: Optional[str] = None,
    ) -> None:
        """Apply a partial update to a job; unknown ids are ignored.

        ``is not None`` is used for every field so falsy-but-valid values
        (progress=0.0, empty dict/string) are never silently dropped.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return
        if status is not None:
            job.status = status
        if progress is not None:
            job.progress = progress
        if result is not None:
            job.result = result
        if error is not None:
            job.error = error
        # Stamp a timezone-aware completion time at terminal states
        # (datetime.utcnow() is deprecated and returns a naive datetime).
        if status in (JobStatus.COMPLETED, JobStatus.FAILED):
            job.completed_at = datetime.now(timezone.utc).isoformat()

    def get_all_jobs(self) -> list[ProcessingJob]:
        """All known jobs, newest first (sorted by created_at)."""
        return sorted(self.jobs.values(), key=lambda j: j.created_at, reverse=True)

    async def get_next_job_id(self) -> str:
        """Block until a job id is available, then return it."""
        return await self._queue.get()
job_queue = JobQueue()
Step 2: Document Processing Pipeline
# app/pipeline/processor.py
import asyncio
import json
import logging
from pathlib import Path
from app.config import get_settings
from app.extraction.pdf_extractor import PDFExtractor
from app.extraction.table_extractor import TableExtractor
from app.vision.vision_analyzer import VisionAnalyzer
from app.vision.router import needs_vision_analysis
from app.structuring.extractor import StructuredExtractor
from app.pipeline.queue import JobQueue, JobStatus
logger = logging.getLogger(__name__)
settings = get_settings()
class DocumentProcessor:
    """Runs the document-extraction pipeline for queued jobs.

    Holds one instance of each extractor; ``worker()`` is the long-running
    consumer coroutine and ``process_document()`` does the per-job work.
    """

    def __init__(self, queue: JobQueue) -> None:
        self.queue = queue
        self.pdf_extractor = PDFExtractor()
        self.table_extractor = TableExtractor()
        self.vision = VisionAnalyzer()
        self.structured = StructuredExtractor()

    async def process_document(self, job_id: str) -> dict:
        """Process one job end to end, updating progress at each stage.

        On failure the job is re-queued until ``max_retries`` is exhausted;
        the exception is re-raised either way so the worker can log it.
        Raises ValueError if the job id is unknown.
        """
        job = self.queue.get_job(job_id)
        if job is None:
            raise ValueError(f"Job not found: {job_id}")
        self.queue.update_job(job_id, status=JobStatus.PROCESSING, progress=0.1)
        try:
            # NOTE(review): the extractor/vision calls below appear to be
            # synchronous and will block the event loop while they run;
            # consider asyncio.to_thread() if they are heavy — confirm first.
            use_vision = needs_vision_analysis(job.file_path)
            self.queue.update_job(job_id, progress=0.2)
            if use_vision:
                vision_result = self.vision.analyze_image(job.file_path)
                raw_text = vision_result["text"]
                result = {"method": "vision", "vision_output": vision_result}
            else:
                pdf = self.pdf_extractor.extract(job.file_path)
                raw_text = "\n\n".join(p.text for p in pdf.pages)
                self.queue.update_job(job_id, progress=0.4)
                tables = self.table_extractor.extract_tables(job.file_path)
                result = {
                    "method": "text",
                    "pages": [{"page": p.page_number, "text": p.text} for p in pdf.pages],
                    "tables": [{"headers": t.headers, "rows": t.rows} for t in tables],
                }
            self.queue.update_job(job_id, progress=0.6)
            doc_type = self.structured.detect_document_type(raw_text)
            structured = self.structured.extract(raw_text, doc_type)
            result["structured_data"] = structured["data"]
            result["document_type"] = doc_type
            self.queue.update_job(job_id, progress=0.8)
            out = Path(settings.results_dir) / f"{job_id}.json"
            # Results directory may not exist yet (e.g. first run).
            out.parent.mkdir(parents=True, exist_ok=True)
            with open(out, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, default=str)
            self.queue.update_job(
                job_id, status=JobStatus.COMPLETED, progress=1.0, result=result
            )
            return result
        except Exception as e:
            # logger.exception preserves the traceback, unlike
            # logger.error(f"...") which logged only the message.
            logger.exception("Job %s failed", job_id)
            job.retries += 1
            if job.retries < job.max_retries:
                self.queue.update_job(
                    job_id,
                    status=JobStatus.PENDING,
                    progress=0.0,
                    error=f"Retry {job.retries}: {e}",
                )
                # Reaches into the queue's private asyncio.Queue; a public
                # requeue API on JobQueue would be cleaner.
                await self.queue._queue.put(job_id)
            else:
                self.queue.update_job(job_id, status=JobStatus.FAILED, error=str(e))
            raise

    async def worker(self) -> None:
        """Consume job ids forever; failures are logged, never fatal."""
        while True:
            job_id = await self.queue.get_next_job_id()
            try:
                await self.process_document(job_id)
            except Exception:
                logger.exception("Worker error on job %s", job_id)
            await asyncio.sleep(0.1)  # brief yield between jobs
Step 3: Batch Upload API
# Add to app/main.py
import asyncio
from app.pipeline.queue import job_queue
from app.pipeline.processor import DocumentProcessor
processor = DocumentProcessor(job_queue)
# Keep strong references to the worker tasks: the event loop holds only
# weak references to tasks, so an unreferenced task can be garbage-collected
# mid-flight (documented caveat of asyncio.create_task).
worker_tasks: list = []


@app.on_event("startup")  # NOTE(review): on_event is deprecated in newer FastAPI; lifespan handlers are the replacement — confirm target version.
async def start_workers():
    """Spawn the concurrent pipeline workers at application startup."""
    for _ in range(3):  # 3 concurrent workers
        worker_tasks.append(asyncio.create_task(processor.worker()))
@app.post("/api/batch")
async def batch_upload(files: list[UploadFile] = File(...)):
    """Accept multiple files, persist them, and enqueue one job per file.

    Returns immediately with job ids; clients poll /api/jobs/{job_id}
    for progress.
    """
    jobs = []
    for file in files:
        content = await file.read()
        # Security fix: file.filename is client-controlled. Path(...).name
        # strips any directory components, preventing path traversal
        # (e.g. "../../etc/passwd") out of the upload directory.
        safe_name = Path(file.filename or "upload").name
        path = Path(settings.upload_dir) / safe_name
        with open(path, "wb") as f:
            f.write(content)
        job = await job_queue.add_job(safe_name, str(path))
        jobs.append(
            {"job_id": job.id, "filename": job.filename, "status": job.status.value}
        )
    return {"jobs": jobs, "total": len(jobs)}
@app.get("/api/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the current status/progress of a single job, or 404."""
    job = job_queue.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job.id,
        "filename": job.filename,
        "status": job.status.value,
        "progress": job.progress,
        "error": job.error,
    }
@app.get("/api/jobs")
async def list_jobs():
    """Summarize every known job (newest first, per JobQueue ordering)."""
    summaries = []
    for job in job_queue.get_all_jobs():
        summaries.append(
            {
                "job_id": job.id,
                "filename": job.filename,
                "status": job.status.value,
                "progress": job.progress,
            }
        )
    return {"jobs": summaries}
Production upgrade: Replace the in-memory queue with Celery + Redis for persistence across restarts. The same process_document method works as a Celery task with minimal changes.
Key Takeaways
- An async job queue decouples upload from processing for immediate API responses.
- Progress tracking at each pipeline stage provides real-time status updates.
- Automatic retry with configurable limits handles transient failures gracefully.
- Multiple concurrent workers process documents in parallel for faster throughput.
Lilly Tech Systems