تنسيق سير عمل التعلم الآلي

Airflow للتعلم الآلي

4 دقيقة للقراءة

Apache Airflow هو المعيار الصناعي لتنسيق سير العمل. بُني أصلاً لهندسة البيانات، ويُستخدم على نطاق واسع لخطوط أنابيب ML أيضاً.

لماذا Airflow لـ ML؟

القوة الوصف
نظام بيئي ناضج 10+ سنوات، مجتمع ضخم
تكاملات غنية AWS، GCP، Kubernetes، Spark
جدولة مرنة Cron، حساسات، محفزات
مُختبر في المعارك تستخدمه Airbnb، Uber، Netflix

المفاهيم الأساسية

DAG (رسم بياني موجه غير دوري)

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    description="خط أنابيب تدريب ML يومي",
    schedule="0 2 * * *",  # شغّل الساعة 2 صباحاً يومياً
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

المشغلات

المشغلات تُعرف ماذا تفعل المهمة:

المشغل حالة الاستخدام
PythonOperator تشغيل دوال Python
BashOperator تنفيذ أوامر shell
KubernetesPodOperator تشغيل حاويات على K8s
S3ToRedshiftOperator نقل البيانات

المهام والتبعيات

from airflow.operators.python import PythonOperator

def preprocess_data():
    # منطق المعالجة المسبقة
    pass

def train_model():
    # منطق التدريب
    pass

def evaluate_model():
    # منطق التقييم
    pass

# عرّف المهام
preprocess = PythonOperator(
    task_id="preprocess_data",
    python_callable=preprocess_data,
    dag=dag,
)

train = PythonOperator(
    task_id="train_model",
    python_callable=train_model,
    dag=dag,
)

evaluate = PythonOperator(
    task_id="evaluate_model",
    python_callable=evaluate_model,
    dag=dag,
)

# عرّف التبعيات
preprocess >> train >> evaluate

مثال خط أنابيب ML كامل

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from datetime import datetime, timedelta
import json

default_args = {
    "owner": "ml-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["ml-team@company.com"],
}


def load_data(**context):
    """تحميل والتحقق من البيانات."""
    import pandas as pd

    df = pd.read_csv("s3://bucket/raw/data.csv")

    # التحقق
    assert len(df) > 1000, "بيانات غير كافية"
    assert df.isnull().sum().sum() < 100, "قيم فارغة كثيرة جداً"

    # احفظ المُعالج
    df.to_parquet("/tmp/processed.parquet")

    # مرر البيانات الوصفية للمهمة التالية
    context["ti"].xcom_push(key="row_count", value=len(df))


def train_model(**context):
    """تدريب نموذج ML."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = pd.read_parquet("/tmp/processed.parquet")
    X = df.drop("target", axis=1)
    y = df["target"]

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)

    joblib.dump(model, "/tmp/model.pkl")

    # احصل على البيانات من المهمة السابقة
    row_count = context["ti"].xcom_pull(
        task_ids="load_data",
        key="row_count"
    )
    print(f"تدربت على {row_count} صف")


def evaluate_model(**context):
    """تقييم أداء النموذج."""
    import pandas as pd
    from sklearn.metrics import accuracy_score
    import joblib

    df = pd.read_parquet("/tmp/processed.parquet")
    model = joblib.load("/tmp/model.pkl")

    X = df.drop("target", axis=1)
    y = df["target"]
    predictions = model.predict(X)

    accuracy = accuracy_score(y, predictions)

    # خزّن المقاييس
    metrics = {"accuracy": accuracy, "timestamp": str(datetime.now())}
    context["ti"].xcom_push(key="metrics", value=metrics)

    # افشل إذا الدقة منخفضة جداً
    if accuracy < 0.8:
        raise ValueError(f"الدقة {accuracy} أقل من العتبة 0.8")


def deploy_model(**context):
    """انشر النموذج إذا نجح التقييم."""
    import shutil

    metrics = context["ti"].xcom_pull(
        task_ids="evaluate_model",
        key="metrics"
    )

    print(f"نشر النموذج بدقة: {metrics['accuracy']}")
    shutil.copy("/tmp/model.pkl", "/models/production/model.pkl")


with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    description="تدريب ML من البداية للنهاية",
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:

    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data,
    )

    train_task = PythonOperator(
        task_id="train_model",
        python_callable=train_model,
    )

    evaluate_task = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_model,
    )

    deploy_task = PythonOperator(
        task_id="deploy_model",
        python_callable=deploy_model,
    )

    # خط الأنابيب: تحميل -> تدريب -> تقييم -> نشر
    load_task >> train_task >> evaluate_task >> deploy_task

XCom: تمرير البيانات بين المهام

# ادفع البيانات
def task_a(**context):
    result = {"accuracy": 0.95, "model_path": "/tmp/model.pkl"}
    context["ti"].xcom_push(key="result", value=result)

# اسحب البيانات
def task_b(**context):
    result = context["ti"].xcom_pull(task_ids="task_a", key="result")
    print(f"الدقة: {result['accuracy']}")

ملاحظة: XCom للبيانات الصغيرة (< 48KB). للبيانات الكبيرة، استخدم تخزين خارجي (S3، GCS).

الحساسات: انتظر الشروط

from airflow.sensors.s3_key_sensor import S3KeySensor

wait_for_data = S3KeySensor(
    task_id="wait_for_data",
    bucket_name="my-bucket",
    bucket_key="data/daily/{{ ds }}/data.csv",
    timeout=3600,  # ساعة واحدة
    poke_interval=60,  # تحقق كل دقيقة
    dag=dag,
)

wait_for_data >> load_task

TaskFlow API (الصيغة الحديثة)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="ml_pipeline_taskflow",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def ml_pipeline():

    @task
    def load_data():
        return {"data_path": "/tmp/data.parquet"}

    @task
    def train_model(data_info: dict):
        return {"model_path": "/tmp/model.pkl"}

    @task
    def evaluate(model_info: dict):
        return {"accuracy": 0.92}

    # التبعيات تُستنتج من استدعاءات الدوال
    data = load_data()
    model = train_model(data)
    metrics = evaluate(model)

ml_pipeline()

أفضل الممارسات

الممارسة لماذا
استخدم TaskFlow API كود أنظف، تلميحات نوع أفضل
أبقِ المهام ذرية إعادة محاولة وتصحيح أسهل
خارج الحوسبة الثقيلة استخدم K8s، Spark للوظائف الكبيرة
استخدم المجمعات تحكم في استخدام الموارد
اختبر DAGs محلياً airflow dags test dag_id

Airflow مقابل أدوات أخرى

الميزة Airflow Kubeflow Prefect
الأفضل لـ البيانات + ML ML على K8s Python الحديث
الجدولة ممتازة أساسية جيدة
أصلي لـ K8s لا نعم لا
منحنى التعلم متوسط عالٍ منخفض
الواجهة جيدة جيدة ممتازة

الأنماط الشائعة

التفرع

from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    accuracy = context["ti"].xcom_pull(task_ids="evaluate")["accuracy"]
    if accuracy > 0.9:
        return "deploy_production"
    else:
        return "deploy_staging"

branch = BranchPythonOperator(
    task_id="choose_deployment",
    python_callable=choose_branch,
)

تعيين المهام الديناميكي

@task
def process_partition(partition_id: int):
    # معالجة قسم واحد
    pass

@dag
def dynamic_pipeline():
    partitions = list(range(10))
    process_partition.expand(partition_id=partitions)

الرؤية الرئيسية: Airflow يتفوق عندما تحتاج جدولة قوية ومراقبة وتكامل مع بنية البيانات التحتية. لسير عمل ML الخالص، فكر في Kubeflow أو Prefect.

التالي، سنستكشف Prefect كبديل حديث لسير عمل Python الأصلي. :::

اختبار

الوحدة 3: تنسيق سير عمل التعلم الآلي

خذ الاختبار