ML Pipelines & Orchestration
Pipeline Design Patterns
Pipeline design questions test your ability to architect ML systems. Know these patterns cold.
Pattern 1: Incremental Processing
Interview Question: "How do you process 10TB of data daily without reprocessing everything?"
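```python
from datetime import datetime

from airflow.decorators import dag, task

# read_from_redis, write_to_redis, query_warehouse and compute_features
# are placeholders for your own state-store and warehouse helpers.


@dag(schedule="@hourly", start_date=datetime(2024, 1, 1), catchup=False)
def incremental_feature_pipeline():
    @task
    def get_watermark():
        # Get last processed timestamp from state store
        return read_from_redis("pipeline:last_processed")

    @task
    def get_new_records(watermark):
        # Only fetch records after watermark. Prefer your client's bind
        # parameters over f-string interpolation in production.
        # XCom is meant for small values: at 10TB scale, stage the slice in
        # storage and return its location instead of the rows themselves.
        return query_warehouse(f"""
            SELECT * FROM events
            WHERE event_time > '{watermark}'
            AND event_time <= CURRENT_TIMESTAMP
        """)

    @task
    def process_features(records):
        # Process only new records
        return compute_features(records)

    @task
    def update_watermark(records):
        # Advance watermark to latest processed timestamp (skip empty runs)
        if len(records) == 0:
            return
        max_timestamp = records["event_time"].max()
        write_to_redis("pipeline:last_processed", max_timestamp)

    watermark = get_watermark()
    records = get_new_records(watermark)
    features = process_features(records)
    # The watermark is computed from the raw records (features may not carry
    # event_time) and advances only after feature processing succeeds.
    features >> update_watermark(records)


incremental_feature_pipeline()
```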
Key Points:
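- Watermarks avoid reprocessing history, but on their own they give at-least-once processing: if a run fails after writing features but before advancing the watermark, the next run reprocesses that window. Make feature writes idempotent (e.g., upserts keyed on entity and timestamp) to get effectively exactly-once results
- Store state externally (Redis, database), not in Airflow
- Handle late-arriving data with a grace period: each run re-reads a lookback window (watermark minus N hours) and relies on idempotent writes to absorb the overlap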
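To see why the grace period and idempotent writes belong together, here is a minimal in-memory sketch. It illustrates the idea under stated assumptions and is not the pipeline above: a dict stands in for an upsert-able feature table keyed by event id, another dict stands in for the external state store, the two-hour `GRACE` window is an arbitrary illustrative value, and doubling `value` is a toy "feature".

```python
from datetime import datetime, timedelta
from typing import Optional

# Assumed lookback for late-arriving events (illustrative value)
GRACE = timedelta(hours=2)


def run_incremental(events: list, feature_store: dict, state: dict, now: datetime) -> int:
    """Process events in (watermark - GRACE, now]; return how many were read."""
    watermark: Optional[datetime] = state.get("last_processed")
    start = watermark - GRACE if watermark is not None else None
    batch = [
        e for e in events
        if (start is None or e["event_time"] > start) and e["event_time"] <= now
    ]
    for e in batch:
        # Idempotent upsert keyed by event id: re-reading the overlap
        # overwrites existing rows instead of duplicating them
        feature_store[e["id"]] = e["value"] * 2  # toy "feature"
    if batch:
        # Advance only after all writes succeeded; never move backwards
        newest = max(e["event_time"] for e in batch)
        state["last_processed"] = max(newest, watermark) if watermark is not None else newest
    return len(batch)
```

With a 02:00 watermark, an event stamped 01:30 that arrives late is still picked up on the next run, because that run re-reads from 00:00; the overlapping events are rewritten, not duplicated. Events later than `GRACE` are still missed, so the window size trades latency tolerance against re-read cost.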
Pattern 2: Fan-Out / Fan-In
Interview Question: "How would you train 100 models in parallel?"
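```python
from datetime import datetime

from airflow.decorators import dag, task

# train_and_evaluate, select_best_per_region and update_model_registry
# are placeholders for your own training and registry code.


@dag(schedule="@weekly", start_date=datetime(2024, 1, 1), catchup=False)
def multi_model_training():
    @task
    def get_model_configs() -> list[dict]:
        # Return list of model configurations
        return [
            {"model_id": "us_west", "region": "us-west-1"},
            {"model_id": "us_east", "region": "us-east-1"},
            {"model_id": "eu", "region": "eu-west-1"},
            # ... 100 models
        ]

    @task
    def train_single_model(config: dict) -> dict:
        # Train one model; return small metadata (metrics, artifact URI),
        # not the model object, since results travel through XCom
        return train_and_evaluate(config)

    @task
    def aggregate_results(results: list[dict]):
        # Combine all model results
        best_models = select_best_per_region(results)
        update_model_registry(best_models)

    configs = get_model_configs()
    # Fan-out: dynamic task mapping creates one task instance per config
    results = train_single_model.expand(config=configs)
    # Fan-in: the downstream task receives all mapped results as a list-like sequence
    aggregate_results(results)


multi_model_training()
```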
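Airflow 2.3+ feature: .expand() (dynamic task mapping) creates one task instance per input element at runtime, so the fan-out width can come from upstream data. How many run at once is still capped by parallelism, pools, and max_active_tis_per_dag.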
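Outside Airflow, the same fan-out/fan-in shape can be sketched with the standard library. This is an illustrative sketch resting on assumptions: `train_and_evaluate` here is a hypothetical stand-in that scores a config by how close its `lr` is to 0.01, and `select_best_per_region` is one possible implementation of the helper named above. Threads keep the example small; real training would typically run as separate processes or remote jobs.

```python
from concurrent.futures import ThreadPoolExecutor


def train_and_evaluate(config: dict) -> dict:
    # Hypothetical stand-in for training: a deterministic fake score
    score = round(1.0 - abs(config["lr"] - 0.01), 4)
    return {"model_id": config["model_id"], "region": config["region"], "score": score}


def select_best_per_region(results: list) -> dict:
    # Keep the highest-scoring result for each region
    best: dict = {}
    for r in results:
        current = best.get(r["region"])
        if current is None or r["score"] > current["score"]:
            best[r["region"]] = r
    return best


def fan_out_fan_in(configs: list, max_parallel: int = 4) -> dict:
    # Fan-out: at most `max_parallel` trainings run at once
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        results = list(pool.map(train_and_evaluate, configs))
    # Fan-in: the aggregation step sees every result, in input order
    return select_best_per_region(results)
```

Here `max_workers` plays roughly the role that pools or `max_active_tis_per_dag` play in Airflow: the config list can hold 100 entries while only a few train concurrently.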
Pattern 3: Conditional Branching
Interview Question: "How do you handle model approval workflows?"
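```python
from datetime import datetime

from airflow.decorators import dag, task

# run_evaluation, deploy_to_production, create_jira_ticket,
# wait_for_approval and notify_team are placeholders for your own tooling.


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def model_deployment_pipeline():
    @task
    def evaluate_model() -> dict:
        metrics = run_evaluation()
        return metrics

    @task.branch
    def check_quality(metrics: dict) -> str:
        # Return the task_id of the branch to run; the others are skipped
        if metrics["accuracy"] >= 0.95:
            return "auto_deploy"
        elif metrics["accuracy"] >= 0.90:
            return "request_human_review"
        else:
            return "reject_model"

    @task
    def auto_deploy():
        deploy_to_production()

    @task
    def request_human_review():
        # Pause for approval. Blocking inside a task holds a worker slot;
        # a sensor in reschedule mode, or a separate DAG triggered on
        # approval, avoids that.
        create_jira_ticket()
        wait_for_approval()

    @task
    def reject_model():
        notify_team("Model failed quality gates")

    metrics = evaluate_model()
    branch = check_quality(metrics)
    branch >> [auto_deploy(), request_human_review(), reject_model()]
    # A task joined after these branches needs
    # trigger_rule="none_failed_min_one_success"; with the default
    # all_success, the skipped branches would skip it too.


model_deployment_pipeline()
```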
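One way to keep the approval gate testable is to pull the threshold logic into a plain function that `check_quality` simply returns. A minimal sketch; `choose_branch` and the threshold constants are hypothetical names, and the returned strings must match the downstream task_ids exactly.

```python
from typing import Mapping

# Thresholds from the DAG above, named so they can be reviewed and tested
AUTO_DEPLOY_MIN = 0.95
REVIEW_MIN = 0.90


def choose_branch(metrics: Mapping[str, float]) -> str:
    """Return the task_id to follow for the given evaluation metrics."""
    accuracy = metrics["accuracy"]
    if accuracy >= AUTO_DEPLOY_MIN:
        return "auto_deploy"
    if accuracy >= REVIEW_MIN:
        return "request_human_review"
    # Anything else, including NaN (all NaN comparisons are False), is rejected
    return "reject_model"
```

Because comparisons with NaN are always false, a NaN accuracy (for example from an empty evaluation set) falls through to `reject_model` rather than deploying.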
Pattern 4: Retry with Exponential Backoff
Interview Question: "How do you handle transient failures in ML pipelines?"
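```python
from datetime import timedelta

import requests
from airflow.decorators import task
from tenacity import retry, stop_after_attempt, wait_exponential


# Code-level retry for finer control over each HTTP call
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=300),
    reraise=True,  # re-raise the last error instead of tenacity's RetryError
)
def call_unstable_api():
    response = requests.get("https://api.example.com/data", timeout=30)
    response.raise_for_status()
    return response.json()


# Airflow-level retries wrap the whole task. The two layers multiply:
# 6 task tries (1 + 5 retries) x 5 attempts = up to 30 requests.
@task(
    retries=5,
    retry_delay=timedelta(seconds=60),  # Base delay
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),  # Max 1 hour delay
)
def fetch_from_external_api():
    """Task with Airflow-level retries"""
    return call_unstable_api()
```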
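Airflow and tenacity both compute the backoff for you. To make the mechanics concrete, here is a minimal stdlib sketch of capped exponential backoff with "full jitter". It is not either library's exact formula; the function name, its defaults, and the choice of which exceptions count as transient are assumptions.

```python
from __future__ import annotations

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    attempts: int = 5,
    base: float = 1.0,
    cap: float = 300.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
    rng: random.Random | None = None,
) -> T:
    """Call func up to `attempts` times, backing off between transient failures.

    Before retry n (0-based) it sleeps a random delay in
    [0, min(cap, base * 2**n)], so many workers retrying against the same
    failing service do not retry in lockstep.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    rng = rng or random.Random()
    for n in range(attempts - 1):
        try:
            return func()
        except retry_on:
            sleep(rng.uniform(0, min(cap, base * 2 ** n)))
    # Final attempt: any exception propagates to the caller
    return func()
```

Passing `sleep` and `rng` as parameters keeps the function testable without real waiting. Errors outside `retry_on`, such as a `ValueError` from a bad request, are raised immediately instead of being retried.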
Pattern 5: Circuit Breaker
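```python
import pybreaker
import requests
from airflow.decorators import task

MODEL_URL = "http://model-service/predict"  # placeholder endpoint

# Break after 5 consecutive failures. State lives in process memory, so it
# only spans one task run; pybreaker's CircuitRedisStorage can share it
# across workers.
model_serving_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,  # Try again after 60 seconds
)

@model_serving_breaker
def _call(request: dict) -> dict:
    response = requests.post(MODEL_URL, json=request, timeout=10)
    response.raise_for_status()  # HTTP errors must raise to count as failures
    return response.json()

@task
def call_model_service(batch: list[dict]) -> list:
    predictions = []
    for request in batch:
        try:
            predictions.append(_call(request))
        except (pybreaker.CircuitBreakerError, requests.RequestException):
            # Fallback when the circuit is open or this call failed
            predictions.append(get_cached_prediction(request))  # placeholder helper
    return predictions
```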
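pybreaker handles the state machine for you. To make the closed, open, and half-open states concrete, here is a minimal single-threaded sketch of the same idea. It is not pybreaker's implementation; `SimpleCircuitBreaker` and `CircuitOpenError` are hypothetical names, and the clock is injectable so the timeout can be exercised without sleeping.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised instead of calling the dependency while the circuit is open."""


class SimpleCircuitBreaker:
    """Closed -> open after `fail_max` consecutive failures -> half-open once
    `reset_timeout` seconds pass. A successful trial call closes the circuit;
    a failed one re-opens it. Not thread-safe."""

    def __init__(self, fail_max: int = 5, reset_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def call(self, func: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("circuit open: use the fallback")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half-open" or self.failures >= self.fail_max:
                self.opened_at = self.clock()  # (re-)open the circuit
            raise
        self.failures = 0  # any success closes the circuit
        self.opened_at = None
        return result
```

While the circuit is open, callers fail fast with `CircuitOpenError` and go straight to their fallback, which is what keeps a degraded model service from tying up every task that depends on it.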
Interview Summary Table
| Pattern | Use Case | Key Benefit |
|---|---|---|
| Incremental | Large recurring batch loads | Only new data is processed, so cost tracks new volume rather than total history |
| Fan-Out/Fan-In | Multi-model training | Parallelization |
| Branching | Approval workflows | Conditional logic |
| Retry | External dependencies | Fault tolerance |
| Circuit Breaker | Degraded services | Graceful degradation |
Pro Tip: Sketch these patterns on a whiteboard during interviews. A quick diagram of the DAG's shape (fan-out, branches, retry paths) is often easier to follow than a verbal description alone.
Next module covers Monitoring & Observability interview questions.