Intermediate

Batch Data Pipelines

Build production-grade batch pipelines with PySpark and Dask for feature computation. Learn partitioning strategies that prevent data skew, implement idempotent processing for safe retries, and design backfill patterns for recomputing historical features.

When to Use Batch Pipelines

Batch pipelines are the workhorse of ML data infrastructure. They process large volumes of data on a schedule (hourly, daily, weekly) to compute features, build training datasets, and populate offline feature stores. Use batch when your features do not need sub-minute freshness — which is most features in most ML systems.

💡
The 90/10 rule: At most companies, 90% of ML features can be computed in batch (hourly or daily). Only 10% truly need real-time computation. Start with batch for everything, then move specific features to streaming only when latency requirements demand it. Batch is cheaper, simpler, and easier to debug.

PySpark Feature Pipeline: Production Example

This is a complete, production-ready PySpark pipeline that computes user engagement features for a recommendation model. It demonstrates partitioning, incremental processing, and proper error handling.

# production_feature_pipeline.py
# PySpark feature pipeline for user engagement features
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import argparse
import sys

def create_spark_session(app_name: str) -> SparkSession:
    """Create a Spark session with production-tuned configuration"""
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.adaptive.enabled", "true")           # Adaptive query execution
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.shuffle.partitions", "200")           # Auto-tuned by AQE
        .config("spark.sql.parquet.compression.codec", "zstd")   # Better compression
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")  # Only overwrite affected partitions
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    )

def validate_input_data(df, date_str: str) -> bool:
    """Pre-transform validation: check data quality before processing"""
    row_count = df.count()
    if row_count == 0:
        raise ValueError(f"No data found for date {date_str}")

    null_user_rate = df.where(F.col("user_id").isNull()).count() / row_count
    if null_user_rate > 0.01:
        raise ValueError(f"user_id null rate {null_user_rate:.2%} exceeds 1% threshold")

    # Check for expected event types
    event_types = set(row.event_type for row in df.select("event_type").distinct().collect())
    required_types = {"page_view", "click", "purchase"}
    missing = required_types - event_types
    if missing:
        print(f"WARNING: Missing event types: {missing}")

    print(f"Input validation passed: {row_count:,} rows, null_user_rate={null_user_rate:.4%}")
    return True

def compute_user_features(spark: SparkSession, process_date: str) -> None:
    """Compute user engagement features for a given date"""
    # Read raw events for the trailing 30-day window ending on process_date
    end_date = datetime.strptime(process_date, "%Y-%m-%d")
    start_date = end_date - timedelta(days=29)  # inclusive date bounds span exactly 30 days

    events = (
        spark.read.parquet("s3://data-lake/events/")
        .where(
            (F.col("event_date") >= start_date.strftime("%Y-%m-%d")) &
            (F.col("event_date") <= process_date)
        )
    )

    # Validate before processing
    validate_input_data(events, process_date)

    # Compute engagement features
    features = (
        events
        .groupBy("user_id")
        .agg(
            # Activity volume
            F.count("*").alias("total_events_30d"),
            F.countDistinct("session_id").alias("total_sessions_30d"),
            F.countDistinct(F.date_trunc("day", "timestamp")).alias("active_days_30d"),

            # Engagement depth
            F.sum(F.when(F.col("event_type") == "page_view", 1).otherwise(0)).alias("page_views_30d"),
            F.sum(F.when(F.col("event_type") == "click", 1).otherwise(0)).alias("clicks_30d"),
            F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases_30d"),
            F.sum(F.when(F.col("event_type") == "add_to_cart", 1).otherwise(0)).alias("cart_adds_30d"),

            # Monetary
            F.sum(F.when(F.col("event_type") == "purchase", F.col("value")).otherwise(0)).alias("total_spend_30d"),
            F.avg(F.when(F.col("event_type") == "purchase", F.col("value"))).alias("avg_order_value_30d"),

            # Recency
            F.max("timestamp").alias("last_event_at"),
            F.min("timestamp").alias("first_event_at"),

            # Category diversity
            F.countDistinct("product_category").alias("unique_categories_30d"),
        )
        .withColumn("click_through_rate",
            F.when(F.col("page_views_30d") > 0,
                   F.col("clicks_30d") / F.col("page_views_30d")
            ).otherwise(0.0)
        )
        .withColumn("purchase_rate",
            F.when(F.col("total_sessions_30d") > 0,
                   F.col("purchases_30d") / F.col("total_sessions_30d")
            ).otherwise(0.0)
        )
        .withColumn("days_since_last_event",
            F.datediff(F.lit(process_date), F.col("last_event_at"))
        )
        .withColumn("feature_date", F.lit(process_date))
        .withColumn("computed_at", F.current_timestamp())
    )

    # Write with partition overwrite (idempotent - safe to re-run)
    (
        features
        .repartition(10)  # Right-size output files (~128MB each)
        .write
        .mode("overwrite")
        .partitionBy("feature_date")
        .parquet("s3://feature-store/user_engagement/")
    )

    # Note: count() re-executes the plan; cache the DataFrame before writing
    # if this double computation becomes too expensive
    row_count = features.count()
    print(f"Wrote {row_count:,} feature rows for date={process_date}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", required=True, help="Process date (YYYY-MM-DD)")
    args = parser.parse_args()

    spark = create_spark_session("user-engagement-features")
    try:
        compute_user_features(spark, args.date)
    except Exception as e:
        print(f"Pipeline failed: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        spark.stop()

Partitioning Strategies

Partitioning determines how data is physically organized on disk and how Spark distributes work across nodes. Wrong partitioning causes data skew (one worker does all the work while others sit idle) and tiny files that kill read performance.

| Strategy | Partition By | Best For | Watch Out For |
| --- | --- | --- | --- |
| Date partitioning | year/month/day | Time-series data, incremental processing | Too many small partitions if using hour-level |
| Hash partitioning | hash(user_id) % N | Even distribution for user-level aggregations | Cannot prune partitions by user_id range |
| Composite | date + region | Multi-dimensional queries | Combinatorial explosion of partitions |
| Z-order / Hilbert | Multi-column clustering | Delta Lake / Iceberg tables with multi-column filters | Requires compute for clustering; best on large tables |

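A quick back-of-the-envelope shows why over-partitioning is dangerous; the volume numbers here are hypothetical:

```python
# Hypothetical composite scheme: date x hour x country x event_type
days, hours, countries, event_types = 365, 24, 50, 10
partitions = days * hours * countries * event_types
print(f"{partitions:,} partition directories per year")  # 4,380,000
# Even one small file per partition means millions of tiny files, and
# Parquet readers pay a per-file open and footer-read cost on every scan.
```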
# Partitioning best practices in PySpark

# GOOD: Date partition with right-sized files
(
    df.repartition(10)               # Control output file count
    .write
    .mode("overwrite")
    .partitionBy("event_date")       # One directory per date
    .option("maxRecordsPerFile", 1_000_000)  # Cap file size
    .parquet("s3://output/events/")
)

# BAD: Over-partitioning creates millions of tiny files
# df.write.partitionBy("event_date", "hour", "user_country", "event_type")

# Handle data skew: salted joins for hot keys
# Problem: 1% of user_ids generate 50% of events
from pyspark.sql import functions as F

SALT_BUCKETS = 10

# Salt the large table (events)
events_salted = events.withColumn(
    "salt", (F.rand() * SALT_BUCKETS).cast("int")
)

# Explode the small table (users) to match all salt values
users_exploded = users.crossJoin(
    spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
)

# Join on user_id + salt = evenly distributed
result = events_salted.join(
    users_exploded,
    on=["user_id", "salt"],
    how="inner"
).drop("salt")
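Why salting helps can be seen without a cluster. This standalone simulation compares per-task row counts for a hot key before and after salting (illustrative only; Spark's real partitioner hashes the join key):

```python
import random

random.seed(42)
SALT_BUCKETS = 10

# Simulated skew: one hot user generates half of all events
event_user_ids = ["hot_user"] * 5000 + [f"user_{i}" for i in range(5000)]

# Unsalted: grouping by user_id puts all 5,000 hot-user rows in one task
unsalted = {}
for uid in event_user_ids:
    unsalted[uid] = unsalted.get(uid, 0) + 1

# Salted: grouping by (user_id, salt) splits the hot key across SALT_BUCKETS tasks
salted = {}
for uid in event_user_ids:
    key = (uid, random.randrange(SALT_BUCKETS))
    salted[key] = salted.get(key, 0) + 1

print(unsalted["hot_user"])  # 5000 rows land in a single task
print(max(v for (uid, _), v in salted.items() if uid == "hot_user"))  # roughly 500 per task
```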

Idempotent Processing

Every batch pipeline fails eventually — network timeouts, OOM errors, corrupted input files. Idempotent processing means you can re-run any stage safely without producing duplicates or incorrect results. This is the single most important property of a reliable batch pipeline.

# Three patterns for idempotent batch processing

# Pattern 1: Partition Overwrite (simplest, most common)
# Re-running overwrites the same partition, producing identical output.
# Requires spark.sql.sources.partitionOverwriteMode=dynamic; with the
# default "static" mode, overwrite wipes ALL partitions at the path
df.write.mode("overwrite").partitionBy("date").parquet("s3://output/")

# Pattern 2: Delta Lake MERGE (upsert - handles late-arriving data)
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "s3://feature-store/user_features/")
target.alias("target").merge(
    new_features.alias("source"),
    "target.user_id = source.user_id AND target.feature_date = source.feature_date"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Pattern 3: Write-Audit-Publish (highest reliability)
# 1. Write to a staging location (run_id: a unique ID from your orchestrator)
staging_path = f"s3://staging/features/{run_id}/"
df.write.mode("overwrite").parquet(staging_path)

# 2. Audit: validate output
staging_df = spark.read.parquet(staging_path)
assert staging_df.count() > 0, "Empty output"
assert staging_df.where(F.col("user_id").isNull()).count() == 0, "Null user_ids in output"

# 3. Publish: swap staging into production in one transaction
# CREATE OR REPLACE on a Delta table is transactional: concurrent readers
# see either the old table or the new one, never a partial state
spark.sql(f"""
    CREATE OR REPLACE TABLE feature_store.user_features
    USING DELTA LOCATION 's3://feature-store/user_features/'
    AS SELECT * FROM parquet.`{staging_path}`
""")
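The write-audit-publish shape is not Spark-specific. Here is a minimal local-filesystem sketch, with os.replace standing in for the atomic publish step (object stores need a table format like Delta or Iceberg to get equivalent atomicity):

```python
import json
import os
import tempfile

def write_audit_publish(rows: list[dict], final_path: str) -> None:
    """Write to staging, validate, then atomically publish.

    os.replace is atomic on a single filesystem, so readers of final_path
    see either the old file or the new one, never a partial write."""
    # 1. Write to a staging file in the same directory (same filesystem)
    fd, staging = tempfile.mkstemp(dir=os.path.dirname(final_path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(rows, f)

    # 2. Audit: re-read what was actually written before publishing it
    with open(staging) as f:
        staged = json.load(f)
    if not staged or any(r.get("user_id") is None for r in staged):
        os.remove(staging)
        raise ValueError("audit failed: empty output or null user_id")

    # 3. Publish: atomic rename
    os.replace(staging, final_path)
```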

Backfill Patterns

Backfills are required when you add new features, fix bugs in feature computation, or onboard a new model that needs historical features. A well-designed pipeline supports backfills without special code paths.

# Backfill orchestration with Airflow
# Run the same pipeline DAG for a range of historical dates

# Option 1: Airflow CLI backfill (simplest)
# airflow dags backfill feature_pipeline \
#   --start-date 2025-01-01 \
#   --end-date 2026-03-01 \
#   --reset-dagruns

# Option 2: Programmatic backfill with parallelism control
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta
import subprocess
import sys

def backfill_date(process_date: str) -> dict:
    """Run the feature pipeline for a single date in its own process.

    SparkSession is global to the JVM, so creating and stopping sessions
    from multiple threads interferes across dates; one subprocess per
    date keeps each run isolated."""
    proc = subprocess.run(
        [sys.executable, "production_feature_pipeline.py", "--date", process_date],
        capture_output=True,
        text=True,
    )
    if proc.returncode == 0:
        return {"date": process_date, "status": "success"}
    return {"date": process_date, "status": "failed", "error": proc.stderr.strip()}

def run_backfill(start: str, end: str, max_parallel: int = 4):
    """Backfill features for a date range with controlled parallelism"""
    start_date = date.fromisoformat(start)
    end_date = date.fromisoformat(end)
    dates = []
    current = start_date
    while current <= end_date:
        dates.append(current.isoformat())
        current += timedelta(days=1)

    print(f"Backfilling {len(dates)} dates with {max_parallel} parallel workers")
    results = []

    with ThreadPoolExecutor(max_workers=max_parallel) as executor:
        futures = {executor.submit(backfill_date, d): d for d in dates}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            status = result["status"]
            print(f"  {result['date']}: {status}")

    failed = [r for r in results if r["status"] == "failed"]
    if failed:
        print(f"\n{len(failed)} dates failed:")
        for f in failed:
            print(f"  {f['date']}: {f['error']}")
        raise RuntimeError(f"Backfill had {len(failed)} failures")

    print(f"\nBackfill complete: {len(results)} dates processed successfully")

# Run: python backfill.py --start 2025-01-01 --end 2026-03-01 --parallel 8

💡
Backfill cost control: Backfills over months of data can be extremely expensive. Use spot instances for backfill jobs (with checkpointing), process dates in chronological order so you can stop early if something is wrong, and always run a single-date test before launching the full backfill. Budget 2-3x the normal daily cost per backfill date due to cold-start overhead.
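The "process dates in chronological order so you can stop early" advice can be made mechanical. A sketch, where run_one is a hypothetical stand-in for whatever launches the per-date job:

```python
from datetime import date, timedelta

def sequential_backfill(start: str, end: str, run_one) -> list[str]:
    """Run backfill dates oldest-first and abort on the first failure,
    so a broken pipeline does not burn budget on every remaining date.

    run_one(date_str) -> bool is a hypothetical launcher for one date."""
    current, stop = date.fromisoformat(start), date.fromisoformat(end)
    completed = []
    while current <= stop:
        d = current.isoformat()
        if not run_one(d):
            raise RuntimeError(f"backfill stopped at {d}; {len(completed)} dates completed")
        completed.append(d)
        current += timedelta(days=1)
    return completed
```

Pair this with a single-date dry run (calling run_one on one recent date) before launching the full range.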

Dask: Alternative to Spark for Smaller Teams

If your data fits on a single large machine (Dask can spill to disk, so a few hundred GB per node is workable) or your team is Python-native and does not want to operate a Spark cluster, Dask is a strong alternative. It offers a familiar pandas-like API and scales from a laptop to a multi-node cluster.

# Dask feature pipeline - same logic, pandas-like API
import dask.dataframe as dd
from dask.distributed import Client
from datetime import datetime, timedelta

def compute_features_dask(process_date: str):
    # Connect to Dask cluster (or use local threads)
    client = Client("scheduler:8786")  # or Client() for local

    # Trailing 30-day window ending on process_date
    end = datetime.strptime(process_date, "%Y-%m-%d")
    start = (end - timedelta(days=29)).strftime("%Y-%m-%d")

    # Read partitioned parquet (Dask reads lazily, pruning by the filters)
    events = dd.read_parquet(
        "s3://data-lake/events/",
        filters=[("event_date", ">=", start), ("event_date", "<=", process_date)],
    )

    # Compute features (pandas-like API, distributed execution)
    features = (
        events
        .groupby("user_id")
        .agg({
            "event_id": "count",
            "session_id": "nunique",
            "value": ["sum", "mean"],
        })
    )
    features.columns = ["total_events", "total_sessions", "total_spend", "avg_order_value"]

    # Write output (idempotent overwrite)
    features.to_parquet(
        f"s3://feature-store/user_engagement/feature_date={process_date}/",
        engine="pyarrow",
        write_index=True,
        overwrite=True,
    )

    print(f"Dask pipeline complete: {len(features)} users processed")
    client.close()

What Is Next

Now that you can build batch pipelines for feature computation, the next lesson covers Streaming Data Pipelines. You will learn Kafka + Flink/Spark Streaming for real-time features, exactly-once semantics, windowed aggregations, and how to unify batch and streaming paths to avoid train-serve skew.