Intermediate
Streaming ML with Kafka
Connect Kafka to ML frameworks, compute streaming features with windowed aggregations, and process real-time data for machine learning.
Kafka + Spark Structured Streaming
Spark Structured Streaming treats a Kafka topic as an unbounded DataFrame, enabling you to use familiar DataFrame operations on streaming data.
Python — Reading from Kafka with Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType
spark = SparkSession.builder \
    .appName("KafkaStreamingML") \
    .getOrCreate()

# Read from Kafka topic
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ml-events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON messages into typed columns
schema = StructType() \
    .add("user_id", StringType()) \
    .add("action", StringType()) \
    .add("item_id", StringType()) \
    .add("timestamp", StringType()) \
    .add("value", DoubleType())

events = raw_stream \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", F.to_timestamp("timestamp"))
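Before wiring up Spark, it can help to see what the parsing step does to a single record. The sketch below is a plain-Python analogue (stdlib only, not Spark) of the `from_json` + `to_timestamp` transformation; the sample message is hypothetical:

```python
import json
from datetime import datetime

def parse_event(raw_value: bytes) -> dict:
    """Plain-Python analogue of from_json + to_timestamp for one record."""
    data = json.loads(raw_value.decode("utf-8"))
    # The producer sends timestamps as strings; convert to a real datetime
    data["event_time"] = datetime.fromisoformat(data["timestamp"])
    return data

msg = b'{"user_id": "u1", "action": "click", "item_id": "i9", "timestamp": "2024-06-01T12:30:00", "value": 3.5}'
event = parse_event(msg)
print(event["user_id"], event["event_time"].hour)  # u1 12
```

In the Spark version, this same transformation runs column-wise across every micro-batch instead of record by record.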
Windowed Aggregations
Python — Sliding Window Features
# Compute features over a 1-hour sliding window that advances every 5 minutes.
# The watermark tells Spark to drop events arriving more than 10 minutes late.
# Note: exact countDistinct is not supported in streaming aggregations, so we
# use approx_count_distinct instead.
windowed_features = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.col("user_id"),
        F.window("event_time", "1 hour", "5 minutes")
    ) \
    .agg(
        F.count("*").alias("event_count_1h"),
        F.avg("value").alias("avg_value_1h"),
        F.max("value").alias("max_value_1h"),
        F.approx_count_distinct("item_id").alias("unique_items_1h")
    )
# Write features to another Kafka topic, keyed by user
query = windowed_features \
    .select(
        F.col("user_id").alias("key"),
        F.to_json(F.struct("*")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "ml-features") \
    .option("checkpointLocation", "/checkpoints/features") \
    .outputMode("update") \
    .start()
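The sliding-window semantics above can be illustrated without Spark: each event contributes to every 1-hour window (aligned on 5-minute boundaries) that contains its timestamp. A stdlib-only sketch of that assignment rule (illustrative, not Spark's implementation):

```python
from datetime import datetime, timedelta

def windows_for(event_time: datetime, size_min: int = 60, slide_min: int = 5):
    """Return (start, end) of every sliding window containing event_time."""
    epoch = datetime(1970, 1, 1)
    elapsed = int((event_time - epoch).total_seconds() // 60)
    # The latest window starting at or before the event...
    last_start = elapsed - (elapsed % slide_min)
    # ...and every earlier slide whose window still covers the event.
    starts = range(last_start, last_start - size_min, -slide_min)
    return [(epoch + timedelta(minutes=s),
             epoch + timedelta(minutes=s + size_min)) for s in starts]

wins = windows_for(datetime(2024, 6, 1, 12, 33))
# With a 1-hour window sliding every 5 minutes, each event lands in 12 windows
```

This is why sliding windows are more expensive than tumbling windows: every event updates 60 / 5 = 12 aggregation states instead of one.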
Kafka + Faust (Python Stream Processing)
Python — Faust Stream Processing
import faust

app = faust.App('ml-stream', broker='kafka://localhost:9092')

class MLEvent(faust.Record):
    user_id: str
    action: str
    item_id: str
    value: float

# Define topics
events_topic = app.topic('ml-events', value_type=MLEvent)
features_topic = app.topic('ml-features', value_type=dict)

# Windowed table for feature computation: 1-hour tumbling windows
user_counts = app.Table(
    'user_action_counts',
    default=int
).tumbling(3600)

@app.agent(events_topic)
async def process_events(events):
    async for event in events:
        # Update the counter for the window this event falls into
        user_counts[event.user_id] += 1
        # Compute and emit features for the current window
        features = {
            'user_id': event.user_id,
            'action_count_1h': user_counts[event.user_id].current(),
            'latest_action': event.action,
            'latest_value': event.value
        }
        await features_topic.send(value=features)
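Faust's tumbling table buckets counts into fixed, non-overlapping 1-hour windows. The bucketing rule itself is simple; here is a stdlib-only sketch of it (illustrative, not Faust internals):

```python
from collections import defaultdict

WINDOW_SECONDS = 3600

def window_key(ts: float) -> int:
    """Start of the tumbling window an epoch timestamp falls into."""
    return int(ts // WINDOW_SECONDS) * WINDOW_SECONDS

counts = defaultdict(int)  # (user_id, window_start) -> count
for user, ts in [("u1", 100.0), ("u1", 200.0), ("u1", 3700.0), ("u2", 150.0)]:
    counts[(user, window_key(ts))] += 1
# u1 has 2 events in window 0 and 1 in window 3600; u2 has 1 in window 0
```

Unlike the sliding window in the Spark example, each event here belongs to exactly one window, which keeps state small and updates cheap.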
Stream Processing Frameworks Comparison
| Framework | Language | Latency | Best For |
|---|---|---|---|
| Spark Structured Streaming | Python/Scala/Java | Seconds | Batch + stream unified, complex analytics |
| Kafka Streams | Java/Scala | Milliseconds | Lightweight, embedded processing |
| Faust | Python | Milliseconds | Python-native stream processing |
| Apache Flink | Java/Scala/Python | Milliseconds | Stateful event processing at scale |
Choose your framework based on your stack: use Spark Structured Streaming if you already run Spark for batch ML, Faust for pure-Python teams, and Kafka Streams or Flink when you need millisecond-level latency or large-scale stateful processing.
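Whichever framework produces the features, a downstream model service typically consumes the `ml-features` topic and turns each JSON payload into a fixed-order vector before scoring. The helper below is a hypothetical sketch of that step (the feature names follow the examples above; any Kafka client library would supply the raw bytes):

```python
import json

# Fixed ordering so the vector always lines up with the model's inputs
FEATURE_ORDER = ("event_count_1h", "avg_value_1h", "max_value_1h", "unique_items_1h")

def features_to_vector(payload: bytes) -> list:
    """Deserialize one ml-features message into a model-ready vector."""
    feats = json.loads(payload)
    return [float(feats[name]) for name in FEATURE_ORDER]

msg = b'{"user_id": "u1", "event_count_1h": 7, "avg_value_1h": 2.5, "max_value_1h": 9.0, "unique_items_1h": 4}'
vec = features_to_vector(msg)  # [7.0, 2.5, 9.0, 4.0]
```

Pinning the feature order in one place avoids silent train/serve skew when the feature topic's schema evolves.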
Lilly Tech Systems