Intermediate
Feature Computation
Build robust feature computation pipelines using batch, streaming, and on-demand engines with proper scheduling and incremental processing.
Computation Modes
Batch Computation
Scheduled jobs (hourly/daily) that process large volumes of data using Spark, Flink Batch, or SQL engines. Best for aggregate features over time windows.
Streaming Computation
Continuous processing of event streams using Flink, Spark Streaming, or Kafka Streams. Best for real-time features with low-latency requirements.
On-Demand Computation
Features computed at request time using lightweight transformations. Best for features that combine request context with stored features.
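A minimal sketch of an on-demand feature that combines request context with a stored feature: the distance between the user's current location and a store. The function names and request/feature shapes here are illustrative, not from any specific framework.

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points."""
    r = 6371.0  # mean Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))

def user_store_distance_km(request, stored_features):
    """On-demand feature: user location arrives with the request,
    store location is fetched from the feature store."""
    return haversine_km(
        request["user_lat"], request["user_lon"],
        stored_features["store_lat"], stored_features["store_lon"],
    )
```

Because it runs per request, this kind of transformation should stay lightweight (no network calls beyond the feature-store lookup itself).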
Batch Feature Pipeline
Python — Spark Batch Feature Pipeline
from datetime import datetime, timedelta

from pyspark.sql import functions as F

def compute_user_features(spark, run_date):
    """Compute 30-day user aggregate features as of run_date."""
    # Read the trailing 30 days of daily partitions (not just run_date),
    # so the aggregates below actually cover a 30-day window
    start_date = (
        datetime.strptime(run_date, "%Y-%m-%d") - timedelta(days=30)
    ).strftime("%Y-%m-%d")
    transactions = (
        spark.read.parquet("s3://data-lake/transactions/")
        .where(F.col("date").between(start_date, run_date))
    )
    user_features = transactions.groupBy("user_id").agg(
        F.count("*").alias("tx_count_30d"),
        F.sum("amount").alias("tx_sum_30d"),
        F.avg("amount").alias("tx_avg_30d"),
        F.stddev("amount").alias("tx_stddev_30d"),
        F.countDistinct("merchant_id").alias("unique_merchants_30d"),
        F.max("timestamp").alias("last_tx_timestamp"),
    )
    # Write to the offline store, partitioned by run date
    user_features.write.mode("overwrite").parquet(
        f"s3://feature-store/user_features/date={run_date}"
    )
    return user_features
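Daily jobs like this are typically driven by a scheduler that passes in the run date, and the same driver can replay a historical date range when feature logic changes. A minimal backfill loop, assuming a per-date compute function (the wrapping of `compute_user_features` shown in the comment is illustrative):

```python
from datetime import date, timedelta

def backfill(compute_fn, start: date, end: date):
    """Re-run a daily feature job for every date in [start, end]."""
    results = []
    d = start
    while d <= end:
        run_date = d.isoformat()
        # e.g. compute_fn = lambda rd: compute_user_features(spark, rd)
        results.append(compute_fn(run_date))
        d += timedelta(days=1)
    return results
```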
Streaming Feature Pipeline
Python — Flink Streaming Features
from pyflink.table import EnvironmentSettings, TableEnvironment

def build_streaming_features(t_env):
    """Real-time sliding-window features from an event stream."""
    # Read from Kafka; events arriving more than 5 seconds late are dropped
    t_env.execute_sql("""
        CREATE TABLE events (
            user_id STRING,
            event_type STRING,
            amount DOUBLE,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'user-events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)
    # 1-hour sliding window, advanced every minute (HOP windowing TVF)
    result = t_env.sql_query("""
        SELECT
            user_id,
            COUNT(*) AS event_count_1h,
            SUM(amount) AS spend_sum_1h,
            window_end AS feature_timestamp
        FROM TABLE(
            HOP(TABLE events, DESCRIPTOR(event_time),
                INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
        )
        GROUP BY user_id, window_start, window_end
    """)
    return result
Incremental Processing
For efficiency at scale, prefer incremental computation over full recomputation:
- Change data capture (CDC): Process only rows that changed since the last run.
- Watermark tracking: Track the latest processed timestamp and only read new data.
- Merge operations: Use UPSERT semantics to update only affected entity keys.
- Backfill support: Ability to recompute historical features when logic changes.
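The watermark and merge patterns above can be sketched together: track the latest processed timestamp, read only newer rows, and UPSERT per entity key. The in-memory dict standing in for the feature store is purely for illustration.

```python
def incremental_update(feature_store: dict, rows: list, watermark: float) -> float:
    """Process only rows newer than the watermark; upsert per entity key.

    feature_store maps user_id -> running transaction count.
    Returns the advanced watermark to persist for the next run.
    """
    new_rows = [r for r in rows if r["timestamp"] > watermark]
    for r in new_rows:
        feature_store[r["user_id"]] = feature_store.get(r["user_id"], 0) + 1
    return max((r["timestamp"] for r in new_rows), default=watermark)
```

Running the function twice with the same input is a no-op the second time, which is what makes the job safe to retry after failures.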
Choose the right mode: Start with batch computation for most features. Add streaming only when you need sub-minute freshness. Use on-demand computation for features that depend on request context (e.g., distance between user location and store).
Lilly Tech Systems