# Pipeline Orchestration Fundamentals
ML pipelines are the backbone of reproducible machine learning. Interviewers test your understanding of orchestration patterns, not just tool syntax.
## What Interviewers Really Test
| Area | Junior Question | Senior Question |
|---|---|---|
| DAG Design | "What is a DAG?" | "How do you handle dynamic DAGs?" |
| Dependencies | "How do tasks depend on each other?" | "How do you handle cross-DAG dependencies?" |
| Failure | "What happens when a task fails?" | "Design a retry strategy with backoff" |
| Scale | "Can Airflow run in parallel?" | "How do you scale to 10,000 DAGs?" |
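The senior column rewards specifics. For the retry question, for example, a strong candidate can sketch Airflow's built-in task-level retry settings from memory. A minimal sketch (the task body is a hypothetical placeholder):

```python
from datetime import timedelta

from airflow.decorators import dag, task


@dag(schedule="@daily", catchup=False)
def resilient_pipeline():
    @task(
        retries=3,                              # give up after three retries
        retry_delay=timedelta(minutes=1),       # initial wait between attempts
        retry_exponential_backoff=True,         # 1 min -> 2 min -> 4 min ...
        max_retry_delay=timedelta(minutes=30),  # cap the backoff
    )
    def call_flaky_service():
        ...  # hypothetical call to an unreliable upstream system

    call_flaky_service()


resilient_pipeline()
```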
## Core Orchestration Patterns
### Pattern 1: Extract-Transform-Load (ETL)
```python
# Classic ETL pattern for ML data preparation
from airflow.decorators import dag


@dag(schedule="@daily", catchup=False)
def ml_data_pipeline():
    extract = extract_from_sources()
    transform = clean_and_feature_engineer(extract)
    validate = validate_data_quality(transform)
    load = load_to_feature_store(validate)
    # Explicit dependency chain (TaskFlow already infers it
    # from the data passing above, so this line is optional)
    extract >> transform >> validate >> load


ml_data_pipeline()
```
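A common follow-up is what those callables actually are. In the TaskFlow API each step is a `@task`-decorated function; a minimal sketch of one (the return payload is a hypothetical example):

```python
from airflow.decorators import task


@task
def extract_from_sources() -> list[dict]:
    # Hypothetical example payload. In practice, return a *reference*
    # (e.g., an S3 path) rather than the data itself, because TaskFlow
    # return values travel between tasks via XCom.
    return [{"user_id": 1, "event": "click"}]
```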
### Pattern 2: Training Pipeline
```python
from airflow.decorators import dag


@dag(schedule=None)  # Triggered by the data pipeline or manually
def training_pipeline():
    data = fetch_training_data()
    split = create_train_test_split(data)
    train = train_model(split)
    evaluate = evaluate_metrics(train)
    # The "only register if better" condition lives inside
    # register_if_better (see the branching sketch below)
    register = register_if_better(evaluate)
    # Optional: the chain already follows from the data passing above
    data >> split >> train >> evaluate >> register


training_pipeline()
```
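The branching sketch mentioned in the comment above: one plausible shape for `register_if_better`, assuming Airflow 2.3+'s `@task.branch` decorator (the metric names and downstream task ids are hypothetical):

```python
from airflow.decorators import task


@task.branch
def register_if_better(metrics: dict) -> str:
    # Return the task_id to run next; the other branch is skipped
    if metrics["candidate_auc"] > metrics["production_auc"]:
        return "register_model"
    return "skip_registration"
```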
### Pattern 3: Inference Pipeline
@dag(schedule="*/5 * * * *") # Every 5 minutes
def batch_inference():
fetch = fetch_new_records()
predict = run_batch_predictions(fetch)
store = write_predictions_to_db(predict)
notify = send_completion_notification(store)
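One production detail worth volunteering here: a five-minute schedule can outrun a slow batch, so cap concurrent runs. A minimal sketch using Airflow's `max_active_runs`:

```python
from airflow.decorators import dag


@dag(
    schedule="*/5 * * * *",
    catchup=False,
    max_active_runs=1,  # never start a run while the previous one is still going
)
def batch_inference():
    ...
```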
## Interview Question: Design a Retraining Trigger
**Question:** "How would you automatically retrain a model when performance degrades?"

**Strong Answer:**
```python
from airflow.decorators import dag
from airflow.sensors.external_task import ExternalTaskSensor


@dag(schedule=None)
def retraining_pipeline():
    # Sensor: wait for the drift-detection DAG to signal
    wait_for_drift = ExternalTaskSensor(
        task_id="wait_for_drift_alert",
        external_dag_id="model_monitoring",
        external_task_id="check_drift",
        allowed_states=["success"],
        poke_interval=300,    # check every 5 minutes
        mode="reschedule",    # free the worker slot between pokes
    )
    # Caveat: ExternalTaskSensor matches on logical date by default;
    # use execution_date_fn or execution_delta if the schedules differ

    # Only runs once the drift signal arrives
    fetch_data = fetch_recent_training_data()
    retrain = trigger_training_pipeline()
    validate = validate_new_model()
    deploy = deploy_if_improved()

    wait_for_drift >> fetch_data >> retrain >> validate >> deploy


retraining_pipeline()
```
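An even stronger answer contrasts this pull-based sensor with a push-based design, where the monitoring DAG fires the retraining DAG directly and skips the trigger when no drift is found. A sketch assuming Airflow 2.3+'s `@task.short_circuit` (the drift score and threshold are placeholders):

```python
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@dag(schedule="@hourly", catchup=False)
def model_monitoring():
    @task.short_circuit  # downstream tasks are skipped when this returns False
    def check_drift() -> bool:
        drift_score = 0.3  # placeholder; compute PSI/KS on recent data here
        return drift_score > 0.2

    trigger_retrain = TriggerDagRunOperator(
        task_id="trigger_retraining",
        trigger_dag_id="retraining_pipeline",  # the DAG from the sensor example
    )

    check_drift() >> trigger_retrain


model_monitoring()
```

Push-based triggering avoids both the idle sensor and the logical-date matching caveat entirely.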
## Key Concepts to Know
```yaml
orchestration_vocabulary:
  dag: "Directed Acyclic Graph - no circular dependencies"
  task: "Single unit of work in a pipeline"
  operator: "Template for a type of task (Python, Bash, K8s)"
  sensor: "Task that waits for an external condition"
  trigger_rule: "When does this task run? (all_success, one_failed, etc.)"
  xcom: "Cross-communication - small data passed between tasks"
  pool: "Limit on concurrent tasks (e.g., max 5 GPU tasks)"
  variable: "Runtime configuration"
  connection: "External system credentials"
```
**Interview Signal:** Mentioning sensors and trigger rules shows you understand production complexity beyond basic tutorials.
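For instance, a GPU-limited training task plus a cleanup task gated by a trigger rule (the pool name is hypothetical and must be created in the Airflow UI or CLI first):

```python
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule


@task(pool="gpu")  # runs only when a slot is free in the "gpu" pool
def train_on_gpu():
    ...


@task(trigger_rule=TriggerRule.ONE_FAILED)  # fires if any upstream task failed
def alert_on_failure():
    ...
```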
Next, we'll compare Airflow, Kubeflow, and Prefect.