Advanced

Feature Engineering Challenges

Feature engineering is where data pipelines meet ML. These 5 challenges cover the feature generation patterns that ML engineers implement in production feature pipelines and interviewers test at senior levels.

Challenge 1: Time-Based Feature Generation

📝
Problem: Given a stream of user transactions with timestamps, generate time-based features for each transaction: time since last transaction, rolling averages (7-day, 30-day), day-of-week patterns, and recency/frequency/monetary (RFM) scores.
from datetime import datetime, timedelta
from collections import defaultdict

def generate_time_features(transactions, user_field='user_id',
                           time_field='timestamp', value_field='amount'):
    """Generate time-based features for each transaction.

    Transactions are processed per user in chronological order. Rolling and
    cumulative statistics for a row are built from that user's *earlier*
    rows; only ``frequency`` and ``monetary_total`` include the row itself.

    Args:
        transactions: Iterable of dicts. ``time_field`` must hold a Unix
            timestamp in seconds and ``value_field`` a number.
        user_field: Key holding the user identifier.
        time_field: Key holding the timestamp.
        value_field: Key holding the transaction amount.

    Returns:
        A list of new dicts (the inputs are not mutated), ordered by user
        and then timestamp. Each holds the original fields plus:

        - time_since_last: seconds since the user's previous transaction
          (0 for the user's first transaction)
        - rolling_count_7d / rolling_avg_7d: count and mean amount of the
          user's previous transactions within the last 7 days (0 if none)
        - rolling_count_30d / rolling_avg_30d: same over the last 30 days
        - day_of_week: 0=Monday, 6=Sunday, evaluated in UTC
        - hour_of_day: 0-23, evaluated in UTC
        - is_weekend: True on Saturday/Sunday (UTC)
        - recency_days: days since the user's previous transaction
          (0 for the first one)
        - frequency: number of transactions so far, including this one
        - monetary_total: cumulative amount so far, including this one
    """
    # Local import: the module-level datetime import does not expose timezone.
    from datetime import timezone

    seconds_per_day = 86400
    span_7d = 7 * seconds_per_day
    span_30d = 30 * seconds_per_day

    # Sort by user, then timestamp, so each user's history grows in order.
    sorted_txns = sorted(transactions, key=lambda t: (t[user_field], t[time_field]))

    user_history = defaultdict(list)  # user -> [(timestamp, amount)]
    results = []

    for txn in sorted_txns:
        user = txn[user_field]
        ts = txn[time_field]
        amount = txn[value_field]
        history = user_history[user]

        features = dict(txn)  # Copy so the caller's record is left untouched

        # Gap to the previous transaction (0 when there is none).
        gap = ts - history[-1][0] if history else 0
        features['time_since_last'] = gap

        # Everything inside 7 days is also inside 30 days, so the short
        # window is filtered from the long one instead of the full history.
        window_30d = [(t, a) for t, a in history if ts - t <= span_30d]
        amounts_30d = [a for _, a in window_30d]
        amounts_7d = [a for t, a in window_30d if ts - t <= span_7d]

        features['rolling_count_7d'] = len(amounts_7d)
        features['rolling_avg_7d'] = (
            sum(amounts_7d) / len(amounts_7d) if amounts_7d else 0
        )
        features['rolling_count_30d'] = len(amounts_30d)
        features['rolling_avg_30d'] = (
            sum(amounts_30d) / len(amounts_30d) if amounts_30d else 0
        )

        # Calendar features are pinned to UTC; a naive fromtimestamp() would
        # shift day/hour depending on the machine's local timezone.
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
        weekday = dt.weekday()
        features['day_of_week'] = weekday
        features['hour_of_day'] = dt.hour
        features['is_weekend'] = weekday >= 5

        # RFM-style features
        features['recency_days'] = gap / seconds_per_day if history else 0
        features['frequency'] = len(history) + 1
        features['monetary_total'] = sum(a for _, a in history) + amount

        # Record this transaction only after its features were computed.
        history.append((ts, amount))
        results.append(features)

    return results

# Demo: two users, six transactions, offsets given in whole days from base_ts
base_ts = 1704067200  # 2024-01-01 00:00:00 UTC
transactions = [
    {'user_id': user, 'timestamp': base_ts + day_offset * 86400, 'amount': amount}
    for user, day_offset, amount in (
        ('u1', 0, 50),
        ('u1', 1, 75),
        ('u2', 1, 120),
        ('u1', 3, 30),
        ('u1', 10, 200),
        ('u2', 15, 85),
    )
]

# One summary line per enriched transaction
features = generate_time_features(transactions)
for f in features:
    print(f"  user={f['user_id']} amount={f['amount']:>5} "
          f"time_since_last={f['time_since_last']:>7} "
          f"roll_7d_avg={f['rolling_avg_7d']:>6.1f} "
          f"freq={f['frequency']} monetary={f['monetary_total']}")

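Complexity: O(n log n) for the initial sort, plus O(n * h) for the window scans, where n is the number of transactions and h is the average history length per user. For large histories, keep each user's timestamps sorted and use binary search (together with prefix sums of the amounts) so that each window lookup costs O(log h).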

Challenge 2: Categorical Encoding Pipeline

📝
Problem: Implement a categorical encoding pipeline that supports label encoding, one-hot encoding, frequency encoding, and target encoding. The encoder must fit on training data and transform both training and test data consistently, handling unseen categories gracefully.
from collections import Counter, defaultdict

class CategoricalEncoder:
    """Encode categorical fields with one of several strategies.

    Strategies:
        - ``'label'``: integer index of the category in sorted order.
        - ``'onehot'``: one 0/1 column per category seen during ``fit``.
        - ``'frequency'``: share of non-missing training values equal to
          the category.
        - ``'target'``: mean of the target for the category, smoothed
          towards the global target mean.

    A value that was not seen during ``fit`` (this includes ``None``) is
    handled according to ``handle_unknown``:
        - ``'default'``: -1 (label), all-zero columns (onehot),
          0 (frequency), global target mean (target).
        - ``'error'``: raise ``ValueError``.
        - any other value: used verbatim as the encoded value. One-hot has
          no single encoded value, so it still emits all-zero columns.
    """

    _STRATEGIES = ('label', 'onehot', 'frequency', 'target')
    _SMOOTHING = 10  # Pseudo-count pulling small categories to the global mean

    def __init__(self, strategy='label', handle_unknown='default'):
        """Create an unfitted encoder.

        Raises:
            ValueError: If ``strategy`` is not a supported strategy name.
        """
        if strategy not in self._STRATEGIES:
            # Fail fast: an unrecognised strategy would otherwise make
            # transform() return the records without any encoding.
            raise ValueError(
                f"Unknown strategy {strategy!r}; expected one of {self._STRATEGIES}"
            )
        self.strategy = strategy
        self.handle_unknown = handle_unknown  # 'default', 'error', or a value
        self.mappings = {}  # field -> encoding mapping
        self.is_fitted = False

    def fit(self, records, fields, target_field=None):
        """Learn encoding mappings from training data.

        Args:
            records: List of dicts (iterated more than once).
            fields: Names of the categorical fields to encode.
            target_field: Name of the numeric target; required for the
                ``'target'`` strategy, ignored otherwise.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If the strategy is ``'target'`` and ``target_field``
                is not given.
        """
        if self.strategy == 'target' and target_field is None:
            raise ValueError("target_field required for target encoding")

        for field in fields:
            # Missing values are not categories; they are skipped here and
            # treated as unknown at transform time.
            values = [r.get(field) for r in records if r.get(field) is not None]

            if self.strategy == 'label':
                unique = sorted(set(values))
                self.mappings[field] = {v: i for i, v in enumerate(unique)}

            elif self.strategy == 'onehot':
                self.mappings[field] = sorted(set(values))

            elif self.strategy == 'frequency':
                counts = Counter(values)
                total = len(values)
                self.mappings[field] = {v: c / total for v, c in counts.items()}

            elif self.strategy == 'target':
                # Global mean over rows that actually have a target, so
                # rows with a missing target neither crash nor bias it.
                known_targets = [
                    r[target_field] for r in records
                    if r.get(target_field) is not None
                ]
                global_mean = (
                    sum(known_targets) / len(known_targets)
                    if known_targets else 0.0
                )

                # Group target values by category
                cat_targets = defaultdict(list)
                for r in records:
                    cat = r.get(field)
                    target = r.get(target_field)
                    if cat is not None and target is not None:
                        cat_targets[cat].append(target)

                smoothing = self._SMOOTHING
                self.mappings[field] = {}
                for cat, targets in cat_targets.items():
                    n = len(targets)
                    cat_mean = sum(targets) / n
                    # Blend category mean with global mean; the fewer rows a
                    # category has, the closer it stays to the global mean.
                    smoothed = (n * cat_mean + smoothing * global_mean) / (n + smoothing)
                    self.mappings[field][cat] = round(smoothed, 4)
                self.mappings[f'{field}__global_mean'] = global_mean

        self.is_fitted = True
        return self

    def _unknown(self, field, value, default):
        """Resolve the encoding of an unseen ``value`` per ``handle_unknown``."""
        if self.handle_unknown == 'default':
            return default
        if self.handle_unknown == 'error':
            raise ValueError(f"Unknown category {value!r} for field {field!r}")
        return self.handle_unknown

    def transform(self, records, fields):
        """Apply learned encodings to records.

        Args:
            records: Iterable of dicts to encode.
            fields: Names of the fields to encode; each must have been
                passed to ``fit``.

        Returns:
            A list of new dicts: the original fields plus the encoded
            columns (``<field>_encoded``, ``<field>_<category>``,
            ``<field>_freq`` or ``<field>_target`` depending on strategy).

        Raises:
            ValueError: If called before ``fit``, or if an unseen value is
                met while ``handle_unknown`` is ``'error'``.
        """
        if not self.is_fitted:
            raise ValueError("Must call fit() before transform()")

        results = []
        for record in records:
            encoded = dict(record)

            for field in fields:
                value = record.get(field)
                mapping = self.mappings[field]
                known = value in mapping

                if self.strategy == 'label':
                    encoded[f'{field}_encoded'] = (
                        mapping[value] if known
                        else self._unknown(field, value, -1)
                    )

                elif self.strategy == 'onehot':
                    if not known:
                        # Called for its side effect: raises in 'error' mode.
                        self._unknown(field, value, None)
                    for cat in mapping:
                        encoded[f'{field}_{cat}'] = 1 if value == cat else 0

                elif self.strategy == 'frequency':
                    encoded[f'{field}_freq'] = (
                        mapping[value] if known
                        else self._unknown(field, value, 0)
                    )

                elif self.strategy == 'target':
                    global_mean = self.mappings.get(f'{field}__global_mean', 0)
                    encoded[f'{field}_target'] = (
                        mapping[value] if known
                        else self._unknown(field, value, global_mean)
                    )

            results.append(encoded)
        return results

# Demo: fit each strategy on the training rows, then encode held-out rows
train_data = [
    {'city': city, 'color': color, 'purchased': purchased}
    for city, color, purchased in (
        ('NYC', 'red', 1),
        ('NYC', 'blue', 1),
        ('LA', 'red', 0),
        ('LA', 'green', 0),
        ('Chicago', 'blue', 1),
        ('NYC', 'red', 0),
    )
]

test_data = [
    dict(city='NYC', color='red', purchased=None),
    dict(city='Boston', color='blue', purchased=None),  # Unseen city
]

fields = ['city', 'color']

# Label encoding
enc = CategoricalEncoder(strategy='label')
enc.fit(train_data, fields)
result = enc.transform(test_data, fields)
print("Label encoding:")
for r in result:
    print(f"  {r['city']} -> {r['city_encoded']}, {r['color']} -> {r['color_encoded']}")

# Frequency encoding
enc = CategoricalEncoder(strategy='frequency')
enc.fit(train_data, fields)
result = enc.transform(test_data, fields)
print("\nFrequency encoding:")
for r in result:
    print(f"  {r['city']} -> {r['city_freq']:.3f}, {r['color']} -> {r['color_freq']:.3f}")

# Target encoding (needs the label column)
enc = CategoricalEncoder(strategy='target')
enc.fit(train_data, fields, target_field='purchased')
result = enc.transform(test_data, fields)
print("\nTarget encoding:")
for r in result:
    print(f"  {r['city']} -> {r['city_target']:.4f}, {r['color']} -> {r['color_target']:.4f}")

Complexity: Fit: O(n * f) where n is records and f is fields. Transform: O(n * f) for label/frequency/target, O(n * f * c) for one-hot where c is unique categories per field.

Challenge 3: Text Feature Extraction

📝
Problem: Implement a text feature extraction pipeline that generates: bag-of-words counts, TF-IDF scores, n-gram features, and basic text statistics (length, word count, special character ratio). Do not use sklearn — implement from scratch.
import math
import re
from collections import Counter, defaultdict

class TextFeatureExtractor:
    """Extract numerical features from text without external libraries.

    ``fit`` learns a vocabulary (the ``max_features`` most frequent n-grams)
    and one IDF weight per vocabulary term; ``transform`` turns texts into
    dicts of bag-of-words counts, TF-IDF scores and/or text statistics.
    """

    # Compiled once; both patterns are applied to every document.
    _TOKEN_RE = re.compile(r'\b[a-z0-9]+\b')
    _SENTENCE_END_RE = re.compile(r'[.!?]+')
    _MODES = ('bow', 'tfidf', 'all')

    def __init__(self, max_features=100, ngram_range=(1, 1)):
        """Create an unfitted extractor.

        Args:
            max_features: Maximum vocabulary size.
            ngram_range: Inclusive ``(min_n, max_n)`` n-gram lengths.
        """
        self.max_features = max_features
        self.ngram_range = ngram_range
        self.vocabulary = {}     # term -> index
        self.idf_scores = {}     # term -> IDF score
        self.is_fitted = False

    def _tokenize(self, text):
        """Lowercase ``text`` and return its runs of ``[a-z0-9]`` as tokens."""
        return self._TOKEN_RE.findall(text.lower().strip())

    def _get_ngrams(self, tokens):
        """Return all n-grams in ``ngram_range``, each joined with ``'_'``."""
        low, high = self.ngram_range[0], self.ngram_range[1]
        return [
            '_'.join(tokens[i:i + n])
            for n in range(low, high + 1)
            for i in range(len(tokens) - n + 1)
        ]

    def _text_stats(self, text):
        """Compute basic statistics of the raw (untokenized) text.

        Ratios are relative to the character count; an empty text yields
        zeros throughout instead of dividing by zero.
        """
        words = text.split()
        char_total = max(len(text), 1)
        # Only non-blank segments count as sentences, so a trailing
        # terminator ("Hello.") does not add a phantom empty sentence.
        sentences = [s for s in self._SENTENCE_END_RE.split(text) if s.strip()]
        return {
            'char_count': len(text),
            'word_count': len(words),
            'avg_word_len': sum(len(w) for w in words) / max(len(words), 1),
            'uppercase_ratio': sum(1 for c in text if c.isupper()) / char_total,
            'digit_ratio': sum(1 for c in text if c.isdigit()) / char_total,
            'special_char_ratio': sum(
                1 for c in text if not c.isalnum() and c != ' '
            ) / char_total,
            'sentence_count': len(sentences),
        }

    def fit(self, texts):
        """Learn vocabulary and IDF scores from a corpus.

        Args:
            texts: Sized collection of strings (``len()`` is taken).

        Returns:
            ``self``, so calls can be chained.
        """
        doc_count = len(texts)
        term_doc_freq = Counter()  # How many docs contain each term
        term_freq_total = Counter()  # Total term frequency across all docs

        for text in texts:
            ngrams = self._get_ngrams(self._tokenize(text))
            term_freq_total.update(ngrams)
            # Document frequency counts each term once per document.
            term_doc_freq.update(set(ngrams))

        # Select top features by total frequency
        top_terms = term_freq_total.most_common(self.max_features)
        self.vocabulary = {term: idx for idx, (term, _) in enumerate(top_terms)}

        # Smoothed IDF: log((1 + N) / (1 + df)) + 1. Unlike log(N / (1 + df)),
        # this stays positive even for a term found in every document, so
        # very common terms are down-weighted rather than scored negatively.
        self.idf_scores = {
            term: math.log((1 + doc_count) / (1 + term_doc_freq.get(term, 0))) + 1
            for term in self.vocabulary
        }

        self.is_fitted = True
        return self

    def transform(self, texts, mode='tfidf'):
        """Transform texts into feature dicts.

        Args:
            texts: Iterable of strings.
            mode: ``'bow'`` (``bow_<term>`` counts), ``'tfidf'``
                (``tfidf_<term>`` scores rounded to 4 places) or ``'all'``
                (both, plus ``stat_<name>`` text statistics).

        Returns:
            One feature dict per input text.

        Raises:
            ValueError: If called before ``fit`` or with an unknown mode.
        """
        if not self.is_fitted:
            raise ValueError("Must call fit() first")
        if mode not in self._MODES:
            # An unknown mode would otherwise yield empty feature dicts.
            raise ValueError(f"Unknown mode {mode!r}; expected one of {self._MODES}")

        results = []
        for text in texts:
            ngrams = self._get_ngrams(self._tokenize(text))
            term_counts = Counter(ngrams)

            features = {}

            if mode in ('bow', 'all'):
                for term in self.vocabulary:
                    features[f'bow_{term}'] = term_counts.get(term, 0)

            if mode in ('tfidf', 'all'):
                # Term frequency is normalised by document length in n-grams.
                total_terms = max(len(ngrams), 1)
                for term in self.vocabulary:
                    tf = term_counts.get(term, 0) / total_terms
                    idf = self.idf_scores.get(term, 0)
                    features[f'tfidf_{term}'] = round(tf * idf, 4)

            if mode == 'all':
                stats = self._text_stats(text)
                features.update({f'stat_{k}': v for k, v in stats.items()})

            results.append(features)

        return results

# Demo: learn unigrams + bigrams on a tiny corpus, then featurize new texts
corpus = [
    "The data pipeline processes streaming events in real time",
    "ETL pipelines extract transform and load data into warehouses",
    "Streaming data requires windowing and watermark handling",
    "Data validation ensures quality in production pipelines",
    "Feature engineering transforms raw data into ML features",
]

extractor = TextFeatureExtractor(max_features=15, ngram_range=(1, 2))
extractor.fit(corpus)

test_texts = [
    "This pipeline handles streaming data events",
    "Data quality validation is critical for ML",
]

results = extractor.transform(test_texts, mode='all')
for i, features in enumerate(results):
    print(f"\nText {i + 1}:")
    # Split the flat feature dict back into its TF-IDF and statistics parts
    tfidf = {k: v for k, v in features.items() if k.startswith('tfidf_') and v > 0}
    stats = {k: v for k, v in features.items() if k.startswith('stat_')}
    # Highest score first; a stable sort keeps tied terms in original order
    print(f"  Top TF-IDF: {dict(sorted(tfidf.items(), key=lambda kv: kv[1], reverse=True)[:5])}")
    print(f"  Stats: {stats}")

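Complexity: Fit: O(D * T) where D is documents and T is average tokens per document, plus the cost of selecting the top max_features terms. Transform: O(T + V) per document, where V is vocabulary size: tokenizing and counting the document is O(T), and each vocabulary term is then looked up once in the document's term counts.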

Challenge 4: Aggregation Feature Generator

📝
Problem: Build a feature generator that creates aggregation features by grouping data on specified keys and computing statistics (count, sum, mean, std, min, max, percentiles) for numeric columns. Support multiple group-by levels and cross-join features.
from collections import defaultdict
import math

class AggregationFeatureGenerator:
    """Generate aggregation-based features for ML pipelines.

    Records are grouped on one or more keys, per-group statistics are
    computed for numeric fields, and the statistics are joined back onto the
    individual records. The per-group tables are kept in ``agg_tables`` so
    the same statistics can later be applied to unseen records with
    ``transform`` instead of being recomputed on them.
    """

    def __init__(self):
        # tuple(group_keys) -> {group key tuple -> {feature name -> value}}
        self.agg_tables = {}

    def _percentile(self, values, p):
        """Return the ``p``-th percentile (0-100), interpolating linearly.

        ``values`` must be non-empty.
        """
        sorted_vals = sorted(values)
        k = (len(sorted_vals) - 1) * (p / 100)
        f = math.floor(k)
        c = math.ceil(k)
        if f == c:
            # k landed exactly on an element; nothing to interpolate.
            return sorted_vals[int(k)]
        return sorted_vals[f] * (c - k) + sorted_vals[c] * (k - f)

    def _std(self, values):
        """Return the sample standard deviation (n - 1), or 0 if n < 2."""
        n = len(values)
        if n < 2:
            return 0
        mean = sum(values) / n
        variance = sum((x - mean) ** 2 for x in values) / (n - 1)
        return math.sqrt(variance)

    @staticmethod
    def _join(records, group_keys, agg_results):
        """Return copies of ``records`` with their group's aggregates added."""
        enriched = []
        for record in records:
            key = tuple(record.get(k) for k in group_keys)
            new_record = dict(record)
            # Groups without a table entry simply receive no extra fields.
            new_record.update(agg_results.get(key, {}))
            enriched.append(new_record)
        return enriched

    def fit_transform(self, records, group_keys, value_fields, prefix=None):
        """Compute aggregation features and join back to records.

        Args:
            records: List of dictionaries (iterated more than once)
            group_keys: List of fields to group by
            value_fields: List of numeric fields to aggregate
            prefix: Optional prefix for feature names; defaults to
                ``agg_<key1>_<key2>...``

        Returns:
            New records with ``<prefix>_count`` and, for every value field
            that has at least one int/float value in the group,
            ``<prefix>_<field>_{mean,sum,min,max,std,p25,p50,p75}`` added.
            Mean, std and percentiles are rounded to 2 decimals.
        """
        # The feature-name prefix is identical for every group: build it once.
        pref = prefix or f"agg_{'_'.join(str(k) for k in group_keys)}"

        # Group records
        groups = defaultdict(list)
        for record in records:
            key = tuple(record.get(k) for k in group_keys)
            groups[key].append(record)

        # Compute aggregates per group
        agg_results = {}
        for key, group_records in groups.items():
            agg = {f'{pref}_count': len(group_records)}

            for field in value_fields:
                values = [r[field] for r in group_records
                          if r.get(field) is not None and isinstance(r[field], (int, float))]
                if not values:
                    # No usable numbers: skip rather than emit fake statistics.
                    continue

                agg[f'{pref}_{field}_mean'] = round(sum(values) / len(values), 2)
                agg[f'{pref}_{field}_sum'] = sum(values)
                agg[f'{pref}_{field}_min'] = min(values)
                agg[f'{pref}_{field}_max'] = max(values)
                agg[f'{pref}_{field}_std'] = round(self._std(values), 2)
                agg[f'{pref}_{field}_p25'] = round(self._percentile(values, 25), 2)
                agg[f'{pref}_{field}_p50'] = round(self._percentile(values, 50), 2)
                agg[f'{pref}_{field}_p75'] = round(self._percentile(values, 75), 2)

            agg_results[key] = agg

        self.agg_tables[tuple(group_keys)] = agg_results
        return self._join(records, group_keys, agg_results)

    def transform(self, records, group_keys):
        """Join previously fitted aggregates onto new records.

        Use this for validation/test data so its statistics come from the
        data passed to ``fit_transform`` and not from the new records.

        Args:
            records: Iterable of dictionaries
            group_keys: The same group keys that were passed to
                ``fit_transform``

        Returns:
            New records with the stored aggregates of their group added.
            Records whose group was not seen during fitting are copied
            without any aggregate fields.

        Raises:
            ValueError: If no aggregates were fitted for ``group_keys``.
        """
        table_key = tuple(group_keys)
        if table_key not in self.agg_tables:
            raise ValueError(
                f"No aggregates fitted for group keys {list(group_keys)!r}; "
                "call fit_transform() first"
            )
        return self._join(records, group_keys, self.agg_tables[table_key])

# Demo: stack user-level and category-level aggregates on the same rows
orders = [
    {'user_id': uid, 'category': cat, 'amount': amt, 'quantity': qty}
    for uid, cat, amt, qty in (
        ('u1', 'electronics', 500, 1),
        ('u1', 'electronics', 300, 2),
        ('u1', 'books', 25, 3),
        ('u2', 'electronics', 800, 1),
        ('u2', 'clothing', 120, 4),
        ('u3', 'books', 45, 2),
        ('u3', 'books', 30, 1),
        ('u3', 'electronics', 650, 1),
    )
]

gen = AggregationFeatureGenerator()

# Pass 1: per-user statistics
enriched = gen.fit_transform(
    orders,
    group_keys=['user_id'],
    value_fields=['amount', 'quantity'],
    prefix='user',
)

# Pass 2: per-category statistics, added to the already enriched rows
enriched = gen.fit_transform(
    enriched,
    group_keys=['category'],
    value_fields=['amount'],
    prefix='cat',
)

print("Enriched records (showing key features):")
for r in enriched[:3]:
    print(f"  user={r['user_id']} cat={r['category']} amount={r['amount']}")
    print(f"    user_count={r['user_count']} user_amount_mean={r['user_amount_mean']} "
          f"user_amount_std={r['user_amount_std']}")
    print(f"    cat_count={r['cat_count']} cat_amount_mean={r['cat_amount_mean']}")

Complexity: O(n * g * f) where n is records, g is group-by operations, and f is value fields, ignoring the sorts used for percentiles; counting those, the worst case is O(g * f * n log n). This mirrors how feature stores like Feast compute aggregate features.

Challenge 5: Interaction Feature Generation

📝
Problem: Implement an interaction feature generator that creates polynomial interactions, ratio features, and difference features between numeric columns. Include feature selection based on variance threshold and correlation filtering.
import math
from itertools import combinations

class InteractionFeatureGenerator:
    """Build pairwise interaction features and prune the uninformative ones.

    For every pair of numeric fields it can create a product, both ratios
    and a difference. Candidates are then filtered twice: first by a minimum
    sample variance, then greedily by absolute Pearson correlation against
    the candidates that have already been accepted.
    """

    def __init__(self, variance_threshold=0.01, max_correlation=0.95):
        self.variance_threshold = variance_threshold
        self.max_correlation = max_correlation

    def _variance(self, values):
        """Sample variance (n - 1 denominator); 0 for fewer than two values."""
        count = len(values)
        if count < 2:
            return 0
        centre = sum(values) / count
        squared_devs = [(v - centre) ** 2 for v in values]
        return sum(squared_devs) / (count - 1)

    def _correlation(self, x, y):
        """Pearson correlation of two sequences, computed by hand.

        Returns 0 when there are fewer than two points or when either
        sequence has zero spread.
        """
        size = len(x)
        if size < 2:
            return 0
        dof = size - 1
        centre_x = sum(x) / size
        centre_y = sum(y) / size
        covariance = sum(
            (a - centre_x) * (b - centre_y) for a, b in zip(x, y)
        ) / dof
        spread_x = math.sqrt(sum((a - centre_x) ** 2 for a in x) / dof)
        spread_y = math.sqrt(sum((b - centre_y) ** 2 for b in y) / dof)
        if not (spread_x and spread_y):
            return 0
        return covariance / (spread_x * spread_y)

    def generate(self, records, numeric_fields, interactions='all'):
        """Create, filter and attach interaction features.

        Args:
            records: List of dictionaries
            numeric_fields: Names of the numeric fields to combine; a field
                missing from a record is read as 0
            interactions: Which kinds to build: 'all', 'multiply', 'ratio'
                or 'difference'

        Returns:
            ``(enriched, report)``: copies of the records with the surviving
            features added (rounded to 4 decimals), and a dict summarising
            how many features were generated, dropped and kept.
        """
        want_product = interactions in ('all', 'multiply')
        want_ratio = interactions in ('all', 'ratio')
        want_difference = interactions in ('all', 'difference')

        # Column-wise view of the inputs
        columns = {
            field: [row.get(field, 0) for row in records]
            for field in numeric_fields
        }

        # Step 1: candidate features for every unordered pair of fields
        candidates = {}
        for left, right in combinations(numeric_fields, 2):
            pairs = list(zip(columns[left], columns[right]))

            if want_product:
                candidates[f'{left}_x_{right}'] = [a * b for a, b in pairs]

            if want_ratio:
                # A zero denominator yields 0 instead of raising
                candidates[f'{left}_div_{right}'] = [
                    a / b if b != 0 else 0 for a, b in pairs
                ]
                candidates[f'{right}_div_{left}'] = [
                    b / a if a != 0 else 0 for a, b in pairs
                ]

            if want_difference:
                candidates[f'{left}_minus_{right}'] = [a - b for a, b in pairs]

        # Step 2: discard near-constant candidates
        informative = {}
        low_variance = []
        for name, column in candidates.items():
            spread = self._variance(column)
            if spread >= self.variance_threshold:
                informative[name] = column
                continue
            low_variance.append((name, spread))

        # Step 3: greedy redundancy filter; a candidate is rejected at the
        # first already-accepted feature it correlates too strongly with
        selected = {}
        redundant = []
        for name, column in informative.items():
            for kept_name, kept_column in selected.items():
                strength = abs(self._correlation(column, kept_column))
                if strength > self.max_correlation:
                    redundant.append((name, kept_name, round(strength, 3)))
                    break
            else:
                selected[name] = column

        # Step 4: attach the surviving features row by row
        enriched = []
        for position, row in enumerate(records):
            extended = dict(row)
            extended.update(
                (name, round(column[position], 4))
                for name, column in selected.items()
            )
            enriched.append(extended)

        report = dict(
            input_features=len(numeric_fields),
            generated=len(candidates),
            dropped_low_variance=len(low_variance),
            dropped_high_correlation=len(redundant),
            final_features=len(selected),
            feature_names=list(selected),
        )

        return enriched, report

# Demo: five products described by four numeric columns
records = [
    dict(zip(('price', 'quantity', 'weight', 'rating'), row))
    for row in (
        (100, 5, 2.5, 4.2),
        (200, 3, 1.8, 4.5),
        (50, 10, 3.0, 3.8),
        (150, 2, 0.5, 4.9),
        (80, 7, 2.2, 4.0),
    )
]

gen = InteractionFeatureGenerator(variance_threshold=0.1, max_correlation=0.95)
enriched, report = gen.generate(
    records,
    ['price', 'quantity', 'weight', 'rating'],
)

# Summary of what was generated and what survived the two filters
print(f"Generated {report['generated']} features")
print(f"Dropped {report['dropped_low_variance']} (low variance)")
print(f"Dropped {report['dropped_high_correlation']} (high correlation)")
print(f"Final features ({report['final_features']}): {report['feature_names']}")
print(f"\nSample enriched record:")
for k, v in enriched[0].items():
    print(f"  {k}: {v}")

Complexity: O(n * C(f, 2)) for generation where n is records and f is numeric fields. Correlation filtering is O(k^2 * n) where k is generated features. For high-dimensional data, use random projections or mutual information instead of exhaustive correlation checks.

💡
Interview tip: When discussing feature engineering, always mention the risk of data leakage. Aggregation features must be computed on training data only, then applied to test data. Target encoding must use cross-validation folds to prevent overfitting. Interviewers specifically look for this awareness.