Advanced

ML Pipelines & Automation

Master Delta Live Tables, Databricks Workflows, and CI/CD patterns for production ML — covering approximately 20% of the exam (ML Pipelines & Production domain).

Delta Live Tables (DLT)

Delta Live Tables is a declarative framework for building reliable, maintainable data pipelines. For ML, DLT is commonly used to prepare and transform feature data before it enters the Feature Store or training pipeline.

DLT Core Concepts

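  • Live Table — A materialized view defined with @dlt.table whose function returns a batch DataFrame; its contents are recomputed on each pipeline update to reflect the current state of its sources
  • Streaming Live Table — An incrementally updated table, also defined with @dlt.table, whose function returns a streaming DataFrame (for example from spark.readStream or dlt.read_stream), so each update processes only new data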
  • Expectations — Data quality constraints that can warn, drop invalid rows, or fail the pipeline
  • Pipeline — A DAG of tables and views that DLT manages as a unit

DLT for ML Feature Preparation

import dlt
from pyspark.sql.functions import col, avg, count

@dlt.table(
    comment="Raw customer events from streaming source",
    table_properties={"quality": "bronze"}
)
def raw_customer_events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/data/customer_events/")

@dlt.table(
    comment="Cleaned customer events with quality checks",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
def cleaned_customer_events():
    return dlt.read_stream("raw_customer_events").select(
        col("customer_id"),
        col("event_type"),
        col("amount").cast("double"),
        col("event_timestamp").cast("timestamp")
    )

@dlt.table(
    comment="Aggregated customer features for ML",
    table_properties={"quality": "gold"}
)
def customer_ml_features():
    return dlt.read("cleaned_customer_events").groupBy("customer_id").agg(
        count("*").alias("total_events"),
        avg("amount").alias("avg_amount"),
        count("event_type").alias("event_count")
    )

💡 Exam tip: Know the three expectation actions: @dlt.expect() (warn only, keep row), @dlt.expect_or_drop() (drop invalid rows), and @dlt.expect_or_fail() (fail the entire pipeline). The exam tests which to use in different data quality scenarios.
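
The difference between the three actions is easiest to see on a handful of rows. The sketch below is a plain-Python imitation of the semantics, not DLT code: apply_expectation is a hypothetical helper that does not exist in the dlt module, and the sample rows are made up for illustration.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def apply_expectation(
    rows: List[Row],
    predicate: Callable[[Row], bool],
    action: str,
) -> Tuple[List[Row], int]:
    """Imitate the three expectation actions on an in-memory list of rows.

    Hypothetical helper for illustration only; it is not part of dlt.
    Returns the rows that continue downstream and the violation count.
    """
    violations = [row for row in rows if not predicate(row)]
    if action == "warn":  # like @dlt.expect: record the violation, keep the row
        return list(rows), len(violations)
    if action == "drop":  # like @dlt.expect_or_drop: filter out invalid rows
        return [row for row in rows if predicate(row)], len(violations)
    if action == "fail":  # like @dlt.expect_or_fail: stop when any row is invalid
        if violations:
            raise ValueError(f"{len(violations)} row(s) violated the expectation")
        return list(rows), 0
    raise ValueError(f"unknown action: {action}")


# Made-up sample data: the second row has no customer_id.
events = [
    {"customer_id": "c1", "amount": 20.0},
    {"customer_id": None, "amount": 5.0},
]


def has_customer_id(row: Row) -> bool:
    return row["customer_id"] is not None


kept_warn, warn_count = apply_expectation(events, has_customer_id, "warn")
kept_drop, drop_count = apply_expectation(events, has_customer_id, "drop")
```

With "warn" both rows continue and one violation is counted; with "drop" only the valid row continues; with "fail" the call raises, which corresponds to the pipeline update stopping.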

Databricks Workflows

Databricks Workflows is the orchestration service for scheduling and managing multi-task jobs. For ML, it orchestrates the full pipeline from data prep to model training to deployment.

Workflow Task Types

  • Notebook task — Runs a Databricks notebook (most common for ML)
  • Python script task — Runs a Python file from a repo or workspace
  • DLT pipeline task — Triggers a Delta Live Tables pipeline
  • SQL task — Runs a SQL query or dashboard refresh
  • JAR task — Runs a Java/Scala JAR file
  • If/else condition task — Branches downstream tasks on a condition (e.g., whether model accuracy meets a threshold); often referred to simply as a conditional task

ML Workflow Pattern

A typical production ML workflow has these tasks in sequence:

# Typical multi-task ML workflow structure:
#
# Task 1: feature_pipeline (DLT pipeline task)
#   - Runs DLT to refresh feature tables
#   - Triggers: daily schedule
#
# Task 2: train_model (Notebook task, depends on Task 1)
#   - Trains model using Feature Store
#   - Logs to MLflow
#   - Registers new model version
#
# Task 3: evaluate_model (Notebook task, depends on Task 2)
#   - Compares new model vs champion
#   - Sets task value: "promote" or "skip"
#
# Task 4: deploy_model (Conditional + Notebook, depends on Task 3)
#   - IF evaluate_model output == "promote":
#     - Transition new version to Production
#     - Update serving endpoint
#   - ELSE: skip deployment
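
The outline above can be written as a job definition. The following is a minimal sketch of a Jobs API-style payload built as a Python dict, under several assumptions: the job name, notebook paths, pipeline ID placeholder, the should_promote task key, and the task value key "decision" are all hypothetical, and compute settings are omitted, so this is not a complete payload. Note that the schedule is set on the job as a whole rather than on the first task.

```python
import json

# Sketch of a multi-task job definition. All names, paths, and IDs are placeholders.
job_config = {
    "name": "churn_model_retraining",
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",  # daily at 02:00
        "timezone_id": "UTC",
    },
    "tasks": [
        {
            "task_key": "feature_pipeline",
            "pipeline_task": {"pipeline_id": "<pipeline-id>"},
        },
        {
            "task_key": "train_model",
            "depends_on": [{"task_key": "feature_pipeline"}],
            "notebook_task": {"notebook_path": "/Repos/ml/churn/train"},
        },
        {
            "task_key": "evaluate_model",
            "depends_on": [{"task_key": "train_model"}],
            "notebook_task": {"notebook_path": "/Repos/ml/churn/evaluate"},
        },
        {
            # If/else condition on a task value set by evaluate_model.
            "task_key": "should_promote",
            "depends_on": [{"task_key": "evaluate_model"}],
            "condition_task": {
                "op": "EQUAL_TO",
                "left": "{{tasks.evaluate_model.values.decision}}",
                "right": "promote",
            },
        },
        {
            # Runs only when the condition evaluated to true.
            "task_key": "deploy_model",
            "depends_on": [{"task_key": "should_promote", "outcome": "true"}],
            "notebook_task": {"notebook_path": "/Repos/ml/churn/deploy"},
        },
    ],
}


def undefined_dependencies(job: dict) -> list:
    """Return depends_on task keys that do not match any defined task."""
    defined = {task["task_key"] for task in job["tasks"]}
    return [
        dep["task_key"]
        for task in job["tasks"]
        for dep in task.get("depends_on", [])
        if dep["task_key"] not in defined
    ]


payload = json.dumps(job_config, indent=2)  # text that could be saved as job-config.json
```

A check like undefined_dependencies is cheap to run in CI before the definition is submitted, since a mistyped task key in depends_on is an easy error to make.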

Task Values for Communication

# In the training notebook (Task 2):
dbutils.jobs.taskValues.set(
    key="new_model_version",
    value=5
)
dbutils.jobs.taskValues.set(
    key="new_model_accuracy",
    value=0.94
)

# In the evaluation notebook (Task 3):
new_version = dbutils.jobs.taskValues.get(
    taskKey="train_model",
    key="new_model_version"
)
new_accuracy = dbutils.jobs.taskValues.get(
    taskKey="train_model",
    key="new_model_accuracy"
)
Exam concept: dbutils.jobs.taskValues is the mechanism for passing data between tasks in a workflow. The taskKey parameter references the task name (not the notebook name). This is frequently tested. Also know that task values must be JSON-serializable (strings, numbers, booleans, lists, dicts).
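
Because task values must be JSON-serializable, it can help to check a value before setting it. The sketch below is a plain-Python check that runs outside Databricks; is_valid_task_value is a hypothetical helper, and the 48 KiB size limit is taken from the documentation as remembered at the time of writing, so treat it as an assumption and verify it for your workspace.

```python
import json

# Assumed size limit for the JSON form of a task value; verify against current docs.
MAX_TASK_VALUE_BYTES = 48 * 1024


def is_valid_task_value(value: object) -> bool:
    """Return True if value can be encoded as JSON within the assumed size limit."""
    try:
        encoded = json.dumps(value)
    except (TypeError, ValueError):
        # Raised for types JSON cannot represent, such as sets or datetimes.
        return False
    return len(encoded.encode("utf-8")) <= MAX_TASK_VALUE_BYTES


ok_number = is_valid_task_value(5)
ok_dict = is_valid_task_value({"version": 5, "accuracy": 0.94})
bad_set = is_valid_task_value({1, 2, 3})
too_large = is_valid_task_value("x" * (MAX_TASK_VALUE_BYTES + 1))
```

On the reading side, dbutils.jobs.taskValues.get also accepts default and debugValue arguments; debugValue is what makes a notebook runnable interactively, outside a job, where no upstream task value exists.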

CI/CD for ML

Production ML requires version-controlled code, automated testing, and controlled deployment. Databricks supports this through Git integration in the workspace (Databricks Repos, called Git folders in newer releases).

Databricks Repos

  • Git integration — Clone, pull, push, and branch directly in the Databricks workspace
  • Supported providers — GitHub, GitLab, Azure DevOps, Bitbucket
  • Branch-based development — Develop on feature branches, merge to main for production

ML CI/CD Pipeline Pattern

# Typical CI/CD pipeline for ML on Databricks:
#
# 1. DEVELOP (feature branch)
#    - Data scientist develops in a Databricks notebook or IDE
#    - Commits code to feature branch
#
# 2. CI (automated on PR)
#    - Run unit tests (pytest on model code)
#    - Run integration tests (small-scale training on sample data)
#    - Validate model performance meets threshold
#    - Code review and approval
#
# 3. CD (automated on merge to main)
#    - Deploy updated notebooks/code to production workspace
#    - Update Workflow job definitions via Databricks CLI/API
#    - Trigger model retraining workflow
#    - Validate deployment via smoke tests
#
# 4. MONITOR (continuous)
#    - Track inference table metrics
#    - Alert on data drift or performance degradation
#    - Trigger retraining if needed
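
The "validate model performance meets threshold" step in CI can be as small as one pure function plus pytest-style tests. This is a minimal sketch under stated assumptions: promotion_decision, the 0.90 accuracy floor, and the rule that a candidate must strictly beat the champion are all illustrative choices, not a Databricks API or a prescribed policy.

```python
def promotion_decision(
    candidate_accuracy: float,
    champion_accuracy: float,
    min_accuracy: float = 0.90,
) -> str:
    """Return "promote" only if the candidate clears the floor and beats the champion."""
    if candidate_accuracy >= min_accuracy and candidate_accuracy > champion_accuracy:
        return "promote"
    return "skip"


# pytest collects functions named test_*; they can also be called directly.
def test_promotes_better_candidate():
    assert promotion_decision(0.94, 0.91) == "promote"


def test_skips_candidate_below_floor():
    assert promotion_decision(0.85, 0.80) == "skip"


def test_skips_candidate_that_does_not_beat_champion():
    assert promotion_decision(0.92, 0.93) == "skip"
```

Keeping the rule in a plain function means the same code can run in a unit test on every PR and in the evaluation task of the workflow, where its return value could be stored as a task value.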

Databricks CLI and REST API

# Deploy a workflow using Databricks CLI
# databricks jobs create --json @job-config.json

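# Update an existing workflow (in the current CLI the job ID goes inside
# the JSON payload as "job_id", next to "new_settings"; the legacy CLI
# used a --job-id flag instead)
# databricks jobs reset --json @updated-config.json

# Trigger a workflow run (job ID is a positional argument in the current CLI)
# databricks jobs run-now 12345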

# Deploy a bundle (Databricks Asset Bundles)
# databricks bundle deploy --target production

💡 Exam tip: Databricks Asset Bundles (DABs) are the recommended way to define, deploy, and manage Databricks resources as code. A bundle is a collection of YAML configuration files that define jobs, pipelines, and other resources. Know that databricks bundle deploy is the deployment command.

Monitoring and Alerting

Inference Tables

Model Serving endpoints can automatically log all requests and responses to a Delta table:

  • Request logging — Input features, timestamps, request metadata
  • Response logging — Predictions, latency, status codes
  • Use cases — Drift detection, debugging, compliance, model monitoring

Lakehouse Monitoring

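from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
    MonitorCronSchedule,
    MonitorInferenceLog,
    MonitorInferenceLogProblemType,
)

w = WorkspaceClient()

# Create a monitor on the inference table.
# Class names follow the databricks-sdk catalog service; verify them against your installed SDK version.
monitor = w.quality_monitors.create(
    table_name="ml_serving.churn_predictor_inference",  # in Unity Catalog, use the full catalog.schema.table name
    assets_dir="/Workspace/Shared/churn_monitoring",  # placeholder path for generated assets (assumption)
    output_schema_name="ml_monitoring",  # schema that receives the metric tables
    schedule=MonitorCronSchedule(
        quartz_cron_expression="0 0 * * * ?",  # hourly
        timezone_id="UTC"
    ),
    inference_log=MonitorInferenceLog(
        problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
        prediction_col="prediction",
        timestamp_col="timestamp",
        model_id_col="model_version",
        granularities=["1 day"]  # aggregation window; the value here is an assumption
    )
)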
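
To make "drift" concrete, the sketch below computes the Population Stability Index (PSI), one widely used drift statistic, between a baseline sample and a current sample of a single numeric feature. It is plain Python for illustration and is not how Lakehouse Monitoring is implemented; the function names, bin edges, and sample values are all assumptions.

```python
import math
from typing import List, Sequence


def bin_fractions(values: Sequence[float], edges: Sequence[float]) -> List[float]:
    """Fraction of values per bin; out-of-range values fall into the first or last bin."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        idx = 0
        # Advance while v is at or beyond the next interior edge.
        while idx < len(counts) - 1 and v >= edges[idx + 1]:
            idx += 1
        counts[idx] += 1
    return [c / len(values) for c in counts]


def population_stability_index(
    baseline: Sequence[float],
    current: Sequence[float],
    edges: Sequence[float],
    eps: float = 1e-6,
) -> float:
    """PSI = sum over bins of (current - baseline) * ln(current / baseline)."""
    psi = 0.0
    for b, c in zip(bin_fractions(baseline, edges), bin_fractions(current, edges)):
        b, c = max(b, eps), max(c, eps)  # avoid log(0) and division by zero for empty bins
        psi += (c - b) * math.log(c / b)
    return psi


# Made-up feature values: training-time baseline vs. recent serving traffic.
edges = [0, 25, 50, 75, 100]
baseline_amounts = [10, 20, 30, 40, 50, 60]
current_amounts = [40, 50, 60, 70, 80, 90]

psi_same = population_stability_index(baseline_amounts, baseline_amounts, edges)
psi_shifted = population_stability_index(baseline_amounts, current_amounts, edges)
```

Identical distributions give a PSI of zero, and the value grows as the current distribution moves away from the baseline. What counts as "too much" drift is a policy choice that depends on the feature and the model.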

Practice Questions


Question 1 — Delta Live Tables

A DLT pipeline processes customer data for ML features. The team wants to silently remove rows where customer_id is null without failing the pipeline. Which expectation should they use?

A) @dlt.expect("valid_id", "customer_id IS NOT NULL")
B) @dlt.expect_or_drop("valid_id", "customer_id IS NOT NULL")
C) @dlt.expect_or_fail("valid_id", "customer_id IS NOT NULL")
D) @dlt.expect_all({"valid_id": "customer_id IS NOT NULL"})

Answer: B. The @dlt.expect_or_drop() decorator drops rows that violate the constraint without failing the pipeline. @dlt.expect() (option A) only logs a warning but keeps invalid rows. @dlt.expect_or_fail() (option C) would fail the entire pipeline. @dlt.expect_all() (option D) is a warn-only variant for multiple constraints.

Question 2 — Workflows

In a multi-task Workflow, the training task needs to pass the new model version number to the deployment task. What is the correct approach?

A) Write the version to a Delta table and read it in the next task
B) Use dbutils.jobs.taskValues.set(key="version", value=5) in the training task and dbutils.jobs.taskValues.get(taskKey="training_task", key="version") in the deployment task
C) Use environment variables to pass data between tasks
D) Use MLflow tags to communicate between tasks

Answer: B. dbutils.jobs.taskValues is the built-in mechanism for passing data between workflow tasks. It is lightweight and purpose-built for inter-task communication. While option A would work, it is unnecessarily complex. Environment variables (C) are not shared between tasks. MLflow tags (D) are for metadata, not inter-task communication.

Question 3 — CI/CD

A team uses Databricks Repos and wants to automatically deploy updated ML code when a PR is merged to the main branch. What is the recommended approach?

A) Manually pull the latest code in the production workspace after each merge
B) Use Databricks Asset Bundles with a CI/CD pipeline that runs databricks bundle deploy --target production on merge
C) Set up a cron job on the cluster to periodically pull from Git
D) Use Databricks SQL to deploy the code

Answer: B — Databricks Asset Bundles (DABs) combined with a CI/CD pipeline (GitHub Actions, Azure DevOps, etc.) is the recommended deployment pattern. When code is merged to main, the CI/CD pipeline deploys the bundle to the production workspace automatically. This provides version control, repeatability, and auditability.

Question 4 — Monitoring

A serving endpoint has been running for 3 months. The team notices prediction accuracy has dropped. They want to detect feature distribution changes. Which Databricks feature should they use?

A) MLflow Model Registry tags
B) Databricks Lakehouse Monitoring with drift detection on the inference table
C) Spark Structured Streaming with custom drift calculations
D) Manual comparison of training and serving data statistics

Answer: B — Lakehouse Monitoring automatically profiles data and detects statistical drift between baseline (training) and current (serving) distributions. It generates drift metrics and can trigger alerts. The inference table logs all requests, providing the data needed for monitoring. This is the purpose-built Databricks solution for this problem.

Question 5 — DLT Streaming

A DLT pipeline needs to incrementally ingest new JSON files as they arrive in cloud storage. Which approach is correct?

A) Use spark.read.format("json") in a @dlt.table function
B) Use spark.readStream.format("cloudFiles") in a @dlt.table function
C) Use dlt.read("source_table") with a batch read
D) Use spark.read.format("delta") with a scheduled trigger

Answer: B — Auto Loader (cloudFiles) with Structured Streaming in a DLT table incrementally ingests new files. spark.readStream.format("cloudFiles") discovers and processes new files as they arrive, tracking which files have been processed. Option A would reprocess all files every run. Options C and D do not handle incremental file ingestion.