ML Workflow Orchestration
Kubeflow Pipelines
Kubeflow Pipelines (KFP) is a platform for building and deploying ML workflows on Kubernetes. It provides scalable, portable, and reproducible pipelines.
Why Kubeflow Pipelines?
| Feature | Benefit |
|---|---|
| Kubernetes-native | Leverage existing infrastructure |
| Scalable | Run distributed training easily |
| Portable | The same compiled pipeline definition runs on any KFP-conformant backend |
| Versioned | Track every pipeline run |
| Visual UI | Monitor and debug workflows |
Installation
# Install the KFP SDK (v2)
pip install kfp
# Verify installation
python -c "import kfp; print(kfp.__version__)"
Core Concepts
Components
Components are the building blocks of a pipeline. Each one is a Python function that KFP runs in its own container:
from kfp import dsl

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("csv")
):
    """Preprocess raw data."""
    import pandas as pd

    df = pd.read_csv(input_path)
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(output_path, index=False)
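The component above imports pandas inside the function body because a lightweight component has to be standalone: the function runs inside the container without the rest of the module it was written in. The sketch below imitates that isolation with plain `exec` in an empty namespace. It is only an illustration of the idea, not how KFP packages components, and `clean`, `run_isolated`, and both source strings are hypothetical names.

```python
# Source of a standalone function: its import lives inside the body.
standalone_source = '''
def clean(rows):
    import math
    return [r for r in rows if not math.isnan(r)]
'''

# Same logic, but relying on an import that only existed in the author's module.
leaky_source = '''
def clean(rows):
    return [r for r in rows if not math.isnan(r)]
'''


def run_isolated(source, rows):
    """Execute the source in an empty namespace and call clean(rows)."""
    namespace = {}
    exec(source, namespace)
    return namespace["clean"](rows)


data = [1.0, float("nan"), 2.0]
print(run_isolated(standalone_source, data))  # [1.0, 2.0]

try:
    run_isolated(leaky_source, data)
except NameError as err:
    # The module-level import never made it into the isolated namespace.
    print(f"fails in isolation: {err}")
```

The second call fails because nothing outside the function body is available, which is why imports and helpers belong inside a component function.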
Pipeline Definition
Chain components together by passing one task's outputs to the next task's inputs:
from kfp import dsl

# train_model and evaluate_model are assumed to be components defined
# like preprocess_data, each exposing an output named "model_path"
@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline"
)
def training_pipeline(
    data_path: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    # Step 1: Preprocess
    preprocess_task = preprocess_data(input_path=data_path)

    # Step 2: Train (depends on preprocess)
    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )

    # Step 3: Evaluate (depends on train)
    evaluate_task = evaluate_model(
        model_path=train_task.outputs["model_path"]
    )
Complete Pipeline Example
Define Components
from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    # gcsfs lets pandas read gs:// URLs; drop it if the data lives elsewhere
    packages_to_install=["pandas", "scikit-learn", "gcsfs"]
)
def load_data(
    source_path: str,
    dataset: Output[Artifact]
):
    """Load and split dataset."""
    import os
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(source_path)
    # Fixed seed so the split is reproducible across runs
    train, test = train_test_split(df, test_size=0.2, random_state=42)

    # Treat the artifact path as a directory so both files are stored
    # with the artifact and reach downstream components
    os.makedirs(dataset.path, exist_ok=True)
    train.to_csv(os.path.join(dataset.path, "train.csv"), index=False)
    test.to_csv(os.path.join(dataset.path, "test.csv"), index=False)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def train_model(
    dataset: Input[Artifact],
    model: Output[Model],
    n_estimators: int = 100
):
    """Train a Random Forest model."""
    import os
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    train = pd.read_csv(os.path.join(dataset.path, "train.csv"))
    X = train.drop("target", axis=1)
    y = train["target"]

    clf = RandomForestClassifier(n_estimators=n_estimators)
    clf.fit(X, y)
    joblib.dump(clf, model.path)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def evaluate_model(
    dataset: Input[Artifact],
    model: Input[Model],
    metrics: Output[Metrics]
):
    """Evaluate model performance."""
    import os
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    test = pd.read_csv(os.path.join(dataset.path, "test.csv"))
    X = test.drop("target", axis=1)
    y = test["target"]

    clf = joblib.load(model.path)
    predictions = clf.predict(X)

    metrics.log_metric("accuracy", accuracy_score(y, predictions))
    metrics.log_metric("f1_score", f1_score(y, predictions, average="weighted"))
Define Pipeline
@dsl.pipeline(
    name="training-pipeline",
    description="Train and evaluate ML model"
)
def ml_pipeline(
    source_path: str = "gs://my-bucket/data.csv",
    n_estimators: int = 100
):
    load_task = load_data(source_path=source_path)

    train_task = train_model(
        dataset=load_task.outputs["dataset"],
        n_estimators=n_estimators
    )

    evaluate_task = evaluate_model(
        dataset=load_task.outputs["dataset"],
        model=train_task.outputs["model"]
    )
Compile and Run
from kfp import compiler
from kfp.client import Client

# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml"
)

# Submit to KFP cluster
client = Client(host="http://kubeflow-pipelines-ui:80")
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        "source_path": "gs://my-bucket/data.csv",
        "n_estimators": 200
    }
)
Advanced Features
Conditional Execution
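from kfp import dsl

@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.8):
    # Assumes a train_model variant that also returns an "accuracy"
    # parameter, and a deploy_model component defined elsewhere
    train_task = train_model()

    # dsl.If supersedes dsl.Condition, which is deprecated in newer
    # KFP v2 releases; on older v2 versions use dsl.Condition instead
    with dsl.If(
        train_task.outputs["accuracy"] > accuracy_threshold,
        name="accuracy-check"
    ):
        deploy_model(model=train_task.outputs["model"])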
Parallel Execution
@dsl.pipeline
def parallel_pipeline():
    # These run in parallel
    task_a = process_dataset_a()
    task_b = process_dataset_b()
    task_c = process_dataset_c()

    # This waits for all parallel tasks
    merge_task = merge_results(
        result_a=task_a.output,
        result_b=task_b.output,
        result_c=task_c.output
    )
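The three processing tasks can run in parallel because none of them consumes another's output, while `merge_results` has to wait for all three. The sketch below models that ordering with the standard library's `graphlib`. It is a conceptual illustration of how a run order follows from data dependencies, not KFP's actual scheduler, and `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose outputs it consumes.
dependencies = {
    "process_dataset_a": set(),
    "process_dataset_b": set(),
    "process_dataset_c": set(),
    "merge_results": {
        "process_dataset_a",
        "process_dataset_b",
        "process_dataset_c",
    },
}


def execution_waves(deps):
    """Group tasks into waves; every task in a wave has all inputs ready."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks with no unmet dependency
        waves.append(ready)
        sorter.done(*ready)  # mark them finished to unblock dependents
    return waves


for number, wave in enumerate(execution_waves(dependencies), start=1):
    print(f"wave {number}: {wave}")
```

The first wave holds the three independent tasks and the second holds only the merge step, which mirrors the pipeline above.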
Resource Requests
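from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model

@dsl.component
def gpu_training(data: Input[Artifact], model: Output[Model]):
    # Training code
    pass

# Inside a @dsl.pipeline function, after data_task has been created:
train_task = gpu_training(data=data_task.output)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("16G")
# set_gpu_limit is deprecated in KFP v2; set the accelerator type and count.
# The resource name depends on the cluster; "nvidia.com/gpu" is a common value.
train_task.set_accelerator_type("nvidia.com/gpu")
train_task.set_accelerator_limit(1)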
Pipeline Metadata
Tracking Experiments
# Create experiment
experiment = client.create_experiment(name="hyperparameter-tuning")

# Run with experiment tracking; name the run after the parameters it uses
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={"n_estimators": 200},
    experiment_name="hyperparameter-tuning",
    run_name="n-estimators-200"
)
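An experiment becomes more useful once it groups several runs that differ in one parameter. The sketch below submits one run per `n_estimators` value through the same `create_run_from_pipeline_func` call used above. `submit_sweep` and `RecordingClient` are hypothetical helpers; the recording client stands in for `kfp.client.Client` so the loop can be dry-run without a cluster.

```python
def submit_sweep(client, pipeline_func, experiment_name, n_estimators_values):
    """Submit one run per n_estimators value and return the run names used."""
    run_names = []
    for n in n_estimators_values:
        run_name = f"n-estimators-{n}"  # encode the parameter in the run name
        client.create_run_from_pipeline_func(
            pipeline_func,
            arguments={"n_estimators": n},
            experiment_name=experiment_name,
            run_name=run_name,
        )
        run_names.append(run_name)
    return run_names


class RecordingClient:
    """Hypothetical stand-in for a KFP client that only records calls."""

    def __init__(self):
        self.calls = []

    def create_run_from_pipeline_func(self, pipeline_func, **kwargs):
        self.calls.append(kwargs)


# Dry run: no pipeline is executed, the calls are only recorded.
dry_run_client = RecordingClient()
names = submit_sweep(
    dry_run_client,
    pipeline_func=None,
    experiment_name="hyperparameter-tuning",
    n_estimators_values=[50, 100, 200],
)
print(names)
```

With a real client and `ml_pipeline` in place of `None`, the same loop would create three runs under one experiment, which makes them easy to compare in the UI.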
Viewing Results
The Kubeflow UI provides:
- Pipeline visualization
- Run history
- Artifact tracking
- Metric comparisons
- Logs access
Best Practices
| Practice | Why |
|---|---|
| Use typed artifacts | Better tracking and UI integration |
| Pin package versions | Reproducible environments |
| Keep components small | Faster caching and debugging |
| Use meaningful names | Easier to understand pipelines |
| Set resource limits | Prevent resource exhaustion |
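The "pin package versions" practice is easy to check automatically. The sketch below flags entries of a `packages_to_install` list that lack an exact `==` pin. `find_unpinned` is a hypothetical helper, the regular expression is a simplification of real requirement syntax, and the version numbers are illustrative only.

```python
import re

# Matches an exact pin such as "pandas==2.2.2" (optionally with extras).
_PINNED = re.compile(
    r"^[A-Za-z0-9._-]+(\[[A-Za-z0-9._,-]+\])?==[A-Za-z0-9._!+-]+$"
)


def find_unpinned(packages):
    """Return the requirement strings that lack an exact '==' pin."""
    return [p for p in packages if not _PINNED.match(p.strip())]


# Illustrative version numbers, not a recommendation.
pinned = ["pandas==2.2.2", "scikit-learn==1.5.0"]
loose = ["pandas", "scikit-learn>=1.0", "joblib==1.4.2"]

print(find_unpinned(pinned))  # []
print(find_unpinned(loose))   # ['pandas', 'scikit-learn>=1.0']
```

A check like this could run in CI over every component's package list, so an unpinned dependency is caught before it changes the environment between runs.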
Local Testing
Test components locally before deploying:
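# Test a component function directly
import tempfile

import pandas as pd

from my_pipeline import preprocess_data

# python_func is the undecorated function, so it runs in the local
# interpreter (pandas must be installed locally)
with tempfile.NamedTemporaryFile(suffix=".csv") as f:
    preprocess_data.python_func(
        input_path="test_data.csv",
        output_path=f.name
    )
    # Check the output before the temporary file is deleted
    result = pd.read_csv(f.name)
    assert not result.isna().any().any()
    assert not result.duplicated().any()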
Key insight: Kubeflow Pipelines is ideal for teams already using Kubernetes. It provides enterprise-grade ML orchestration with built-in experiment tracking and artifact management.
Next, we'll explore Apache Airflow for data engineering and ML workflows.