تنسيق سير عمل التعلم الآلي

Prefect والبدائل

3 دقيقة للقراءة

Prefect تُقدم نهجاً حديثاً وأصلياً لـ Python لتنسيق سير العمل. لنستكشفها ونُقارن مع البدائل الأخرى.

لماذا Prefect؟

الميزة الفائدة
أصلية لـ Python فقط مُزخرفات، لا ملفات تكوين
سير عمل ديناميكي ابنِ DAGs وقت التشغيل
واجهة حديثة لوحة تحكم جميلة
تنفيذ هجين محلي أو سحابي
تصحيح سهل شغّل التدفقات كـ Python عادي

التثبيت

# ثبّت Prefect
pip install prefect

# تحقق من التثبيت
prefect version

المفاهيم الأساسية

التدفقات والمهام

from prefect import flow, task

@task
def load_data(path: str) -> dict:
    """تحميل البيانات من المسار."""
    import pandas as pd
    df = pd.read_csv(path)
    return {"data": df, "rows": len(df)}

@task
def train_model(data: dict) -> str:
    """تدريب نموذج ML."""
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    model = RandomForestClassifier()
    model.fit(X, y)

    model_path = "/tmp/model.pkl"
    joblib.dump(model, model_path)
    return model_path

@task
def evaluate_model(model_path: str, data: dict) -> float:
    """تقييم دقة النموذج."""
    import joblib
    from sklearn.metrics import accuracy_score

    model = joblib.load(model_path)
    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    predictions = model.predict(X)
    return accuracy_score(y, predictions)

@flow(name="ML Training Pipeline")
def ml_pipeline(data_path: str):
    """تدفق تدريب ML من البداية للنهاية."""
    data = load_data(data_path)
    model_path = train_model(data)
    accuracy = evaluate_model(model_path, data)

    print(f"دقة النموذج: {accuracy:.2%}")
    return accuracy

# شغّل التدفق
if __name__ == "__main__":
    ml_pipeline("data/train.csv")

الميزات المتقدمة

إعادة المحاولة والتخزين المؤقت

from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def expensive_computation(data: dict) -> dict:
    """حساب مُخزن مؤقتاً مع إعادة المحاولة."""
    # هذه النتيجة ستُخزن مؤقتاً لساعة واحدة
    return process_data(data)

التنفيذ المتوازي

from prefect import flow, task

@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "result": partition_id * 2}

@flow
def parallel_flow():
    # أرسل المهام بالتوازي
    futures = [process_partition.submit(i) for i in range(10)]

    # انتظر جميع النتائج
    results = [f.result() for f in futures]
    return results

المنطق الشرطي

@flow
def conditional_flow(accuracy: float):
    if accuracy > 0.9:
        deploy_to_production()
    elif accuracy > 0.8:
        deploy_to_staging()
    else:
        notify_team("دقة النموذج منخفضة جداً")

جدولة النشر

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_training():
    # منطق التدريب
    pass

# أنشئ النشر
deployment = Deployment.build_from_flow(
    flow=daily_training,
    name="daily-training",
    schedule=CronSchedule(cron="0 2 * * *"),  # 2 صباحاً يومياً
    work_queue_name="ml-queue"
)

deployment.apply()

التشغيل مع Prefect Cloud/Server

# ابدأ الخادم المحلي
prefect server start

# أنشئ النشر عبر CLI
prefect deployment build ./pipeline.py:ml_pipeline -n "ml-training" -q "ml-queue"
prefect deployment apply ml_pipeline-deployment.yaml

# ابدأ العامل
prefect worker start -q "ml-queue"

المقارنة: المنسقات الحديثة

Prefect مقابل Airflow

الجانب Prefect Airflow
الصيغة مُزخرفات Python Python + تكوين
DAGs ديناميكية أصلية محدودة
اختبار محلي flow() يتطلب إعداد
النشر بسيط معقد
النضج أحدث مُختبرة في المعارك

Prefect مقابل Dagster

الجانب Prefect Dagster
التركيز تنسيق سير العمل تنسيق البيانات
الأصول مخرجات المهام أصول من الدرجة الأولى
الاختبار pytest قياسي إطار مدمج
الأفضل لـ سير عمل عام خطوط أنابيب البيانات

متى تستخدم ماذا

الأداة الأفضل لـ
Prefect فرق Python، التطوير السريع
Airflow فرق كبيرة، هندسة البيانات
Kubeflow ML أصلي لـ Kubernetes
Dagster خطوط أنابيب مُركزة على البيانات
DVC خطوط أنابيب ML بسيطة، التحكم في الإصدار

بدائل أخرى

Dagster

from dagster import asset, Definitions

@asset
def raw_data():
    """تحميل البيانات الخام."""
    return pd.read_csv("data.csv")

@asset
def processed_data(raw_data):
    """معالجة البيانات الخام."""
    return raw_data.dropna()

@asset
def trained_model(processed_data):
    """تدريب النموذج على البيانات المُعالجة."""
    model = train(processed_data)
    return model

defs = Definitions(assets=[raw_data, processed_data, trained_model])

Metaflow (Netflix)

from metaflow import FlowSpec, step

class MLFlow(FlowSpec):
    @step
    def start(self):
        self.data = load_data()
        self.next(self.train)

    @step
    def train(self):
        self.model = train_model(self.data)
        self.next(self.evaluate)

    @step
    def evaluate(self):
        self.accuracy = evaluate(self.model)
        self.next(self.end)

    @step
    def end(self):
        print(f"الدقة النهائية: {self.accuracy}")

if __name__ == "__main__":
    MLFlow()

أفضل الممارسات

الممارسة لماذا
ابدأ بسيطاً أضف التعقيد حسب الحاجة
استخدم تلميحات النوع دعم IDE وتحقق أفضل
سجّل بشكل موسع صحح مشاكل الإنتاج
اختبر محلياً أولاً اكتشف الأخطاء مبكراً
أصدر تدفقاتك تتبع التغييرات عبر الزمن

اختيار الأداة الصحيحة

                    بسيط                  معقد
                      │                      │
    DVC ◄─────────────┼──────────────────────┼────► Kubeflow
                      │                      │
                      │     Prefect          │
                      │        │             │
                      │        ▼             │
                      │    Dagster           │
                      │        │             │
                      └────────┼─────────────┘
                            Airflow

الرؤية الرئيسية: ابدأ بأبسط أداة تُلبي احتياجاتك. يمكنك دائماً الانتقال لحلول أكثر تعقيداً مع نمو متطلباتك.

الوحدة التالية: سنستكشف مخازن الميزات لاتساق التدريب-التقديم. :::

اختبار

الوحدة 3: تنسيق سير عمل التعلم الآلي

خذ الاختبار