Kubeflow & ML Pipelines
Argo Workflows for ML Orchestration
Argo Workflows is the container-native workflow engine that powers Kubeflow Pipelines. Working with Argo directly unlocks advanced pipeline patterns and integration with the broader Argo ecosystem.
Argo Ecosystem for ML
```
┌─────────────────────────────────────────────────────────────┐
│                       Argo Ecosystem                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐     │
│  │     Argo     │   │     Argo     │   │     Argo     │     │
│  │  Workflows   │   │      CD      │   │    Events    │     │
│  │ (Pipelines)  │   │   (GitOps)   │   │  (Triggers)  │     │
│  └──────────────┘   └──────────────┘   └──────────────┘     │
│         │                  │                  │             │
│         └──────────────────┼──────────────────┘             │
│                            ↓                                │
│  ┌───────────────────────────────────────────────────────┐  │
│  │              ML Platform Integration                  │  │
│  │  - Kubeflow Pipelines (uses Argo Workflows)           │  │
│  │  - GitOps model deployment (ArgoCD)                   │  │
│  │  - Event-driven retraining (Argo Events)              │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```
Argo Workflows vs KFP
| Feature | Argo Workflows | Kubeflow Pipelines |
|---|---|---|
| Language | YAML | Python SDK |
| ML Focus | General purpose | ML-specific |
| Artifacts | Basic | ML Metadata |
| UI | Workflow focused | Experiment focused |
| Integration | Standalone | Kubeflow ecosystem |
When to use Argo directly:
- Complex DAG patterns
- Non-ML workflows in ML pipelines
- Integration with Argo CD/Events
Basic Workflow Structure
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-pipeline-
  namespace: ml-research
spec:
  entrypoint: ml-pipeline
  serviceAccountName: argo-workflow

  # Workflow-level parameters
  arguments:
    parameters:
      - name: data-url
        value: "s3://bucket/data.csv"
      - name: epochs
        value: "100"

  # Volume claims for artifacts
  volumeClaimTemplates:
    - metadata:
        name: workdir
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi

  templates:
    - name: ml-pipeline
      dag:
        tasks:
          - name: preprocess
            template: preprocess-data
            arguments:
              parameters:
                - name: input
                  value: "{{workflow.parameters.data-url}}"
          - name: train
            dependencies: [preprocess]
            template: train-model
            arguments:
              parameters:
                - name: epochs
                  value: "{{workflow.parameters.epochs}}"
              artifacts:
                - name: data
                  from: "{{tasks.preprocess.outputs.artifacts.processed-data}}"
          - name: evaluate
            dependencies: [train]
            template: evaluate-model
            arguments:
              artifacts:
                - name: model
                  from: "{{tasks.train.outputs.artifacts.model}}"

    # Template definitions
    - name: preprocess-data
      inputs:
        parameters:
          - name: input
      outputs:
        artifacts:
          - name: processed-data
            path: /data/processed
      container:
        image: python:3.11
        command: [python, preprocess.py]
        args: ["--input", "{{inputs.parameters.input}}"]
        volumeMounts:
          - name: workdir
            mountPath: /data

    - name: train-model
      inputs:
        parameters:
          - name: epochs
        artifacts:
          - name: data
            path: /data
      outputs:
        artifacts:
          - name: model
            path: /models/trained
      container:
        image: pytorch/pytorch:2.1-cuda12.1
        command: [python, train.py]
        args: ["--epochs", "{{inputs.parameters.epochs}}"]
        resources:
          limits:
            nvidia.com/gpu: 2
        volumeMounts:
          - name: workdir
            mountPath: /data

    - name: evaluate-model
      inputs:
        artifacts:
          - name: model
            path: /models/trained
      container:
        image: python:3.11
        command: [python, evaluate.py]
```
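Assuming the manifest above is saved as `ml-pipeline.yaml` (a filename chosen here for illustration), it can be submitted with parameter overrides from the Argo CLI:

```shell
# Submit the workflow, overriding the default epoch count,
# and stream status updates until it completes
argo submit ml-pipeline.yaml -n ml-research \
  -p epochs=50 \
  --watch
```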
Advanced Patterns
Parallel Fan-Out/Fan-In
```yaml
templates:
  - name: parallel-training
    dag:
      tasks:
        - name: train-models
          template: train-single
          arguments:
            parameters:
              - name: model-type
                value: "{{item}}"
          withItems:
            - "resnet50"
            - "efficientnet"
            - "vit"
        - name: ensemble
          dependencies: [train-models]
          template: create-ensemble
          arguments:
            artifacts:
              # Note: Argo does not automatically aggregate artifacts across
              # fan-out tasks. In practice, have each expanded task write its
              # model to a shared location keyed by {{item}} and point the
              # ensemble step at that location instead.
              - name: models
                from: "{{tasks.train-models.outputs.artifacts.model}}"
```
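When the fan-out list is not known ahead of time, `withParam` can expand over a JSON list produced by an upstream step. A minimal sketch, assuming a hypothetical `list-models` template that prints a JSON list to stdout:

```yaml
templates:
  - name: dynamic-fanout
    dag:
      tasks:
        - name: list-models
          template: list-models   # prints e.g. ["resnet50", "vit"] to stdout
        - name: train-models
          dependencies: [list-models]
          template: train-single
          # outputs.result captures the stdout of the upstream step,
          # and withParam expands one task per JSON list element
          withParam: "{{tasks.list-models.outputs.result}}"
          arguments:
            parameters:
              - name: model-type
                value: "{{item}}"
```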
Conditional Execution
```yaml
templates:
  - name: conditional-deploy
    dag:
      tasks:
        - name: evaluate
          template: evaluate-model
        - name: deploy-prod
          dependencies: [evaluate]
          when: "{{tasks.evaluate.outputs.parameters.accuracy}} > 0.95"
          template: deploy-to-production
        - name: notify-failure
          dependencies: [evaluate]
          when: "{{tasks.evaluate.outputs.parameters.accuracy}} <= 0.95"
          template: send-notification
```
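The `when` expressions above assume `evaluate-model` exposes `accuracy` as an output parameter. One way to do that (the script name and file path here are illustrative) is to have the step write the metric to a file and surface it with `valueFrom`:

```yaml
- name: evaluate-model
  outputs:
    parameters:
      - name: accuracy
        valueFrom:
          # evaluate.py is expected to write a bare number, e.g. "0.97"
          path: /tmp/accuracy.txt
  container:
    image: python:3.11
    command: [python, evaluate.py]
```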
Retry and Timeout
```yaml
templates:
  - name: train-with-retry
    retryStrategy:
      limit: 3
      retryPolicy: "OnFailure"
      backoff:
        duration: "1m"
        factor: 2
        maxDuration: "10m"
    timeout: "4h"
    container:
      image: pytorch/pytorch:2.1-cuda12.1
      command: [python, train.py]
      resources:
        limits:
          nvidia.com/gpu: 4
```
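Retries handle transient failures; for cleanup or alerting regardless of outcome, a workflow-level exit handler runs after the entrypoint finishes. A minimal sketch, assuming a `notify` template exists elsewhere in the workflow:

```yaml
spec:
  entrypoint: ml-pipeline
  # Always runs after the workflow finishes, success or failure
  onExit: exit-handler
  templates:
    - name: exit-handler
      steps:
        - - name: notify
            template: notify
            arguments:
              parameters:
                - name: status
                  # Resolves to Succeeded, Failed, or Error
                  value: "{{workflow.status}}"
```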
GPU Resource Management
```yaml
templates:
  - name: gpu-training
    nodeSelector:
      nvidia.com/gpu.product: NVIDIA-A100-SXM4-80GB
    tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
    container:
      image: nvcr.io/nvidia/pytorch:24.01-py3
      command: [python, train.py]
      resources:
        requests:
          # GPUs are extended resources and cannot be overcommitted:
          # the GPU request must equal the GPU limit
          nvidia.com/gpu: 4
          memory: "64Gi"
          cpu: "16"
        limits:
          nvidia.com/gpu: 4
          memory: "128Gi"
          cpu: "32"
```
Event-Driven Pipelines
```yaml
# Argo Events: trigger retraining on new data
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: retrain-trigger
spec:
  dependencies:
    - name: s3-new-data
      eventSourceName: s3-events
      eventName: new-training-data
  triggers:
    - template:
        name: trigger-retraining
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: retrain-
              spec:
                # In practice this spec would also carry templates or a
                # workflowTemplateRef pointing at the pipeline definition
                entrypoint: ml-pipeline
                arguments:
                  parameters:
                    - name: data-url
                      value: ""   # filled in from the event payload below
          # Map the S3 object key from the event payload into the
          # workflow's data-url parameter (dataKey shown for the
          # MinIO-style notification format)
          parameters:
            - src:
                dependencyName: s3-new-data
                dataKey: notification.0.s3.object.key
              dest: spec.arguments.parameters.0.value
```
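The Sensor above depends on an `s3-events` EventSource, which is not shown. A minimal sketch using the MinIO-compatible S3 event source; the bucket name, endpoint, and secret names are placeholders:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: s3-events
spec:
  minio:
    new-training-data:
      bucket:
        name: training-data
      endpoint: minio.minio.svc:9000
      # Fire on object creation (new training data uploads)
      events:
        - s3:ObjectCreated:Put
      accessKey:
        name: minio-creds
        key: accesskey
      secretKey:
        name: minio-creds
        key: secretkey
```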
Monitoring Workflows
```shell
# List workflows
argo list -n ml-research

# Watch workflow progress
argo watch ml-pipeline-xyz -n ml-research

# Get workflow logs
argo logs ml-pipeline-xyz -n ml-research

# Resubmit a failed workflow
argo resubmit ml-pipeline-xyz -n ml-research
```
Next module: Model Serving and Inference with KServe and Triton for production deployments.