Advanced

ML on Lakehouse

Run end-to-end ML workflows on the lakehouse: feature engineering from Delta tables, model training with MLflow, and serving predictions.

Feature Engineering from Delta Tables

Python — Feature Engineering on Lakehouse
from pyspark.sql import functions as F

# Read Gold-layer feature table
users = spark.read.table("ml_project.gold.user_features")
transactions = spark.read.table("ml_project.silver.transactions")

# Compute time-based features. Note: the *_30d names assume the silver
# transactions table is already filtered to the trailing 30-day window.
features = transactions \
    .withColumn("days_since_first", F.datediff(F.current_date(), "first_tx_date")) \
    .groupBy("user_id") \
    .agg(
        F.count("*").alias("tx_count_30d"),
        F.sum("amount").alias("total_spend_30d"),
        F.avg("amount").alias("avg_tx_amount"),
        F.stddev("amount").alias("std_tx_amount"),
        F.countDistinct("merchant_id").alias("unique_merchants"),
        F.max("amount").alias("max_tx_amount"),
        # Carry the derived column through the aggregation so it survives the groupBy
        F.max("days_since_first").alias("days_since_first")
    )

# Join with user features
ml_dataset = features.join(users, "user_id", "left")

# Save as a feature table
ml_dataset.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("ml_project.ml_features.training_set_v1")

Training with MLflow on Lakehouse

Python — Model Training with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

mlflow.set_experiment("/ml-project/churn-prediction")

# Load training data from Delta
train_df = spark.read.table("ml_project.ml_features.training_set_v1")
train_pd = train_df.toPandas()

X = train_pd.drop(columns=["user_id", "label"])
y = train_pd["label"]

# Hold out a stratified test set for evaluation
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Log the Delta table version for reproducibility
delta_version = spark.sql(
    "DESCRIBE HISTORY ml_project.ml_features.training_set_v1 LIMIT 1"
).select("version").first()[0]

with mlflow.start_run(run_name="gbt-churn-v1"):
    mlflow.log_param("data_version", delta_version)
    mlflow.log_param("data_table", "ml_project.ml_features.training_set_v1")
    mlflow.log_param("n_features", X.shape[1])

    model = GradientBoostingClassifier(
        n_estimators=200, max_depth=6, learning_rate=0.1
    )
    model.fit(X_train, y_train)

    # Evaluate on the held-out set
    predictions = model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, predictions)

    mlflow.log_metric("auc_roc", auc)
    mlflow.sklearn.log_model(model, "model",
        registered_model_name="churn-prediction"
    )

Batch Predictions to Delta

Python — Writing Predictions Back to Lakehouse
import mlflow.pyfunc
from pyspark.sql import functions as F

# Load the production model (stage-based URI; newer MLflow versions
# favor registered-model aliases, e.g. "models:/churn-prediction@champion")
model_uri = "models:/churn-prediction/Production"
model = mlflow.pyfunc.spark_udf(spark, model_uri)

# Score all users
scoring_df = spark.read.table("ml_project.ml_features.training_set_v1")
feature_cols = [c for c in scoring_df.columns if c not in ["user_id", "label"]]

predictions = scoring_df.withColumn(
    "churn_probability",
    model(*feature_cols)
)

# Write predictions to Delta
predictions.select("user_id", "churn_probability", F.current_timestamp().alias("scored_at")) \
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("ml_project.gold.churn_predictions")

Model Serving

  • Batch serving: Schedule prediction jobs that read from Delta, score with the model, and write results back to Delta.
  • Real-time serving: Deploy models as REST endpoints using Databricks Model Serving or MLflow Model Serving.
  • Streaming inference: Use Spark Structured Streaming to score events as they arrive, writing predictions to Delta.

Feature and model versioning: always link your model to the exact Delta table version used for training. This creates a complete audit trail from raw data through the bronze and silver layers to the gold features, and on to the trained model and its predictions.