Big Data & Streaming Systems
Real-Time Processing: Flink, Spark Streaming & Kafka Streams
Stream processing frameworks are essential for building real-time data pipelines. This lesson covers the three major frameworks and their trade-offs for data engineering interviews.
Stream Processing Framework Comparison
| Aspect | Apache Flink | Spark Streaming | Kafka Streams |
|---|---|---|---|
| Processing Model | True streaming (event-at-a-time) | Micro-batch | True streaming |
| Latency | Milliseconds | Seconds (micro-batch) | Milliseconds |
| Deployment | Standalone cluster | Spark cluster | Library (embedded) |
| State Management | Native, managed (heap or embedded RocksDB) | Managed state store, checkpointed to HDFS/S3 (RocksDB optional) | Local RocksDB stores backed by changelog topics |
| Exactly-Once | Native support | Requires careful config | Native support |
| Scaling | Task parallelism | Executor-based | Partition-based |
| Windowing | Advanced (tumbling, sliding, session, global) | Tumbling and sliding (session windows in newer releases) | Tumbling, hopping, sliding, session |
| Use Case | Complex event processing | ETL with Spark ecosystem | Kafka-centric apps |
Apache Flink Deep Dive
Flink Architecture
            +------------------+
            |   Job Manager    |
            |  (Coordination)  |
            +---------+--------+
                      |
            +---------+--------+
            |                  |
            v                  v
      +-----------+      +-----------+
      |   Task    |      |   Task    |
      |   Mgr 1   |      |   Mgr 2   |
      +-----+-----+      +-----+-----+
            |                  |
            v                  v
      +-----------+      +-----------+
      |   Slots   |      |   Slots   |
      |  (Tasks)  |      |  (Tasks)  |
      +-----------+      +-----------+
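The Job Manager assigns each operator's parallel subtasks to slots on the Task Managers, so the parallelism setting determines how many slots a job needs. A minimal PyFlink sketch of controlling parallelism (the values are illustrative):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)  # default: 4 parallel subtasks per operator

numbers = env.from_collection([1, 2, 3, 4, 5])
# Per-operator override: this map runs with only 2 parallel subtasks
doubled = numbers.map(lambda x: x * 2).set_parallelism(2)
doubled.print()

env.execute("parallelism-sketch")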
Flink DataStream API
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import WatermarkStrategy, Duration
from pyflink.common.time import Time
from pyflink.common.serialization import SimpleStringSchema
# Create execution environment
env = StreamExecutionEnvironment.get_execution_environment()
# Configure checkpointing for exactly-once
env.enable_checkpointing(10000) # Every 10 seconds
env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
# Kafka source
source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("transactions") \
    .set_group_id("flink-consumer") \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
# Create stream
transactions = env.from_source(
    source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "Kafka Source"
)
# Process stream (parse_transaction and TransactionAggregator are user-defined:
# a parsing map function and an AggregateFunction implementation)
processed = transactions \
    .map(parse_transaction) \
    .filter(lambda t: t.amount > 100) \
    .key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .aggregate(TransactionAggregator())
# Execute
env.execute("Transaction Processing")
Interview Question: "Explain Flink's checkpointing mechanism"
Sample Response:
"Flink's checkpointing enables exactly-once processing through distributed snapshots:
How It Works:
- Barrier Injection: Job Manager injects checkpoint barriers into source streams
- Barrier Alignment: Operators wait for barriers from all input channels
- State Snapshot: Once aligned, operators snapshot their state
- Checkpoint Completion: When all operators complete, checkpoint is finalized
Source 1 ───|B1|─────────|B2|─────>
               \
                \
                 Operator: waits for the barrier from
                 both inputs, then snapshots its state
                /
               /
Source 2 ───|B1|─────────|B2|─────>

B1, B2 = Checkpoint barriers
Configuration Options:
# Exactly-once (default)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)
# At-least-once (lower latency)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.AT_LEAST_ONCE
)
# State backend
env.set_state_backend(RocksDBStateBackend("s3://bucket/checkpoints"))
# Incremental checkpoints for large state
env.set_state_backend(
    RocksDBStateBackend("s3://bucket/checkpoints", True)  # Incremental
)
Recovery: On failure, Flink restores each operator's state from the last successful checkpoint and replays events from the corresponding source offsets, preserving exactly-once semantics."
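Checkpointing works together with the job's restart strategy, which controls how many recovery attempts are made and how long Flink waits between them. A minimal sketch, assuming a fixed-delay strategy with illustrative values:

from pyflink.common.restart_strategy import RestartStrategies
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10000)  # checkpoint every 10 s

# On failure, retry the job up to 3 times with 10 s between attempts,
# restoring operator state from the latest completed checkpoint each time
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))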
Flink Windowing
from pyflink.datastream.window import (
    TumblingEventTimeWindows,
    SlidingEventTimeWindows,
    EventTimeSessionWindows
)
from pyflink.common.time import Time
# Tumbling windows: Non-overlapping, fixed-size
transactions.key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .sum("amount")

# Sliding windows: Overlapping, fixed-size
transactions.key_by(lambda t: t.customer_id) \
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  # Window size
        Time.minutes(1)    # Slide interval
    )) \
    .sum("amount")

# Session windows: Gap-based (a window closes after 30 minutes of inactivity)
transactions.key_by(lambda t: t.customer_id) \
    .window(EventTimeSessionWindows.with_gap(Time.minutes(30))) \
    .sum("amount")
Event Time vs Processing Time
Interview Question: "Explain the difference between event time and processing time in Flink"
Sample Response:
"Flink supports three time semantics:
1. Event Time: Time when event actually occurred (embedded in data)
# Extract timestamps and generate watermarks. PyFlink expects a
# TimestampAssigner subclass (from pyflink.common.watermark_strategy)
# rather than a bare lambda.
class TransactionTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, event, record_timestamp):
        return event.timestamp_ms

stream.assign_timestamps_and_watermarks(
    WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(TransactionTimestampAssigner())
)
- Best for correctness with out-of-order events
- Requires watermarks to handle late data
- Results are deterministic and reproducible
2. Processing Time: Time when event is processed by Flink
stream.key_by(lambda t: t.key) \
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
- Lowest latency
- Non-deterministic results
- Simpler to implement
3. Ingestion Time: Time when event enters Flink
- Compromise between event and processing time
- Assigned once at source
Watermarks:
Event Time: |--1--|--3--|--2--|--5--|--4--|--6--|
Watermark: W1=2 W2=4 W3=5
W1 signals: 'No more events with timestamp < 2'
Events with a timestamp below the current watermark are late and must be handled separately (dropped, admitted via allowed lateness, or routed to a side output)
For most production systems, I recommend event time with bounded out-of-orderness watermarks. This handles late data gracefully while providing correct results."
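To make the watermark arithmetic concrete, here is a small plain-Python illustration (not the Flink API) of a bounded out-of-orderness watermark: the watermark simply trails the maximum event time seen so far by the configured bound, and anything older than the current watermark counts as late.

# Toy model of WatermarkStrategy.for_bounded_out_of_orderness
def bounded_out_of_orderness(event_times_ms, bound_ms=5000):
    """Yield (event_time, watermark) pairs: watermark = max event time seen - bound."""
    max_seen = float("-inf")
    for t in event_times_ms:
        max_seen = max(max_seen, t)
        yield t, max_seen - bound_ms

for event_time, watermark in bounded_out_of_orderness(
        [1000, 3000, 2000, 9000, 3500, 12000]):
    status = "LATE" if event_time < watermark else "on time"
    print(f"event={event_time}  watermark={watermark}  {status}")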
Spark Structured Streaming
Structured Streaming Model
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, col, from_json, sum as spark_sum, count
)
spark = SparkSession.builder \
    .appName("StructuredStreamingApp") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints") \
    .getOrCreate()
# Read from Kafka
transactions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "latest") \
    .load()
# Parse JSON
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
schema = StructType() \
    .add("customer_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("timestamp", TimestampType())

parsed = transactions \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Windowed aggregation
result = parsed \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        col("customer_id"),
        window(col("timestamp"), "10 minutes", "5 minutes")
    ) \
    .agg(
        spark_sum("amount").alias("total_amount"),
        count("*").alias("transaction_count")
    )
# Write to console (for debugging)
query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Write to Kafka (the sink expects a string/binary "value" column, so
# serialize each aggregate row to JSON first)
query = result \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "aggregated-transactions") \
    .option("checkpointLocation", "/checkpoints/kafka-sink") \
    .start()
query.awaitTermination()
Output Modes
Interview Question: "Explain the different output modes in Structured Streaming"
Sample Response:
"Structured Streaming has three output modes:
1. Append Mode (default):
- Only new rows added since last trigger are output
- Works only for queries without aggregations, or with aggregations on event-time windows that have a watermark (each window's result is emitted once the watermark passes it)
- Use case: Writing to append-only sinks like files
result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output/")
2. Complete Mode:
- Entire result table is output every trigger
- Requires aggregation query
- Use case: Small result sets, dashboards
result.writeStream \
    .outputMode("complete") \
    .format("console")
3. Update Mode:
- Only rows that changed since last trigger are output
- Works with most queries
- Use case: Updating key-value stores, databases
result.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_database)
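The write_to_database callback above is only a placeholder; a possible sketch using Spark's built-in JDBC batch writer (the connection details and table name are illustrative):

def write_to_database(batch_df, batch_id):
    # Each micro-batch arrives as an ordinary DataFrame, so any batch sink works;
    # a real update-mode sink would typically upsert on the grouping key
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/analytics")
        .option("dbtable", "customer_totals")
        .option("user", "spark")
        .option("password", "secret")
        .mode("append")
        .save())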
Trade-offs:
| Mode | Memory | Output Size | Use Case |
|---|---|---|---|
| Append | Low | New rows only | Files, logs |
| Complete | High | All rows | Dashboards |
| Update | Medium | Changed rows | Databases |
Exactly-Once with Structured Streaming
# End-to-end exactly-once needs a replayable source (Kafka offsets are stored
# in the checkpoint), checkpointing, and an idempotent sink
spark.conf.set("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")

# foreachBatch writes: a plain append is only at-least-once if a micro-batch
# is retried; see the idempotent-write sketch after the streaming query below
def write_to_delta(batch_df, batch_id):
    batch_df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save("s3://bucket/delta-table/")
query = result.writeStream \
    .foreachBatch(write_to_delta) \
    .option("checkpointLocation", "/checkpoints/delta") \
    .trigger(processingTime="1 minute") \
    .start()
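A replayed micro-batch would append the same rows twice with the code above. Delta Lake supports idempotent foreachBatch writes via transactional writer options keyed on the batch id (assuming a Delta Lake version with this feature; the application id below is illustrative):

def write_to_delta_idempotent(batch_df, batch_id):
    # txnAppId + txnVersion let Delta skip a batch it has already committed,
    # so retries of the same micro-batch do not produce duplicate rows
    (batch_df.write
        .format("delta")
        .option("txnAppId", "transaction-aggregator")  # stable id per query
        .option("txnVersion", batch_id)                # increases with each batch
        .mode("append")
        .save("s3://bucket/delta-table/"))

query = result.writeStream \
    .foreachBatch(write_to_delta_idempotent) \
    .option("checkpointLocation", "/checkpoints/delta") \
    .start()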
Kafka Streams
Kafka Streams Architecture
+------------------------------------------+
| Application Instance |
| |
| +--------+ +--------+ +--------+ |
| | Stream | | Stream | | Stream | |
| | Thread | | Thread | | Thread | |
| +---+----+ +---+----+ +---+----+ |
| | | | |
| v v v |
| +-------+ +-------+ +-------+ |
| | State | | State | | State | |
| | Store | | Store | | Store | |
| +-------+ +-------+ +-------+ |
| |
+------------------------------------------+
               |                    ^
       Consume |                    | Produce
               v                    |
+------------------------------------------+
| Kafka Cluster |
+------------------------------------------+
Kafka Streams DSL (Java/Kotlin)
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);

StreamsBuilder builder = new StreamsBuilder();

// Source stream
KStream<String, Transaction> transactions = builder.stream(
    "transactions",
    Consumed.with(Serdes.String(), transactionSerde)
);

// Processing
KTable<Windowed<String>, TransactionSummary> summary = transactions
    .filter((key, value) -> value.getAmount() > 100)
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        TransactionSummary::new,
        (key, value, aggregate) -> aggregate.add(value),
        Materialized.with(Serdes.String(), summarySerde)
    );

// Output to topic
summary.toStream()
    .map((windowedKey, value) -> KeyValue.pair(
        windowedKey.key(),
        value
    ))
    .to("transaction-summaries");

// Build and start
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Interview Question: "When would you choose Kafka Streams over Flink?"
Sample Response:
"I'd choose Kafka Streams when:
1. Kafka-Centric Architecture:
- Input and output are both Kafka topics
- No need to connect to external systems directly
- Want tight integration with Kafka ecosystem
2. Simpler Deployment:
Kafka Streams: Library embedded in your app
├── Deploy as normal JAR/container
├── Scale by adding app instances
└── No separate cluster to manage
Flink: Separate cluster required
├── Job Manager + Task Managers
├── Additional operational overhead
└── More powerful but complex
3. Lightweight Processing:
- Simple transformations, filtering, aggregations
- Per-partition state is sufficient
- Don't need cross-partition joins
4. Microservices Pattern:
- Each service owns its processing
- Want to avoid centralized streaming platform
- Easier local development and testing
Choose Flink when:
- Need complex event processing (CEP)
- Require sophisticated windowing (session windows with gaps)
- Processing non-Kafka sources
- Need broadcast state patterns
- Require exactly-once across external systems
Example Decision:
Use Case: Aggregate orders per customer per hour
→ Kafka Streams (simple aggregation, Kafka-centric)
Use Case: Detect fraud patterns across multiple data streams
→ Flink (complex CEP, multiple sources, advanced state)"
Stream Processing Design Patterns
Pattern 1: Enrichment
# Flink example: Enrich transactions with customer reference data held in
# broadcast state (customer_state_desc is a MapStateDescriptor defined elsewhere)
class EnrichmentFunction(KeyedBroadcastProcessFunction):

    def process_element(self, transaction, ctx):
        # Keyed side: read-only access to the broadcast state
        customer = ctx.get_broadcast_state(
            customer_state_desc
        ).get(transaction.customer_id)
        if customer:
            yield EnrichedTransaction(
                transaction,
                customer.name,
                customer.segment
            )

    def process_broadcast_element(self, customer, ctx):
        # Broadcast side: update the state, replicated to all parallel tasks
        ctx.get_broadcast_state(customer_state_desc).put(
            customer.id,
            customer
        )
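A sketch of how this function might be wired up, assuming transactions and customers are existing DataStreams and reusing the customer_state_desc and EnrichmentFunction names from the example above:

from pyflink.common.typeinfo import Types
from pyflink.datastream.state import MapStateDescriptor

# Broadcast state descriptor: customer records keyed by customer id
customer_state_desc = MapStateDescriptor(
    "customers", Types.STRING(), Types.PICKLED_BYTE_ARRAY()
)

# Broadcast the (low-volume) customer stream to every parallel task
broadcast_customers = customers.broadcast(customer_state_desc)

enriched = transactions \
    .key_by(lambda t: t.customer_id) \
    .connect(broadcast_customers) \
    .process(EnrichmentFunction())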
Pattern 2: Deduplication
# Deduplicate events within a time window, remembering seen ids for one hour
from pyflink.common.typeinfo import Types
from pyflink.datastream import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

class DeduplicationFunction(KeyedProcessFunction):

    def open(self, runtime_context):
        # Keyed state holding the set of event ids already emitted
        self.seen_ids = runtime_context.get_state(
            ValueStateDescriptor("seen", Types.PICKLED_BYTE_ARRAY())
        )

    def process_element(self, event, ctx):
        seen = self.seen_ids.value() or set()
        if event.id not in seen:
            seen.add(event.id)
            self.seen_ids.update(seen)
            # Set an event-time timer to clean up old IDs an hour later
            ctx.timer_service().register_event_time_timer(
                ctx.timestamp() + 60 * 60 * 1000
            )
            yield event

    def on_timer(self, timestamp, ctx):
        # Clean up state for this key
        self.seen_ids.clear()
Pattern 3: Late Data Handling
# Flink side outputs for late data
late_output_tag = OutputTag("late-data")
result = transactions \
    .key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .allowed_lateness(Time.minutes(1)) \
    .side_output_late_data(late_output_tag) \
    .aggregate(TransactionAggregator())

# Get late data stream
late_data = result.get_side_output(late_output_tag)

# Process late data separately
late_data.map(lambda t: store_for_reprocessing(t))
Key Takeaways for Interviews
- Know the trade-offs: Flink (power) vs Spark Streaming (ecosystem) vs Kafka Streams (simplicity)
- Understand exactly-once: How each framework achieves it (checkpoints, transactions)
- Time semantics: Event time vs processing time implications
- Windowing: Tumbling, sliding, session windows and when to use each
- State management: How state is stored, checkpointed, and recovered
:::