ML Pipelines & Orchestration

Pipeline Design Patterns


Pipeline design questions test your ability to architect ML systems. Know these patterns cold.

Pattern 1: Incremental Processing

Interview Question: "How do you process 10TB of data daily without reprocessing everything?"

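from datetime import datetime

from airflow.decorators import dag, task

# read_from_redis, write_to_redis, query_warehouse, and compute_features
# are placeholder helpers assumed to exist elsewhere in the project.

@dag(schedule="@hourly", start_date=datetime(2024, 1, 1), catchup=False)  # example start date
def incremental_feature_pipeline():
    @task
    def get_watermark():
        # Get last processed timestamp from state store
        return read_from_redis("pipeline:last_processed")

    @task
    def get_new_records(watermark):
        # Only fetch records after watermark
        return query_warehouse(f"""
            SELECT * FROM events
            WHERE event_time > '{watermark}'
            AND event_time <= CURRENT_TIMESTAMP
        """)

    @task
    def process_features(records):
        # Process only new records
        return compute_features(records)

    @task
    def update_watermark(records):
        # Advance watermark to latest processed timestamp; skip empty batches
        if len(records) == 0:
            return
        max_timestamp = records["event_time"].max()
        write_to_redis("pipeline:last_processed", max_timestamp)

    watermark = get_watermark()
    records = get_new_records(watermark)
    features = process_features(records)
    # Read event_time from the raw records, and advance the watermark
    # only after feature processing has succeeded
    features >> update_watermark(records)


incremental_feature_pipeline()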

Key Points:

  • Watermark pattern for incremental processing; pair it with idempotent writes to get effectively exactly-once results
  • Store state externally (Redis, database) - not in Airflow
  • Handle late-arriving data with grace periods
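
The grace-period idea in the last bullet can be sketched in plain Python. This is a minimal illustration rather than part of the pipeline above: the `select_batch` helper, the `event_id` field, and the 15-minute grace period are all assumptions made for the example. Each run re-reads a window that starts slightly before the watermark so late events are picked up, and skips IDs that earlier runs already handled so the re-read stays idempotent.

```python
from datetime import datetime, timedelta

GRACE_PERIOD = timedelta(minutes=15)  # assumed value; tune to observed lateness


def select_batch(events, watermark, seen_ids, grace=GRACE_PERIOD):
    """Return unseen events newer than (watermark - grace) and the new watermark.

    `events` is a list of dicts with hypothetical keys `event_id` and
    `event_time`; `seen_ids` holds IDs already handled by earlier runs.
    """
    lower_bound = watermark - grace
    batch = [
        e for e in events
        if e["event_time"] > lower_bound and e["event_id"] not in seen_ids
    ]
    # The watermark never moves backwards, even if the batch holds only late events.
    new_watermark = max([watermark] + [e["event_time"] for e in batch])
    return batch, new_watermark


watermark = datetime(2024, 1, 1, 12, 0)
events = [
    {"event_id": "a", "event_time": datetime(2024, 1, 1, 11, 50)},  # late, inside grace window
    {"event_id": "b", "event_time": datetime(2024, 1, 1, 11, 55)},  # already processed
    {"event_id": "c", "event_time": datetime(2024, 1, 1, 12, 5)},   # new
    {"event_id": "d", "event_time": datetime(2024, 1, 1, 11, 30)},  # outside grace window
]
batch, new_watermark = select_batch(events, watermark, seen_ids={"b"})
batch_ids = [e["event_id"] for e in batch]
print(batch_ids, new_watermark)
```

Events that arrive later than the grace period (like `d` here) are dropped by this sketch; a real pipeline would typically catch them with a periodic backfill.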

Pattern 2: Fan-Out / Fan-In

Interview Question: "How would you train 100 models in parallel?"

from datetime import datetime

from airflow.decorators import dag, task

# train_and_evaluate, select_best_per_region, and update_model_registry
# are placeholder helpers assumed to exist elsewhere in the project.

@dag(schedule="@weekly", start_date=datetime(2024, 1, 1), catchup=False)  # example start date
def multi_model_training():
    @task
    def get_model_configs() -> list[dict]:
        # Return list of model configurations
        return [
            {"model_id": "us_west", "region": "us-west-1"},
            {"model_id": "us_east", "region": "us-east-1"},
            {"model_id": "eu", "region": "eu-west-1"},
            # ... 100 models
        ]

    @task
    def train_single_model(config: dict):
        # Train individual model
        return train_and_evaluate(config)

    @task
    def aggregate_results(results: list[dict]):
        # Combine all model results
        best_models = select_best_per_region(results)
        update_model_registry(best_models)

    configs = get_model_configs()
    # Fan-out: Dynamic task mapping
    results = train_single_model.expand(config=configs)
    # Fan-in: Aggregate all results
    aggregate_results(results)


multi_model_training()

Airflow 2.3+ Feature: calling .expand() on a task (dynamic task mapping) creates one parallel task instance per input at runtime
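
Outside Airflow, the same fan-out / fan-in shape can be sketched with the standard library, which is handy for explaining the pattern on a whiteboard. This is an illustration under assumptions: `train_single_model` here is a stand-in that fabricates a score from the config, and `max_workers=2` is an arbitrary concurrency limit.

```python
from concurrent.futures import ThreadPoolExecutor


def train_single_model(config):
    # Stand-in for real training: derives a dummy score from the config.
    return {
        "model_id": config["model_id"],
        "region": config["region"],
        "score": len(config["model_id"]) / 10,
    }


def aggregate_results(results):
    # Fan-in: keep the highest-scoring model per region.
    best = {}
    for r in results:
        current = best.get(r["region"])
        if current is None or r["score"] > current["score"]:
            best[r["region"]] = r
    return best


configs = [
    {"model_id": "us_west", "region": "us-west-1"},
    {"model_id": "us_east", "region": "us-east-1"},
    {"model_id": "eu", "region": "eu-west-1"},
]

# Fan-out: one call per config, with parallelism bounded by max_workers.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(train_single_model, configs))

best_models = aggregate_results(results)
print(sorted(best_models))
```

The point worth making in an interview is the bound on parallelism: 100 mapped tasks should not all start at once if they share GPUs or a database. In Airflow that limit is usually enforced with pools or task concurrency settings rather than in task code.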

Pattern 3: Conditional Branching

Interview Question: "How do you handle model approval workflows?"

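from datetime import datetime

from airflow.decorators import dag, task

# run_evaluation, deploy_to_production, create_jira_ticket, wait_for_approval,
# and notify_team are placeholder helpers assumed to exist elsewhere.

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)  # example start date
def model_deployment_pipeline():
    @task
    def evaluate_model():
        metrics = run_evaluation()
        return metrics

    @task.branch
    def check_quality(metrics: dict):
        # Return the task_id of the branch to follow; the others are skipped
        if metrics["accuracy"] >= 0.95:
            return "auto_deploy"
        elif metrics["accuracy"] >= 0.90:
            return "request_human_review"
        else:
            return "reject_model"

    @task
    def auto_deploy():
        deploy_to_production()

    @task
    def request_human_review():
        # Pause for approval (note: a blocking wait holds a worker slot)
        create_jira_ticket()
        wait_for_approval()

    @task
    def reject_model():
        notify_team("Model failed quality gates")

    metrics = evaluate_model()
    branch = check_quality(metrics)
    branch >> [auto_deploy(), request_human_review(), reject_model()]


model_deployment_pipeline()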
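
Keeping the gating decision in a plain function makes it unit-testable without an Airflow scheduler. The sketch below assumes a hypothetical `choose_branch` helper that the branch task would call; the thresholds mirror the ones in the DAG, and treating a missing accuracy value as a rejection is an added assumption.

```python
def choose_branch(metrics, auto_threshold=0.95, review_threshold=0.90):
    """Map evaluation metrics to the task_id of the branch to run."""
    accuracy = metrics.get("accuracy")
    if accuracy is None:
        # Assumption: a model without an accuracy score is never deployed.
        return "reject_model"
    if accuracy >= auto_threshold:
        return "auto_deploy"
    if accuracy >= review_threshold:
        return "request_human_review"
    return "reject_model"


decisions = [choose_branch({"accuracy": a}) for a in (0.97, 0.95, 0.92, 0.90, 0.80)]
print(decisions)
```

The boundary values (exactly 0.95 and exactly 0.90) are the cases interviewers tend to probe, so they are worth covering explicitly in tests.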

Pattern 4: Retry with Exponential Backoff

Interview Question: "How do you handle transient failures in ML pipelines?"

from datetime import timedelta

import requests
from airflow.decorators import task
from tenacity import retry, stop_after_attempt, wait_exponential

@task(
    retries=5,
    retry_delay=timedelta(seconds=60),  # Base delay
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),  # Max 1 hour delay
)
def fetch_from_external_api():
    """Task with Airflow-level retries"""
    return call_unstable_api()

# Alternative: Code-level retry for finer control
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=300)
)
def call_unstable_api():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()
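
To reason about how long a task may wait in total, it helps to write out the delay schedule. The sketch below is a simplified model: it doubles the base delay on each attempt and caps it, using a hypothetical `backoff_delays` helper. Airflow's own calculation also adds jitter, so real waits will differ from these round numbers.

```python
def backoff_delays(base_seconds, retries, max_seconds):
    """Capped exponential schedule: base * 2**attempt, limited to max_seconds."""
    return [min(base_seconds * 2 ** attempt, max_seconds) for attempt in range(retries)]


delays = backoff_delays(base_seconds=60, retries=5, max_seconds=3600)
print(delays)        # [60, 120, 240, 480, 960]
print(sum(delays))   # 1860 seconds, i.e. 31 minutes of waiting in the worst case

# With more retries the cap starts to apply.
print(backoff_delays(60, 8, 3600))  # [60, 120, 240, 480, 960, 1920, 3600, 3600]
```

Note that stacking the two approaches multiplies attempts: five Airflow retries around a function that itself tries five times can call the API up to 30 times (six task attempts of five calls each).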

Pattern 5: Circuit Breaker

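import pybreaker
import requests
from airflow.decorators import task

# MODEL_URL and get_cached_prediction are placeholders assumed to be
# defined elsewhere. Breaker state is in-memory and per process.

# Break after 5 consecutive failures
model_serving_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60  # Try again after 60 seconds
)

@task
def call_model_service(request):
    @model_serving_breaker
    def _call():
        response = requests.post(MODEL_URL, json=request, timeout=10)
        # Raise on HTTP errors so the breaker counts them as failures
        response.raise_for_status()
        return response.json()

    try:
        return _call()
    except pybreaker.CircuitBreakerError:
        # Fallback behavior when circuit is open
        return get_cached_prediction(request)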
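
If asked to explain what a breaker does internally, a small hand-rolled version is easier to draw than a library call. The class below is a teaching sketch, not pybreaker's implementation: `SimpleCircuitBreaker`, `CircuitOpenError`, and the injectable `clock` are names invented for this example.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(self, fail_max=5, reset_timeout=60.0, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open")
            # Timeout elapsed: half-open, let one trial call through.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.fail_max:
                self.opened_at = self.clock()  # open (or re-open) the circuit
            raise
        # A success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result


def failing():
    raise RuntimeError("service down")


now = [0.0]  # fake clock so the example runs instantly
breaker = SimpleCircuitBreaker(fail_max=2, reset_timeout=60, clock=lambda: now[0])

states = []
for _ in range(3):
    try:
        breaker.call(failing)
    except CircuitOpenError:
        states.append("rejected")
    except RuntimeError:
        states.append("failed")

now[0] = 61.0  # after the reset timeout, a trial call is allowed
recovered = breaker.call(lambda: "ok")
print(states, recovered)
```

The third call never reaches the failing service, which is the purpose of the pattern: stop sending traffic to a dependency that is already struggling, and serve a fallback instead.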

Interview Summary Table

| Pattern | Use Case | Key Benefit |
|---|---|---|
| Incremental | Daily batch processing | Avoids full reprocessing; faster and cheaper |
| Fan-Out/Fan-In | Multi-model training | Parallelization |
| Branching | Approval workflows | Conditional logic |
| Retry | External dependencies | Fault tolerance |
| Circuit Breaker | Degraded services | Graceful degradation |

Pro Tip: Draw these patterns on a whiteboard during interviews. Visual explanations score higher than verbal descriptions alone.

Next module covers Monitoring & Observability interview questions.
