أنابيب ETL والتنسيق

أدوات تنسيق الأنابيب

4 دقيقة للقراءة

التنسيق هو العمود الفقري لهندسة البيانات. توقع أسئلة مقابلة حول Airflow و Prefect و Dagster ومتى تستخدم كل منها.

Apache Airflow

المعيار الصناعي لتنسيق سير العمل.

المفاهيم الأساسية

المفهوم الوصف
DAG رسم بياني موجه غير دوري - تعريف سير العمل
Task وحدة عمل واحدة
Operator قالب للمهام (Python, SQL, Bash, إلخ)
Sensor انتظار شرط خارجي
XCom التواصل المتبادل بين المهام
Connection بيانات اعتماد النظام الخارجي

مثال DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Daily sales data processing',
    schedule_interval='0 6 * * *',  # 6 صباحاً يومياً
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,
    )

    transform = SnowflakeOperator(
        task_id='transform_sales',
        sql='sql/transform_sales.sql',
        snowflake_conn_id='snowflake_default',
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /dbt && dbt run --models staging.stg_sales+',
    )

    # تعريف التبعيات
    extract >> transform >> dbt_run

سؤال المقابلة: "كيف تتعامل مع فشل المهام في Airflow؟"

الجواب:

# 1. إعادة المحاولة مع تراجع
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# 2. استدعاءات عند الفشل
def alert_on_failure(context):
    task_id = context['task_instance'].task_id
    send_slack_alert(f"Task {task_id} failed!")

task = PythonOperator(
    task_id='critical_task',
    on_failure_callback=alert_on_failure,
    ...
)

# 3. قواعد التشغيل للمهام المصب
downstream = PythonOperator(
    task_id='cleanup',
    trigger_rule='all_done',  # شغل بغض النظر عن حالة المصعد
)

الأنماط الرئيسية

DAGs الديناميكية:

# توليد المهام ديناميكياً
for table in ['orders', 'customers', 'products']:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        op_kwargs={'table': table},
    )

مجموعات المهام (Airflow 2.0+):

from airflow.utils.task_group import TaskGroup

with TaskGroup('transform_group') as transform_group:
    task1 = PythonOperator(task_id='transform_1', ...)
    task2 = PythonOperator(task_id='transform_2', ...)

Prefect

تنسيق حديث، أصلي Python مع تجربة مطور أفضل.

الاختلافات الرئيسية عن Airflow

الجانب Airflow Prefect
تعريف DAG تحليل ملفات DAG مزخرفات Python
الجدولة عملية المجدول السحابة أو الخادم المحلي
إدارة الحالة بيانات وصفية Postgres Postgres أو Prefect Cloud
الاختبار المحلي أصعب دعم أصلي
سير العمل الديناميكي معقد دعم من الدرجة الأولى

مثال Flow

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str) -> dict:
    """استخراج البيانات من نظام المصدر."""
    return fetch_data(source)

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform_data(raw_data: dict) -> dict:
    """تحويل البيانات المستخرجة."""
    return clean_and_transform(raw_data)

@task
def load_data(transformed_data: dict, target: str):
    """تحميل البيانات للهدف."""
    write_to_warehouse(transformed_data, target)

@flow(name="daily-sales-etl")
def daily_sales_pipeline(source: str = "salesforce"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, "sales_mart")

# شغل محلياً للاختبار
if __name__ == "__main__":
    daily_sales_pipeline()

الميزات المتقدمة

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Subflows للتعديل
@flow
def parent_flow():
    result1 = child_flow_1()
    result2 = child_flow_2(result1)
    return result2

# التنفيذ المتزامن
@flow
def parallel_processing():
    futures = [process_item.submit(item) for item in items]
    results = [f.result() for f in futures]
    return results

# النشر مع الجدولة
deployment = Deployment.build_from_flow(
    flow=daily_sales_pipeline,
    name="production-deployment",
    schedule=CronSchedule(cron="0 6 * * *"),
    work_pool_name="default-agent-pool",
)

Dagster

نهج الأصول المحددة بالبرمجيات—يركز على أصول البيانات بدلاً من المهام.

المفاهيم الأساسية

المفهوم الوصف
Asset قطعة أثرية للبيانات (جدول، ملف، نموذج ML)
Op وحدة حساب (مثل مهمة Airflow)
Job مجموعة من ops للتنفيذ
Resource تكوين النظام الخارجي
IO Manager كيفية تخزين/تحميل الأصول

أنبوب قائم على الأصول

from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """استخراج الطلبات من المصدر."""
    context.log.info("Extracting orders")
    return pd.read_sql("SELECT * FROM orders", source_conn)

@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """تنظيف وإزالة تكرار الطلبات."""
    return (
        raw_orders
        .drop_duplicates(subset=['order_id'])
        .dropna(subset=['customer_id'])
    )

@asset
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """تجميع الإيرادات اليومية."""
    return (
        cleaned_orders
        .groupby('order_date')
        .agg({'total': 'sum'})
        .reset_index()
    )

defs = Definitions(
    assets=[raw_orders, cleaned_orders, daily_revenue],
)

لماذا Dagster لهندسة البيانات

نسب البيانات: رسم بياني للتبعيات تلقائي التحقق: تتبع متى تم حساب الأصول آخر مرة الأقسام: دعم أصلي للأصول المقسمة بالوقت الاختبار: أدوات اختبار مدمجة

# أصول مقسمة
from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key
    return extract_events_for_date(partition_date)

مقارنة الأدوات

العامل Airflow Prefect Dagster
النموذج قائم على المهام قائم على المهام قائم على الأصول
منحنى التعلم حاد متوسط متوسط
المجتمع الأكبر متنامي متنامي
العرض السحابي Astronomer, MWAA Prefect Cloud Dagster Cloud
الأفضل لـ جدولة معقدة فرق Python فرق تركز على البيانات
التطوير المحلي Docker مطلوب أصلي أصلي
تكامل dbt Cosmos, يدوي أصلي أصلي

سؤال المقابلة: "كيف تختار بين Airflow و Prefect و Dagster؟"

إطار الإجابة:

اختر عندما
Airflow نطاق المؤسسة، جدولة معقدة، فريق كبير، خبرة موجودة
Prefect فريق Python أولاً، تحتاج تكرار سريع، نشر أبسط
Dagster تركيز أصول البيانات، احتياجات نسب قوية، مجموعة بيانات حديثة

إجابة نموذجية: "سأختار بناءً على الفريق وحالة الاستخدام. Airflow لنطاق المؤسسة مع تبعيات معقدة. Prefect للفرق الثقيلة Python التي تريد تكرار أسرع. Dagster عندما يكون نسب البيانات والتفكير المركز على الأصول أولويات، خاصة مع dbt."

نصيحة المقابلة: اعرف أداة واحدة بعمق (عادة Airflow) لكن افهم مقايضات البدائل. كن قادراً على مناقشة لماذا ستهاجر من واحدة لأخرى.

بعد ذلك، سنستكشف dbt للتحويل ونمذجة البيانات. :::

اختبار

الوحدة 4: أنابيب ETL والتنسيق

خذ الاختبار