ML Workflow Orchestration

Kubeflow Pipelines

Kubeflow Pipelines (KFP) is a platform for building and deploying ML workflows on Kubernetes. It provides scalable, portable, and reproducible pipelines.

Why Kubeflow Pipelines?

Feature | Benefit
------- | -------
Kubernetes-native | Leverage existing infrastructure
Scalable | Run distributed training easily
Portable | Same pipeline works anywhere
Versioned | Track every pipeline run
Visual UI | Monitor and debug workflows

Installation

# Install the KFP SDK (v2)
pip install kfp

# Verify installation
python -c "import kfp; print(kfp.__version__)"

Core Concepts

Components

Components are the building blocks—containerized functions:

from kfp import dsl

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("Dataset")
):
    """Preprocess raw data."""
    import pandas as pd

    df = pd.read_csv(input_path)
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(output_path, index=False)

Pipeline Definition

Chain components together; outputs passed from one task to the next define the execution order (train_model and evaluate_model here stand in for components defined like preprocess_data):

from kfp import dsl

@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline"
)
def training_pipeline(
    data_path: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    # Step 1: Preprocess
    preprocess_task = preprocess_data(input_path=data_path)

    # Step 2: Train (depends on preprocess)
    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )

    # Step 3: Evaluate (depends on train)
    evaluate_task = evaluate_model(
        model_path=train_task.outputs["model_path"]
    )
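
KFP infers these dependencies from the output-to-input connections, so no explicit ordering is needed. When one step must wait on another without consuming its data, ordering can also be declared explicitly with .after() (a sketch; cleanup_data is a hypothetical component):

    # Inside the pipeline body: run cleanup only after evaluation,
    # even though it reads none of evaluate_task's outputs
    cleanup_task = cleanup_data(data_path=data_path)  # hypothetical
    cleanup_task.after(evaluate_task)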

Complete Pipeline Example

Define Components

from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "gcsfs"]  # gcsfs for gs:// paths
)
def load_data(
    source_path: str,
    dataset: Output[Artifact]
):
    """Load and split dataset."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(source_path)
    train, test = train_test_split(df, test_size=0.2)

    train.to_csv(f"{dataset.path}_train.csv", index=False)
    test.to_csv(f"{dataset.path}_test.csv", index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def train_model(
    dataset: Input[Artifact],
    model: Output[Model],
    n_estimators: int = 100
):
    """Train a Random Forest model."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    train = pd.read_csv(f"{dataset.path}_train.csv")
    X = train.drop("target", axis=1)
    y = train["target"]

    clf = RandomForestClassifier(n_estimators=n_estimators)
    clf.fit(X, y)

    joblib.dump(clf, model.path)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def evaluate_model(
    dataset: Input[Artifact],
    model: Input[Model],
    metrics: Output[Metrics]
):
    """Evaluate model performance."""
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    test = pd.read_csv(f"{dataset.path}_test.csv")
    X = test.drop("target", axis=1)
    y = test["target"]

    clf = joblib.load(model.path)
    predictions = clf.predict(X)

    metrics.log_metric("accuracy", accuracy_score(y, predictions))
    metrics.log_metric("f1_score", f1_score(y, predictions, average="weighted"))

Define Pipeline

@dsl.pipeline(
    name="training-pipeline",
    description="Train and evaluate ML model"
)
def ml_pipeline(
    source_path: str = "gs://my-bucket/data.csv",
    n_estimators: int = 100
):
    load_task = load_data(source_path=source_path)

    train_task = train_model(
        dataset=load_task.outputs["dataset"],
        n_estimators=n_estimators
    )

    evaluate_task = evaluate_model(
        dataset=load_task.outputs["dataset"],
        model=train_task.outputs["model"]
    )

Compile and Run

from kfp import compiler

# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml"
)

# Submit to a KFP cluster; the host depends on your deployment
# (e.g., an in-cluster service or a port-forwarded endpoint)
from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-ui:80")
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        "source_path": "gs://my-bucket/data.csv",
        "n_estimators": 200
    }
)
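
Submission returns immediately; to block until the run finishes, the client exposes a polling helper (a sketch; the 20-minute timeout is an arbitrary choice):

# Poll the KFP API until the run reaches a terminal state
result = client.wait_for_run_completion(run.run_id, timeout=1200)
print(result.state)  # e.g. SUCCEEDED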

Advanced Features

Conditional Execution

from kfp import dsl

@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.8):
    # Assumes a train_model variant that also emits an "accuracy" output
    train_task = train_model()

    # dsl.If supersedes the older dsl.Condition name in recent KFP releases
    with dsl.If(
        train_task.outputs["accuracy"] > accuracy_threshold,
        name="accuracy-check"
    ):
        deploy_model(model=train_task.outputs["model"])
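KFP 2.5+ also supports else branches via dsl.Else, so a fallback step can run when the check fails (a sketch; notify_failure is a hypothetical component):

    with dsl.If(train_task.outputs["accuracy"] > accuracy_threshold):
        deploy_model(model=train_task.outputs["model"])
    with dsl.Else():
        notify_failure()  # hypothetical fallback component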

Parallel Execution

@dsl.pipeline
def parallel_pipeline():
    # These run in parallel
    task_a = process_dataset_a()
    task_b = process_dataset_b()
    task_c = process_dataset_c()

    # This waits for all parallel tasks
    merge_task = merge_results(
        result_a=task_a.output,
        result_b=task_b.output,
        result_c=task_c.output
    )
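
For fan-out over a list of values, the DSL also provides dsl.ParallelFor. Here is a minimal sketch reusing load_data and train_model from the complete example above:

@dsl.pipeline
def fan_out_pipeline(source_path: str):
    load_task = load_data(source_path=source_path)

    # One train_model task is launched per value, all running in parallel
    with dsl.ParallelFor(items=[50, 100, 200]) as n_estimators:
        train_model(
            dataset=load_task.outputs["dataset"],
            n_estimators=n_estimators
        )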

Resource Requests

@dsl.component
def gpu_training(data: Input[Artifact], model: Output[Model]):
    # Training code
    pass

# In the pipeline body
train_task = gpu_training(data=data_task.output)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("16G")
# In the v2 SDK, GPUs are requested via accelerator settings
# (set_gpu_limit is deprecated)
train_task.set_accelerator_type("nvidia.com/gpu")
train_task.set_accelerator_limit(1)
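
Two related PipelineTask settings are worth knowing (the values here are illustrative):

# Retry transient failures up to 3 times before giving up
train_task.set_retry(num_retries=3)

# Skip the results cache for non-deterministic steps
train_task.set_caching_options(False)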

Pipeline Metadata

Tracking Experiments

# Create the experiment (runs are grouped under it in the UI)
experiment = client.create_experiment(name="hyperparameter-tuning")

# Run with experiment tracking
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={"n_estimators": 200},
    experiment_name="hyperparameter-tuning",
    run_name="rf-200-estimators"
)

Viewing Results

The Kubeflow UI provides:

  • Pipeline visualization
  • Run history
  • Artifact tracking
  • Metric comparisons
  • Logs access
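
The same information is available programmatically through the client, for example (a sketch; the paging argument is optional):

# List recent runs in an experiment
experiment = client.get_experiment(experiment_name="hyperparameter-tuning")
runs = client.list_runs(experiment_id=experiment.experiment_id, page_size=10)
for r in runs.runs:
    print(r.display_name, r.state)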

Best Practices

Practice | Why
-------- | ---
Use typed artifacts | Better tracking and UI integration
Pin package versions | Reproducible environments (example below)
Keep components small | Faster caching and debugging
Use meaningful names | Easier to understand pipelines
Set resource limits | Prevent resource exhaustion
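
For instance, pinning exact versions in the component decorator keeps every run's environment identical (the version numbers below are illustrative):

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.2", "scikit-learn==1.5.0"]  # pinned
)
def pinned_preprocess(input_path: str):
    import pandas as pd
    print(pd.__version__)  # identical on every run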

Local Testing

Test components locally before deploying:

# Test a component function directly
from my_pipeline import preprocess_data

# Create mock outputs
import tempfile
with tempfile.NamedTemporaryFile(suffix=".csv") as f:
    preprocess_data.python_func(
        input_path="test_data.csv",
        output_path=f.name
    )
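
KFP 2.5+ also ships a local execution mode that runs components through the real KFP machinery without a cluster. A minimal sketch using the subprocess runner (details may vary across SDK versions):

from kfp import local

# Execute component tasks in local subprocesses instead of on a cluster
local.init(runner=local.SubprocessRunner())

task = preprocess_data(input_path="test_data.csv")
print(task.outputs["output_path"])  # local path to the produced file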

Key insight: Kubeflow Pipelines is ideal for teams already using Kubernetes. It provides enterprise-grade ML orchestration with built-in experiment tracking and artifact management.

Next, we'll explore Apache Airflow for data engineering and ML workflows.

Take Quiz