Batch Data Pipelines
Build production-grade batch pipelines with PySpark and Dask for feature computation. Learn partitioning strategies that prevent data skew, implement idempotent processing for safe retries, and design backfill patterns for recomputing historical features.
When to Use Batch Pipelines
Batch pipelines are the workhorse of ML data infrastructure. They process large volumes of data on a schedule (hourly, daily, weekly) to compute features, build training datasets, and populate offline feature stores. Use batch when your features do not need sub-minute freshness — which is most features in most ML systems.
PySpark Feature Pipeline: Production Example
This is a complete, production-ready PySpark pipeline that computes user engagement features for a recommendation model. It demonstrates partitioning, incremental processing, and proper error handling.
# production_feature_pipeline.py
# PySpark feature pipeline for user engagement features
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
from datetime import datetime, timedelta
import argparse
import sys
def create_spark_session(app_name: str) -> SparkSession:
    """Build a SparkSession tuned for production feature pipelines.

    Enables adaptive query execution, zstd parquet compression, dynamic
    partition overwrite (re-runs replace only the affected partitions),
    and Kryo serialization.
    """
    tuned_settings = {
        "spark.sql.adaptive.enabled": "true",  # Adaptive query execution
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.shuffle.partitions": "200",  # Auto-tuned by AQE
        "spark.sql.parquet.compression.codec": "zstd",  # Better compression
        "spark.sql.sources.partitionOverwriteMode": "dynamic",  # Only overwrite affected partitions
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    }
    builder = SparkSession.builder.appName(app_name)
    for key, value in tuned_settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
def validate_input_data(df, date_str: str) -> bool:
    """Pre-transform validation: check data quality before processing.

    Args:
        df: Spark DataFrame of raw events with `user_id` and `event_type`.
        date_str: Process date (YYYY-MM-DD), used only for error messages.

    Returns:
        True when validation passes.

    Raises:
        ValueError: When the input is empty or the `user_id` null rate
            exceeds the 1% threshold. Missing event types only warn, so
            a quiet day does not block the pipeline.
    """
    # Single aggregation pass instead of two separate count() jobs
    # (the original scanned the input twice).
    stats = df.agg(
        F.count(F.lit(1)).alias("row_count"),
        F.sum(F.col("user_id").isNull().cast("long")).alias("null_users"),
    ).collect()[0]
    row_count = stats["row_count"]
    if row_count == 0:
        raise ValueError(f"No data found for date {date_str}")
    null_user_rate = stats["null_users"] / row_count
    if null_user_rate > 0.01:
        raise ValueError(f"user_id null rate {null_user_rate:.2%} exceeds 1% threshold")
    # Check for expected event types
    event_types = {row.event_type for row in df.select("event_type").distinct().collect()}
    required_types = {"page_view", "click", "purchase"}
    missing = required_types - event_types
    if missing:
        print(f"WARNING: Missing event types: {missing}")
    print(f"Input validation passed: {row_count:,} rows, null_user_rate={null_user_rate:.4%}")
    return True
def compute_user_features(spark: SparkSession, process_date: str) -> None:
    """Compute 30-day user engagement features for one snapshot date.

    Reads raw events for the trailing 30-day window ending on
    process_date, validates input quality, aggregates per-user
    activity/engagement/monetary/recency features, and overwrites the
    matching feature_date partition (idempotent — safe to re-run when
    dynamic partition overwrite is enabled, as in create_spark_session).

    Args:
        spark: Active SparkSession.
        process_date: Snapshot date in YYYY-MM-DD format.

    Raises:
        ValueError: Propagated from validate_input_data when the input
            window is empty or fails quality checks.
    """
    # Read raw events for the 30-day window ending on process_date
    end_date = datetime.strptime(process_date, "%Y-%m-%d")
    start_date = end_date - timedelta(days=30)
    events = (
        spark.read.parquet("s3://data-lake/events/")
        .where(
            (F.col("event_date") >= start_date.strftime("%Y-%m-%d")) &
            (F.col("event_date") <= process_date)
        )
    )
    # Validate before processing (fail fast before the expensive aggregation)
    validate_input_data(events, process_date)
    # NOTE: two Window definitions in the original were never used below
    # and have been removed.
    # Compute engagement features
    features = (
        events
        .groupBy("user_id")
        .agg(
            # Activity volume
            F.count("*").alias("total_events_30d"),
            F.countDistinct("session_id").alias("total_sessions_30d"),
            F.countDistinct(F.date_trunc("day", "timestamp")).alias("active_days_30d"),
            # Engagement depth
            F.sum(F.when(F.col("event_type") == "page_view", 1).otherwise(0)).alias("page_views_30d"),
            F.sum(F.when(F.col("event_type") == "click", 1).otherwise(0)).alias("clicks_30d"),
            F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases_30d"),
            F.sum(F.when(F.col("event_type") == "add_to_cart", 1).otherwise(0)).alias("cart_adds_30d"),
            # Monetary
            F.sum(F.when(F.col("event_type") == "purchase", F.col("value")).otherwise(0)).alias("total_spend_30d"),
            F.avg(F.when(F.col("event_type") == "purchase", F.col("value"))).alias("avg_order_value_30d"),
            # Recency
            F.max("timestamp").alias("last_event_at"),
            F.min("timestamp").alias("first_event_at"),
            # Category diversity
            F.countDistinct("product_category").alias("unique_categories_30d"),
        )
        .withColumn("click_through_rate",
            F.when(F.col("page_views_30d") > 0,
                F.col("clicks_30d") / F.col("page_views_30d")
            ).otherwise(0.0)
        )
        .withColumn("purchase_rate",
            F.when(F.col("total_sessions_30d") > 0,
                F.col("purchases_30d") / F.col("total_sessions_30d")
            ).otherwise(0.0)
        )
        .withColumn("days_since_last_event",
            F.datediff(F.lit(process_date), F.col("last_event_at"))
        )
        .withColumn("feature_date", F.lit(process_date))
        .withColumn("computed_at", F.current_timestamp())
    )
    # Cache so the write and the row count below share one execution of
    # the aggregation — the original's count() after the write re-ran
    # the entire job.
    features = features.cache()
    # Write with partition overwrite (idempotent - safe to re-run)
    (
        features
        .repartition(10)  # Right-size output files (~128MB each)
        .write
        .mode("overwrite")
        .partitionBy("feature_date")
        .parquet("s3://feature-store/user_engagement/")
    )
    row_count = features.count()
    features.unpersist()
    print(f"Wrote {row_count:,} feature rows for date={process_date}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--date", required=True, help="Process date (YYYY-MM-DD)")
args = parser.parse_args()
spark = create_spark_session("user-engagement-features")
try:
compute_user_features(spark, args.date)
except Exception as e:
print(f"Pipeline failed: {e}", file=sys.stderr)
sys.exit(1)
finally:
spark.stop()
Partitioning Strategies
Partitioning determines how data is physically organized on disk and how Spark distributes work across nodes. Wrong partitioning causes data skew (one worker does all the work while others sit idle) and tiny files that kill read performance.
| Strategy | Partition By | Best For | Watch Out For |
|---|---|---|---|
| Date partitioning | year/month/day | Time-series data, incremental processing | Too many small partitions if using hour-level |
| Hash partitioning | hash(user_id) % N | Even distribution for user-level aggregations | Cannot prune partitions by user_id range |
| Composite | date + region | Multi-dimensional queries | Combinatorial explosion of partitions |
| Z-order / Hilbert | Multi-column clustering | Delta Lake / Iceberg tables with multi-column filters | Requires compute for clustering; best on large tables |
# Partitioning best practices in PySpark
# (Illustrative snippet: `df`, `events`, `users`, and `spark` are assumed
# to be defined by the surrounding pipeline.)
# GOOD: Date partition with right-sized files
# NOTE(review): mode("overwrite") replaces the entire output path unless
# spark.sql.sources.partitionOverwriteMode=dynamic is set on the session
# — confirm the session config before relying on per-partition overwrite.
(
df.repartition(10) # Control output file count
.write
.mode("overwrite")
.partitionBy("event_date") # One directory per date
.option("maxRecordsPerFile", 1_000_000) # Cap file size
.parquet("s3://output/events/")
)
# BAD: Over-partitioning creates millions of tiny files
# df.write.partitionBy("event_date", "hour", "user_country", "event_type")
# Handle data skew: salted joins for hot keys
# Problem: 1% of user_ids generate 50% of events
from pyspark.sql import functions as F
SALT_BUCKETS = 10
# Salt the large table (events)
# Each event row gets a random salt in [0, SALT_BUCKETS), so a hot
# user's rows are spread across SALT_BUCKETS shuffle keys.
events_salted = events.withColumn(
"salt", (F.rand() * SALT_BUCKETS).cast("int")
)
# Explode the small table (users) to match all salt values
# Cross join duplicates every user row once per salt value, so every
# (user_id, salt) pair on the events side finds its match.
users_exploded = users.crossJoin(
spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
)
# Join on user_id + salt = evenly distributed
# The salt column is dropped afterwards; the join result is the same
# rows an unsalted inner join on user_id would produce.
result = events_salted.join(
users_exploded,
on=["user_id", "salt"],
how="inner"
).drop("salt")
Idempotent Processing
Every batch pipeline fails eventually — network timeouts, OOM errors, corrupted input files. Idempotent processing means you can re-run any stage safely without producing duplicates or incorrect results. This is the single most important property of a reliable batch pipeline.
# Three patterns for idempotent batch processing
# (Illustrative snippet: `df`, `new_features`, `run_id`, and `spark` are
# assumed to be defined by the surrounding pipeline.)
# Pattern 1: Partition Overwrite (simplest, most common)
# Re-running overwrites the same partition, producing identical output
# NOTE(review): this is only partition-scoped when
# spark.sql.sources.partitionOverwriteMode=dynamic is enabled; otherwise
# mode("overwrite") replaces the whole table path — confirm session config.
df.write.mode("overwrite").partitionBy("date").parquet("s3://output/")
# Pattern 2: Delta Lake MERGE (upsert - handles late-arriving data)
# Matching (user_id, feature_date) rows are updated in place; new rows
# are inserted — re-running with the same input converges to one row per key.
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "s3://feature-store/user_features/")
target.alias("target").merge(
new_features.alias("source"),
"target.user_id = source.user_id AND target.feature_date = source.feature_date"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Pattern 3: Write-Audit-Publish (highest reliability)
# 1. Write to staging location
staging_path = f"s3://staging/features/{run_id}/"
df.write.mode("overwrite").parquet(staging_path)
# 2. Audit: validate output
# NOTE(review): `assert` is stripped when Python runs with -O; in a real
# pipeline raise an explicit exception for these checks instead.
staging_df = spark.read.parquet(staging_path)
assert staging_df.count() > 0, "Empty output"
assert staging_df.where(F.col("user_id").isNull()).count() == 0, "Null user_ids in output"
# 3. Publish: atomically swap staging to production
# With Delta Lake, this is a single metadata operation
spark.sql(f"""
CREATE OR REPLACE TABLE feature_store.user_features
USING DELTA LOCATION 's3://feature-store/user_features/'
AS SELECT * FROM parquet.`{staging_path}`
""")
Backfill Patterns
Backfills are required when you add new features, fix bugs in feature computation, or onboard a new model that needs historical features. A well-designed pipeline supports backfills without special code paths.
# Backfill orchestration with Airflow
# Run the same pipeline DAG for a range of historical dates
# Option 1: Airflow CLI backfill (simplest)
# airflow dags backfill feature_pipeline \
# --start-date 2025-01-01 \
# --end-date 2026-03-01 \
# --reset-dagruns
# Option 2: Programmatic backfill with parallelism control
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta
def backfill_date(process_date: str) -> dict:
    """Run the feature pipeline for a single date.

    Never raises: failures are captured and returned as a status dict so
    the caller can aggregate results across a whole backfill range.
    """
    session = create_spark_session(f"backfill-{process_date}")
    try:
        compute_user_features(session, process_date)
    except Exception as exc:
        return {"date": process_date, "status": "failed", "error": str(exc)}
    else:
        return {"date": process_date, "status": "success"}
    finally:
        session.stop()
def run_backfill(start: str, end: str, max_parallel: int = 4):
    """Backfill features for an inclusive date range with bounded parallelism.

    Submits one pipeline run per day to a thread pool, reports per-date
    status as runs finish, and raises RuntimeError if any date failed.
    """
    first_day = date.fromisoformat(start)
    last_day = date.fromisoformat(end)
    span_days = (last_day - first_day).days
    # Inclusive range: one ISO date string per day from start to end.
    dates = [(first_day + timedelta(days=offset)).isoformat()
             for offset in range(span_days + 1)]
    print(f"Backfilling {len(dates)} dates with {max_parallel} parallel workers")

    results = []
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        pending = {pool.submit(backfill_date, d): d for d in dates}
        for done in as_completed(pending):
            outcome = done.result()
            results.append(outcome)
            print(f" {outcome['date']}: {outcome['status']}")

    failures = [r for r in results if r["status"] == "failed"]
    if failures:
        print(f"\n{len(failures)} dates failed:")
        for item in failures:
            print(f" {item['date']}: {item['error']}")
        raise RuntimeError(f"Backfill had {len(failures)} failures")
    print(f"\nBackfill complete: {len(results)} dates processed successfully")
# Run: python backfill.py --start 2025-01-01 --end 2026-03-01 --parallel 8
Dask: Alternative to Spark for Smaller Teams
If your data fits in a single machine's memory (up to ~500GB with disk spilling) or your team is Python-native and does not want to manage a Spark cluster, Dask is a strong alternative. It uses familiar pandas-like APIs and scales from a laptop to a cluster.
# Dask feature pipeline - same logic, pandas-like API
import dask.dataframe as dd
from dask.distributed import Client
def compute_features_dask(process_date: str, window_days: int = 30):
    """Compute user engagement features with Dask for one snapshot date.

    Mirrors the PySpark pipeline: reads the trailing `window_days` of
    events ending on process_date, aggregates per-user features, and
    overwrites the date partition (idempotent re-runs).

    Args:
        process_date: Snapshot date in YYYY-MM-DD format.
        window_days: Length of the trailing event window (default 30,
            matching the PySpark pipeline).
    """
    # Derive the window start from process_date instead of the original
    # hard-coded "2026-02-18", so the pipeline is correct for any date
    # (including backfills).
    window_end = datetime.strptime(process_date, "%Y-%m-%d")
    window_start = (window_end - timedelta(days=window_days)).strftime("%Y-%m-%d")

    # Connect to Dask cluster (or use Client() for local threads)
    client = Client("scheduler:8786")
    try:
        # Read partitioned parquet (Dask reads lazily)
        events = dd.read_parquet(
            "s3://data-lake/events/",
            filters=[("event_date", ">=", window_start), ("event_date", "<=", process_date)],
        )
        # Compute features (pandas-like API, distributed execution)
        features = (
            events
            .groupby("user_id")
            .agg({
                "event_id": "count",
                "session_id": "nunique",
                "value": ["sum", "mean"],
            })
        )
        features.columns = ["total_events", "total_sessions", "total_spend", "avg_order_value"]
        # Write output (idempotent overwrite)
        features.to_parquet(
            f"s3://feature-store/user_engagement/feature_date={process_date}/",
            engine="pyarrow",
            write_index=True,
            overwrite=True,
        )
        print(f"Dask pipeline complete: {len(features)} users processed")
    finally:
        # Always release the scheduler connection, even if the pipeline
        # fails (the original leaked the client on any exception).
        client.close()
What Is Next
Now that you can build batch pipelines for feature computation, the next lesson covers Streaming Data Pipelines. You will learn Kafka + Flink/Spark Streaming for real-time features, exactly-once semantics, windowed aggregations, and how to unify batch and streaming paths to avoid train-serve skew.
Lilly Tech Systems