ML Workflow Orchestration
Prefect & Alternatives
3 min read
Prefect offers a modern, Python-native approach to workflow orchestration. Let's explore its core concepts and compare it with the main alternatives.
Why Prefect?
| Feature | Benefit |
|---|---|
| Python-native | Just decorators, no config files |
| Dynamic workflows | Build DAGs at runtime |
| Modern UI | Built-in dashboard for runs, logs, and schedules |
| Hybrid execution | Local or cloud |
| Easy debugging | Run flows as regular Python |
Installation
# Install Prefect
pip install prefect
# Verify installation
prefect version
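To confirm the install works end to end, the smallest possible flow can be run as a plain script with no server or deployment involved (a minimal sketch; the flow below is just for illustration):
from prefect import flow

@flow
def hello(name: str = "Prefect") -> str:
    """Smallest possible flow: just return a greeting."""
    return f"Hello, {name}!"

if __name__ == "__main__":
    print(hello())  # runs locally; no server or deployment required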
Core Concepts
Flows and Tasks
from prefect import flow, task

@task
def load_data(path: str) -> dict:
    """Load data from path."""
    import pandas as pd
    df = pd.read_csv(path)
    return {"data": df, "rows": len(df)}

@task
def train_model(data: dict) -> str:
    """Train ML model."""
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]
    model = RandomForestClassifier()
    model.fit(X, y)
    model_path = "/tmp/model.pkl"
    joblib.dump(model, model_path)
    return model_path

@task
def evaluate_model(model_path: str, data: dict) -> float:
    """Evaluate model accuracy."""
    import joblib
    from sklearn.metrics import accuracy_score
    model = joblib.load(model_path)
    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]
    predictions = model.predict(X)
    return accuracy_score(y, predictions)

@flow(name="ML Training Pipeline")
def ml_pipeline(data_path: str):
    """End-to-end ML training flow."""
    data = load_data(data_path)
    model_path = train_model(data)
    accuracy = evaluate_model(model_path, data)
    print(f"Model accuracy: {accuracy:.2%}")
    return accuracy

# Run the flow
if __name__ == "__main__":
    ml_pipeline("data/train.csv")
Advanced Features
Retries and Caching
from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def expensive_computation(data: dict) -> dict:
    """Cached computation with retries."""
    # This result will be cached for 1 hour
    return process_data(data)
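Retry delays do not have to be a single fixed value; newer Prefect 2.x releases also accept a list of delays or the exponential_backoff helper, plus a jitter factor to spread retries out. A sketch under that assumption (the flaky HTTP task is hypothetical):
from prefect import task
from prefect.tasks import exponential_backoff

@task(
    retries=4,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),  # roughly 10s, 20s, 40s, 80s
    retry_jitter_factor=0.5,  # randomize delays to avoid synchronized retries
)
def flaky_fetch(url: str) -> dict:
    """Hypothetical task hitting an unreliable endpoint."""
    import requests
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()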
Parallel Execution
from prefect import flow, task

@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "result": partition_id * 2}

@flow
def parallel_flow():
    # Submit tasks in parallel
    futures = [process_partition.submit(i) for i in range(10)]
    # Wait for all results
    results = [f.result() for f in futures]
    return results
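For straightforward fan-out, task.map() is a more compact equivalent of the submit loop above; a brief sketch:
@flow
def parallel_flow_mapped():
    # One task run per element of the iterable
    futures = process_partition.map(range(10))
    return [f.result() for f in futures]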
Conditional Logic
@flow
def conditional_flow(accuracy: float):
    if accuracy > 0.9:
        deploy_to_production()
    elif accuracy > 0.8:
        deploy_to_staging()
    else:
        notify_team("Model accuracy too low")
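deploy_to_production, deploy_to_staging, and notify_team are placeholders in the snippet above; one way to make it self-contained is to stub them out as tasks, for example:
from prefect import task

@task
def deploy_to_production():
    print("Deploying to production")

@task
def deploy_to_staging():
    print("Deploying to staging")

@task
def notify_team(message: str):
    print(f"Notification: {message}")

# With the stubs defined, conditional_flow(accuracy=0.85) takes the staging branch.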
Scheduling Deployments
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_training():
    # Training logic
    pass

# Create deployment
deployment = Deployment.build_from_flow(
    flow=daily_training,
    name="daily-training",
    schedule=CronSchedule(cron="0 2 * * *"),  # 2 AM daily
    work_queue_name="ml-queue",
)
deployment.apply()
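Recent Prefect releases (roughly 2.10.17 onward) also provide a lighter-weight flow.serve() entrypoint that registers the schedule and polls for runs from a single long-running process; a hedged sketch:
if __name__ == "__main__":
    # Creates a deployment for daily_training and waits for scheduled runs
    daily_training.serve(
        name="daily-training",
        cron="0 2 * * *",  # 2 AM daily
    )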
Running with Prefect Cloud/Server
# Start local server
prefect server start
# Create deployment via CLI
prefect deployment build ./pipeline.py:ml_pipeline -n "ml-training" -q "ml-queue"
prefect deployment apply ml_pipeline-deployment.yaml
# Start an agent to pull runs from the work queue
prefect agent start -q "ml-queue"
Comparison: Modern Orchestrators
Prefect vs Airflow
| Aspect | Prefect | Airflow |
|---|---|---|
| Syntax | Python decorators | Python + config |
| Dynamic DAGs | Native | Limited |
| Local testing | Just call flow() | Requires setup |
| Deployment | Simple | Complex |
| Maturity | Newer | Battle-tested |
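For a concrete feel of the difference, here is roughly how a decorator-style pipeline looks with Airflow's TaskFlow API (Airflow 2.x; the names are illustrative, not a full equivalent of the Prefect flow above):
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def ml_training_dag():

    @task
    def load_data(path: str) -> str:
        return path  # pass small, serializable values between tasks (XCom)

    @task
    def train_model(path: str) -> str:
        return "/tmp/model.pkl"

    train_model(load_data("data/train.csv"))

ml_training_dag()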
Prefect vs Dagster
| Aspect | Prefect | Dagster |
|---|---|---|
| Focus | Workflow orchestration | Data orchestration |
| Assets | Task outputs | First-class assets |
| Testing | Standard pytest | Built-in framework |
| Best for | General workflows | Data pipelines |
When to Use What
| Tool | Best For |
|---|---|
| Prefect | Python teams, rapid development |
| Airflow | Large teams, data engineering |
| Kubeflow | Kubernetes-native ML |
| Dagster | Data-centric pipelines |
| DVC | Simple ML pipelines, versioning |
Other Alternatives
Dagster
import pandas as pd
from dagster import asset, Definitions

@asset
def raw_data():
    """Load raw data."""
    return pd.read_csv("data.csv")

@asset
def processed_data(raw_data):
    """Process raw data."""
    return raw_data.dropna()

@asset
def trained_model(processed_data):
    """Train model on processed data."""
    model = train(processed_data)  # train(): placeholder for your own training routine
    return model

defs = Definitions(assets=[raw_data, processed_data, trained_model])
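To execute these assets outside the Dagster UI, for example in a test, dagster.materialize runs them in-process; a brief sketch:
from dagster import materialize

result = materialize([raw_data, processed_data, trained_model])
assert result.success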
Metaflow (Netflix)
from metaflow import FlowSpec, step

class MLFlow(FlowSpec):

    @step
    def start(self):
        self.data = load_data()
        self.next(self.train)

    @step
    def train(self):
        self.model = train_model(self.data)
        self.next(self.evaluate)

    @step
    def evaluate(self):
        self.accuracy = evaluate(self.model)
        self.next(self.end)

    @step
    def end(self):
        print(f"Final accuracy: {self.accuracy}")

if __name__ == "__main__":
    MLFlow()
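Metaflow flows are driven from the command line; assuming the class above is saved as ml_flow.py (a hypothetical filename):
# Inspect the flow's steps
python ml_flow.py show
# Run the flow locally
python ml_flow.py run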
Best Practices
| Practice | Why |
|---|---|
| Start simple | Add complexity as needed |
| Use type hints | Better IDE support and validation |
| Log extensively | Debug production issues (see the logging sketch below) |
| Test locally first | Catch errors early |
| Version your flows | Track changes over time |
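For the logging practice in the table, Prefect's get_run_logger() routes messages to the flow run's logs in the UI as well as the console; a small sketch:
from prefect import flow, task, get_run_logger

@task
def report_accuracy(accuracy: float) -> None:
    logger = get_run_logger()
    logger.info("Model accuracy: %.2f%%", accuracy * 100)

@flow
def logging_flow():
    report_accuracy(0.93)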
Choosing the Right Tool
Simple ◄──────────────────────────────────────────────► Complex
 DVC        Prefect        Dagster        Airflow       Kubeflow
Key insight: Start with the simplest tool that meets your needs. You can always migrate to more complex solutions as your requirements grow.
Next module: We'll explore feature stores for training-serving consistency.