Kubeflow & ML Pipelines

Argo Workflows for ML Orchestration

3 min read

Argo Workflows is a container-native workflow engine that powers Kubeflow Pipelines. Understanding Argo directly enables advanced pipeline patterns and integration with the broader Argo ecosystem.

Argo Ecosystem for ML

┌─────────────────────────────────────────────────────────────────┐
│                    Argo Ecosystem                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │    Argo      │  │    Argo      │  │    Argo      │          │
│  │  Workflows   │  │     CD       │  │   Events     │          │
│  │  (Pipelines) │  │  (GitOps)    │  │  (Triggers)  │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
│          │                 │                 │                  │
│          └─────────────────┼─────────────────┘                  │
│                            ↓                                     │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │               ML Platform Integration                      │  │
│  │  - Kubeflow Pipelines (uses Argo Workflows)               │  │
│  │  - GitOps model deployment (ArgoCD)                        │  │
│  │  - Event-driven retraining (Argo Events)                   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Argo Workflows vs KFP

Feature Argo Workflows Kubeflow Pipelines
Language YAML Python SDK
ML Focus General purpose ML-specific
Artifacts Basic ML Metadata
UI Workflow focused Experiment focused
Integration Standalone Kubeflow ecosystem

When to use Argo directly:

  • Complex DAG patterns
  • Non-ML workflows in ML pipelines
  • Integration with Argo CD/Events

Basic Workflow Structure

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-pipeline-
  namespace: ml-research
spec:
  entrypoint: ml-pipeline
  serviceAccountName: argo-workflow

  # Workflow-level parameters
  arguments:
    parameters:
    - name: data-url
      value: "s3://bucket/data.csv"
    - name: epochs
      value: "100"

  # Volume claims for artifacts
  volumeClaimTemplates:
  - metadata:
      name: workdir
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi

  templates:
  - name: ml-pipeline
    dag:
      tasks:
      - name: preprocess
        template: preprocess-data
        arguments:
          parameters:
          - name: input
            value: "{{workflow.parameters.data-url}}"

      - name: train
        dependencies: [preprocess]
        template: train-model
        arguments:
          parameters:
          - name: epochs
            value: "{{workflow.parameters.epochs}}"
          artifacts:
          - name: data
            from: "{{tasks.preprocess.outputs.artifacts.processed-data}}"

      - name: evaluate
        dependencies: [train]
        template: evaluate-model
        arguments:
          artifacts:
          - name: model
            from: "{{tasks.train.outputs.artifacts.model}}"

  # Template definitions
  - name: preprocess-data
    inputs:
      parameters:
      - name: input
    outputs:
      artifacts:
      - name: processed-data
        path: /data/processed
    container:
      image: python:3.11
      command: [python, preprocess.py]
      args: ["--input", "{{inputs.parameters.input}}"]
      volumeMounts:
      - name: workdir
        mountPath: /data

  - name: train-model
    inputs:
      parameters:
      - name: epochs
      artifacts:
      - name: data
        path: /data
    outputs:
      artifacts:
      - name: model
        path: /models/trained
    container:
      image: pytorch/pytorch:2.1-cuda12.1
      command: [python, train.py]
      args: ["--epochs", "{{inputs.parameters.epochs}}"]
      resources:
        limits:
          nvidia.com/gpu: 2
      volumeMounts:
      - name: workdir
        mountPath: /data

Advanced Patterns

Parallel Fan-Out/Fan-In

templates:
- name: parallel-training
  dag:
    tasks:
    - name: train-models
      template: train-single
      arguments:
        parameters:
        - name: model-type
          value: "{{item}}"
      withItems:
      - "resnet50"
      - "efficientnet"
      - "vit"

    - name: ensemble
      dependencies: [train-models]
      template: create-ensemble
      arguments:
        artifacts:
        - name: models
          from: "{{tasks.train-models.outputs.artifacts.model}}"

Conditional Execution

templates:
- name: conditional-deploy
  dag:
    tasks:
    - name: evaluate
      template: evaluate-model

    - name: deploy-prod
      dependencies: [evaluate]
      when: "{{tasks.evaluate.outputs.parameters.accuracy}} > 0.95"
      template: deploy-to-production

    - name: notify-failure
      dependencies: [evaluate]
      when: "{{tasks.evaluate.outputs.parameters.accuracy}} <= 0.95"
      template: send-notification

Retry and Timeout

templates:
- name: train-with-retry
  retryStrategy:
    limit: 3
    retryPolicy: "OnFailure"
    backoff:
      duration: "1m"
      factor: 2
      maxDuration: "10m"
  timeout: "4h"
  container:
    image: pytorch/pytorch:2.1-cuda12.1
    command: [python, train.py]
    resources:
      limits:
        nvidia.com/gpu: 4

GPU Resource Management

templates:
- name: gpu-training
  nodeSelector:
    nvidia.com/gpu.product: NVIDIA-A100-SXM4-80GB
  tolerations:
  - key: nvidia.com/gpu
    operator: Exists
    effect: NoSchedule
  container:
    image: nvcr.io/nvidia/pytorch:24.01-py3
    command: [python, train.py]
    resources:
      requests:
        nvidia.com/gpu: 4
        memory: "64Gi"
        cpu: "16"
      limits:
        nvidia.com/gpu: 4
        memory: "128Gi"
        cpu: "32"

Event-Driven Pipelines

# Argo Events: Trigger retraining on new data
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: retrain-trigger
spec:
  dependencies:
  - name: s3-new-data
    eventSourceName: s3-events
    eventName: new-training-data

  triggers:
  - template:
      name: trigger-retraining
      argoWorkflow:
        operation: submit
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: retrain-
            spec:
              entrypoint: ml-pipeline
              arguments:
                parameters:
                - name: data-url
                  value: "{{inputs.parameters.s3-key}}"

Monitoring Workflows

# List workflows
argo list -n ml-research

# Watch workflow progress
argo watch ml-pipeline-xyz -n ml-research

# Get workflow logs
argo logs ml-pipeline-xyz -n ml-research

# Resubmit failed workflow
argo resubmit ml-pipeline-xyz -n ml-research

Next module: Model Serving and Inference with KServe and Triton for production deployments. :::

Quiz

Module 3: Kubeflow & ML Pipelines

Take Quiz