Intermediate

Feature Engineering with Spark

Learn how to transform raw data into ML-ready features using Spark's built-in transformers, estimators, and feature utilities.

Transformers vs Estimators

Spark ML uses two key abstractions for feature engineering:

  • Transformer: Takes a DataFrame, returns a new DataFrame with added columns. Uses .transform(). Example: VectorAssembler.
  • Estimator: Takes a DataFrame, learns parameters, returns a Transformer (model). Uses .fit() then .transform(). Example: StandardScaler.

VectorAssembler

The most essential transformer: it combines multiple input columns into a single vector column, the format Spark ML algorithms expect for their features input.

Python — VectorAssembler
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["age", "income", "credit_score", "num_products"],
    outputCol="features"
)

df_assembled = assembler.transform(df)
df_assembled.select("features", "label").show(5, truncate=False)

Encoding Categorical Features

Python — StringIndexer and OneHotEncoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Convert string labels to numeric indices
indexer = StringIndexer(inputCol="category", outputCol="category_index")
df_indexed = indexer.fit(df).transform(df)

# One-hot encode the indices
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
df_encoded = encoder.fit(df_indexed).transform(df_indexed)

# For multiple columns at once, build one indexer per column
# (these are typically chained in a Pipeline, covered in the next lesson)
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index")
    for col in ["city", "gender", "education"]
]

# Since Spark 3.0, a single StringIndexer also accepts inputCols/outputCols lists

Scaling and Normalization

Python — Feature Scaling
from pyspark.ml.feature import StandardScaler, MinMaxScaler, Normalizer

# StandardScaler (zero mean, unit variance)
# Note: withMean=True produces dense output, so use with care on large sparse vectors
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)
scaler_model = scaler.fit(df)
df_scaled = scaler_model.transform(df)

# MinMaxScaler (scale to [0, 1])
min_max = MinMaxScaler(inputCol="features", outputCol="normalized_features")
df_normalized = min_max.fit(df).transform(df)

# L2 Normalization (unit norm)
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
df_normed = normalizer.transform(df)

Text Features

Python — Text Feature Extraction
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df_words = tokenizer.transform(df)

# Term frequency
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
df_tf = hashing_tf.transform(df_words)

# TF-IDF
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
df_tfidf = idf.fit(df_tf).transform(df_tf)

Feature Selection

ChiSqSelector keeps the features most associated with a categorical label according to a chi-squared test. (As of Spark 3.1 it is deprecated in favor of the more general UnivariateFeatureSelector, but it still works as shown.)

Python — Feature Selection with ChiSqSelector
from pyspark.ml.feature import ChiSqSelector

selector = ChiSqSelector(
    numTopFeatures=20,
    featuresCol="features",
    outputCol="selected_features",
    labelCol="label"
)

model = selector.fit(df)
df_selected = model.transform(df)
print(f"Selected feature indices: {model.selectedFeatures}")

Common Transformers Reference

Transformer        Type          Purpose
VectorAssembler    Transformer   Combine columns into a feature vector
StringIndexer      Estimator     Map strings to numeric indices
OneHotEncoder      Estimator     One-hot encode categorical indices
StandardScaler     Estimator     Standardize to zero mean, unit variance
Bucketizer         Transformer   Bin continuous features into buckets
Imputer            Estimator     Fill missing values with mean/median
PCA                Estimator     Dimensionality reduction

Chain transformers in a Pipeline: Rather than applying transformers one at a time, chain them in a Pipeline for cleaner code and easier model persistence. We cover this in the next lesson.