Kubeflow وخطوط ML

Kubeflow Pipelines: بناء سير عمل ML

4 دقيقة للقراءة

Kubeflow Pipelines (KFP) يمكّن سير عمل ML القابلة للتكرار والنقل. المنظمات تبلغ عن تحسن 60% في إعادة استخدام الخطوط وانخفاض 50% في أخطاء النشر بعد اعتماد KFP.

بنية الخط

كيف يعمل KFP

┌─────────────────────────────────────────────────────────────────┐
│                    Kubeflow Pipelines                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. تعريف الخط (Python SDK)                                     │
│     ↓                                                           │
│  2. تجميع إلى YAML/IR                                           │
│     ↓                                                           │
│  3. تقديم إلى KFP API Server                                    │
│     ↓                                                           │
│  4. Argo Workflows ينفذ الخطوات                                 │
│     ↓                                                           │
│  5. القطع الأثرية مخزنة في MinIO/S3                             │
│     ↓                                                           │
│  6. البيانات الوصفية متتبعة في MLMD                             │
│                                                                  │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐        │
│  │  تحميل  │ → │ معالجة │ → │  تدريب  │ → │  نشر   │        │
│  │ البيانات│   │ البيانات│   │ النموذج │   │ النموذج │        │
│  └─────────┘   └─────────┘   └─────────┘   └─────────┘        │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

KFP V2 SDK (المعيار الحالي)

تثبيت KFP SDK

# تثبيت KFP SDK v2
pip install kfp==2.7.0

# التحقق من التثبيت
python -c "import kfp; print(kfp.__version__)"

بنية الخط الأساسية

from kfp import dsl
from kfp import compiler

# تعريف مكون
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("Dataset")
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df),
        columns=df.columns
    )
    df_scaled.to_csv(output_path, index=False)

# تعريف مكون التدريب
@dsl.component(
    base_image="pytorch/pytorch:2.1-cuda12.1-cudnn8-runtime",
    packages_to_install=["scikit-learn"]
)
def train_model(
    data_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    epochs: int = 100,
    learning_rate: float = 0.001
):
    import torch
    import pandas as pd
    # منطق التدريب هنا
    torch.save(model.state_dict(), model_path)

# تعريف الخط
@dsl.pipeline(
    name="ml-training-pipeline",
    description="خط تدريب ML من البداية للنهاية"
)
def training_pipeline(
    data_url: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    preprocess_task = preprocess_data(input_path=data_url)

    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )
    # تعيين موارد GPU
    train_task.set_gpu_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")

# تجميع الخط
compiler.Compiler().compile(
    training_pipeline,
    "training_pipeline.yaml"
)

تشغيل الخطوط

from kfp.client import Client

# الاتصال بـ KFP
client = Client(host="http://localhost:8080")

# إنشاء تشغيل
run = client.create_run_from_pipeline_func(
    training_pipeline,
    arguments={
        "data_url": "gs://my-bucket/data.csv",
        "epochs": 100,
        "learning_rate": 0.001
    },
    experiment_name="training-experiments"
)

print(f"Run ID: {run.run_id}")

ميزات الخط المتقدمة

التنفيذ الشرطي

@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.9):
    train_task = train_model(...)

    with dsl.If(
        train_task.outputs["accuracy"] >= accuracy_threshold,
        name="deploy-if-accurate"
    ):
        deploy_task = deploy_model(
            model=train_task.outputs["model"]
        )

    with dsl.Else():
        notify_task = send_notification(
            message="دقة النموذج أقل من العتبة"
        )

التنفيذ المتوازي

@dsl.pipeline
def parallel_training():
    # التوزيع: تدريب نماذج متعددة بالتوازي
    with dsl.ParallelFor(
        items=["model_a", "model_b", "model_c"]
    ) as model_type:
        train_task = train_model(model_type=model_type)

    # الجمع: تجميع النماذج
    ensemble_task = create_ensemble(
        models=[train_task.outputs["model"]]
    )

التخزين المؤقت وإعادة المحاولة

@dsl.component
def expensive_computation(...):
    ...

@dsl.pipeline
def pipeline_with_caching():
    task = expensive_computation(...)

    # تمكين التخزين المؤقت (تخطي إذا نفس المدخلات)
    task.set_caching_options(enable_caching=True)

    # تكوين سياسة إعادة المحاولة
    task.set_retry(
        num_retries=3,
        backoff_duration="60s",
        backoff_factor=2.0
    )

مكونات ممكّنة بـ GPU

مكون التدريب مع GPU

@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3"
)
def gpu_training(
    data_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    epochs: int
):
    import torch

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"التدريب على {device}")

    # تدريب Multi-GPU مع DDP
    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)

    # حلقة التدريب...

@dsl.pipeline
def gpu_pipeline():
    train_task = gpu_training(...)

    # طلب موارد GPU
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(4)
    train_task.set_memory_limit("64Gi")

    # اختيار العقدة
    train_task.add_node_selector_constraint(
        "nvidia.com/gpu.product",
        "NVIDIA-A100-SXM4-80GB"
    )

القطع الأثرية والبيانات الوصفية للخط

أنواع القطع الأثرية

from kfp.dsl import Dataset, Model, Metrics, HTML

@dsl.component
def evaluate_model(
    model: dsl.InputPath("Model"),
    test_data: dsl.InputPath("Dataset"),
    metrics: dsl.Output[Metrics],
    report: dsl.Output[HTML]
):
    # تسجيل المقاييس
    metrics.log_metric("accuracy", 0.95)
    metrics.log_metric("f1_score", 0.93)
    metrics.log_metric("auc", 0.97)

    # إنشاء تقرير HTML
    report.write(generate_report_html())

عرض القطع الأثرية في واجهة المستخدم

# القطع الأثرية تظهر تلقائياً في واجهة KFP:
# - المقاييس معروضة كرسوم بيانية
# - HTML مُقدم مباشرة
# - النماذج مرتبطة بالنسب
# - مجموعات البيانات متتبعة للتكرار

التالي، سنستكشف Katib لضبط المعاملات الفائقة الآلي والبحث عن بنية الشبكة العصبية. :::

اختبار

الوحدة 3: Kubeflow وخطوط ML

خذ الاختبار