تنسيق سير عمل التعلم الآلي
خطوط أنابيب Kubeflow
4 دقيقة للقراءة
Kubeflow Pipelines (KFP) هي منصة لبناء ونشر سير عمل ML على Kubernetes. توفر خطوط أنابيب قابلة للتوسع ومحمولة وقابلة لإعادة الإنتاج.
لماذا Kubeflow Pipelines؟
| الميزة | الفائدة |
|---|---|
| أصلية لـ Kubernetes | استفد من البنية التحتية الموجودة |
| قابلة للتوسع | شغّل التدريب الموزع بسهولة |
| محمولة | نفس خط الأنابيب يعمل في أي مكان |
| مُصدرة | تتبع كل تشغيل خط أنابيب |
| واجهة مرئية | راقب وصحح سير العمل |
التثبيت
# ثبّت KFP SDK (v2)
pip install kfp
# تحقق من التثبيت
python -c "import kfp; print(kfp.__version__)"
المفاهيم الأساسية
المكونات
المكونات هي اللبنات الأساسية—دوال محتواة:
from kfp import dsl
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
input_path: str,
output_path: dsl.OutputPath("csv")
):
"""معالجة البيانات الخام مسبقاً."""
import pandas as pd
df = pd.read_csv(input_path)
df = df.dropna()
df = df.drop_duplicates()
df.to_csv(output_path, index=False)
تعريف خط الأنابيب
اربط المكونات معاً:
from kfp import dsl
@dsl.pipeline(
name="ml-training-pipeline",
description="خط أنابيب تدريب ML من البداية للنهاية"
)
def training_pipeline(
data_path: str,
epochs: int = 100,
learning_rate: float = 0.001
):
# الخطوة 1: المعالجة المسبقة
preprocess_task = preprocess_data(input_path=data_path)
# الخطوة 2: التدريب (يعتمد على المعالجة المسبقة)
train_task = train_model(
data_path=preprocess_task.outputs["output_path"],
epochs=epochs,
learning_rate=learning_rate
)
# الخطوة 3: التقييم (يعتمد على التدريب)
evaluate_task = evaluate_model(
model_path=train_task.outputs["model_path"]
)
مثال خط أنابيب كامل
تعريف المكونات
from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, Metrics
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas", "scikit-learn"]
)
def load_data(
source_path: str,
dataset: Output[Artifact]
):
"""تحميل وتقسيم مجموعة البيانات."""
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_csv(source_path)
train, test = train_test_split(df, test_size=0.2)
train.to_csv(f"{dataset.path}_train.csv", index=False)
test.to_csv(f"{dataset.path}_test.csv", index=False)
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def train_model(
dataset: Input[Artifact],
model: Output[Model],
n_estimators: int = 100
):
"""تدريب نموذج Random Forest."""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
train = pd.read_csv(f"{dataset.path}_train.csv")
X = train.drop("target", axis=1)
y = train["target"]
clf = RandomForestClassifier(n_estimators=n_estimators)
clf.fit(X, y)
joblib.dump(clf, model.path)
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def evaluate_model(
dataset: Input[Artifact],
model: Input[Model],
metrics: Output[Metrics]
):
"""تقييم أداء النموذج."""
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score
import joblib
test = pd.read_csv(f"{dataset.path}_test.csv")
X = test.drop("target", axis=1)
y = test["target"]
clf = joblib.load(model.path)
predictions = clf.predict(X)
metrics.log_metric("accuracy", accuracy_score(y, predictions))
metrics.log_metric("f1_score", f1_score(y, predictions, average="weighted"))
تعريف خط الأنابيب
@dsl.pipeline(
name="training-pipeline",
description="تدريب وتقييم نموذج ML"
)
def ml_pipeline(
source_path: str = "gs://my-bucket/data.csv",
n_estimators: int = 100
):
load_task = load_data(source_path=source_path)
train_task = train_model(
dataset=load_task.outputs["dataset"],
n_estimators=n_estimators
)
evaluate_task = evaluate_model(
dataset=load_task.outputs["dataset"],
model=train_task.outputs["model"]
)
التجميع والتشغيل
from kfp import compiler
# جمّع إلى YAML
compiler.Compiler().compile(
pipeline_func=ml_pipeline,
package_path="ml_pipeline.yaml"
)
# أرسل لعنقود KFP
from kfp.client import Client
client = Client(host="http://kubeflow-pipelines-ui:80")
run = client.create_run_from_pipeline_func(
ml_pipeline,
arguments={
"source_path": "gs://my-bucket/data.csv",
"n_estimators": 200
}
)
الميزات المتقدمة
التنفيذ الشرطي
from kfp import dsl
@dsl.pipeline
def conditional_pipeline(accuracy_threshold: float = 0.8):
train_task = train_model()
with dsl.Condition(
train_task.outputs["accuracy"] > accuracy_threshold,
name="accuracy-check"
):
deploy_model(model=train_task.outputs["model"])
التنفيذ المتوازي
@dsl.pipeline
def parallel_pipeline():
# هذه تعمل بالتوازي
task_a = process_dataset_a()
task_b = process_dataset_b()
task_c = process_dataset_c()
# هذه تنتظر جميع المهام المتوازية
merge_task = merge_results(
result_a=task_a.output,
result_b=task_b.output,
result_c=task_c.output
)
طلبات الموارد
@dsl.component
def gpu_training(data: Input[Artifact], model: Output[Model]):
# كود التدريب
pass
# في خط الأنابيب
train_task = gpu_training(data=data_task.output)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("16G")
train_task.set_gpu_limit("1")
بيانات خط الأنابيب الوصفية
تتبع التجارب
# أنشئ تجربة
experiment = client.create_experiment(name="hyperparameter-tuning")
# شغّل مع تتبع التجربة
run = client.create_run_from_pipeline_func(
ml_pipeline,
experiment_name="hyperparameter-tuning",
run_name="lr-0.001-epochs-100"
)
عرض النتائج
واجهة Kubeflow توفر:
- تصور خط الأنابيب
- سجل التشغيلات
- تتبع المخرجات
- مقارنات المقاييس
- الوصول للسجلات
أفضل الممارسات
| الممارسة | لماذا |
|---|---|
| استخدم مخرجات مُنمطة | تتبع أفضل وتكامل واجهة |
| ثبّت إصدارات الحزم | بيئات قابلة لإعادة الإنتاج |
| أبقِ المكونات صغيرة | تخزين مؤقت وتصحيح أسرع |
| استخدم أسماء ذات معنى | خطوط أنابيب أسهل للفهم |
| عيّن حدود الموارد | امنع استنفاد الموارد |
الاختبار المحلي
اختبر المكونات محلياً قبل النشر:
# اختبر دالة مكون مباشرة
from my_pipeline import preprocess_data
# أنشئ مخرجات وهمية
import tempfile
with tempfile.NamedTemporaryFile(suffix=".csv") as f:
preprocess_data.python_func(
input_path="test_data.csv",
output_path=f.name
)
الرؤية الرئيسية: Kubeflow Pipelines مثالية للفرق التي تستخدم Kubernetes بالفعل. توفر تنسيق ML بمستوى المؤسسات مع تتبع تجارب وإدارة مخرجات مدمجة.
التالي، سنستكشف Apache Airflow لسير عمل هندسة البيانات وML. :::