Advanced

Queues & Pub/Sub

Message queues and publish-subscribe systems are the backbone of distributed architectures. In coding interviews, you may be asked to implement an in-memory message queue with acknowledgment, a priority scheduler, a topic-based pub/sub system, or a dead letter queue with retry logic. This lesson covers all four.

Problem 1: Message Queue

Implement an in-memory message queue that supports enqueue, dequeue, acknowledgment, and visibility timeout (messages become visible again if not acknowledged within a timeout).

import uuid
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, List
from enum import Enum


class MessageStatus(Enum):
    """Lifecycle states of a queued message.

    PENDING -> IN_FLIGHT (after dequeue) -> ACKNOWLEDGED (after ack);
    an IN_FLIGHT message reverts to PENDING once its visibility timeout
    expires without an acknowledgment.
    """

    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    ACKNOWLEDGED = "acknowledged"


@dataclass
class Message:
    """A single queued message plus its delivery-tracking metadata."""

    id: str                     # UUID string assigned by MessageQueue.enqueue
    body: Any                   # arbitrary payload supplied by the producer
    created_at: float           # epoch seconds at enqueue time
    status: MessageStatus = MessageStatus.PENDING
    delivery_count: int = 0     # how many times this message has been dequeued
    visible_after: float = 0.0  # epoch seconds; while in flight, hidden until this time


class MessageQueue:
    """SQS-style in-memory message queue.

    Contract:
    - FIFO delivery order for visible messages
    - A dequeued message is hidden for ``visibility_timeout`` seconds;
      if it is not acknowledged within that window it becomes
      deliverable again (at-least-once semantics)
    - Explicit acknowledgment removes a message permanently
    """

    def __init__(self, visibility_timeout: int = 30):
        self._visibility_timeout = visibility_timeout
        self._messages: Dict[str, Message] = {}   # message id -> Message
        self._queue: deque = deque()              # deliverable ids, FIFO order

    def enqueue(self, body: Any) -> str:
        """Append a new message to the queue and return its generated ID."""
        message = Message(
            id=str(uuid.uuid4()),
            body=body,
            created_at=time.time(),
        )
        self._messages[message.id] = message
        self._queue.append(message.id)
        return message.id

    def dequeue(self, max_messages: int = 1) -> List[Message]:
        """Deliver up to ``max_messages``, hiding each for the visibility window.

        Before delivering, any in-flight message whose visibility timeout
        has lapsed is returned to the deliverable pool.
        """
        now = time.time()

        # Reclaim in-flight messages whose visibility window has expired.
        expired = [mid for mid, m in self._messages.items()
                   if m.status == MessageStatus.IN_FLIGHT and now > m.visible_after]
        for mid in expired:
            self._messages[mid].status = MessageStatus.PENDING
            if mid not in self._queue:
                self._queue.append(mid)

        delivered: List[Message] = []
        budget = len(self._queue)  # inspect each queued id at most once
        while budget and self._queue and len(delivered) < max_messages:
            budget -= 1
            mid = self._queue.popleft()

            message = self._messages.get(mid)
            if message is None:
                # Stale id (message already acknowledged/removed) — drop it.
                continue

            if message.status != MessageStatus.PENDING:
                # Not deliverable right now; keep it queued for later.
                self._queue.append(mid)
                continue

            message.status = MessageStatus.IN_FLIGHT
            message.delivery_count += 1
            message.visible_after = now + self._visibility_timeout
            delivered.append(message)

        return delivered

    def acknowledge(self, message_id: str) -> bool:
        """Mark an in-flight message as processed and remove it. Returns success."""
        message = self._messages.get(message_id)
        if message is None or message.status != MessageStatus.IN_FLIGHT:
            return False
        message.status = MessageStatus.ACKNOWLEDGED
        del self._messages[message_id]
        return True

    def size(self) -> int:
        """Number of messages still pending or in flight."""
        return len([m for m in self._messages.values()
                    if m.status != MessageStatus.ACKNOWLEDGED])

    def pending_count(self) -> int:
        """Number of messages currently deliverable."""
        return len([m for m in self._messages.values()
                    if m.status == MessageStatus.PENDING])


# ---- Usage ----
mq = MessageQueue(visibility_timeout=5)

# Producer side: enqueue two jobs.
alice_id = mq.enqueue({"type": "email", "to": "alice@test.com"})
bob_id = mq.enqueue({"type": "email", "to": "bob@test.com"})
print(f"Queue size: {mq.size()}")  # 2

# Consumer side: pull a single message.
batch = mq.dequeue(max_messages=1)
print(f"Received: {batch[0].body}")

# Acknowledge so the message is not redelivered after the timeout.
mq.acknowledge(batch[0].id)
print(f"Queue size after ack: {mq.size()}")  # 1

Problem 2: Priority Queue Scheduler

Implement a task scheduler that processes tasks based on priority. Higher-priority tasks are processed first. Equal priority tasks follow FIFO order.

import heapq
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import IntEnum


class Priority(IntEnum):
    """Task priority levels; a lower value is served first by the min-heap."""

    CRITICAL = 0   # Highest priority (lowest number in min-heap)
    HIGH = 1
    MEDIUM = 2
    LOW = 3


@dataclass(order=True)
class ScheduledTask:
    """Heap entry; with order=True, comparisons use (priority, sequence) only."""

    priority: int                         # Priority.value; lower sorts first
    sequence: int = field(compare=True)   # For FIFO within same priority
    task_id: str = field(compare=False)   # caller-supplied unique ID
    payload: Any = field(compare=False)   # opaque task data
    created_at: float = field(compare=False, default_factory=time.time)


class PriorityScheduler:
    """Priority-based task scheduler.

    Uses a min-heap where lower priority numbers = higher priority.
    Within the same priority level, tasks are ordered FIFO using a
    monotonically increasing sequence counter.

    Cancellation is lazy (O(1)): ``_entries`` maps each task_id to its
    *live* heap entry, and popped entries that are no longer registered
    are skipped. Keying liveness to the entry object — not just the id —
    fixes a bug in the id-only "cancelled set" approach, where cancelling
    an id and then re-scheduling it could drop the new task (its id was
    still in the cancelled set) and later resurrect the stale one.
    """

    def __init__(self):
        self._heap: list = []                         # min-heap of ScheduledTask
        self._sequence = 0                            # FIFO tie-breaker
        self._entries: Dict[str, ScheduledTask] = {}  # task_id -> live entry

    def schedule(self, task_id: str, payload: Any,
                 priority: Priority = Priority.MEDIUM) -> bool:
        """Schedule a task. Returns False if task_id already exists."""
        if task_id in self._entries:
            return False

        task = ScheduledTask(
            priority=priority.value,
            sequence=self._sequence,
            task_id=task_id,
            payload=payload,
        )
        self._sequence += 1
        self._entries[task_id] = task
        heapq.heappush(self._heap, task)
        return True

    def next(self) -> Optional[ScheduledTask]:
        """Pop and return the next highest-priority live task, or None."""
        while self._heap:
            task = heapq.heappop(self._heap)
            # Only the entry currently registered for this id is live;
            # anything else is a cancelled or superseded leftover.
            if self._entries.get(task.task_id) is task:
                del self._entries[task.task_id]
                return task
        return None

    def cancel(self, task_id: str) -> bool:
        """Cancel a scheduled task (lazy deletion). Returns success."""
        return self._entries.pop(task_id, None) is not None

    def peek(self) -> Optional[ScheduledTask]:
        """View the next live task without removing it (prunes stale entries)."""
        while self._heap:
            task = self._heap[0]
            if self._entries.get(task.task_id) is task:
                return task
            heapq.heappop(self._heap)  # discard stale/cancelled entry
        return None

    def size(self) -> int:
        """Number of tasks scheduled and not yet popped or cancelled."""
        return len(self._entries)


# ---- Usage ----
scheduler = PriorityScheduler()

scheduler.schedule("send_alert", {"msg": "Server down"}, Priority.CRITICAL)
scheduler.schedule("process_payment", {"amount": 99.99}, Priority.HIGH)
scheduler.schedule("send_email", {"to": "user@test.com"}, Priority.LOW)
scheduler.schedule("update_cache", {"key": "users"}, Priority.MEDIUM)

# Drain the scheduler highest-priority first; next() returns None when empty.
for task in iter(scheduler.next, None):
    print(f"Processing: {task.task_id} (priority={task.priority})")

Problem 3: Pub/Sub System

Implement a publish-subscribe system where publishers send messages to topics, and subscribers receive messages from topics they are subscribed to.

from collections import defaultdict
from typing import Callable, Set


class PubSubSystem:
    """Topic-based publish-subscribe system.

    Features:
    - Multiple topics, multiple subscribers per topic
    - Wildcard topic subscriptions (e.g., "orders.*")
    - Bounded message history per topic
    - Subscriber groups: each published message is delivered to exactly
      one member of each group, rotated by true round-robin.

    Fix vs. the original: group selection previously used
    ``hash(str(time.time())) % len(members)`` — nondeterministic, not
    round-robin, and able to starve members. A per-(topic, group)
    counter now rotates deliveries fairly.

    NOTE: group semantics apply only to exact-topic subscriptions;
    wildcard-pattern subscribers always receive every matching message.
    """

    def __init__(self, max_history: int = 100):
        # topic -> {subscriber_id: callback}
        self._subscribers: Dict[str, Dict[str, Callable]] = defaultdict(dict)
        # topic -> bounded deque of message records (newest last)
        self._history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history))
        self._subscriber_counter = 0  # for auto-generated subscriber IDs
        # topic -> {group_name: [subscriber_ids]}
        self._groups: Dict[str, Dict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
        # (topic, group_name) -> next round-robin position
        self._rr_index: Dict[tuple, int] = defaultdict(int)

    def subscribe(self, topic: str, callback: Callable,
                  subscriber_id: Optional[str] = None,
                  group: Optional[str] = None) -> str:
        """Subscribe a callback to a topic. Returns the subscriber ID.

        If ``subscriber_id`` is omitted, one is generated. If ``group``
        is given, the subscriber joins that delivery group on the topic.
        """
        if subscriber_id is None:
            self._subscriber_counter += 1
            subscriber_id = f"sub_{self._subscriber_counter}"

        self._subscribers[topic][subscriber_id] = callback

        if group:
            self._groups[topic][group].append(subscriber_id)

        return subscriber_id

    def unsubscribe(self, topic: str, subscriber_id: str) -> bool:
        """Remove a subscriber from a topic (and any groups). Returns success."""
        subs = self._subscribers.get(topic)
        if not subs or subscriber_id not in subs:
            return False
        del subs[subscriber_id]
        # Remove from any group membership on this topic.
        if topic in self._groups:
            for members in self._groups[topic].values():
                if subscriber_id in members:
                    members.remove(subscriber_id)
        return True

    def publish(self, topic: str, message: Any) -> int:
        """Publish a message to a topic. Returns number of deliveries."""
        # Record in the bounded per-topic history.
        self._history[topic].append({
            "topic": topic,
            "message": message,
            "timestamp": time.time(),
        })

        delivered = 0
        topic_groups = self._groups.get(topic, {})

        # Subscribers that belong to a group are handled by group delivery.
        grouped_subs = set()
        for group_members in topic_groups.values():
            grouped_subs.update(group_members)

        # Direct (ungrouped) subscribers on the exact topic.
        for sub_id, callback in self._subscribers.get(topic, {}).items():
            if sub_id not in grouped_subs:
                try:
                    callback(message)
                    delivered += 1
                except Exception:
                    pass  # Don't let one subscriber break others

        # Group delivery: exactly one member per group, true round-robin.
        for group_name, members in topic_groups.items():
            active = [m for m in members if m in self._subscribers.get(topic, {})]
            if not active:
                continue
            idx = self._rr_index[(topic, group_name)] % len(active)
            self._rr_index[(topic, group_name)] = idx + 1
            try:
                self._subscribers[topic][active[idx]](message)
                delivered += 1
            except Exception:
                pass

        # Wildcard matching: "orders.*" matches "orders.created".
        for pattern, subs in self._subscribers.items():
            if pattern != topic and self._matches_wildcard(pattern, topic):
                for sub_id, callback in subs.items():
                    try:
                        callback(message)
                        delivered += 1
                    except Exception:
                        pass

        return delivered

    def _matches_wildcard(self, pattern: str, topic: str) -> bool:
        """Check if a dot-separated wildcard pattern matches a topic.

        Each "*" segment matches exactly one topic segment; the segment
        counts must be equal (no multi-level wildcard).
        """
        if "*" not in pattern:
            return False
        parts_p = pattern.split(".")
        parts_t = topic.split(".")
        if len(parts_p) != len(parts_t):
            return False
        return all(p == "*" or p == t for p, t in zip(parts_p, parts_t))

    def get_history(self, topic: str, limit: int = 10) -> List[Dict]:
        """Get up to ``limit`` most recent message records for a topic."""
        history = list(self._history.get(topic, []))
        return history[-limit:]

    def get_topics(self) -> List[str]:
        """List all topics (and wildcard patterns) that have been subscribed to."""
        return list(self._subscribers.keys())


# ---- Usage ----
bus = PubSubSystem()

# Register two exact-topic handlers plus one wildcard handler.
received = []
bus.subscribe("orders.created", lambda msg: received.append(("handler1", msg)))
bus.subscribe("orders.created", lambda msg: received.append(("handler2", msg)))
bus.subscribe("orders.*", lambda msg: received.append(("wildcard", msg)))

# Publish one event; all three handlers fire.
count = bus.publish("orders.created", {"order_id": "123", "amount": 49.99})
print(f"Delivered to {count} subscribers")
print(f"Received: {received}")

Problem 4: Dead Letter Queue

Implement a dead letter queue (DLQ) that catches messages that fail processing after a configurable number of retries. This is essential for building reliable message processing systems.

@dataclass
class DLQMessage:
    """A message parked in the dead letter queue after exhausting retries."""

    id: str                 # original message ID
    body: Any               # original payload
    original_queue: str     # name of the queue the message came from
    failure_reason: str     # reason string from the last recorded failure
    retry_count: int        # number of failed processing attempts
    first_failed_at: float  # epoch seconds of the first failure
    last_failed_at: float   # epoch seconds of the most recent failure


class DeadLetterQueue:
    """Dead letter queue with retry logic.

    When a message fails processing max_retries times, it is moved
    to the dead letter queue for manual inspection or reprocessing.

    Features:
    - Configurable max retries per message
    - Exponential backoff between retries
    - DLQ inspection and replay
    - Failure reason tracking

    Fixes vs. the original:
    - ``should_retry`` now returns False for messages already parked in
      the DLQ (their retry state was cleaned up, so the retry count read
      back as 0 and the method wrongly said "retry").
    - ``first_failed_at`` is now the *actual* timestamp of the first
      recorded failure instead of the estimate
      ``now - (count - 1) * base_backoff``, which ignored the
      exponential growth of the backoff and was simply wrong.
    """

    def __init__(self, max_retries: int = 3, base_backoff: float = 1.0):
        self._max_retries = max_retries    # failures before parking in DLQ
        self._base_backoff = base_backoff  # seconds; doubles per attempt
        self._retry_counts: Dict[str, int] = {}
        self._first_attempt: Dict[str, float] = {}  # actual first-failure time
        self._last_attempt: Dict[str, float] = {}
        self._dlq: Dict[str, DLQMessage] = {}
        self._failure_reasons: Dict[str, str] = {}

    def should_retry(self, message_id: str) -> bool:
        """Check if a message should be retried right now.

        Returns False when the message is already in the DLQ, has
        exhausted its retries, or its backoff window has not elapsed.
        """
        if message_id in self._dlq:
            return False  # already parked; retry state was cleaned up

        count = self._retry_counts.get(message_id, 0)
        if count >= self._max_retries:
            return False

        # Exponential backoff: base * 2^count seconds since last attempt.
        last = self._last_attempt.get(message_id, 0)
        backoff = self._base_backoff * (2 ** count)
        return time.time() - last >= backoff

    def record_failure(self, message_id: str, body: Any,
                       queue_name: str, reason: str):
        """Record a processing failure; park in DLQ once retries are exhausted."""
        now = time.time()
        count = self._retry_counts.get(message_id, 0) + 1
        self._retry_counts[message_id] = count
        self._first_attempt.setdefault(message_id, now)  # keep true first time
        self._last_attempt[message_id] = now
        self._failure_reasons[message_id] = reason

        if count >= self._max_retries:
            # Move to DLQ with the real first/last failure timestamps.
            dlq_msg = DLQMessage(
                id=message_id,
                body=body,
                original_queue=queue_name,
                failure_reason=reason,
                retry_count=count,
                first_failed_at=self._first_attempt[message_id],
                last_failed_at=now,
            )
            self._dlq[message_id] = dlq_msg
            # Clean up retry tracking
            del self._retry_counts[message_id]
            del self._first_attempt[message_id]
            del self._last_attempt[message_id]
            del self._failure_reasons[message_id]

    def record_success(self, message_id: str):
        """Record successful processing (clear all retry state)."""
        self._retry_counts.pop(message_id, None)
        self._first_attempt.pop(message_id, None)
        self._last_attempt.pop(message_id, None)
        self._failure_reasons.pop(message_id, None)

    def get_dlq_messages(self, limit: int = 10) -> List[DLQMessage]:
        """Get messages in the dead letter queue, most recently failed first."""
        messages = sorted(self._dlq.values(),
                          key=lambda m: m.last_failed_at, reverse=True)
        return messages[:limit]

    def replay(self, message_id: str) -> Optional[DLQMessage]:
        """Remove a message from DLQ for reprocessing; None if absent."""
        return self._dlq.pop(message_id, None)

    def replay_all(self) -> List[DLQMessage]:
        """Remove all messages from DLQ for reprocessing."""
        messages = list(self._dlq.values())
        self._dlq.clear()
        return messages

    def purge(self, message_id: str) -> bool:
        """Permanently delete a message from DLQ. Returns success."""
        if message_id in self._dlq:
            del self._dlq[message_id]
            return True
        return False

    def dlq_size(self) -> int:
        """Number of messages currently parked in the DLQ."""
        return len(self._dlq)

    def get_backoff_time(self, message_id: str) -> float:
        """Get seconds until the next retry is allowed (0.0 if allowed now)."""
        count = self._retry_counts.get(message_id, 0)
        last = self._last_attempt.get(message_id, 0)
        backoff = self._base_backoff * (2 ** count)
        remaining = backoff - (time.time() - last)
        return max(0.0, remaining)


# ---- Usage: Message processor with DLQ ----
# BUG FIX: the original loop called should_retry() immediately after
# record_failure(); the backoff window had not elapsed, so it returned
# False and the demo printed "moved to DLQ" after the FIRST failure —
# while the DLQ stayed empty, contradicting the inspection output below.
# This version waits out the backoff and checks actual DLQ placement.
dlq = DeadLetterQueue(max_retries=3, base_backoff=0.1)

def process_message(msg_id, body):
    """Simulate processing that might fail."""
    # Simulate failure for certain messages
    if body.get("corrupt"):
        raise ValueError("Corrupt message data")
    return True

# Simulate processing with failures
messages = [
    ("msg_1", {"data": "good"}),
    ("msg_2", {"data": "bad", "corrupt": True}),
]

for msg_id, body in messages:
    for attempt in range(4):  # Try up to 4 times
        try:
            process_message(msg_id, body)
            dlq.record_success(msg_id)
            print(f"{msg_id}: processed successfully")
            break
        except Exception as e:
            dlq.record_failure(msg_id, body, "main_queue", str(e))
            # record_failure parks the message once max_retries is hit.
            if any(m.id == msg_id for m in dlq.get_dlq_messages()):
                print(f"{msg_id}: moved to DLQ")
                break
            print(f"{msg_id}: retry {attempt + 1}")
            # Honor the exponential backoff before the next attempt.
            time.sleep(dlq.get_backoff_time(msg_id))

# Inspect DLQ
print(f"\nDLQ size: {dlq.dlq_size()}")
for msg in dlq.get_dlq_messages():
    print(f"  {msg.id}: {msg.failure_reason} (retries: {msg.retry_count})")
💡
Interview tip: When implementing a message queue, always discuss delivery semantics: at-most-once (fire and forget), at-least-once (acknowledge after processing), or exactly-once (hardest, needs idempotency). Most interviewers expect at-least-once with acknowledgment, which is what our MessageQueue implements.

Key Takeaways

💡
  • Message queues need visibility timeout to handle consumer failures (at-least-once delivery)
  • Priority schedulers use a min-heap with a sequence counter for FIFO tie-breaking
  • Pub/sub systems decouple publishers from subscribers; support wildcards for flexible routing
  • Dead letter queues catch messages that fail after max retries; use exponential backoff
  • Always discuss delivery semantics: at-most-once vs. at-least-once vs. exactly-once
  • Use lazy deletion for O(1) cancellation in heap-based schedulers — but key the liveness check to the specific heap entry (e.g. an id-to-entry map), not just to the task ID: a bare "cancelled IDs" set silently drops a task that is cancelled and then re-scheduled under the same ID