أنابيب ETL والتنسيق

أدوات تنسيق الأنابيب

4 دقيقة للقراءة

التنسيق هو العمود الفقري لهندسة البيانات. توقع أسئلة مقابلة حول Airflow و Prefect و Dagster ومتى تستخدم كل منها.

Apache Airflow

المعيار الصناعي لتنسيق سير العمل.

المفاهيم الأساسية

المفهومالوصف
DAGرسم بياني موجه غير دوري - تعريف سير العمل
Taskوحدة عمل واحدة
Operatorقالب للمهام (Python, SQL, Bash, إلخ)
Sensorانتظار شرط خارجي
XComالتواصل المتبادل بين المهام
Connectionبيانات اعتماد النظام الخارجي

مثال DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Daily sales data processing',
    schedule_interval='0 6 * * *',  # 6 صباحاً يومياً
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,
    )

    transform = SnowflakeOperator(
        task_id='transform_sales',
        sql='sql/transform_sales.sql',
        snowflake_conn_id='snowflake_default',
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /dbt && dbt run --models staging.stg_sales+',
    )

    # تعريف التبعيات
    extract >> transform >> dbt_run

سؤال المقابلة: "كيف تتعامل مع فشل المهام في Airflow؟"

الجواب:

# 1. إعادة المحاولة مع تراجع
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# 2. استدعاءات عند الفشل
def alert_on_failure(context):
    task_id = context['task_instance'].task_id
    send_slack_alert(f"Task {task_id} failed!")

task = PythonOperator(
    task_id='critical_task',
    on_failure_callback=alert_on_failure,
    ...
)

# 3. قواعد التشغيل للمهام المصب
downstream = PythonOperator(
    task_id='cleanup',
    trigger_rule='all_done',  # شغل بغض النظر عن حالة المصعد
)

الأنماط الرئيسية

DAGs الديناميكية:

# توليد المهام ديناميكياً
for table in ['orders', 'customers', 'products']:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        op_kwargs={'table': table},
    )

مجموعات المهام (Airflow 2.0+):

from airflow.utils.task_group import TaskGroup

with TaskGroup('transform_group') as transform_group:
    task1 = PythonOperator(task_id='transform_1', ...)
    task2 = PythonOperator(task_id='transform_2', ...)

Prefect

تنسيق حديث، أصلي Python مع تجربة مطور أفضل.

الاختلافات الرئيسية عن Airflow

الجانبAirflowPrefect
تعريف DAGتحليل ملفات DAGمزخرفات Python
الجدولةعملية المجدولالسحابة أو الخادم المحلي
إدارة الحالةبيانات وصفية PostgresPostgres أو Prefect Cloud
الاختبار المحليأصعبدعم أصلي
سير العمل الديناميكيمعقددعم من الدرجة الأولى

مثال Flow

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str) -> dict:
    """استخراج البيانات من نظام المصدر."""
    return fetch_data(source)

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform_data(raw_data: dict) -> dict:
    """تحويل البيانات المستخرجة."""
    return clean_and_transform(raw_data)

@task
def load_data(transformed_data: dict, target: str):
    """تحميل البيانات للهدف."""
    write_to_warehouse(transformed_data, target)

@flow(name="daily-sales-etl")
def daily_sales_pipeline(source: str = "salesforce"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, "sales_mart")

# شغل محلياً للاختبار
if __name__ == "__main__":
    daily_sales_pipeline()

الميزات المتقدمة

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Subflows للتعديل
@flow
def parent_flow():
    result1 = child_flow_1()
    result2 = child_flow_2(result1)
    return result2

# التنفيذ المتزامن
@flow
def parallel_processing():
    futures = [process_item.submit(item) for item in items]
    results = [f.result() for f in futures]
    return results

# النشر مع الجدولة
deployment = Deployment.build_from_flow(
    flow=daily_sales_pipeline,
    name="production-deployment",
    schedule=CronSchedule(cron="0 6 * * *"),
    work_pool_name="default-agent-pool",
)

Dagster

نهج الأصول المحددة بالبرمجيات—يركز على أصول البيانات بدلاً من المهام.

المفاهيم الأساسية

المفهومالوصف
Assetقطعة أثرية للبيانات (جدول، ملف، نموذج ML)
Opوحدة حساب (مثل مهمة Airflow)
Jobمجموعة من ops للتنفيذ
Resourceتكوين النظام الخارجي
IO Managerكيفية تخزين/تحميل الأصول

أنبوب قائم على الأصول

from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """استخراج الطلبات من المصدر."""
    context.log.info("Extracting orders")
    return pd.read_sql("SELECT * FROM orders", source_conn)

@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """تنظيف وإزالة تكرار الطلبات."""
    return (
        raw_orders
        .drop_duplicates(subset=['order_id'])
        .dropna(subset=['customer_id'])
    )

@asset
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """تجميع الإيرادات اليومية."""
    return (
        cleaned_orders
        .groupby('order_date')
        .agg({'total': 'sum'})
        .reset_index()
    )

defs = Definitions(
    assets=[raw_orders, cleaned_orders, daily_revenue],
)

لماذا Dagster لهندسة البيانات

نسب البيانات: رسم بياني للتبعيات تلقائي التحقق: تتبع متى تم حساب الأصول آخر مرة الأقسام: دعم أصلي للأصول المقسمة بالوقت الاختبار: أدوات اختبار مدمجة

# أصول مقسمة
from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key
    return extract_events_for_date(partition_date)

مقارنة الأدوات

العاملAirflowPrefectDagster
النموذجقائم على المهامقائم على المهامقائم على الأصول
منحنى التعلمحادمتوسطمتوسط
المجتمعالأكبرمتناميمتنامي
العرض السحابيAstronomer, MWAAPrefect CloudDagster Cloud
الأفضل لـجدولة معقدةفرق Pythonفرق تركز على البيانات
التطوير المحليDocker مطلوبأصليأصلي
تكامل dbtCosmos, يدويأصليأصلي

سؤال المقابلة: "كيف تختار بين Airflow و Prefect و Dagster؟"

إطار الإجابة:

اخترعندما
Airflowنطاق المؤسسة، جدولة معقدة، فريق كبير، خبرة موجودة
Prefectفريق Python أولاً، تحتاج تكرار سريع، نشر أبسط
Dagsterتركيز أصول البيانات، احتياجات نسب قوية، مجموعة بيانات حديثة

إجابة نموذجية: "سأختار بناءً على الفريق وحالة الاستخدام. Airflow لنطاق المؤسسة مع تبعيات معقدة. Prefect للفرق الثقيلة Python التي تريد تكرار أسرع. Dagster عندما يكون نسب البيانات والتفكير المركز على الأصول أولويات، خاصة مع dbt."

نصيحة المقابلة: اعرف أداة واحدة بعمق (عادة Airflow) لكن افهم مقايضات البدائل. كن قادراً على مناقشة لماذا ستهاجر من واحدة لأخرى.

بعد ذلك، سنستكشف dbt للتحويل ونمذجة البيانات. :::

مراجعة سريعة: كيف تجد هذا الدرس؟

اختبار

الوحدة 4: أنابيب ETL والتنسيق

خذ الاختبار