Vertex AI Pipelines Intermediate
Vertex AI Pipelines lets you orchestrate ML workflows as directed acyclic graphs (DAGs) using the Kubeflow Pipelines SDK. Each pipeline step runs in its own container on managed infrastructure, making workflows reproducible and scalable.
Building a Pipeline
Python
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model


@component(base_image="python:3.10", packages_to_install=["pandas", "scikit-learn"])
def preprocess_data(input_path: str, output_data: Output[Dataset]):
    import pandas as pd

    # Load the raw CSV, drop rows with missing values, and write the cleaned dataset
    df = pd.read_csv(input_path)
    df_clean = df.dropna()
    df_clean.to_csv(output_data.path, index=False)


@component(base_image="python:3.10", packages_to_install=["scikit-learn", "pandas"])
def train_model(data: Input[Dataset], model_out: Output[Model]):
    import pickle

    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    # Minimal illustrative training logic: assumes the last column of the
    # cleaned dataset is the label
    df = pd.read_csv(data.path)
    X, y = df.iloc[:, :-1], df.iloc[:, -1]
    clf = RandomForestClassifier()
    clf.fit(X, y)
    with open(model_out.path, "wb") as f:
        pickle.dump(clf, f)


@dsl.pipeline(name="ml-training-pipeline")
def training_pipeline(input_path: str):
    preprocess_task = preprocess_data(input_path=input_path)
    train_task = train_model(data=preprocess_task.outputs["output_data"])
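Before you can submit the pipeline, compile the pipeline function into a template file; this produces the pipeline.json referenced in the next section. A minimal sketch using the KFP compiler (the output filename is just a placeholder):
Python
from kfp import compiler

# Compile the pipeline function into a JSON template that Vertex AI can execute
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="pipeline.json",
)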
Submitting a Pipeline Run
Python
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="training-run-001",
    template_path="pipeline.json",
    parameter_values={"input_path": "gs://my-bucket/data.csv"},
    pipeline_root="gs://my-bucket/pipeline-root",
)
job.submit(service_account="sa-pipeline@my-project.iam.gserviceaccount.com")
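submit() returns as soon as the run is created. If you need to block until the run finishes, for example in a CI job, one option is to wait on the job object and inspect its state. A minimal sketch, assuming the job object from above:
Python
# Block until the pipeline run reaches a terminal state, then inspect the result
job.wait()
print(job.state)          # e.g. PipelineState.PIPELINE_STATE_SUCCEEDED
print(job.resource_name)  # fully qualified pipeline run name, useful for auditing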
Scheduling Pipelines
Run pipelines on a schedule for regular model retraining:
Python
schedule = aiplatform.PipelineJobSchedule(
    pipeline_job=job,
    display_name="weekly-retrain",
)
schedule.create(
    cron="0 2 * * 1",  # every Monday at 2 AM; prefix with TZ=<zone> for a specific time zone
    max_concurrent_run_count=1,
)
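Schedules are long-lived resources, so it helps to be able to list and pause them without deleting anything. A minimal sketch of schedule management calls from the same SDK; exact method availability may vary by google-cloud-aiplatform version:
Python
# List existing schedules in the current project and region
for s in aiplatform.PipelineJobSchedule.list():
    print(s.display_name)

schedule.pause()   # stop triggering new runs without deleting the schedule
schedule.resume()  # start triggering runs again on the same cron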
Best Practice: Use Vertex AI Pipelines for any ML workflow that needs to be reproducible, auditable, or run on a schedule. Store pipeline templates in version control and use CI/CD to compile and deploy them.