أنابيب ETL والتنسيق
أدوات تنسيق الأنابيب
التنسيق هو العمود الفقري لهندسة البيانات. توقع أسئلة مقابلة حول Airflow و Prefect و Dagster ومتى تستخدم كل منها.
Apache Airflow
المعيار الصناعي لتنسيق سير العمل.
المفاهيم الأساسية
| المفهوم | الوصف |
|---|---|
| DAG | رسم بياني موجه غير دوري - تعريف سير العمل |
| Task | وحدة عمل واحدة |
| Operator | قالب للمهام (Python, SQL, Bash, إلخ) |
| Sensor | انتظار شرط خارجي |
| XCom | التواصل المتبادل بين المهام |
| Connection | بيانات اعتماد النظام الخارجي |
مثال DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_sales_pipeline',
default_args=default_args,
description='Daily sales data processing',
schedule_interval='0 6 * * *', # 6 صباحاً يومياً
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['sales', 'daily'],
) as dag:
extract = PythonOperator(
task_id='extract_sales_data',
python_callable=extract_sales_data,
)
transform = SnowflakeOperator(
task_id='transform_sales',
sql='sql/transform_sales.sql',
snowflake_conn_id='snowflake_default',
)
dbt_run = BashOperator(
task_id='dbt_run',
bash_command='cd /dbt && dbt run --models staging.stg_sales+',
)
# تعريف التبعيات
extract >> transform >> dbt_run
سؤال المقابلة: "كيف تتعامل مع فشل المهام في Airflow؟"
الجواب:
# 1. إعادة المحاولة مع تراجع
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
}
# 2. استدعاءات عند الفشل
def alert_on_failure(context):
task_id = context['task_instance'].task_id
send_slack_alert(f"Task {task_id} failed!")
task = PythonOperator(
task_id='critical_task',
on_failure_callback=alert_on_failure,
...
)
# 3. قواعد التشغيل للمهام المصب
downstream = PythonOperator(
task_id='cleanup',
trigger_rule='all_done', # شغل بغض النظر عن حالة المصعد
)
الأنماط الرئيسية
DAGs الديناميكية:
# توليد المهام ديناميكياً
for table in ['orders', 'customers', 'products']:
task = PythonOperator(
task_id=f'process_{table}',
python_callable=process_table,
op_kwargs={'table': table},
)
مجموعات المهام (Airflow 2.0+):
from airflow.utils.task_group import TaskGroup
with TaskGroup('transform_group') as transform_group:
task1 = PythonOperator(task_id='transform_1', ...)
task2 = PythonOperator(task_id='transform_2', ...)
Prefect
تنسيق حديث، أصلي Python مع تجربة مطور أفضل.
الاختلافات الرئيسية عن Airflow
| الجانب | Airflow | Prefect |
|---|---|---|
| تعريف DAG | تحليل ملفات DAG | مزخرفات Python |
| الجدولة | عملية المجدول | السحابة أو الخادم المحلي |
| إدارة الحالة | بيانات وصفية Postgres | Postgres أو Prefect Cloud |
| الاختبار المحلي | أصعب | دعم أصلي |
| سير العمل الديناميكي | معقد | دعم من الدرجة الأولى |
مثال Flow
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str) -> dict:
"""استخراج البيانات من نظام المصدر."""
return fetch_data(source)
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform_data(raw_data: dict) -> dict:
"""تحويل البيانات المستخرجة."""
return clean_and_transform(raw_data)
@task
def load_data(transformed_data: dict, target: str):
"""تحميل البيانات للهدف."""
write_to_warehouse(transformed_data, target)
@flow(name="daily-sales-etl")
def daily_sales_pipeline(source: str = "salesforce"):
raw = extract_data(source)
transformed = transform_data(raw)
load_data(transformed, "sales_mart")
# شغل محلياً للاختبار
if __name__ == "__main__":
daily_sales_pipeline()
الميزات المتقدمة
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
# Subflows للتعديل
@flow
def parent_flow():
result1 = child_flow_1()
result2 = child_flow_2(result1)
return result2
# التنفيذ المتزامن
@flow
def parallel_processing():
futures = [process_item.submit(item) for item in items]
results = [f.result() for f in futures]
return results
# النشر مع الجدولة
deployment = Deployment.build_from_flow(
flow=daily_sales_pipeline,
name="production-deployment",
schedule=CronSchedule(cron="0 6 * * *"),
work_pool_name="default-agent-pool",
)
Dagster
نهج الأصول المحددة بالبرمجيات—يركز على أصول البيانات بدلاً من المهام.
المفاهيم الأساسية
| المفهوم | الوصف |
|---|---|
| Asset | قطعة أثرية للبيانات (جدول، ملف، نموذج ML) |
| Op | وحدة حساب (مثل مهمة Airflow) |
| Job | مجموعة من ops للتنفيذ |
| Resource | تكوين النظام الخارجي |
| IO Manager | كيفية تخزين/تحميل الأصول |
أنبوب قائم على الأصول
from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd
@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
"""استخراج الطلبات من المصدر."""
context.log.info("Extracting orders")
return pd.read_sql("SELECT * FROM orders", source_conn)
@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
"""تنظيف وإزالة تكرار الطلبات."""
return (
raw_orders
.drop_duplicates(subset=['order_id'])
.dropna(subset=['customer_id'])
)
@asset
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
"""تجميع الإيرادات اليومية."""
return (
cleaned_orders
.groupby('order_date')
.agg({'total': 'sum'})
.reset_index()
)
defs = Definitions(
assets=[raw_orders, cleaned_orders, daily_revenue],
)
لماذا Dagster لهندسة البيانات
نسب البيانات: رسم بياني للتبعيات تلقائي التحقق: تتبع متى تم حساب الأصول آخر مرة الأقسام: دعم أصلي للأصول المقسمة بالوقت الاختبار: أدوات اختبار مدمجة
# أصول مقسمة
from dagster import asset, DailyPartitionsDefinition
@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
partition_date = context.partition_key
return extract_events_for_date(partition_date)
مقارنة الأدوات
| العامل | Airflow | Prefect | Dagster |
|---|---|---|---|
| النموذج | قائم على المهام | قائم على المهام | قائم على الأصول |
| منحنى التعلم | حاد | متوسط | متوسط |
| المجتمع | الأكبر | متنامي | متنامي |
| العرض السحابي | Astronomer, MWAA | Prefect Cloud | Dagster Cloud |
| الأفضل لـ | جدولة معقدة | فرق Python | فرق تركز على البيانات |
| التطوير المحلي | Docker مطلوب | أصلي | أصلي |
| تكامل dbt | Cosmos, يدوي | أصلي | أصلي |
سؤال المقابلة: "كيف تختار بين Airflow و Prefect و Dagster؟"
إطار الإجابة:
| اختر | عندما |
|---|---|
| Airflow | نطاق المؤسسة، جدولة معقدة، فريق كبير، خبرة موجودة |
| Prefect | فريق Python أولاً، تحتاج تكرار سريع، نشر أبسط |
| Dagster | تركيز أصول البيانات، احتياجات نسب قوية، مجموعة بيانات حديثة |
إجابة نموذجية: "سأختار بناءً على الفريق وحالة الاستخدام. Airflow لنطاق المؤسسة مع تبعيات معقدة. Prefect للفرق الثقيلة Python التي تريد تكرار أسرع. Dagster عندما يكون نسب البيانات والتفكير المركز على الأصول أولويات، خاصة مع dbt."
نصيحة المقابلة: اعرف أداة واحدة بعمق (عادة Airflow) لكن افهم مقايضات البدائل. كن قادراً على مناقشة لماذا ستهاجر من واحدة لأخرى.
بعد ذلك، سنستكشف dbt للتحويل ونمذجة البيانات. :::