Queues & Pub/Sub
Message queues and publish-subscribe systems are the backbone of distributed architectures. In coding interviews, you may be asked to implement an in-memory message queue with acknowledgment, a priority scheduler, a topic-based pub/sub system, or a dead letter queue with retry logic. This lesson covers all four.
Problem 1: Message Queue
Implement an in-memory message queue that supports enqueue, dequeue, acknowledgment, and visibility timeout (messages become visible again if not acknowledged within a timeout).
import uuid
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, List
from enum import Enum
class MessageStatus(Enum):
    """Lifecycle states for a queued message."""
    PENDING = "pending"            # waiting in the queue, eligible for delivery
    IN_FLIGHT = "in_flight"        # delivered to a consumer, awaiting acknowledgment
    ACKNOWLEDGED = "acknowledged"  # successfully processed; removed from the queue
@dataclass
class Message:
    """A queued message together with its delivery-tracking state."""
    id: str                   # unique message id (uuid4 string, assigned by MessageQueue)
    body: Any                 # caller-supplied payload
    created_at: float         # epoch seconds when enqueued
    status: MessageStatus = MessageStatus.PENDING  # current lifecycle state
    delivery_count: int = 0   # times handed to a consumer (>1 means redelivery)
    visible_after: float = 0.0  # epoch time when an in-flight message times out and reappears
class MessageQueue:
    """In-memory FIFO message queue with SQS-style visibility timeout.

    A dequeued message is hidden for ``visibility_timeout`` seconds; if it
    is not acknowledged within that window it re-enters the queue, giving
    at-least-once delivery semantics.
    """

    def __init__(self, visibility_timeout: int = 30):
        self._visibility_timeout = visibility_timeout
        self._messages: Dict[str, Message] = {}  # id -> Message
        self._queue: deque = deque()             # message IDs in FIFO order

    def enqueue(self, body: Any) -> str:
        """Add a message to the queue. Returns message ID."""
        message = Message(id=str(uuid.uuid4()), body=body, created_at=time.time())
        self._messages[message.id] = message
        self._queue.append(message.id)
        return message.id

    def dequeue(self, max_messages: int = 1) -> List[Message]:
        """Receive up to ``max_messages`` visible messages.

        Delivered messages are hidden for the visibility timeout and
        reappear automatically if never acknowledged.
        """
        now = time.time()
        self._requeue_expired(now)
        delivered: List[Message] = []
        # Bound the scan to the current queue length so re-appended
        # (ineligible) ids cannot cause an endless loop.
        for _ in range(len(self._queue)):
            if len(delivered) >= max_messages:
                break
            candidate_id = self._queue.popleft()
            candidate = self._messages.get(candidate_id)
            if candidate is None:
                continue  # acknowledged while queued; drop the stale id
            if candidate.status != MessageStatus.PENDING:
                self._queue.append(candidate_id)  # not eligible; keep for later
                continue
            candidate.status = MessageStatus.IN_FLIGHT
            candidate.delivery_count += 1
            candidate.visible_after = now + self._visibility_timeout
            delivered.append(candidate)
        return delivered

    def _requeue_expired(self, now: float) -> None:
        """Return timed-out in-flight messages to PENDING and re-queue them."""
        for msg_id, msg in self._messages.items():
            if msg.status == MessageStatus.IN_FLIGHT and now > msg.visible_after:
                msg.status = MessageStatus.PENDING
                if msg_id not in self._queue:
                    self._queue.append(msg_id)

    def acknowledge(self, message_id: str) -> bool:
        """Mark a delivered message as processed. Returns success."""
        msg = self._messages.get(message_id)
        if msg is None or msg.status != MessageStatus.IN_FLIGHT:
            return False
        msg.status = MessageStatus.ACKNOWLEDGED
        del self._messages[message_id]
        return True

    def size(self) -> int:
        """Number of messages not yet acknowledged (pending + in-flight)."""
        unacked = [m for m in self._messages.values()
                   if m.status is not MessageStatus.ACKNOWLEDGED]
        return len(unacked)

    def pending_count(self) -> int:
        """Number of messages currently eligible for delivery."""
        return len([m for m in self._messages.values()
                    if m.status is MessageStatus.PENDING])
# ---- Usage ----
queue = MessageQueue(visibility_timeout=5)

# Producer side: enqueue returns the generated message ID.
msg1 = queue.enqueue({"type": "email", "to": "alice@test.com"})
msg2 = queue.enqueue({"type": "email", "to": "bob@test.com"})
print(f"Queue size: {queue.size()}")  # 2

# Consumer side: the dequeued message is invisible until acked or timed out.
messages = queue.dequeue(max_messages=1)
print(f"Received: {messages[0].body}")

# Acknowledge: removes the message from the queue permanently.
queue.acknowledge(messages[0].id)
print(f"Queue size after ack: {queue.size()}")  # 1
Problem 2: Priority Queue Scheduler
Implement a task scheduler that processes tasks based on priority. Higher-priority tasks are processed first. Equal priority tasks follow FIFO order.
import heapq
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import IntEnum
class Priority(IntEnum):
    """Task priority levels; lower value = served first (min-heap order)."""
    CRITICAL = 0  # Highest priority (lowest number in min-heap)
    HIGH = 1
    MEDIUM = 2
    LOW = 3
@dataclass(order=True)
class ScheduledTask:
    """Heap entry for PriorityScheduler.

    Ordering compares only (priority, sequence), so equal-priority tasks
    fall back to FIFO insertion order; all other fields are excluded from
    comparison.
    """
    priority: int  # numeric priority; lower pops first
    sequence: int = field(compare=True)  # For FIFO within same priority
    task_id: str = field(compare=False)  # caller-supplied identifier
    payload: Any = field(compare=False)  # arbitrary task data
    created_at: float = field(compare=False, default_factory=time.time)  # epoch seconds
class PriorityScheduler:
    """Priority-based task scheduler.

    Uses a min-heap where lower priority numbers = higher priority.
    Within the same priority level, tasks are ordered FIFO using a
    monotonically increasing sequence counter.

    Cancellation is lazy, via the "entry finder" pattern from the heapq
    documentation: ``_entries`` maps each live task_id to its heap entry,
    and a popped entry that is no longer the recorded one is stale and
    skipped.

    BUG FIX vs the original cancelled-id set: cancelling a task and then
    re-scheduling the same task_id previously left the id in the
    cancelled set, so next()/peek() silently dropped the NEW task (and
    clearing the set instead would have resurrected the stale old entry).
    Identity-comparing heap entries distinguishes the two correctly.
    """

    def __init__(self):
        self._heap: list = []                          # min-heap of ScheduledTask
        self._sequence = 0                             # FIFO tie-breaker within a priority
        self._entries: Dict[str, ScheduledTask] = {}   # task_id -> live heap entry

    def schedule(self, task_id: str, payload: Any,
                 priority: Priority = Priority.MEDIUM) -> bool:
        """Schedule a task. Returns False if task_id already exists."""
        if task_id in self._entries:
            return False
        task = ScheduledTask(
            priority=priority.value,
            sequence=self._sequence,
            task_id=task_id,
            payload=payload,
        )
        self._sequence += 1
        self._entries[task_id] = task
        heapq.heappush(self._heap, task)
        return True

    def next(self) -> Optional[ScheduledTask]:
        """Pop and return the next highest-priority task, or None if empty."""
        while self._heap:
            task = heapq.heappop(self._heap)
            # Only the entry recorded in _entries is live; anything else is
            # a stale entry left behind by cancel() (lazy deletion).
            if self._entries.get(task.task_id) is task:
                del self._entries[task.task_id]
                return task
        return None

    def cancel(self, task_id: str) -> bool:
        """Cancel a scheduled task (lazy deletion). Returns success."""
        return self._entries.pop(task_id, None) is not None

    def peek(self) -> Optional[ScheduledTask]:
        """Return the next task without removing it, or None if empty."""
        while self._heap:
            top = self._heap[0]
            if self._entries.get(top.task_id) is top:
                return top
            heapq.heappop(self._heap)  # discard stale (cancelled) entry
        return None

    def size(self) -> int:
        """Number of live (scheduled and not cancelled) tasks."""
        return len(self._entries)
# ---- Usage ----
scheduler = PriorityScheduler()
scheduler.schedule("send_alert", {"msg": "Server down"}, Priority.CRITICAL)
scheduler.schedule("process_payment", {"amount": 99.99}, Priority.HIGH)
scheduler.schedule("send_email", {"to": "user@test.com"}, Priority.LOW)
scheduler.schedule("update_cache", {"key": "users"}, Priority.MEDIUM)

# Drain in priority order: CRITICAL, HIGH, MEDIUM, LOW.
while True:
    task = scheduler.next()
    if task is None:
        break
    print(f"Processing: {task.task_id} (priority={task.priority})")
Problem 3: Pub/Sub System
Implement a publish-subscribe system where publishers send messages to topics, and subscribers receive messages from topics they are subscribed to.
from collections import defaultdict
from typing import Callable, Set
class PubSubSystem:
    """Topic-based publish-subscribe system.

    Features:
    - Multiple topics, multiple subscribers per topic
    - Wildcard topic subscriptions (e.g., "orders.*" matches "orders.created")
    - Bounded message history per topic
    - Subscriber groups: only one member of a group receives each message,
      chosen by true round-robin rotation

    BUG FIX vs the original: group delivery picked a member with
    ``hash(str(time.time())) % len`` — effectively random, not round-robin,
    and able to starve members. A per-(topic, group) rotation counter now
    guarantees fair rotation. Wildcard delivery also honors group
    semantics, matching the contract documented above.
    """

    def __init__(self, max_history: int = 100):
        """max_history: messages retained per topic (oldest evicted first)."""
        # topic/pattern -> {subscriber_id: callback}
        self._subscribers: Dict[str, Dict[str, Callable]] = defaultdict(dict)
        # topic/pattern -> bounded deque of message records
        self._history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history))
        self._subscriber_counter = 0  # for auto-generated subscriber ids
        # topic/pattern -> {group_name: [subscriber_ids]}
        self._groups: Dict[str, Dict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
        # (topic/pattern, group_name) -> next round-robin offset
        self._rr_index: Dict[tuple, int] = defaultdict(int)

    def subscribe(self, topic: str, callback: Callable,
                  subscriber_id: Optional[str] = None,
                  group: Optional[str] = None) -> str:
        """Subscribe to a topic (or wildcard pattern). Returns subscriber ID.

        If ``group`` is given, the subscriber joins that group and shares
        each message with the other members (one delivery per message).
        """
        if subscriber_id is None:
            self._subscriber_counter += 1
            subscriber_id = f"sub_{self._subscriber_counter}"
        self._subscribers[topic][subscriber_id] = callback
        if group:
            self._groups[topic][group].append(subscriber_id)
        return subscriber_id

    def unsubscribe(self, topic: str, subscriber_id: str) -> bool:
        """Unsubscribe from a topic. Returns True if the subscriber existed."""
        if topic in self._subscribers and subscriber_id in self._subscribers[topic]:
            del self._subscribers[topic][subscriber_id]
            # Also drop the subscriber from any group it belongs to.
            for members in self._groups.get(topic, {}).values():
                if subscriber_id in members:
                    members.remove(subscriber_id)
            return True
        return False

    def publish(self, topic: str, message: Any) -> int:
        """Publish a message to a topic. Returns number of deliveries."""
        self._history[topic].append({
            "topic": topic,
            "message": message,
            "timestamp": time.time(),
        })
        delivered = 0
        # Deliver through every subscription key that matches: the exact
        # topic plus any wildcard patterns (e.g. "orders.*").
        for pattern in list(self._subscribers.keys()):
            if pattern == topic or self._matches_wildcard(pattern, topic):
                delivered += self._deliver(pattern, message)
        return delivered

    def _deliver(self, pattern: str, message: Any) -> int:
        """Deliver to one subscription key: every ungrouped subscriber plus
        one member per group (round-robin). Returns delivery count."""
        subs = self._subscribers.get(pattern, {})
        groups = self._groups.get(pattern, {})
        grouped_ids = {m for members in groups.values() for m in members}
        count = 0
        for sub_id, callback in subs.items():
            if sub_id not in grouped_ids:
                count += self._safe_call(callback, message)
        for group_name, members in groups.items():
            active = [m for m in members if m in subs]
            if not active:
                continue
            # True round-robin: rotate a persistent per-group counter.
            offset = self._rr_index[(pattern, group_name)]
            self._rr_index[(pattern, group_name)] = offset + 1
            count += self._safe_call(subs[active[offset % len(active)]], message)
        return count

    @staticmethod
    def _safe_call(callback: Callable, message: Any) -> int:
        """Invoke one subscriber; a failing callback must not break others."""
        try:
            callback(message)
            return 1
        except Exception:
            return 0  # swallow so remaining subscribers still get the message

    def _matches_wildcard(self, pattern: str, topic: str) -> bool:
        """True if a dotted wildcard pattern matches a topic.

        "*" matches exactly one dot-separated segment; segment counts
        must be equal (so "a.*" does not match "a.b.c").
        """
        if "*" not in pattern:
            return False
        parts_p = pattern.split(".")
        parts_t = topic.split(".")
        if len(parts_p) != len(parts_t):
            return False
        return all(p == "*" or p == t for p, t in zip(parts_p, parts_t))

    def get_history(self, topic: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` most recent message records for a topic."""
        history = list(self._history.get(topic, []))
        return history[-limit:]

    def get_topics(self) -> List[str]:
        """List all topics/patterns that have (or have had) subscribers."""
        return list(self._subscribers.keys())
# ---- Usage ----
pubsub = PubSubSystem()

# Subscribe: two exact-topic handlers plus one wildcard handler.
received = []
pubsub.subscribe("orders.created", lambda msg: received.append(("handler1", msg)))
pubsub.subscribe("orders.created", lambda msg: received.append(("handler2", msg)))
pubsub.subscribe("orders.*", lambda msg: received.append(("wildcard", msg)))

# Publish: reaches both exact subscribers and the matching wildcard one.
count = pubsub.publish("orders.created", {"order_id": "123", "amount": 49.99})
print(f"Delivered to {count} subscribers")
print(f"Received: {received}")
Problem 4: Dead Letter Queue
Implement a dead letter queue (DLQ) that catches messages that fail processing after a configurable number of retries. This is essential for building reliable message processing systems.
@dataclass
class DLQMessage:
    """A message parked in the dead letter queue after exhausting retries."""
    id: str                 # original message id
    body: Any               # original payload
    original_queue: str     # name of the queue the message failed in
    failure_reason: str     # reason string from the most recent failure
    retry_count: int        # failures recorded before landing in the DLQ
    first_failed_at: float  # epoch seconds of the (approximate) first failure
    last_failed_at: float   # epoch seconds of the most recent failure
class DeadLetterQueue:
    """Dead letter queue with retry logic.

    When a message fails processing max_retries times, it is moved to the
    dead letter queue for manual inspection or reprocessing.

    Features:
    - Configurable max retries per message
    - Exponential backoff between retries (base_backoff * 2**failures)
    - DLQ inspection and replay
    - Failure reason tracking
    """

    def __init__(self, max_retries: int = 3, base_backoff: float = 1.0):
        """max_retries: failures before DLQ; base_backoff: seconds, doubled per failure."""
        self._max_retries = max_retries
        self._base_backoff = base_backoff
        self._retry_counts: Dict[str, int] = {}      # message_id -> failures so far
        self._last_attempt: Dict[str, float] = {}    # message_id -> last failure time
        self._first_attempt: Dict[str, float] = {}   # message_id -> first failure time
        self._dlq: Dict[str, "DLQMessage"] = {}      # message_id -> parked message
        self._failure_reasons: Dict[str, str] = {}   # message_id -> latest reason

    def should_retry(self, message_id: str) -> bool:
        """Return True if the message may be retried right now.

        False when the message is already dead-lettered, has exhausted its
        retry budget, or is still inside its backoff window.
        """
        # BUG FIX: retry tracking is cleared when a message moves to the
        # DLQ, so without this guard the count below reads 0 and a
        # dead-lettered message is wrongly reported as retryable.
        if message_id in self._dlq:
            return False
        count = self._retry_counts.get(message_id, 0)
        if count >= self._max_retries:
            return False
        last = self._last_attempt.get(message_id, 0)
        backoff = self._base_backoff * (2 ** count)  # Exponential backoff
        if time.time() - last < backoff:
            return False  # Too soon to retry
        return True

    def record_failure(self, message_id: str, body: Any,
                       queue_name: str, reason: str):
        """Record a processing failure; moves the message to the DLQ once
        max_retries failures have accumulated."""
        now = time.time()
        count = self._retry_counts.get(message_id, 0) + 1
        self._retry_counts[message_id] = count
        self._last_attempt[message_id] = now
        # BUG FIX: remember the real first-failure time. The original
        # estimated it as now - (count - 1) * base_backoff, which ignores
        # how the retries were actually spaced.
        self._first_attempt.setdefault(message_id, now)
        self._failure_reasons[message_id] = reason
        if count >= self._max_retries:
            # Move to DLQ and drop the per-message retry bookkeeping.
            self._dlq[message_id] = DLQMessage(
                id=message_id,
                body=body,
                original_queue=queue_name,
                failure_reason=reason,
                retry_count=count,
                first_failed_at=self._first_attempt[message_id],
                last_failed_at=now,
            )
            self._clear_tracking(message_id)

    def record_success(self, message_id: str):
        """Record successful processing (clear retry state)."""
        self._clear_tracking(message_id)

    def _clear_tracking(self, message_id: str):
        """Drop all per-message retry bookkeeping."""
        self._retry_counts.pop(message_id, None)
        self._last_attempt.pop(message_id, None)
        self._first_attempt.pop(message_id, None)
        self._failure_reasons.pop(message_id, None)

    def get_dlq_messages(self, limit: int = 10) -> List["DLQMessage"]:
        """Return up to ``limit`` DLQ messages, most recently failed first."""
        messages = sorted(self._dlq.values(),
                          key=lambda m: m.last_failed_at, reverse=True)
        return messages[:limit]

    def replay(self, message_id: str) -> Optional["DLQMessage"]:
        """Remove and return a message from the DLQ for reprocessing."""
        return self._dlq.pop(message_id, None)

    def replay_all(self) -> List["DLQMessage"]:
        """Remove and return all DLQ messages for reprocessing."""
        messages = list(self._dlq.values())
        self._dlq.clear()
        return messages

    def purge(self, message_id: str) -> bool:
        """Permanently delete a message from the DLQ. Returns success."""
        return self._dlq.pop(message_id, None) is not None

    def dlq_size(self) -> int:
        """Number of messages currently parked in the DLQ."""
        return len(self._dlq)

    def get_backoff_time(self, message_id: str) -> float:
        """Seconds until the next retry of this message is allowed (0 if now)."""
        count = self._retry_counts.get(message_id, 0)
        last = self._last_attempt.get(message_id, 0)
        backoff = self._base_backoff * (2 ** count)
        return max(0.0, backoff - (time.time() - last))
# ---- Usage: Message processor with DLQ ----
# Small backoff so the demo (which sleeps out each backoff window) runs fast.
dlq = DeadLetterQueue(max_retries=3, base_backoff=0.01)

def process_message(msg_id, body):
    """Simulate processing that might fail."""
    # Simulate failure for certain messages
    if body.get("corrupt"):
        raise ValueError("Corrupt message data")
    return True

# Simulate processing with failures
messages = [
    ("msg_1", {"data": "good"}),
    ("msg_2", {"data": "bad", "corrupt": True}),
]
for msg_id, body in messages:
    for attempt in range(4):  # 1 initial try + up to 3 retries
        try:
            process_message(msg_id, body)
            dlq.record_success(msg_id)
            print(f"{msg_id}: processed successfully")
            break
        except Exception as e:
            dlq.record_failure(msg_id, body, "main_queue", str(e))
            # BUG FIX: detect DLQ entry directly. record_failure() clears
            # the retry tracking when it dead-letters a message, so
            # should_retry() cannot be trusted for that message afterwards.
            if any(m.id == msg_id for m in dlq.get_dlq_messages()):
                print(f"{msg_id}: moved to DLQ")
                break
            # BUG FIX: wait out the exponential backoff window. The original
            # called should_retry() immediately after the failure, which
            # always returned False ("too soon") and mislabeled the message
            # as dead-lettered on its very first failure.
            time.sleep(dlq.get_backoff_time(msg_id))
            if dlq.should_retry(msg_id):
                print(f"{msg_id}: retry {attempt + 1}")
            else:
                print(f"{msg_id}: moved to DLQ")
                break

# Inspect DLQ
print(f"\nDLQ size: {dlq.dlq_size()}")
for msg in dlq.get_dlq_messages():
    print(f"  {msg.id}: {msg.failure_reason} (retries: {msg.retry_count})")
Key Takeaways
- Message queues need visibility timeout to handle consumer failures (at-least-once delivery)
- Priority schedulers use a min-heap with a sequence counter for FIFO tie-breaking
- Pub/sub systems decouple publishers from subscribers; support wildcards for flexible routing
- Dead letter queues catch messages that fail after max retries; use exponential backoff
- Always discuss delivery semantics: at-most-once vs. at-least-once vs. exactly-once
- Use lazy deletion (cancelled set) for O(1) cancellation in heap-based schedulers
Lilly Tech Systems