تنسيق سير عمل التعلم الآلي

خطوط أنابيب Kubeflow

4 دقيقة للقراءة

Kubeflow Pipelines (KFP) هي منصة لبناء ونشر سير عمل ML على Kubernetes. توفر خطوط أنابيب قابلة للتوسع ومحمولة وقابلة لإعادة الإنتاج.

لماذا Kubeflow Pipelines؟

الميزة الفائدة
أصلية لـ Kubernetes استفد من البنية التحتية الموجودة
قابلة للتوسع شغّل التدريب الموزع بسهولة
محمولة نفس خط الأنابيب يعمل في أي مكان
مُصدرة تتبع كل تشغيل خط أنابيب
واجهة مرئية راقب وصحح سير العمل

التثبيت

# ثبّت KFP SDK (v2)
pip install kfp

# تحقق من التثبيت
python -c "import kfp; print(kfp.__version__)"

المفاهيم الأساسية

المكونات

المكونات هي اللبنات الأساسية—دوال محتواة:

from kfp import dsl

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("csv")
):
    """معالجة البيانات الخام مسبقاً."""
    import pandas as pd

    df = pd.read_csv(input_path)
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(output_path, index=False)

تعريف خط الأنابيب

اربط المكونات معاً:

from kfp import dsl

@dsl.pipeline(
    name="ml-training-pipeline",
    description="خط أنابيب تدريب ML من البداية للنهاية"
)
def training_pipeline(
    data_path: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    # الخطوة 1: المعالجة المسبقة
    preprocess_task = preprocess_data(input_path=data_path)

    # الخطوة 2: التدريب (يعتمد على المعالجة المسبقة)
    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )

    # الخطوة 3: التقييم (يعتمد على التدريب)
    evaluate_task = evaluate_model(
        model_path=train_task.outputs["model_path"]
    )

مثال خط أنابيب كامل

تعريف المكونات

from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def load_data(
    source_path: str,
    dataset: Output[Artifact]
):
    """تحميل وتقسيم مجموعة البيانات."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(source_path)
    train, test = train_test_split(df, test_size=0.2)

    train.to_csv(f"{dataset.path}_train.csv", index=False)
    test.to_csv(f"{dataset.path}_test.csv", index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def train_model(
    dataset: Input[Artifact],
    model: Output[Model],
    n_estimators: int = 100
):
    """تدريب نموذج Random Forest."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    train = pd.read_csv(f"{dataset.path}_train.csv")
    X = train.drop("target", axis=1)
    y = train["target"]

    clf = RandomForestClassifier(n_estimators=n_estimators)
    clf.fit(X, y)

    joblib.dump(clf, model.path)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def evaluate_model(
    dataset: Input[Artifact],
    model: Input[Model],
    metrics: Output[Metrics]
):
    """تقييم أداء النموذج."""
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    test = pd.read_csv(f"{dataset.path}_test.csv")
    X = test.drop("target", axis=1)
    y = test["target"]

    clf = joblib.load(model.path)
    predictions = clf.predict(X)

    metrics.log_metric("accuracy", accuracy_score(y, predictions))
    metrics.log_metric("f1_score", f1_score(y, predictions, average="weighted"))

تعريف خط الأنابيب

@dsl.pipeline(
    name="training-pipeline",
    description="تدريب وتقييم نموذج ML"
)
def ml_pipeline(
    source_path: str = "gs://my-bucket/data.csv",
    n_estimators: int = 100
):
    load_task = load_data(source_path=source_path)

    train_task = train_model(
        dataset=load_task.outputs["dataset"],
        n_estimators=n_estimators
    )

    evaluate_task = evaluate_model(
        dataset=load_task.outputs["dataset"],
        model=train_task.outputs["model"]
    )

التجميع والتشغيل

from kfp import compiler

# جمّع إلى YAML
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml"
)

# أرسل لعنقود KFP
from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-ui:80")
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        "source_path": "gs://my-bucket/data.csv",
        "n_estimators": 200
    }
)

الميزات المتقدمة

التنفيذ الشرطي

from kfp import dsl

@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.8):
    train_task = train_model()

    with dsl.Condition(
        train_task.outputs["accuracy"] > accuracy_threshold,
        name="accuracy-check"
    ):
        deploy_model(model=train_task.outputs["model"])

التنفيذ المتوازي

@dsl.pipeline
def parallel_pipeline():
    # هذه تعمل بالتوازي
    task_a = process_dataset_a()
    task_b = process_dataset_b()
    task_c = process_dataset_c()

    # هذه تنتظر جميع المهام المتوازية
    merge_task = merge_results(
        result_a=task_a.output,
        result_b=task_b.output,
        result_c=task_c.output
    )

طلبات الموارد

@dsl.component
def gpu_training(data: Input[Artifact], model: Output[Model]):
    # كود التدريب
    pass

# في خط الأنابيب
train_task = gpu_training(data=data_task.output)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("16G")
train_task.set_gpu_limit("1")

بيانات خط الأنابيب الوصفية

تتبع التجارب

# أنشئ تجربة
experiment = client.create_experiment(name="hyperparameter-tuning")

# شغّل مع تتبع التجربة
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    experiment_name="hyperparameter-tuning",
    run_name="lr-0.001-epochs-100"
)

عرض النتائج

واجهة Kubeflow توفر:

  • تصور خط الأنابيب
  • سجل التشغيلات
  • تتبع المخرجات
  • مقارنات المقاييس
  • الوصول للسجلات

أفضل الممارسات

الممارسة لماذا
استخدم مخرجات مُنمطة تتبع أفضل وتكامل واجهة
ثبّت إصدارات الحزم بيئات قابلة لإعادة الإنتاج
أبقِ المكونات صغيرة تخزين مؤقت وتصحيح أسرع
استخدم أسماء ذات معنى خطوط أنابيب أسهل للفهم
عيّن حدود الموارد امنع استنفاد الموارد

الاختبار المحلي

اختبر المكونات محلياً قبل النشر:

# اختبر دالة مكون مباشرة
from my_pipeline import preprocess_data

# أنشئ مخرجات وهمية
import tempfile
with tempfile.NamedTemporaryFile(suffix=".csv") as f:
    preprocess_data.python_func(
        input_path="test_data.csv",
        output_path=f.name
    )

الرؤية الرئيسية: Kubeflow Pipelines مثالية للفرق التي تستخدم Kubernetes بالفعل. توفر تنسيق ML بمستوى المؤسسات مع تتبع تجارب وإدارة مخرجات مدمجة.

التالي، سنستكشف Apache Airflow لسير عمل هندسة البيانات وML. :::

اختبار

الوحدة 3: تنسيق سير عمل التعلم الآلي

خذ الاختبار