خطوط أنابيب ML والتنسيق
أساسيات تنسيق خطوط الأنابيب
4 دقيقة للقراءة
خطوط أنابيب ML هي العمود الفقري لتعلم الآلة القابل للتكرار. يختبر المحاورون فهمك لأنماط التنسيق، وليس فقط صياغة الأدوات.
ما يختبره المحاورون حقاً
| المجال | سؤال المبتدئ | سؤال الخبير |
|---|---|---|
| تصميم DAG | "ما هو DAG؟" | "كيف تتعامل مع DAGs الديناميكية؟" |
| التبعيات | "كيف تعتمد المهام على بعضها؟" | "كيف تتعامل مع التبعيات عبر DAGs؟" |
| الفشل | "ماذا يحدث عند فشل مهمة؟" | "صمم استراتيجية إعادة المحاولة مع التراجع" |
| التوسع | "هل يمكن لـ Airflow التشغيل بالتوازي؟" | "كيف توسع إلى 10,000 DAG؟" |
أنماط التنسيق الأساسية
النمط 1: استخراج-تحويل-تحميل (ETL)
# نمط ETL الكلاسيكي لإعداد بيانات ML
@dag(schedule="@daily", catchup=False)
def ml_data_pipeline():
extract = extract_from_sources()
transform = clean_and_feature_engineer(extract)
validate = validate_data_quality(transform)
load = load_to_feature_store(validate)
# سلسلة تبعيات صريحة
extract >> transform >> validate >> load
النمط 2: خط أنابيب التدريب
@dag(schedule=None) # يُشغل بواسطة خط أنابيب البيانات أو يدوياً
def training_pipeline():
data = fetch_training_data()
split = create_train_test_split(data)
train = train_model(split)
evaluate = evaluate_metrics(train)
register = register_if_better(evaluate)
# التفرع بناءً على التقييم
data >> split >> train >> evaluate >> register
النمط 3: خط أنابيب الاستدلال
@dag(schedule="*/5 * * * *") # كل 5 دقائق
def batch_inference():
fetch = fetch_new_records()
predict = run_batch_predictions(fetch)
store = write_predictions_to_db(predict)
notify = send_completion_notification(store)
سؤال المقابلة: تصميم محفز إعادة التدريب
السؤال: "كيف ستعيد تدريب نموذج تلقائياً عند تدهور الأداء؟"
إجابة قوية:
from airflow.sensors.external_task import ExternalTaskSensor
@dag(schedule=None)
def retraining_pipeline():
# المستشعر: انتظار DAG اكتشاف الانحراف للإشارة
wait_for_drift = ExternalTaskSensor(
task_id="wait_for_drift_alert",
external_dag_id="model_monitoring",
external_task_id="check_drift",
allowed_states=["success"],
poke_interval=300 # التحقق كل 5 دقائق
)
# يعمل فقط إذا تم اكتشاف الانحراف
fetch_data = fetch_recent_training_data()
retrain = trigger_training_pipeline()
validate = validate_new_model()
deploy = deploy_if_improved()
wait_for_drift >> fetch_data >> retrain >> validate >> deploy
المفاهيم الرئيسية للمعرفة
orchestration_vocabulary:
dag: "الرسم البياني الموجه غير الدوري - لا تبعيات دائرية"
task: "وحدة عمل واحدة في خط الأنابيب"
operator: "قالب لنوع من المهام (Python، Bash، K8s)"
sensor: "مهمة تنتظر شرطاً خارجياً"
trigger_rule: "متى تعمل هذه المهمة؟ (all_success، one_failed، إلخ.)"
xcom: "الاتصال المتبادل بين المهام"
pool: "تحديد المهام المتزامنة (مثلاً، 5 مهام GPU كحد أقصى)"
variable: "تكوين وقت التشغيل"
connection: "بيانات اعتماد النظام الخارجي"
إشارة المقابلة: ذكر المستشعرات وقواعد التشغيل يُظهر أنك تفهم تعقيد الإنتاج بعيداً عن الدروس الأساسية.
في الدرس التالي، سنقارن Airflow وKubeflow وPrefect. :::