خطوط أنابيب ML والتنسيق

Airflow مقابل Kubeflow مقابل Prefect

5 دقيقة للقراءة

اختيار المنسق الصحيح موضوع مقابلة شائع. اعرف المفاضلات لإظهار التفكير المعماري.

مقارنة الميزات

الميزة Airflow Kubeflow Pipelines Prefect
الاستخدام الأساسي سير عمل عام خاص بـ ML سير عمل حديث
البنية التحتية أي (VM، K8s) Kubernetes فقط أي (سحابة هجينة)
منحنى التعلم متوسط عالي منخفض
DAGs الديناميكية محدود أصلي أصلي
تكاملات ML عبر operators أصلي (TFX، KServe) عبر tasks
التوسع Celery/K8s executor K8s أصلي وكلاء هجين
المجتمع الأكبر ينمو ينمو

سؤال المقابلة: أي منسق؟

السؤال: "أنت تبني خطوط أنابيب ML لفريق من 10 مهندسي ML. أي منسق ستختار ولماذا؟"

إطار الإجابة:

def choose_orchestrator(context):
    # Airflow: الأفضل لأحمال العمل المختلطة
    if context["team_has_existing_airflow"]:
        return "Airflow - الاستفادة من الخبرة الموجودة"

    if context["workflows"] == "general_etl_plus_ml":
        return "Airflow - الأكثر تنوعاً، أكبر مجتمع"

    # Kubeflow: الأفضل للفرق المتخصصة في ML
    if context["infrastructure"] == "kubernetes_only":
        if context["ml_heavy"] and context["team_familiar_with_k8s"]:
            return "Kubeflow - ميزات ML أصلية، أصلي K8s"

    # Prefect: الأفضل للفرق الحديثة
    if context["wants_modern_developer_experience"]:
        if context["needs_dynamic_workflows"]:
            return "Prefect - DAGs ديناميكية، اختبار أفضل"

    return "Airflow - الخيار الافتراضي الأكثر أماناً"

تعمق في Airflow

متى تستخدم: التنسيق للأغراض العامة، خطوط أنابيب البيانات + ML المختلطة

# Airflow 2.x TaskFlow API
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator
)

@dag(schedule="@daily", catchup=False)
def ml_training_dag():
    @task
    def prepare_data():
        # يعمل على عامل Airflow
        return {"dataset_path": "s3://bucket/training_data"}

    # تدريب GPU على Kubernetes
    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="model-training",
        namespace="ml-pipelines",
        image="training:v1.2",
        arguments=["--data", "{{ ti.xcom_pull('prepare_data') }}"],
        resources={
            "limits": {"nvidia.com/gpu": "1"},
            "requests": {"memory": "8Gi"}
        },
        node_selector={"gpu-type": "a100"}
    )

    data = prepare_data()
    data >> train_model

تعمق في Kubeflow Pipelines

متى تستخدم: الفرق الأصلية لـ Kubernetes، تركيز كبير على ML

from kfp import dsl
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model

@component(base_image="python:3.11")
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset]
):
    import pandas as pd
    df = pd.read_csv(raw_data.path)
    df_clean = df.dropna()
    df_clean.to_csv(processed_data.path)

@component(base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime")
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model]
):
    # منطق التدريب
    pass

@pipeline(name="ml-training-pipeline")
def training_pipeline():
    preprocess = preprocess_data(raw_data=...)
    train = train_model(training_data=preprocess.outputs["processed_data"])

نقطة المقابلة: Kubeflow Pipelines v2 يستخدم IR (التمثيل الوسيط) لقابلية النقل بين المنصات.

تعمق في Prefect

متى تستخدم: الفرق الحديثة، سير العمل الديناميكي، السحابة الهجينة

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=[60, 120, 300],  # التراجع الأسي
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def fetch_training_data(dataset_id: str):
    # مخزن مؤقتاً لمدة ساعة
    return load_dataset(dataset_id)

@task
def train_model(data, hyperparams: dict):
    # منطق التدريب
    pass

@flow(name="dynamic-training-flow")
def training_flow(model_configs: list[dict]):
    # ديناميكي: عدد مختلف من المهام لكل تشغيل
    for config in model_configs:
        data = fetch_training_data(config["dataset_id"])
        train_model(data, config["hyperparams"])

مميزات Prefect:

  • Python أصلي - لا حاجة لـ decorators لتبعيات المهام
  • دعم أصلي للتخزين المؤقت وإعادة المحاولة
  • اختبار أسهل (فقط استدعاء الدوال)

نصيحة احترافية: في المقابلات، اعترف بأن اختيار المنسق غالباً يعتمد على البنية التحتية الموجودة. لا توجد أداة "الأفضل" عالمياً.

في الدرس التالي، سنغطي مخازن الميزات وإصدار البيانات. :::

اختبار

الوحدة 3: خطوط أنابيب ML والتنسيق

خذ الاختبار