Intermediate
Feature Pipelines with Kafka
Build real-time feature pipelines that consume raw events from Kafka, compute ML features, and write them to feature stores or serving layers.
Feature Pipeline Architecture
A real-time feature pipeline has three stages:
- Ingest: Raw events flow into Kafka topics from application services, IoT devices, or user interactions.
- Transform: Stream processors compute features — aggregations, joins, lookups, and enrichments.
- Serve: Computed features are written to a low-latency store (Redis, DynamoDB, or a feature store) for model serving.
Building a Feature Pipeline
Python — End-to-End Feature Pipeline
```python
from confluent_kafka import Consumer, Producer
import json
import redis
from datetime import datetime, timedelta
from collections import defaultdict

# Setup
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'feature-pipeline',
    'auto.offset.reset': 'latest'
})
consumer.subscribe(['user-events'])

producer = Producer({'bootstrap.servers': 'localhost:9092'})
redis_client = redis.Redis(host='localhost', port=6379)

# In-memory state for windowed features
user_events = defaultdict(list)

def compute_features(user_id, events):
    """Compute features from events in the last hour."""
    now = datetime.utcnow()
    one_hour_ago = now - timedelta(hours=1)

    # Filter to the last hour
    recent = [e for e in events if e['timestamp'] > one_hour_ago]

    return {
        'user_id': user_id,
        'event_count_1h': len(recent),
        'unique_actions_1h': len(set(e['action'] for e in recent)),
        'avg_value_1h': sum(e['value'] for e in recent) / max(len(recent), 1),
        'last_action': recent[-1]['action'] if recent else None,
        'computed_at': now.isoformat()
    }

# Process loop
while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    event = json.loads(msg.value())
    user_id = event['user_id']
    event['timestamp'] = datetime.fromisoformat(event['timestamp'])
    user_events[user_id].append(event)

    # Prune events older than the window so per-user state doesn't grow unboundedly
    cutoff = datetime.utcnow() - timedelta(hours=1)
    user_events[user_id] = [e for e in user_events[user_id] if e['timestamp'] > cutoff]

    # Compute and store features
    features = compute_features(user_id, user_events[user_id])

    # Write to Redis for online serving
    redis_client.setex(
        f"features:{user_id}",
        3600,  # 1 hour TTL
        json.dumps(features)
    )

    # Write to Kafka for downstream consumers
    producer.produce('ml-features', value=json.dumps(features).encode())
    producer.poll(0)  # serve delivery callbacks
```
Kafka Connect for Sinks
JSON — Kafka Connect Sink to Redis
```json
{
  "name": "redis-feature-sink",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max": "3",
    "topics": "ml-features",
    "redis.hosts": "localhost:6379",
    "redis.database": 0,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
```
Feature Pipeline with Feast
Python — Kafka to Feast Feature Store
```python
from feast import FeatureStore
from confluent_kafka import Consumer
import pandas as pd
import json

store = FeatureStore(repo_path="feature_repo/")

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'feast-ingestion',
    'auto.offset.reset': 'latest'
})
consumer.subscribe(['ml-features'])

batch = []
BATCH_SIZE = 100

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        batch.append(json.loads(msg.value()))

    if len(batch) >= BATCH_SIZE:
        df = pd.DataFrame(batch)
        df['event_timestamp'] = pd.to_datetime(df['computed_at'])

        # Write the batch to the online store
        store.write_to_online_store(
            feature_view_name="user_streaming_features",
            df=df
        )
        batch = []
```
Dual-write to online and offline: Write computed features to both a low-latency online store (Redis/DynamoDB) for serving and a data lake (S3/Delta Lake) for training. This ensures training-serving consistency.
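The dual-write pattern above can be sketched in a few lines. This is a toy illustration with in-memory stand-ins for the two destinations (a dict playing the role of Redis, a list playing the role of the S3/Delta Lake log); the `dual_write` helper and its field names are illustrative, not part of any library.

```python
import json
from datetime import datetime, timezone

# Stand-ins for the real stores: a dict for the online store (Redis-like,
# latest value per key) and an append-only list for the offline log (S3-like).
online_store = {}
offline_log = []

def dual_write(features: dict) -> None:
    """Write the same computed features to both serving and training paths."""
    key = f"features:{features['user_id']}"
    payload = json.dumps(features)

    # Online path: overwrite with the latest value, for low-latency serving
    online_store[key] = payload

    # Offline path: append every version with a write timestamp, for training
    offline_log.append({
        'key': key,
        'payload': payload,
        'written_at': datetime.now(timezone.utc).isoformat()
    })

dual_write({'user_id': 'u1', 'event_count_1h': 3, 'avg_value_1h': 9.5})
dual_write({'user_id': 'u1', 'event_count_1h': 4, 'avg_value_1h': 8.0})

latest = json.loads(online_store['features:u1'])
print(latest['event_count_1h'])  # 4 — online store keeps only the latest
print(len(offline_log))          # 2 — offline log keeps the full history
```

Because both destinations receive the output of the same computation, a model trained on the offline log sees the same feature definitions it will be served at inference time.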
Handling late data: Events may arrive out of order. Use watermarks in Spark Structured Streaming or event-time processing in Flink to handle late arrivals gracefully. Set a grace period appropriate for your use case.
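The watermark idea can be shown without Spark or Flink: track the maximum event time seen so far, and treat anything older than that high-water mark minus a grace period as too late. This is a minimal plain-Python sketch of the concept; the `accept`/`watermark` names and the 5-minute grace period are illustrative, and real engines additionally manage per-window state and emit/retract results.

```python
from datetime import datetime, timedelta

GRACE_PERIOD = timedelta(minutes=5)  # how long to wait for stragglers

# Highest event time observed so far; the watermark trails it by the grace period
max_event_time = datetime.min

def watermark() -> datetime:
    return max_event_time - GRACE_PERIOD

def accept(event_time: datetime) -> bool:
    """Accept an event unless it is older than the current watermark."""
    global max_event_time
    max_event_time = max(max_event_time, event_time)
    return event_time >= watermark()

base = datetime(2024, 1, 1, 12, 0)
r_on_time = accept(base)                            # advances the watermark to 11:55
r_slightly_late = accept(base - timedelta(minutes=3))   # 11:57, within grace
r_too_late = accept(base - timedelta(minutes=10))       # 11:50, past the watermark
print(r_on_time, r_slightly_late, r_too_late)  # True True False
```

A longer grace period tolerates more disorder but delays when windows can be finalized, so the right setting depends on how late your producers realistically run.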
Lilly Tech Systems