Data Quality & Testing

Data quality is a critical topic in data engineering interviews. Be prepared to discuss frameworks, tools, and strategies for ensuring reliable data.

Data Quality Dimensions

The Six Pillars

Dimension | Description | Example Check
--- | --- | ---
Completeness | No missing values | NULL rate < 1%
Uniqueness | No duplicates | Primary key unique
Validity | Conforms to rules | Email format valid
Accuracy | Reflects reality | Amounts match source
Consistency | Same across systems | Customer name matches CRM
Timeliness | Fresh enough | Data < 1 hour old
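
These dimensions map directly onto measurable metrics. As a rough sketch with pandas (the column names customer_id, order_id, email, and updated_at are placeholders, not tied to any specific schema):

import pandas as pd

def quality_metrics(df: pd.DataFrame) -> dict:
    """Example metrics for four of the six dimensions (placeholder column names)."""
    now = pd.Timestamp.now(tz="UTC")
    return {
        # Completeness: fraction of NULLs in a required column
        "null_rate_customer_id": df["customer_id"].isna().mean(),
        # Uniqueness: fraction of duplicated primary keys
        "duplicate_rate_order_id": 1 - df["order_id"].nunique() / len(df),
        # Validity: fraction of emails failing a simple pattern
        "invalid_email_rate": (~df["email"].str.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", na=False)).mean(),
        # Timeliness: hours since the most recent record
        "hours_since_last_update": (now - pd.to_datetime(df["updated_at"], utc=True).max()).total_seconds() / 3600,
    }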

Interview Question: "How do you measure data quality?"

Answer Framework (see the sketch below the list):

1. Define metrics per dimension
2. Implement automated checks
3. Track metrics over time (SLIs)
4. Set thresholds (SLOs)
5. Alert on violations
6. Create dashboards for visibility
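
A minimal sketch covering steps 1, 2, 4, and 5, reusing the quality_metrics helper sketched earlier; the SLO thresholds and the alert routing are illustrative, not prescriptive:

# SLOs: target thresholds per SLI (illustrative values)
SLOS = {
    "null_rate_customer_id": 0.01,       # Completeness: < 1% NULLs
    "duplicate_rate_order_id": 0.0,      # Uniqueness: no duplicate keys
    "invalid_email_rate": 0.02,          # Validity: < 2% malformed emails
    "hours_since_last_update": 1.0,      # Timeliness: < 1 hour old
}

def evaluate_slos(slis: dict, slos: dict) -> list:
    """Compare measured SLIs against SLO thresholds and return any violations."""
    return [
        f"{name}: {value:.4f} breaches SLO of {slos[name]}"
        for name, value in slis.items()
        if name in slos and value > slos[name]
    ]

def run_quality_check(orders_df):
    """Measure SLIs, compare against SLOs, and alert on violations."""
    violations = evaluate_slos(quality_metrics(orders_df), SLOS)
    for message in violations:
        print(f"ALERT: {message}")  # in practice, route to Slack / PagerDuty
    return violations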

Data Quality Tools

Great Expectations

Python-based data quality framework.

import great_expectations as gx

# Create expectation suite
context = gx.get_context()
suite = context.add_expectation_suite("orders_quality")

# Define expectations against a batch of the orders data
# (batch_request is built elsewhere from your datasource configuration)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality"
)

# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Validity
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "shipped", "delivered", "cancelled"]
)

validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# Range checks
validator.expect_column_values_to_be_between(
    "order_total",
    min_value=0,
    max_value=1000000
)

# Save and run
validator.save_expectation_suite()
results = validator.validate()
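
In a production setup the saved suite is typically attached to a checkpoint (here, orders_checkpoint), which bundles the suite with a batch request; that checkpoint is what the Airflow operator in the next section runs.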

Integration with Airflow

from airflow.operators.python import PythonOperator
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator
)

validate_orders = GreatExpectationsOperator(
    task_id='validate_orders',
    data_context_root_dir='/great_expectations',
    checkpoint_name='orders_checkpoint',
)

# In the DAG, validation runs between extract and transform
# (both tasks are defined elsewhere in the DAG file)
extract >> validate_orders >> transform

dbt Tests

Built-in testing for transformation pipelines.

# models/_schema.yml
# (dbt_utils and dbt_expectations must be declared in packages.yml)
version: 2

models:
  - name: fct_orders
    description: "Fact table for orders"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_date
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date

    tests:
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24
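
Running dbt test --select fct_orders executes all of these checks; each generic test compiles to a query that returns failing rows, and any returned row fails the test.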

Testing Strategies

Contract Testing

Validate data contracts between producers and consumers.

# Define contract
ORDER_CONTRACT = {
    "order_id": {"type": "string", "nullable": False},
    "customer_id": {"type": "string", "nullable": False},
    "order_date": {"type": "date", "nullable": False},
    "total_amount": {"type": "decimal", "nullable": False, "min": 0},
    "status": {"type": "string", "enum": ["pending", "shipped", "delivered"]}
}

def validate_contract(df, contract):
    """Validate a pandas DataFrame against the contract.

    Only nullability and minimum-value rules are enforced here;
    type and enum checks are omitted for brevity.
    """
    errors = []

    for col, rules in contract.items():
        if col not in df.columns:
            errors.append(f"Missing column: {col}")
            continue

        if not rules.get("nullable") and df[col].isna().any():
            errors.append(f"NULL values in non-nullable column: {col}")

        if "min" in rules and (df[col] < rules["min"]).any():
            errors.append(f"Values below minimum in column: {col}")

    return errors
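
For example, running the check against a small pandas DataFrame with deliberately bad rows (illustrative data):

import pandas as pd

sample = pd.DataFrame({
    "order_id": ["o-1", "o-2"],
    "customer_id": ["c-1", None],               # violates nullable: False
    "order_date": ["2024-01-01", "2024-01-02"],
    "total_amount": [49.99, -5.00],             # violates min: 0
    "status": ["pending", "shipped"],
})

print(validate_contract(sample, ORDER_CONTRACT))
# ['NULL values in non-nullable column: customer_id',
#  'Values below minimum in column: total_amount']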

Statistical Testing

Detect anomalies and distribution shifts.

from scipy import stats

def detect_anomalies(current_df, historical_df, column):
    """Return a message describing the first anomaly found, or None."""

    # Z-score test for mean shift
    current_mean = current_df[column].mean()
    historical_mean = historical_df[column].mean()
    historical_std = historical_df[column].std()

    z_score = (current_mean - historical_mean) / historical_std

    if abs(z_score) > 3:
        return f"Mean anomaly detected: z-score = {z_score}"

    # Kolmogorov-Smirnov test for distribution shift
    ks_stat, p_value = stats.ks_2samp(
        current_df[column],
        historical_df[column]
    )

    if p_value < 0.05:
        return f"Distribution shift detected: p-value = {p_value}"

    return None
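
To see it in action on synthetic data (numpy is used here only to generate example frames):

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
historical = pd.DataFrame({"order_total": rng.normal(loc=100, scale=20, size=10_000)})
current = pd.DataFrame({"order_total": rng.normal(loc=180, scale=20, size=1_000)})

print(detect_anomalies(current, historical, "order_total"))
# "Mean anomaly detected: z-score = ..." (roughly 4 for this shift)

Note that the z-score above compares the new mean against the spread of individual historical values; to detect smaller shifts you would normally divide by the standard error of the mean (historical_std / sqrt(n)) instead.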

Volume Testing

Ensure expected data volumes.

-- dbt singular test for volume bounds (fails if any row is returned)
-- tests/assert_order_volume.sql
WITH daily_counts AS (
    SELECT
        DATE(created_at) AS order_date,
        COUNT(*) AS order_count
    FROM {{ ref('fct_orders') }}
    WHERE created_at >= CURRENT_DATE - 7
    GROUP BY DATE(created_at)
)

SELECT *
FROM daily_counts
WHERE order_count < 100  -- Minimum expected
   OR order_count > 100000  -- Maximum expected

Data Quality Pipeline

Architecture Pattern

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Source    │───▶│  Validate   │───▶│   Load      │
│  Extract    │    │  (GE/dbt)   │    │  (Pass)     │
└─────────────┘    └──────┬──────┘    └─────────────┘
                   ┌──────▼──────┐
                   │  Quarantine │
                   │   (Fail)    │
                   └──────┬──────┘
                   ┌──────▼──────┐
                   │   Alert &   │
                   │   Review    │
                   └─────────────┘

Implementation

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

def validate_and_route(**context):
    """Validate data and route to the appropriate downstream path."""
    validation_results = run_validation()  # wraps the quality checks defined earlier

    if validation_results['success']:
        return 'load_to_warehouse'
    elif validation_results['severity'] == 'warning':
        return 'load_with_flag'
    else:
        return 'quarantine_data'

with DAG('quality_pipeline') as dag:
    extract = PythonOperator(task_id='extract', ...)

    validate = BranchPythonOperator(
        task_id='validate',
        python_callable=validate_and_route,
    )

    load = PythonOperator(task_id='load_to_warehouse', ...)
    load_flagged = PythonOperator(task_id='load_with_flag', ...)
    quarantine = PythonOperator(task_id='quarantine_data', ...)
    alert = PythonOperator(task_id='alert_team', ...)

    extract >> validate >> [load, load_flagged, quarantine]
    quarantine >> alert

Data Observability

Key Metrics to Monitor

Metric | Description | Tool
--- | --- | ---
Freshness | Time since last update | Monte Carlo, Elementary
Volume | Row counts over time | Custom, dbt
Schema | Column changes | Great Expectations
Distribution | Value distribution shifts | Elementary
Lineage | Impact of changes | dbt, Atlan
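
Freshness and volume in particular are easy to monitor without a dedicated tool. A minimal sketch using a DB-API connection (the connection object, table name, and timestamp column are placeholders; adjust the SQL to your warehouse):

from datetime import datetime, timedelta, timezone

FRESHNESS_SLO = timedelta(hours=1)

def check_freshness(conn, table: str, ts_column: str = "updated_at") -> timedelta:
    """Alert if the most recent row in the table is older than the freshness SLO."""
    cur = conn.cursor()
    cur.execute(f"SELECT MAX({ts_column}) FROM {table}")
    last_update = cur.fetchone()[0]              # assumed to be a timezone-aware timestamp
    lag = datetime.now(timezone.utc) - last_update
    if lag > FRESHNESS_SLO:
        print(f"ALERT: {table} is stale by {lag}")  # route to Slack / PagerDuty in practice
    return lag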

Setting Up Alerts

-- Elementary dbt package example: surface recent critical alerts
-- models/monitoring/critical_alerts.sql (avoid reusing the package's own "alerts" model name)
{{ config(materialized='table') }}

-- Elementary persists test results and anomalies as models in the warehouse,
-- so alerts can be queried directly (column names vary by Elementary version).
-- Delivery to Slack is typically handled by Elementary's edr monitor CLI,
-- not from inside a dbt model.
SELECT * FROM {{ ref('elementary', 'alerts') }}
WHERE severity = 'critical'
  AND alert_time > CURRENT_TIMESTAMP - INTERVAL '1 hour'

Interview Design Question

Question: "Design a data quality framework for an e-commerce company"

Answer Structure:

1. DEFINE QUALITY DIMENSIONS
   - Orders: completeness, validity, timeliness
   - Customers: uniqueness, accuracy
   - Products: validity, consistency

2. IMPLEMENT CHECKS
   Layer 1 (Ingestion): Schema validation, null checks
   Layer 2 (Staging): Business rules, referential integrity
   Layer 3 (Marts): Aggregation checks, reconciliation

3. TOOLING
   - Great Expectations for Python pipelines
   - dbt tests for SQL transformations
   - Elementary for observability

4. WORKFLOW
   - Critical failures → block pipeline, alert
   - Warnings → flag data, continue
   - Track trends → weekly quality reports

5. GOVERNANCE
   - Data steward per domain
   - Quality SLAs in data contracts
   - Incident response process
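
One way to make the layered checks and severity routing concrete in discussion (all names below are illustrative, not tied to a specific tool):

# Illustrative policy: which checks run at each layer and what a failure does
QUALITY_POLICY = {
    "ingestion": {"checks": ["schema", "not_null"],            "on_failure": "block"},
    "staging":   {"checks": ["business_rules", "referential"], "on_failure": "block"},
    "marts":     {"checks": ["aggregations", "reconciliation"], "on_failure": "warn"},
}

def route_failure(layer: str) -> str:
    """Map a failed check to a pipeline action: block and alert, or flag and continue."""
    blocking = QUALITY_POLICY[layer]["on_failure"] == "block"
    return "quarantine_and_alert" if blocking else "flag_and_continue"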

Interview Insight: Data quality isn't just about tools—it's about process and culture. Discuss how you'd build quality into the development workflow, not just add checks at the end.

Next module covers Big Data and streaming systems.
