أنابيب ETL والتنسيق

dbt لتحويلات البيانات

4 دقيقة للقراءة

dbt (أداة بناء البيانات) أصبحت أساسية لهندسة البيانات الحديثة. أسئلة المقابلات تتراوح من المفاهيم الأساسية للأنماط المتقدمة.

أساسيات dbt

ما يفعله dbt

القدرة الوصف
تحويلات SQL اكتب عبارات SELECT، dbt يتعامل مع DDL
إدارة التبعيات DAG تلقائي من ref()
الاختبار اختبارات بيانات مدمجة ومخصصة
التوثيق مستندات مولدة تلقائياً من YAML
النماذج التدريجية معالجة البيانات الجديدة فقط
اللقطات تتبع SCD النوع 2

هيكل المشروع

dbt_project/
├── dbt_project.yml          # تكوين المشروع
├── profiles.yml             # إعدادات الاتصال
├── models/
│   ├── staging/             # خام → منظف
│   │   ├── stg_orders.sql
│   │   └── _stg_models.yml
│   ├── intermediate/        # منطق الأعمال
│   │   └── int_order_items.sql
│   └── marts/               # الجداول النهائية
│       ├── core/
│       │   └── fct_orders.sql
│       └── marketing/
│           └── dim_customers.sql
├── tests/
│   └── custom_tests.sql
├── macros/
│   └── custom_macros.sql
└── seeds/
    └── country_codes.csv

أنواع النماذج

نماذج التدريج

تنظيف وتوحيد البيانات الخام.

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        customer_id,
        CAST(order_date AS DATE) AS order_date,
        status AS order_status,
        total_amount,
        _loaded_at
    FROM source
)

SELECT * FROM renamed

النماذج الوسيطة

تطبيق منطق الأعمال، ربط البيانات.

-- models/intermediate/int_order_items_joined.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

order_items AS (
    SELECT * FROM {{ ref('stg_order_items') }}
),

products AS (
    SELECT * FROM {{ ref('stg_products') }}
)

SELECT
    o.order_id,
    o.customer_id,
    o.order_date,
    oi.product_id,
    p.product_name,
    p.category,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price AS line_total
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id

نماذج Mart

جداول جاهزة للأعمال للتحليلات.

-- models/marts/core/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        partition_by={'field': 'order_date', 'data_type': 'date'}
    )
}}

WITH order_items AS (
    SELECT * FROM {{ ref('int_order_items_joined') }}
),

aggregated AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        COUNT(DISTINCT product_id) AS unique_products,
        SUM(quantity) AS total_items,
        SUM(line_total) AS order_total
    FROM order_items
    GROUP BY order_id, customer_id, order_date
)

SELECT * FROM aggregated

{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

النماذج التدريجية

معالجة البيانات الجديدة/المتغيرة فقط للكفاءة.

الاستراتيجيات

الاستراتيجية حالة الاستخدام
append إدراج صفوف جديدة، لا تحديثات
merge upsert بناءً على مفتاح فريد
delete+insert استبدال القسم
insert_overwrite استبدال الأقسام المطابقة

مثال: استراتيجية Merge

{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        merge_exclude_columns=['created_at']
    )
}}

SELECT
    order_id,
    customer_id,
    order_total,
    updated_at,
    CURRENT_TIMESTAMP() AS created_at
FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

الاختبار في dbt

الاختبارات المدمجة

# models/_schema.yml
version: 2

models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']

الاختبارات المخصصة

-- tests/assert_positive_amounts.sql
SELECT
    order_id,
    order_total
FROM {{ ref('fct_orders') }}
WHERE order_total < 0

الاختبارات العامة (Macros)

-- macros/test_positive_value.sql
{% test positive_value(model, column_name) %}

SELECT
    {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} < 0

{% endtest %}
# الاستخدام في المخطط
- name: order_total
  tests:
    - positive_value

الأنماط المتقدمة

Jinja Macros

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    ROUND({{ column_name }} / 100.0, 2)
{% endmacro %}

-- الاستخدام في النموذج
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM {{ ref('stg_orders') }}

المخططات الديناميكية

-- توليد اتحاد الجداول الشهرية
{% set months = ['01', '02', '03', '04', '05', '06'] %}

{% for month in months %}
SELECT * FROM raw.events_2024_{{ month }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}

اللقطات (SCD النوع 2)

-- snapshots/customer_snapshot.sql
{% snapshot customer_snapshot %}

{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at',
    )
}}

SELECT
    customer_id,
    customer_name,
    segment,
    region,
    updated_at
FROM {{ source('raw', 'customers') }}

{% endsnapshot %}

أسئلة المقابلة

السؤال: "اشرح كيف يتعامل dbt مع التبعيات"

الجواب: "dbt يبني DAG تلقائياً من دوال ref() و source(). عندما تكتب {{ ref('stg_orders') }}، dbt يعرف أن النموذج يجب أن يشغل أولاً. الـ DAG يمكّن:

  1. ترتيب تنفيذ صحيح
  2. تنفيذ متوازي حيث ممكن
  3. تشغيلات انتقائية (dbt run --select stg_orders+)
  4. توثيق تلقائي للنسب"

السؤال: "كيف تحسن dbt لمجموعات البيانات الكبيرة؟"

الجواب:

# 1. استخدم النماذج التدريجية
materialized='incremental'

# 2. قسّم الجداول الكبيرة
partition_by={'field': 'event_date', 'data_type': 'date'}

# 3. جمّع لأنماط الاستعلام
cluster_by=['user_id']

# 4. التأجيل للإنتاج للتطوير
# dbt run --defer --state prod-artifacts/

# 5. استخدم اختيار النموذج
# dbt run --select staging.stg_orders+

السؤال: "كيف ستنفذ فحوصات جودة البيانات في dbt؟"

الجواب: "سأنفذ نهج متعدد الطبقات:

  1. اختبارات المخطط: unique, not_null, relationships, accepted_values
  2. اختبارات SQL مخصصة: التحقق الخاص بالأعمال
  3. dbt-expectations: اختبارات إحصائية (القيم الشاذة، التوزيع)
  4. Pre-hooks: التحقق من حداثة المصدر
  5. Post-hooks: تسجيل نتائج الاختبار للمراقبة

للأنابيب الحرجة، سأستخدم dbt build الذي يشغل الاختبارات بعد كل نموذج، يفشل بسرعة على المشاكل."

نظرة المقابلة: dbt الآن أساسي لهندسة البيانات. اعرف الأساسيات بعمق، وكن مستعداً لمناقشة كيف ستهيكل مشروع لفريق من 10+ مهندسين.

بعد ذلك، سنغطي استراتيجيات جودة البيانات والاختبار. :::

اختبار

الوحدة 4: أنابيب ETL والتنسيق

خذ الاختبار