Kubeflow & ML Pipelines
Kubeflow Pipelines: Building ML Workflows
Kubeflow Pipelines (KFP) enables reproducible, portable ML workflows on Kubernetes. Organizations that have adopted KFP report gains such as 60% more pipeline reuse and 50% fewer deployment errors.
Pipeline Architecture
How KFP Works
1. Define Pipeline (Python SDK)
        ↓
2. Compile to YAML/IR
        ↓
3. Submit to KFP API Server
        ↓
4. Argo Workflows executes steps
        ↓
5. Artifacts stored in MinIO/S3
        ↓
6. Metadata tracked in MLMD

┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐
│  Load   │  →  │ Process │  →  │  Train  │  →  │ Deploy  │
│  Data   │     │  Data   │     │  Model  │     │  Model  │
└─────────┘     └─────────┘     └─────────┘     └─────────┘
KFP V2 SDK (Current Standard)
Installing KFP SDK
# Install KFP SDK v2
pip install kfp==2.7.0
# Verify installation
python -c "import kfp; print(kfp.__version__)"
Basic Pipeline Structure
from kfp import dsl
from kfp import compiler

# Define a component
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("Dataset")
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df),
        columns=df.columns
    )
    df_scaled.to_csv(output_path, index=False)

# Define training component
@dsl.component(
    base_image="pytorch/pytorch:2.1-cuda12.1-cudnn8-runtime",
    packages_to_install=["scikit-learn"]
)
def train_model(
    data_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    epochs: int = 100,
    learning_rate: float = 0.001
):
    import torch
    import pandas as pd

    # Training logic here (builds and fits `model` on the preprocessed data)
    torch.save(model.state_dict(), model_path)

# Define the pipeline
@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline"
)
def training_pipeline(
    data_url: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    preprocess_task = preprocess_data(input_path=data_url)

    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )

    # Set GPU, memory, and CPU resources
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")

# Compile pipeline
compiler.Compiler().compile(
    training_pipeline,
    "training_pipeline.yaml"
)
Running Pipelines
from kfp.client import Client

# Connect to KFP
client = Client(host="http://localhost:8080")

# Create a run
run = client.create_run_from_pipeline_func(
    training_pipeline,
    arguments={
        "data_url": "gs://my-bucket/data.csv",
        "epochs": 100,
        "learning_rate": 0.001
    },
    experiment_name="training-experiments"
)

print(f"Run ID: {run.run_id}")
Advanced Pipeline Features
Conditional Execution
@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.9):
    train_task = train_model(...)

    with dsl.If(
        train_task.outputs["accuracy"] >= accuracy_threshold,
        name="deploy-if-accurate"
    ):
        deploy_task = deploy_model(
            model=train_task.outputs["model"]
        )

    with dsl.Else():
        notify_task = send_notification(
            message="Model accuracy below threshold"
        )
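For train_task.outputs["accuracy"] to exist, the training component has to declare accuracy as a named output. One way to do that in KFP v2 is to combine an artifact output with a NamedTuple return value; the sketch below is illustrative (the metric value is hard-coded and the training logic is elided).
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import Model, Output

@dsl.component(base_image="python:3.11-slim")
def train_model(
    model: Output[Model],   # artifact output -> train_task.outputs["model"]
    epochs: int = 10,
) -> NamedTuple("Outputs", [("accuracy", float)]):
    from collections import namedtuple

    # Training elided: write the trained weights to model.path, then
    # report validation accuracy as a parameter output.
    accuracy = 0.92  # illustrative value

    Outputs = namedtuple("Outputs", ["accuracy"])
    return Outputs(accuracy=accuracy)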
Parallel Execution
@dsl.pipeline
def parallel_training():
    # Fan-out: Train multiple models in parallel
    with dsl.ParallelFor(
        items=["model_a", "model_b", "model_c"]
    ) as model_type:
        train_task = train_model(model_type=model_type)

    # Fan-in: Ensemble the models (dsl.Collected gathers the output from every iteration)
    ensemble_task = create_ensemble(
        models=dsl.Collected(train_task.outputs["model"])
    )
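dsl.Collected only compiles if the fan-in component is declared to accept a list of artifacts. A minimal sketch of such a consumer (the component body is illustrative):
from typing import List
from kfp import dsl
from kfp.dsl import Input, Model

@dsl.component(base_image="python:3.11-slim")
def create_ensemble(models: Input[List[Model]]):
    # Receives one Model artifact per ParallelFor iteration
    for m in models:
        print(f"Adding model artifact {m.uri} to the ensemble")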
Caching and Retry
@dsl.component
def expensive_computation(...):
    ...

@dsl.pipeline
def pipeline_with_caching():
    task = expensive_computation(...)

    # Enable caching (skip if same inputs)
    task.set_caching_options(enable_caching=True)

    # Configure retry policy
    task.set_retry(
        num_retries=3,
        backoff_duration="60s",
        backoff_factor=2.0
    )
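Caching can also be toggled for a whole run at submission time, which is useful when you want a clean end-to-end re-execution without editing the pipeline. A short sketch, reusing the client from the Running Pipelines section:
# Force every step to re-execute for this run, regardless of the
# per-task caching options set above.
run = client.create_run_from_pipeline_func(
    pipeline_with_caching,
    arguments={},
    enable_caching=False
)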
GPU-Enabled Components
Training Component with GPU
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3"
)
def gpu_training(
    data_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    epochs: int
):
    import torch

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on {device}")

    # Multi-GPU training with DataParallel (for DDP, launch with torchrun instead)
    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)

    # Training loop...

from kfp import kubernetes  # node selection needs the kfp-kubernetes extension

@dsl.pipeline
def gpu_pipeline():
    train_task = gpu_training(...)

    # Request GPU resources
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(4)
    train_task.set_memory_limit("64Gi")

    # Node selection (pip install kfp-kubernetes)
    kubernetes.add_node_selector(
        train_task,
        label_key="nvidia.com/gpu.product",
        label_value="NVIDIA-A100-SXM4-80GB"
    )
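Before a long training run it is worth confirming, inside the component, that the requested accelerators were actually scheduled. A small sketch using only standard PyTorch calls (the helper name is illustrative):
import torch

def log_gpu_inventory() -> int:
    """Print the GPUs visible to this container and return how many there are."""
    count = torch.cuda.device_count()
    for i in range(count):
        props = torch.cuda.get_device_properties(i)
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}, "
              f"{props.total_memory / 1024**3:.0f} GiB")
    return count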
Pipeline Artifacts and Metadata
Artifact Types
from kfp.dsl import Dataset, Model, Metrics, HTML

@dsl.component
def evaluate_model(
    model: dsl.InputPath("Model"),
    test_data: dsl.InputPath("Dataset"),
    metrics: dsl.Output[Metrics],
    report: dsl.Output[HTML]
):
    # Log metrics
    metrics.log_metric("accuracy", 0.95)
    metrics.log_metric("f1_score", 0.93)
    metrics.log_metric("auc", 0.97)

    # Generate the HTML report and write it to the artifact's local path
    with open(report.path, "w") as f:
        f.write(generate_report_html())
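The components above use the dsl.InputPath / dsl.OutputPath string style; the equivalent Output[Model] annotation hands you the artifact object itself, which is how custom metadata gets attached for lineage. A minimal sketch (the component name, payload, and metadata keys are illustrative):
from kfp import dsl
from kfp.dsl import Model, Output

@dsl.component(base_image="python:3.11-slim")
def export_model(model: Output[Model]):
    # Write the serialized model to the artifact's local path
    with open(model.path, "wb") as f:
        f.write(b"placeholder-weights")

    # Metadata shows up in the KFP UI and in MLMD lineage
    model.metadata["framework"] = "pytorch"
    model.metadata["version"] = "1.0.0"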
Viewing Artifacts in UI
Artifacts automatically appear in the KFP UI:
- Metrics displayed as charts
- HTML rendered inline
- Models linked to lineage
- Datasets tracked for reproducibility
Next, we'll explore Katib for automated hyperparameter tuning and neural architecture search.