ETL Pipelines & Orchestration

Pipeline Orchestration Tools

Orchestration is the backbone of data engineering. Expect interview questions about Airflow, Prefect, Dagster, and when to use each.

Apache Airflow

The industry standard for workflow orchestration.

Core Concepts

| Concept | Description |
| --- | --- |
| DAG | Directed Acyclic Graph; the workflow definition |
| Task | A single unit of work |
| Operator | A template for tasks (Python, SQL, Bash, etc.) |
| Sensor | Waits for an external condition |
| XCom | Cross-communication between tasks |
| Connection | Stored credentials for external systems |
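
Sensors and XComs don't appear in the DAG example below, so here is a minimal sketch of both, assuming a hypothetical landing path and the default filesystem connection; these operators would live inside a with DAG(...) block like the one that follows:

from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor

# Wait for a file to land before processing (path is illustrative)
wait_for_file = FileSensor(
    task_id='wait_for_sales_file',
    filepath='/data/exports/{{ ds }}.csv',
    fs_conn_id='fs_default',
    poke_interval=300,        # check every 5 minutes
    timeout=60 * 60 * 2,      # fail after 2 hours of waiting
)

def push_row_count():
    # A callable's return value is pushed to XCom under the key 'return_value'
    return 42

def pull_row_count(ti):
    # ti (the TaskInstance) is injected from the Airflow context
    count = ti.xcom_pull(task_ids='push_count')
    print(f"Upstream task reported {count} rows")

push = PythonOperator(task_id='push_count', python_callable=push_row_count)
pull = PythonOperator(task_id='pull_count', python_callable=pull_row_count)

wait_for_file >> push >> pull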

DAG Example

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Daily sales data processing',
    schedule_interval='0 6 * * *',  # 6 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,  # a plain Python function defined elsewhere in this file
    )

    transform = SnowflakeOperator(
        task_id='transform_sales',
        sql='sql/transform_sales.sql',
        snowflake_conn_id='snowflake_default',
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /dbt && dbt run --models staging.stg_sales+',
    )

    # Define dependencies
    extract >> transform >> dbt_run

Interview Question: "How do you handle task failures in Airflow?"

Answer:

# 1. Retries with backoff
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# 2. On-failure callbacks
def alert_on_failure(context):
    task_id = context['task_instance'].task_id
    send_slack_alert(f"Task {task_id} failed!")  # send_slack_alert: your own alerting helper

task = PythonOperator(
    task_id='critical_task',
    on_failure_callback=alert_on_failure,
    ...
)

# 3. Trigger rules for downstream tasks
downstream = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup_temp_files,  # hypothetical cleanup callable
    trigger_rule='all_done',  # run regardless of upstream success/failure
)

Key Patterns

Dynamic DAGs:

# Generate tasks dynamically
for table in ['orders', 'customers', 'products']:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        op_kwargs={'table': table},
    )
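
The loop above works when the table list is known at DAG parse time. When the list is only known at runtime, Airflow 2.3+ adds dynamic task mapping; a minimal sketch using the TaskFlow API (helper names are illustrative):

from airflow.decorators import task

@task
def list_tables() -> list[str]:
    # In practice this might query a metadata store at runtime
    return ['orders', 'customers', 'products']

@task
def process_table(table: str) -> None:
    print(f"Processing {table}")

# One mapped task instance is created per element returned upstream
process_table.expand(table=list_tables())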

Task Groups (Airflow 2.0+):

from airflow.utils.task_group import TaskGroup

with TaskGroup('transform_group') as transform_group:
    task1 = PythonOperator(task_id='transform_1', ...)
    task2 = PythonOperator(task_id='transform_2', ...)

Prefect

Modern, Python-native orchestration with better developer experience.

Key Differences from Airflow

| Aspect | Airflow | Prefect |
| --- | --- | --- |
| DAG definition | DAG files parsed by the scheduler | Python decorators |
| Scheduling | Scheduler process | Cloud or local server |
| State management | Postgres metadata database | Postgres or Prefect Cloud |
| Local testing | Harder | Native support |
| Dynamic workflows | Complex | First-class support |

Flow Example

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str) -> dict:
    """Extract data from source system."""
    return fetch_data(source)  # fetch_data: your own extraction client

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform_data(raw_data: dict) -> dict:
    """Transform extracted data."""
    return clean_and_transform(raw_data)  # your own transformation logic

@task
def load_data(transformed_data: dict, target: str):
    """Load data to target."""
    write_to_warehouse(transformed_data, target)  # your own warehouse writer

@flow(name="daily-sales-etl")
def daily_sales_pipeline(source: str = "salesforce"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, "sales_mart")

# Run locally for testing
if __name__ == "__main__":
    daily_sales_pipeline()

Advanced Features

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Subflows for modularity (child_flow_1 and child_flow_2 are other @flow functions)
@flow
def parent_flow():
    result1 = child_flow_1()
    result2 = child_flow_2(result1)
    return result2

# Concurrent execution: .submit() schedules tasks on the flow's task runner
@flow
def parallel_processing(items: list):
    futures = [process_item.submit(item) for item in items]  # process_item is a @task
    results = [f.result() for f in futures]
    return results

# Deployment with a cron schedule (Prefect 2.x API)
deployment = Deployment.build_from_flow(
    flow=daily_sales_pipeline,
    name="production-deployment",
    schedule=CronSchedule(cron="0 6 * * *"),
    work_pool_name="default-agent-pool",
)
deployment.apply()  # register the deployment with the Prefect API

Dagster

A software-defined-assets approach: Dagster models pipelines around the data assets they produce (tables, files, models) rather than the tasks that produce them.

Core Concepts

| Concept | Description |
| --- | --- |
| Asset | A data artifact (table, file, ML model) |
| Op | A unit of computation (like an Airflow task) |
| Job | A set of ops to execute together |
| Resource | Configuration for an external system |
| IO Manager | How assets are stored and loaded |

Asset-Based Pipeline

from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """Extract orders from source."""
    context.log.info("Extracting orders")
    return pd.read_sql("SELECT * FROM orders", source_conn)  # source_conn: a SQLAlchemy engine defined elsewhere

@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Clean and deduplicate orders."""
    return (
        raw_orders
        .drop_duplicates(subset=['order_id'])
        .dropna(subset=['customer_id'])
    )

@asset
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily revenue."""
    return (
        cleaned_orders
        .groupby('order_date')
        .agg({'total': 'sum'})
        .reset_index()
    )

defs = Definitions(
    assets=[raw_orders, cleaned_orders, daily_revenue],
)
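
The Resource concept from the table above isn't shown in this pipeline. Here is a minimal sketch of the same raw_orders asset with the connection moved behind a ConfigurableResource; the class name and connection string are illustrative, and an IO manager would be wired into the same resources dict:

from dagster import ConfigurableResource, Definitions, asset
from sqlalchemy import create_engine
import pandas as pd

class SourceDB(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> pd.DataFrame:
        # A real resource would reuse the engine; kept simple for the sketch
        return pd.read_sql(sql, create_engine(self.connection_string))

@asset
def raw_orders(source_db: SourceDB) -> pd.DataFrame:
    """Extract orders via the configured resource."""
    return source_db.query("SELECT * FROM orders")

defs = Definitions(
    assets=[raw_orders],
    resources={"source_db": SourceDB(connection_string="postgresql://user:pass@host/db")},
)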

Why Dagster for Data Engineering

- Data lineage: automatic dependency graph between assets
- Materialization: track when each asset was last computed
- Partitions: native support for time-partitioned assets
- Testing: built-in testing utilities

# Partitioned assets
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key  # e.g. "2024-01-15"
    return extract_events_for_date(partition_date)  # your extraction helper
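
The testing point above follows from assets being plain Python functions; here is a minimal sketch of a unit test for the cleaned_orders asset defined earlier, assuming pytest as the runner and that the assets live in an importable module (the module name is illustrative):

import pandas as pd
from my_dagster_assets import cleaned_orders  # illustrative module name

def test_cleaned_orders_drops_duplicates_and_nulls():
    # Call the asset directly with an in-memory frame instead of the real upstream asset
    fake_raw = pd.DataFrame({
        'order_id': [1, 1, 2],
        'customer_id': ['a', 'a', None],
        'order_date': ['2024-01-01', '2024-01-01', '2024-01-02'],
        'total': [10.0, 10.0, 20.0],
    })
    result = cleaned_orders(fake_raw)
    assert result['order_id'].is_unique
    assert result['customer_id'].notna().all()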

Tool Comparison

| Factor | Airflow | Prefect | Dagster |
| --- | --- | --- | --- |
| Paradigm | Task-based | Task-based | Asset-based |
| Learning curve | Steep | Moderate | Moderate |
| Community | Largest | Growing | Growing |
| Cloud offering | Astronomer, MWAA | Prefect Cloud | Dagster Cloud |
| Best for | Complex scheduling | Python-heavy teams | Data-centric teams |
| Local dev | Docker typically required | Native | Native |
| dbt integration | Cosmos or manual | Native | Native |

Interview Question: "How do you choose between Airflow, Prefect, and Dagster?"

Answer Framework:

| Tool | Choose when |
| --- | --- |
| Airflow | Enterprise scale, complex scheduling, large team, existing expertise |
| Prefect | Python-first team, need for quick iteration, simpler deployment |
| Dagster | Data-asset focus, strong lineage needs, modern data stack |

Sample Answer: "I'd choose based on team and use case. Airflow for enterprise scale with complex dependencies. Prefect for Python-heavy teams wanting faster iteration. Dagster when data lineage and asset-centric thinking are priorities, especially with dbt."

Interview Tip: Know one tool deeply (usually Airflow) but understand the trade-offs of alternatives. Be able to discuss why you'd migrate from one to another.

Next, we'll explore dbt for transformation and data modeling.
