أنابيب ETL والتنسيق

جودة البيانات والاختبار

4 دقيقة للقراءة

جودة البيانات موضوع حاسم في مقابلات هندسة البيانات. كن مستعداً لمناقشة الأطر والأدوات والاستراتيجيات لضمان بيانات موثوقة.

أبعاد جودة البيانات

الركائز الست

البُعد الوصف فحص مثال
الاكتمال لا قيم مفقودة معدل NULL < 1%
التفرد لا تكرارات المفتاح الأساسي فريد
الصلاحية يتوافق مع القواعد صيغة البريد الإلكتروني صالحة
الدقة يعكس الواقع المبالغ تطابق المصدر
الاتساق نفسه عبر الأنظمة اسم العميل يطابق CRM
التوقيت حديث بما يكفي البيانات < ساعة عمرها

سؤال المقابلة: "كيف تقيس جودة البيانات؟"

إطار الإجابة:

1. حدد المقاييس لكل بُعد
2. نفذ فحوصات آلية
3. تتبع المقاييس بمرور الوقت (SLIs)
4. اضبط العتبات (SLOs)
5. أنذر عند الانتهاكات
6. أنشئ لوحات قيادة للرؤية

أدوات جودة البيانات

Great Expectations

إطار جودة البيانات القائم على Python.

import great_expectations as gx

# إنشاء مجموعة التوقعات
context = gx.get_context()
suite = context.add_expectation_suite("orders_quality")

# تعريف التوقعات
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality"
)

# الاكتمال
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")

# التفرد
validator.expect_column_values_to_be_unique("order_id")

# الصلاحية
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "shipped", "delivered", "cancelled"]
)

validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# فحوصات النطاق
validator.expect_column_values_to_be_between(
    "order_total",
    min_value=0,
    max_value=1000000
)

# حفظ وتشغيل
validator.save_expectation_suite()
results = validator.validate()

التكامل مع Airflow

from airflow.operators.python import PythonOperator
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator
)

validate_orders = GreatExpectationsOperator(
    task_id='validate_orders',
    data_context_root_dir='/great_expectations',
    checkpoint_name='orders_checkpoint',
)

# في DAG
extract >> validate_orders >> transform

اختبارات dbt

اختبار مدمج لأنابيب التحويل.

# models/_schema.yml
version: 2

models:
  - name: fct_orders
    description: "جدول حقائق للطلبات"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_date
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date

    tests:
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24

استراتيجيات الاختبار

اختبار العقود

التحقق من عقود البيانات بين المنتجين والمستهلكين.

# تعريف العقد
ORDER_CONTRACT = {
    "order_id": {"type": "string", "nullable": False},
    "customer_id": {"type": "string", "nullable": False},
    "order_date": {"type": "date", "nullable": False},
    "total_amount": {"type": "decimal", "nullable": False, "min": 0},
    "status": {"type": "string", "enum": ["pending", "shipped", "delivered"]}
}

def validate_contract(df, contract):
    """التحقق من dataframe مقابل العقد."""
    errors = []

    for col, rules in contract.items():
        if col not in df.columns:
            errors.append(f"Missing column: {col}")
            continue

        if not rules.get("nullable") and df[col].isna().any():
            errors.append(f"NULL values in non-nullable column: {col}")

        if "min" in rules and (df[col] < rules["min"]).any():
            errors.append(f"Values below minimum in column: {col}")

    return errors

الاختبار الإحصائي

كشف الشذوذ وتحولات التوزيع.

from scipy import stats

def detect_anomalies(current_df, historical_df, column):
    """كشف الشذوذ الإحصائي في البيانات."""

    # اختبار Z-score لتحول المتوسط
    current_mean = current_df[column].mean()
    historical_mean = historical_df[column].mean()
    historical_std = historical_df[column].std()

    z_score = (current_mean - historical_mean) / historical_std

    if abs(z_score) > 3:
        return f"Mean anomaly detected: z-score = {z_score}"

    # اختبار Kolmogorov-Smirnov لتحول التوزيع
    ks_stat, p_value = stats.ks_2samp(
        current_df[column],
        historical_df[column]
    )

    if p_value < 0.05:
        return f"Distribution shift detected: p-value = {p_value}"

    return None

اختبار الحجم

ضمان أحجام البيانات المتوقعة.

-- اختبار dbt لحدود الحجم
-- tests/assert_order_volume.sql
WITH daily_counts AS (
    SELECT
        DATE(created_at) AS order_date,
        COUNT(*) AS order_count
    FROM {{ ref('fct_orders') }}
    WHERE created_at >= CURRENT_DATE - 7
    GROUP BY DATE(created_at)
)

SELECT *
FROM daily_counts
WHERE order_count < 100  -- الحد الأدنى المتوقع
   OR order_count > 100000  -- الحد الأقصى المتوقع

أنبوب جودة البيانات

نمط الهندسة

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   المصدر   │───▶│   التحقق   │───▶│   التحميل  │
│  استخراج   │    │  (GE/dbt)   │    │   (نجاح)   │
└─────────────┘    └──────┬──────┘    └─────────────┘
                   ┌──────▼──────┐
                   │   الحجر     │
                   │   (فشل)    │
                   └──────┬──────┘
                   ┌──────▼──────┐
                   │  إنذار و   │
                   │   مراجعة   │
                   └─────────────┘

التنفيذ

from airflow import DAG
from airflow.operators.python import BranchPythonOperator

def validate_and_route(**context):
    """التحقق من البيانات والتوجيه للمسار المناسب."""
    validation_results = run_validation()

    if validation_results['success']:
        return 'load_to_warehouse'
    elif validation_results['severity'] == 'warning':
        return 'load_with_flag'
    else:
        return 'quarantine_data'

with DAG('quality_pipeline') as dag:
    extract = PythonOperator(task_id='extract', ...)

    validate = BranchPythonOperator(
        task_id='validate',
        python_callable=validate_and_route,
    )

    load = PythonOperator(task_id='load_to_warehouse', ...)
    load_flagged = PythonOperator(task_id='load_with_flag', ...)
    quarantine = PythonOperator(task_id='quarantine_data', ...)
    alert = PythonOperator(task_id='alert_team', ...)

    extract >> validate >> [load, load_flagged, quarantine]
    quarantine >> alert

مراقبة البيانات

المقاييس الرئيسية للمراقبة

المقياس الوصف الأداة
الحداثة الوقت منذ آخر تحديث Monte Carlo, Elementary
الحجم عدد الصفوف بمرور الوقت مخصص، dbt
المخطط تغييرات الأعمدة Great Expectations
التوزيع تحولات توزيع القيم Elementary
النسب تأثير التغييرات dbt, Atlan

إعداد التنبيهات

# مثال حزمة Elementary dbt
# models/elementary/alerts.sql
{{
  config(
    materialized='table',
    post_hook="{% do elementary.notify_slack() %}"
  )
}}

SELECT * FROM {{ ref('elementary', 'alerts') }}
WHERE severity = 'critical'
AND alert_time > CURRENT_TIMESTAMP - INTERVAL '1 hour'

سؤال تصميم المقابلة

السؤال: "صمم إطار جودة بيانات لشركة تجارة إلكترونية"

هيكل الإجابة:

1. حدد أبعاد الجودة
   - الطلبات: الاكتمال، الصلاحية، التوقيت
   - العملاء: التفرد، الدقة
   - المنتجات: الصلاحية، الاتساق

2. نفذ الفحوصات
   الطبقة 1 (الاستيعاب): التحقق من المخطط، فحوصات null
   الطبقة 2 (التدريج): قواعد الأعمال، السلامة المرجعية
   الطبقة 3 (Marts): فحوصات التجميع، التسوية

3. الأدوات
   - Great Expectations لأنابيب Python
   - اختبارات dbt لتحويلات SQL
   - Elementary للمراقبة

4. سير العمل
   - الفشل الحرج → حظر الأنبوب، إنذار
   - التحذيرات → وسم البيانات، استمرار
   - تتبع الاتجاهات → تقارير جودة أسبوعية

5. الحوكمة
   - مشرف بيانات لكل مجال
   - SLAs الجودة في عقود البيانات
   - عملية استجابة الحوادث

نظرة المقابلة: جودة البيانات ليست فقط عن الأدوات—إنها عن العملية والثقافة. ناقش كيف ستبني الجودة في سير عمل التطوير، ليس فقط تضيف الفحوصات في النهاية.

الوحدة التالية تغطي البيانات الضخمة وأنظمة البث. :::

اختبار

الوحدة 4: أنابيب ETL والتنسيق

خذ الاختبار