أنابيب ETL والتنسيق
جودة البيانات والاختبار
4 دقيقة للقراءة
جودة البيانات موضوع حاسم في مقابلات هندسة البيانات. كن مستعداً لمناقشة الأطر والأدوات والاستراتيجيات لضمان بيانات موثوقة.
أبعاد جودة البيانات
الركائز الست
| البُعد | الوصف | فحص مثال |
|---|---|---|
| الاكتمال | لا قيم مفقودة | معدل NULL < 1% |
| التفرد | لا تكرارات | المفتاح الأساسي فريد |
| الصلاحية | يتوافق مع القواعد | صيغة البريد الإلكتروني صالحة |
| الدقة | يعكس الواقع | المبالغ تطابق المصدر |
| الاتساق | نفسه عبر الأنظمة | اسم العميل يطابق CRM |
| التوقيت | حديث بما يكفي | البيانات < ساعة عمرها |
سؤال المقابلة: "كيف تقيس جودة البيانات؟"
إطار الإجابة:
1. حدد المقاييس لكل بُعد
2. نفذ فحوصات آلية
3. تتبع المقاييس بمرور الوقت (SLIs)
4. اضبط العتبات (SLOs)
5. أنذر عند الانتهاكات
6. أنشئ لوحات قيادة للرؤية
أدوات جودة البيانات
Great Expectations
إطار جودة البيانات القائم على Python.
import great_expectations as gx
# إنشاء مجموعة التوقعات
context = gx.get_context()
suite = context.add_expectation_suite("orders_quality")
# تعريف التوقعات
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_quality"
)
# الاكتمال
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
# التفرد
validator.expect_column_values_to_be_unique("order_id")
# الصلاحية
validator.expect_column_values_to_be_in_set(
"status",
["pending", "shipped", "delivered", "cancelled"]
)
validator.expect_column_values_to_match_regex(
"email",
r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
# فحوصات النطاق
validator.expect_column_values_to_be_between(
"order_total",
min_value=0,
max_value=1000000
)
# حفظ وتشغيل
validator.save_expectation_suite()
results = validator.validate()
التكامل مع Airflow
from airflow.operators.python import PythonOperator
from great_expectations_provider.operators.great_expectations import (
GreatExpectationsOperator
)
validate_orders = GreatExpectationsOperator(
task_id='validate_orders',
data_context_root_dir='/great_expectations',
checkpoint_name='orders_checkpoint',
)
# في DAG
extract >> validate_orders >> transform
اختبارات dbt
اختبار مدمج لأنابيب التحويل.
# models/_schema.yml
version: 2
models:
- name: fct_orders
description: "جدول حقائق للطلبات"
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_date
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: date
tests:
- dbt_utils.recency:
datepart: hour
field: created_at
interval: 24
استراتيجيات الاختبار
اختبار العقود
التحقق من عقود البيانات بين المنتجين والمستهلكين.
# تعريف العقد
ORDER_CONTRACT = {
"order_id": {"type": "string", "nullable": False},
"customer_id": {"type": "string", "nullable": False},
"order_date": {"type": "date", "nullable": False},
"total_amount": {"type": "decimal", "nullable": False, "min": 0},
"status": {"type": "string", "enum": ["pending", "shipped", "delivered"]}
}
def validate_contract(df, contract):
"""التحقق من dataframe مقابل العقد."""
errors = []
for col, rules in contract.items():
if col not in df.columns:
errors.append(f"Missing column: {col}")
continue
if not rules.get("nullable") and df[col].isna().any():
errors.append(f"NULL values in non-nullable column: {col}")
if "min" in rules and (df[col] < rules["min"]).any():
errors.append(f"Values below minimum in column: {col}")
return errors
الاختبار الإحصائي
كشف الشذوذ وتحولات التوزيع.
from scipy import stats
def detect_anomalies(current_df, historical_df, column):
"""كشف الشذوذ الإحصائي في البيانات."""
# اختبار Z-score لتحول المتوسط
current_mean = current_df[column].mean()
historical_mean = historical_df[column].mean()
historical_std = historical_df[column].std()
z_score = (current_mean - historical_mean) / historical_std
if abs(z_score) > 3:
return f"Mean anomaly detected: z-score = {z_score}"
# اختبار Kolmogorov-Smirnov لتحول التوزيع
ks_stat, p_value = stats.ks_2samp(
current_df[column],
historical_df[column]
)
if p_value < 0.05:
return f"Distribution shift detected: p-value = {p_value}"
return None
اختبار الحجم
ضمان أحجام البيانات المتوقعة.
-- اختبار dbt لحدود الحجم
-- tests/assert_order_volume.sql
WITH daily_counts AS (
SELECT
DATE(created_at) AS order_date,
COUNT(*) AS order_count
FROM {{ ref('fct_orders') }}
WHERE created_at >= CURRENT_DATE - 7
GROUP BY DATE(created_at)
)
SELECT *
FROM daily_counts
WHERE order_count < 100 -- الحد الأدنى المتوقع
OR order_count > 100000 -- الحد الأقصى المتوقع
أنبوب جودة البيانات
نمط الهندسة
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ المصدر │───▶│ التحقق │───▶│ التحميل │
│ استخراج │ │ (GE/dbt) │ │ (نجاح) │
└─────────────┘ └──────┬──────┘ └─────────────┘
│
┌──────▼──────┐
│ الحجر │
│ (فشل) │
└──────┬──────┘
│
┌──────▼──────┐
│ إنذار و │
│ مراجعة │
└─────────────┘
التنفيذ
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
def validate_and_route(**context):
"""التحقق من البيانات والتوجيه للمسار المناسب."""
validation_results = run_validation()
if validation_results['success']:
return 'load_to_warehouse'
elif validation_results['severity'] == 'warning':
return 'load_with_flag'
else:
return 'quarantine_data'
with DAG('quality_pipeline') as dag:
extract = PythonOperator(task_id='extract', ...)
validate = BranchPythonOperator(
task_id='validate',
python_callable=validate_and_route,
)
load = PythonOperator(task_id='load_to_warehouse', ...)
load_flagged = PythonOperator(task_id='load_with_flag', ...)
quarantine = PythonOperator(task_id='quarantine_data', ...)
alert = PythonOperator(task_id='alert_team', ...)
extract >> validate >> [load, load_flagged, quarantine]
quarantine >> alert
مراقبة البيانات
المقاييس الرئيسية للمراقبة
| المقياس | الوصف | الأداة |
|---|---|---|
| الحداثة | الوقت منذ آخر تحديث | Monte Carlo, Elementary |
| الحجم | عدد الصفوف بمرور الوقت | مخصص، dbt |
| المخطط | تغييرات الأعمدة | Great Expectations |
| التوزيع | تحولات توزيع القيم | Elementary |
| النسب | تأثير التغييرات | dbt, Atlan |
إعداد التنبيهات
# مثال حزمة Elementary dbt
# models/elementary/alerts.sql
{{
config(
materialized='table',
post_hook="{% do elementary.notify_slack() %}"
)
}}
SELECT * FROM {{ ref('elementary', 'alerts') }}
WHERE severity = 'critical'
AND alert_time > CURRENT_TIMESTAMP - INTERVAL '1 hour'
سؤال تصميم المقابلة
السؤال: "صمم إطار جودة بيانات لشركة تجارة إلكترونية"
هيكل الإجابة:
1. حدد أبعاد الجودة
- الطلبات: الاكتمال، الصلاحية، التوقيت
- العملاء: التفرد، الدقة
- المنتجات: الصلاحية، الاتساق
2. نفذ الفحوصات
الطبقة 1 (الاستيعاب): التحقق من المخطط، فحوصات null
الطبقة 2 (التدريج): قواعد الأعمال، السلامة المرجعية
الطبقة 3 (Marts): فحوصات التجميع، التسوية
3. الأدوات
- Great Expectations لأنابيب Python
- اختبارات dbt لتحويلات SQL
- Elementary للمراقبة
4. سير العمل
- الفشل الحرج → حظر الأنبوب، إنذار
- التحذيرات → وسم البيانات، استمرار
- تتبع الاتجاهات → تقارير جودة أسبوعية
5. الحوكمة
- مشرف بيانات لكل مجال
- SLAs الجودة في عقود البيانات
- عملية استجابة الحوادث
نظرة المقابلة: جودة البيانات ليست فقط عن الأدوات—إنها عن العملية والثقافة. ناقش كيف ستبني الجودة في سير عمل التطوير، ليس فقط تضيف الفحوصات في النهاية.
الوحدة التالية تغطي البيانات الضخمة وأنظمة البث. :::