Intermediate
Feature Engineering with Spark
Learn how to transform raw data into ML-ready features using Spark's built-in transformers, estimators, and feature utilities.
Transformers vs Estimators
Spark ML uses two key abstractions for feature engineering:
- Transformer: takes a DataFrame and returns a new DataFrame with added columns, via .transform(). Example: VectorAssembler.
- Estimator: takes a DataFrame, learns parameters, and returns a Transformer (a model), via .fit() then .transform(). Example: StandardScaler.
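To make the two-step pattern concrete, here is a minimal pure-Python sketch (no Spark required; the class names MeanEstimator and MeanModel are invented for illustration) of how an Estimator's .fit() produces a Transformer:

```python
class MeanModel:
    """Plays the role of a fitted Transformer: applies a learned parameter."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        # Subtract the learned mean from every value
        return [v - self.mean for v in values]


class MeanEstimator:
    """Plays the role of an Estimator: learns a parameter from data."""
    def fit(self, values):
        # Learn the mean, then return a "model" that can transform data
        return MeanModel(sum(values) / len(values))


data = [1.0, 2.0, 3.0]
model = MeanEstimator().fit(data)   # Estimator.fit() -> Transformer
print(model.transform(data))        # [-1.0, 0.0, 1.0]
```

The same shape appears throughout Spark ML: StandardScaler.fit() returns a StandardScalerModel whose .transform() applies the learned statistics.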
VectorAssembler
VectorAssembler is the most essential transformer: it combines multiple columns into the single feature vector that Spark ML models expect.
Python — VectorAssembler
```python
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["age", "income", "credit_score", "num_products"],
    outputCol="features"
)

df_assembled = assembler.transform(df)
df_assembled.select("features", "label").show(5, truncate=False)
```
Encoding Categorical Features
Python — StringIndexer and OneHotEncoder
```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Convert string labels to numeric indices
indexer = StringIndexer(inputCol="category", outputCol="category_index")
df_indexed = indexer.fit(df).transform(df)

# One-hot encode the indices
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
df_encoded = encoder.fit(df_indexed).transform(df_indexed)

# For multiple columns at once
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index")
    for col in ["city", "gender", "education"]
]
```
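What these two estimators compute can be sketched in plain Python (no Spark needed). By default StringIndexer assigns index 0 to the most frequent label, and OneHotEncoder drops the last category (dropLast=True), so a column with k categories becomes a vector of length k - 1:

```python
from collections import Counter

def string_index(labels):
    # Most frequent label gets index 0 (Spark's default frequencyDesc order)
    order = [lab for lab, _ in Counter(labels).most_common()]
    mapping = {lab: i for i, lab in enumerate(order)}
    return [mapping[lab] for lab in labels]

def one_hot(index, num_categories):
    # dropLast=True semantics: vector length is k - 1, and the
    # last category is encoded as the all-zeros vector
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec

labels = ["books", "food", "books", "toys", "books", "food"]
print(string_index(labels))   # [0, 1, 0, 2, 0, 1]
print(one_hot(0, 3))          # [1.0, 0.0]
print(one_hot(2, 3))          # [0.0, 0.0]  (last category dropped)
```

Dropping the last category avoids perfect collinearity among the one-hot columns, which matters for linear models.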
Scaling and Normalization
Python — Feature Scaling
```python
from pyspark.ml.feature import StandardScaler, MinMaxScaler, Normalizer

# StandardScaler (zero mean, unit variance)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)
scaler_model = scaler.fit(df)
df_scaled = scaler_model.transform(df)

# MinMaxScaler (scale to [0, 1])
min_max = MinMaxScaler(inputCol="features", outputCol="normalized_features")
df_normalized = min_max.fit(df).transform(df)

# L2 Normalization (unit norm)
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
df_normed = normalizer.transform(df)
```
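The arithmetic behind these three transformers is simple enough to verify by hand. A pure-Python sketch (assuming the sample standard deviation for standardization, which matches Spark's behavior):

```python
from math import sqrt
from statistics import mean, stdev

x = [10.0, 20.0, 30.0, 40.0]

# StandardScaler with withMean=True, withStd=True:
# subtract the mean, divide by the standard deviation
mu, sigma = mean(x), stdev(x)
standardized = [(v - mu) / sigma for v in x]

# MinMaxScaler: rescale linearly onto [0, 1]
lo, hi = min(x), max(x)
min_maxed = [(v - lo) / (hi - lo) for v in x]

# Normalizer with p=2.0: divide each vector by its L2 norm
vec = [3.0, 4.0]
norm = sqrt(sum(v * v for v in vec))
normalized = [v / norm for v in vec]

print(min_maxed)    # [0.0, 0.333..., 0.666..., 1.0]
print(normalized)   # [0.6, 0.8]
```

Note the key operational difference: the two scalers are Estimators (they must see the data to learn mean/std or min/max), while Normalizer is a plain Transformer because each row's norm depends only on that row.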
Text Features
Python — Text Feature Extraction
```python
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df_words = tokenizer.transform(df)

# Term frequency
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
df_tf = hashing_tf.transform(df_words)

# TF-IDF
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
df_tfidf = idf.fit(df_tf).transform(df_tf)
```
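The math those stages compute can be checked in plain Python. Spark ML's IDF uses the smoothed formula log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term; the sketch below uses a toy three-document corpus (HashingTF would additionally hash each term into one of numFeatures buckets rather than keeping a dict):

```python
from math import log

docs = [["spark", "ml", "spark"], ["ml", "pipelines"], ["spark", "sql"]]
m = len(docs)

# Term frequency: raw counts per document
tf = [{t: doc.count(t) for t in set(doc)} for doc in docs]

# Document frequency, then smoothed IDF: log((m + 1) / (df + 1))
df_counts = {}
for doc in docs:
    for t in set(doc):
        df_counts[t] = df_counts.get(t, 0) + 1
idf = {t: log((m + 1) / (df + 1)) for t, df in df_counts.items()}

# TF-IDF weight for "spark" in the first document: 2 * log(4/3)
print(tf[0]["spark"] * idf["spark"])
```

The smoothing (+1 in numerator and denominator) keeps the IDF finite even for terms that appear in every document.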
Feature Selection
Python — Feature Selection with ChiSqSelector
```python
from pyspark.ml.feature import ChiSqSelector

selector = ChiSqSelector(
    numTopFeatures=20,
    featuresCol="features",
    outputCol="selected_features",
    labelCol="label"
)

model = selector.fit(df)
df_selected = model.transform(df)
print(f"Selected feature indices: {model.selectedFeatures}")
```
Common Transformers Reference
| Transformer | Type | Purpose |
|---|---|---|
| VectorAssembler | Transformer | Combine columns into feature vector |
| StringIndexer | Estimator | Map strings to numeric indices |
| OneHotEncoder | Estimator | One-hot encode categorical indices |
| StandardScaler | Estimator | Standardize to zero mean, unit variance |
| Bucketizer | Transformer | Bin continuous features into buckets |
| Imputer | Estimator | Fill missing values with mean/median |
| PCA | Estimator | Dimensionality reduction |
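Two entries from the table deserve a quick illustration of their semantics. Bucketizer maps a value to the index of the half-open bucket [s_i, s_i+1) defined by its splits (a boundary value falls into the bucket to its right), and Imputer replaces missing values with a statistic of the observed ones. A pure-Python sketch of both, with invented split points:

```python
from bisect import bisect_right

# Bucketizer semantics: splits define buckets [s0, s1), [s1, s2), ...
# (example split points are illustrative)
splits = [float("-inf"), 0.0, 18.0, 65.0, float("inf")]

def bucketize(value):
    # Index of the bucket containing value; a value equal to a split
    # boundary lands in the bucket starting at that boundary
    return bisect_right(splits, value) - 1

print([bucketize(v) for v in [-5.0, 0.0, 30.0, 80.0]])  # [0, 1, 2, 3]

# Imputer semantics (strategy="mean"): fill missing values
# with the mean of the observed values
col = [1.0, None, 3.0]
observed = [v for v in col if v is not None]
fill = sum(observed) / len(observed)
imputed = [fill if v is None else v for v in col]
print(imputed)  # [1.0, 2.0, 3.0]
```

In Spark itself both operate on DataFrame columns, and Imputer is an Estimator precisely because the fill value must be learned from the data.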
Chain transformers in a Pipeline: Rather than applying transformers one at a time, chain them in a Pipeline for cleaner code and easier model persistence. We cover this in the next lesson.
Lilly Tech Systems