Advanced Human Review Pipelines
Automated moderation handles the volume, but human reviewers handle the nuance. This lesson covers how to build a production review queue system, assign content to the right reviewers, ensure quality through inter-rater agreement, protect reviewer mental health, and manage SLAs so no content waits too long for a decision.
Review Queue Architecture
# Production review queue system
import heapq
import time
import uuid
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime, timedelta
class ReviewStatus(Enum):
    """Lifecycle states of a review task as it moves through the queue."""
    PENDING = "pending"        # enqueued, not yet assigned to anyone
    ASSIGNED = "assigned"      # handed to a specific reviewer
    IN_REVIEW = "in_review"    # reviewer actively working on it
    COMPLETED = "completed"    # decision submitted
    EXPIRED = "expired" # SLA breached, auto-escalated
class ReviewDecision(Enum):
    """Possible outcomes a reviewer can choose for a piece of content."""
    REMOVE = "remove"                          # take the content down
    APPROVE = "approve"                        # content is fine, keep it up
    RESTRICT = "restrict"                      # keep up with reduced reach/visibility
    ESCALATE_HIGHER = "escalate_higher"        # pass to a more senior reviewer
    NEEDS_MORE_CONTEXT = "needs_more_context"  # cannot decide from this item alone
@dataclass
class ReviewTask:
    """One unit of content awaiting (or having received) human review.

    Instances are heap-ordered via ``__lt__`` so they can live directly
    in a ``heapq``-based priority queue.
    """
    task_id: str
    content_id: str
    content_type: str
    category: str  # Primary violation category
    ml_confidence: float  # ML model's confidence
    severity: float
    priority: int  # 1=highest, 5=lowest
    sla_deadline: datetime
    status: ReviewStatus = ReviewStatus.PENDING
    assigned_to: Optional[str] = None
    assigned_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    decision: Optional[ReviewDecision] = None
    reviewer_notes: str = ""
    review_time_sec: float = 0
    is_qa_sample: bool = False  # Part of quality audit

    def __lt__(self, other):
        """Heap ordering: lower priority number first, earlier SLA deadline breaks ties."""
        return ((self.priority, self.sla_deadline) <
                (other.priority, other.sla_deadline))
@dataclass
class Reviewer:
    """A human moderator with skills, specializations, and capacity limits."""
    reviewer_id: str
    name: str
    skill_level: int # 1=junior, 2=senior, 3=lead
    specializations: List[str] # Categories they're trained for
    max_queue_size: int = 20  # max tasks assigned at any one time
    current_queue: List[str] = field(default_factory=list)  # task_ids currently assigned
    daily_reviewed: int = 0  # decisions submitted so far today
    daily_limit: int = 200  # hard cap on decisions per day
    wellness_break_due: bool = False  # when True, no new tasks are assigned
    shift_end: Optional[datetime] = None
    accuracy_score: float = 0.95  # presumably QA-derived accuracy; not updated in this module — TODO confirm source
class ReviewQueue:
    """Priority-based review queue with assignment logic.

    Tasks live both in a min-heap (ordered by ``ReviewTask.__lt__``:
    priority, then SLA deadline) and in a task_id index. Reviewers are
    tracked by id.
    """

    def __init__(self):
        self._queue: List[ReviewTask] = []  # min-heap of (mostly) pending tasks
        self.tasks: Dict[str, ReviewTask] = {}  # task_id -> task, all states
        self.reviewers: Dict[str, Reviewer] = {}

    def enqueue(self, task: ReviewTask):
        """Add a review task to the queue."""
        self.tasks[task.task_id] = task
        heapq.heappush(self._queue, task)

    def assign_next(self, reviewer_id: str) -> Optional[ReviewTask]:
        """Assign the highest-priority matching task to a reviewer.

        Returns the assigned task, or None when the reviewer is unknown,
        at capacity, over the daily limit, due a wellness break, or no
        pending task matches their specializations.
        """
        reviewer = self.reviewers.get(reviewer_id)
        if not reviewer:
            return None
        # Check reviewer capacity and wellness constraints.
        if len(reviewer.current_queue) >= reviewer.max_queue_size:
            return None
        if reviewer.daily_reviewed >= reviewer.daily_limit:
            return None
        if reviewer.wellness_break_due:
            return None
        # Scan the heap in priority order for a matching task.
        # BUG FIX: tasks that are no longer PENDING (completed/expired
        # elsewhere) used to be pushed back onto the heap forever,
        # growing it without bound and slowing every scan. They are now
        # discarded; self.tasks still holds them for bookkeeping.
        skipped = []
        assigned_task = None
        while self._queue:
            task = heapq.heappop(self._queue)
            if task.status != ReviewStatus.PENDING:
                continue  # stale heap entry: drop it
            if (task.category in reviewer.specializations and
                    task.sla_deadline > datetime.utcnow()):
                assigned_task = task
                break
            skipped.append(task)
        # Put still-pending but unmatched tasks back.
        for t in skipped:
            heapq.heappush(self._queue, t)
        if assigned_task:
            assigned_task.status = ReviewStatus.ASSIGNED
            assigned_task.assigned_to = reviewer_id
            assigned_task.assigned_at = datetime.utcnow()
            reviewer.current_queue.append(assigned_task.task_id)
            return assigned_task
        return None

    def submit_decision(self, task_id: str, reviewer_id: str,
                        decision: ReviewDecision,
                        notes: str = "") -> Dict:
        """Record a reviewer's decision for a task assigned to them.

        Returns a result dict on success, or ``{"error": ...}`` when the
        task/reviewer pairing is invalid or the task was already decided.
        """
        task = self.tasks.get(task_id)
        reviewer = self.reviewers.get(reviewer_id)
        # BUG FIX: also guard a missing reviewer record -- the original
        # only validated the task and crashed with AttributeError below.
        if not task or not reviewer or task.assigned_to != reviewer_id:
            return {"error": "Invalid task or reviewer"}
        # BUG FIX: reject double submission -- previously a second submit
        # re-counted stats and raised ValueError on current_queue.remove().
        if task.status == ReviewStatus.COMPLETED:
            return {"error": "Task already completed"}
        task.status = ReviewStatus.COMPLETED
        task.decision = decision
        task.reviewer_notes = notes
        task.completed_at = datetime.utcnow()
        task.review_time_sec = (
            task.completed_at - task.assigned_at
        ).total_seconds()
        # Update reviewer stats.
        if task_id in reviewer.current_queue:
            reviewer.current_queue.remove(task_id)
        reviewer.daily_reviewed += 1
        # Mandatory wellness break after every 50 completed reviews.
        if reviewer.daily_reviewed % 50 == 0:
            reviewer.wellness_break_due = True
        return {
            "task_id": task_id,
            "decision": decision.value,
            "review_time_sec": task.review_time_sec,
            "met_sla": task.completed_at <= task.sla_deadline
        }

    def check_sla_breaches(self) -> List[ReviewTask]:
        """Mark and return open tasks that have breached their SLA deadline."""
        breached = []
        now = datetime.utcnow()
        for task in self.tasks.values():
            if (task.status in (ReviewStatus.PENDING, ReviewStatus.ASSIGNED)
                    and task.sla_deadline <= now):
                # BUG FIX: free the assigned reviewer's queue slot -- the
                # original left the expired task in current_queue, eating
                # capacity until end of shift.
                if task.assigned_to:
                    assignee = self.reviewers.get(task.assigned_to)
                    if assignee and task.task_id in assignee.current_queue:
                        assignee.current_queue.remove(task.task_id)
                task.status = ReviewStatus.EXPIRED
                breached.append(task)
        return breached
Reviewer Assignment Algorithms
# Smart reviewer assignment strategies
class ReviewerAssigner:
    """Assign content to the best-fit reviewer.

    The three public strategies share one eligibility filter, extracted
    into ``_eligible`` (the original duplicated it three times, with only
    ``load_balanced`` enforcing the daily limit -- that difference is
    preserved via a flag).
    """

    def __init__(self, reviewers: Dict[str, Reviewer]):
        self.reviewers = reviewers

    def _eligible(self, category: str,
                  enforce_daily_limit: bool = False) -> List[Reviewer]:
        """Reviewers trained for `category`, under queue capacity, and not
        due a wellness break; optionally also under their daily cap."""
        return [
            r for r in self.reviewers.values()
            if category in r.specializations
            and len(r.current_queue) < r.max_queue_size
            and not r.wellness_break_due
            and (not enforce_daily_limit or r.daily_reviewed < r.daily_limit)
        ]

    @staticmethod
    def _least_loaded(candidates: List[Reviewer]) -> str:
        """Id of the candidate with the fewest items currently queued."""
        return min(candidates, key=lambda r: len(r.current_queue)).reviewer_id

    def round_robin(self, category: str) -> Optional[str]:
        """Least-loaded assignment among eligible reviewers.

        NOTE(review): despite the name this picks the shortest queue
        rather than cycling; kept as-is for interface compatibility.
        """
        eligible = self._eligible(category)
        if not eligible:
            return None
        return self._least_loaded(eligible)

    def skill_based(self, category: str,
                    severity: float) -> Optional[str]:
        """Assign based on severity: harder cases to senior reviewers."""
        eligible = self._eligible(category)
        if not eligible:
            return None
        # High severity -> prefer a senior reviewer (skill_level >= 2).
        if severity >= 0.8:
            seniors = [r for r in eligible if r.skill_level >= 2]
            if seniors:
                return self._least_loaded(seniors)
        # Lower severity (or no senior available) -> any eligible reviewer.
        return self._least_loaded(eligible)

    def load_balanced(self, category: str) -> Optional[str]:
        """Balance by estimated completion time, not just queue size."""
        eligible = self._eligible(category, enforce_daily_limit=True)
        if not eligible:
            return None

        # Estimate time to clear each reviewer's queue. The per-review
        # speed is a constant today; substitute per-reviewer historical
        # averages when that data is wired in.
        def estimated_wait(reviewer: Reviewer) -> float:
            avg_time_per_review = 45  # seconds, from historical data
            return len(reviewer.current_queue) * avg_time_per_review

        return min(eligible, key=estimated_wait).reviewer_id
Quality Assurance for Reviewers
# QA system for moderation reviewer accuracy
import random
from collections import defaultdict
class ReviewerQA:
    """Quality assurance system for content moderation reviewers.

    Seeds known-answer ("golden") items into review queues, tracks each
    reviewer's accuracy on them, measures pairwise agreement between
    reviewers, and flags reviewers who may need retraining.
    """

    def __init__(self, qa_sample_rate: float = 0.05,
                 agreement_threshold: float = 0.80):
        self.qa_sample_rate = qa_sample_rate  # 5% of reviews audited
        self.agreement_threshold = agreement_threshold
        # reviewer_id -> list of per-golden-item correctness flags
        self.reviewer_scores: Dict[str, List[bool]] = defaultdict(list)
        # content_id -> the known-correct decision
        self.golden_set: Dict[str, ReviewDecision] = {}

    def should_qa_sample(self) -> bool:
        """Randomly select reviews for QA auditing."""
        return random.random() < self.qa_sample_rate

    def add_golden_item(self, content_id: str,
                        correct_decision: ReviewDecision):
        """Register a known-correct decision in the golden set.
        Golden items are seeded into queues to test reviewers."""
        self.golden_set[content_id] = correct_decision

    def check_golden_decision(self, content_id: str,
                              reviewer_id: str,
                              reviewer_decision: ReviewDecision) -> Dict:
        """Score a reviewer's decision against the golden answer, if any."""
        if content_id not in self.golden_set:
            return {"is_golden": False}
        expected = self.golden_set[content_id]
        matched = reviewer_decision == expected
        self.reviewer_scores[reviewer_id].append(matched)
        return {
            "is_golden": True,
            "correct_answer": expected.value,
            "reviewer_answer": reviewer_decision.value,
            "is_correct": matched,
            "reviewer_accuracy": self.get_reviewer_accuracy(reviewer_id)
        }

    def get_reviewer_accuracy(self, reviewer_id: str) -> float:
        """Fraction of golden items this reviewer got right."""
        history = self.reviewer_scores.get(reviewer_id)
        if not history:
            return 1.0  # no data yet: assume good
        return sum(history) / len(history)

    def inter_rater_agreement(self, decisions_by_reviewer: Dict[str, Dict]
                              ) -> Dict:
        """Compute pairwise inter-rater agreement.

        ``decisions_by_reviewer`` maps reviewer_id -> {content_id: decision}.
        Returns raw percent agreement over all items shared by any pair of
        reviewers (not chance-corrected the way Cohen's kappa would be).
        """
        reviewer_ids = list(decisions_by_reviewer.keys())
        if len(reviewer_ids) < 2:
            return {"error": "Need at least 2 reviewers"}
        agreements = 0
        total = 0
        # Compare every unordered pair of reviewers on jointly-rated items.
        for i, first in enumerate(reviewer_ids):
            first_decisions = decisions_by_reviewer[first]
            for second in reviewer_ids[i + 1:]:
                second_decisions = decisions_by_reviewer[second]
                shared = first_decisions.keys() & second_decisions.keys()
                total += len(shared)
                agreements += sum(
                    1 for cid in shared
                    if first_decisions[cid] == second_decisions[cid]
                )
        agreement_rate = agreements / total if total > 0 else 0
        return {
            "agreement_rate": agreement_rate,
            "total_comparisons": total,
            "meets_threshold": agreement_rate >= self.agreement_threshold
        }

    def identify_struggling_reviewers(self) -> List[Dict]:
        """List reviewers whose golden-item accuracy suggests retraining."""
        flagged = []
        for reviewer_id, history in self.reviewer_scores.items():
            if len(history) < 10:
                continue  # not enough data to judge
            accuracy = sum(history) / len(history)
            if accuracy >= self.agreement_threshold:
                continue
            action = "retrain" if accuracy < 0.6 else "additional_coaching"
            flagged.append({
                "reviewer_id": reviewer_id,
                "accuracy": accuracy,
                "total_qa_items": len(history),
                "recommendation": action,
            })
        return flagged
Reviewer Wellness Considerations
Content moderators are exposed to disturbing material daily. Failing to protect their mental health leads to high turnover, poor decision quality, and legal liability.
# Reviewer wellness management system
class ReviewerWellness:
    """Protect reviewer mental health through exposure management."""

    def __init__(self):
        # Hard per-shift caps on reviewed items, by category.
        self.exposure_limits = {
            "csam": 0,  # Should NEVER be manually reviewed
            "violence_gore": 15,  # Max 15 items per shift
            "self_harm": 10,
            "sexual_content": 30,
            "hate_speech": 50,
            "spam": 200  # Low impact, higher limit
        }
        self.break_rules = {
            "items_between_breaks": 50,
            "break_duration_min": 10,
            "max_shift_hours": 8,
            "max_disturbing_consecutive": 3  # Max 3 graphic items in a row
        }
        # reviewer_id -> {category: count}. Keys starting with "_" are
        # internal bookkeeping (the consecutive-disturbing counter), not
        # real exposure counts.
        self.reviewer_exposure: Dict[str, Dict[str, int]] = {}

    @staticmethod
    def _item_count(exposure: Dict[str, int]) -> int:
        """Total reviewed items, excluding "_"-prefixed bookkeeping keys."""
        return sum(v for k, v in exposure.items() if not k.startswith("_"))

    def can_assign(self, reviewer_id: str, category: str) -> bool:
        """Check if reviewer can handle another item of this category."""
        exposure = self.reviewer_exposure.get(reviewer_id, {})
        current = exposure.get(category, 0)
        limit = self.exposure_limits.get(category, 100)  # default cap for unlisted categories
        return current < limit

    def record_exposure(self, reviewer_id: str, category: str):
        """Track one reviewed item; return a break directive dict.

        BUG FIX: the periodic-break total previously summed ALL dict
        values, including the "_consecutive_disturbing" bookkeeping
        counter, so breaks fired at the wrong item counts. The total now
        counts only real category keys (via _item_count).
        """
        if reviewer_id not in self.reviewer_exposure:
            self.reviewer_exposure[reviewer_id] = {}
        exposure = self.reviewer_exposure[reviewer_id]
        exposure[category] = exposure.get(category, 0) + 1
        # Periodic wellness break every N real items.
        total = self._item_count(exposure)
        if total % self.break_rules["items_between_breaks"] == 0:
            return {"break_required": True,
                    "duration_min": self.break_rules["break_duration_min"],
                    "reason": "Periodic wellness break"}
        # Limit consecutive disturbing content.
        disturbing_categories = {"violence_gore", "self_harm", "csam"}
        if category in disturbing_categories:
            consecutive = exposure.get("_consecutive_disturbing", 0) + 1
            exposure["_consecutive_disturbing"] = consecutive
            if consecutive >= self.break_rules["max_disturbing_consecutive"]:
                exposure["_consecutive_disturbing"] = 0  # reset after the break
                return {"break_required": True,
                        "duration_min": 15,
                        "reason": "Consecutive disturbing content limit reached"}
        else:
            exposure["_consecutive_disturbing"] = 0  # streak broken by benign item
        return {"break_required": False}

    def get_wellness_report(self, reviewer_id: str) -> Dict:
        """Generate wellness report for a reviewer."""
        exposure = self.reviewer_exposure.get(reviewer_id, {})
        warnings = []
        for category, count in exposure.items():
            if category.startswith("_"):
                continue  # skip bookkeeping keys
            limit = self.exposure_limits.get(category, 100)
            if count >= limit * 0.8:
                warnings.append(
                    f"{category}: {count}/{limit} (approaching limit)"
                )
        return {
            "reviewer_id": reviewer_id,
            "total_items_reviewed": self._item_count(exposure),
            "exposure_by_category": {
                k: v for k, v in exposure.items() if not k.startswith("_")
            },
            "warnings": warnings,
            "counseling_recommended": len(warnings) >= 2
        }
Legal Requirement: Many jurisdictions now require platforms to provide mental health support for content moderators. This includes access to counseling, mandatory breaks, exposure limits, and the right to refuse particularly disturbing content. Build these protections into your system from day one, not as an afterthought.
SLA Management
# SLA monitoring and alerting
class SLAManager:
    """Monitor and enforce review SLA compliance."""

    def __init__(self):
        # Maximum time from enqueue to decision, by priority level.
        self.sla_targets = {
            1: timedelta(hours=1),  # Priority 1: 1 hour
            2: timedelta(hours=4),  # Priority 2: 4 hours
            3: timedelta(hours=12),  # Priority 3: 12 hours
            4: timedelta(hours=24),  # Priority 4: 24 hours
            5: timedelta(hours=72),  # Priority 5: 72 hours
        }

    def check_compliance(self, tasks: List[ReviewTask]) -> Dict:
        """Calculate SLA compliance metrics over a batch of tasks.

        Completed tasks count as met/breached by their completion time;
        open (pending/assigned) tasks count as breached once past their
        deadline, or at-risk within 20% of it.
        """
        total = len(tasks)
        met_sla = 0
        breached = 0
        at_risk = 0  # open tasks within 20% of their deadline
        now = datetime.utcnow()  # one clock read for the whole batch
        # BUG FIX: unknown priorities fall back to the most lenient target
        # instead of raising KeyError.
        default_sla = max(self.sla_targets.values())
        for task in tasks:
            if task.status == ReviewStatus.COMPLETED:
                if task.completed_at <= task.sla_deadline:
                    met_sla += 1
                else:
                    breached += 1
            elif task.status in (ReviewStatus.PENDING,
                                 ReviewStatus.ASSIGNED):
                remaining = task.sla_deadline - now
                total_sla = self.sla_targets.get(task.priority, default_sla)
                if remaining <= timedelta(0):
                    # BUG FIX: an open task already past its deadline is a
                    # breach; the original lumped it into "at_risk",
                    # understating breaches and the alert condition.
                    breached += 1
                elif remaining < total_sla * 0.2:
                    at_risk += 1
        return {
            "total_tasks": total,
            "met_sla": met_sla,
            "breached": breached,
            "at_risk": at_risk,
            "compliance_rate": met_sla / total if total > 0 else 1.0,
            "alert": breached > 0 or at_risk > total * 0.1
        }
Lilly Tech Systems