تنسيق سير عمل التعلم الآلي

Prefect والبدائل

3 دقيقة للقراءة

Prefect تُقدم نهجاً حديثاً وأصلياً لـ Python لتنسيق سير العمل. لنستكشفها ونُقارن مع البدائل الأخرى.

لماذا Prefect؟

الميزةالفائدة
أصلية لـ Pythonفقط مُزخرفات، لا ملفات تكوين
سير عمل ديناميكيابنِ DAGs وقت التشغيل
واجهة حديثةلوحة تحكم جميلة
تنفيذ هجينمحلي أو سحابي
تصحيح سهلشغّل التدفقات كـ Python عادي

التثبيت

# ثبّت Prefect
pip install prefect

# تحقق من التثبيت
prefect version

المفاهيم الأساسية

التدفقات والمهام

from prefect import flow, task

@task
def load_data(path: str) -> dict:
    """تحميل البيانات من المسار."""
    import pandas as pd
    df = pd.read_csv(path)
    return {"data": df, "rows": len(df)}

@task
def train_model(data: dict) -> str:
    """تدريب نموذج ML."""
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    model = RandomForestClassifier()
    model.fit(X, y)

    model_path = "/tmp/model.pkl"
    joblib.dump(model, model_path)
    return model_path

@task
def evaluate_model(model_path: str, data: dict) -> float:
    """تقييم دقة النموذج."""
    import joblib
    from sklearn.metrics import accuracy_score

    model = joblib.load(model_path)
    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    predictions = model.predict(X)
    return accuracy_score(y, predictions)

@flow(name="ML Training Pipeline")
def ml_pipeline(data_path: str):
    """تدفق تدريب ML من البداية للنهاية."""
    data = load_data(data_path)
    model_path = train_model(data)
    accuracy = evaluate_model(model_path, data)

    print(f"دقة النموذج: {accuracy:.2%}")
    return accuracy

# شغّل التدفق
if __name__ == "__main__":
    ml_pipeline("data/train.csv")

الميزات المتقدمة

إعادة المحاولة والتخزين المؤقت

from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def expensive_computation(data: dict) -> dict:
    """حساب مُخزن مؤقتاً مع إعادة المحاولة."""
    # هذه النتيجة ستُخزن مؤقتاً لساعة واحدة
    return process_data(data)

التنفيذ المتوازي

from prefect import flow, task

@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "result": partition_id * 2}

@flow
def parallel_flow():
    # أرسل المهام بالتوازي
    futures = [process_partition.submit(i) for i in range(10)]

    # انتظر جميع النتائج
    results = [f.result() for f in futures]
    return results

المنطق الشرطي

@flow
def conditional_flow(accuracy: float):
    if accuracy > 0.9:
        deploy_to_production()
    elif accuracy > 0.8:
        deploy_to_staging()
    else:
        notify_team("دقة النموذج منخفضة جداً")

جدولة النشر

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_training():
    # منطق التدريب
    pass

# أنشئ النشر
deployment = Deployment.build_from_flow(
    flow=daily_training,
    name="daily-training",
    schedule=CronSchedule(cron="0 2 * * *"),  # 2 صباحاً يومياً
    work_queue_name="ml-queue"
)

deployment.apply()

التشغيل مع Prefect Cloud/Server

# ابدأ الخادم المحلي
prefect server start

# أنشئ النشر عبر CLI
prefect deployment build ./pipeline.py:ml_pipeline -n "ml-training" -q "ml-queue"
prefect deployment apply ml_pipeline-deployment.yaml

# ابدأ العامل
prefect worker start -q "ml-queue"

المقارنة: المنسقات الحديثة

Prefect مقابل Airflow

الجانبPrefectAirflow
الصيغةمُزخرفات PythonPython + تكوين
DAGs ديناميكيةأصليةمحدودة
اختبار محليflow()يتطلب إعداد
النشربسيطمعقد
النضجأحدثمُختبرة في المعارك

Prefect مقابل Dagster

الجانبPrefectDagster
التركيزتنسيق سير العملتنسيق البيانات
الأصولمخرجات المهامأصول من الدرجة الأولى
الاختبارpytest قياسيإطار مدمج
الأفضل لـسير عمل عامخطوط أنابيب البيانات

متى تستخدم ماذا

الأداةالأفضل لـ
Prefectفرق Python، التطوير السريع
Airflowفرق كبيرة، هندسة البيانات
KubeflowML أصلي لـ Kubernetes
Dagsterخطوط أنابيب مُركزة على البيانات
DVCخطوط أنابيب ML بسيطة، التحكم في الإصدار

بدائل أخرى

Dagster

from dagster import asset, Definitions

@asset
def raw_data():
    """تحميل البيانات الخام."""
    return pd.read_csv("data.csv")

@asset
def processed_data(raw_data):
    """معالجة البيانات الخام."""
    return raw_data.dropna()

@asset
def trained_model(processed_data):
    """تدريب النموذج على البيانات المُعالجة."""
    model = train(processed_data)
    return model

defs = Definitions(assets=[raw_data, processed_data, trained_model])

Metaflow (Netflix)

from metaflow import FlowSpec, step

class MLFlow(FlowSpec):
    @step
    def start(self):
        self.data = load_data()
        self.next(self.train)

    @step
    def train(self):
        self.model = train_model(self.data)
        self.next(self.evaluate)

    @step
    def evaluate(self):
        self.accuracy = evaluate(self.model)
        self.next(self.end)

    @step
    def end(self):
        print(f"الدقة النهائية: {self.accuracy}")

if __name__ == "__main__":
    MLFlow()

أفضل الممارسات

الممارسةلماذا
ابدأ بسيطاًأضف التعقيد حسب الحاجة
استخدم تلميحات النوعدعم IDE وتحقق أفضل
سجّل بشكل موسعصحح مشاكل الإنتاج
اختبر محلياً أولاًاكتشف الأخطاء مبكراً
أصدر تدفقاتكتتبع التغييرات عبر الزمن

اختيار الأداة الصحيحة

                    بسيط                  معقد
                      │                      │
    DVC ◄─────────────┼──────────────────────┼────► Kubeflow
                      │                      │
                      │     Prefect          │
                      │        │             │
                      │        ▼             │
                      │    Dagster           │
                      │        │             │
                      └────────┼─────────────┘
                            Airflow

الرؤية الرئيسية: ابدأ بأبسط أداة تُلبي احتياجاتك. يمكنك دائماً الانتقال لحلول أكثر تعقيداً مع نمو متطلباتك.

الوحدة التالية: سنستكشف مخازن الميزات لاتساق التدريب-التقديم. :::

مراجعة سريعة: كيف تجد هذا الدرس؟

اختبار

الوحدة 3: تنسيق سير عمل التعلم الآلي

خذ الاختبار
نشرة أسبوعية مجانية

ابقَ على مسار النيرد

بريد واحد أسبوعياً — دورات، مقالات معمّقة، أدوات، وتجارب ذكاء اصطناعي.

بدون إزعاج. إلغاء الاشتراك في أي وقت.