Kubeflow وخطوط ML
Argo Workflows لتنسيق ML
3 دقيقة للقراءة
Argo Workflows هو محرك سير عمل أصلي للحاويات يشغل Kubeflow Pipelines. فهم Argo مباشرة يمكّن أنماط خطوط متقدمة والتكامل مع نظام Argo البيئي الأوسع.
نظام Argo البيئي لـ ML
┌─────────────────────────────────────────────────────────────────┐
│ نظام Argo البيئي │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Argo │ │ Argo │ │ Argo │ │
│ │ Workflows │ │ CD │ │ Events │ │
│ │ (الخطوط) │ │ (GitOps) │ │ (المشغلات) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ تكامل منصة ML │ │
│ │ - Kubeflow Pipelines (يستخدم Argo Workflows) │ │
│ │ - نشر النموذج GitOps (ArgoCD) │ │
│ │ - إعادة التدريب المدفوعة بالأحداث (Argo Events) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Argo Workflows مقابل KFP
| الميزة | Argo Workflows | Kubeflow Pipelines |
|---|---|---|
| اللغة | YAML | Python SDK |
| تركيز ML | عام الغرض | خاص بـ ML |
| القطع الأثرية | أساسي | ML Metadata |
| واجهة المستخدم | تركيز سير العمل | تركيز التجارب |
| التكامل | مستقل | نظام Kubeflow |
متى تستخدم Argo مباشرة:
- أنماط DAG معقدة
- سير عمل غير ML في خطوط ML
- التكامل مع Argo CD/Events
بنية سير العمل الأساسية
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ml-pipeline-
namespace: ml-research
spec:
entrypoint: ml-pipeline
serviceAccountName: argo-workflow
# معاملات مستوى سير العمل
arguments:
parameters:
- name: data-url
value: "s3://bucket/data.csv"
- name: epochs
value: "100"
# مطالبات الحجم للقطع الأثرية
volumeClaimTemplates:
- metadata:
name: workdir
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
templates:
- name: ml-pipeline
dag:
tasks:
- name: preprocess
template: preprocess-data
arguments:
parameters:
- name: input
value: "{{workflow.parameters.data-url}}"
- name: train
dependencies: [preprocess]
template: train-model
arguments:
parameters:
- name: epochs
value: "{{workflow.parameters.epochs}}"
artifacts:
- name: data
from: "{{tasks.preprocess.outputs.artifacts.processed-data}}"
- name: evaluate
dependencies: [train]
template: evaluate-model
arguments:
artifacts:
- name: model
from: "{{tasks.train.outputs.artifacts.model}}"
# تعريفات القالب
- name: preprocess-data
inputs:
parameters:
- name: input
outputs:
artifacts:
- name: processed-data
path: /data/processed
container:
image: python:3.11
command: [python, preprocess.py]
args: ["--input", "{{inputs.parameters.input}}"]
volumeMounts:
- name: workdir
mountPath: /data
- name: train-model
inputs:
parameters:
- name: epochs
artifacts:
- name: data
path: /data
outputs:
artifacts:
- name: model
path: /models/trained
container:
image: pytorch/pytorch:2.1-cuda12.1
command: [python, train.py]
args: ["--epochs", "{{inputs.parameters.epochs}}"]
resources:
limits:
nvidia.com/gpu: 2
volumeMounts:
- name: workdir
mountPath: /data
أنماط متقدمة
التوزيع/الجمع المتوازي
templates:
- name: parallel-training
dag:
tasks:
- name: train-models
template: train-single
arguments:
parameters:
- name: model-type
value: "{{item}}"
withItems:
- "resnet50"
- "efficientnet"
- "vit"
- name: ensemble
dependencies: [train-models]
template: create-ensemble
arguments:
artifacts:
- name: models
from: "{{tasks.train-models.outputs.artifacts.model}}"
التنفيذ الشرطي
templates:
- name: conditional-deploy
dag:
tasks:
- name: evaluate
template: evaluate-model
- name: deploy-prod
dependencies: [evaluate]
when: "{{tasks.evaluate.outputs.parameters.accuracy}} > 0.95"
template: deploy-to-production
- name: notify-failure
dependencies: [evaluate]
when: "{{tasks.evaluate.outputs.parameters.accuracy}} <= 0.95"
template: send-notification
إعادة المحاولة والمهلة
templates:
- name: train-with-retry
retryStrategy:
limit: 3
retryPolicy: "OnFailure"
backoff:
duration: "1m"
factor: 2
maxDuration: "10m"
timeout: "4h"
container:
image: pytorch/pytorch:2.1-cuda12.1
command: [python, train.py]
resources:
limits:
nvidia.com/gpu: 4
إدارة موارد GPU
templates:
- name: gpu-training
nodeSelector:
nvidia.com/gpu.product: NVIDIA-A100-SXM4-80GB
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
container:
image: nvcr.io/nvidia/pytorch:24.01-py3
command: [python, train.py]
resources:
requests:
nvidia.com/gpu: 4
memory: "64Gi"
cpu: "16"
limits:
nvidia.com/gpu: 4
memory: "128Gi"
cpu: "32"
الخطوط المدفوعة بالأحداث
# Argo Events: تشغيل إعادة التدريب على بيانات جديدة
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: retrain-trigger
spec:
dependencies:
- name: s3-new-data
eventSourceName: s3-events
eventName: new-training-data
triggers:
- template:
name: trigger-retraining
argoWorkflow:
operation: submit
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retrain-
spec:
entrypoint: ml-pipeline
arguments:
parameters:
- name: data-url
value: "{{inputs.parameters.s3-key}}"
مراقبة سير العمل
# عرض قائمة سير العمل
argo list -n ml-research
# مراقبة تقدم سير العمل
argo watch ml-pipeline-xyz -n ml-research
# الحصول على سجلات سير العمل
argo logs ml-pipeline-xyz -n ml-research
# إعادة تقديم سير عمل فاشل
argo resubmit ml-pipeline-xyz -n ml-research
الوحدة التالية: خدمة النماذج والاستدلال مع KServe وTriton للنشر الإنتاجي. :::