Advanced Human Review Pipelines

Automated moderation handles the volume, but human reviewers handle the nuance. This lesson covers how to build a production review queue system, assign content to the right reviewers, ensure quality through inter-rater agreement, protect reviewer mental health, and manage SLAs so no content waits too long for a decision.

Review Queue Architecture

# Production review queue system
import heapq
import time
import uuid
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime, timedelta

class ReviewStatus(Enum):
    """Lifecycle states of a review task in the queue."""
    PENDING = "pending"         # Waiting in the queue, not yet assigned
    ASSIGNED = "assigned"       # Given to a reviewer (set by ReviewQueue.assign_next)
    IN_REVIEW = "in_review"     # NOTE(review): never set in this file — confirm who transitions to it
    COMPLETED = "completed"     # Reviewer submitted a decision
    EXPIRED = "expired"         # SLA breached, auto-escalated

class ReviewDecision(Enum):
    """Possible outcomes a reviewer can record for a task."""
    REMOVE = "remove"                         # Take the content down
    APPROVE = "approve"                       # Content is allowed
    RESTRICT = "restrict"                     # Limit distribution/visibility
    ESCALATE_HIGHER = "escalate_higher"       # Send to a more senior reviewer
    NEEDS_MORE_CONTEXT = "needs_more_context" # Cannot decide from this item alone

@dataclass
class ReviewTask:
    """A single piece of content awaiting human review.

    Instances are ordered for use in a min-heap: lower ``priority``
    number sorts first, ties broken by the earlier ``sla_deadline``
    (see ``__lt__``).
    """
    task_id: str
    content_id: str
    content_type: str
    category: str              # Primary violation category
    ml_confidence: float       # ML model's confidence
    severity: float            # Severity estimate; compared against 0.8 elsewhere, presumably 0.0-1.0 — confirm
    priority: int              # 1=highest, 5=lowest
    sla_deadline: datetime     # Absolute deadline for a decision (naive UTC, via datetime.utcnow())
    status: ReviewStatus = ReviewStatus.PENDING
    assigned_to: Optional[str] = None        # Reviewer id once assigned
    assigned_at: Optional[datetime] = None   # When assignment happened
    completed_at: Optional[datetime] = None  # When the decision was submitted
    decision: Optional[ReviewDecision] = None
    reviewer_notes: str = ""
    review_time_sec: float = 0  # Wall-clock seconds from assignment to decision
    is_qa_sample: bool = False  # Part of quality audit

    def __lt__(self, other):
        """For priority queue: lower priority number = higher urgency.

        Ties on priority are broken by the earlier SLA deadline.
        """
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.sla_deadline < other.sla_deadline

@dataclass
class Reviewer:
    """A human moderator plus their current workload and wellness state."""
    reviewer_id: str
    name: str
    skill_level: int           # 1=junior, 2=senior, 3=lead
    specializations: List[str] # Categories they're trained for
    max_queue_size: int = 20   # Max tasks assigned at once
    current_queue: List[str] = field(default_factory=list)  # Task ids currently assigned
    daily_reviewed: int = 0    # Decisions submitted so far today
    daily_limit: int = 200     # Hard cap on decisions per day
    wellness_break_due: bool = False  # When True, blocks further assignment
    shift_end: Optional[datetime] = None  # NOTE(review): not consulted anywhere in this file — confirm intended use
    accuracy_score: float = 0.95  # Golden-set accuracy; not updated in this file

class ReviewQueue:
    """Priority-based review queue with assignment logic.

    Tasks live in two structures: ``tasks`` (by id, authoritative) and
    ``_queue`` (a min-heap ordered by ReviewTask.__lt__: priority, then
    SLA deadline). Heap entries can go stale once a task's status
    changes; stale entries are dropped lazily during assignment.
    """

    def __init__(self):
        self._queue: List[ReviewTask] = []        # min-heap of pending work
        self.tasks: Dict[str, ReviewTask] = {}    # task_id -> task
        self.reviewers: Dict[str, Reviewer] = {}  # reviewer_id -> reviewer

    def enqueue(self, task: ReviewTask):
        """Register a review task and push it onto the priority heap."""
        self.tasks[task.task_id] = task
        heapq.heappush(self._queue, task)

    def assign_next(self, reviewer_id: str) -> Optional[ReviewTask]:
        """Assign the highest-priority eligible task to a reviewer.

        Returns None when the reviewer is unknown, at capacity, past the
        daily limit, due a wellness break, or when no PENDING task
        matches their specializations before its SLA deadline.
        """
        reviewer = self.reviewers.get(reviewer_id)
        if not reviewer:
            return None

        # Capacity / wellness gates.
        if len(reviewer.current_queue) >= reviewer.max_queue_size:
            return None
        if reviewer.daily_reviewed >= reviewer.daily_limit:
            return None
        if reviewer.wellness_break_due:
            return None

        # Pop until a matching PENDING task is found. Entries whose
        # status is no longer PENDING are stale (completed/expired/
        # already assigned); BUG FIX: the original re-pushed them
        # forever, so the heap grew without bound. They are dropped
        # here — self.tasks still tracks them.
        skipped: List[ReviewTask] = []
        assigned_task = None
        now = datetime.utcnow()

        while self._queue:
            task = heapq.heappop(self._queue)
            if task.status != ReviewStatus.PENDING:
                continue  # stale heap entry: drop it
            if (task.category in reviewer.specializations and
                    task.sla_deadline > now):
                assigned_task = task
                break
            skipped.append(task)

        # Put back the PENDING tasks this reviewer couldn't take.
        for t in skipped:
            heapq.heappush(self._queue, t)

        if assigned_task:
            assigned_task.status = ReviewStatus.ASSIGNED
            assigned_task.assigned_to = reviewer_id
            assigned_task.assigned_at = datetime.utcnow()
            reviewer.current_queue.append(assigned_task.task_id)
            return assigned_task

        return None

    def submit_decision(self, task_id: str, reviewer_id: str,
                        decision: ReviewDecision,
                        notes: str = "") -> Dict:
        """Record a reviewer's decision and update their stats.

        Returns an error dict when the task is unknown, the reviewer is
        unregistered, or the task is not assigned to that reviewer.
        """
        task = self.tasks.get(task_id)
        reviewer = self.reviewers.get(reviewer_id)

        # BUG FIX: the original did not validate `reviewer`, so an
        # unregistered reviewer_id that matched task.assigned_to crashed
        # below with AttributeError on reviewer.current_queue.
        if not task or not reviewer or task.assigned_to != reviewer_id:
            return {"error": "Invalid task or reviewer"}

        task.status = ReviewStatus.COMPLETED
        task.decision = decision
        task.reviewer_notes = notes
        task.completed_at = datetime.utcnow()
        task.review_time_sec = (
            task.completed_at - task.assigned_at
        ).total_seconds()

        # Update reviewer stats and free their queue slot.
        reviewer.current_queue.remove(task_id)
        reviewer.daily_reviewed += 1

        # Mandatory wellness break every 50 completed reviews.
        if reviewer.daily_reviewed % 50 == 0:
            reviewer.wellness_break_due = True

        return {
            "task_id": task_id,
            "decision": decision.value,
            "review_time_sec": task.review_time_sec,
            "met_sla": task.completed_at <= task.sla_deadline
        }

    def check_sla_breaches(self) -> List[ReviewTask]:
        """Expire open tasks past their SLA deadline and return them.

        BUG FIX: expired ASSIGNED tasks are now removed from their
        reviewer's working queue; the original left them there, which
        permanently consumed assignment capacity.
        """
        breached = []
        now = datetime.utcnow()
        for task in self.tasks.values():
            if (task.status in (ReviewStatus.PENDING, ReviewStatus.ASSIGNED)
                and task.sla_deadline <= now):
                task.status = ReviewStatus.EXPIRED
                if task.assigned_to:
                    reviewer = self.reviewers.get(task.assigned_to)
                    if reviewer and task.task_id in reviewer.current_queue:
                        reviewer.current_queue.remove(task.task_id)
                breached.append(task)
        return breached

Reviewer Assignment Algorithms

# Smart reviewer assignment strategies
class ReviewerAssigner:
    """Assign content to the best-fit reviewer.

    All strategies share one eligibility filter (specialization, queue
    capacity, wellness state) and break ties by choosing the reviewer
    with the fewest queued items. Only ``load_balanced`` additionally
    enforces the daily review limit, matching the original behavior.
    """

    def __init__(self, reviewers: Dict[str, Reviewer]):
        self.reviewers = reviewers

    def _eligible(self, category: str,
                  enforce_daily_limit: bool = False) -> List[Reviewer]:
        """Reviewers trained for `category` with spare capacity.

        The original duplicated this filter in all three strategies,
        with the daily-limit check silently present only in one.
        """
        return [
            r for r in self.reviewers.values()
            if category in r.specializations
            and len(r.current_queue) < r.max_queue_size
            and not r.wellness_break_due
            and (not enforce_daily_limit or r.daily_reviewed < r.daily_limit)
        ]

    @staticmethod
    def _least_loaded(candidates: List[Reviewer]) -> str:
        """Id of the candidate with the fewest items in their queue."""
        return min(candidates, key=lambda r: len(r.current_queue)).reviewer_id

    def round_robin(self, category: str) -> Optional[str]:
        """Least-loaded assignment among eligible reviewers.

        NOTE: despite the name (kept for caller compatibility) this is
        not a strict rotation — it picks the reviewer with the shortest
        current queue, which balances load better.
        """
        eligible = self._eligible(category)
        if not eligible:
            return None
        return self._least_loaded(eligible)

    def skill_based(self, category: str,
                    severity: float) -> Optional[str]:
        """Assign based on severity: harder cases to senior reviewers."""
        eligible = self._eligible(category)
        if not eligible:
            return None

        # High severity -> prefer a senior (skill_level >= 2) if any is free.
        if severity >= 0.8:
            seniors = [r for r in eligible if r.skill_level >= 2]
            if seniors:
                return self._least_loaded(seniors)

        # Lower severity, or no senior available -> any eligible reviewer.
        return self._least_loaded(eligible)

    def load_balanced(self, category: str) -> Optional[str]:
        """Balance by estimated completion time, not just queue size."""
        eligible = self._eligible(category, enforce_daily_limit=True)
        if not eligible:
            return None

        # Estimate time to clear each reviewer's queue. With a single
        # fleet-wide average this reduces to least-loaded; swap in
        # per-reviewer historical speed when that data exists.
        def estimated_wait(reviewer: Reviewer) -> float:
            avg_time_per_review = 45  # seconds, from historical data
            return len(reviewer.current_queue) * avg_time_per_review

        return min(eligible, key=estimated_wait).reviewer_id

Quality Assurance for Reviewers

# QA system for moderation reviewer accuracy
import random
from collections import defaultdict

class ReviewerQA:
    """Quality assurance system for content moderation reviewers.

    Two mechanisms: random audit sampling of live reviews, and "golden"
    items with known-correct decisions seeded into reviewer queues to
    measure individual accuracy.
    """

    def __init__(self, qa_sample_rate: float = 0.05,
                 agreement_threshold: float = 0.80):
        self.qa_sample_rate = qa_sample_rate  # 5% of reviews audited
        self.agreement_threshold = agreement_threshold
        # reviewer_id -> list of per-golden-item correctness booleans
        self.reviewer_scores: Dict[str, List[bool]] = defaultdict(list)
        # content_id -> known-correct decision
        self.golden_set: Dict[str, ReviewDecision] = {}

    def should_qa_sample(self) -> bool:
        """Randomly select reviews for QA auditing."""
        return random.random() < self.qa_sample_rate

    def add_golden_item(self, content_id: str,
                        correct_decision: ReviewDecision):
        """Add a known-correct decision to the golden set.

        Golden items are seeded into queues to test reviewers.
        """
        self.golden_set[content_id] = correct_decision

    def check_golden_decision(self, content_id: str,
                               reviewer_id: str,
                               reviewer_decision: ReviewDecision) -> Dict:
        """Check if reviewer's decision matches the golden answer.

        Non-golden content returns {"is_golden": False} and records
        nothing.
        """
        if content_id not in self.golden_set:
            return {"is_golden": False}

        correct = self.golden_set[content_id]
        is_correct = reviewer_decision == correct
        self.reviewer_scores[reviewer_id].append(is_correct)

        return {
            "is_golden": True,
            "correct_answer": correct.value,
            "reviewer_answer": reviewer_decision.value,
            "is_correct": is_correct,
            "reviewer_accuracy": self.get_reviewer_accuracy(reviewer_id)
        }

    def get_reviewer_accuracy(self, reviewer_id: str) -> float:
        """Calculate reviewer's accuracy on golden items seen so far."""
        scores = self.reviewer_scores.get(reviewer_id, [])
        if not scores:
            return 1.0  # No data yet, assume good
        return sum(scores) / len(scores)

    def inter_rater_agreement(self, decisions_by_reviewer: Dict[str, Dict]
                              ) -> Dict:
        """Calculate raw inter-rater agreement across reviewer pairs.

        decisions_by_reviewer: {reviewer_id: {content_id: decision}}.
        NOTE: this is simple percent agreement over items both reviewers
        saw; unlike Cohen's kappa (which the original docstring claimed)
        it does NOT correct for chance agreement.
        """
        reviewers = list(decisions_by_reviewer.keys())
        if len(reviewers) < 2:
            return {"error": "Need at least 2 reviewers"}

        # Compare every unordered pair of reviewers on the content
        # both of them reviewed.
        agreements = 0
        total = 0
        for i, r1_id in enumerate(reviewers):
            for r2_id in reviewers[i + 1:]:
                r1_decisions = decisions_by_reviewer[r1_id]
                r2_decisions = decisions_by_reviewer[r2_id]
                shared = set(r1_decisions) & set(r2_decisions)
                for content_id in shared:
                    total += 1
                    if r1_decisions[content_id] == r2_decisions[content_id]:
                        agreements += 1

        agreement_rate = agreements / total if total > 0 else 0
        return {
            "agreement_rate": agreement_rate,
            "total_comparisons": total,
            "meets_threshold": agreement_rate >= self.agreement_threshold
        }

    def identify_struggling_reviewers(self) -> List[Dict]:
        """Find reviewers who may need retraining.

        Requires at least 10 golden-item data points per reviewer;
        accuracy below 0.6 recommends retraining, below the agreement
        threshold recommends coaching.
        """
        struggling = []
        for reviewer_id, scores in self.reviewer_scores.items():
            if len(scores) < 10:
                continue  # Not enough data
            accuracy = sum(scores) / len(scores)
            if accuracy < self.agreement_threshold:
                struggling.append({
                    "reviewer_id": reviewer_id,
                    "accuracy": accuracy,
                    "total_qa_items": len(scores),
                    "recommendation": (
                        "retrain" if accuracy < 0.6
                        else "additional_coaching"
                    )
                })
        return struggling

Reviewer Wellness Considerations

Content moderators are exposed to disturbing material daily. Failing to protect their mental health leads to high turnover, poor decision quality, and legal liability.

# Reviewer wellness management system
class ReviewerWellness:
    """Protect reviewer mental health through exposure management.

    Per-reviewer exposure dicts use keys starting with "_" (e.g.
    "_consecutive_disturbing") as internal bookkeeping, not real
    category counters — all reporting and break math must skip them.
    """

    def __init__(self):
        # Max items of each category per shift; 0 means never human-reviewed.
        self.exposure_limits = {
            "csam": 0,           # Should NEVER be manually reviewed
            "violence_gore": 15,  # Max 15 items per shift
            "self_harm": 10,
            "sexual_content": 30,
            "hate_speech": 50,
            "spam": 200          # Low impact, higher limit
        }
        self.break_rules = {
            "items_between_breaks": 50,
            "break_duration_min": 10,
            "max_shift_hours": 8,
            "max_disturbing_consecutive": 3  # Max 3 graphic items in a row
        }
        # reviewer_id -> {category_or_bookkeeping_key: count}
        self.reviewer_exposure: Dict[str, Dict[str, int]] = {}

    def can_assign(self, reviewer_id: str, category: str) -> bool:
        """Check if reviewer can handle another item of this category.

        Categories with a limit of 0 (csam) can never be assigned;
        unknown categories default to a limit of 100.
        """
        exposure = self.reviewer_exposure.get(reviewer_id, {})
        current = exposure.get(category, 0)
        limit = self.exposure_limits.get(category, 100)
        return current < limit

    def record_exposure(self, reviewer_id: str, category: str):
        """Track one exposure; return whether a break is now required."""
        exposure = self.reviewer_exposure.setdefault(reviewer_id, {})
        exposure[category] = exposure.get(category, 0) + 1

        # Periodic break check. BUG FIX: only real category keys count.
        # The original summed exposure.values() including the
        # "_consecutive_disturbing" bookkeeping value, which inflated
        # the total and could skip (or prematurely trigger) the
        # mandatory periodic break.
        total = sum(v for k, v in exposure.items() if not k.startswith("_"))
        if total % self.break_rules["items_between_breaks"] == 0:
            return {"break_required": True,
                    "duration_min": self.break_rules["break_duration_min"],
                    "reason": "Periodic wellness break"}

        # Consecutive disturbing-content check.
        disturbing_categories = {"violence_gore", "self_harm", "csam"}
        if category in disturbing_categories:
            consecutive = exposure.get("_consecutive_disturbing", 0) + 1
            exposure["_consecutive_disturbing"] = consecutive
            if consecutive >= self.break_rules["max_disturbing_consecutive"]:
                exposure["_consecutive_disturbing"] = 0  # reset after forcing a break
                return {"break_required": True,
                        "duration_min": 15,
                        "reason": "Consecutive disturbing content limit reached"}
        else:
            exposure["_consecutive_disturbing"] = 0  # streak broken by benign item

        return {"break_required": False}

    def get_wellness_report(self, reviewer_id: str) -> Dict:
        """Generate wellness report for a reviewer.

        Warns on any category at >= 80% of its shift limit; recommends
        counseling when two or more categories are near their limits.
        """
        exposure = self.reviewer_exposure.get(reviewer_id, {})
        warnings = []
        for category, count in exposure.items():
            if category.startswith("_"):
                continue  # skip bookkeeping keys
            limit = self.exposure_limits.get(category, 100)
            if count >= limit * 0.8:
                warnings.append(
                    f"{category}: {count}/{limit} (approaching limit)"
                )
        return {
            "reviewer_id": reviewer_id,
            "total_items_reviewed": sum(
                v for k, v in exposure.items() if not k.startswith("_")),
            "exposure_by_category": {
                k: v for k, v in exposure.items() if not k.startswith("_")
            },
            "warnings": warnings,
            "counseling_recommended": len(warnings) >= 2
        }
Legal Requirement: Many jurisdictions now require platforms to provide mental health support for content moderators. This includes access to counseling, mandatory breaks, exposure limits, and the right to refuse particularly disturbing content. Build these protections into your system from day one, not as an afterthought.

SLA Management

# SLA monitoring and alerting
class SLAManager:
    """Monitor and enforce review SLA compliance."""

    def __init__(self):
        # Review turnaround targets keyed by task priority (1 = highest).
        self.sla_targets = {
            1: timedelta(hours=1),    # Priority 1: 1 hour
            2: timedelta(hours=4),    # Priority 2: 4 hours
            3: timedelta(hours=12),   # Priority 3: 12 hours
            4: timedelta(hours=24),   # Priority 4: 24 hours
            5: timedelta(hours=72),   # Priority 5: 72 hours
        }

    def check_compliance(self, tasks: List[ReviewTask]) -> Dict:
        """Calculate SLA compliance metrics for a batch of tasks.

        Returns counts of tasks that met / breached their SLA, open
        tasks at risk (< 20% of the SLA window remaining), the overall
        compliance rate, and an alert flag.
        """
        total = len(tasks)
        met_sla = 0
        breached = 0
        at_risk = 0  # Open, with under 20% of the SLA window left

        now = datetime.utcnow()  # hoisted: one clock read for the whole batch
        for task in tasks:
            if task.status == ReviewStatus.COMPLETED:
                if task.completed_at <= task.sla_deadline:
                    met_sla += 1
                else:
                    breached += 1
            elif task.status in (ReviewStatus.PENDING,
                                  ReviewStatus.ASSIGNED):
                remaining = task.sla_deadline - now
                # BUG FIX: an open task already past its deadline has
                # breached the SLA. The original only ever counted it
                # as "at risk", so the breach alert under-fired.
                if remaining <= timedelta(0):
                    breached += 1
                elif remaining < self.sla_targets[task.priority] * 0.2:
                    at_risk += 1

        return {
            "total_tasks": total,
            "met_sla": met_sla,
            "breached": breached,
            "at_risk": at_risk,
            "compliance_rate": met_sla / total if total > 0 else 1.0,
            "alert": breached > 0 or at_risk > total * 0.1
        }