Intermediate

Feature Computation

Build robust feature computation pipelines using batch, streaming, and on-demand engines with proper scheduling and incremental processing.

Computation Modes

Batch Computation

Scheduled jobs (hourly/daily) that process large volumes of data using Spark, Flink Batch, or SQL engines. Best for aggregate features over time windows.

Streaming Computation

Continuous processing of event streams using Flink, Spark Streaming, or Kafka Streams. Best for real-time features with low-latency requirements.

On-Demand Computation

Features computed at request time using lightweight transformations. Best for features that combine request context with stored features.
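As a concrete sketch, an on-demand feature might combine a request's coordinates with a store location fetched from the online store. The helper below is a hypothetical illustration, not a feature-store API: the `request_ctx` and `stored_features` dicts and their keys are assumptions.

```python
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points."""
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))

def user_store_distance(request_ctx, stored_features):
    """On-demand feature: distance between the request's location and the store's.

    request_ctx comes from the request payload; stored_features would be
    read from the online store (shapes here are assumptions for the sketch).
    """
    return haversine_km(
        request_ctx["lat"], request_ctx["lon"],
        stored_features["store_lat"], stored_features["store_lon"],
    )
```

Because the transformation is a pure function of its inputs, it can run in the serving path with negligible latency and no extra storage.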

Batch Feature Pipeline

Python — Spark Batch Feature Pipeline
from datetime import datetime, timedelta

from pyspark.sql import functions as F

def compute_user_features(spark, run_date):
    """Compute 30-day user aggregate features as of run_date."""
    # Read the trailing 30 days of daily partitions, not just the run date,
    # so the *_30d aggregates actually cover a 30-day window
    start_date = (
        datetime.strptime(run_date, "%Y-%m-%d") - timedelta(days=30)
    ).strftime("%Y-%m-%d")
    transactions = spark.read.parquet("s3://data-lake/transactions/").where(
        F.col("date").between(start_date, run_date)
    )

    user_features = transactions.groupBy("user_id").agg(
        F.count("*").alias("tx_count_30d"),
        F.sum("amount").alias("tx_sum_30d"),
        F.avg("amount").alias("tx_avg_30d"),
        F.stddev("amount").alias("tx_stddev_30d"),
        F.countDistinct("merchant_id").alias("unique_merchants_30d"),
        F.max("timestamp").alias("last_tx_timestamp"),
    )

    # Write to the offline store, partitioned by run date
    user_features.write.mode("overwrite").parquet(
        f"s3://feature-store/user_features/date={run_date}"
    )
    return user_features
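Backfill (recomputing history after a logic change) usually reduces to re-running this job once per historical partition. A minimal sketch of such a driver loop, where `daily_run_dates` is a hypothetical helper rather than part of any scheduler API:

```python
from datetime import date, timedelta

def daily_run_dates(start, end):
    """Yield ISO date strings from start to end inclusive, one per daily run."""
    d = start
    while d <= end:
        yield d.isoformat()
        d += timedelta(days=1)

# Hypothetical backfill driver: re-run the batch job for each partition.
# for run_date in daily_run_dates(date(2024, 1, 1), date(2024, 1, 31)):
#     compute_user_features(spark, run_date)
```

In practice an orchestrator such as Airflow would own this loop, with per-date tasks that can be retried independently.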

Streaming Feature Pipeline

Python — Flink Streaming Features
from pyflink.table import TableEnvironment

def build_streaming_features(t_env):
    """Real-time sliding-window features from an event stream."""
    # Read from Kafka; bootstrap servers and startup mode are
    # deployment-specific placeholders
    t_env.execute_sql("""
        CREATE TABLE events (
            user_id STRING,
            event_type STRING,
            amount DOUBLE,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'user-events',
            'properties.bootstrap.servers' = 'kafka:9092',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # 1-hour sliding (HOP) window aggregation, advanced every minute
    result = t_env.sql_query("""
        SELECT
            user_id,
            COUNT(*) AS event_count_1h,
            SUM(amount) AS spend_sum_1h,
            window_end AS feature_timestamp
        FROM TABLE(
            HOP(TABLE events, DESCRIPTOR(event_time),
                INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
        )
        GROUP BY user_id, window_start, window_end
    """)
    return result

Incremental Processing

For efficiency at scale, prefer incremental computation over full recomputation:

  • Change data capture (CDC): Process only rows that changed since the last run.
  • Watermark tracking: Track the latest processed timestamp and only read new data.
  • Merge operations: Use UPSERT semantics to update only affected entity keys.
  • Backfill support: Ability to recompute historical features when logic changes.
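The watermark-tracking idea above can be sketched in a few lines: persist the newest processed timestamp, and filter each run's input against it. This is a minimal single-process illustration under assumed names (the state-file path and event shape are placeholders; a real pipeline would keep its watermark in a metadata table):

```python
import json
from pathlib import Path

STATE_FILE = Path("pipeline_watermark.json")  # hypothetical state location

def load_watermark(default=0):
    """Return the last processed event timestamp, or `default` on the first run."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())["last_ts"]
    return default

def save_watermark(ts):
    """Persist the newest processed timestamp for the next run."""
    STATE_FILE.write_text(json.dumps({"last_ts": ts}))

def incremental_run(events):
    """Process only events newer than the stored watermark, then advance it."""
    wm = load_watermark()
    new_events = [e for e in events if e["ts"] > wm]
    if new_events:
        save_watermark(max(e["ts"] for e in new_events))
    return new_events
```

Each run therefore reads only the delta since the previous run, which keeps cost proportional to new data rather than total history.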

Choose the right mode: Start with batch computation for most features. Add streaming only when you need sub-minute freshness. Use on-demand computation for features that depend on request context (e.g., distance between user location and store).