تنسيق سير عمل التعلم الآلي
Airflow للتعلم الآلي
4 دقيقة للقراءة
Apache Airflow هو المعيار الصناعي لتنسيق سير العمل. بُني أصلاً لهندسة البيانات، ويُستخدم على نطاق واسع لخطوط أنابيب ML أيضاً.
لماذا Airflow لـ ML؟
| القوة | الوصف |
|---|---|
| نظام بيئي ناضج | 10+ سنوات، مجتمع ضخم |
| تكاملات غنية | AWS، GCP، Kubernetes، Spark |
| جدولة مرنة | Cron، حساسات، محفزات |
| مُختبر في المعارك | تستخدمه Airbnb، Uber، Netflix |
المفاهيم الأساسية
DAG (رسم بياني موجه غير دوري)
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "ml-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
dag_id="ml_training_pipeline",
default_args=default_args,
description="خط أنابيب تدريب ML يومي",
schedule="0 2 * * *", # شغّل الساعة 2 صباحاً يومياً
start_date=datetime(2025, 1, 1),
catchup=False,
)
المشغلات
المشغلات تُعرف ماذا تفعل المهمة:
| المشغل | حالة الاستخدام |
|---|---|
PythonOperator |
تشغيل دوال Python |
BashOperator |
تنفيذ أوامر shell |
KubernetesPodOperator |
تشغيل حاويات على K8s |
S3ToRedshiftOperator |
نقل البيانات |
المهام والتبعيات
from airflow.operators.python import PythonOperator
def preprocess_data():
# منطق المعالجة المسبقة
pass
def train_model():
# منطق التدريب
pass
def evaluate_model():
# منطق التقييم
pass
# عرّف المهام
preprocess = PythonOperator(
task_id="preprocess_data",
python_callable=preprocess_data,
dag=dag,
)
train = PythonOperator(
task_id="train_model",
python_callable=train_model,
dag=dag,
)
evaluate = PythonOperator(
task_id="evaluate_model",
python_callable=evaluate_model,
dag=dag,
)
# عرّف التبعيات
preprocess >> train >> evaluate
مثال خط أنابيب ML كامل
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from datetime import datetime, timedelta
import json
default_args = {
"owner": "ml-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["ml-team@company.com"],
}
def load_data(**context):
"""تحميل والتحقق من البيانات."""
import pandas as pd
df = pd.read_csv("s3://bucket/raw/data.csv")
# التحقق
assert len(df) > 1000, "بيانات غير كافية"
assert df.isnull().sum().sum() < 100, "قيم فارغة كثيرة جداً"
# احفظ المُعالج
df.to_parquet("/tmp/processed.parquet")
# مرر البيانات الوصفية للمهمة التالية
context["ti"].xcom_push(key="row_count", value=len(df))
def train_model(**context):
"""تدريب نموذج ML."""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
df = pd.read_parquet("/tmp/processed.parquet")
X = df.drop("target", axis=1)
y = df["target"]
model = RandomForestClassifier(n_estimators=100)
model.fit(X, y)
joblib.dump(model, "/tmp/model.pkl")
# احصل على البيانات من المهمة السابقة
row_count = context["ti"].xcom_pull(
task_ids="load_data",
key="row_count"
)
print(f"تدربت على {row_count} صف")
def evaluate_model(**context):
"""تقييم أداء النموذج."""
import pandas as pd
from sklearn.metrics import accuracy_score
import joblib
df = pd.read_parquet("/tmp/processed.parquet")
model = joblib.load("/tmp/model.pkl")
X = df.drop("target", axis=1)
y = df["target"]
predictions = model.predict(X)
accuracy = accuracy_score(y, predictions)
# خزّن المقاييس
metrics = {"accuracy": accuracy, "timestamp": str(datetime.now())}
context["ti"].xcom_push(key="metrics", value=metrics)
# افشل إذا الدقة منخفضة جداً
if accuracy < 0.8:
raise ValueError(f"الدقة {accuracy} أقل من العتبة 0.8")
def deploy_model(**context):
"""انشر النموذج إذا نجح التقييم."""
import shutil
metrics = context["ti"].xcom_pull(
task_ids="evaluate_model",
key="metrics"
)
print(f"نشر النموذج بدقة: {metrics['accuracy']}")
shutil.copy("/tmp/model.pkl", "/models/production/model.pkl")
with DAG(
dag_id="ml_training_pipeline",
default_args=default_args,
description="تدريب ML من البداية للنهاية",
schedule="0 2 * * *",
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["ml", "training"],
) as dag:
load_task = PythonOperator(
task_id="load_data",
python_callable=load_data,
)
train_task = PythonOperator(
task_id="train_model",
python_callable=train_model,
)
evaluate_task = PythonOperator(
task_id="evaluate_model",
python_callable=evaluate_model,
)
deploy_task = PythonOperator(
task_id="deploy_model",
python_callable=deploy_model,
)
# خط الأنابيب: تحميل -> تدريب -> تقييم -> نشر
load_task >> train_task >> evaluate_task >> deploy_task
XCom: تمرير البيانات بين المهام
# ادفع البيانات
def task_a(**context):
result = {"accuracy": 0.95, "model_path": "/tmp/model.pkl"}
context["ti"].xcom_push(key="result", value=result)
# اسحب البيانات
def task_b(**context):
result = context["ti"].xcom_pull(task_ids="task_a", key="result")
print(f"الدقة: {result['accuracy']}")
ملاحظة: XCom للبيانات الصغيرة (< 48KB). للبيانات الكبيرة، استخدم تخزين خارجي (S3، GCS).
الحساسات: انتظر الشروط
from airflow.sensors.s3_key_sensor import S3KeySensor
wait_for_data = S3KeySensor(
task_id="wait_for_data",
bucket_name="my-bucket",
bucket_key="data/daily/{{ ds }}/data.csv",
timeout=3600, # ساعة واحدة
poke_interval=60, # تحقق كل دقيقة
dag=dag,
)
wait_for_data >> load_task
TaskFlow API (الصيغة الحديثة)
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="ml_pipeline_taskflow",
schedule="@daily",
start_date=datetime(2025, 1, 1),
catchup=False,
)
def ml_pipeline():
@task
def load_data():
return {"data_path": "/tmp/data.parquet"}
@task
def train_model(data_info: dict):
return {"model_path": "/tmp/model.pkl"}
@task
def evaluate(model_info: dict):
return {"accuracy": 0.92}
# التبعيات تُستنتج من استدعاءات الدوال
data = load_data()
model = train_model(data)
metrics = evaluate(model)
ml_pipeline()
أفضل الممارسات
| الممارسة | لماذا |
|---|---|
| استخدم TaskFlow API | كود أنظف، تلميحات نوع أفضل |
| أبقِ المهام ذرية | إعادة محاولة وتصحيح أسهل |
| خارج الحوسبة الثقيلة | استخدم K8s، Spark للوظائف الكبيرة |
| استخدم المجمعات | تحكم في استخدام الموارد |
| اختبر DAGs محلياً | airflow dags test dag_id |
Airflow مقابل أدوات أخرى
| الميزة | Airflow | Kubeflow | Prefect |
|---|---|---|---|
| الأفضل لـ | البيانات + ML | ML على K8s | Python الحديث |
| الجدولة | ممتازة | أساسية | جيدة |
| أصلي لـ K8s | لا | نعم | لا |
| منحنى التعلم | متوسط | عالٍ | منخفض |
| الواجهة | جيدة | جيدة | ممتازة |
الأنماط الشائعة
التفرع
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
accuracy = context["ti"].xcom_pull(task_ids="evaluate")["accuracy"]
if accuracy > 0.9:
return "deploy_production"
else:
return "deploy_staging"
branch = BranchPythonOperator(
task_id="choose_deployment",
python_callable=choose_branch,
)
تعيين المهام الديناميكي
@task
def process_partition(partition_id: int):
# معالجة قسم واحد
pass
@dag
def dynamic_pipeline():
partitions = list(range(10))
process_partition.expand(partition_id=partitions)
الرؤية الرئيسية: Airflow يتفوق عندما تحتاج جدولة قوية ومراقبة وتكامل مع بنية البيانات التحتية. لسير عمل ML الخالص، فكر في Kubeflow أو Prefect.
التالي، سنستكشف Prefect كبديل حديث لسير عمل Python الأصلي. :::