ML Pipelines & Automation
Master Delta Live Tables, Databricks Workflows, and CI/CD patterns for production ML — covering approximately 20% of the exam (ML Pipelines & Production domain).
Delta Live Tables (DLT)
Delta Live Tables is a declarative framework for building reliable, maintainable data pipelines. For ML, DLT is commonly used to prepare and transform feature data before it enters the Feature Store or training pipeline.
DLT Core Concepts
- Live Table: A materialized view defined with @dlt.table that is recomputed from scratch on each pipeline run
- Streaming Live Table: An incrementally updated table defined with @dlt.table that reads from a streaming source
- Expectations: Data quality constraints that can warn, drop invalid rows, or fail the pipeline
- Pipeline: A DAG of tables and views that DLT manages as a unit
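The three expectation behaviors can be modeled in a few lines of plain Python. This is not the dlt API (DLT evaluates expectations as SQL expressions and records violation counts in the pipeline event log). The function name, the policy strings "warn", "drop", and "fail", and the sample rows are illustrative assumptions; the sketch only shows what happens to the rows under each policy.

```python
# Plain-Python model of DLT expectation policies (illustrative, not the dlt API).
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


class PipelineFailure(Exception):
    """Raised when a 'fail' expectation is violated (models expect_or_fail)."""


def apply_expectation(
    rows: List[Row],
    name: str,
    predicate: Callable[[Row], bool],
    policy: str,
) -> Tuple[List[Row], int]:
    """Return (output_rows, violation_count) for policy 'warn', 'drop', or 'fail'."""
    if policy not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown policy: {policy}")
    violations = [r for r in rows if not predicate(r)]
    if policy == "fail" and violations:
        # Models expect_or_fail: the whole update stops
        raise PipelineFailure(f"expectation {name!r} violated by {len(violations)} row(s)")
    if policy == "drop":
        # Models expect_or_drop: invalid rows are removed from the output
        return [r for r in rows if predicate(r)], len(violations)
    # Models expect (warn): every row is kept, only the violation count is reported
    return list(rows), len(violations)


def has_id(row: Row) -> bool:
    return row["customer_id"] is not None


events = [
    {"customer_id": 1, "amount": 10.0},
    {"customer_id": None, "amount": 5.0},
    {"customer_id": 2, "amount": -3.0},
]

kept, bad = apply_expectation(events, "valid_customer_id", has_id, "warn")
print(len(kept), bad)  # 3 1
kept, bad = apply_expectation(events, "valid_customer_id", has_id, "drop")
print(len(kept), bad)  # 2 1
```

With "fail", the same input raises PipelineFailure instead of returning rows, which is why expect_or_fail suits constraints where any bad record should stop the run.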
DLT for ML Feature Preparation
import dlt
from pyspark.sql.functions import col, avg, count, countDistinct, datediff, current_date
from pyspark.sql.functions import max as spark_max

# `spark` is provided automatically inside a DLT pipeline notebook.

@dlt.table(
    comment="Raw customer events from streaming source",
    table_properties={"quality": "bronze"}
)
def raw_customer_events():
    # Auto Loader (cloudFiles) incrementally ingests newly arrived JSON files
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/customer_events/")
    )

@dlt.table(
    comment="Cleaned customer events with quality checks",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
def cleaned_customer_events():
    # Streaming read of the bronze table; rows failing either expectation are dropped
    return dlt.read_stream("raw_customer_events").select(
        col("customer_id"),
        col("event_type"),
        col("amount").cast("double"),
        col("event_timestamp").cast("timestamp")
    )

@dlt.table(
    comment="Aggregated customer features for ML",
    table_properties={"quality": "gold"}
)
def customer_ml_features():
    # Batch read: this materialized view is recomputed from the full silver table
    return dlt.read("cleaned_customer_events").groupBy("customer_id").agg(
        count("*").alias("total_events"),
        avg("amount").alias("avg_amount"),
        countDistinct("event_type").alias("distinct_event_types"),
        datediff(current_date(), spark_max("event_timestamp")).alias("days_since_last_event")
    )
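To check what the gold table holds, here is a plain-Python sketch of the per-customer aggregation for two of its columns, total_events and avg_amount. It mirrors groupBy("customer_id").agg(count("*"), avg("amount")) on made-up silver rows and assumes every amount is present (Spark's avg skips nulls; this sketch does not handle them).

```python
from collections import defaultdict
from typing import Dict, List


def customer_features(events: List[dict]) -> Dict[object, dict]:
    """Plain-Python equivalent of groupBy("customer_id").agg(count("*"), avg("amount"))."""
    grouped: Dict[object, List[float]] = defaultdict(list)
    for event in events:
        grouped[event["customer_id"]].append(event["amount"])
    return {
        customer_id: {
            "total_events": len(amounts),               # count("*")
            "avg_amount": sum(amounts) / len(amounts),  # avg("amount")
        }
        for customer_id, amounts in grouped.items()
    }


# Made-up rows that already passed the silver expectations
silver = [
    {"customer_id": "c1", "event_type": "purchase", "amount": 20.0},
    {"customer_id": "c1", "event_type": "refund", "amount": 10.0},
    {"customer_id": "c2", "event_type": "purchase", "amount": 7.5},
]
print(customer_features(silver))
# {'c1': {'total_events': 2, 'avg_amount': 15.0}, 'c2': {'total_events': 1, 'avg_amount': 7.5}}
```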
Know the three expectation types: @dlt.expect() (warn only, keep row), @dlt.expect_or_drop() (drop invalid rows), and @dlt.expect_or_fail() (fail the entire pipeline). The exam tests which to use in different data quality scenarios.
Databricks Workflows
Databricks Workflows is the orchestration service for scheduling and managing multi-task jobs. For ML, it orchestrates the full pipeline from data prep to model training to deployment.
Workflow Task Types
- Notebook task — Runs a Databricks notebook (most common for ML)
- Python script task — Runs a Python file from a repo or workspace
- DLT pipeline task — Triggers a Delta Live Tables pipeline
- SQL task — Runs a SQL query or dashboard refresh
- JAR task — Runs a Java/Scala JAR file
- Conditional task — Branches based on conditions (e.g., model accuracy threshold)
ML Workflow Pattern
A typical production ML workflow has these tasks in sequence:
# Typical multi-task ML workflow structure:
#
# Task 1: feature_pipeline (DLT pipeline task)
# - Runs DLT to refresh feature tables
# - Triggers: daily schedule
#
# Task 2: train_model (Notebook task, depends on Task 1)
# - Trains model using Feature Store
# - Logs to MLflow
# - Registers new model version
#
# Task 3: evaluate_model (Notebook task, depends on Task 2)
# - Compares new model vs champion
# - Sets task value: "promote" or "skip"
#
# Task 4: deploy_model (Conditional + Notebook, depends on Task 3)
# - IF evaluate_model output == "promote":
#     - Promote new version (Unity Catalog: set the "champion" alias; legacy Workspace registry: transition to Production)
# - Update serving endpoint
# - ELSE: skip deployment
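The four-task pattern above maps onto a Jobs API 2.1 job definition, the kind of JSON passed to databricks jobs create --json @job-config.json. The sketch below is a hypothetical payload: the job name, pipeline ID, notebook paths, schedule, and the task-value key decision are placeholders, and compute settings are omitted on the assumption that serverless jobs compute is available. The conditional step is a condition_task whose left operand reads evaluate_model's task value through a {{tasks.<task_key>.values.<key>}} reference, and deploy_model depends on its true outcome. A small topological sort shows the order the dependencies impose.

```python
import json
from collections import deque
from typing import Dict, List

# Hypothetical Jobs API 2.1 payload. Job name, pipeline ID, notebook paths, schedule,
# and the "decision" task-value key are placeholders; compute settings are omitted
# (assumes serverless jobs compute).
job_spec = {
    "name": "churn-model-retraining",
    "schedule": {"quartz_cron_expression": "0 0 2 * * ?", "timezone_id": "UTC"},  # daily 02:00 UTC
    "tasks": [
        {"task_key": "feature_pipeline",
         "pipeline_task": {"pipeline_id": "<dlt-pipeline-id>"}},
        {"task_key": "train_model",
         "depends_on": [{"task_key": "feature_pipeline"}],
         "notebook_task": {"notebook_path": "/Workspace/ml/churn/train_model"}},
        {"task_key": "evaluate_model",
         "depends_on": [{"task_key": "train_model"}],
         "notebook_task": {"notebook_path": "/Workspace/ml/churn/evaluate_model"}},
        # If/else condition task comparing the task value set by evaluate_model
        {"task_key": "should_promote",
         "depends_on": [{"task_key": "evaluate_model"}],
         "condition_task": {"op": "EQUAL_TO",
                            "left": "{{tasks.evaluate_model.values.decision}}",
                            "right": "promote"}},
        # Runs only when should_promote evaluates to true
        {"task_key": "deploy_model",
         "depends_on": [{"task_key": "should_promote", "outcome": "true"}],
         "notebook_task": {"notebook_path": "/Workspace/ml/churn/deploy_model"}},
    ],
}

# Serialized form, e.g. saved as job-config.json for the CLI
payload = json.dumps(job_spec, indent=2)


def run_order(tasks: List[dict]) -> List[str]:
    """Topologically sort tasks by their depends_on lists (Kahn's algorithm)."""
    parents: Dict[str, List[str]] = {
        t["task_key"]: [d["task_key"] for d in t.get("depends_on", [])] for t in tasks
    }
    indegree = {key: len(deps) for key, deps in parents.items()}
    children: Dict[str, List[str]] = {key: [] for key in parents}
    for key, deps in parents.items():
        for parent in deps:
            children[parent].append(key)
    ready = deque(key for key, n in indegree.items() if n == 0)
    order: List[str] = []
    while ready:
        key = ready.popleft()
        order.append(key)
        for child in children[key]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(parents):
        raise ValueError("cycle in task dependencies")
    return order


print(run_order(job_spec["tasks"]))
# ['feature_pipeline', 'train_model', 'evaluate_model', 'should_promote', 'deploy_model']
```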
Task Values for Communication
# In the training notebook (Task 2):
dbutils.jobs.taskValues.set(
    key="new_model_version",
    value=5
)
dbutils.jobs.taskValues.set(
    key="new_model_accuracy",
    value=0.94
)
# In the evaluation notebook (Task 3):
# debugValue is returned when the notebook runs interactively, outside a job run
new_version = dbutils.jobs.taskValues.get(
    taskKey="train_model",
    key="new_model_version",
    debugValue=5
)
new_accuracy = dbutils.jobs.taskValues.get(
    taskKey="train_model",
    key="new_model_accuracy",
    debugValue=0.94
)
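The evaluation step in Task 3 reduces to a small decision function whose result is published as a task value. The sketch below is an assumption about how that logic might look: the function names, the min_improvement margin, the champion accuracy of 0.92, and the task-value key decision are all hypothetical. Because task values are stored as JSON, the helper calls json.dumps first so a non-serializable value fails fast.

```python
import json


def decide_promotion(new_accuracy: float, champion_accuracy: float,
                     min_improvement: float = 0.0) -> str:
    """Return "promote" if the challenger beats the champion by more than min_improvement."""
    if new_accuracy - champion_accuracy > min_improvement:
        return "promote"
    return "skip"


def ensure_json_serializable(value):
    """Raise TypeError early if the value could not be stored as a task value."""
    json.dumps(value)  # raises TypeError for sets, datetimes, custom objects, ...
    return value


# 0.94 comes from the training task above; 0.92 is a made-up champion score
decision = ensure_json_serializable(decide_promotion(0.94, 0.92, min_improvement=0.01))
print(decision)  # promote

# Inside the evaluate_model notebook this would then be published, e.g.:
# dbutils.jobs.taskValues.set(key="decision", value=decision)
```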
dbutils.jobs.taskValues is the mechanism for passing data between tasks in a workflow. The taskKey parameter references the task name (not the notebook name). This is frequently tested. Also know that task values must be JSON-serializable (strings, numbers, booleans, lists, dicts).
CI/CD for ML
Production ML requires version-controlled code, automated testing, and controlled deployment. Databricks supports Git integration and Repos for source control.
Databricks Repos
- Git integration — Clone, pull, push, and branch directly in the Databricks workspace
- Supported providers — GitHub, GitLab, Azure DevOps, Bitbucket
- Branch-based development — Develop on feature branches, merge to main for production
ML CI/CD Pipeline Pattern
# Typical CI/CD pipeline for ML on Databricks:
#
# 1. DEVELOP (feature branch)
# - Data scientist develops in a Databricks notebook or IDE
# - Commits code to feature branch
#
# 2. CI (automated on PR)
# - Run unit tests (pytest on model code)
# - Run integration tests (small-scale training on sample data)
# - Validate model performance meets threshold
# - Code review and approval
#
# 3. CD (automated on merge to main)
# - Deploy updated notebooks/code to production workspace
# - Update Workflow job definitions via Databricks CLI/API
# - Trigger model retraining workflow
# - Validate deployment via smoke tests
#
# 4. MONITOR (continuous)
# - Track inference table metrics
# - Alert on data drift or performance degradation
# - Trigger retraining if needed
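The "validate model performance meets threshold" step in CI is often a script that returns a non-zero exit code when a metric falls short, so the CI system marks the check as failed. A minimal sketch, assuming hypothetical metric names and thresholds (accuracy >= 0.85, f1 >= 0.80) and metrics already collected from the sample-data training run:

```python
from typing import Dict, List

# Hypothetical minimum values a CI job might enforce
THRESHOLDS = {"accuracy": 0.85, "f1": 0.80}


def failed_checks(metrics: Dict[str, float], thresholds: Dict[str, float]) -> List[str]:
    """List human-readable failures; a missing metric counts as a failure."""
    failures = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing")
        elif value < minimum:
            failures.append(f"{name}: {value:.3f} < {minimum:.3f}")
    return failures


def main(metrics: Dict[str, float]) -> int:
    failures = failed_checks(metrics, THRESHOLDS)
    for failure in failures:
        print("FAIL", failure)
    return 1 if failures else 0  # non-zero means the CI step should fail


exit_code = main({"accuracy": 0.91, "f1": 0.78})
print("exit code:", exit_code)
# FAIL f1: 0.780 < 0.800
# exit code: 1
```

In a real CI script the last line would be sys.exit(exit_code), so a failing gate blocks the merge.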
Databricks CLI and REST API
# Deploy a workflow using Databricks CLI
# databricks jobs create --json @job-config.json
# Update an existing workflow (updated-config.json holds job_id and new_settings)
# databricks jobs reset --json @updated-config.json
# Trigger a workflow run
# databricks jobs run-now 12345
# Deploy a bundle (Databricks Asset Bundles)
# databricks bundle deploy --target production
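A CD job typically validates the bundle before deploying it. The helper below sketches that step for a CI runner with the Databricks CLI installed. The branch-to-target mapping (main to production, develop to staging) is an assumption, and the target names must match those declared in the bundle's databricks.yml. It defaults to a dry run that only prints the commands.

```python
import subprocess
from typing import List

# Hypothetical mapping; targets must exist in the bundle's databricks.yml
BRANCH_TARGETS = {"main": "production", "develop": "staging"}


def bundle_commands(branch: str) -> List[List[str]]:
    """Commands a CD step could run for a branch: validate, then deploy."""
    target = BRANCH_TARGETS.get(branch)
    if target is None:
        return []  # feature branches are not deployed
    return [
        ["databricks", "bundle", "validate", "--target", target],
        ["databricks", "bundle", "deploy", "--target", target],
    ]


def run(branch: str, dry_run: bool = True) -> None:
    for cmd in bundle_commands(branch):
        print("$", " ".join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)  # stop at the first failing command


run("main")
# $ databricks bundle validate --target production
# $ databricks bundle deploy --target production
```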
For Databricks Asset Bundles, databricks bundle deploy is the deployment command.
Monitoring and Alerting
Inference Tables
Model Serving endpoints can be configured to automatically log all requests and responses to a Delta table (an inference table):
- Request logging — Input features, timestamps, request metadata
- Response logging — Predictions, latency, status codes
- Use cases — Drift detection, debugging, compliance, model monitoring
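Raw inference tables store each request and response as a payload rather than as one column per feature, so a processing step usually flattens them before monitoring. The sketch assumes the raw row has request, response, status_code, and timestamp_ms fields, that clients send the dataframe_records format, and that the model responds with {"predictions": [...]}; verify these against your endpoint's actual table schema and payloads. The sample row is made up.

```python
import json
from typing import Dict, List


def flatten_inference_row(row: Dict) -> List[Dict]:
    """Turn one logged request/response pair into one flat record per prediction.

    Assumes request = {"dataframe_records": [...]} and response = {"predictions": [...]},
    both stored as JSON strings (an assumption about the payload format).
    """
    if row.get("status_code") != 200:
        return []  # failed requests carry no predictions
    records = json.loads(row["request"])["dataframe_records"]
    predictions = json.loads(row["response"])["predictions"]
    if len(records) != len(predictions):
        raise ValueError("request/response length mismatch")
    return [
        {**features, "prediction": pred, "timestamp_ms": row["timestamp_ms"]}
        for features, pred in zip(records, predictions)
    ]


logged = {
    "status_code": 200,
    "timestamp_ms": 1700000000000,
    "request": json.dumps({"dataframe_records": [{"tenure": 3}, {"tenure": 40}]}),
    "response": json.dumps({"predictions": [1, 0]}),
}
for record in flatten_inference_row(logged):
    print(record)
# {'tenure': 3, 'prediction': 1, 'timestamp_ms': 1700000000000}
# {'tenure': 40, 'prediction': 0, 'timestamp_ms': 1700000000000}
```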
Lakehouse Monitoring
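from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
    MonitorCronSchedule,
    MonitorInferenceLog,
    MonitorInferenceLogProblemType,
)
w = WorkspaceClient()
# Create a monitor on the inference table. Unity Catalog expects fully qualified
# names (catalog.schema.table), so prefix these with your catalog. The table must
# expose flat prediction, timestamp, and model-version columns.
monitor = w.quality_monitors.create(
    table_name="ml_serving.churn_predictor_inference",
    assets_dir="/Workspace/Shared/monitoring/churn_predictor",  # example path for dashboard assets
    output_schema_name="ml_monitoring",
    schedule=MonitorCronSchedule(
        quartz_cron_expression="0 0 * * * ?",  # hourly
        timezone_id="UTC",
    ),
    inference_log=MonitorInferenceLog(
        problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
        prediction_col="prediction",
        timestamp_col="timestamp",
        model_id_col="model_version",
        granularities=["1 day"],  # time windows over which metrics are aggregated
    ),
)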
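Lakehouse Monitoring computes drift statistics for you, but a hand-rolled example makes the idea concrete. The sketch below computes the population stability index (PSI), one widely used drift statistic, between a baseline (training) sample and a current (serving) sample over fixed bins. It is an illustration, not a reimplementation of Lakehouse Monitoring's metrics; the bin edges and sample values are made up.

```python
import math
from typing import List, Sequence


def population_stability_index(expected: Sequence[float], actual: Sequence[float],
                               edges: Sequence[float], eps: float = 1e-6) -> float:
    """PSI = sum over bins of (a_i - e_i) * ln(a_i / e_i), using bin proportions."""

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * (len(edges) - 1)
        for v in values:
            for i in range(len(edges) - 1):
                last_bin = i == len(edges) - 2
                # Bins are [lo, hi), except the last one, which also includes hi
                if edges[i] <= v < edges[i + 1] or (last_bin and v == edges[-1]):
                    counts[i] += 1
                    break
        total = sum(counts)
        if total == 0:
            raise ValueError("no values fall inside the bin edges")
        # eps avoids log(0) and division by zero for empty bins
        return [max(c / total, eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ai, ei in zip(a, e))


edges = [0, 5, 10]
training = [1, 2, 3, 6, 7, 8]  # baseline: half the values in each bin
serving = [1, 6, 7, 8, 9, 9]   # current: mass shifted to the upper bin
print(round(population_stability_index(training, serving, edges), 4))  # 0.5365
```

Identical distributions give a PSI of 0, and the value grows as serving data shifts away from the baseline.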
Practice Questions
Question 1 — Delta Live Tables
… customer_id is null without failing the pipeline. Which expectation should they use?
A) @dlt.expect("valid_id", "customer_id IS NOT NULL")
B) @dlt.expect_or_drop("valid_id", "customer_id IS NOT NULL")
C) @dlt.expect_or_fail("valid_id", "customer_id IS NOT NULL")
D) @dlt.expect_all({"valid_id": "customer_id IS NOT NULL"})
Answer: B. @dlt.expect_or_drop() drops rows that violate the constraint without failing the pipeline. @dlt.expect() (option A) only logs a warning but keeps invalid rows. @dlt.expect_or_fail() (option C) would fail the entire pipeline. @dlt.expect_all() (option D) is a warn-only variant for multiple constraints.
Question 2 — Workflows
A) Write the version to a Delta table and read it in the next task
B) Use dbutils.jobs.taskValues.set(key="version", value=5) in the training task and dbutils.jobs.taskValues.get(taskKey="training_task", key="version") in the deployment task
C) Use environment variables to pass data between tasks
D) Use MLflow tags to communicate between tasks
Answer: B. dbutils.jobs.taskValues is the built-in mechanism for passing data between workflow tasks. It is lightweight and purpose-built for inter-task communication. Option A would work, but it is unnecessarily complex. Environment variables (C) are not shared between tasks. MLflow tags (D) are for metadata, not inter-task communication.
Question 3 — CI/CD
A) Manually pull the latest code in the production workspace after each merge
B) Use Databricks Asset Bundles with a CI/CD pipeline that runs databricks bundle deploy --target production on merge
C) Set up a cron job on the cluster to periodically pull from Git
D) Use Databricks SQL to deploy the code
Answer: B — Databricks Asset Bundles (DABs) combined with a CI/CD pipeline (GitHub Actions, Azure DevOps, etc.) is the recommended deployment pattern. When code is merged to main, the CI/CD pipeline deploys the bundle to the production workspace automatically. This provides version control, repeatability, and auditability.
Question 4 — Monitoring
A) MLflow Model Registry tags
B) Databricks Lakehouse Monitoring with drift detection on the inference table
C) Spark Structured Streaming with custom drift calculations
D) Manual comparison of training and serving data statistics
Answer: B — Lakehouse Monitoring automatically profiles data and detects statistical drift between baseline (training) and current (serving) distributions. It generates drift metrics and can trigger alerts. The inference table logs all requests, providing the data needed for monitoring. This is the purpose-built Databricks solution for this problem.
Question 5 — DLT Streaming
A) Use spark.read.format("json") in a @dlt.table function
B) Use spark.readStream.format("cloudFiles") in a @dlt.table function
C) Use dlt.read("source_table") with a batch read
D) Use spark.read.format("delta") with a scheduled trigger
Answer: B. Auto Loader (cloudFiles) with Structured Streaming in a DLT table incrementally ingests new files. spark.readStream.format("cloudFiles") discovers and processes new files as they arrive, tracking which files have been processed. Option A would reprocess all files every run. Options C and D do not handle incremental file ingestion.
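The bookkeeping that separates option B from option A can be modeled with a set of already-processed paths. This toy sketch only illustrates the idea: Auto Loader persists its file-tracking state in the stream's checkpoint location rather than in a Python set, and the function name and file names here are made up.

```python
from typing import Iterable, List, Set, Tuple


def discover_new_files(listing: Iterable[str], processed: Set[str]) -> Tuple[List[str], Set[str]]:
    """Return files not seen before plus the updated 'checkpoint' set."""
    new_files = sorted(f for f in listing if f not in processed)
    return new_files, processed | set(new_files)


checkpoint: Set[str] = set()
batch1, checkpoint = discover_new_files(["a.json", "b.json"], checkpoint)
batch2, checkpoint = discover_new_files(["a.json", "b.json", "c.json"], checkpoint)
print(batch1, batch2)  # ['a.json', 'b.json'] ['c.json']
```

A plain spark.read (option A) has no such state, so every run rereads a.json and b.json.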