Advanced
ML on Lakehouse
Run end-to-end ML workflows on the lakehouse: feature engineering from Delta tables, model training with MLflow, and serving predictions.
Feature Engineering from Delta Tables
Python — Feature Engineering on Lakehouse
from pyspark.sql import functions as F
# Read Gold-layer feature table
users = spark.read.table("ml_project.gold.user_features")
transactions = spark.read.table("ml_project.silver.transactions")
# Compute time-based features
features = (
    transactions
    # Restrict to the trailing 30-day window so the "_30d" aggregates match their names
    # (assumes a tx_date timestamp column; adjust to your schema)
    .filter(F.col("tx_date") >= F.date_sub(F.current_date(), 30))
    .withColumn("days_since_first", F.datediff(F.current_date(), "first_tx_date"))
    .groupBy("user_id")
    .agg(
        F.count("*").alias("tx_count_30d"),
        F.sum("amount").alias("total_spend_30d"),
        F.avg("amount").alias("avg_tx_amount"),
        F.stddev("amount").alias("std_tx_amount"),
        F.countDistinct("merchant_id").alias("unique_merchants"),
        F.max("amount").alias("max_tx_amount"),
        F.max("days_since_first").alias("days_since_first")  # carry the feature through the groupBy
    )
)
# Join with user features
ml_dataset = features.join(users, "user_id", "left")
# Save as a feature table
ml_dataset.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("ml_project.ml_features.training_set_v1")
Training with MLflow on Lakehouse
Python — Model Training with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
mlflow.set_experiment("/ml-project/churn-prediction")
# Load training data from Delta
train_df = spark.read.table("ml_project.ml_features.training_set_v1")
train_pd = train_df.toPandas()
X = train_pd.drop(columns=["user_id", "label"])
y = train_pd["label"]
# Hold out a test split for evaluation
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
# Log the Delta table version for reproducibility
delta_version = spark.sql(
    "DESCRIBE HISTORY ml_project.ml_features.training_set_v1 LIMIT 1"
).select("version").first()[0]
with mlflow.start_run(run_name="gbt-churn-v1"):
    mlflow.log_param("data_version", delta_version)
    mlflow.log_param("data_table", "ml_project.ml_features.training_set_v1")
    mlflow.log_param("n_features", X.shape[1])
    model = GradientBoostingClassifier(
        n_estimators=200, max_depth=6, learning_rate=0.1
    )
    model.fit(X_train, y_train)
    predictions = model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, predictions)
    mlflow.log_metric("auc_roc", auc)
    mlflow.sklearn.log_model(
        model, "model",
        registered_model_name="churn-prediction"
    )
Batch Predictions to Delta
Python — Writing Predictions Back to Lakehouse
import mlflow.pyfunc
from pyspark.sql import functions as F
# Load the production model
model_uri = "models:/churn-prediction/Production"
model = mlflow.pyfunc.spark_udf(spark, model_uri)
# Score all users
scoring_df = spark.read.table("ml_project.ml_features.training_set_v1")
feature_cols = [c for c in scoring_df.columns if c not in ["user_id", "label"]]
predictions = scoring_df.withColumn(
    "churn_probability",
    model(*feature_cols)
)
# Write predictions to Delta
predictions.select("user_id", "churn_probability", F.current_timestamp().alias("scored_at")) \
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("ml_project.gold.churn_predictions")
Model Serving
- Batch serving: Schedule prediction jobs that read from Delta, score with the model, and write results back to Delta.
- Real-time serving: Deploy models as REST endpoints using Databricks Model Serving or MLflow Model Serving.
- Streaming inference: Use Spark Structured Streaming to score events as they arrive, writing predictions to Delta.
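For real-time serving, clients typically send a JSON body in MLflow's `dataframe_split` format to the model's REST endpoint. A minimal sketch of building such a request payload (the feature names mirror the training set above; the values and endpoint URL are illustrative):
Python — Building a Scoring Request Payload

```python
import json

# Hypothetical feature vector for one user; column order must match training
feature_columns = ["tx_count_30d", "total_spend_30d", "avg_tx_amount",
                   "std_tx_amount", "unique_merchants", "max_tx_amount"]
feature_values = [[14, 982.50, 70.18, 22.4, 6, 210.00]]

# MLflow serving endpoints accept this "dataframe_split" JSON layout
payload = json.dumps({
    "dataframe_split": {
        "columns": feature_columns,
        "data": feature_values,
    }
})

# Send with any HTTP client, e.g.:
# requests.post(endpoint_url,
#               headers={"Content-Type": "application/json"},
#               data=payload)
print(payload)
```

The endpoint responds with a JSON object containing the model's predictions in the same row order as the input data.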
Feature and model versioning: Always link a model to the exact Delta table version it was trained on, as in the `data_version` parameter logged above. This gives you a complete audit trail from raw data through the Bronze, Silver, and Gold layers to the engineered features, the trained model, and its predictions.
Lilly Tech Systems