Kubeflow & ML Pipelines

Kubeflow Pipelines: Building ML Workflows

4 min read

Kubeflow Pipelines (KFP) enables reproducible, portable ML workflows. Organizations report a 60% improvement in pipeline reuse and a 50% reduction in deployment errors after adopting KFP.

Pipeline Architecture

How KFP Works

┌─────────────────────────────────────────────────────────────────┐
│                    Kubeflow Pipelines                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. Define Pipeline (Python SDK)                                │
│     ↓                                                           │
│  2. Compile to YAML/IR                                          │
│     ↓                                                           │
│  3. Submit to KFP API Server                                    │
│     ↓                                                           │
│  4. Argo Workflows executes steps                               │
│     ↓                                                           │
│  5. Artifacts stored in MinIO/S3                                │
│     ↓                                                           │
│  6. Metadata tracked in MLMD                                    │
│                                                                  │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐        │
│  │  Load   │ → │ Process │ → │  Train  │ → │  Deploy │        │
│  │  Data   │   │  Data   │   │  Model  │   │  Model  │        │
│  └─────────┘   └─────────┘   └─────────┘   └─────────┘        │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

KFP V2 SDK (Current Standard)

Installing KFP SDK

# Install KFP SDK v2
pip install kfp==2.7.0

# Verify installation
python -c "import kfp; print(kfp.__version__)"

Basic Pipeline Structure

from kfp import dsl
from kfp import compiler

# Define a component
@dsl.component(
    base_image="python:3.11-slim",
    # gcsfs (which pulls in fsspec) is what lets pandas open gs:// URLs such
    # as the "gs://my-bucket/data.csv" used in the Running Pipelines example;
    # without it pd.read_csv fails on a missing optional dependency.
    packages_to_install=["pandas", "scikit-learn", "gcsfs"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("Dataset")
):
    """Standardize every column of a CSV and write the result as a Dataset.

    Args:
        input_path: Location of the input CSV; passed unchanged to
            ``pandas.read_csv``.
        output_path: File path supplied by KFP for the ``Dataset`` output;
            the scaled CSV is written here without the index column.
    """
    # Imports live inside the function body, next to the code that uses them.
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_path)
    scaler = StandardScaler()
    # fit_transform returns a bare array, so rebuild the frame with the
    # original column names before saving.
    # NOTE(review): all columns are scaled, so this assumes the CSV is
    # entirely numeric — confirm for your data.
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df),
        columns=df.columns
    )
    df_scaled.to_csv(output_path, index=False)

# Define training component
# Skeleton training component: the actual training logic is elided, and the
# only visible action is saving `model.state_dict()` to `model_path`.
@dsl.component(
    # NOTE(review): confirm this image tag exists — published pytorch/pytorch
    # tags appear to use full versions (e.g. 2.1.0-...); TODO verify.
    base_image="pytorch/pytorch:2.1-cuda12.1-cudnn8-runtime",
    packages_to_install=["scikit-learn"]
)
def train_model(
    data_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    epochs: int = 100,
    learning_rate: float = 0.001
):
    # data_path, epochs and learning_rate are not used by the visible code;
    # presumably the elided training logic consumes them.
    import torch
    # NOTE(review): pandas is not listed in packages_to_install; presumably
    # the base image is expected to provide it — verify.
    import pandas as pd
    # Training logic here
    # NOTE: the elided logic must define `model` before the save below,
    # otherwise this line raises NameError.
    torch.save(model.state_dict(), model_path)

# Define the pipeline
@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline"
)
def training_pipeline(
    data_url: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    """Preprocess a CSV, then train a model on the result with one GPU.

    Args:
        data_url: Input CSV location, forwarded to ``preprocess_data``.
        epochs: Forwarded to ``train_model``.
        learning_rate: Forwarded to ``train_model``.
    """
    preprocess_task = preprocess_data(input_path=data_url)

    # Consuming the preprocess output makes train_model run after it.
    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )
    # Set GPU resources. set_gpu_limit() is deprecated in the v2 SDK; the
    # accelerator type is set explicitly alongside the count so the GPU
    # request names the resource it applies to.
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")

# Compile the pipeline function into training_pipeline.yaml
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)

Running Pipelines

from kfp.client import Client

# Point the client at the KFP endpoint
client = Client(host="http://localhost:8080")

# Submit the pipeline function as a run under the given experiment
run = client.create_run_from_pipeline_func(
    pipeline_func=training_pipeline,
    experiment_name="training-experiments",
    arguments={
        "data_url": "gs://my-bucket/data.csv",
        "epochs": 100,
        "learning_rate": 0.001,
    },
)

print(f"Run ID: {run.run_id}")

Advanced Pipeline Features

Conditional Execution

@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.9):
    # Deploy the trained model when its accuracy reaches the threshold;
    # otherwise send a notification.
    # NOTE(review): assumes the train_model used here exposes "accuracy" and
    # "model" outputs, and that deploy_model / send_notification are
    # components defined elsewhere — confirm.
    train_task = train_model(...)
    meets_threshold = train_task.outputs["accuracy"] >= accuracy_threshold

    with dsl.If(meets_threshold, name="deploy-if-accurate"):
        deploy_model(model=train_task.outputs["model"])

    # dsl.Else must directly follow the dsl.If it pairs with.
    with dsl.Else():
        send_notification(message="Model accuracy below threshold")

Parallel Execution

@dsl.pipeline
def parallel_training():
    # Train one model per entry in the list, then combine them.
    # NOTE(review): assumes a train_model variant that accepts `model_type`
    # and exposes a "model" output, and a create_ensemble component defined
    # elsewhere — confirm.

    # Fan-out: Train multiple models in parallel
    with dsl.ParallelFor(
        items=["model_a", "model_b", "model_c"]
    ) as model_type:
        train_task = train_model(model_type=model_type)

    # Fan-in: Ensemble the models.
    # A task created inside ParallelFor cannot be referenced directly from
    # outside the loop; dsl.Collected gathers the output of every iteration
    # into a single list input for the downstream task.
    # NOTE(review): backend support for dsl.Collected depends on the KFP
    # backend version — verify on your cluster.
    ensemble_task = create_ensemble(
        models=dsl.Collected(train_task.outputs["model"])
    )

Caching and Retry

# Placeholder component: both the parameter list and the body are elided.
# NOTE: `(...)` as a parameter list is schematic, not valid Python — replace
# it with a real, type-annotated signature before using this snippet.
@dsl.component
def expensive_computation(...):
    ...

@dsl.pipeline
def pipeline_with_caching():
    # Run the expensive step with result caching enabled and a retry policy.
    task = expensive_computation(...)

    # Caching on: the step is skipped when its inputs are unchanged
    task.set_caching_options(enable_caching=True)

    # Retry policy: three retries, starting at 60s with a backoff factor of 2.0
    retry_policy = {
        "num_retries": 3,
        "backoff_duration": "60s",
        "backoff_factor": 2.0,
    }
    task.set_retry(**retry_policy)

GPU-Enabled Components

Training Component with GPU

# Skeleton GPU training component: selects a device, optionally wraps the
# model for multi-GPU use, and leaves the training loop elided.
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3"
)
def gpu_training(
    data_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    epochs: int
):
    # data_path, model_path and epochs are not used by the visible code;
    # presumably the elided training loop consumes them.
    import torch

    # Fall back to CPU when no CUDA device is visible.
    # NOTE: `device` is only printed here; the visible code never moves a
    # model or data onto it.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on {device}")

    # Multi-GPU training: wrap the model in torch.nn.DataParallel when more
    # than one GPU is visible. This is DataParallel, not
    # DistributedDataParallel (DDP).
    # NOTE: `model` must be created before this point; as written it is
    # never defined, so this branch would fail when taken.
    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)

    # Training loop...

# Runs gpu_training with four GPUs, 64Gi of memory, and a node constraint.
@dsl.pipeline
def gpu_pipeline():
    train_task = gpu_training(...)

    # Request GPU resources: accelerator type first, then the count
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(4)
    train_task.set_memory_limit("64Gi")

    # Node selection
    # NOTE(review): in the KFP v2 SDK, add_node_selector_constraint appears
    # to be a deprecated one-argument alias for set_accelerator_type, so this
    # two-argument (label key, label value) call would likely raise
    # TypeError. Node label selection in v2 is presumably done with the
    # kfp-kubernetes extension (kubernetes.add_node_selector) — verify and
    # replace.
    train_task.add_node_selector_constraint(
        "nvidia.com/gpu.product",
        "NVIDIA-A100-SXM4-80GB"
    )

Pipeline Artifacts and Metadata

Artifact Types

from kfp.dsl import Dataset, Model, Metrics, HTML

@dsl.component
def evaluate_model(
    model: dsl.InputPath("Model"),
    test_data: dsl.InputPath("Dataset"),
    metrics: dsl.Output[Metrics],
    report: dsl.Output[HTML]
):
    """Log evaluation metrics and write an HTML report artifact.

    Args:
        model: Local path of the trained model artifact (unused by the
            visible code; presumably read by the elided evaluation logic).
        test_data: Local path of the test dataset (likewise unused here).
        metrics: Output artifact that receives the scalar metrics.
        report: Output artifact whose file receives the HTML report.
    """
    # Log metrics (hard-coded example values)
    metrics.log_metric("accuracy", 0.95)
    metrics.log_metric("f1_score", 0.93)
    metrics.log_metric("auc", 0.97)

    # Generate HTML report.
    # Output artifacts expose a local file path rather than a write()
    # method, so the report is written to report.path.
    # NOTE: generate_report_html is not defined in this snippet; it must be
    # defined or imported inside the component body.
    with open(report.path, "w", encoding="utf-8") as report_file:
        report_file.write(generate_report_html())

Viewing Artifacts in UI

# Artifacts automatically appear in KFP UI:
# - Metrics displayed as charts
# - HTML rendered inline
# - Models linked to lineage
# - Datasets tracked for reproducibility

Next, we'll explore Katib for automated hyperparameter tuning and neural architecture search.

Quiz

Module 3: Kubeflow & ML Pipelines

Take Quiz