ETL Pipelines & Orchestration
Pipeline Orchestration Tools
Orchestration is the backbone of data engineering. Expect interview questions about Airflow, Prefect, Dagster, and when to use each.
Apache Airflow
The industry standard for workflow orchestration.
Core Concepts
| Concept | Description |
|---|---|
| DAG | Directed Acyclic Graph - workflow definition |
| Task | Single unit of work |
| Operator | Template for tasks (Python, SQL, Bash, etc.) |
| Sensor | Wait for external condition |
| XCom | Cross-communication between tasks |
| Connection | External system credentials |
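Sensors and XComs come up constantly in follow-up questions. Below is a minimal sketch, assuming a hypothetical landing file at /data/sales.csv and a hypothetical row-count handoff; the tasks would be declared inside a DAG context like the example that follows:

```python
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator

# Sensor: block until the landing file exists (path is hypothetical)
wait_for_file = FileSensor(
    task_id='wait_for_sales_file',
    filepath='/data/sales.csv',
    poke_interval=60,   # re-check every minute
    timeout=60 * 60,    # fail after an hour of waiting
)

def count_rows(**context):
    row_count = 1000  # placeholder for real extraction logic
    # XCom push: make a small value available to downstream tasks
    context['ti'].xcom_push(key='row_count', value=row_count)

def validate_rows(**context):
    # XCom pull: read the value pushed by the upstream task
    row_count = context['ti'].xcom_pull(task_ids='count_rows', key='row_count')
    assert row_count > 0, "no rows extracted"

count = PythonOperator(task_id='count_rows', python_callable=count_rows)
validate = PythonOperator(task_id='validate_rows', python_callable=validate_rows)

wait_for_file >> count >> validate
```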
DAG Example
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Daily sales data processing',
    schedule_interval='0 6 * * *',  # 6 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,  # extraction function defined elsewhere in the module
    )

    transform = SnowflakeOperator(
        task_id='transform_sales',
        sql='sql/transform_sales.sql',
        snowflake_conn_id='snowflake_default',
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /dbt && dbt run --models staging.stg_sales+',
    )

    # Define dependencies
    extract >> transform >> dbt_run
```
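For local debugging, Airflow 2.5+ can execute a whole DAG in-process with dag.test(), no scheduler required; a minimal sketch appended to the same DAG file:

```python
# Run every task of the DAG in-process (Airflow 2.5+), useful for quick iteration
if __name__ == "__main__":
    dag.test()
```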
Interview Question: "How do you handle task failures in Airflow?"
Answer:
```python
# 1. Retries with exponential backoff
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# 2. On-failure callbacks
def alert_on_failure(context):
    task_id = context['task_instance'].task_id
    send_slack_alert(f"Task {task_id} failed!")  # send_slack_alert: your own alerting helper

task = PythonOperator(
    task_id='critical_task',
    on_failure_callback=alert_on_failure,
    ...
)

# 3. Trigger rules for downstream tasks
downstream = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup_temp_files,  # placeholder callable
    trigger_rule='all_done',  # run regardless of upstream success/failure
)
```
Key Patterns
Dynamic DAGs:
```python
# Generate one task per table at DAG-parse time
for table in ['orders', 'customers', 'products']:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,  # shared callable, parameterized via op_kwargs
        op_kwargs={'table': table},
    )
```
Task Groups (Airflow 2.0+):
```python
from airflow.utils.task_group import TaskGroup

with TaskGroup('transform_group') as transform_group:
    task1 = PythonOperator(task_id='transform_1', ...)
    task2 = PythonOperator(task_id='transform_2', ...)
```
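Once defined, the group is wired like a single node, which keeps the graph view readable; for example (assuming extract and load tasks exist in the same DAG):

```python
# The whole group collapses to one node in dependency arrows and the UI
extract >> transform_group >> load
```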
Prefect
Modern, Python-native orchestration with a lighter developer experience than Airflow.
Key Differences from Airflow
| Aspect | Airflow | Prefect |
|---|---|---|
| DAG Definition | DAG files parsed | Python decorators |
| Scheduling | Scheduler process | Cloud or local server |
| State Management | Postgres metadata | Postgres or Prefect Cloud |
| Local Testing | Harder | Native support |
| Dynamic Workflows | Complex | First-class support |
Flow Example
```python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str) -> dict:
    """Extract data from source system."""
    return fetch_data(source)  # fetch_data / clean_and_transform / write_to_warehouse: your own helpers

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform_data(raw_data: dict) -> dict:
    """Transform extracted data."""
    return clean_and_transform(raw_data)

@task
def load_data(transformed_data: dict, target: str):
    """Load data to target."""
    write_to_warehouse(transformed_data, target)

@flow(name="daily-sales-etl")
def daily_sales_pipeline(source: str = "salesforce"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, "sales_mart")

# Run locally for testing
if __name__ == "__main__":
    daily_sales_pipeline()
```
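This local-first design also makes testing straightforward: a flow is a plain callable, and a task's undecorated function is reachable via .fn, so both can run under pytest without any server. A minimal sketch against the flow above (fixture data is hypothetical):

```python
def test_transform_data():
    # .fn bypasses the Prefect engine and calls the raw function
    result = transform_data.fn({"order_id": 1, "total": 100})
    assert result is not None

def test_daily_sales_pipeline():
    # Calling the flow directly executes it locally
    daily_sales_pipeline(source="test_fixture")
```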
Advanced Features
```python
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Subflows for modularity
@flow
def parent_flow():
    result1 = child_flow_1()        # child flows are just other @flow functions
    result2 = child_flow_2(result1)
    return result2

# Concurrent execution via task futures
@flow
def parallel_processing():
    futures = [process_item.submit(item) for item in items]  # .submit() runs tasks concurrently
    results = [f.result() for f in futures]
    return results

# Deployment with schedule (Prefect 2.x block-based API)
deployment = Deployment.build_from_flow(
    flow=daily_sales_pipeline,
    name="production-deployment",
    schedule=CronSchedule(cron="0 6 * * *"),
    work_pool_name="default-agent-pool",
)
```
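Note that Deployment.build_from_flow is the Prefect 2.x block-style API; newer releases lean toward flow.serve() and flow.deploy(). A minimal sketch of the serve style, reusing the name and schedule from above:

```python
if __name__ == "__main__":
    # Starts a long-lived process that schedules and executes runs of this flow
    daily_sales_pipeline.serve(
        name="production-deployment",
        cron="0 6 * * *",
    )
```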
Dagster
A software-defined assets approach: pipelines are modeled around the data assets they produce rather than the tasks that run.
Core Concepts
| Concept | Description |
|---|---|
| Asset | A data artifact (table, file, ML model) |
| Op | A unit of computation (like Airflow task) |
| Job | A set of ops to execute |
| Resource | External system configuration |
| IO Manager | How assets are stored/loaded |
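Resources and IO managers are common follow-ups; a minimal sketch of a Pythonic resource (Dagster 1.3+), with a hypothetical warehouse connection string and helper:

```python
from dagster import ConfigurableResource, Definitions, asset

class WarehouseResource(ConfigurableResource):
    connection_string: str  # hypothetical config field

@asset
def orders_snapshot(warehouse: WarehouseResource) -> None:
    # Dagster injects the resource by matching the parameter name to the resources dict
    copy_orders_to_warehouse(warehouse.connection_string)  # hypothetical helper

defs = Definitions(
    assets=[orders_snapshot],
    resources={"warehouse": WarehouseResource(connection_string="snowflake://...")},
)
```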
Asset-Based Pipeline
```python
from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """Extract orders from source."""
    context.log.info("Extracting orders")
    return pd.read_sql("SELECT * FROM orders", source_conn)  # source_conn: your source connection

@asset
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Clean and deduplicate orders."""
    return (
        raw_orders
        .drop_duplicates(subset=['order_id'])
        .dropna(subset=['customer_id'])
    )

@asset
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily revenue."""
    return (
        cleaned_orders
        .groupby('order_date')
        .agg({'total': 'sum'})
        .reset_index()
    )

defs = Definitions(
    assets=[raw_orders, cleaned_orders, daily_revenue],
)
```
Why Dagster for Data Engineering
- Data Lineage: automatic dependency graph between assets
- Materialization: track when each asset was last computed
- Partitions: native support for time-partitioned assets
- Testing: built-in testing utilities
```python
# Partitioned assets
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key  # e.g. "2024-01-15"
    return extract_events_for_date(partition_date)  # extract_events_for_date: your own loader
```
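The testing point above is concrete: assets can be materialized in-process with dagster.materialize, which slots directly into pytest. A minimal sketch against the assets defined earlier (in practice you would stub raw_orders' source connection):

```python
from dagster import materialize

def test_revenue_pipeline():
    result = materialize([raw_orders, cleaned_orders, daily_revenue])
    assert result.success
```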
Tool Comparison
| Factor | Airflow | Prefect | Dagster |
|---|---|---|---|
| Paradigm | Task-based | Task-based | Asset-based |
| Learning Curve | Steep | Moderate | Moderate |
| Community | Largest | Growing | Growing |
| Cloud Offering | Astronomer, MWAA | Prefect Cloud | Dagster Cloud |
| Best For | Complex scheduling | Python-heavy teams | Data-centric teams |
| Local Dev | Heavier (scheduler + metadata DB, often Docker) | Native | Native |
| dbt Integration | Cosmos, manual | Native | Native |
Interview Question: "How do you choose between Airflow, Prefect, and Dagster?"
Answer Framework:
| Choose | When |
|---|---|
| Airflow | Enterprise scale, complex scheduling, large team, existing expertise |
| Prefect | Python-first team, need quick iteration, simpler deployment |
| Dagster | Data assets focus, strong lineage needs, modern data stack |
Sample Answer: "I'd choose based on team and use case. Airflow for enterprise scale with complex dependencies. Prefect for Python-heavy teams wanting faster iteration. Dagster when data lineage and asset-centric thinking are priorities, especially with dbt."
Interview Tip: Know one tool deeply (usually Airflow) but understand the trade-offs of alternatives. Be able to discuss why you'd migrate from one to another.
Next, we'll explore dbt for transformation and data modeling.