أنابيب ETL والتنسيق
dbt لتحويلات البيانات
4 دقيقة للقراءة
dbt (أداة بناء البيانات) أصبحت أساسية لهندسة البيانات الحديثة. أسئلة المقابلات تتراوح من المفاهيم الأساسية للأنماط المتقدمة.
أساسيات dbt
ما يفعله dbt
| القدرة | الوصف |
|---|---|
| تحويلات SQL | اكتب عبارات SELECT، dbt يتعامل مع DDL |
| إدارة التبعيات | DAG تلقائي من ref() |
| الاختبار | اختبارات بيانات مدمجة ومخصصة |
| التوثيق | مستندات مولدة تلقائياً من YAML |
| النماذج التدريجية | معالجة البيانات الجديدة فقط |
| اللقطات | تتبع SCD النوع 2 |
هيكل المشروع
dbt_project/
├── dbt_project.yml # تكوين المشروع
├── profiles.yml # إعدادات الاتصال
├── models/
│ ├── staging/ # خام → منظف
│ │ ├── stg_orders.sql
│ │ └── _stg_models.yml
│ ├── intermediate/ # منطق الأعمال
│ │ └── int_order_items.sql
│ └── marts/ # الجداول النهائية
│ ├── core/
│ │ └── fct_orders.sql
│ └── marketing/
│ └── dim_customers.sql
├── tests/
│ └── custom_tests.sql
├── macros/
│ └── custom_macros.sql
└── seeds/
└── country_codes.csv
أنواع النماذج
نماذج التدريج
تنظيف وتوحيد البيانات الخام.
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
SELECT
id AS order_id,
customer_id,
CAST(order_date AS DATE) AS order_date,
status AS order_status,
total_amount,
_loaded_at
FROM source
)
SELECT * FROM renamed
النماذج الوسيطة
تطبيق منطق الأعمال، ربط البيانات.
-- models/intermediate/int_order_items_joined.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
order_items AS (
SELECT * FROM {{ ref('stg_order_items') }}
),
products AS (
SELECT * FROM {{ ref('stg_products') }}
)
SELECT
o.order_id,
o.customer_id,
o.order_date,
oi.product_id,
p.product_name,
p.category,
oi.quantity,
oi.unit_price,
oi.quantity * oi.unit_price AS line_total
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
نماذج Mart
جداول جاهزة للأعمال للتحليلات.
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
partition_by={'field': 'order_date', 'data_type': 'date'}
)
}}
WITH order_items AS (
SELECT * FROM {{ ref('int_order_items_joined') }}
),
aggregated AS (
SELECT
order_id,
customer_id,
order_date,
COUNT(DISTINCT product_id) AS unique_products,
SUM(quantity) AS total_items,
SUM(line_total) AS order_total
FROM order_items
GROUP BY order_id, customer_id, order_date
)
SELECT * FROM aggregated
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
النماذج التدريجية
معالجة البيانات الجديدة/المتغيرة فقط للكفاءة.
الاستراتيجيات
| الاستراتيجية | حالة الاستخدام |
|---|---|
| append | إدراج صفوف جديدة، لا تحديثات |
| merge | upsert بناءً على مفتاح فريد |
| delete+insert | استبدال القسم |
| insert_overwrite | استبدال الأقسام المطابقة |
مثال: استراتيجية Merge
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
merge_exclude_columns=['created_at']
)
}}
SELECT
order_id,
customer_id,
order_total,
updated_at,
CURRENT_TIMESTAMP() AS created_at
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
الاختبار في dbt
الاختبارات المدمجة
# models/_schema.yml
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
- name: order_status
tests:
- accepted_values:
values: ['pending', 'shipped', 'delivered', 'cancelled']
الاختبارات المخصصة
-- tests/assert_positive_amounts.sql
SELECT
order_id,
order_total
FROM {{ ref('fct_orders') }}
WHERE order_total < 0
الاختبارات العامة (Macros)
-- macros/test_positive_value.sql
{% test positive_value(model, column_name) %}
SELECT
{{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} < 0
{% endtest %}
# الاستخدام في المخطط
- name: order_total
tests:
- positive_value
الأنماط المتقدمة
Jinja Macros
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
ROUND({{ column_name }} / 100.0, 2)
{% endmacro %}
-- الاستخدام في النموذج
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM {{ ref('stg_orders') }}
المخططات الديناميكية
-- توليد اتحاد الجداول الشهرية
{% set months = ['01', '02', '03', '04', '05', '06'] %}
{% for month in months %}
SELECT * FROM raw.events_2024_{{ month }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
اللقطات (SCD النوع 2)
-- snapshots/customer_snapshot.sql
{% snapshot customer_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
SELECT
customer_id,
customer_name,
segment,
region,
updated_at
FROM {{ source('raw', 'customers') }}
{% endsnapshot %}
أسئلة المقابلة
السؤال: "اشرح كيف يتعامل dbt مع التبعيات"
الجواب: "dbt يبني DAG تلقائياً من دوال ref() و source(). عندما تكتب {{ ref('stg_orders') }}، dbt يعرف أن النموذج يجب أن يشغل أولاً. الـ DAG يمكّن:
- ترتيب تنفيذ صحيح
- تنفيذ متوازي حيث ممكن
- تشغيلات انتقائية (
dbt run --select stg_orders+) - توثيق تلقائي للنسب"
السؤال: "كيف تحسن dbt لمجموعات البيانات الكبيرة؟"
الجواب:
# 1. استخدم النماذج التدريجية
materialized='incremental'
# 2. قسّم الجداول الكبيرة
partition_by={'field': 'event_date', 'data_type': 'date'}
# 3. جمّع لأنماط الاستعلام
cluster_by=['user_id']
# 4. التأجيل للإنتاج للتطوير
# dbt run --defer --state prod-artifacts/
# 5. استخدم اختيار النموذج
# dbt run --select staging.stg_orders+
السؤال: "كيف ستنفذ فحوصات جودة البيانات في dbt؟"
الجواب: "سأنفذ نهج متعدد الطبقات:
- اختبارات المخطط: unique, not_null, relationships, accepted_values
- اختبارات SQL مخصصة: التحقق الخاص بالأعمال
- dbt-expectations: اختبارات إحصائية (القيم الشاذة، التوزيع)
- Pre-hooks: التحقق من حداثة المصدر
- Post-hooks: تسجيل نتائج الاختبار للمراقبة
للأنابيب الحرجة، سأستخدم dbt build الذي يشغل الاختبارات بعد كل نموذج، يفشل بسرعة على المشاكل."
نظرة المقابلة: dbt الآن أساسي لهندسة البيانات. اعرف الأساسيات بعمق، وكن مستعداً لمناقشة كيف ستهيكل مشروع لفريق من 10+ مهندسين.
بعد ذلك، سنغطي استراتيجيات جودة البيانات والاختبار. :::