تنسيق سير عمل التعلم الآلي
Prefect والبدائل
3 دقيقة للقراءة
Prefect تُقدم نهجاً حديثاً وأصلياً لـ Python لتنسيق سير العمل. لنستكشفها ونُقارن مع البدائل الأخرى.
لماذا Prefect؟
| الميزة | الفائدة |
|---|---|
| أصلية لـ Python | فقط مُزخرفات، لا ملفات تكوين |
| سير عمل ديناميكي | ابنِ DAGs وقت التشغيل |
| واجهة حديثة | لوحة تحكم جميلة |
| تنفيذ هجين | محلي أو سحابي |
| تصحيح سهل | شغّل التدفقات كـ Python عادي |
التثبيت
# ثبّت Prefect
pip install prefect
# تحقق من التثبيت
prefect version
المفاهيم الأساسية
التدفقات والمهام
from prefect import flow, task
@task
def load_data(path: str) -> dict:
"""تحميل البيانات من المسار."""
import pandas as pd
df = pd.read_csv(path)
return {"data": df, "rows": len(df)}
@task
def train_model(data: dict) -> str:
"""تدريب نموذج ML."""
from sklearn.ensemble import RandomForestClassifier
import joblib
df = data["data"]
X = df.drop("target", axis=1)
y = df["target"]
model = RandomForestClassifier()
model.fit(X, y)
model_path = "/tmp/model.pkl"
joblib.dump(model, model_path)
return model_path
@task
def evaluate_model(model_path: str, data: dict) -> float:
"""تقييم دقة النموذج."""
import joblib
from sklearn.metrics import accuracy_score
model = joblib.load(model_path)
df = data["data"]
X = df.drop("target", axis=1)
y = df["target"]
predictions = model.predict(X)
return accuracy_score(y, predictions)
@flow(name="ML Training Pipeline")
def ml_pipeline(data_path: str):
"""تدفق تدريب ML من البداية للنهاية."""
data = load_data(data_path)
model_path = train_model(data)
accuracy = evaluate_model(model_path, data)
print(f"دقة النموذج: {accuracy:.2%}")
return accuracy
# شغّل التدفق
if __name__ == "__main__":
ml_pipeline("data/train.csv")
الميزات المتقدمة
إعادة المحاولة والتخزين المؤقت
from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1)
)
def expensive_computation(data: dict) -> dict:
"""حساب مُخزن مؤقتاً مع إعادة المحاولة."""
# هذه النتيجة ستُخزن مؤقتاً لساعة واحدة
return process_data(data)
التنفيذ المتوازي
from prefect import flow, task
@task
def process_partition(partition_id: int) -> dict:
return {"partition": partition_id, "result": partition_id * 2}
@flow
def parallel_flow():
# أرسل المهام بالتوازي
futures = [process_partition.submit(i) for i in range(10)]
# انتظر جميع النتائج
results = [f.result() for f in futures]
return results
المنطق الشرطي
@flow
def conditional_flow(accuracy: float):
if accuracy > 0.9:
deploy_to_production()
elif accuracy > 0.8:
deploy_to_staging()
else:
notify_team("دقة النموذج منخفضة جداً")
جدولة النشر
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@flow
def daily_training():
# منطق التدريب
pass
# أنشئ النشر
deployment = Deployment.build_from_flow(
flow=daily_training,
name="daily-training",
schedule=CronSchedule(cron="0 2 * * *"), # 2 صباحاً يومياً
work_queue_name="ml-queue"
)
deployment.apply()
التشغيل مع Prefect Cloud/Server
# ابدأ الخادم المحلي
prefect server start
# أنشئ النشر عبر CLI
prefect deployment build ./pipeline.py:ml_pipeline -n "ml-training" -q "ml-queue"
prefect deployment apply ml_pipeline-deployment.yaml
# ابدأ العامل
prefect worker start -q "ml-queue"
المقارنة: المنسقات الحديثة
Prefect مقابل Airflow
| الجانب | Prefect | Airflow |
|---|---|---|
| الصيغة | مُزخرفات Python | Python + تكوين |
| DAGs ديناميكية | أصلية | محدودة |
| اختبار محلي | flow() |
يتطلب إعداد |
| النشر | بسيط | معقد |
| النضج | أحدث | مُختبرة في المعارك |
Prefect مقابل Dagster
| الجانب | Prefect | Dagster |
|---|---|---|
| التركيز | تنسيق سير العمل | تنسيق البيانات |
| الأصول | مخرجات المهام | أصول من الدرجة الأولى |
| الاختبار | pytest قياسي | إطار مدمج |
| الأفضل لـ | سير عمل عام | خطوط أنابيب البيانات |
متى تستخدم ماذا
| الأداة | الأفضل لـ |
|---|---|
| Prefect | فرق Python، التطوير السريع |
| Airflow | فرق كبيرة، هندسة البيانات |
| Kubeflow | ML أصلي لـ Kubernetes |
| Dagster | خطوط أنابيب مُركزة على البيانات |
| DVC | خطوط أنابيب ML بسيطة، التحكم في الإصدار |
بدائل أخرى
Dagster
from dagster import asset, Definitions
@asset
def raw_data():
"""تحميل البيانات الخام."""
return pd.read_csv("data.csv")
@asset
def processed_data(raw_data):
"""معالجة البيانات الخام."""
return raw_data.dropna()
@asset
def trained_model(processed_data):
"""تدريب النموذج على البيانات المُعالجة."""
model = train(processed_data)
return model
defs = Definitions(assets=[raw_data, processed_data, trained_model])
Metaflow (Netflix)
from metaflow import FlowSpec, step
class MLFlow(FlowSpec):
@step
def start(self):
self.data = load_data()
self.next(self.train)
@step
def train(self):
self.model = train_model(self.data)
self.next(self.evaluate)
@step
def evaluate(self):
self.accuracy = evaluate(self.model)
self.next(self.end)
@step
def end(self):
print(f"الدقة النهائية: {self.accuracy}")
if __name__ == "__main__":
MLFlow()
أفضل الممارسات
| الممارسة | لماذا |
|---|---|
| ابدأ بسيطاً | أضف التعقيد حسب الحاجة |
| استخدم تلميحات النوع | دعم IDE وتحقق أفضل |
| سجّل بشكل موسع | صحح مشاكل الإنتاج |
| اختبر محلياً أولاً | اكتشف الأخطاء مبكراً |
| أصدر تدفقاتك | تتبع التغييرات عبر الزمن |
اختيار الأداة الصحيحة
بسيط معقد
│ │
DVC ◄─────────────┼──────────────────────┼────► Kubeflow
│ │
│ Prefect │
│ │ │
│ ▼ │
│ Dagster │
│ │ │
└────────┼─────────────┘
│
Airflow
الرؤية الرئيسية: ابدأ بأبسط أداة تُلبي احتياجاتك. يمكنك دائماً الانتقال لحلول أكثر تعقيداً مع نمو متطلباتك.
الوحدة التالية: سنستكشف مخازن الميزات لاتساق التدريب-التقديم. :::