Data Versioning & Lineage
Version your datasets so every training run is reproducible. Track data lineage so you can answer "what data produced this model?" and "what models are affected by this data change?" Learn DVC, Delta Lake time travel, OpenLineage, and practical versioning strategies.
Why Data Versioning Matters for ML
You version your code with Git. You version your models with a model registry. But if you do not version your data, you cannot reproduce training runs. When a model starts underperforming in production, the first question is: "What changed?" If you cannot compare the training data from last week's good model with this week's bad model, debugging is guesswork.
| Scenario | Without Data Versioning | With Data Versioning |
|---|---|---|
| Model accuracy drops | No idea if data changed; manually inspect logs for hours | Diff the training datasets between good and bad model versions in minutes |
| Regulatory audit | "We think this model was trained on data from around Q3 2025" | Exact dataset hash, row count, schema version, and lineage for every model |
| Bug in feature code | Retrain on current data, hope it is close enough | Fix code, recompute features from versioned raw data, retrain with identical splits |
| New team member | "The training data is in s3://data/final_v3_FIXED_use_this/" | `git checkout v2.3.1 && dvc checkout` — exact dataset, documented, reproducible |
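The audit row above assumes you can produce an exact dataset fingerprint on demand. A minimal sketch of such a manifest helper (hypothetical function; CSV is used for brevity where the real datasets would be Parquet):

```python
import csv
import hashlib
import json
from pathlib import Path

def dataset_manifest(path: str) -> dict:
    """Fingerprint a dataset file: content hash, row count, column names."""
    p = Path(path)
    digest = hashlib.md5(p.read_bytes()).hexdigest()
    with p.open(newline="") as f:
        reader = csv.reader(f)
        columns = next(reader)          # header row
        rows = sum(1 for _ in reader)   # data rows
    return {"path": str(p), "md5": digest, "rows": rows, "columns": columns}

# Example: write the manifest next to the dataset and log it with the model run
# manifest = dataset_manifest("data/training/user_features.csv")
# Path("data/training/manifest.json").write_text(json.dumps(manifest, indent=2))
```

Logging this manifest alongside every training run turns "we think it was Q3 data" into an exact, checkable record.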
DVC: Git for Data
DVC (Data Version Control) extends Git to handle large files and datasets. It stores data in remote storage (S3, GCS, Azure) and tracks pointers in Git, so your data is versioned alongside your code.
```shell
# DVC setup and workflow for ML dataset versioning

# 1. Initialize DVC in your ML project
$ git init && dvc init
$ dvc remote add -d s3store s3://ml-datasets/dvc-storage
$ dvc remote modify s3store region us-east-1

# 2. Track a training dataset
$ dvc add data/training/user_features_v1.parquet
# Creates: data/training/user_features_v1.parquet.dvc (pointer file)
# The actual data is pushed to S3; Git only stores the .dvc pointer
$ git add data/training/user_features_v1.parquet.dvc data/training/.gitignore
$ git commit -m "Add training dataset v1: 2.3M users, 12 features"
$ dvc push  # Upload data to S3

# 3. Update the dataset (new features added)
$ python compute_features.py --version v2 --output data/training/user_features_v2.parquet
$ dvc add data/training/user_features_v2.parquet
$ git add data/training/user_features_v2.parquet.dvc
$ git commit -m "Training dataset v2: added click_through_rate and purchase_rate features"
$ dvc push

# 4. Reproduce any past training run
$ git checkout v1.0.0  # Check out the code version
$ dvc checkout         # DVC pulls the matching dataset version
$ python train.py      # Same data + same code = reproducible result

# 5. Compare datasets between versions
#    (the pointer diff below works when you overwrite one tracked path;
#    with side-by-side _v1/_v2 files, diff the new pointer file instead)
$ git diff HEAD~1 -- data/training/user_features.parquet.dvc
# Shows: md5 hash changed, size changed
# For a row- and column-level comparison, use a custom diff tool:
$ python compare_datasets.py --old v1 --new v2
```
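`compare_datasets.py` is left above as a custom tool; one plausible sketch (hypothetical script, CSV used for brevity where the real datasets are Parquet) reports the row-count delta and column changes between two versions:

```python
import csv

def compare_datasets(old_path: str, new_path: str) -> dict:
    """Summarize differences between two dataset versions:
    row-count delta plus added and removed columns."""
    def load(path):
        with open(path, newline="") as f:
            reader = csv.reader(f)
            header = next(reader)        # column names
            rows = sum(1 for _ in reader)
        return set(header), rows

    old_cols, old_rows = load(old_path)
    new_cols, new_rows = load(new_path)
    return {
        "rows_delta": new_rows - old_rows,
        "columns_added": sorted(new_cols - old_cols),
        "columns_removed": sorted(old_cols - new_cols),
    }
```

A production version would also compare per-column distributions (null rates, means, cardinality) to catch silent drift, not just schema and size.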
```yaml
# dvc.yaml - pipeline definition (checked into Git)
# Define your data pipeline as reproducible stages
stages:
  ingest:
    cmd: python src/ingest.py --date ${date} --output data/raw/
    deps:
      - src/ingest.py
    params:
      - ingest.source_table
      - ingest.date_range
    outs:
      - data/raw/events.parquet

  validate:
    cmd: python src/validate.py --input data/raw/events.parquet
    deps:
      - src/validate.py
      - data/raw/events.parquet
    metrics:
      - reports/validation_metrics.json:
          cache: false

  transform:
    cmd: python src/transform.py --input data/raw/ --output data/features/
    deps:
      - src/transform.py
      - data/raw/events.parquet
    params:
      - features.window_days
      - features.min_events
    outs:
      - data/features/user_features.parquet

  train:
    cmd: python src/train.py --data data/features/ --output models/
    deps:
      - src/train.py
      - data/features/user_features.parquet
    params:
      - model.learning_rate
      - model.epochs
      - model.batch_size
    outs:
      - models/model.pt
    metrics:
      - reports/training_metrics.json:
          cache: false

# Reproduce the entire pipeline:   dvc repro
# Reproduce from a specific stage: dvc repro train
# DVC skips stages whose inputs haven't changed (like Make)
```
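The `params:` entries in each stage resolve against a `params.yaml` file in the repository root. A plausible companion file for this pipeline (all values illustrative, not from the source):

```yaml
# params.yaml - referenced by the params: sections in dvc.yaml
ingest:
  source_table: events.user_activity
  date_range: 30
features:
  window_days: 30
  min_events: 5
model:
  learning_rate: 0.001
  epochs: 20
  batch_size: 256
```

Because DVC hashes parameter values too, changing `features.window_days` invalidates the `transform` and `train` stages but leaves `ingest` and `validate` cached.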
Delta Lake: Time Travel for Data
Delta Lake is an open-source storage layer that brings ACID transactions and time travel to data lakes. Every write creates a new version, and you can query any past version by timestamp or version number.
```python
# Delta Lake time travel for ML dataset versioning
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-versioning")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Write features to Delta Lake (each write = new version).
# `features` and `new_features` are Spark DataFrames computed upstream.
features.write.format("delta").mode("overwrite").save("s3://feature-store/user_features/")
# Version 0 created

# Next day: write updated features
new_features.write.format("delta").mode("overwrite").save("s3://feature-store/user_features/")
# Version 1 created

# Time travel: read any past version
# By version number
v0_features = spark.read.format("delta").option("versionAsOf", 0).load("s3://feature-store/user_features/")
# By timestamp
yesterday_features = (
    spark.read.format("delta")
    .option("timestampAsOf", "2026-03-19T00:00:00Z")
    .load("s3://feature-store/user_features/")
)

# View version history
delta_table = DeltaTable.forPath(spark, "s3://feature-store/user_features/")
history = delta_table.history()
history.select("version", "timestamp", "operation", "operationMetrics").show()
# +-------+-------------------+---------+---------------------+
# |version|          timestamp|operation|    operationMetrics |
# +-------+-------------------+---------+---------------------+
# |      1|2026-03-20 02:00:00|    WRITE|{numOutputRows: 2.3M}|
# |      0|2026-03-19 02:00:00|    WRITE|{numOutputRows: 2.1M}|
# +-------+-------------------+---------+---------------------+

# Link training run to the exact data version
import mlflow

with mlflow.start_run():
    data_version = delta_table.history(1).select("version").first()[0]
    mlflow.log_param("data_version", data_version)
    mlflow.log_param("data_path", "s3://feature-store/user_features/")
    mlflow.log_param("data_timestamp", "2026-03-20T02:00:00Z")
    # ... train model

# Retention: vacuum data files older than 90 days
# (time travel only works within the retention window you keep)
delta_table.vacuum(retentionHours=90 * 24)
```
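Under the hood, each Delta version is a numbered JSON commit file in the table's `_delta_log/` directory. As a sketch (hypothetical helper, assuming a locally accessible table path), you can list committed versions without a Spark session:

```python
import re
from pathlib import Path

def list_delta_versions(table_path: str) -> list:
    """List committed Delta versions by scanning _delta_log/ for
    commit files named <20-digit zero-padded version>.json."""
    log_dir = Path(table_path) / "_delta_log"
    versions = []
    for entry in log_dir.iterdir():
        m = re.fullmatch(r"(\d{20})\.json", entry.name)
        if m:
            versions.append(int(m.group(1)))
    return sorted(versions)
```

This is handy for quick checks in CI, though for anything beyond listing versions you should go through the Delta APIs rather than reading the log directly.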
Lineage Tracking with OpenLineage
Data lineage answers two critical questions: "What data was used to produce this model?" (backward lineage) and "If this data source changes, what models are affected?" (forward lineage). OpenLineage is the open standard for capturing lineage across tools.
```python
# OpenLineage integration for tracking data pipeline lineage
import uuid
from datetime import datetime

from openlineage.client import OpenLineageClient
from openlineage.client.facet import SchemaDatasetFacet, SchemaField
from openlineage.client.run import (
    InputDataset, Job, OutputDataset, Run, RunEvent, RunState,
)

# Any URI identifying the code that produced the event
PRODUCER = "https://github.com/ml-team/feature-pipelines"

# Initialize OpenLineage client (sends events to Marquez, DataHub, or Atlan)
client = OpenLineageClient(url="http://marquez:5000/api/v1/lineage")

def emit_pipeline_lineage(
    job_name: str,
    input_datasets: list,
    output_datasets: list,
    run_state: RunState,
):
    """Emit a lineage event for a pipeline stage"""
    run_id = str(uuid.uuid4())

    inputs = [
        InputDataset(
            namespace="s3://data-lake",
            name=ds["name"],
            facets={
                "schema": SchemaDatasetFacet(
                    fields=[SchemaField(name=f["name"], type=f["type"]) for f in ds["schema"]]
                ),
            },
        )
        for ds in input_datasets
    ]
    outputs = [
        OutputDataset(
            namespace="s3://feature-store",
            name=ds["name"],
            facets={
                "schema": SchemaDatasetFacet(
                    fields=[SchemaField(name=f["name"], type=f["type"]) for f in ds["schema"]]
                ),
            },
        )
        for ds in output_datasets
    ]

    event = RunEvent(
        eventType=run_state,
        eventTime=datetime.utcnow().isoformat() + "Z",
        run=Run(runId=run_id),
        job=Job(namespace="ml-team", name=job_name),
        producer=PRODUCER,
        inputs=inputs,
        outputs=outputs,
    )
    client.emit(event)

# Usage in your pipeline
emit_pipeline_lineage(
    job_name="compute_user_engagement_features",
    input_datasets=[{
        "name": "raw/user_events",
        "schema": [
            {"name": "user_id", "type": "STRING"},
            {"name": "event_type", "type": "STRING"},
            {"name": "timestamp", "type": "TIMESTAMP"},
            {"name": "value", "type": "DOUBLE"},
        ],
    }],
    output_datasets=[{
        "name": "features/user_engagement",
        "schema": [
            {"name": "user_id", "type": "STRING"},
            {"name": "total_events_30d", "type": "INT"},
            {"name": "click_through_rate", "type": "DOUBLE"},
            {"name": "purchase_rate", "type": "DOUBLE"},
        ],
    }],
    run_state=RunState.COMPLETE,
)

# Now you can query Marquez/DataHub:
# "Show me all models trained on features/user_engagement"
# "Show me all datasets derived from raw/user_events"
# "What is the full lineage from raw events to production model?"
```
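Those queries are just graph traversals over dataset-to-dataset edges. A toy sketch of forward lineage (in-memory edges with hypothetical names; a real deployment would query the Marquez API instead):

```python
from collections import deque

# producing dataset -> datasets/models derived from it (hypothetical edges)
DOWNSTREAM = {
    "raw/user_events": ["features/user_engagement"],
    "features/user_engagement": ["models/churn_v3", "models/ltv_v1"],
}

def forward_lineage(dataset: str) -> list:
    """Everything transitively affected if `dataset` changes (BFS order)."""
    seen, queue, order = set(), deque([dataset]), []
    while queue:
        node = queue.popleft()
        for child in DOWNSTREAM.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order
```

Backward lineage ("what produced this model?") is the same traversal over the reversed edges, which is exactly what lineage backends compute from the emitted RunEvents.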
What Is Next
Now that you can version datasets and track lineage, the next lesson covers Pipeline Monitoring and Observability. You will build dashboards that monitor data freshness, pipeline SLAs, data quality metrics, and cost tracking so you catch failures before your models degrade.
Lilly Tech Systems