# Data Quality & Testing
Data quality is a critical topic in data engineering interviews. Be prepared to discuss frameworks, tools, and strategies for ensuring reliable data.
## Data Quality Dimensions

### The Six Pillars
| Dimension | Description | Example Check |
|---|---|---|
| Completeness | No missing values | NULL rate < 1% |
| Uniqueness | No duplicates | Primary key unique |
| Validity | Conforms to rules | Email format valid |
| Accuracy | Reflects reality | Amounts match source |
| Consistency | Same across systems | Customer name matches CRM |
| Timeliness | Fresh enough | Data < 1 hour old |
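As a concrete illustration of the "Example Check" column, here is a minimal sketch of completeness and uniqueness checks against a pandas DataFrame; the column names and the 99% threshold are illustrative assumptions, not a fixed standard.

```python
import pandas as pd

def completeness_rate(df: pd.DataFrame, column: str) -> float:
    """Fraction of non-NULL values in a column."""
    return 1.0 - df[column].isna().mean()

def is_unique(df: pd.DataFrame, column: str) -> bool:
    """True when the column contains no duplicate values."""
    return not df[column].duplicated().any()

orders = pd.DataFrame({
    "order_id": ["o-1", "o-2", "o-3"],
    "customer_id": ["c-1", "c-2", None],
})

# Completeness: NULL rate < 1%  =>  completeness >= 99%
assert completeness_rate(orders, "order_id") >= 0.99
# Uniqueness: the primary key must be unique
assert is_unique(orders, "order_id")
```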
**Interview Question:** "How do you measure data quality?"

**Answer Framework** (see the sketch after the list):
1. Define metrics per dimension
2. Implement automated checks
3. Track metrics over time (SLIs)
4. Set thresholds (SLOs)
5. Alert on violations
6. Create dashboards for visibility
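A minimal sketch of steps 2 through 5, assuming a pandas DataFrame batch; the 99% SLO and the print-based alert are illustrative placeholders for whatever metrics store and alerting channel you actually use.

```python
from datetime import datetime, timezone

# Illustrative SLO: at least 99% of rows must have a non-NULL value
COMPLETENESS_SLO = 0.99

def measure_completeness(df, column: str) -> dict:
    """Compute the SLI, compare it to the SLO, and alert on a breach."""
    sli = 1.0 - df[column].isna().mean()
    breached = sli < COMPLETENESS_SLO
    if breached:
        # In practice this would page on-call or post to Slack
        print(f"[ALERT] {column} completeness {sli:.2%} is below SLO {COMPLETENESS_SLO:.0%}")
    # Persist this record so the metric can be trended on a dashboard
    return {
        "metric": f"{column}_completeness",
        "value": round(float(sli), 4),
        "slo": COMPLETENESS_SLO,
        "breached": bool(breached),
        "measured_at": datetime.now(timezone.utc).isoformat(),
    }
```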
## Data Quality Tools

### Great Expectations

A Python-based framework for declaring and validating expectations about your data.
```python
import great_expectations as gx

# Create expectation suite
context = gx.get_context()
suite = context.add_expectation_suite("orders_quality")

# Define expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality",
)

# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Validity
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "shipped", "delivered", "cancelled"],
)
validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
)

# Range checks
validator.expect_column_values_to_be_between(
    "order_total",
    min_value=0,
    max_value=1000000,
)

# Save and run
validator.save_expectation_suite()
results = validator.validate()
```
#### Integration with Airflow

```python
from airflow.operators.python import PythonOperator
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

validate_orders = GreatExpectationsOperator(
    task_id='validate_orders',
    data_context_root_dir='/great_expectations',
    checkpoint_name='orders_checkpoint',
)

# In DAG
extract >> validate_orders >> transform
```
### dbt Tests

Built-in testing for transformation pipelines.

```yaml
# models/_schema.yml
version: 2

models:
  - name: fct_orders
    description: "Fact table for orders"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_date
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date
    tests:
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24
```
## Testing Strategies

### Contract Testing
Validate data contracts between producers and consumers.
```python
# Define contract
ORDER_CONTRACT = {
    "order_id": {"type": "string", "nullable": False},
    "customer_id": {"type": "string", "nullable": False},
    "order_date": {"type": "date", "nullable": False},
    "total_amount": {"type": "decimal", "nullable": False, "min": 0},
    "status": {"type": "string", "enum": ["pending", "shipped", "delivered"]},
}

def validate_contract(df, contract):
    """Validate a DataFrame against the contract, returning a list of violations."""
    errors = []
    for col, rules in contract.items():
        if col not in df.columns:
            errors.append(f"Missing column: {col}")
            continue
        if not rules.get("nullable") and df[col].isna().any():
            errors.append(f"NULL values in non-nullable column: {col}")
        if "min" in rules and (df[col] < rules["min"]).any():
            errors.append(f"Values below minimum in column: {col}")
        # Enforce the allowed-value set declared under "enum"
        if "enum" in rules and not df[col].dropna().isin(rules["enum"]).all():
            errors.append(f"Values outside allowed set in column: {col}")
    return errors
```
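A quick usage sketch, assuming `ORDER_CONTRACT` and `validate_contract` above are in scope and a pandas DataFrame stands in for the extracted batch; the sample values are contrived to trip two rules.

```python
import pandas as pd

# Hypothetical extracted batch
orders_df = pd.DataFrame({
    "order_id": ["o-1", "o-2"],
    "customer_id": ["c-1", None],        # NULL in a non-nullable column
    "order_date": ["2024-01-01", "2024-01-02"],
    "total_amount": [19.99, -5.00],      # below the declared minimum of 0
    "status": ["pending", "shipped"],
})

errors = validate_contract(orders_df, ORDER_CONTRACT)
if errors:
    # In a pipeline, this would fail the task or route rows to quarantine
    raise ValueError(f"Contract violations: {errors}")
```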
### Statistical Testing
Detect anomalies and distribution shifts.
```python
from scipy import stats

def detect_anomalies(current_df, historical_df, column):
    """Detect statistical anomalies in data."""
    # Z-score test for mean shift
    current_mean = current_df[column].mean()
    historical_mean = historical_df[column].mean()
    historical_std = historical_df[column].std()
    z_score = (current_mean - historical_mean) / historical_std

    if abs(z_score) > 3:
        return f"Mean anomaly detected: z-score = {z_score}"

    # Kolmogorov-Smirnov test for distribution shift
    ks_stat, p_value = stats.ks_2samp(
        current_df[column],
        historical_df[column],
    )
    if p_value < 0.05:
        return f"Distribution shift detected: p-value = {p_value}"

    return None
```
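A usage sketch of `detect_anomalies`, with synthetic pandas DataFrames standing in for the current batch and its history; the drifted distribution is contrived so the check fires.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)

# Illustrative data: history centered around 50, current batch drifted to 90
historical_df = pd.DataFrame({"order_total": rng.normal(50, 10, 5000)})
current_df = pd.DataFrame({"order_total": rng.normal(90, 10, 500)})

anomaly = detect_anomalies(current_df, historical_df, "order_total")
if anomaly:
    # e.g. "Mean anomaly detected: z-score = ..."
    print(anomaly)
```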
### Volume Testing
Ensure expected data volumes.
```sql
-- dbt test for volume bounds
-- tests/assert_order_volume.sql
WITH daily_counts AS (
    SELECT
        DATE(created_at) AS order_date,
        COUNT(*) AS order_count
    FROM {{ ref('fct_orders') }}
    WHERE created_at >= CURRENT_DATE - 7
    GROUP BY DATE(created_at)
)

SELECT *
FROM daily_counts
WHERE order_count < 100      -- Minimum expected
   OR order_count > 100000   -- Maximum expected
```
## Data Quality Pipeline

### Architecture Pattern
```text
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Source    │───▶│  Validate   │───▶│    Load     │
│   Extract   │    │  (GE/dbt)   │    │   (Pass)    │
└─────────────┘    └──────┬──────┘    └─────────────┘
                          │
                   ┌──────▼──────┐
                   │ Quarantine  │
                   │   (Fail)    │
                   └──────┬──────┘
                          │
                   ┌──────▼──────┐
                   │   Alert &   │
                   │   Review    │
                   └─────────────┘
```
### Implementation

```python
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

def validate_and_route(**context):
    """Validate data and route to the appropriate path."""
    validation_results = run_validation()  # project-specific check run (e.g. a GE checkpoint)
    if validation_results['success']:
        return 'load_to_warehouse'
    elif validation_results['severity'] == 'warning':
        return 'load_with_flag'
    else:
        return 'quarantine_data'

with DAG('quality_pipeline') as dag:
    extract = PythonOperator(task_id='extract', ...)
    validate = BranchPythonOperator(
        task_id='validate',
        python_callable=validate_and_route,
    )
    load = PythonOperator(task_id='load_to_warehouse', ...)
    load_flagged = PythonOperator(task_id='load_with_flag', ...)
    quarantine = PythonOperator(task_id='quarantine_data', ...)
    alert = PythonOperator(task_id='alert_team', ...)

    extract >> validate >> [load, load_flagged, quarantine]
    quarantine >> alert
```
## Data Observability

### Key Metrics to Monitor
| Metric | Description | Tool |
|---|---|---|
| Freshness | Time since last update | Monte Carlo, Elementary |
| Volume | Row counts over time | Custom, dbt |
| Schema | Column changes | Great Expectations |
| Distribution | Value distribution shifts | Elementary |
| Lineage | Impact of changes | dbt, Atlan |
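For example, a freshness check reduces to comparing the latest load timestamp with an allowed lag. A minimal sketch, assuming the warehouse table has a `loaded_at` column and using an illustrative one-hour threshold:

```python
from datetime import datetime, timedelta, timezone

FRESHNESS_THRESHOLD = timedelta(hours=1)  # illustrative freshness SLO

def check_freshness(last_loaded_at: datetime) -> bool:
    """Return True when the table was updated within the freshness threshold."""
    lag = datetime.now(timezone.utc) - last_loaded_at
    if lag > FRESHNESS_THRESHOLD:
        # In production this would emit a metric and trigger an alert
        print(f"[ALERT] data is stale: last load was {lag} ago")
        return False
    return True

# last_loaded_at would typically come from the warehouse, e.g.
# SELECT MAX(loaded_at) FROM fct_orders
check_freshness(datetime.now(timezone.utc) - timedelta(minutes=30))
```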
### Setting Up Alerts
```sql
-- Elementary dbt package example
-- models/elementary/alerts.sql
{{
  config(
    materialized='table',
    post_hook="{% do elementary.notify_slack() %}"
  )
}}

SELECT *
FROM {{ ref('elementary', 'alerts') }}
WHERE severity = 'critical'
  AND alert_time > CURRENT_TIMESTAMP - INTERVAL '1 hour'
```
## Interview Design Question

**Question:** "Design a data quality framework for an e-commerce company."

**Answer Structure:**
```text
1. DEFINE QUALITY DIMENSIONS
   - Orders: completeness, validity, timeliness
   - Customers: uniqueness, accuracy
   - Products: validity, consistency

2. IMPLEMENT CHECKS
   Layer 1 (Ingestion): Schema validation, null checks
   Layer 2 (Staging): Business rules, referential integrity
   Layer 3 (Marts): Aggregation checks, reconciliation

3. TOOLING
   - Great Expectations for Python pipelines
   - dbt tests for SQL transformations
   - Elementary for observability

4. WORKFLOW
   - Critical failures → block pipeline, alert
   - Warnings → flag data, continue
   - Track trends → weekly quality reports

5. GOVERNANCE
   - Data steward per domain
   - Quality SLAs in data contracts
   - Incident response process
```
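One way to make points 4 and 5 concrete is to encode each domain's quality SLA and its failure-handling policy as data owned by that domain's steward. A small illustrative sketch; the domain names, thresholds, and action labels are assumptions:

```python
# Illustrative quality SLAs, each owned by a domain's data steward
QUALITY_SLAS = {
    "orders": {"completeness": 0.99, "freshness_hours": 1, "on_failure": "critical"},
    "products": {"completeness": 0.95, "freshness_hours": 24, "on_failure": "warning"},
}

def route_failure(domain: str) -> str:
    """Map a failed check to the pipeline action agreed in the SLA."""
    severity = QUALITY_SLAS[domain]["on_failure"]
    if severity == "critical":
        return "block_and_alert"      # stop the pipeline and page on-call
    return "flag_and_continue"        # load with a quality flag, report weekly

assert route_failure("orders") == "block_and_alert"
assert route_failure("products") == "flag_and_continue"
```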
:::tip Interview Insight
Data quality isn't just about tools; it's about process and culture. Discuss how you'd build quality into the development workflow, not just add checks at the end.
:::

Next module covers Big Data and streaming systems.