Intermediate

Offline Feature Store

Batch computation, point-in-time joins, and historical feature retrieval for model training.

Why Point-in-Time Joins Matter

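When training ML models, you need each feature's value as it was at the time of the corresponding training example. A regular SQL join on the entity key would also match feature values recorded after that time, leaking future data into the training set. Feast handles this automatically with point-in-time joins.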

# src/offline.py
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pandas as pd
from feast import FeatureStore

# Module-level Feast client, configured from the "feature_repo" directory
# (a relative path); get_training_data() below reads it as a global.
store = FeatureStore(repo_path="feature_repo")

def get_training_data(entity_df):
    """Fetch historical driver features for the rows of ``entity_df``.

    Args:
        entity_df: Entity dataframe handed to Feast unchanged. The caller in
            this module supplies ``driver_id``, ``event_timestamp`` and
            ``label`` columns.

    Returns:
        The result of the historical retrieval job, materialized with
        ``to_df()``.
    """
    # Feature references in "<feature view>:<feature name>" form.
    feature_refs = [
        "driver_stats:conv_rate",
        "driver_stats:acc_rate",
        "driver_stats:avg_daily_trips",
    ]
    retrieval_job = store.get_historical_features(
        entity_df=entity_df,
        features=feature_refs,
    )
    return retrieval_job.to_df()

def create_training_set():
    """Generate a demo training dataset with historical features.

    Builds a 100-row entity dataframe (driver ids 1-5 repeating, one row per
    day counting back from the current time, and a fixed repeating label
    pattern), retrieves features for it through ``get_training_data``,
    prints a short summary, and returns the result.

    Returns:
        The dataframe returned by ``get_training_data``.
    """
    # Use an explicit UTC timestamp. Feast interprets tz-naive timestamps as
    # UTC, so a naive local now() would shift every point-in-time lookup by
    # the machine's UTC offset.
    now = datetime.now(tz=timezone.utc)
    entity_df = pd.DataFrame({
        "driver_id": [1, 2, 3, 4, 5] * 20,
        # One event per day, going back 100 days from now.
        "event_timestamp": [now - timedelta(days=i) for i in range(100)],
        "label": [1, 0, 1, 0, 1] * 20,
    })

    training_df = get_training_data(entity_df)
    print(f"Training set: {len(training_df)} rows, "
          f"{len(training_df.columns)} columns")
    print(training_df.head())
    return training_df

if __name__ == "__main__":
    df = create_training_set()
    # to_parquet() does not create missing parent directories, so make sure
    # the output directory exists before writing.
    output_path = Path("data/training_set.parquet")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path)
    print("Saved training set")

Train a Model with Features

# src/train.py
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib

# Fixed seed so the train/test split, the fitted model, and the reported
# accuracy are reproducible from run to run.
RANDOM_STATE = 42

# Load the training set written by src/offline.py.
df = pd.read_parquet("data/training_set.parquet")
features = ["conv_rate", "acc_rate", "avg_daily_trips"]
# Missing feature values are replaced with 0 before training.
X = df[features].fillna(0)
y = df["label"]

# Hold out 20% of the rows for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE
)
model = GradientBoostingClassifier(n_estimators=100, random_state=RANDOM_STATE)
model.fit(X_train, y_train)

acc = accuracy_score(y_test, model.predict(X_test))
print(f"Accuracy: {acc:.3f}")
# Persist the fitted model to the current working directory.
joblib.dump(model, "model.joblib")
💡
Point-in-time joins prevent data leakage by ensuring each training example only sees features available at that timestamp. This is critical for time-series and event-driven ML.