Intermediate

Streaming ML with Kafka

Connect Kafka to ML frameworks, compute streaming features with windowed aggregations, and process real-time data for machine learning.

Kafka + Spark Structured Streaming

Spark Structured Streaming treats a Kafka topic as an unbounded DataFrame, enabling you to use familiar DataFrame operations on streaming data.

Python — Reading from Kafka with Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType

# Requires the spark-sql-kafka connector on the classpath, e.g.
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version>
spark = SparkSession.builder \
    .appName("KafkaStreamingML") \
    .getOrCreate()

# Read from Kafka topic
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ml-events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON messages
schema = StructType() \
    .add("user_id", StringType()) \
    .add("action", StringType()) \
    .add("item_id", StringType()) \
    .add("timestamp", StringType()) \
    .add("value", DoubleType())

events = raw_stream \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", F.to_timestamp("timestamp"))
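For reference, messages on the `ml-events` topic are assumed to carry the JSON shape that the schema above describes. A minimal sketch (field values are hypothetical) of how a producer would serialize one event, and how it round-trips back into the fields `from_json` extracts:

```python
import json
from datetime import datetime, timezone

# Hypothetical event matching the StructType schema above
event = {
    "user_id": "u-123",
    "action": "click",
    "item_id": "item-42",
    "timestamp": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc).isoformat(),
    "value": 3.5,
}

# Kafka message values are bytes; this is what lands in raw_stream's "value" column
payload = json.dumps(event).encode("utf-8")

# from_json on the Spark side performs the equivalent of this parse
parsed = json.loads(payload)
print(parsed["user_id"], parsed["value"])
```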

Windowed Aggregations

Python — Sliding Window Features
# Compute features over a 1-hour sliding window, updated every 5 minutes
windowed_features = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.col("user_id"),
        F.window("event_time", "1 hour", "5 minutes")
    ) \
    .agg(
        F.count("*").alias("event_count_1h"),
        F.avg("value").alias("avg_value_1h"),
        F.max("value").alias("max_value_1h"),
        # Exact countDistinct is not supported in streaming aggregations
        F.approx_count_distinct("item_id").alias("unique_items_1h")
    )

# Write features to another Kafka topic
query = windowed_features \
    .select(
        F.col("user_id").alias("key"),
        F.to_json(F.struct("*")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "ml-features") \
    .option("checkpointLocation", "/checkpoints/features") \
    .outputMode("update") \
    .start()
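A 1-hour window sliding every 5 minutes means each event contributes to 3600 / 300 = 12 overlapping windows. A small pure-Python sketch of that window assignment, independent of Spark, for intuition:

```python
def windows_for(epoch_s, size_s=3600, slide_s=300):
    """Return the (start, end) pairs of every sliding window containing epoch_s."""
    # Latest window starting at or before the event
    last_start = (epoch_s // slide_s) * slide_s
    starts = range(last_start, last_start - size_s, -slide_s)
    return [(s, s + size_s) for s in starts if s <= epoch_s < s + size_s]

wins = windows_for(10_000)  # an event at t = 10,000 s
print(len(wins))  # 12 overlapping windows
```

This is why the query above uses `outputMode("update")` with a watermark: each arriving event updates many in-flight windows, and the watermark bounds how long Spark keeps them open.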

Kafka + Faust (Python Stream Processing)

Python — Faust Stream Processing
import faust

app = faust.App('ml-stream', broker='kafka://localhost:9092')

class MLEvent(faust.Record):
    user_id: str
    action: str
    item_id: str
    value: float

# Define topics
events_topic = app.topic('ml-events', value_type=MLEvent)
features_topic = app.topic('ml-features', value_type=dict)

# Windowed table for feature computation
user_counts = app.Table(
    'user_action_counts',
    default=int
).tumbling(3600)  # 1-hour tumbling window

@app.agent(events_topic)
async def process_events(events):
    async for event in events:
        # Update windowed counter
        user_counts[event.user_id] += 1

        # Compute and emit features
        features = {
            'user_id': event.user_id,
            'action_count_1h': user_counts[event.user_id].current(),
            'latest_action': event.action,
            'latest_value': event.value
        }
        await features_topic.send(value=features)
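Unlike the sliding windows above, a 3600-second tumbling window assigns each event to exactly one bucket, keyed by the hour it falls in. A minimal sketch (hypothetical timestamps) of the bookkeeping Faust's windowed table manages for you:

```python
from collections import defaultdict

WINDOW_S = 3600  # matches .tumbling(3600) above

counts = defaultdict(int)  # (user_id, window_start) -> count

def record(user_id, epoch_s):
    """Assign the event to its single tumbling window and bump the counter."""
    window_start = (epoch_s // WINDOW_S) * WINDOW_S
    counts[(user_id, window_start)] += 1
    return counts[(user_id, window_start)]

record("u-1", 100)       # window [0, 3600)
record("u-1", 3599)      # same window
n = record("u-1", 3600)  # next window starts fresh
print(n)  # 1
```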

Stream Processing Frameworks Comparison

Framework | Language | Latency | Best For
Spark Structured Streaming | Python/Scala/Java | Seconds | Batch + stream unified, complex analytics
Kafka Streams | Java/Scala | Milliseconds | Lightweight, embedded processing
Faust | Python | Milliseconds | Python-native stream processing
Apache Flink | Java/Scala/Python | Milliseconds | Stateful event processing at scale

Choose your framework wisely: use Spark Structured Streaming if you already use Spark for batch ML, Faust for pure-Python teams, and Kafka Streams or Flink when you need the lowest latency or stateful processing at scale.
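Whichever framework produces them, the features land on the `ml-features` topic as JSON, ready for a downstream consumer to feed a model. A sketch of turning one such message into a model-ready vector (feature names follow the Spark job above; the fixed ordering is an assumption your model would define):

```python
import json

# Hypothetical feature ordering expected by a downstream model
FEATURE_ORDER = ["event_count_1h", "avg_value_1h", "max_value_1h", "unique_items_1h"]

def to_vector(message_value: bytes) -> list:
    """Parse a feature message from ml-features and order fields for model input."""
    feats = json.loads(message_value)
    return [float(feats.get(name, 0.0)) for name in FEATURE_ORDER]

# A sample message as the Spark feature job would emit it
msg = json.dumps({
    "user_id": "u-123",
    "event_count_1h": 17,
    "avg_value_1h": 2.4,
    "max_value_1h": 9.1,
    "unique_items_1h": 5,
}).encode("utf-8")

print(to_vector(msg))  # [17.0, 2.4, 9.1, 5.0]
```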