ETL Pipelines & Orchestration

Pipeline Orchestration Tools


Orchestration is the backbone of data engineering. Expect interview questions about Airflow, Prefect, and Dagster, and about when to use each.

Apache Airflow

The industry standard for workflow orchestration.

Core Concepts

| Concept | Description |
|---|---|
| DAG | Directed Acyclic Graph: the workflow definition |
| Task | Single unit of work |
| Operator | Template for tasks (Python, SQL, Bash, etc.) |
| Sensor | Waits for an external condition |
| XCom | Cross-communication: passes small values between tasks |
| Connection | Stored credentials for an external system |
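
To make the "acyclic" part concrete, here is a minimal stdlib sketch (not Airflow code) of how an orchestrator can derive a valid run order from task dependencies and reject a cycle. It uses Python's `graphlib.TopologicalSorter` (3.9+); `run_order` is a hypothetical helper, and the task names mirror the example DAG below.

```python
from graphlib import CycleError, TopologicalSorter


def run_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order for a task graph.

    `dependencies` maps each task to the set of tasks it depends on.
    Raises ValueError if the graph contains a cycle (i.e. it is not a DAG).
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        raise ValueError(f"not a DAG: {exc}") from exc


# extract -> transform -> dbt_run
pipeline = {
    "transform_sales": {"extract_sales_data"},
    "dbt_run": {"transform_sales"},
}
print(run_order(pipeline))
# ['extract_sales_data', 'transform_sales', 'dbt_run']
```

For a simple chain the order is unique; for graphs with independent branches, any order returned respects every dependency, and an orchestrator is free to run the independent branches in parallel.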

DAG Example

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# SnowflakeOperator is deprecated in favor of the generic SQL operator,
# which runs the same SQL through a Snowflake connection.
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def extract_sales_data():
    """Hypothetical extract step: pull the latest sales data from the source system."""


default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-team@example.com'],  # hypothetical address; email_on_failure needs it
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Daily sales data processing',
    schedule='0 6 * * *',  # 6 AM daily, UTC by default (`schedule` replaced `schedule_interval` in 2.4)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,
    )

    transform = SQLExecuteQueryOperator(
        task_id='transform_sales',
        sql='sql/transform_sales.sql',  # resolved relative to the DAG folder / template_searchpath
        conn_id='snowflake_default',
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /dbt && dbt run --select staging.stg_sales+',
    )

    # Define dependencies: extract, then transform, then dbt
    extract >> transform >> dbt_run
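
The `extract >> transform >> dbt_run` line works because operators overload Python's `>>`. The toy class below (a hypothetical `ToyTask`, not Airflow's `BaseOperator`) sketches the idea: `a >> b` records `b` as downstream of `a` and returns `b`, so a chain reads left to right.

```python
class ToyTask:
    """Hypothetical stand-in for an operator, to illustrate `>>` chaining."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list["ToyTask"] = []

    def __rshift__(self, other):
        # Accept a single task or a list of tasks (fan-out).
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other  # returning the right operand lets `a >> b >> c` chain

    def __repr__(self) -> str:
        return f"ToyTask({self.task_id!r})"


extract, transform, dbt_run = ToyTask("extract"), ToyTask("transform"), ToyTask("dbt_run")
extract >> transform >> dbt_run  # parsed as (extract >> transform) >> dbt_run
print(extract.downstream, transform.downstream)
# [ToyTask('transform')] [ToyTask('dbt_run')]
```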

Interview Question: "How do you handle task failures in Airflow?"

Answer:

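from datetime import timedelta

from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# 1. Retries with exponential backoff
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),       # delay before the first retry
    'retry_exponential_backoff': True,         # grow the delay on each retry
    'max_retry_delay': timedelta(minutes=30),  # upper bound on any single delay
}

# 2. On-failure callbacks
def send_slack_alert(message: str) -> None:
    """Hypothetical helper; swap in your Slack webhook or notifier."""
    print(message)

def alert_on_failure(context):
    task_id = context['task_instance'].task_id
    send_slack_alert(f"Task {task_id} failed!")

task = PythonOperator(
    task_id='critical_task',
    python_callable=lambda: None,  # placeholder for the real work
    on_failure_callback=alert_on_failure,
)

# 3. Trigger rules for downstream tasks
downstream = PythonOperator(
    task_id='cleanup',
    python_callable=lambda: None,  # placeholder for cleanup logic
    trigger_rule=TriggerRule.ALL_DONE,  # run regardless of upstream status
)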
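
A simplified model of the retry and trigger-rule mechanisms above, in plain Python. `retry_delays` assumes the delay doubles on each retry and is capped at the maximum; Airflow's real computation also adds jitter on top, so treat these values as approximate. `should_run` is a hypothetical helper that covers only `all_success` (the default) and `all_done`, with upstream states reduced to a few strings.

```python
from datetime import timedelta


def retry_delays(retries: int, base: timedelta, cap: timedelta) -> list[timedelta]:
    """Delay before each retry: base * 2**(n-1), capped (simplified; no jitter)."""
    return [min(base * 2 ** (n - 1), cap) for n in range(1, retries + 1)]


FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a downstream task may start, for two common trigger rules."""
    if trigger_rule == "all_success":  # Airflow's default rule
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":  # run once every upstream has finished, in any state
        return all(state in FINISHED for state in upstream_states)
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


print(retry_delays(3, timedelta(minutes=5), timedelta(minutes=30)))
# [datetime.timedelta(seconds=300), datetime.timedelta(seconds=600), datetime.timedelta(seconds=1200)]
print(should_run("all_success", ["success", "failed"]))  # False
print(should_run("all_done", ["success", "failed"]))     # True
```

This is why `all_done` suits cleanup tasks: a failed upstream still counts as finished, whereas under `all_success` it blocks the downstream task.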

Key Patterns

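Dynamic tasks (generated when the DAG file is parsed):

# Generate one task per table (inside a `with DAG(...)` block)
# process_table: hypothetical callable that accepts a `table` keyword argument
for table in ['orders', 'customers', 'products']:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        op_kwargs={'table': table},
    )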
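
Passing the loop variable through `op_kwargs` matters. A common bug when generating callables in a loop is a lambda that closes over the loop variable: Python looks the name up when the lambda is called, so every task would see the last table. This stdlib sketch (a hypothetical `process_table`, no Airflow required) contrasts that with `functools.partial`, which, like `op_kwargs`, captures the value when the task is defined.

```python
from functools import partial

TABLES = ["orders", "customers", "products"]


def process_table(table: str) -> str:
    """Hypothetical per-table work."""
    return f"processed {table}"


# Buggy: each lambda reads `table` when called, after the loop has finished.
late_bound = {}
for table in TABLES:
    late_bound[f"process_{table}"] = lambda: process_table(table)

# Correct: the current value is bound into each callable immediately.
early_bound = {}
for table in TABLES:
    early_bound[f"process_{table}"] = partial(process_table, table=table)

print(late_bound["process_orders"]())   # processed products  (wrong table)
print(early_bound["process_orders"]())  # processed orders
```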

Task Groups (Airflow 2.0+):

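from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# Inside a `with DAG(...)` block; transform_1/transform_2 are hypothetical callables
with TaskGroup('transform_group') as transform_group:
    task1 = PythonOperator(task_id='transform_1', python_callable=transform_1)
    task2 = PythonOperator(task_id='transform_2', python_callable=transform_2)

# A group can be wired like a single task, e.g. extract >> transform_group >> dbt_run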

Prefect

A modern, Python-native orchestrator that aims for a lighter-weight developer experience than Airflow.

Key Differences from Airflow

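| Aspect | Airflow | Prefect |
|---|---|---|
| DAG definition | DAG files parsed by the scheduler | Python decorators (`@flow`, `@task`) |
| Scheduling | Scheduler process | Self-hosted Prefect server or Prefect Cloud |
| State management | Metadata database (Postgres or MySQL) | SQLite or Postgres (self-hosted), or Prefect Cloud |
| Local testing | Harder | Native support |
| Dynamic workflows | More constrained (dynamic task mapping since 2.3) | First-class support |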

Flow Example

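from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

# Hypothetical stand-ins for real source and warehouse clients
def fetch_data(source: str) -> dict:
    return {"source": source, "rows": [{"id": 1}]}

def clean_and_transform(raw_data: dict) -> dict:
    return {**raw_data, "cleaned": True}

def write_to_warehouse(data: dict, target: str) -> None:
    print(f"Loaded {len(data['rows'])} rows into {target}")

@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str) -> dict:
    """Extract data from source system."""
    return fetch_data(source)

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform_data(raw_data: dict) -> dict:
    """Transform extracted data (cached for 1 hour per unique input)."""
    return clean_and_transform(raw_data)

@task
def load_data(transformed_data: dict, target: str):
    """Load data to target."""
    write_to_warehouse(transformed_data, target)

@flow(name="daily-sales-etl")
def daily_sales_pipeline(source: str = "salesforce"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, "sales_mart")

# Run locally for testing: a flow is called like a normal Python function
if __name__ == "__main__":
    daily_sales_pipeline()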
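
Prefect's `@task` options are declarative wrappers around a function call. To show roughly what `retries`/`retry_delay_seconds` and input-hash caching with an expiration mean, here is a hypothetical `toy_task` decorator in plain Python. It is not Prefect's implementation: it caches in memory by hashing the `repr` of the arguments, while Prefect persists results and computes cache keys its own way.

```python
import functools
import hashlib
import time
from typing import Optional


def toy_task(retries: int = 0, retry_delay_seconds: float = 0.0, cache_seconds: Optional[float] = None):
    """Hypothetical decorator: retry on exception, optionally cache by input hash."""
    def decorator(fn):
        cache = {}  # input hash -> (stored_at, result)

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = hashlib.sha256(repr((args, sorted(kwargs.items()))).encode()).hexdigest()
            if cache_seconds is not None and key in cache:
                stored_at, result = cache[key]
                if time.monotonic() - stored_at < cache_seconds:
                    return result  # cache hit: skip the work
            for attempt in range(retries + 1):
                try:
                    result = fn(*args, **kwargs)
                    break
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the error
                    time.sleep(retry_delay_seconds)
            if cache_seconds is not None:
                cache[key] = (time.monotonic(), result)
            return result
        return wrapper
    return decorator


calls = {"n": 0}

@toy_task(retries=2)
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")  # fails on the first two attempts
    return "ok"

print(flaky(), calls["n"])  # ok 3
```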

Advanced Features

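from prefect import flow, task

# Subflows for modularity (child flows use placeholder logic)
@flow
def child_flow_1() -> list[int]:
    return [1, 2, 3]

@flow
def child_flow_2(values: list[int]) -> int:
    return sum(values)

@flow
def parent_flow():
    result1 = child_flow_1()
    result2 = child_flow_2(result1)
    return result2

# Concurrent execution: .submit() returns a future immediately
@task
def process_item(item: int) -> int:
    return item * 2  # placeholder logic

@flow
def parallel_processing(items: list[int]):
    futures = [process_item.submit(item) for item in items]
    results = [f.result() for f in futures]
    return results

# Deployment with schedule. Deployment.build_from_flow was deprecated in
# Prefect 2 and removed in Prefect 3; flow.serve() creates a scheduled
# deployment and runs it from this process.
if __name__ == "__main__":
    daily_sales_pipeline.serve(  # the flow from the previous example
        name="production-deployment",
        cron="0 6 * * *",
    )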
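
The `.submit()` / `.result()` pattern in `parallel_processing` is the standard futures idiom. For comparison, here is the same fan-out/fan-in shape using only the standard library's `concurrent.futures`; `process_item` is a hypothetical stand-in, and the thread pool is an assumption (Prefect chooses its own task runner).

```python
from concurrent.futures import ThreadPoolExecutor


def process_item(item: int) -> int:
    """Hypothetical per-item work (e.g. an API call or a file download)."""
    return item * item


def parallel_processing(items: list[int], max_workers: int = 4) -> list[int]:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_item, item) for item in items]  # fan out
        return [f.result() for f in futures]  # fan in, preserving input order


print(parallel_processing([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

Collecting results in submission order, rather than with `as_completed`, keeps outputs aligned with inputs at the cost of waiting on the slowest early item.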

Dagster

Takes a software-defined assets approach: it focuses on the data assets a pipeline produces rather than on the tasks that produce them.

Core Concepts

| Concept | Description |
|---|---|
| Asset | A data artifact (table, file, ML model) |
| Op | A unit of computation (like an Airflow task) |
| Job | A set of ops to execute |
| Resource | External system configuration |
| IO Manager | How assets are stored/loaded |
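
An IO manager separates what an asset computes from where its output is stored. A minimal sketch of the idea in plain Python, assuming pickle files in a local directory (similar in spirit to Dagster's default filesystem IO manager). The class is hypothetical: its `handle_output`/`load_input` methods echo Dagster's method names but take an asset name instead of Dagster's context objects.

```python
import pickle
import tempfile
from pathlib import Path


class PickleIOManager:
    """Hypothetical IO manager: one pickle file per asset under `base_dir`."""

    def __init__(self, base_dir: str):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def _path(self, asset_name: str) -> Path:
        return self.base_dir / f"{asset_name}.pkl"

    def handle_output(self, asset_name: str, obj) -> None:
        """Persist an asset's computed value."""
        with self._path(asset_name).open("wb") as f:
            pickle.dump(obj, f)

    def load_input(self, asset_name: str):
        """Load an upstream asset's value for a downstream computation."""
        with self._path(asset_name).open("rb") as f:
            return pickle.load(f)


manager = PickleIOManager(tempfile.mkdtemp())
manager.handle_output("raw_orders", [{"order_id": 1, "total": 9.5}])
print(manager.load_input("raw_orders"))  # [{'order_id': 1, 'total': 9.5}]
```

Swapping this class for one that writes to object storage or a warehouse changes where assets live without touching the asset functions themselves.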

Asset-Based Pipeline

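import sqlite3

import pandas as pd
from dagster import AssetExecutionContext, Definitions, asset

# Hypothetical source database; in production, supply the connection via a Dagster resource
source_conn = sqlite3.connect("source.db")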

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """Extract orders from source."""
    context.log.info("Extracting orders")
    return pd.read_sql("SELECT * FROM orders", source_conn)

@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Clean and deduplicate orders."""
    return (
        raw_orders
        .drop_duplicates(subset=['order_id'])
        .dropna(subset=['customer_id'])
    )

@asset
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily revenue."""
    return (
        cleaned_orders
        .groupby('order_date')
        .agg({'total': 'sum'})
        .reset_index()
    )

defs = Definitions(
    assets=[raw_orders, cleaned_orders, daily_revenue],
)
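
No dependencies are declared explicitly above: Dagster wires `cleaned_orders` to `raw_orders` because the parameter is named after the upstream asset. A stdlib sketch of that mechanism, reading parameter names with `inspect.signature` and ordering work with `graphlib`. The `materialize` helper and the toy list-based assets are hypothetical; parameters that don't name another asset are simply ignored here, whereas Dagster also understands special parameters such as `context`.

```python
import inspect
from graphlib import TopologicalSorter


def materialize(*assets):
    """Hypothetical: run asset functions in dependency order, inferred from parameter names."""
    by_name = {fn.__name__: fn for fn in assets}
    deps = {
        name: {p for p in inspect.signature(fn).parameters if p in by_name}
        for name, fn in by_name.items()
    }
    values = {}
    for name in TopologicalSorter(deps).static_order():
        args = {p: values[p] for p in deps[name]}  # pass upstream values by name
        values[name] = by_name[name](**args)
    return values


# Toy assets mirroring the pipeline above, with lists instead of DataFrames
def raw_orders():
    return [{"order_id": 1, "total": 10}, {"order_id": 1, "total": 10}, {"order_id": 2, "total": 5}]

def cleaned_orders(raw_orders):
    seen, out = set(), []
    for row in raw_orders:
        if row["order_id"] not in seen:  # drop duplicate order_ids
            seen.add(row["order_id"])
            out.append(row)
    return out

def revenue(cleaned_orders):
    return sum(row["total"] for row in cleaned_orders)


print(materialize(revenue, cleaned_orders, raw_orders)["revenue"])  # 15
```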

Why Dagster for Data Engineering

- Data Lineage: Automatic dependency graph
- Materialization: Track when assets were last computed
- Partitions: Native support for time-partitioned assets
- Testing: Built-in testing utilities

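# Partitioned assets: one materialization per day
import pandas as pd
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

def extract_events_for_date(date: str) -> pd.DataFrame:
    """Hypothetical extractor: load the events for one 'YYYY-MM-DD' date."""
    return pd.DataFrame({"event_date": [date]})

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key  # e.g. '2024-01-01'
    return extract_events_for_date(partition_date)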
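
Each daily partition is identified by a date string, which is what `context.partition_key` returns in the asset above. A stdlib sketch of enumerating such keys, for example to plan a backfill, assuming `YYYY-MM-DD` keys and an inclusive end date; `daily_partition_keys` is a hypothetical helper, not a Dagster API.

```python
from datetime import date, timedelta


def daily_partition_keys(start: str, end: str) -> list[str]:
    """Hypothetical helper: one 'YYYY-MM-DD' key per day from start to end, inclusive."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    if last < first:
        return []
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days + 1)]


# A three-day backfill: one run per partition key
for key in daily_partition_keys("2024-01-30", "2024-02-01"):
    print(key)
# 2024-01-30
# 2024-01-31
# 2024-02-01
```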

Tool Comparison

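| Factor | Airflow | Prefect | Dagster |
|---|---|---|---|
| Paradigm | Task-based | Task-based | Asset-based |
| Learning curve | Steep | Moderate | Moderate |
| Community | Largest | Growing | Growing |
| Managed offerings | Astronomer, Amazon MWAA, Google Cloud Composer | Prefect Cloud | Dagster+ (formerly Dagster Cloud) |
| Best for | Complex scheduling | Python-heavy teams | Data-centric teams |
| Local dev | Docker is common (`airflow standalone` also works) | Native | Native |
| dbt integration | Cosmos, or manual (e.g. BashOperator) | prefect-dbt | dagster-dbt (dbt models as assets) |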

Interview Question: "How do you choose between Airflow, Prefect, and Dagster?"

Answer Framework:

| Choose | When |
|---|---|
| Airflow | Enterprise scale, complex scheduling, large team, existing expertise |
| Prefect | Python-first team, need quick iteration, simpler deployment |
| Dagster | Data assets focus, strong lineage needs, modern data stack |

Sample Answer: "I'd choose based on team and use case. Airflow for enterprise scale with complex dependencies. Prefect for Python-heavy teams wanting faster iteration. Dagster when data lineage and asset-centric thinking are priorities, especially with dbt."

Interview Tip: Know one tool deeply (usually Airflow) but understand the trade-offs of alternatives. Be able to discuss why you'd migrate from one to another.

Next, we'll explore dbt for transformation and data modeling.
