البيانات الضخمة وأنظمة البث
أنماط البرمجة في مقابلات PySpark
أسئلة البرمجة في PySpark تختبر قدرتك على حل مشاكل هندسة البيانات الحقيقية بكفاءة. يغطي هذا الدرس الأنماط الأكثر شيوعاً التي ستواجهها في المقابلات.
استيرادات PySpark الأساسية
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, lit, when, coalesce,
sum, count, avg, max, min,
row_number, rank, dense_rank, lead, lag,
date_format, to_date, datediff, date_add,
explode, array, struct, collect_list, collect_set,
concat, concat_ws, split, regexp_extract,
broadcast, monotonically_increasing_id
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType,
DoubleType, DateType, TimestampType, ArrayType
)
النمط 1: دوال النوافذ
المجاميع التراكمية والحسابات الجارية
سؤال المقابلة: "احسب المجموع الجاري للمبيعات لكل عميل مرتب بالتاريخ."
from pyspark.sql.functions import sum as spark_sum
# بيانات عينة
data = [
("C001", "2024-01-01", 100),
("C001", "2024-01-02", 150),
("C001", "2024-01-03", 200),
("C002", "2024-01-01", 300),
("C002", "2024-01-02", 250),
]
df = spark.createDataFrame(data, ["customer_id", "date", "amount"])
# تعريف النافذة
window = Window.partitionBy("customer_id").orderBy("date")
# المجموع الجاري
df_with_running = df.withColumn(
"running_total",
spark_sum("amount").over(window)
)
# المجموع الجاري مع الصفوف بين
window_3day = Window.partitionBy("customer_id")\
.orderBy("date")\
.rowsBetween(-2, 0) # الحالي + 2 صفوف سابقة
df_with_3day = df.withColumn(
"rolling_3day_sum",
spark_sum("amount").over(window_3day)
)
الترتيب ضمن المجموعات
سؤال المقابلة: "أوجد أعلى 3 منتجات من حيث الإيرادات في كل فئة."
from pyspark.sql.functions import row_number
# بيانات عينة
products = [
("Electronics", "Laptop", 50000),
("Electronics", "Phone", 40000),
("Electronics", "Tablet", 30000),
("Electronics", "Watch", 20000),
("Clothing", "Jacket", 5000),
("Clothing", "Shirt", 3000),
("Clothing", "Pants", 4000),
]
df = spark.createDataFrame(products, ["category", "product", "revenue"])
# ترتيب المنتجات داخل كل فئة
window = Window.partitionBy("category").orderBy(col("revenue").desc())
df_ranked = df.withColumn("rank", row_number().over(window))
# تصفية أعلى 3
top_3_per_category = df_ranked.filter(col("rank") <= 3)
Lead/Lag لتحليل السلاسل الزمنية
سؤال المقابلة: "احسب التغير اليومي في سعر السهم."
from pyspark.sql.functions import lag
stock_data = [
("AAPL", "2024-01-01", 180.0),
("AAPL", "2024-01-02", 182.5),
("AAPL", "2024-01-03", 179.0),
("GOOG", "2024-01-01", 140.0),
("GOOG", "2024-01-02", 142.0),
]
df = spark.createDataFrame(stock_data, ["symbol", "date", "price"])
window = Window.partitionBy("symbol").orderBy("date")
df_with_change = df.withColumn(
"prev_price", lag("price", 1).over(window)
).withColumn(
"daily_change", col("price") - col("prev_price")
).withColumn(
"pct_change",
((col("price") - col("prev_price")) / col("prev_price") * 100).cast("decimal(10,2)")
)
النمط 2: التجميعات المعقدة
تجميعات متعددة مع GroupBy
سؤال المقابلة: "احسب إحصائيات متنوعة لكل قسم: إجمالي الراتب، متوسط الراتب، عدد الموظفين، أعلى موظف أجراً."
from pyspark.sql.functions import (
sum as spark_sum, avg as spark_avg,
count, max as spark_max, first
)
employees = [
("Engineering", "Alice", 150000),
("Engineering", "Bob", 120000),
("Engineering", "Charlie", 130000),
("Sales", "David", 100000),
("Sales", "Eve", 110000),
]
df = spark.createDataFrame(employees, ["dept", "name", "salary"])
# تجميعات متعددة
dept_stats = df.groupBy("dept").agg(
spark_sum("salary").alias("total_salary"),
spark_avg("salary").alias("avg_salary"),
count("*").alias("employee_count"),
spark_max("salary").alias("max_salary")
)
# للحصول على اسم أعلى موظف أجراً، استخدم دالة النافذة
window = Window.partitionBy("dept").orderBy(col("salary").desc())
df_with_rank = df.withColumn("rank", row_number().over(window))
highest_paid = df_with_rank.filter(col("rank") == 1)\
.select("dept", col("name").alias("highest_paid_employee"))
# ضم للخلف
result = dept_stats.join(highest_paid, "dept")
جداول Pivot
سؤال المقابلة: "أنشئ جدول pivot يظهر الإيرادات الشهرية لكل فئة منتج."
from pyspark.sql.functions import month, year
sales = [
("2024-01-15", "Electronics", 1000),
("2024-01-20", "Clothing", 500),
("2024-02-10", "Electronics", 1200),
("2024-02-15", "Clothing", 600),
("2024-03-01", "Electronics", 800),
]
df = spark.createDataFrame(sales, ["date", "category", "revenue"])
df_with_month = df.withColumn(
"month", date_format(to_date("date"), "yyyy-MM")
)
# Pivot: الصفوف هي الفئات، الأعمدة هي الأشهر
pivot_df = df_with_month.groupBy("category").pivot("month").sum("revenue")
# ملء القيم الفارغة بـ 0
pivot_df = pivot_df.fillna(0)
النمط 3: التعامل مع أنواع البيانات المعقدة
العمل مع المصفوفات
سؤال المقابلة: "افرد عمود مصفوفة وأجري تجميعات."
from pyspark.sql.functions import explode, collect_list, size
# مستخدم مع اهتمامات متعددة
users = [
(1, "Alice", ["Python", "Spark", "SQL"]),
(2, "Bob", ["Java", "Spark"]),
(3, "Charlie", ["Python", "ML", "SQL"]),
]
df = spark.createDataFrame(users, ["id", "name", "skills"])
# Explode: صف واحد لكل مهارة
exploded = df.select("id", "name", explode("skills").alias("skill"))
# عد المهارات لكل مستخدم
skill_counts = df.withColumn("skill_count", size("skills"))
# أوجد المهارات الأكثر شيوعاً
skill_popularity = exploded.groupBy("skill")\
.agg(count("*").alias("user_count"))\
.orderBy(col("user_count").desc())
النمط 4: استراتيجيات إزالة التكرار
الاحتفاظ بأحدث سجل
سؤال المقابلة: "أزل التكرارات مع الاحتفاظ بأحدث سجل لكل عميل فقط."
from pyspark.sql.functions import row_number, col
# بيانات مع تكرارات
data = [
("C001", "2024-01-01", "Address1"),
("C001", "2024-01-15", "Address2"), # الأحدث
("C001", "2024-01-10", "Address1.5"),
("C002", "2024-01-01", "AddressA"),
("C002", "2024-01-05", "AddressB"), # الأحدث
]
df = spark.createDataFrame(data, ["customer_id", "updated_at", "address"])
# الطريقة 1: دالة النافذة
window = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())
deduplicated = df.withColumn("rn", row_number().over(window))\
.filter(col("rn") == 1)\
.drop("rn")
# الطريقة 2: استخدام dropDuplicates (يحتفظ بأول ظهور)
# يجب الترتيب أولاً
deduplicated_v2 = df.orderBy(col("updated_at").desc())\
.dropDuplicates(["customer_id"])
النمط 5: الضم الفعال
ضم البث للجداول الصغيرة
سؤال المقابلة: "حسّن الضم بين جدول حقائق كبير وجدول أبعاد صغير."
from pyspark.sql.functions import broadcast
# جدول حقائق كبير (ملايين الصفوف)
sales = spark.read.parquet("s3://bucket/sales/") # 100M صف
# جدول أبعاد صغير (آلاف الصفوف)
products = spark.read.parquet("s3://bucket/products/") # 10K صف
# ضم البث - يرسل الجدول الصغير لجميع المنفذين
result = sales.join(
broadcast(products),
sales.product_id == products.id
).select(
sales["*"],
products.name,
products.category
)
النمط 6: تجزئة الجلسات
سؤال المقابلة: "جمّع أحداث المستخدم في جلسات مع مهلة خمول 30 دقيقة."
from pyspark.sql.functions import (
lag, unix_timestamp, sum as spark_sum, col
)
from pyspark.sql.window import Window
events = [
("user1", "2024-01-01 10:00:00"),
("user1", "2024-01-01 10:05:00"),
("user1", "2024-01-01 10:20:00"),
("user1", "2024-01-01 11:00:00"), # جلسة جديدة (فجوة >30 دقيقة)
("user1", "2024-01-01 11:10:00"),
("user2", "2024-01-01 10:00:00"),
]
df = spark.createDataFrame(events, ["user_id", "event_time"])
# تحويل لطابع زمني
df = df.withColumn("event_ts", to_timestamp("event_time"))
# نافذة لـ lag
window = Window.partitionBy("user_id").orderBy("event_ts")
# حساب الوقت منذ الحدث السابق
df_with_gap = df.withColumn(
"prev_event_ts", lag("event_ts", 1).over(window)
).withColumn(
"gap_seconds",
unix_timestamp("event_ts") - unix_timestamp("prev_event_ts")
)
# تحديد حدود الجلسة (فجوة > 30 دقيقة أو أول حدث)
session_timeout = 30 * 60 # 30 دقيقة بالثواني
df_with_boundary = df_with_gap.withColumn(
"new_session",
when(
col("prev_event_ts").isNull() | (col("gap_seconds") > session_timeout),
1
).otherwise(0)
)
# إنشاء معرفات الجلسة باستخدام المجموع التراكمي
df_with_sessions = df_with_boundary.withColumn(
"session_id",
spark_sum("new_session").over(window)
)
نصائح المقابلة لأسئلة PySpark
- ابدأ دائماً بالاستيرادات: أظهر أنك تعرف الدوال الشائعة
- اشرح نهجك: تحدث عن الحل قبل البرمجة
- فكر في الأداء: اذكر ضم البث، التقسيم، تجنب التبديلات
- تعامل مع الحالات الحدية: القيم الفارغة، التكرارات، DataFrames الفارغة
- اعرف DataFrame مقابل SQL: كن مرتاحاً مع كلا الـ APIs
# DataFrame API
df.filter(col("status") == "active")\
.groupBy("region")\
.agg(sum("revenue").alias("total"))
# SQL API (مكافئ)
df.createOrReplaceTempView("sales")
spark.sql("""
SELECT region, SUM(revenue) as total
FROM sales
WHERE status = 'active'
GROUP BY region
""")
:::