خطوط أنابيب ML والتنسيق

أنماط تصميم خطوط الأنابيب

4 دقيقة للقراءة

أسئلة تصميم خطوط الأنابيب تختبر قدرتك على هندسة أنظمة ML. اعرف هذه الأنماط جيداً.

النمط 1: المعالجة التزايدية

سؤال المقابلة: "كيف تعالج 10TB من البيانات يومياً دون إعادة معالجة كل شيء؟"

@dag(schedule="@hourly")
def incremental_feature_pipeline():
    @task
    def get_watermark():
        # الحصول على آخر طابع زمني معالج من مخزن الحالة
        return read_from_redis("pipeline:last_processed")

    @task
    def get_new_records(watermark):
        # جلب السجلات بعد العلامة المائية فقط
        return query_warehouse(f"""
            SELECT * FROM events
            WHERE event_time > '{watermark}'
            AND event_time <= CURRENT_TIMESTAMP
        """)

    @task
    def process_features(records):
        # معالجة السجلات الجديدة فقط
        return compute_features(records)

    @task
    def update_watermark(records):
        # تقدم العلامة المائية إلى آخر طابع زمني معالج
        max_timestamp = records["event_time"].max()
        write_to_redis("pipeline:last_processed", max_timestamp)

    watermark = get_watermark()
    records = get_new_records(watermark)
    features = process_features(records)
    update_watermark(features)

النقاط الرئيسية:

  • نمط العلامة المائية للمعالجة مرة واحدة بالضبط
  • تخزين الحالة خارجياً (Redis، قاعدة بيانات) - ليس في Airflow
  • التعامل مع البيانات المتأخرة مع فترات سماح

النمط 2: التفرع/التجميع

سؤال المقابلة: "كيف ستدرب 100 نموذج بالتوازي؟"

@dag(schedule="@weekly")
def multi_model_training():
    @task
    def get_model_configs() -> list[dict]:
        # إرجاع قائمة تكوينات النماذج
        return [
            {"model_id": "us_west", "region": "us-west-1"},
            {"model_id": "us_east", "region": "us-east-1"},
            {"model_id": "eu", "region": "eu-west-1"},
            # ... 100 نموذج
        ]

    @task
    def train_single_model(config: dict):
        # تدريب نموذج فردي
        return train_and_evaluate(config)

    @task
    def aggregate_results(results: list[dict]):
        # دمج جميع نتائج النماذج
        best_models = select_best_per_region(results)
        update_model_registry(best_models)

    configs = get_model_configs()
    # التفرع: تعيين المهام الديناميكي
    results = train_single_model.expand(config=configs)
    # التجميع: تجميع جميع النتائج
    aggregate_results(results)

ميزة Airflow 2.x: task.expand() ينشئ مهام متوازية ديناميكية

النمط 3: التفرع الشرطي

سؤال المقابلة: "كيف تتعامل مع سير عمل الموافقة على النموذج؟"

from airflow.operators.python import BranchPythonOperator

@dag(schedule=None)
def model_deployment_pipeline():
    @task
    def evaluate_model():
        metrics = run_evaluation()
        return metrics

    @task.branch
    def check_quality(metrics: dict):
        if metrics["accuracy"] >= 0.95:
            return "auto_deploy"
        elif metrics["accuracy"] >= 0.90:
            return "request_human_review"
        else:
            return "reject_model"

    @task
    def auto_deploy():
        deploy_to_production()

    @task
    def request_human_review():
        # توقف للموافقة
        create_jira_ticket()
        wait_for_approval()

    @task
    def reject_model():
        notify_team("النموذج فشل في بوابات الجودة")

    metrics = evaluate_model()
    branch = check_quality(metrics)
    branch >> [auto_deploy(), request_human_review(), reject_model()]

النمط 4: إعادة المحاولة مع التراجع الأسي

سؤال المقابلة: "كيف تتعامل مع الفشل العابر في خطوط أنابيب ML؟"

from airflow.decorators import task
from tenacity import retry, stop_after_attempt, wait_exponential

@task(
    retries=5,
    retry_delay=60,  # التأخير الأساسي
    retry_exponential_backoff=True,
    max_retry_delay=3600  # تأخير أقصى ساعة واحدة
)
def fetch_from_external_api():
    """مهمة مع إعادة محاولة على مستوى Airflow"""
    return call_unstable_api()

# البديل: إعادة المحاولة على مستوى الكود لتحكم أدق
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=300)
)
def call_unstable_api():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

النمط 5: قاطع الدائرة

from pybreaker import CircuitBreaker

# الانقطاع بعد 5 فشل متتالي
model_serving_breaker = CircuitBreaker(
    fail_max=5,
    reset_timeout=60  # حاول مرة أخرى بعد 60 ثانية
)

@task
def call_model_service(request):
    @model_serving_breaker
    def _call():
        return requests.post(MODEL_URL, json=request)

    try:
        return _call()
    except pybreaker.CircuitBreakerError:
        # السلوك الاحتياطي عندما تكون الدائرة مفتوحة
        return get_cached_prediction(request)

جدول ملخص المقابلة

النمط حالة الاستخدام الفائدة الرئيسية
التزايدي المعالجة الدفعية اليومية أسرع 10x، كفاءة التكلفة
التفرع/التجميع تدريب نماذج متعددة التوازي
التفرع سير عمل الموافقة المنطق الشرطي
إعادة المحاولة التبعيات الخارجية تحمل الأخطاء
قاطع الدائرة الخدمات المتدهورة التدهور السلس

نصيحة احترافية: ارسم هذه الأنماط على السبورة أثناء المقابلات. التفسيرات المرئية تحصل على درجات أعلى من الأوصاف الشفهية وحدها.

الوحدة التالية تغطي أسئلة مقابلات المراقبة والملاحظة. :::

اختبار

الوحدة 3: خطوط أنابيب ML والتنسيق

خذ الاختبار