البيانات الضخمة وأنظمة البث

المعالجة في الوقت الحقيقي: Flink و Spark Streaming و Kafka Streams

5 دقيقة للقراءة

أطر معالجة التدفق ضرورية لبناء أنابيب بيانات الوقت الحقيقي. يغطي هذا الدرس الأطر الرئيسية الثلاثة ومقايضاتها لمقابلات هندسة البيانات.

مقارنة أطر معالجة التدفق

الجانب Apache Flink Spark Streaming Kafka Streams
نموذج المعالجة بث حقيقي (حدث تلو الآخر) دفعات صغيرة بث حقيقي
الكمون ميلي ثانية ثواني (دفعات صغيرة) ميلي ثانية
النشر مجموعة مستقلة مجموعة Spark مكتبة (مدمجة)
إدارة الحالة أصلية، مُدارة خارجية (RocksDB) مخازن حالة محلية
مرة واحدة بالضبط دعم أصلي يتطلب تكوين دقيق دعم أصلي
التوسع توازي المهام قائم على المنفذ قائم على الأقسام
النوافذ متقدمة (متتالية، منزلقة، جلسة) نوافذ أساسية متتالية، منزلقة، جلسة
حالة الاستخدام معالجة أحداث معقدة ETL مع نظام Spark تطبيقات تركز على Kafka
+------------------+
|   Job Manager    |
| (التنسيق)        |
+--------+---------+
         |
    +----+----+
    |         |
    v         v
+------+   +------+
| Task |   | Task |
|Mgr 1 |   |Mgr 2 |
+--+---+   +--+---+
   |          |
   v          v
+------+   +------+
| Slots|   | Slots|
|(مهام)|   |(مهام)|
+------+   +------+

نموذج الإجابة:

"تمكّن نقاط تفتيش Flink المعالجة مرة واحدة بالضبط من خلال اللقطات الموزعة:

كيف تعمل:

  1. حقن الحواجز: Job Manager يحقن حواجز نقاط التفتيش في تدفقات المصدر
  2. محاذاة الحواجز: المشغلون ينتظرون الحواجز من جميع قنوات الإدخال
  3. لقطة الحالة: بمجرد المحاذاة، المشغلون يأخذون لقطة لحالتهم
  4. اكتمال نقطة التفتيش: عندما يكمل جميع المشغلون، نقطة التفتيش تُنهى
Source 1 ─────|B1|─────────|B2|─────>
                    \
                     \
Operator        انتظار     لقطة
                 الاثنين    الحالة
                    /
                   /
Source 2 ─────|B1|─────────|B2|─────>

B1, B2 = حواجز نقاط التفتيش

خيارات التكوين:

# مرة واحدة بالضبط (افتراضي)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)

# مرة واحدة على الأقل (كمون أقل)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.AT_LEAST_ONCE
)

الاستعادة: عند الفشل، Flink يستعيد المشغلين لآخر نقطة تفتيش ناجحة، يعيد تشغيل الأحداث من تلك النقطة، مما يضمن دلالات مرة واحدة بالضبط."

from pyflink.datastream.window import (
    TumblingEventTimeWindows,
    SlidingEventTimeWindows,
    SessionWindows
)
from pyflink.common import Time

# نوافذ متتالية: غير متداخلة، حجم ثابت
transactions.key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .sum("amount")

# نوافذ منزلقة: متداخلة، حجم ثابت
transactions.key_by(lambda t: t.customer_id) \
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  # حجم النافذة
        Time.minutes(1)    # فترة الانزلاق
    )) \
    .sum("amount")

# نوافذ الجلسة: قائمة على الفجوة
transactions.key_by(lambda t: t.customer_id) \
    .window(SessionWindows.with_gap(Time.minutes(30))) \
    .sum("amount")

وقت الحدث مقابل وقت المعالجة

سؤال المقابلة: "اشرح الفرق بين وقت الحدث ووقت المعالجة في Flink"

نموذج الإجابة:

"يدعم Flink ثلاث دلالات زمنية:

1. وقت الحدث: الوقت الذي حدث فيه الحدث فعلياً (مضمن في البيانات)

  • الأفضل للصحة مع الأحداث غير المرتبة
  • يتطلب علامات مائية للتعامل مع البيانات المتأخرة
  • النتائج حتمية وقابلة للتكرار

2. وقت المعالجة: الوقت الذي يُعالج فيه الحدث بواسطة Flink

  • أقل كمون
  • نتائج غير حتمية
  • أبسط للتنفيذ

3. وقت الاستيعاب: الوقت الذي يدخل فيه الحدث Flink

  • حل وسط بين وقت الحدث والمعالجة
  • يُعيّن مرة واحدة في المصدر

لمعظم أنظمة الإنتاج، أوصي بوقت الحدث مع علامات مائية محدودة خارج الترتيب. هذا يتعامل مع البيانات المتأخرة بأناقة مع توفير نتائج صحيحة."

Spark Structured Streaming

نموذج Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, col, sum as spark_sum, count
)

spark = SparkSession.builder \
    .appName("StructuredStreamingApp") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints") \
    .getOrCreate()

# القراءة من Kafka
transactions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "latest") \
    .load()

# تجميع النوافذ
result = parsed \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        col("customer_id"),
        window(col("timestamp"), "10 minutes", "5 minutes")
    ) \
    .agg(
        spark_sum("amount").alias("total_amount"),
        count("*").alias("transaction_count")
    )

أوضاع الإخراج

سؤال المقابلة: "اشرح أوضاع الإخراج المختلفة في Structured Streaming"

نموذج الإجابة:

"Structured Streaming لديه ثلاثة أوضاع إخراج:

1. وضع الإلحاق (الافتراضي):

  • فقط الصفوف الجديدة المضافة منذ آخر تشغيل تُخرج
  • يعمل فقط مع الاستعلامات التي ليس لديها تجميعات أو لديها تجميعات على وقت الحدث مع علامة مائية

2. وضع الاكتمال:

  • جدول النتائج بالكامل يُخرج في كل تشغيل
  • يتطلب استعلام تجميع

3. وضع التحديث:

  • فقط الصفوف التي تغيرت منذ آخر تشغيل تُخرج
  • يعمل مع معظم الاستعلامات

المقايضات:

الوضع الذاكرة حجم الإخراج حالة الاستخدام
إلحاق منخفضة صفوف جديدة فقط ملفات، سجلات
اكتمال عالية كل الصفوف لوحات المعلومات
تحديث متوسطة صفوف متغيرة قواعد البيانات

Kafka Streams

نموذج الإجابة:

"سأختار Kafka Streams عندما:

1. معمارية تركز على Kafka:

  • الإدخال والإخراج كلاهما مواضيع Kafka
  • لا حاجة للاتصال بأنظمة خارجية مباشرة
  • تريد تكامل وثيق مع نظام Kafka البيئي

2. نشر أبسط:

Kafka Streams: مكتبة مدمجة في تطبيقك
├── انشر كـ JAR/حاوية عادية
├── وسّع بإضافة نسخ التطبيق
└── لا مجموعة منفصلة للإدارة

Flink: مجموعة منفصلة مطلوبة
├── Job Manager + Task Managers
├── عبء تشغيلي إضافي
└── أقوى لكن أكثر تعقيداً

3. معالجة خفيفة:

  • تحويلات بسيطة، تصفية، تجميعات
  • حالة لكل قسم كافية
  • لا تحتاج ضم عبر الأقسام

اختر Flink عندما:

  • تحتاج معالجة أحداث معقدة (CEP)
  • تتطلب نوافذ متطورة (نوافذ جلسة مع فجوات)
  • معالجة مصادر غير Kafka
  • تحتاج أنماط حالة البث
  • تتطلب مرة واحدة بالضبط عبر أنظمة خارجية"

أنماط تصميم معالجة التدفق

النمط 1: الإثراء

# مثال Flink: إثراء المعاملات ببيانات العميل
# بيانات العميل كحالة بث

class EnrichmentFunction(KeyedBroadcastProcessFunction):
    def process_element(self, transaction, ctx):
        customer = ctx.get_broadcast_state(
            customer_state_desc
        ).get(transaction.customer_id)

        if customer:
            yield EnrichedTransaction(
                transaction,
                customer.name,
                customer.segment
            )

النمط 2: إزالة التكرار

# إزالة تكرار الأحداث ضمن نافذة زمنية
class DeduplicationFunction(KeyedProcessFunction):
    def __init__(self):
        self.seen_ids = ValueState("seen", set())

    def process_element(self, event, ctx):
        seen = self.seen_ids.value() or set()

        if event.id not in seen:
            seen.add(event.id)
            self.seen_ids.update(seen)
            yield event

النمط 3: التعامل مع البيانات المتأخرة

# مخرجات جانبية Flink للبيانات المتأخرة
late_output_tag = OutputTag("late-data")

result = transactions \
    .key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .allowed_lateness(Time.minutes(1)) \
    .side_output_late_data(late_output_tag) \
    .aggregate(TransactionAggregator())

# الحصول على تدفق البيانات المتأخرة
late_data = result.get_side_output(late_output_tag)

النقاط الرئيسية للمقابلات

  1. اعرف المقايضات: Flink (القوة) مقابل Spark Streaming (النظام البيئي) مقابل Kafka Streams (البساطة)
  2. افهم مرة واحدة بالضبط: كيف يحققها كل إطار (نقاط التفتيش، المعاملات)
  3. دلالات الوقت: تأثيرات وقت الحدث مقابل وقت المعالجة
  4. النوافذ: متتالية، منزلقة، نوافذ الجلسة ومتى تستخدم كل منها
  5. إدارة الحالة: كيف تُخزن الحالة، تُؤخذ نقاط تفتيشها، وتُستعاد

:::

اختبار

الوحدة 5: البيانات الضخمة وأنظمة البث

خذ الاختبار