Feature Engineering Challenges
Feature engineering is where data pipelines meet ML. These 5 challenges cover the feature generation patterns that ML engineers implement in production feature pipelines and interviewers test at senior levels.
Challenge 1: Time-Based Feature Generation
from datetime import datetime, timedelta
from collections import defaultdict
def generate_time_features(transactions, user_field='user_id',
                           time_field='timestamp', value_field='amount'):
    """Generate point-in-time features for each transaction.

    Each transaction's features are computed from the user's *earlier*
    transactions only, so there is no target leakage.

    Args:
        transactions: List of dicts; each must contain the user, epoch-seconds
            timestamp, and numeric amount fields named below.
        user_field: Key holding the user identifier.
        time_field: Key holding the epoch-seconds timestamp.
        value_field: Key holding the numeric transaction amount.

    Features added (to a copy of each input record):
        - time_since_last: seconds since the user's previous transaction (0 for first)
        - rolling_avg_7d / rolling_count_7d: mean amount / count over the prior 7 days
        - rolling_avg_30d / rolling_count_30d: same over the prior 30 days
        - day_of_week: 0=Monday .. 6=Sunday (UTC)
        - hour_of_day: 0-23 (UTC)
        - is_weekend: True for Saturday/Sunday (UTC)
        - recency_days: days since the user's previous transaction (0 for first)
        - frequency: total transaction count so far (including current)
        - monetary_total: cumulative amount (including current)

    Returns:
        List of feature dicts in (user, timestamp) order.
    """
    # Local import keeps this function self-contained. Timestamps are epoch
    # seconds, so calendar features must be derived in UTC: the naive
    # datetime.fromtimestamp(ts) would silently depend on the machine timezone.
    from datetime import datetime, timezone

    # Sort by user, then timestamp, so each user's history replays in order.
    sorted_txns = sorted(transactions, key=lambda t: (t[user_field], t[time_field]))
    user_history = defaultdict(list)  # user -> [(timestamp, amount)] in time order
    results = []
    for txn in sorted_txns:
        user = txn[user_field]
        ts = txn[time_field]
        amount = txn[value_field]
        history = user_history[user]
        features = dict(txn)  # copy original fields; never mutate the input
        # Seconds since this user's previous transaction.
        features['time_since_last'] = ts - history[-1][0] if history else 0
        # Rolling windows over *prior* transactions only (history excludes current).
        window_7d = [(t, a) for t, a in history if ts - t <= 7 * 86400]
        window_30d = [(t, a) for t, a in history if ts - t <= 30 * 86400]
        features['rolling_count_7d'] = len(window_7d)
        features['rolling_avg_7d'] = (
            sum(a for _, a in window_7d) / len(window_7d)
            if window_7d else 0
        )
        features['rolling_count_30d'] = len(window_30d)
        features['rolling_avg_30d'] = (
            sum(a for _, a in window_30d) / len(window_30d)
            if window_30d else 0
        )
        # Calendar features in UTC so results are machine-independent.
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
        features['day_of_week'] = dt.weekday()
        features['hour_of_day'] = dt.hour
        features['is_weekend'] = dt.weekday() >= 5
        # RFM-style scores (recency is relative to the previous transaction).
        features['recency_days'] = (ts - history[-1][0]) / 86400 if history else 0
        features['frequency'] = len(history) + 1
        features['monetary_total'] = sum(a for _, a in history) + amount
        # Record this transaction so later ones can see it.
        history.append((ts, amount))
        results.append(features)
    return results
# Test: one user with a 10-day spread of purchases, plus a second user.
base_ts = 1704067200  # 2024-01-01 00:00:00 UTC
day = 86400
transactions = [
    {'user_id': 'u1', 'timestamp': base_ts, 'amount': 50},
    {'user_id': 'u1', 'timestamp': base_ts + day, 'amount': 75},        # +1 day
    {'user_id': 'u2', 'timestamp': base_ts + day, 'amount': 120},
    {'user_id': 'u1', 'timestamp': base_ts + 3 * day, 'amount': 30},    # +3 days
    {'user_id': 'u1', 'timestamp': base_ts + 10 * day, 'amount': 200},  # +10 days
    {'user_id': 'u2', 'timestamp': base_ts + 15 * day, 'amount': 85},   # +15 days
]
features = generate_time_features(transactions)
for feat in features:
    print(f" user={feat['user_id']} amount={feat['amount']:>5} "
          f"time_since_last={feat['time_since_last']:>7} "
          f"roll_7d_avg={feat['rolling_avg_7d']:>6.1f} "
          f"freq={feat['frequency']} monetary={feat['monetary_total']}")
Complexity: O(n * h) where n is transactions and h is average history length per user. For large histories, use a sorted data structure to make window lookups O(log h).
Challenge 2: Categorical Encoding Pipeline
from collections import Counter, defaultdict
class CategoricalEncoder:
    """Production-ready categorical encoding with multiple strategies.

    Supported strategies: 'label', 'onehot', 'frequency', 'target'.

    Unseen categories at transform time are controlled by ``handle_unknown``:
        - 'default': strategy-specific fallback (-1 for label, 0 for
          frequency, the global target mean for target encoding; one-hot
          simply emits all zeros).
        - 'error': raise ValueError on an unseen category.
        - any other value: use that value directly as the encoded output.
    """

    def __init__(self, strategy='label', handle_unknown='default'):
        self.strategy = strategy
        self.handle_unknown = handle_unknown  # 'default', 'error', or a literal value
        self.mappings = {}   # field -> learned encoding mapping
        self.is_fitted = False

    def _resolve_unknown(self, field, value, default):
        """Return the encoding for an unseen category per handle_unknown."""
        if self.handle_unknown == 'error':
            raise ValueError(f"Unseen category {value!r} for field {field!r}")
        if self.handle_unknown == 'default':
            return default
        return self.handle_unknown

    def fit(self, records, fields, target_field=None):
        """Learn encoding mappings from training data.

        Args:
            records: List of dicts (training rows).
            fields: Categorical field names to encode.
            target_field: Numeric target field; required for 'target' strategy.

        Returns:
            self, so calls can be chained: fit(...).transform(...).
        """
        for field in fields:
            values = [r.get(field) for r in records if r.get(field) is not None]
            if self.strategy == 'label':
                unique = sorted(set(values))
                self.mappings[field] = {v: i for i, v in enumerate(unique)}
            elif self.strategy == 'onehot':
                self.mappings[field] = sorted(set(values))
            elif self.strategy == 'frequency':
                counts = Counter(values)
                total = len(values)
                self.mappings[field] = {v: c / total for v, c in counts.items()}
            elif self.strategy == 'target':
                if target_field is None:
                    raise ValueError("target_field required for target encoding")
                # Group target values by category.
                cat_targets = defaultdict(list)
                for r in records:
                    cat = r.get(field)
                    target = r.get(target_field)
                    if cat is not None and target is not None:
                        cat_targets[cat].append(target)
                # Global mean over rows that actually have a target; rows with
                # a missing target must not be silently counted as zeros.
                all_targets = [r[target_field] for r in records
                               if r.get(target_field) is not None]
                global_mean = (sum(all_targets) / len(all_targets)
                               if all_targets else 0)
                smoothing = 10  # regularization: pseudo-count of global-mean rows
                self.mappings[field] = {}
                for cat, targets in cat_targets.items():
                    n = len(targets)
                    cat_mean = sum(targets) / n
                    # Bayesian smoothing: blend category mean with global mean.
                    smoothed = (n * cat_mean + smoothing * global_mean) / (n + smoothing)
                    self.mappings[field][cat] = round(smoothed, 4)
                self.mappings[f'{field}__global_mean'] = global_mean
        self.is_fitted = True
        return self

    def transform(self, records, fields):
        """Apply learned encodings to records.

        Unseen categories follow the handle_unknown policy (see class doc).

        Raises:
            ValueError: if called before fit(), or on an unseen category
                when handle_unknown='error'.
        """
        if not self.is_fitted:
            raise ValueError("Must call fit() before transform()")
        results = []
        for record in records:
            encoded = dict(record)
            for field in fields:
                value = record.get(field)
                if self.strategy == 'label':
                    mapping = self.mappings[field]
                    if value in mapping:
                        encoded[f'{field}_encoded'] = mapping[value]
                    else:
                        encoded[f'{field}_encoded'] = self._resolve_unknown(field, value, -1)
                elif self.strategy == 'onehot':
                    categories = self.mappings[field]
                    if value not in categories and self.handle_unknown == 'error':
                        raise ValueError(f"Unseen category {value!r} for field {field!r}")
                    # Unknown categories simply produce an all-zeros row.
                    for cat in categories:
                        encoded[f'{field}_{cat}'] = 1 if value == cat else 0
                elif self.strategy == 'frequency':
                    mapping = self.mappings[field]
                    if value in mapping:
                        encoded[f'{field}_freq'] = mapping[value]
                    else:
                        encoded[f'{field}_freq'] = self._resolve_unknown(field, value, 0)
                elif self.strategy == 'target':
                    mapping = self.mappings[field]
                    global_mean = self.mappings.get(f'{field}__global_mean', 0)
                    if value in mapping:
                        encoded[f'{field}_target'] = mapping[value]
                    else:
                        encoded[f'{field}_target'] = self._resolve_unknown(field, value, global_mean)
            results.append(encoded)
        return results
# Test all strategies
train_data = [
    {'city': 'NYC', 'color': 'red', 'purchased': 1},
    {'city': 'NYC', 'color': 'blue', 'purchased': 1},
    {'city': 'LA', 'color': 'red', 'purchased': 0},
    {'city': 'LA', 'color': 'green', 'purchased': 0},
    {'city': 'Chicago', 'color': 'blue', 'purchased': 1},
    {'city': 'NYC', 'color': 'red', 'purchased': 0},
]
test_data = [
    {'city': 'NYC', 'color': 'red', 'purchased': None},
    {'city': 'Boston', 'color': 'blue', 'purchased': None},  # Unseen city
]
fields = ['city', 'color']

# Label encoding
label_rows = (CategoricalEncoder(strategy='label')
              .fit(train_data, fields)
              .transform(test_data, fields))
print("Label encoding:")
for row in label_rows:
    print(f" {row['city']} -> {row['city_encoded']}, {row['color']} -> {row['color_encoded']}")

# Frequency encoding
freq_rows = (CategoricalEncoder(strategy='frequency')
             .fit(train_data, fields)
             .transform(test_data, fields))
print("\nFrequency encoding:")
for row in freq_rows:
    print(f" {row['city']} -> {row['city_freq']:.3f}, {row['color']} -> {row['color_freq']:.3f}")

# Target encoding
target_rows = (CategoricalEncoder(strategy='target')
               .fit(train_data, fields, target_field='purchased')
               .transform(test_data, fields))
print("\nTarget encoding:")
for row in target_rows:
    print(f" {row['city']} -> {row['city_target']:.4f}, {row['color']} -> {row['color_target']:.4f}")
Complexity: Fit: O(n * f) where n is records and f is fields. Transform: O(n * f) for label/frequency/target, O(n * f * c) for one-hot where c is unique categories per field.
Challenge 3: Text Feature Extraction
import math
import re
from collections import Counter, defaultdict
class TextFeatureExtractor:
    """Extract numerical features from text without external libraries.

    fit() learns a frequency-capped n-gram vocabulary plus IDF weights;
    transform() then produces bag-of-words, TF-IDF, and/or statistical
    features per input text.
    """

    def __init__(self, max_features=100, ngram_range=(1, 1)):
        self.max_features = max_features  # vocabulary size cap
        self.ngram_range = ngram_range    # inclusive (min_n, max_n)
        self.vocabulary = {}              # term -> index
        self.idf_scores = {}              # term -> IDF score
        self.is_fitted = False

    def _tokenize(self, text):
        """Lowercase and split into alphanumeric word tokens."""
        text = text.lower().strip()
        tokens = re.findall(r'\b[a-z0-9]+\b', text)
        return tokens

    def _get_ngrams(self, tokens):
        """Generate underscore-joined n-grams for every n in ngram_range."""
        ngrams = []
        for n in range(self.ngram_range[0], self.ngram_range[1] + 1):
            for i in range(len(tokens) - n + 1):
                ngrams.append('_'.join(tokens[i:i + n]))
        return ngrams

    def _text_stats(self, text):
        """Compute basic text statistics (ratios are 0 for empty text)."""
        return {
            'char_count': len(text),
            'word_count': len(text.split()),
            'avg_word_len': (sum(len(w) for w in text.split()) /
                             max(len(text.split()), 1)),
            'uppercase_ratio': sum(1 for c in text if c.isupper()) / max(len(text), 1),
            'digit_ratio': sum(1 for c in text if c.isdigit()) / max(len(text), 1),
            'special_char_ratio': sum(1 for c in text if not c.isalnum() and c != ' ') / max(len(text), 1),
            # Count only non-empty segments: splitting "Done." used to yield a
            # trailing '' and over-count sentences by one.
            'sentence_count': sum(1 for s in re.split(r'[.!?]+', text.strip()) if s.strip()),
        }

    def fit(self, texts):
        """Learn vocabulary (top terms by corpus frequency) and IDF scores."""
        doc_count = len(texts)
        term_doc_freq = Counter()    # how many docs contain each term
        term_freq_total = Counter()  # total term frequency across all docs
        for text in texts:
            ngrams = self._get_ngrams(self._tokenize(text))
            term_freq_total.update(ngrams)
            # Document frequency: each term counted at most once per doc.
            term_doc_freq.update(set(ngrams))
        # Select top features by total frequency (ties keep insertion order).
        top_terms = term_freq_total.most_common(self.max_features)
        self.vocabulary = {term: idx for idx, (term, _) in enumerate(top_terms)}
        # Compute IDF: log(N / (1 + df)); the +1 prevents division by zero.
        self.idf_scores = {
            term: math.log(doc_count / (1 + term_doc_freq.get(term, 0)))
            for term in self.vocabulary
        }
        self.is_fitted = True
        return self

    def transform(self, texts, mode='tfidf'):
        """Transform texts into feature dicts.

        Modes: 'bow' (bag of words), 'tfidf', 'all' (bow + tfidf + stats).

        Raises:
            ValueError: if called before fit().
        """
        if not self.is_fitted:
            raise ValueError("Must call fit() first")
        results = []
        for text in texts:
            ngrams = self._get_ngrams(self._tokenize(text))
            term_counts = Counter(ngrams)
            features = {}
            if mode in ('bow', 'all'):
                for term in self.vocabulary:
                    features[f'bow_{term}'] = term_counts.get(term, 0)
            if mode in ('tfidf', 'all'):
                total_terms = max(len(ngrams), 1)  # guard empty text
                for term in self.vocabulary:
                    tf = term_counts.get(term, 0) / total_terms
                    idf = self.idf_scores.get(term, 0)
                    features[f'tfidf_{term}'] = round(tf * idf, 4)
            if mode == 'all':
                stats = self._text_stats(text)
                features.update({f'stat_{k}': v for k, v in stats.items()})
            results.append(features)
        return results
# Test: fit on a tiny corpus, then inspect features for two held-out texts.
corpus = [
    "The data pipeline processes streaming events in real time",
    "ETL pipelines extract transform and load data into warehouses",
    "Streaming data requires windowing and watermark handling",
    "Data validation ensures quality in production pipelines",
    "Feature engineering transforms raw data into ML features",
]
extractor = TextFeatureExtractor(max_features=15, ngram_range=(1, 2)).fit(corpus)
test_texts = [
    "This pipeline handles streaming data events",
    "Data quality validation is critical for ML",
]
results = extractor.transform(test_texts, mode='all')
for idx, feats in enumerate(results, start=1):
    print(f"\nText {idx}:")
    nonzero_tfidf = {k: v for k, v in feats.items() if k.startswith('tfidf_') and v > 0}
    text_stats = {k: v for k, v in feats.items() if k.startswith('stat_')}
    print(f" Top TF-IDF: {dict(sorted(nonzero_tfidf.items(), key=lambda x: -x[1])[:5])}")
    print(f" Stats: {text_stats}")
Complexity: Fit: O(D * T) where D is documents and T is average tokens per document. Transform: O(T * V) per document where V is vocabulary size.
Challenge 4: Aggregation Feature Generator
from collections import defaultdict
import math
class AggregationFeatureGenerator:
    """Generate aggregation-based features for ML pipelines.

    Groups records by key fields, computes per-group statistics, and joins
    those statistics back onto every record in the group. This is the most
    common feature engineering pattern in industry.
    """

    def __init__(self):
        self.agg_tables = {}  # tuple(group_keys) -> {group value tuple: aggregates}

    def _percentile(self, values, p):
        """Linear-interpolation percentile (no numpy)."""
        ordered = sorted(values)
        rank = (len(ordered) - 1) * (p / 100)
        lo = math.floor(rank)
        hi = math.ceil(rank)
        if lo == hi:
            return ordered[int(rank)]
        # Weight each neighbor by its distance from the fractional rank.
        return ordered[lo] * (hi - rank) + ordered[hi] * (rank - lo)

    def _std(self, values):
        """Sample standard deviation (ddof=1); 0 when fewer than 2 values."""
        count = len(values)
        if count < 2:
            return 0
        avg = sum(values) / count
        return math.sqrt(sum((v - avg) ** 2 for v in values) / (count - 1))

    def fit_transform(self, records, group_keys, value_fields, prefix=None):
        """Compute aggregation features and join them back onto the records.

        Args:
            records: List of dictionaries.
            group_keys: List of fields to group by.
            value_fields: List of numeric fields to aggregate.
            prefix: Optional prefix for feature names (default 'agg_<keys>').

        Returns:
            New list of records with aggregation features added.
        """
        # Bucket records by their group-key tuple.
        grouped = defaultdict(list)
        for rec in records:
            grouped[tuple(rec.get(k) for k in group_keys)].append(rec)

        pref = prefix or 'agg_' + '_'.join(str(k) for k in group_keys)
        table = {}
        for group_key, members in grouped.items():
            stats = {f'{pref}_count': len(members)}
            for field in value_fields:
                # Keep only present numeric values; skip the field otherwise.
                nums = [m[field] for m in members
                        if m.get(field) is not None and isinstance(m[field], (int, float))]
                if not nums:
                    continue
                stats[f'{pref}_{field}_mean'] = round(sum(nums) / len(nums), 2)
                stats[f'{pref}_{field}_sum'] = sum(nums)
                stats[f'{pref}_{field}_min'] = min(nums)
                stats[f'{pref}_{field}_max'] = max(nums)
                stats[f'{pref}_{field}_std'] = round(self._std(nums), 2)
                for pct in (25, 50, 75):
                    stats[f'{pref}_{field}_p{pct}'] = round(self._percentile(nums, pct), 2)
            table[group_key] = stats

        self.agg_tables[tuple(group_keys)] = table
        # Join: each record gets its group's aggregate columns merged in.
        return [dict(rec, **table.get(tuple(rec.get(k) for k in group_keys), {}))
                for rec in records]
# Test: stack user-level and category-level aggregations on the same orders.
orders = [
    {'user_id': 'u1', 'category': 'electronics', 'amount': 500, 'quantity': 1},
    {'user_id': 'u1', 'category': 'electronics', 'amount': 300, 'quantity': 2},
    {'user_id': 'u1', 'category': 'books', 'amount': 25, 'quantity': 3},
    {'user_id': 'u2', 'category': 'electronics', 'amount': 800, 'quantity': 1},
    {'user_id': 'u2', 'category': 'clothing', 'amount': 120, 'quantity': 4},
    {'user_id': 'u3', 'category': 'books', 'amount': 45, 'quantity': 2},
    {'user_id': 'u3', 'category': 'books', 'amount': 30, 'quantity': 1},
    {'user_id': 'u3', 'category': 'electronics', 'amount': 650, 'quantity': 1},
]
gen = AggregationFeatureGenerator()
enriched = gen.fit_transform(orders, group_keys=['user_id'],
                             value_fields=['amount', 'quantity'], prefix='user')
enriched = gen.fit_transform(enriched, group_keys=['category'],
                             value_fields=['amount'], prefix='cat')
print("Enriched records (showing key features):")
for rec in enriched[:3]:
    print(f" user={rec['user_id']} cat={rec['category']} amount={rec['amount']}")
    print(f" user_count={rec['user_count']} user_amount_mean={rec['user_amount_mean']} "
          f"user_amount_std={rec['user_amount_std']}")
    print(f" cat_count={rec['cat_count']} cat_amount_mean={rec['cat_amount_mean']}")
Complexity: O(n * g * f) where n is records, g is group-by operations, and f is value fields. This mirrors how feature stores like Feast compute aggregate features.
Challenge 5: Interaction Feature Generation
import math
from itertools import combinations
class InteractionFeatureGenerator:
    """Generate pairwise interaction features with automatic selection.

    Creates: products, ratios (both directions), and differences for every
    pair of numeric fields.
    Filters: drops features below a variance threshold, then greedily drops
    features highly correlated with an already-kept feature.
    """

    # Interaction modes accepted by generate().
    _VALID_INTERACTIONS = ('all', 'multiply', 'ratio', 'difference')

    def __init__(self, variance_threshold=0.01, max_correlation=0.95):
        self.variance_threshold = variance_threshold  # minimum sample variance to keep
        self.max_correlation = max_correlation        # |r| above this is redundant

    def _variance(self, values):
        """Sample variance (ddof=1); 0 when fewer than 2 values."""
        n = len(values)
        if n < 2:
            return 0
        mean = sum(values) / n
        return sum((x - mean) ** 2 for x in values) / (n - 1)

    def _correlation(self, x, y):
        """Pearson correlation without numpy; 0 for degenerate inputs."""
        n = len(x)
        if n < 2:
            return 0
        mean_x = sum(x) / n
        mean_y = sum(y) / n
        cov = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y)) / (n - 1)
        std_x = math.sqrt(sum((xi - mean_x) ** 2 for xi in x) / (n - 1))
        std_y = math.sqrt(sum((yi - mean_y) ** 2 for yi in y) / (n - 1))
        if std_x == 0 or std_y == 0:
            return 0  # a constant series has no defined correlation
        return cov / (std_x * std_y)

    def generate(self, records, numeric_fields, interactions='all'):
        """Generate interaction features.

        Args:
            records: List of dictionaries.
            numeric_fields: Numeric field names (missing values default to 0).
            interactions: 'all', 'multiply', 'ratio', or 'difference'.

        Returns:
            Tuple of (enriched records, report dict summarizing what was
            generated, dropped, and kept).

        Raises:
            ValueError: if `interactions` is not a recognized mode (previously
                an invalid mode silently generated nothing).
        """
        if interactions not in self._VALID_INTERACTIONS:
            raise ValueError(
                f"interactions must be one of {self._VALID_INTERACTIONS}, "
                f"got {interactions!r}")
        # Extract numeric columns; missing fields are treated as 0.
        field_values = {f: [r.get(f, 0) for r in records] for f in numeric_fields}
        new_features = {}
        for f1, f2 in combinations(numeric_fields, 2):
            v1 = field_values[f1]
            v2 = field_values[f2]
            if interactions in ('all', 'multiply'):
                new_features[f'{f1}_x_{f2}'] = [a * b for a, b in zip(v1, v2)]
            if interactions in ('all', 'ratio'):
                # Division by zero is mapped to 0 rather than raising.
                new_features[f'{f1}_div_{f2}'] = [
                    a / b if b != 0 else 0 for a, b in zip(v1, v2)
                ]
                new_features[f'{f2}_div_{f1}'] = [
                    b / a if a != 0 else 0 for a, b in zip(v1, v2)
                ]
            if interactions in ('all', 'difference'):
                new_features[f'{f1}_minus_{f2}'] = [a - b for a, b in zip(v1, v2)]
        # Filter 1: variance threshold.
        kept_features = {}
        dropped_low_var = []
        for name, values in new_features.items():
            var = self._variance(values)
            if var >= self.variance_threshold:
                kept_features[name] = values
            else:
                dropped_low_var.append((name, var))
        # Filter 2: greedy correlation pruning against already-kept features.
        final_features = {}
        dropped_corr = []
        for name, values in kept_features.items():
            is_redundant = False
            for existing_name in final_features:
                corr = abs(self._correlation(values, final_features[existing_name]))
                if corr > self.max_correlation:
                    is_redundant = True
                    dropped_corr.append((name, existing_name, round(corr, 3)))
                    break
            if not is_redundant:
                final_features[name] = values
        # Join kept features back onto copies of the input records.
        enriched = []
        for i, record in enumerate(records):
            new_record = dict(record)
            for name, values in final_features.items():
                new_record[name] = round(values[i], 4)
            enriched.append(new_record)
        report = {
            'input_features': len(numeric_fields),
            'generated': len(new_features),
            'dropped_low_variance': len(dropped_low_var),
            'dropped_high_correlation': len(dropped_corr),
            'final_features': len(final_features),
            'feature_names': list(final_features.keys())
        }
        return enriched, report
# Test: generate and auto-select interactions over four numeric columns.
records = [
    {'price': 100, 'quantity': 5, 'weight': 2.5, 'rating': 4.2},
    {'price': 200, 'quantity': 3, 'weight': 1.8, 'rating': 4.5},
    {'price': 50, 'quantity': 10, 'weight': 3.0, 'rating': 3.8},
    {'price': 150, 'quantity': 2, 'weight': 0.5, 'rating': 4.9},
    {'price': 80, 'quantity': 7, 'weight': 2.2, 'rating': 4.0},
]
feature_gen = InteractionFeatureGenerator(variance_threshold=0.1, max_correlation=0.95)
enriched, report = feature_gen.generate(records, ['price', 'quantity', 'weight', 'rating'])
print(f"Generated {report['generated']} features")
print(f"Dropped {report['dropped_low_variance']} (low variance)")
print(f"Dropped {report['dropped_high_correlation']} (high correlation)")
print(f"Final features ({report['final_features']}): {report['feature_names']}")
print("\nSample enriched record:")
for key, val in enriched[0].items():
    print(f" {key}: {val}")
Complexity: O(n * C(f, 2)) for generation where n is records and f is numeric fields. Correlation filtering is O(k^2 * n) where k is generated features. For high-dimensional data, use random projections or mutual information instead of exhaustive correlation checks.
Lilly Tech Systems