خطوط أنابيب ML والتنسيق
أنماط تصميم خطوط الأنابيب
4 دقيقة للقراءة
أسئلة تصميم خطوط الأنابيب تختبر قدرتك على هندسة أنظمة ML. اعرف هذه الأنماط جيداً.
النمط 1: المعالجة التزايدية
سؤال المقابلة: "كيف تعالج 10TB من البيانات يومياً دون إعادة معالجة كل شيء؟"
@dag(schedule="@hourly")
def incremental_feature_pipeline():
@task
def get_watermark():
# الحصول على آخر طابع زمني معالج من مخزن الحالة
return read_from_redis("pipeline:last_processed")
@task
def get_new_records(watermark):
# جلب السجلات بعد العلامة المائية فقط
return query_warehouse(f"""
SELECT * FROM events
WHERE event_time > '{watermark}'
AND event_time <= CURRENT_TIMESTAMP
""")
@task
def process_features(records):
# معالجة السجلات الجديدة فقط
return compute_features(records)
@task
def update_watermark(records):
# تقدم العلامة المائية إلى آخر طابع زمني معالج
max_timestamp = records["event_time"].max()
write_to_redis("pipeline:last_processed", max_timestamp)
watermark = get_watermark()
records = get_new_records(watermark)
features = process_features(records)
update_watermark(features)
النقاط الرئيسية:
- نمط العلامة المائية للمعالجة مرة واحدة بالضبط
- تخزين الحالة خارجياً (Redis، قاعدة بيانات) - ليس في Airflow
- التعامل مع البيانات المتأخرة مع فترات سماح
النمط 2: التفرع/التجميع
سؤال المقابلة: "كيف ستدرب 100 نموذج بالتوازي؟"
@dag(schedule="@weekly")
def multi_model_training():
@task
def get_model_configs() -> list[dict]:
# إرجاع قائمة تكوينات النماذج
return [
{"model_id": "us_west", "region": "us-west-1"},
{"model_id": "us_east", "region": "us-east-1"},
{"model_id": "eu", "region": "eu-west-1"},
# ... 100 نموذج
]
@task
def train_single_model(config: dict):
# تدريب نموذج فردي
return train_and_evaluate(config)
@task
def aggregate_results(results: list[dict]):
# دمج جميع نتائج النماذج
best_models = select_best_per_region(results)
update_model_registry(best_models)
configs = get_model_configs()
# التفرع: تعيين المهام الديناميكي
results = train_single_model.expand(config=configs)
# التجميع: تجميع جميع النتائج
aggregate_results(results)
ميزة Airflow 2.x: task.expand() ينشئ مهام متوازية ديناميكية
النمط 3: التفرع الشرطي
سؤال المقابلة: "كيف تتعامل مع سير عمل الموافقة على النموذج؟"
from airflow.operators.python import BranchPythonOperator
@dag(schedule=None)
def model_deployment_pipeline():
@task
def evaluate_model():
metrics = run_evaluation()
return metrics
@task.branch
def check_quality(metrics: dict):
if metrics["accuracy"] >= 0.95:
return "auto_deploy"
elif metrics["accuracy"] >= 0.90:
return "request_human_review"
else:
return "reject_model"
@task
def auto_deploy():
deploy_to_production()
@task
def request_human_review():
# توقف للموافقة
create_jira_ticket()
wait_for_approval()
@task
def reject_model():
notify_team("النموذج فشل في بوابات الجودة")
metrics = evaluate_model()
branch = check_quality(metrics)
branch >> [auto_deploy(), request_human_review(), reject_model()]
النمط 4: إعادة المحاولة مع التراجع الأسي
سؤال المقابلة: "كيف تتعامل مع الفشل العابر في خطوط أنابيب ML؟"
from airflow.decorators import task
from tenacity import retry, stop_after_attempt, wait_exponential
@task(
retries=5,
retry_delay=60, # التأخير الأساسي
retry_exponential_backoff=True,
max_retry_delay=3600 # تأخير أقصى ساعة واحدة
)
def fetch_from_external_api():
"""مهمة مع إعادة محاولة على مستوى Airflow"""
return call_unstable_api()
# البديل: إعادة المحاولة على مستوى الكود لتحكم أدق
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=300)
)
def call_unstable_api():
response = requests.get("https://api.example.com/data")
response.raise_for_status()
return response.json()
النمط 5: قاطع الدائرة
from pybreaker import CircuitBreaker
# الانقطاع بعد 5 فشل متتالي
model_serving_breaker = CircuitBreaker(
fail_max=5,
reset_timeout=60 # حاول مرة أخرى بعد 60 ثانية
)
@task
def call_model_service(request):
@model_serving_breaker
def _call():
return requests.post(MODEL_URL, json=request)
try:
return _call()
except pybreaker.CircuitBreakerError:
# السلوك الاحتياطي عندما تكون الدائرة مفتوحة
return get_cached_prediction(request)
جدول ملخص المقابلة
| النمط | حالة الاستخدام | الفائدة الرئيسية |
|---|---|---|
| التزايدي | المعالجة الدفعية اليومية | أسرع 10x، كفاءة التكلفة |
| التفرع/التجميع | تدريب نماذج متعددة | التوازي |
| التفرع | سير عمل الموافقة | المنطق الشرطي |
| إعادة المحاولة | التبعيات الخارجية | تحمل الأخطاء |
| قاطع الدائرة | الخدمات المتدهورة | التدهور السلس |
نصيحة احترافية: ارسم هذه الأنماط على السبورة أثناء المقابلات. التفسيرات المرئية تحصل على درجات أعلى من الأوصاف الشفهية وحدها.
الوحدة التالية تغطي أسئلة مقابلات المراقبة والملاحظة. :::