Streaming Data Challenges
Streaming data problems test your ability to process unbounded event streams with ordering guarantees, time windows, and resource constraints. These 5 challenges cover the core patterns used in Kafka, Flink, and Spark Streaming interviews.
Challenge 1: Sliding Window Aggregation
from collections import deque
from datetime import datetime, timedelta
class SlidingWindowAggregator:
    """Time-based sliding window with configurable size and slide.

    Example: a 5-minute window sliding every 1 minute means that at any
    point we aggregate the last 5 minutes of data, emitting a result
    roughly once per minute (driven by event timestamps).
    """

    def __init__(self, window_size_sec, slide_sec):
        self.window_size = window_size_sec
        self.slide = slide_sec
        self.buffer = deque()   # (timestamp, value) pairs, oldest at the left
        self.last_emit = None   # event time of the most recent emission
        self.results = []       # every window result emitted so far

    def process_event(self, timestamp, value):
        """Process a single event and emit window results if slide boundary crossed."""
        self.buffer.append((timestamp, value))
        if self.last_emit is None:
            # The first event anchors the slide schedule.
            self.last_emit = timestamp
        # Drop events that have fallen out of the window.
        expiry = timestamp - self.window_size
        while self.buffer and self.buffer[0][0] < expiry:
            self.buffer.popleft()
        # Only emit once a full slide interval has elapsed since last emit.
        if timestamp - self.last_emit < self.slide:
            return None
        snapshot = self._compute_aggregate(timestamp)
        self.results.append(snapshot)
        self.last_emit = timestamp
        return snapshot

    def _compute_aggregate(self, current_time):
        """Compute aggregates over current window contents."""
        values = [item[1] for item in self.buffer]
        n = len(values)
        total = sum(values)
        return {
            'window_end': current_time,
            'window_start': current_time - self.window_size,
            'count': n,
            'sum': total,
            'avg': total / n if values else 0,
            'min': min(values) if values else None,
            'max': max(values) if values else None,
        }

    def flush(self):
        """Emit final window result."""
        if not self.buffer:
            return None
        return self._compute_aggregate(self.buffer[-1][0])
# Test: 10-second window, sliding every 5 seconds
agg = SlidingWindowAggregator(window_size_sec=10, slide_sec=5)
events = [
    (1, 10), (2, 20), (3, 15),   # first three seconds
    (6, 25), (7, 30),            # seconds 6-7 (slide fires at t=6)
    (11, 40), (12, 35),          # seconds 11-12 (slide fires at t=11)
    (16, 50),                    # second 16 (slide fires; oldest events expired)
]
for ts, val in events:
    result = agg.process_event(ts, val)
    if result is not None:
        print(f"Window [{result['window_start']}-{result['window_end']}]: "
              f"count={result['count']}, sum={result['sum']}, avg={result['avg']:.1f}")
# Drain whatever remains in the buffer.
final = agg.flush()
if final is not None:
    print(f"Final [{final['window_start']}-{final['window_end']}]: "
          f"count={final['count']}, sum={final['sum']}, avg={final['avg']:.1f}")
Complexity: O(1) amortized per event for insertion. O(k) for eviction where k is expired events. O(n) for aggregation where n is events in window. Space is O(W) where W is max events in a window.
Challenge 2: Event Deduplication
import hashlib
from collections import OrderedDict
class EventDeduplicator:
    """Stream deduplication with time-based expiry.

    Catches both exact duplicates (same event ID) and content-based
    duplicates (identical payload arriving under a different ID).
    """

    def __init__(self, window_sec=300, max_keys=100000):
        self.window_sec = window_sec
        self.max_keys = max_keys
        self.seen_ids = OrderedDict()     # event_id -> timestamp, oldest first
        self.seen_hashes = OrderedDict()  # content_hash -> timestamp, oldest first
        self.stats = {'total': 0, 'unique': 0, 'id_dupes': 0, 'content_dupes': 0}

    def _content_hash(self, event):
        """Hash event content excluding metadata fields."""
        meta = ('event_id', 'timestamp', 'received_at')
        payload = {key: val for key, val in event.items() if key not in meta}
        # md5 used as a fast non-cryptographic fingerprint, not for security.
        return hashlib.md5(str(sorted(payload.items())).encode()).hexdigest()

    def _evict_expired(self, current_time):
        """Remove entries older than the dedup window."""
        cutoff = current_time - self.window_sec
        for store in (self.seen_ids, self.seen_hashes):
            # Oldest entries sit at the front; stop at the first fresh one.
            while store and next(iter(store.values())) < cutoff:
                store.popitem(last=False)
            # Cap memory: drop oldest (insertion-order) entries past max_keys.
            while len(store) > self.max_keys:
                store.popitem(last=False)

    def process(self, event):
        """Process an event, returning True if unique, False if duplicate.

        Returns:
            Tuple of (is_unique, reason)
        """
        self.stats['total'] += 1
        ts = event.get('timestamp', 0)
        self._evict_expired(ts)
        event_id = event.get('event_id')
        fingerprint = self._content_hash(event)
        # Exact-ID duplicate takes precedence over content match.
        if event_id and event_id in self.seen_ids:
            self.stats['id_dupes'] += 1
            return False, 'duplicate_id'
        if fingerprint in self.seen_hashes:
            self.stats['content_dupes'] += 1
            return False, 'duplicate_content'
        # Genuinely new: remember both keys.
        if event_id:
            self.seen_ids[event_id] = ts
        self.seen_hashes[fingerprint] = ts
        self.stats['unique'] += 1
        return True, 'unique'
# Test
dedup = EventDeduplicator(window_sec=10)
events = [
    {'event_id': 'e1', 'timestamp': 1, 'user': 'alice', 'action': 'click'},
    {'event_id': 'e2', 'timestamp': 2, 'user': 'bob', 'action': 'view'},
    {'event_id': 'e1', 'timestamp': 3, 'user': 'alice', 'action': 'click'},   # duplicate ID
    {'event_id': 'e3', 'timestamp': 4, 'user': 'alice', 'action': 'click'},   # duplicate content
    {'event_id': 'e4', 'timestamp': 5, 'user': 'charlie', 'action': 'buy'},
    {'event_id': 'e5', 'timestamp': 15, 'user': 'alice', 'action': 'click'},  # same content, window expired
]
for event in events:
    is_unique, reason = dedup.process(event)
    status = 'PASS' if is_unique else 'DROP'
    print(f" [{status}] {event['event_id']} t={event['timestamp']} - {reason}")
print(f"\nStats: {dedup.stats}")
Complexity: O(1) amortized per event. Space is O(min(W, max_keys)) where W is events in the dedup window. The OrderedDict provides O(1) insertion and O(1) removal of the oldest entry; note this is insertion-order (FIFO) eviction rather than true LRU, since entries are never refreshed when a duplicate is seen.
Challenge 3: Late Arrival Handling
from collections import defaultdict
class LateArrivalHandler:
    """Handle late-arriving events in streaming aggregation.

    Uses event-time processing with allowed lateness: a window stays open
    until the watermark passes window_end + allowed_lateness. Events that
    arrive after that point are routed to a dead-letter side output.

    Note: `advance_watermark` must be driven by the caller (typically with
    each event's processing time) for windows to close.
    """

    def __init__(self, window_size, allowed_lateness):
        self.window_size = window_size            # width of each tumbling window
        self.allowed_lateness = allowed_lateness  # grace period after window end
        self.watermark = 0                        # highest processing time seen so far
        self.windows = defaultdict(lambda: {'count': 0, 'sum': 0, 'events': []})
        self.closed_windows = {}                  # window_start -> final aggregate
        self.dead_letter = []                     # rejected events, each with a 'reason'
        self.stats = {'on_time': 0, 'late_accepted': 0, 'dropped': 0}

    def _get_window_key(self, event_time):
        """Return the start of the tumbling window containing event_time."""
        return (event_time // self.window_size) * self.window_size

    def advance_watermark(self, processing_time):
        """Advance the watermark (represents progress of event time).

        Closes every window whose end plus allowed lateness has fallen
        behind the new watermark.

        Returns:
            List of {'window': str, 'result': dict} entries for each
            window closed by this advancement.
        """
        self.watermark = max(self.watermark, processing_time)
        windows_to_close = [
            ws for ws in list(self.windows)
            if ws + self.window_size + self.allowed_lateness < self.watermark
        ]
        results = []
        for ws in windows_to_close:
            # Snapshot the aggregate; independent copies so callers can't
            # mutate the retained closed-window record through the result.
            self.closed_windows[ws] = dict(self.windows[ws])
            results.append({
                'window': f"[{ws}, {ws + self.window_size})",
                'result': dict(self.windows[ws])
            })
            del self.windows[ws]
        return results

    def process_event(self, event):
        """Process an event considering its event time vs current watermark.

        Args:
            event: dict with 'event_time' and optional 'value' (default 1).

        Returns:
            (accepted, category) tuple, category in
            {'on_time', 'late_accepted', 'dropped'}.
        """
        event_time = event['event_time']
        value = event.get('value', 1)
        window_key = self._get_window_key(event_time)
        window_end = window_key + self.window_size
        # Window already finalized: route to the dead-letter output.
        if window_key in self.closed_windows:
            self.dead_letter.append({**event, 'reason': 'window_closed'})
            self.stats['dropped'] += 1
            return False, 'dropped'
        # Late and past the lateness grace period (window not yet closed
        # because advance_watermark hasn't run): also dead-letter.
        is_late = event_time < self.watermark
        if is_late and (window_end + self.allowed_lateness < self.watermark):
            self.dead_letter.append({**event, 'reason': 'too_late'})
            self.stats['dropped'] += 1
            return False, 'dropped'
        # Accept the event into its window aggregate.
        window = self.windows[window_key]
        window['count'] += 1
        window['sum'] += value
        window['events'].append(event)
        category = 'late_accepted' if is_late else 'on_time'
        self.stats[category] += 1
        return True, category
# Test: window=10s, allowed_lateness=5s
handler = LateArrivalHandler(window_size=10, allowed_lateness=5)
events = [
    # Window [0, 10): on-time traffic
    {'event_time': 2, 'processing_time': 3, 'value': 10, 'id': 'a'},
    {'event_time': 5, 'processing_time': 6, 'value': 20, 'id': 'b'},
    {'event_time': 8, 'processing_time': 9, 'value': 15, 'id': 'c'},
    # Window [10, 20): on-time traffic
    {'event_time': 12, 'processing_time': 13, 'value': 25, 'id': 'd'},
    {'event_time': 15, 'processing_time': 16, 'value': 30, 'id': 'e'},
    # Late event for window [0, 10): by now the watermark is 16, past
    # window_end(10) + allowed_lateness(5), so the window has closed
    # and this event is dead-lettered.
    {'event_time': 7, 'processing_time': 14, 'value': 12, 'id': 'f_late'},
    # Far too late for window [0, 10)
    {'event_time': 3, 'processing_time': 25, 'value': 5, 'id': 'g_too_late'},
]
for event in events:
    accepted, category = handler.process_event(event)
    closed = handler.advance_watermark(event['processing_time'])
    status = 'ACCEPT' if accepted else 'DROP'
    print(f" [{status}] id={event['id']} event_t={event['event_time']} "
          f"proc_t={event['processing_time']} -> {category}")
    for c in closed:
        print(f" Window {c['window']} closed: count={c['result']['count']}, "
              f"sum={c['result']['sum']}")
print(f"\nStats: {handler.stats}")
print(f"Dead letter queue: {len(handler.dead_letter)} events")
Complexity: O(1) per event for processing. O(W) for watermark advancement where W is the number of active windows. This mirrors Apache Flink's event-time processing model.
Challenge 4: Session Window Detection
from collections import defaultdict
def detect_sessions(events, gap_threshold, user_field='user_id', time_field='timestamp'):
    """Detect session windows from a stream of user events.

    A session ends when the gap between consecutive events from the same
    user exceeds gap_threshold. Events are sorted internally, so the
    input does not need to be pre-sorted.

    Args:
        events: List of event dicts
        gap_threshold: Max gap (in seconds) between events in a session
        user_field: Field name for user identifier
        time_field: Field name for event timestamp

    Returns:
        List of session summaries sorted by (user, start). Each summary
        has keys: user, start, end, count, events, session_id, duration.
    """
    active_sessions = {}   # user -> currently open session dict
    # Per-user session counter. BUG FIX: session_id previously used the
    # GLOBAL completed-session count, so with interleaved users a user's
    # second session could be numbered _s3, _s4, etc.
    session_counts = {}
    completed_sessions = []

    def _open_session(user, ts, event):
        """Start a fresh session for user at timestamp ts."""
        session_counts[user] = session_counts.get(user, 0) + 1
        return {
            'user': user,
            'start': ts,
            'end': ts,
            'count': 1,
            'events': [event],
            'session_id': f"{user}_s{session_counts[user]}"
        }

    for event in sorted(events, key=lambda e: e[time_field]):
        user = event[user_field]
        ts = event[time_field]
        session = active_sessions.get(user)
        if session is not None and ts - session['end'] <= gap_threshold:
            # Event continues the user's open session.
            session['end'] = ts
            session['count'] += 1
            session['events'].append(event)
            continue
        if session is not None:
            # Gap exceeded: finalize the open session before starting anew.
            session['duration'] = session['end'] - session['start']
            completed_sessions.append(session)
        active_sessions[user] = _open_session(user, ts, event)

    # Flush whatever is still open at end of stream.
    for session in active_sessions.values():
        session['duration'] = session['end'] - session['start']
        completed_sessions.append(session)

    completed_sessions.sort(key=lambda s: (s['user'], s['start']))
    return completed_sessions
# Test
events = [
    {'user_id': 'alice', 'timestamp': 100, 'action': 'page_view', 'page': '/home'},
    {'user_id': 'alice', 'timestamp': 120, 'action': 'click', 'page': '/products'},
    {'user_id': 'bob', 'timestamp': 130, 'action': 'page_view', 'page': '/home'},
    {'user_id': 'alice', 'timestamp': 150, 'action': 'add_cart', 'page': '/products/1'},
    {'user_id': 'bob', 'timestamp': 160, 'action': 'click', 'page': '/about'},
    # Alice goes idle for > 60s, which splits her activity into two sessions.
    {'user_id': 'alice', 'timestamp': 500, 'action': 'page_view', 'page': '/home'},
    {'user_id': 'alice', 'timestamp': 520, 'action': 'purchase', 'page': '/checkout'},
    {'user_id': 'bob', 'timestamp': 550, 'action': 'page_view', 'page': '/pricing'},
]
sessions = detect_sessions(events, gap_threshold=60)
print(f"Detected {len(sessions)} sessions:\n")
for sess in sessions:
    actions = [e['action'] for e in sess['events']]
    print(f" User: {sess['user']}, Duration: {sess['duration']}s, "
          f"Events: {sess['count']}, Actions: {actions}")
Complexity: O(n log n) for sorting events, O(n) for session detection. Space is O(n) for storing events. In production streaming systems, sessions are maintained in state stores (e.g., Flink's RocksDB state backend).
Challenge 5: Watermark-Based Processing
class WatermarkGenerator:
    """Generate watermarks based on bounded out-of-orderness.

    The watermark at any point is max_event_time_seen - max_out_of_orderness.
    Under the bounded-lateness assumption, no event with timestamp <= the
    watermark should still be in flight.
    """

    def __init__(self, max_out_of_orderness, emit_interval=1):
        self.max_ooo = max_out_of_orderness
        self.emit_interval = emit_interval      # emit once every N observed events
        self.max_timestamp = float('-inf')      # highest event time observed
        self.current_watermark = float('-inf')  # never decreases
        self.event_count = 0

    def observe(self, event_time):
        """Record an event time; returns a watermark on emit ticks, else None."""
        if event_time > self.max_timestamp:
            self.max_timestamp = event_time
        self.event_count += 1
        if self.event_count % self.emit_interval != 0:
            return None
        return self.emit_watermark()

    def emit_watermark(self):
        """Compute and return the current watermark (monotonically non-decreasing)."""
        candidate = self.max_timestamp - self.max_ooo
        self.current_watermark = max(self.current_watermark, candidate)
        return self.current_watermark
class WatermarkProcessor:
    """Process events using watermark-based windowing.

    Events accumulate in tumbling windows; whenever the watermark passes a
    window's end, that window fires and its aggregate is emitted.
    """

    def __init__(self, window_size, max_out_of_orderness, emit_every=1):
        self.window_size = window_size
        self.wm_gen = WatermarkGenerator(max_out_of_orderness, emit_every)
        self.windows = defaultdict(list)  # window_start -> list of event dicts
        self.emitted = []                 # all fired window results, in fire order

    def _window_for(self, event_time):
        # Floor the event time to its tumbling-window boundary.
        return (event_time // self.window_size) * self.window_size

    def process(self, event_time, value):
        """Process an event with watermark tracking.

        Returns:
            (watermark, triggered): the watermark emitted for this event
            (None on non-emit ticks) and the list of window results fired.
        """
        start = self._window_for(event_time)
        self.windows[start].append({'event_time': event_time, 'value': value})
        watermark = self.wm_gen.observe(event_time)
        triggered = []
        if watermark is None:
            return watermark, triggered
        # Fire every window whose end is at or before the watermark.
        for ws in list(self.windows):
            if ws + self.window_size > watermark:
                continue
            values = [entry['value'] for entry in self.windows.pop(ws)]
            result = {
                'window': f"[{ws}, {ws + self.window_size})",
                'count': len(values),
                'sum': sum(values),
                'avg': sum(values) / len(values),
                'watermark': watermark
            }
            triggered.append(result)
            self.emitted.append(result)
        return watermark, triggered
# Test: 10-second windows, max 3 seconds out of order
processor = WatermarkProcessor(window_size=10, max_out_of_orderness=3, emit_every=1)
# Events arrive out of order relative to their event times.
events = [
    (1, 10),   # in-order
    (5, 20),   # in-order
    (3, 15),   # out of order (arrives after t=5)
    (8, 25),   # in-order
    (12, 30),  # opens window [10, 20)
    (7, 12),   # late but inside the out-of-orderness bound
    (15, 35),  # watermark reaches 12 -> window [0, 10) fires
    (18, 40),  # watermark reaches 15
    (22, 45),  # watermark reaches 19
    (25, 50),  # watermark reaches 22 -> window [10, 20) fires
]
print("Processing events:")
for et, val in events:
    wm, triggered = processor.process(et, val)
    print(f" event_time={et:>3}, value={val:>3}, watermark={wm}")
    for t in triggered:
        print(f" TRIGGER {t['window']}: count={t['count']}, "
              f"sum={t['sum']}, avg={t['avg']:.1f}")
print(f"\n{len(processor.emitted)} windows emitted total")
Complexity: O(1) per event for watermark update. O(W) for window triggering where W is active windows. This is the exact pattern used by Apache Flink's BoundedOutOfOrdernessWatermarks strategy.
Lilly Tech Systems