تنسيق سير عمل التعلم الآلي

خطوط أنابيب Kubeflow

4 دقيقة للقراءة

Kubeflow Pipelines (KFP) هي منصة لبناء ونشر سير عمل ML على Kubernetes. توفر خطوط أنابيب قابلة للتوسع ومحمولة وقابلة لإعادة الإنتاج.

لماذا Kubeflow Pipelines؟

الميزةالفائدة
أصلية لـ Kubernetesاستفد من البنية التحتية الموجودة
قابلة للتوسعشغّل التدريب الموزع بسهولة
محمولةنفس خط الأنابيب يعمل في أي مكان
مُصدرةتتبع كل تشغيل خط أنابيب
واجهة مرئيةراقب وصحح سير العمل

التثبيت

# ثبّت KFP SDK (v2)
pip install kfp

# تحقق من التثبيت
python -c "import kfp; print(kfp.__version__)"

المفاهيم الأساسية

المكونات

المكونات هي اللبنات الأساسية—دوال محتواة:

from kfp import dsl

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_path: dsl.OutputPath("csv")
):
    """معالجة البيانات الخام مسبقاً."""
    import pandas as pd

    df = pd.read_csv(input_path)
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(output_path, index=False)

تعريف خط الأنابيب

اربط المكونات معاً:

from kfp import dsl

@dsl.pipeline(
    name="ml-training-pipeline",
    description="خط أنابيب تدريب ML من البداية للنهاية"
)
def training_pipeline(
    data_path: str,
    epochs: int = 100,
    learning_rate: float = 0.001
):
    # الخطوة 1: المعالجة المسبقة
    preprocess_task = preprocess_data(input_path=data_path)

    # الخطوة 2: التدريب (يعتمد على المعالجة المسبقة)
    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        epochs=epochs,
        learning_rate=learning_rate
    )

    # الخطوة 3: التقييم (يعتمد على التدريب)
    evaluate_task = evaluate_model(
        model_path=train_task.outputs["model_path"]
    )

مثال خط أنابيب كامل

تعريف المكونات

from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def load_data(
    source_path: str,
    dataset: Output[Artifact]
):
    """تحميل وتقسيم مجموعة البيانات."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(source_path)
    train, test = train_test_split(df, test_size=0.2)

    train.to_csv(f"{dataset.path}_train.csv", index=False)
    test.to_csv(f"{dataset.path}_test.csv", index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def train_model(
    dataset: Input[Artifact],
    model: Output[Model],
    n_estimators: int = 100
):
    """تدريب نموذج Random Forest."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    train = pd.read_csv(f"{dataset.path}_train.csv")
    X = train.drop("target", axis=1)
    y = train["target"]

    clf = RandomForestClassifier(n_estimators=n_estimators)
    clf.fit(X, y)

    joblib.dump(clf, model.path)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def evaluate_model(
    dataset: Input[Artifact],
    model: Input[Model],
    metrics: Output[Metrics]
):
    """تقييم أداء النموذج."""
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    test = pd.read_csv(f"{dataset.path}_test.csv")
    X = test.drop("target", axis=1)
    y = test["target"]

    clf = joblib.load(model.path)
    predictions = clf.predict(X)

    metrics.log_metric("accuracy", accuracy_score(y, predictions))
    metrics.log_metric("f1_score", f1_score(y, predictions, average="weighted"))

تعريف خط الأنابيب

@dsl.pipeline(
    name="training-pipeline",
    description="تدريب وتقييم نموذج ML"
)
def ml_pipeline(
    source_path: str = "gs://my-bucket/data.csv",
    n_estimators: int = 100
):
    load_task = load_data(source_path=source_path)

    train_task = train_model(
        dataset=load_task.outputs["dataset"],
        n_estimators=n_estimators
    )

    evaluate_task = evaluate_model(
        dataset=load_task.outputs["dataset"],
        model=train_task.outputs["model"]
    )

التجميع والتشغيل

from kfp import compiler

# جمّع إلى YAML
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml"
)

# أرسل لعنقود KFP
from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-ui:80")
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        "source_path": "gs://my-bucket/data.csv",
        "n_estimators": 200
    }
)

الميزات المتقدمة

التنفيذ الشرطي

from kfp import dsl

@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.8):
    train_task = train_model()

    with dsl.Condition(
        train_task.outputs["accuracy"] > accuracy_threshold,
        name="accuracy-check"
    ):
        deploy_model(model=train_task.outputs["model"])

التنفيذ المتوازي

@dsl.pipeline
def parallel_pipeline():
    # هذه تعمل بالتوازي
    task_a = process_dataset_a()
    task_b = process_dataset_b()
    task_c = process_dataset_c()

    # هذه تنتظر جميع المهام المتوازية
    merge_task = merge_results(
        result_a=task_a.output,
        result_b=task_b.output,
        result_c=task_c.output
    )

طلبات الموارد

@dsl.component
def gpu_training(data: Input[Artifact], model: Output[Model]):
    # كود التدريب
    pass

# في خط الأنابيب
train_task = gpu_training(data=data_task.output)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("16G")
train_task.set_gpu_limit("1")

بيانات خط الأنابيب الوصفية

تتبع التجارب

# أنشئ تجربة
experiment = client.create_experiment(name="hyperparameter-tuning")

# شغّل مع تتبع التجربة
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    experiment_name="hyperparameter-tuning",
    run_name="lr-0.001-epochs-100"
)

عرض النتائج

واجهة Kubeflow توفر:

  • تصور خط الأنابيب
  • سجل التشغيلات
  • تتبع المخرجات
  • مقارنات المقاييس
  • الوصول للسجلات

أفضل الممارسات

الممارسةلماذا
استخدم مخرجات مُنمطةتتبع أفضل وتكامل واجهة
ثبّت إصدارات الحزمبيئات قابلة لإعادة الإنتاج
أبقِ المكونات صغيرةتخزين مؤقت وتصحيح أسرع
استخدم أسماء ذات معنىخطوط أنابيب أسهل للفهم
عيّن حدود المواردامنع استنفاد الموارد

الاختبار المحلي

اختبر المكونات محلياً قبل النشر:

# اختبر دالة مكون مباشرة
from my_pipeline import preprocess_data

# أنشئ مخرجات وهمية
import tempfile
with tempfile.NamedTemporaryFile(suffix=".csv") as f:
    preprocess_data.python_func(
        input_path="test_data.csv",
        output_path=f.name
    )

الرؤية الرئيسية: Kubeflow Pipelines مثالية للفرق التي تستخدم Kubernetes بالفعل. توفر تنسيق ML بمستوى المؤسسات مع تتبع تجارب وإدارة مخرجات مدمجة.

التالي، سنستكشف Apache Airflow لسير عمل هندسة البيانات وML. :::

مراجعة سريعة: كيف تجد هذا الدرس؟

اختبار

الوحدة 3: تنسيق سير عمل التعلم الآلي

خذ الاختبار
نشرة أسبوعية مجانية

ابقَ على مسار النيرد

بريد واحد أسبوعياً — دورات، مقالات معمّقة، أدوات، وتجارب ذكاء اصطناعي.

بدون إزعاج. إلغاء الاشتراك في أي وقت.