Vertex AI Pipelines Intermediate

Vertex AI Pipelines lets you orchestrate ML workflows as directed acyclic graphs (DAGs) using the Kubeflow Pipelines SDK. Each pipeline step runs in its own container on managed infrastructure, making workflows reproducible and scalable.

Building a Pipeline

Python
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model

@component(base_image="python:3.10", packages_to_install=["pandas", "scikit-learn"])
def preprocess_data(input_path: str, output_data: Output[Dataset]):
    import pandas as pd
    df = pd.read_csv(input_path)
    df_clean = df.dropna()
    df_clean.to_csv(output_data.path, index=False)

@component(base_image="python:3.10", packages_to_install=["scikit-learn", "pandas"])
def train_model(data: Input[Dataset], model_out: Output[Model]):
    import pickle
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    df = pd.read_csv(data.path)
    # Assumes a label column named "target"; adjust for your dataset
    model = RandomForestClassifier().fit(df.drop(columns=["target"]), df["target"])
    with open(model_out.path, "wb") as f:
        pickle.dump(model, f)

@dsl.pipeline(name="ml-training-pipeline")
def training_pipeline(input_path: str):
    preprocess_task = preprocess_data(input_path=input_path)
    train_task = train_model(data=preprocess_task.outputs["output_data"])

Submitting a Pipeline Run

Python
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="training-run-001",
    template_path="pipeline.json",
    parameter_values={"input_path": "gs://my-bucket/data.csv"},
    pipeline_root="gs://my-bucket/pipeline-root",
)
# submit() returns immediately; use job.run() instead to block until completion
job.submit(service_account="sa-pipeline@my-project.iam.gserviceaccount.com")

Scheduling Pipelines

Run pipelines on a schedule for regular model retraining:

Python
schedule = aiplatform.PipelineJobSchedule(
    pipeline_job=job,
    display_name="weekly-retrain",
    cron="0 2 * * 1",  # Every Monday at 2 AM
)
schedule.create()
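The `cron` value uses standard five-field syntax (minute, hour, day of month, month, day of week); Vertex AI also accepts a time-zone prefix such as `TZ=America/New_York 0 2 * * 1`. A small helper, purely illustrative and not part of the Vertex AI SDK, to name the fields:

```python
def parse_cron(expr: str) -> dict:
    """Split a five-field cron expression into named fields."""
    names = ["minute", "hour", "day_of_month", "month", "day_of_week"]
    fields = expr.split()
    if len(fields) != len(names):
        raise ValueError("expected 5 fields: minute hour day month weekday")
    return dict(zip(names, fields))

# "0 2 * * 1" fires at minute 0, hour 2, any day/month, weekday 1 (Monday)
print(parse_cron("0 2 * * 1"))
```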

Best Practice: Use Vertex AI Pipelines for any ML workflow that needs to be reproducible, auditable, or run on a schedule. Store pipeline templates in version control and use CI/CD to compile and deploy them.