خطوط أنابيب ML والتنسيق

أساسيات تنسيق خطوط الأنابيب

4 دقيقة للقراءة

خطوط أنابيب ML هي العمود الفقري لتعلم الآلة القابل للتكرار. يختبر المحاورون فهمك لأنماط التنسيق، وليس فقط صياغة الأدوات.

ما يختبره المحاورون حقاً

المجال سؤال المبتدئ سؤال الخبير
تصميم DAG "ما هو DAG؟" "كيف تتعامل مع DAGs الديناميكية؟"
التبعيات "كيف تعتمد المهام على بعضها؟" "كيف تتعامل مع التبعيات عبر DAGs؟"
الفشل "ماذا يحدث عند فشل مهمة؟" "صمم استراتيجية إعادة المحاولة مع التراجع"
التوسع "هل يمكن لـ Airflow التشغيل بالتوازي؟" "كيف توسع إلى 10,000 DAG؟"

أنماط التنسيق الأساسية

النمط 1: استخراج-تحويل-تحميل (ETL)

# نمط ETL الكلاسيكي لإعداد بيانات ML
@dag(schedule="@daily", catchup=False)
def ml_data_pipeline():
    extract = extract_from_sources()
    transform = clean_and_feature_engineer(extract)
    validate = validate_data_quality(transform)
    load = load_to_feature_store(validate)

    # سلسلة تبعيات صريحة
    extract >> transform >> validate >> load

النمط 2: خط أنابيب التدريب

@dag(schedule=None)  # يُشغل بواسطة خط أنابيب البيانات أو يدوياً
def training_pipeline():
    data = fetch_training_data()
    split = create_train_test_split(data)
    train = train_model(split)
    evaluate = evaluate_metrics(train)
    register = register_if_better(evaluate)

    # التفرع بناءً على التقييم
    data >> split >> train >> evaluate >> register

النمط 3: خط أنابيب الاستدلال

@dag(schedule="*/5 * * * *")  # كل 5 دقائق
def batch_inference():
    fetch = fetch_new_records()
    predict = run_batch_predictions(fetch)
    store = write_predictions_to_db(predict)
    notify = send_completion_notification(store)

سؤال المقابلة: تصميم محفز إعادة التدريب

السؤال: "كيف ستعيد تدريب نموذج تلقائياً عند تدهور الأداء؟"

إجابة قوية:

from airflow.sensors.external_task import ExternalTaskSensor

@dag(schedule=None)
def retraining_pipeline():
    # المستشعر: انتظار DAG اكتشاف الانحراف للإشارة
    wait_for_drift = ExternalTaskSensor(
        task_id="wait_for_drift_alert",
        external_dag_id="model_monitoring",
        external_task_id="check_drift",
        allowed_states=["success"],
        poke_interval=300  # التحقق كل 5 دقائق
    )

    # يعمل فقط إذا تم اكتشاف الانحراف
    fetch_data = fetch_recent_training_data()
    retrain = trigger_training_pipeline()
    validate = validate_new_model()
    deploy = deploy_if_improved()

    wait_for_drift >> fetch_data >> retrain >> validate >> deploy

المفاهيم الرئيسية للمعرفة

orchestration_vocabulary:
  dag: "الرسم البياني الموجه غير الدوري - لا تبعيات دائرية"
  task: "وحدة عمل واحدة في خط الأنابيب"
  operator: "قالب لنوع من المهام (Python، Bash، K8s)"
  sensor: "مهمة تنتظر شرطاً خارجياً"
  trigger_rule: "متى تعمل هذه المهمة؟ (all_success، one_failed، إلخ.)"
  xcom: "الاتصال المتبادل بين المهام"
  pool: "تحديد المهام المتزامنة (مثلاً، 5 مهام GPU كحد أقصى)"
  variable: "تكوين وقت التشغيل"
  connection: "بيانات اعتماد النظام الخارجي"

إشارة المقابلة: ذكر المستشعرات وقواعد التشغيل يُظهر أنك تفهم تعقيد الإنتاج بعيداً عن الدروس الأساسية.

في الدرس التالي، سنقارن Airflow وKubeflow وPrefect. :::

اختبار

الوحدة 3: خطوط أنابيب ML والتنسيق

خذ الاختبار