البيانات الضخمة وأنظمة البث

معمارية Apache Spark والأساسيات

5 دقيقة للقراءة

معرفة Apache Spark أساسية لمقابلات هندسة البيانات على أي مستوى. فهم معماريته ونموذج التنفيذ وتقنيات التحسين يميز المهندسين الكبار عن المبتدئين.

نظرة عامة على معمارية Spark

مكونات المجموعة

+------------------+
|     Driver       |
|  (SparkContext)  |
+--------+---------+
         |
         | جدولة المهام
         | طلبات الموارد
         v
+------------------+
|  Cluster Manager |
| (YARN/K8s/Mesos) |
+--------+---------+
         |
    +----+----+
    |    |    |
    v    v    v
+------+ +------+ +------+
|Worker| |Worker| |Worker|
|Node 1| |Node 2| |Node 3|
+--+---+ +--+---+ +--+---+
   |        |        |
   v        v        v
Executor  Executor  Executor
(JVM)     (JVM)     (JVM)

برنامج Driver:

  • يحتوي على SparkContext
  • يحول كود المستخدم إلى مهام
  • يجدول المهام على المنفذين
  • ينسق تنفيذ المهام

المنفذون (Executors):

  • عمليات JVM على عقد العمال
  • تنفذ المهام وتخزن البيانات مؤقتاً
  • تبلغ عن الحالة للـ Driver

سؤال المقابلة: "اشرح التقييم الكسول في Spark"

نموذج الإجابة:

"يستخدم Spark التقييم الكسول حيث التحويلات على RDDs/DataFrames لا تُنفذ فوراً—بل تبني خطة منطقية. التنفيذ يحدث فقط عند استدعاء إجراء (collect، count، write).

هذا يمكّن فوائد التحسين:

  1. تحسين الاستعلام: محسن Catalyst يمكنه إعادة ترتيب ودمج العمليات
  2. دفع المرشحات: المرشحات تُدفع أقرب لمصادر البيانات
  3. تقليم الأقسام: يقرأ فقط الأقسام الضرورية
  4. تقليل نقل البيانات: يتجنب التجسيدات الوسيطة غير الضرورية

مثلاً، إذا استدعيت df.filter().select().filter()، Spark يجمع كلا المرشحين قبل التنفيذ بدلاً من مرورين منفصلين عبر البيانات."

RDDs مقابل DataFrames مقابل Datasets

جدول المقارنة

الجانب RDD DataFrame Dataset
التجريد مجموعة موزعة جدول موزع مجموعة موزعة منمطة
المخطط لا نعم (مستنتج/صريح) نعم (وقت التجميع)
التحسين يدوي محسن Catalyst محسن Catalyst
أمان النوع وقت التجميع (Scala) وقت التشغيل فقط وقت التجميع
التسلسل Java/Kryo Tungsten (ثنائي) Tungsten (ثنائي)
API وظيفي يشبه SQL هجين
حالة الاستخدام تحكم منخفض المستوى تحليلات SQL تطبيقات آمنة النوع

متى تستخدم كل واحد

# RDD: تحكم دقيق، تحويلات معقدة
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.map(lambda x: x * 2).filter(lambda x: x > 5).collect()

# DataFrame: عمليات تشبه SQL، بيانات منظمة
df = spark.read.parquet("s3://bucket/data/")
result = df.filter(col("revenue") > 1000)\
           .groupBy("region")\
           .agg(sum("revenue").alias("total_revenue"))

# Dataset (Scala/Java): أمان النوع مع التحسين
case class User(id: Long, name: String, age: Int)
val ds: Dataset[User] = spark.read.parquet("users/").as[User]
val adults = ds.filter(_.age >= 18)

نموذج تنفيذ Spark

تسلسل Job → Stage → Task

+-----------------+
|      Job        |  <- يُفعّل بإجراء (collect, save, count)
+--------+--------+
         |
   +-----+-----+
   |           |
   v           v
+------+   +------+
|Stage1|   |Stage2|  <- مفصولة بحدود التبديل
+--+---+   +--+---+
   |           |
+--+--+     +--+--+
|Task1|     |Task3|  <- مهمة واحدة لكل قسم
|Task2|     |Task4|
+-----+     +-----+

التحويلات الضيقة مقابل الواسعة

التحويلات الضيقة (بدون تبديل):

  • map، filter، flatMap
  • mapPartitions، mapPartitionsWithIndex
  • كل قسم إدخال يساهم في قسم إخراج واحد

التحويلات الواسعة (تبديل مطلوب):

  • groupByKey، reduceByKey، aggregateByKey
  • join، cogroup، distinct
  • repartition، coalesce (مع التبديل)
  • البيانات من أقسام إدخال متعددة مطلوبة للإخراج

سؤال المقابلة: "ماذا يحدث أثناء التبديل (Shuffle)؟"

نموذج الإجابة:

"التبديل في Spark يتضمن إعادة توزيع البيانات عبر الأقسام، عادة يُفعّل بعمليات مثل groupByKey أو join. العملية لها ثلاث مراحل:

1. جانب Map (كتابة التبديل):

  • كل مهمة تحسب أي قسم ينتمي إليه كل سجل
  • السجلات تُكتب على القرص المحلي في ملفات التبديل
  • ملف فهرس يتتبع حدود الأقسام

2. خدمة التبديل:

  • خدمة التبديل الخارجية (اختيارية) تدير ملفات التبديل
  • تسمح بإزالة المنفذين ديناميكياً بدون فقدان بيانات التبديل

3. جانب Reduce (قراءة التبديل):

  • المهام تجلب أقسامها من جميع مهام map
  • البيانات تُسحب عبر الشبكة
  • الدمج والفرز يجمع السجلات من جميع المصادر

التبديلات مكلفة لأنها تتضمن:

  • التسلسل/إلغاء التسلسل
  • I/O القرص
  • I/O الشبكة
  • مشاكل انحراف البيانات المحتملة

لتقليل التبديلات، أستخدم تقنيات مثل البث للجداول الصغيرة، تقسيم البيانات مسبقاً على مفاتيح الضم، واستخدام reduceByKey بدلاً من groupByKey."

إدارة الذاكرة

نموذج ذاكرة Spark (الذاكرة الموحدة)

+------------------------------------------+
|            ذاكرة المنفذ                  |
+------------------------------------------+
|  الذاكرة المحجوزة (300MB ثابتة)          |
+------------------------------------------+
|                                          |
|  الذاكرة الموحدة (spark.memory.fraction) |
|  +------------------------------------+  |
|  |  ذاكرة التخزين   |  ذاكرة          |  |
|  |  (RDDs المخزنة،  |  التنفيذ        |  |
|  |   البث)          |  (التبديلات،    |  |
|  |                  |   الضم،         |  |
|  |                  |   التجميعات)    |  |
|  +------------------------------------+  |
|        <- الحد يمكن أن يتحرك ->          |
|                                          |
+------------------------------------------+
|  ذاكرة المستخدم (1 - spark.memory.fraction)|
|  (هياكل بيانات المستخدم، UDFs)           |
+------------------------------------------+

معلمات التكوين الرئيسية

# تكوين ذاكرة المنفذ
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "2g")  # للحاويات
spark.conf.set("spark.memory.fraction", "0.6")  # جزء الذاكرة الموحدة
spark.conf.set("spark.memory.storageFraction", "0.5")  # تقسيم التخزين مقابل التنفيذ

محسن Catalyst و Tungsten

خط أنابيب تحسين Catalyst

+-------------+     +-------------+     +-------------+
|  خطة        |     |  خطة        |     | خطة         |
|  منطقية    | --> |  منطقية    | --> | منطقية     |
|  غير محلولة |     |  محللة      |     | محسنة       |
+-------------+     +-------------+     +-------------+
                                               |
                                               v
+-------------+     +-------------+     +-------------+
|  RDDs       |     |  خطة        |     |  خطط        |
|  منفذة     | <-- |  فيزيائية   | <-- |  فيزيائية   |
+-------------+     +-------------+     +-------------+
                    (اختيار قائم على التكلفة)

التحسينات الشائعة

دفع المرشحات (Predicate Pushdown):

# قبل التحسين: قراءة كل البيانات، ثم التصفية
# بعد: المرشح يُدفع لمصدر البيانات

df = spark.read.parquet("s3://bucket/data/")
filtered = df.filter(col("date") == "2024-01-01")

# Spark يدفع المرشح لقارئ Parquet
# يقرأ فقط مجموعات الصفوف ذات الصلة

تقليم الأعمدة (Column Pruning):

# فقط الأعمدة المطلوبة تُقرأ من المصدر
df = spark.read.parquet("s3://bucket/wide_table/")
result = df.select("id", "name")  # يقرأ عمودين فقط

ضم البث (Broadcast Join):

from pyspark.sql.functions import broadcast

# جدول صغير يُبث لجميع المنفذين
large_df = spark.read.parquet("s3://bucket/large/")
small_df = spark.read.parquet("s3://bucket/small/")

result = large_df.join(broadcast(small_df), "key")

النقاط الرئيسية للمقابلات

  1. فهم نموذج التنفيذ: Jobs → Stages → Tasks، ولماذا التبديلات تنشئ حدود المراحل
  2. معرفة التحويلات الضيقة مقابل الواسعة: حاسم لمناقشات التحسين
  3. إدارة الذاكرة: نموذج الذاكرة الموحدة وكيفية تشخيص مشاكل OOM
  4. فوائد التقييم الكسول: تحسين الاستعلام، دفع المرشحات، تقليل نقل البيانات
  5. محسن Catalyst: كيف يحسن Spark الخطط المنطقية إلى تنفيذ فيزيائي

:::

اختبار

الوحدة 5: البيانات الضخمة وأنظمة البث

خذ الاختبار