Big Data & Streaming Systems
# Streaming Architectures: Kafka, Kinesis & Event Streaming
Understanding streaming architectures is essential for modern data engineering roles. This lesson covers Apache Kafka, AWS Kinesis, and the fundamental concepts of event-driven data systems.
## Event Streaming Fundamentals

### Batch vs Streaming Processing
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Latency | Hours to days | Milliseconds to seconds |
| Data | Bounded datasets | Unbounded continuous flow |
| Processing | Once per batch | Continuous |
| State | Typically stateless | Stateful |
| Use Cases | ETL, reporting, ML training | Real-time analytics, alerting, fraud detection |
### The Streaming Data Paradigm

```
+----------+     +---------+     +------------+     +----------+
| Sources  | --> | Message | --> | Processing | --> |  Sinks   |
| (Events) |     | Broker  |     |   Engine   |     | (Output) |
+----------+     +---------+     +------------+     +----------+
                      |
                      v
               Durable, Ordered
                   Event Log
```
Key Concepts:
- Events: Immutable facts that happened at a point in time
- Producers: Applications that publish events
- Consumers: Applications that subscribe to and process events
- Topics/Streams: Categories or feeds of events
- Partitions: Subdivisions for parallelism and ordering
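As a purely illustrative example of what an "event" can look like before a producer publishes it, here is one possible envelope; none of these field names are required by Kafka or Kinesis:

```python
import json
import time
import uuid

# Illustrative event envelope: an immutable fact with a key used for
# partitioning, a timestamp for ordering/windowing, and a payload.
event = {
    "event_id": str(uuid.uuid4()),
    "event_type": "order_created",
    "occurred_at": int(time.time() * 1000),         # event time in epoch millis
    "key": "customer-123",                          # groups related events on one partition
    "payload": {"order_id": "O001", "amount": 99.99},
}

serialized = json.dumps(event).encode("utf-8")      # what a producer would publish
```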
## Apache Kafka Deep Dive

### Kafka Architecture

```
+------------------+   +------------------+   +------------------+
|    Producer 1    |   |    Producer 2    |   |    Producer 3    |
+--------+---------+   +--------+---------+   +--------+---------+
         |                      |                      |
         +----------------------+----------------------+
                                |
                                v
        +------------------------------------------------+
        |                  Kafka Cluster                 |
        |    +--------+    +--------+    +--------+      |
        |    |Broker 1|    |Broker 2|    |Broker 3|      |
        |    +--------+    +--------+    +--------+      |
        |                                                |
        |    Topic: orders (3 partitions, RF=3)          |
        |    +--------+    +--------+    +--------+      |
        |    |   P0   |    |   P1   |    |   P2   |      |
        |    | Leader |    | Leader |    | Leader |      |
        |    +--------+    +--------+    +--------+      |
        +------------------------------------------------+
                                |
         +----------------------+----------------------+
         |                      |                      |
         v                      v                      v
+--------+---------+   +--------+---------+   +--------+---------+
|    Consumer 1    |   |    Consumer 2    |   |    Consumer 3    |
|  (Partition 0)   |   |  (Partition 1)   |   |  (Partition 2)   |
+------------------+   +------------------+   +------------------+
         |______________________|______________________|
                 Consumer Group: order-processors
```
Interview Question: "Explain Kafka's durability and ordering guarantees"

Sample Response:

"Kafka provides strong durability and ordering guarantees:

Durability:
- Messages are persisted to disk before acknowledgment
- Replication factor (RF) determines how many copies exist
- With RF=3 and `acks=all`, data survives the loss of 2 brokers
- ISR (In-Sync Replicas) ensures only caught-up replicas can become leader

Ordering Guarantees:
- Within a partition: strict ordering is guaranteed (messages are consumed in offset order)
- Across partitions: no ordering guarantee
- To keep related events ordered, use the same partition key

For example, to ensure all events for a customer are ordered:

```python
# All messages with the same customer_id go to the same partition
producer.produce('orders',
                 key=customer_id.encode(),
                 value=order_data)
```

Exactly-Once Semantics (EOS):
- Enable with `enable.idempotence=true` and a `transactional.id`
- Kafka 0.11+ supports transactions across multiple partitions
- Consumers must use the `read_committed` isolation level"
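A hedged sketch of how those exactly-once settings fit together with the confluent-kafka transactional API in a consume-transform-produce loop; the broker address, topic names, group id, and `transactional.id` below are placeholders:

```python
from confluent_kafka import Producer, Consumer

# Minimal consume-transform-produce loop using Kafka transactions.
producer = Producer({
    'bootstrap.servers': 'broker1:9092',
    'enable.idempotence': True,
    'transactional.id': 'order-enricher-1',   # must be stable for this producer instance
})
consumer = Consumer({
    'bootstrap.servers': 'broker1:9092',
    'group.id': 'order-enrichers',
    'enable.auto.commit': False,               # offsets are committed via the transaction
    'isolation.level': 'read_committed',       # only see committed transactional data
})
consumer.subscribe(['orders'])

producer.init_transactions()  # fences off zombie producers sharing the transactional.id

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    try:
        # The produced result and the consumed offset are committed atomically.
        producer.produce('orders-enriched', key=msg.key(), value=msg.value())
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata())
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
```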
### Kafka Producer Configuration

```python
from confluent_kafka import Producer

producer_config = {
    'bootstrap.servers': 'broker1:9092,broker2:9092',

    # Durability settings
    'acks': 'all',                        # Wait for all ISR replicas
    'retries': 3,                         # Retry on transient failures
    'retry.backoff.ms': 100,

    # Performance tuning
    'batch.size': 16384,                  # Batch messages (bytes)
    'linger.ms': 5,                       # Wait briefly for batches to fill
    'queue.buffering.max.kbytes': 32768,  # 32MB producer buffer (librdkafka's equivalent of buffer.memory)
    'compression.type': 'snappy',         # Compress batches

    # Exactly-once
    'enable.idempotence': True,
    'transactional.id': 'my-transactional-producer'
}

producer = Producer(producer_config)

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()}[{msg.partition()}]')

# Produce message
producer.produce(
    topic='orders',
    key=b'customer-123',
    value=b'{"order_id": "O001", "amount": 99.99}',
    callback=delivery_callback
)

producer.flush()  # Wait for outstanding messages (and their callbacks) to complete
```
### Kafka Consumer Configuration

```python
from confluent_kafka import Consumer

consumer_config = {
    'bootstrap.servers': 'broker1:9092,broker2:9092',
    'group.id': 'order-processors',

    # Offset management
    'auto.offset.reset': 'earliest',  # Start from beginning if no committed offset
    'enable.auto.commit': False,      # Manual commit for at-least-once delivery

    # Performance
    'fetch.min.bytes': 1024,
    'fetch.wait.max.ms': 500,         # librdkafka name for the max fetch wait
    # (The Java-only 'max.poll.records' setting has no librdkafka equivalent;
    #  use consume(num_messages=...) to fetch in batches instead.)

    # Session management
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 10000,

    # Exactly-once (consumer side)
    'isolation.level': 'read_committed'
}

consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        # Process message
        process_order(msg.key(), msg.value())

        # Manual commit after successful processing (at-least-once)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()
```
Interview Question: "How do consumer groups work in Kafka?"

Sample Response:

"Consumer groups enable parallel processing and fault tolerance:

Partition Assignment:
- Each partition is assigned to exactly one consumer in a group
- If consumers < partitions: some consumers handle multiple partitions
- If consumers > partitions: excess consumers are idle
- Optimal: consumers = partitions for maximum parallelism

Rebalancing:
- Triggered when consumers join/leave the group or partitions change
- The group coordinator redistributes partitions among consumers
- During a rebalance, processing pauses (impacts availability)

Offset Management:
- The consumer group tracks its position (offset) in each partition
- Offsets are stored in the internal `__consumer_offsets` topic
- Enables resuming from the last committed position after a failure

```
Topic: orders (4 partitions)
Consumer Group: order-processors

Scenario 1: 2 consumers
+----------+  +----------+
|Consumer 1|  |Consumer 2|
|  P0, P1  |  |  P2, P3  |
+----------+  +----------+

Scenario 2: 4 consumers (optimal)
+----+  +----+  +----+  +----+
| C1 |  | C2 |  | C3 |  | C4 |
| P0 |  | P1 |  | P2 |  | P3 |
+----+  +----+  +----+  +----+

Scenario 3: 6 consumers (2 idle)
+----+  +----+  +----+  +----+  +------+  +------+
| C1 |  | C2 |  | C3 |  | C4 |  |  C5  |  |  C6  |
| P0 |  | P1 |  | P2 |  | P3 |  | idle |  | idle |
+----+  +----+  +----+  +----+  +------+  +------+
```"
## AWS Kinesis
### Kinesis vs Kafka Comparison
| Aspect | Apache Kafka | AWS Kinesis |
|--------|--------------|-------------|
| Deployment | Self-managed or managed | Fully managed |
| Retention | Configurable (unlimited) | 24h default, up to 365 days |
| Partitioning | Partitions | Shards |
| Ordering | Per partition | Per shard |
| Scaling | Add partitions (irreversible) | Split/merge shards |
| Pricing | Infrastructure cost | Per shard-hour + data |
| Ecosystem | Kafka Connect, KSQL | Firehose, Analytics |
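As a concrete illustration of the scaling row above, resharding a Kinesis stream is an API call rather than a topology change; a small boto3 sketch (stream name and target count are placeholders):

```python
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Double the shard count; Kinesis performs the underlying splits/merges.
# Throughput scales at roughly 1 MB/s in and 2 MB/s out per shard.
kinesis.update_shard_count(
    StreamName='orders-stream',
    TargetShardCount=4,
    ScalingType='UNIFORM_SCALING',
)
```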
### Kinesis Data Streams Architecture
```python
import boto3
import json
import time

# Producer
kinesis = boto3.client('kinesis', region_name='us-east-1')

def put_record(stream_name, data, partition_key):
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(data).encode('utf-8'),
        PartitionKey=partition_key
    )
    return response['SequenceNumber']

# Put single record
put_record(
    'orders-stream',
    {'order_id': 'O001', 'amount': 99.99},
    'customer-123'  # Partition key determines the shard
)

# Batch put for efficiency
def put_records_batch(stream_name, records):
    response = kinesis.put_records(
        StreamName=stream_name,
        Records=[
            {
                'Data': json.dumps(r['data']).encode('utf-8'),
                'PartitionKey': r['partition_key']
            }
            for r in records
        ]
    )
    return response

# Consumer (simplified: reads a single shard; production consumers typically
# use the Kinesis Client Library or enhanced fan-out)
def consume_stream(stream_name):
    # Get a shard iterator for the first shard
    response = kinesis.describe_stream(StreamName=stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']

    iterator_response = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON'  # Start from the oldest available record
    )
    shard_iterator = iterator_response['ShardIterator']

    while shard_iterator:
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100
        )
        for record in response['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            process_record(data)

        shard_iterator = response.get('NextShardIterator')
        time.sleep(1)  # Stay under the per-shard GetRecords rate limit
```
### Kinesis Data Firehose

Interview Question: "When would you use Kinesis Data Firehose vs Data Streams?"

Sample Response:

"The choice depends on your processing requirements:

Use Kinesis Data Firehose when:
- You need zero-code delivery to S3, Redshift, Elasticsearch, or Splunk
- Near-real-time is acceptable (roughly 1-5 minutes of buffering latency)
- You want automatic scaling without shard management
- Simple transformations (via Lambda) are sufficient

Use Kinesis Data Streams when:
- You need sub-second latency
- Custom processing logic is required
- Multiple consumers need to read the same stream
- You need replay capability
- You need complex stream processing with Kinesis Data Analytics

Architecture Example:

```
                                     +---> Lambda
Producers --> Kinesis Data Streams --+---> Kinesis Analytics
                                     +---> Custom App

Producers --> Kinesis Firehose --> S3 --> Athena/Redshift
```

Firehose is simpler but less flexible. Data Streams gives you full control but requires more management."
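If an example helps, a minimal boto3 sketch of producing to Firehose; the delivery stream name `orders-to-s3` is a placeholder, and the destination is whatever that stream is configured to deliver to:

```python
import boto3
import json

firehose = boto3.client('firehose', region_name='us-east-1')

# Firehose buffers records by size/time and delivers them to the configured
# destination (e.g. S3); no shard management or consumer code is needed.
firehose.put_record(
    DeliveryStreamName='orders-to-s3',
    Record={'Data': (json.dumps({'order_id': 'O001', 'amount': 99.99}) + '\n').encode('utf-8')},
)
```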
## Event-Driven Architecture Patterns

### Event Sourcing

```
Traditional CRUD:
+-------------+
|   Orders    |  <- Current state only
| id | amount |
+----+--------+
| 1  | 150.00 |  <- No history of changes
+----+--------+

Event Sourcing:
+------------------------------------------+
|               Order Events               |
| event_id | order_id | type     | payload |
+----------+----------+----------+---------+
| E1       | O1       | Created  | $100    |
| E2       | O1       | Updated  | $150    |
| E3       | O1       | Shipped  | ...     |
+------------------------------------------+
                     |
                     v
   Materialized View (derived from events)
```
Benefits:
- Complete audit trail
- Can rebuild state at any point
- Supports temporal queries
- Enables event replay for debugging
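A minimal sketch, in plain Python with illustrative event shapes, of how current state is derived by folding over the event log:

```python
# Rebuild an order's current state by replaying its events in order.
events = [
    {'order_id': 'O1', 'type': 'Created', 'payload': {'amount': 100.00}},
    {'order_id': 'O1', 'type': 'Updated', 'payload': {'amount': 150.00}},
    {'order_id': 'O1', 'type': 'Shipped', 'payload': {}},
]

def apply(state, event):
    """Pure function: (state, event) -> new state."""
    if event['type'] == 'Created':
        return {'order_id': event['order_id'],
                'amount': event['payload']['amount'],
                'status': 'created'}
    if event['type'] == 'Updated':
        return {**state, 'amount': event['payload']['amount']}
    if event['type'] == 'Shipped':
        return {**state, 'status': 'shipped'}
    return state

state = None
for e in events:
    state = apply(state, e)
# state == {'order_id': 'O1', 'amount': 150.0, 'status': 'shipped'}
```

Replaying up to any earlier event yields the state as of that point in time, which is what enables temporal queries and debugging by replay.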
### CQRS (Command Query Responsibility Segregation)

```
                    +-----------------+
                    |    Commands     |
                    | (Create, Update)|
                    +--------+--------+
                             |
                             v
+----------+        +--------+--------+        +-----------+
|  Client  | -----> | Command Handler | -----> |   Event   |
+----------+        +-----------------+        |   Store   |
      |                                        +-----+-----+
      |                                              |
      |                                              v
      |                                        +-----+-----+
      |             +---------------+          |   Event   |
      +-----------> | Query Handler | <------- | Processor |
                    +-------+-------+          +-----------+
                            |
                            v
                    +-------+-------+
                    |  Read Model   |
                    |  (Optimized   |
                    |  for queries) |
                    +---------------+
```
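A compact sketch of the CQRS flow above, with in-memory stand-ins for the event store and read model (names and shapes are illustrative):

```python
# Commands append events to the event store; a processor projects them into a
# read model optimized for queries. The write path and read path never share a schema.
event_store = []   # append-only log (stand-in for Kafka / an event store)
read_model = {}    # denormalized view keyed by order_id

def handle_command(command):
    """Command side: validate, then record what happened."""
    event = {'type': 'OrderCreated',
             'order_id': command['order_id'],
             'amount': command['amount']}
    event_store.append(event)
    project(event)

def project(event):
    """Query side: update the read model from events."""
    if event['type'] == 'OrderCreated':
        read_model[event['order_id']] = {'amount': event['amount'], 'status': 'created'}

handle_command({'order_id': 'O1', 'amount': 150.00})
print(read_model['O1'])   # queries read the optimized view, never the event log
```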
Interview Question: "Design a real-time fraud detection system"

Sample Response:

"I'd design this using a streaming architecture:

Components:

- Event Ingestion:

```
Transactions --> Kafka Topic: transactions
(Partitioned by user_id for ordering)
```

- Stream Processing (Flink/Kafka Streams):

```
# Fraud detection rules (avg_amount and count_unique_countries are assumed helpers)
rules = [
    # Rule 1: Velocity check
    lambda events: len(events) > 10,  # >10 transactions in the window
    # Rule 2: Amount anomaly
    lambda events: max(e.amount for e in events) > 5 * avg_amount,
    # Rule 3: Geographic anomaly
    lambda events: count_unique_countries(events) > 3,
]

# Flink-style stream processing pseudocode
transactions
    .keyBy('user_id')
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(FraudDetector(rules))
    .filter(is_suspicious)
    .sink(KafkaSink('fraud-alerts'))
```

- Feature Store (for ML models):
  - Historical features: average transaction amount (30 days), typical transaction locations, usual transaction times
  - Real-time features: transactions in the last hour, current location, device fingerprint

- Decision Engine:

```
+-----------+     +------------+     +------------+
|   Rules   | --> |  ML Model  | --> |  Decision  |
|  Engine   |     |  Scoring   |     |  (Block/   |
|  (Fast)   |     | (Accurate) |     | Flag/Pass) |
+-----------+     +------------+     +------------+
```

- Alerting & Response:
  - Real-time alerts to the fraud team
  - Automatic blocking for high-confidence fraud
  - Feedback loop for model improvement

Latency Target: <100ms from transaction to decision

Handling Scale:
- Partition by user_id for stateful processing
- Separate hot/cold paths for different risk levels"
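To make the velocity rule concrete outside any framework, here is a minimal plain-Python sketch of a per-user tumbling-window count; the threshold, field names, and window size are illustrative:

```python
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000   # 5-minute tumbling windows
THRESHOLD = 10              # >10 transactions per window is suspicious

# counts[(user_id, window_start)] -> number of transactions seen in that window
counts = defaultdict(int)

def check_velocity(txn):
    """Return True if this transaction pushes its user over the threshold."""
    window_start = (txn['event_time_ms'] // WINDOW_MS) * WINDOW_MS
    key = (txn['user_id'], window_start)
    counts[key] += 1
    return counts[key] > THRESHOLD

# Example: the 11th transaction from the same user inside one window trips the rule.
alerts = [check_velocity({'user_id': 'U1', 'event_time_ms': 1_700_000_000_000 + i})
          for i in range(11)]
# alerts[-1] is True
```

A real implementation would also expire old windows and keep this state in the stream processor (e.g. keyed state in Flink) rather than in process memory.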
## Key Takeaways for Interviews
- Know the fundamentals: Producers, consumers, topics, partitions, consumer groups
- Understand ordering guarantees: Partitions enable ordering, not topics
- Delivery semantics: At-most-once, at-least-once, exactly-once trade-offs (see the sketch after this list)
- Kafka vs managed alternatives: When to use Kafka vs Kinesis vs Pub/Sub
- Event-driven patterns: Event sourcing, CQRS, and when to apply them
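As noted in the delivery-semantics takeaway above, the at-most-once vs at-least-once distinction largely comes down to when the consumer commits. A sketch with the confluent-kafka consumer (`process_order` is the same illustrative handler used earlier; error handling omitted):

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'broker1:9092',
    'group.id': 'order-processors',
    'enable.auto.commit': False,
})
consumer.subscribe(['orders'])

# At-most-once: commit BEFORE processing. A crash after the commit but before
# processing loses that message, but it is never processed twice.
msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    consumer.commit(message=msg, asynchronous=False)
    process_order(msg.key(), msg.value())

# At-least-once: commit AFTER processing (the pattern used earlier). A crash
# between processing and commit causes redelivery, so processing must be idempotent.
msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    process_order(msg.key(), msg.value())
    consumer.commit(message=msg, asynchronous=False)
```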