Big Data & Streaming Systems

Real-Time Processing: Flink, Spark Streaming & Kafka Streams

5 min read

Stream processing frameworks are essential for building real-time data pipelines. This lesson covers the three major frameworks and their trade-offs for data engineering interviews.

Stream Processing Framework Comparison

| Aspect | Apache Flink | Spark Streaming | Kafka Streams |
| --- | --- | --- | --- |
| Processing Model | True streaming (event-at-a-time) | Micro-batch | True streaming (record-at-a-time) |
| Latency | Milliseconds | Seconds (micro-batch) | Milliseconds |
| Deployment | Standalone cluster | Spark cluster | Library (embedded in the app) |
| State Management | Native, managed (heap or RocksDB) | Checkpoint-backed state store (RocksDB optional) | Local state stores (RocksDB + changelog topics) |
| Exactly-Once | Native support | Requires careful configuration | Native support (EOS v2) |
| Scaling | Task parallelism | Executor-based | Partition-based |
| Windowing | Advanced (tumbling, sliding, session) | Basic windowing | Tumbling, hopping, sliding, session |
| Use Case | Complex event processing | ETL with the Spark ecosystem | Kafka-centric apps |
Apache Flink

Flink Architecture

A Flink cluster consists of a Job Manager, which coordinates scheduling and checkpointing, and Task Managers, whose task slots run the parallel subtasks of a job:

+------------------+
|   Job Manager    |
|  (Coordination)  |
+--------+---------+
         |
    +----+----+
    |         |
    v         v
+--------+  +--------+
| Task   |  | Task   |
| Mgr 1  |  | Mgr 2  |
+---+----+  +---+----+
    |           |
    v           v
+--------+  +--------+
| Slots  |  | Slots  |
|(Tasks) |  |(Tasks) |
+--------+  +--------+
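The Job Manager splits each operator into parallel subtasks and schedules them onto Task Manager slots. A minimal sketch of controlling that parallelism from the application side (the value is illustrative; the number of slots per Task Manager comes from the cluster's taskmanager.numberOfTaskSlots setting):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Each operator runs as 4 parallel subtasks, spread across the available task slots
env.set_parallelism(4)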
The example below wires a Kafka source into a checkpointed PyFlink job (parse_transaction and TransactionAggregator stand in for user-defined parsing and aggregation logic):

from pyflink.common import Duration, Time, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.datastream.window import TumblingEventTimeWindows

# Create execution environment
env = StreamExecutionEnvironment.get_execution_environment()

# Configure checkpointing for exactly-once
env.enable_checkpointing(10000)  # Every 10 seconds
env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)

# Kafka source
source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("transactions") \
    .set_group_id("flink-consumer") \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

# Create stream
transactions = env.from_source(
    source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "Kafka Source"
)

# Process stream
processed = transactions \
    .map(parse_transaction) \
    .filter(lambda t: t.amount > 100) \
    .key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .aggregate(TransactionAggregator())

# Execute
env.execute("Transaction Processing")

Interview Question: "How does Flink's checkpointing enable exactly-once processing?"

Sample Response:

"Flink's checkpointing enables exactly-once processing through distributed snapshots:

How It Works:

  1. Barrier Injection: Job Manager injects checkpoint barriers into source streams
  2. Barrier Alignment: Operators wait for barriers from all input channels
  3. State Snapshot: Once aligned, operators snapshot their state
  4. Checkpoint Completion: When all operators complete, checkpoint is finalized
Source 1 ─────|B1|─────────|B2|─────>
                   \
                    v
              Operator: wait for barriers
              from both inputs, then
              snapshot state
                    ^
                   /
Source 2 ─────|B1|─────────|B2|─────>

B1, B2 = Checkpoint barriers
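Barrier alignment can add latency when operators are backpressured. A minimal sketch of the related checkpoint settings, assuming a recent PyFlink release:

checkpoint_config = env.get_checkpoint_config()

# Snapshot in-flight records instead of waiting for full alignment (Flink 1.11+)
checkpoint_config.enable_unaligned_checkpoints()

# Abort checkpoints that take longer than 60 seconds
checkpoint_config.set_checkpoint_timeout(60000)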

Configuration Options:

# Exactly-once (default)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)

# At-least-once (lower latency)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.AT_LEAST_ONCE
)

# State backend
env.set_state_backend(RocksDBStateBackend("s3://bucket/checkpoints"))

# Incremental checkpoints for large state
env.set_state_backend(
    RocksDBStateBackend("s3://bucket/checkpoints", True)  # Incremental
)

Recovery: On failure, Flink restores each operator to its state from the last successful checkpoint and replays events from that point, preserving exactly-once semantics."
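A minimal sketch of the restart side of recovery, reusing the environment from the earlier example (the attempt count and delay are illustrative):

from pyflink.common.restart_strategy import RestartStrategies

# Retry the job a bounded number of times; each restart restores the last
# completed checkpoint and replays records from there
env.set_restart_strategy(
    RestartStrategies.fixed_delay_restart(3, 10000)  # 3 attempts, 10s apart (ms)
)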

Windowing in Flink

Flink supports tumbling, sliding, and session windows over keyed streams:

from pyflink.common import Time
from pyflink.datastream.window import (
    TumblingEventTimeWindows,
    SlidingEventTimeWindows,
    EventTimeSessionWindows
)

# Tumbling windows: Non-overlapping, fixed-size
# (.sum(...) mirrors the Java DSL; in PyFlink you would typically use .reduce() or .aggregate())
transactions.key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .sum("amount")

# Sliding windows: Overlapping, fixed-size
transactions.key_by(lambda t: t.customer_id) \
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  # Window size
        Time.minutes(1)    # Slide interval
    )) \
    .sum("amount")

# Session windows: Gap-based
transactions.key_by(lambda t: t.customer_id) \
    .window(EventTimeSessionWindows.with_gap(Time.minutes(30))) \
    .sum("amount")

Event Time vs Processing Time

Interview Question: "Explain the difference between event time and processing time in Flink"

Sample Response:

"Flink supports three time semantics:

1. Event Time: Time when event actually occurred (embedded in data)

# Extract timestamp and generate watermarks
stream.assign_timestamps_and_watermarks(
    WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(
            # PyFlink expects a TimestampAssigner subclass here; a lambda is shown for brevity
            lambda event, _: event.timestamp_ms
        )
)
  • Best for correctness with out-of-order events
  • Requires watermarks to handle late data
  • Results are deterministic and reproducible

2. Processing Time: Time when event is processed by Flink

stream.key_by(lambda t: t.key) \
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  • Lowest latency
  • Non-deterministic results
  • Simpler to implement

3. Ingestion Time: Time when event enters Flink

  • Compromise between event and processing time
  • Assigned once at source

Watermarks:

Event Time:  |--1--|--3--|--2--|--5--|--4--|--6--|
Watermark:        W1=2      W2=4      W3=5

W1 signals: 'No more events with timestamp < 2 are expected'
Late events (timestamp below the current watermark) are dropped, admitted via allowed lateness, or routed to a side output

For most production systems, I recommend event time with bounded out-of-orderness watermarks. This handles late data gracefully while providing correct results."
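Expanding on that recommendation, a minimal PyFlink sketch of a production watermark setup (the TransactionTimestampAssigner class and the timestamp_ms field are illustrative; with_idleness keeps idle Kafka partitions from stalling the watermark):

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner

class TransactionTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, event, record_timestamp):
        # Event time in epoch milliseconds, taken from the payload
        return event.timestamp_ms

watermarks = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(5)) \
    .with_idleness(Duration.of_seconds(60)) \
    .with_timestamp_assigner(TransactionTimestampAssigner())

stream = transactions.assign_timestamps_and_watermarks(watermarks)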

Spark Structured Streaming

Structured Streaming Model

Structured Streaming treats a stream as an unbounded, continuously growing table and runs incremental queries against it in micro-batches, which is why the API looks like batch DataFrame code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, col, from_json, sum as spark_sum, count
)

spark = SparkSession.builder \
    .appName("StructuredStreamingApp") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints") \
    .getOrCreate()

# Read from Kafka
transactions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

schema = StructType() \
    .add("customer_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("timestamp", TimestampType())

parsed = transactions \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Windowed aggregation
result = parsed \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        col("customer_id"),
        window(col("timestamp"), "10 minutes", "5 minutes")
    ) \
    .agg(
        spark_sum("amount").alias("total_amount"),
        count("*").alias("transaction_count")
    )

# Write to console (for debugging)
console_query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Write to Kafka (the sink requires key/value columns, so serialize rows to JSON)
query = result \
    .selectExpr(
        "CAST(customer_id AS STRING) AS key",
        "to_json(struct(*)) AS value"
    ) \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "aggregated-transactions") \
    .option("checkpointLocation", "/checkpoints/kafka-sink") \
    .start()

query.awaitTermination()

Output Modes

Interview Question: "Explain the different output modes in Structured Streaming"

Sample Response:

"Structured Streaming has three output modes:

1. Append Mode (default):

  • Only new rows added since last trigger are output
  • Works only with queries that don't have aggregations or have aggregations on event-time with watermark
  • Use case: Writing to append-only sinks like files
result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output/")

2. Complete Mode:

  • Entire result table is output every trigger
  • Requires aggregation query
  • Use case: Small result sets, dashboards
result.writeStream \
    .outputMode("complete") \
    .format("console")

3. Update Mode:

  • Only rows that changed since last trigger are output
  • Works with most queries
  • Use case: Updating key-value stores, databases
result.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_database)

Trade-offs:

| Mode | Memory | Output Size | Use Case |
| --- | --- | --- | --- |
| Append | Low | New rows only | Files, logs |
| Complete | High | All rows | Dashboards |
| Update | Medium | Changed rows | Databases |
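
The write_to_database callback referenced in the update-mode example is user-supplied. A minimal sketch using Spark's built-in JDBC batch writer (URL, table, and credentials are placeholders; a production version would upsert or MERGE rather than append, since update mode re-emits changed rows):

def write_to_database(batch_df, batch_id):
    # Each micro-batch arrives as an ordinary DataFrame
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://db-host:5432/analytics") \
        .option("dbtable", "customer_aggregates") \
        .option("user", "spark") \
        .option("password", "secret") \
        .mode("append") \
        .save()

query = result.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_database) \
    .option("checkpointLocation", "/checkpoints/jdbc-sink") \
    .start()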

Exactly-Once with Structured Streaming

# Exactly-once for this pipeline comes from checkpointed offsets plus an idempotent sink
spark.conf.set("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")

# foreachBatch gives each micro-batch a stable batch_id; Delta Lake can use it to
# make retried batches idempotent via the txnAppId/txnVersion options
def write_to_delta(batch_df, batch_id):
    batch_df.write \
        .format("delta") \
        .mode("append") \
        .option("txnAppId", "transaction-aggregation") \
        .option("txnVersion", batch_id) \
        .option("mergeSchema", "true") \
        .save("s3://bucket/delta-table/")

query = result.writeStream \
    .foreachBatch(write_to_delta) \
    .option("checkpointLocation", "/checkpoints/delta") \
    .trigger(processingTime="1 minute") \
    .start()

Kafka Streams

Kafka Streams Architecture

+------------------------------------------+
|            Application Instance           |
|                                          |
|  +--------+  +--------+  +--------+      |
|  | Stream |  | Stream |  | Stream |      |
|  | Thread |  | Thread |  | Thread |      |
|  +---+----+  +---+----+  +---+----+      |
|      |           |           |           |
|      v           v           v           |
|  +-------+   +-------+   +-------+       |
|  | State |   | State |   | State |       |
|  | Store |   | Store |   | Store |       |
|  +-------+   +-------+   +-------+       |
|                                          |
+------------------------------------------+
            |                   ^
            | Consume           | Produce
            v                   |
+------------------------------------------+
|              Kafka Cluster               |
+------------------------------------------+

Kafka Streams DSL (Java/Kotlin)

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);

StreamsBuilder builder = new StreamsBuilder();

// Source stream (transactionSerde and summarySerde are custom Serdes for the
// Transaction and TransactionSummary types)
KStream<String, Transaction> transactions = builder.stream(
    "transactions",
    Consumed.with(Serdes.String(), transactionSerde)
);

// Processing
KTable<Windowed<String>, TransactionSummary> summary = transactions
    .filter((key, value) -> value.getAmount() > 100)
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        TransactionSummary::new,
        (key, value, aggregate) -> aggregate.add(value),
        Materialized.with(Serdes.String(), summarySerde)
    );

// Output to topic
summary.toStream()
    .map((windowedKey, value) -> KeyValue.pair(
        windowedKey.key(),
        value
    ))
    .to("transaction-summaries");

// Build and start
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Interview Question: "When would you choose Kafka Streams over Flink?"

Sample Response:

"I'd choose Kafka Streams when:

1. Kafka-Centric Architecture:

  • Input and output are both Kafka topics
  • No need to connect to external systems directly
  • Want tight integration with Kafka ecosystem

2. Simpler Deployment:

Kafka Streams: Library embedded in your app
├── Deploy as normal JAR/container
├── Scale by adding app instances
└── No separate cluster to manage

Flink: Separate cluster required
├── Job Manager + Task Managers
├── Additional operational overhead
└── More powerful but complex

3. Lightweight Processing:

  • Simple transformations, filtering, aggregations
  • Per-partition state is sufficient
  • Don't need cross-partition joins

4. Microservices Pattern:

  • Each service owns its processing
  • Want to avoid centralized streaming platform
  • Easier local development and testing

Choose Flink when:

  • Need complex event processing (CEP)
  • Require sophisticated windowing (session windows with gaps)
  • Processing non-Kafka sources
  • Need broadcast state patterns
  • Require exactly-once across external systems

Example Decision:

Use Case: Aggregate orders per customer per hour
→ Kafka Streams (simple aggregation, Kafka-centric)

Use Case: Detect fraud patterns across multiple data streams
→ Flink (complex CEP, multiple sources, advanced state)"

Stream Processing Design Patterns

Pattern 1: Enrichment

# Flink example: enrich transactions with customer data held in broadcast state
# (customer_state_desc is a MapStateDescriptor; see the wiring sketch below this block)
from pyflink.datastream.functions import KeyedBroadcastProcessFunction

class EnrichmentFunction(KeyedBroadcastProcessFunction):
    def process_element(self, transaction, ctx):
        customer = ctx.get_broadcast_state(
            customer_state_desc
        ).get(transaction.customer_id)

        if customer:
            yield EnrichedTransaction(
                transaction,
                customer.name,
                customer.segment
            )

    def process_broadcast_element(self, customer, ctx):
        ctx.get_broadcast_state(customer_state_desc).put(
            customer.id,
            customer
        )
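
The function above assumes a MapStateDescriptor named customer_state_desc and a broadcast stream of customer records. A hedged sketch of that wiring (names such as customer_stream and EnrichedTransaction are illustrative; the broadcast API requires a recent PyFlink release):

from pyflink.common import Types
from pyflink.datastream.state import MapStateDescriptor

# Broadcast state descriptor: latest customer record keyed by customer id
customer_state_desc = MapStateDescriptor(
    "customers", Types.STRING(), Types.PICKLED_BYTE_ARRAY()
)

# Broadcast the (slowly changing) customer stream to every parallel task,
# then connect it with the keyed transaction stream
broadcast_customers = customer_stream.broadcast(customer_state_desc)

enriched = transactions \
    .key_by(lambda t: t.customer_id) \
    .connect(broadcast_customers) \
    .process(EnrichmentFunction())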

Pattern 2: Deduplication

# Deduplicate events within a time window
from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

class DeduplicationFunction(KeyedProcessFunction):
    def open(self, runtime_context):
        # Keyed state must be obtained from the runtime context, not created in __init__
        self.seen_ids = runtime_context.get_state(
            ValueStateDescriptor("seen", Types.PICKLED_BYTE_ARRAY())
        )

    def process_element(self, event, ctx):
        seen = self.seen_ids.value() or set()

        if event.id not in seen:
            seen.add(event.id)
            self.seen_ids.update(seen)

            # Set an event-time timer to clean up old IDs after one hour
            ctx.timer_service().register_event_time_timer(
                ctx.timestamp() + 60 * 60 * 1000
            )

            yield event

    def on_timer(self, timestamp, ctx):
        # Clean up state
        self.seen_ids.clear()
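
Wiring the function into a pipeline might look like this (the events stream and its id field are illustrative; keying by id ensures duplicates share the same state):

deduplicated = events \
    .key_by(lambda e: e.id) \
    .process(DeduplicationFunction())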

Pattern 3: Late Data Handling

# Flink side outputs for late data
from pyflink.common import Types
from pyflink.datastream import OutputTag

late_output_tag = OutputTag("late-data", Types.PICKLED_BYTE_ARRAY())

result = transactions \
    .key_by(lambda t: t.customer_id) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .allowed_lateness(Time.minutes(1)) \
    .side_output_late_data(late_output_tag) \
    .aggregate(TransactionAggregator())

# Get late data stream
late_data = result.get_side_output(late_output_tag)

# Process late data separately (store_for_reprocessing is a user-defined handler)
late_data.map(lambda t: store_for_reprocessing(t))

Key Takeaways for Interviews

  1. Know the trade-offs: Flink (power) vs Spark Streaming (ecosystem) vs Kafka Streams (simplicity)
  2. Understand exactly-once: How each framework achieves it (checkpoints, transactions)
  3. Time semantics: Event time vs processing time implications
  4. Windowing: Tumbling, sliding, session windows and when to use each
  5. State management: How state is stored, checkpointed, and recovered

