Big Data & Streaming Systems
Apache Spark Architecture & Fundamentals
Apache Spark knowledge is fundamental for data engineering interviews at any level. Understanding its architecture, execution model, and optimization techniques separates senior engineers from juniors.
Spark Architecture Overview
Cluster Components
+------------------+
| Driver |
| (SparkContext) |
+--------+---------+
|
| Task Scheduling
| Resource Requests
v
+------------------+
| Cluster Manager |
| (YARN/K8s/Mesos) |
+--------+---------+
|
+----+----+
| | |
v v v
+------+ +------+ +------+
|Worker| |Worker| |Worker|
|Node 1| |Node 2| |Node 3|
+--+---+ +--+---+ +--+---+
| | |
v v v
Executor Executor Executor
(JVM) (JVM) (JVM)
Driver Program:
- Contains the SparkContext
- Converts user code into tasks
- Schedules tasks on executors
- Coordinates job execution
Executors:
- JVM processes on worker nodes
- Execute tasks and cache data
- Report status back to driver
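A minimal PySpark sketch of where these pieces come from (the app name, master, and resource values are illustrative assumptions): creating a SparkSession instantiates the SparkContext inside the driver, which then asks the cluster manager for executor JVMs.
# Illustrative sketch: driver-side setup; resource values are example numbers only
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("architecture-demo")                 # hypothetical app name
    .master("yarn")                               # cluster manager: yarn, k8s://..., or local[*]
    .config("spark.executor.instances", "3")      # executor JVMs placed on worker nodes
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

sc = spark.sparkContext        # the SparkContext lives in the driver process
print(sc.applicationId)        # handy for locating the app in the cluster manager UI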
Interview Question: "Explain Spark's lazy evaluation"
Sample Response:
"Spark uses lazy evaluation where transformations on RDDs/DataFrames don't execute immediately—they build a logical plan. Execution only happens when an action (collect, count, write) is called.
This enables optimization benefits:
- Query optimization: Catalyst optimizer can reorder and combine operations
- Predicate pushdown: Filters pushed closer to data sources
- Partition pruning: Only reads necessary partitions
- Reduced data movement: Avoids unnecessary intermediate materializations
For example, if I call df.filter().select().filter(), Spark combines both filters before execution rather than making two separate passes through the data."
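A short sketch of that behavior (path and column names are hypothetical): the chained transformations only build a plan, explain() shows the merged filter, and nothing runs until the action.
# Lazy evaluation sketch: no job runs until count() is called
from pyspark.sql.functions import col

events = spark.read.parquet("s3://bucket/events/")           # transformation: nothing read yet
pipeline = events.filter(col("country") == "US") \
                 .select("user_id", "revenue") \
                 .filter(col("revenue") > 100)                # still just a logical plan

pipeline.explain()      # optimized plan shows the two filters combined into one
pipeline.count()        # action: only now does Spark schedule a job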
RDDs vs DataFrames vs Datasets
Comparison Table
| Aspect | RDD | DataFrame | Dataset |
|---|---|---|---|
| Abstraction | Distributed collection | Distributed table | Typed distributed collection |
| Schema | No | Yes (inferred/explicit) | Yes (compile-time) |
| Optimization | Manual | Catalyst optimizer | Catalyst optimizer |
| Type Safety | Compile-time (Scala) | Runtime only | Compile-time |
| Serialization | Java/Kryo | Tungsten (binary) | Tungsten (binary) |
| API | Functional | SQL-like | Hybrid |
| Use Case | Low-level control | SQL analytics | Type-safe applications |
When to Use Each
# RDD: Fine-grained control, complex transformations
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.map(lambda x: x * 2).filter(lambda x: x > 5).collect()
# DataFrame: SQL-like operations, structured data
from pyspark.sql.functions import col, sum as spark_sum

df = spark.read.parquet("s3://bucket/data/")
result = df.filter(col("revenue") > 1000) \
    .groupBy("region") \
    .agg(spark_sum("revenue").alias("total_revenue"))
// Dataset (Scala/Java): Type safety with optimization
case class User(id: Long, name: String, age: Int)
val ds: Dataset[User] = spark.read.parquet("users/").as[User]
val adults = ds.filter(_.age >= 18)
Spark Execution Model
Job → Stage → Task Hierarchy
+-----------------+
| Job | <- Triggered by action (collect, save, count)
+--------+--------+
|
+-----+-----+
| |
v v
+------+ +------+
|Stage1| |Stage2| <- Separated by shuffle boundaries
+--+---+ +--+---+
| |
+--+--+ +--+--+
|Task1| |Task3| <- One task per partition
|Task2| |Task4|
+-----+ +-----+
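A minimal sketch of how one action maps onto this hierarchy (path and column names are hypothetical): the filter stays in the first stage, the groupBy introduces a shuffle boundary, and the write action triggers the job.
# One action -> one job; the groupBy shuffle splits it into two stages
from pyspark.sql.functions import sum as spark_sum

orders = spark.read.parquet("s3://bucket/orders/")

per_region = (
    orders.filter("amount > 0")                          # narrow: stage 1
          .groupBy("region")                             # shuffle boundary -> stage 2
          .agg(spark_sum("amount").alias("total"))
)

per_region.write.parquet("s3://bucket/totals/")          # action: triggers the job
# In the Spark UI you would typically see 1 job, 2 stages, and one task per partition in each stage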
Narrow vs Wide Transformations
Narrow Transformations (no shuffle):
- map, filter, flatMap, mapPartitions, mapPartitionsWithIndex
- Each input partition contributes to one output partition
Wide Transformations (shuffle required):
- groupByKey, reduceByKey, aggregateByKey
- join, cogroup, distinct
- repartition, coalesce (with shuffle)
- Data from multiple input partitions needed for output
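A toy RDD sketch of the distinction (illustrative data only): the map/filter steps run within each partition, while reduceByKey forces a shuffle and a new stage.
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)], numSlices=4)

narrow = pairs.mapValues(lambda v: v * 10) \
              .filter(lambda kv: kv[1] > 10)      # narrow: each partition processed independently

wide = narrow.reduceByKey(lambda x, y: x + y)     # wide: values for a key must be co-located -> shuffle

print(wide.collect())                             # action; the reduceByKey adds a stage boundary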
Interview Question: "What happens during a shuffle?"
Sample Response:
"A shuffle in Spark involves redistributing data across partitions, typically triggered by operations like groupByKey or join. The process has three phases:
1. Map Side (Shuffle Write):
- Each task computes which partition each record belongs to
- Records are written to local disk in shuffle files
- An index file tracks partition boundaries
2. Shuffle Service:
- External shuffle service (optional) manages shuffle files
- Allows executors to be dynamically removed without losing shuffle data
3. Reduce Side (Shuffle Read):
- Tasks fetch their partitions from all map tasks
- Data is pulled over the network
- Merge-sort combines records from all sources
Shuffles are expensive because they involve:
- Serialization/deserialization
- Disk I/O
- Network I/O
- Potential data skew issues
To minimize shuffles, I use techniques like broadcast joins for small tables, pre-partitioning data on join keys, and using reduceByKey instead of groupByKey."
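The reduceByKey-versus-groupByKey point is easy to demonstrate; a toy word-count sketch (illustrative data only):
words = sc.parallelize(["a", "b", "a", "c", "b", "a"]).map(lambda w: (w, 1))

# groupByKey: every (word, 1) pair crosses the network before being summed
counts_group = words.groupByKey().mapValues(lambda ones: sum(ones))

# reduceByKey: partial sums are computed map-side, so far less data is shuffled
counts_reduce = words.reduceByKey(lambda x, y: x + y)

print(sorted(counts_reduce.collect()))            # [('a', 3), ('b', 2), ('c', 1)]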
Memory Management
Spark Memory Model (Unified Memory)
+------------------------------------------+
| Executor Memory |
+------------------------------------------+
| Reserved Memory (300MB fixed) |
+------------------------------------------+
| |
| Unified Memory (spark.memory.fraction) |
| +------------------------------------+ |
| | Storage Memory | Execution | |
| | (cached RDDs, | Memory | |
| | broadcasts) | (shuffles, | |
| | | joins, | |
| | | aggregates) | |
| +------------------------------------+ |
| <- Boundary can shift -> |
| |
+------------------------------------------+
| User Memory (1 - spark.memory.fraction) |
| (User data structures, UDFs) |
+------------------------------------------+
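Plugging numbers into this model makes the regions concrete. A quick back-of-the-envelope sketch, assuming an 8 GB executor heap and the default spark.memory.fraction=0.6 and spark.memory.storageFraction=0.5:
# Illustrative arithmetic only; the 8 GB heap is an assumed example
heap_mb = 8 * 1024                        # spark.executor.memory = 8g
reserved_mb = 300                         # fixed reserved memory

usable_mb = heap_mb - reserved_mb         # ~7892 MB
unified_mb = usable_mb * 0.6              # spark.memory.fraction        -> ~4735 MB
storage_mb = unified_mb * 0.5             # spark.memory.storageFraction -> ~2368 MB (execution can borrow from it)
user_mb = usable_mb - unified_mb          # ~3157 MB for user data structures and UDF objects

print(f"unified={unified_mb:.0f} MB, storage={storage_mb:.0f} MB, user={user_mb:.0f} MB")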
Key Configuration Parameters
# Executor memory configuration (these must be set at submit time,
# e.g. via spark-submit --conf; they cannot be changed on a running application)
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "2g")   # container overhead for YARN/K8s (off-heap, native buffers)
spark.conf.set("spark.memory.fraction", "0.6")          # unified (storage + execution) share of usable heap
spark.conf.set("spark.memory.storageFraction", "0.5")   # portion of unified memory protected for storage
Interview Question: "How do you handle out-of-memory errors in Spark?"
Sample Response:
"OOM errors in Spark require systematic diagnosis:
1. Identify the failure point:
- Driver OOM: Collecting too much data, large broadcast variables
- Executor OOM: Data skew, insufficient memory per task, memory-intensive operations
2. Common solutions:
For Driver OOM:
# Instead of collecting all data
# BAD: df.collect()
# GOOD: df.limit(1000).collect()
# Use take() for sampling
sample = df.take(100)
# Write to storage instead of collecting
df.write.parquet("s3://bucket/output/")
For Executor OOM:
# Increase partitions to reduce data per task
df = df.repartition(200)
# Handle skew with salting
from pyspark.sql.functions import col, concat, lit, rand

df_salted = df.withColumn("salted_key",
    concat(col("skewed_key"), lit("_"), (rand() * 10).cast("int")))
# Increase shuffle partitions so each task processes less data
spark.conf.set("spark.sql.shuffle.partitions", "400")
3. Tuning memory:
# Increase executor memory
--executor-memory 8g
# Add overhead for off-heap
--conf spark.executor.memoryOverhead=2g
# Enable off-heap memory for Tungsten
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=4g
```"
## Catalyst Optimizer & Tungsten
### Catalyst Optimization Pipeline
+--------------+     +--------------+     +--------------+
|  Unresolved  |     |   Analyzed   |     |  Optimized   |
|   Logical    | --> |   Logical    | --> |   Logical    |
|     Plan     |     |     Plan     |     |     Plan     |
+--------------+     +--------------+     +--------------+
                                                 |
                                                 v
+--------------+     +--------------+     +--------------+
|   Executed   |     |   Physical   |     |   Physical   |
|     RDDs     | <-- |     Plan     | <-- |    Plans     |
+--------------+     +--------------+     +--------------+
                                          (Cost-based selection)
### Common Optimizations
**Predicate Pushdown**:
# Before optimization: read all data, then filter
# After: filter pushed to data source
df = spark.read.parquet("s3://bucket/data/")
filtered = df.filter(col("date") == "2024-01-01")
# Spark pushes the filter to Parquet reader
# Only reads relevant row groups
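One way to verify the pushdown (continuing the filtered DataFrame from the snippet above): the Parquet scan in the physical plan lists the filters it received.
filtered.explain()
# The scan node should show something like:
#   PushedFilters: [IsNotNull(date), EqualTo(date,2024-01-01)]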
Column Pruning:
# Only requested columns are read from source
df = spark.read.parquet("s3://bucket/wide_table/")
result = df.select("id", "name") # Only reads 2 columns
Broadcast Join:
from pyspark.sql.functions import broadcast
# Small table broadcasted to all executors
large_df = spark.read.parquet("s3://bucket/large/")
small_df = spark.read.parquet("s3://bucket/small/")
result = large_df.join(broadcast(small_df), "key")
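Spark also broadcasts automatically when one side of the join is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); a sketch of tuning it, with 50 MB as an assumed example:
# Raise the automatic broadcast threshold (default ~10 MB); 50 MB is an example value
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))

# Setting it to -1 disables automatic broadcasting entirely
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result.explain()    # a broadcast join shows up as BroadcastHashJoin in the physical plan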
Interview Practice: Architecture Scenario
Question: "You have a Spark job processing 10TB of data. It's taking 4 hours but needs to complete in 1 hour. Walk me through your optimization approach."
Strong Response:
"I'd approach this systematically:
1. Profile the Current Job:
- Check Spark UI for stage durations, shuffle sizes, task distribution
- Identify bottlenecks: Is it CPU-bound, I/O-bound, or shuffle-bound?
- Look for data skew in task duration variance
2. Data Layout Optimizations:
# Partition data by frequently filtered columns
df.write.partitionBy("date", "region").parquet("s3://bucket/partitioned/")
# Use bucketing for frequent join keys
df.write.bucketBy(100, "customer_id").sortBy("customer_id")\
.saveAsTable("bucketed_orders")
# Aim for ~128-256 MB files on write; this setting caps how much data one read partition pulls in
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")  # 256 MB
3. Join Optimization:
# Broadcast small dimensions
result = facts.join(broadcast(dimensions), "dim_key")
# For skewed joins, use salt technique
facts_salted = facts.withColumn("salted_key",
concat(col("skewed_key"), lit("_"), (rand() * 10).cast("int")))
4. Resource Tuning:
# Increase parallelism
--num-executors 100
--executor-cores 4
--executor-memory 16g
# Optimize shuffle
spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.sql.adaptive.enabled", "true")
5. Adaptive Query Execution (Spark 3.0+):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
This combination of data layout, join optimization, and resource tuning typically yields 4-10x improvements."
Key Takeaways for Interviews
- Understand the execution model: Jobs → Stages → Tasks, and why shuffles create stage boundaries
- Know narrow vs wide transformations: Critical for optimization discussions
- Memory management: Unified memory model and how to diagnose OOM issues
- Lazy evaluation benefits: Query optimization, predicate pushdown, reduced data movement
- Catalyst optimizer: How Spark optimizes logical plans into physical execution