Kafka, Flink, event sourcing, windowing — processing data as it arrives.
You run a bank. Every time a customer swipes their card, your system needs to decide: is this transaction legitimate, or is it fraud? Your current system works in batches. Every hour, a MapReduce job pulls the last hour of transactions, runs a fraud model on them, and flags the suspicious ones. The analytics team loves it — 99.2% accuracy!
But here is the problem. A stolen credit card gets used at 2:03 PM. Your batch job ran at 2:00 PM and will not run again until 3:00 PM. For the next 57 minutes, the thief racks up $14,000 in purchases. Your model would have caught it on the first transaction — if only it had seen it in time.
This is the fundamental limitation of batch processing. It processes data that has already been collected, in bounded chunks, on a schedule. The data sits idle between runs. The lag between "event happens" and "system reacts" is bounded by the batch interval. For fraud detection, this is unacceptable. For recommendation engines, it means serving stale suggestions. For monitoring dashboards, it means flying blind between updates.
Stream processing flips the model: instead of collecting data and then processing it, you process each event as it arrives. The fraud model sees the 2:03 PM transaction at 2:03 PM, not at 3:00 PM. The thief gets one transaction through, not thirty.
The simulation below shows the same fraud scenario. On the left, a batch system processes every 60 seconds. On the right, a stream system processes each event immediately. Watch the fraudulent transactions (red) slip through the batch window while the stream system catches them in real time.
The longer the batch interval, the more fraud slips through. At 120 seconds, a single stolen card can accumulate thousands in charges. The stream system catches fraud within milliseconds of the event arriving.
But stream processing is not just "faster batch." It introduces fundamentally new challenges: What does "time" mean when events arrive out of order? How do you aggregate over an unbounded dataset? How do you recover from failures without losing or duplicating events? These are the questions that make stream processing its own discipline, and the questions we will answer in this lesson.
Before we can process a stream, we need to define what a stream is. Forget queues and brokers for a moment. The concept is simpler than the infrastructure.
An event is an immutable record of something that happened at a point in time. "User 42 clicked the buy button at 14:03:27.391 UTC." "Sensor 7 reported temperature 23.4C at 14:03:27.500 UTC." "Order 9001 was shipped at 14:03:28.012 UTC." Each event has a timestamp, a type, and a payload. Once it happened, it cannot un-happen. You can record a compensating event (a refund reverses a charge), but the original event remains in the record forever.
A stream (also called an event stream, event log, or event feed) is an unbounded, append-only, ordered sequence of events. "Unbounded" is the key word: unlike a file that has a beginning and an end, a stream has a beginning but no end. New events keep arriving indefinitely.
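The basic architecture has three roles:

- Producers append events to the stream.
- The stream stores events durably and in order.
- Consumers read events from the stream, each tracking its own position.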
This decoupling is powerful. The producer does not slow down if a consumer is behind. A new consumer can join at any time and read from the beginning. A slow consumer does not block a fast one. The stream is the buffer, the ordering guarantee, and the persistent record all in one.
The simulation below shows a stream with three consumers reading at different speeds. Consumer A is fast (real-time analytics). Consumer B is medium (search indexing). Consumer C is slow (batch export to a data warehouse). Each tracks its own position in the stream.
Notice something profound: if we store every event, we can reconstruct any state at any point in time. The current balance of account 42 is just the sum of all deposits minus all withdrawals up to now. The balance at 3 PM yesterday is the same sum, but stopping at that timestamp. This is event sourcing, and we will explore it in depth in Chapter 5.
```python
# An event: immutable, timestamped, typed
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)  # frozen = immutable
class Event:
    timestamp: datetime
    event_type: str  # "deposit", "withdrawal", "transfer"
    payload: dict    # {"account": 42, "amount": 100.00}


# A stream: append-only, ordered
class EventStream:
    def __init__(self):
        self.log = []        # append-only list of events
        self.consumers = {}  # consumer_id -> offset

    def append(self, event: Event) -> int:
        offset = len(self.log)
        self.log.append(event)
        return offset  # sequential position

    def read(self, consumer_id: str, batch_size: int = 1):
        offset = self.consumers.get(consumer_id, 0)
        events = self.log[offset:offset + batch_size]
        self.consumers[consumer_id] = offset + len(events)
        return events

    def seek(self, consumer_id: str, offset: int):
        """Reset a consumer's position (this is what enables replay)."""
        self.consumers[consumer_id] = offset
```
The `seek` method is what makes log-based streams fundamentally different from traditional message queues. A consumer can rewind to any point and replay events. Found a bug in your search indexer? Fix the code, seek to offset 0, replay everything, and your index is now correct. Try doing that with a message queue that deletes messages after consumption.

Events need to get from producers to consumers. In theory, you could have producers write directly to consumers (a webhook, a TCP socket, a UDP multicast). In practice, this creates tight coupling: the producer must know every consumer's address, handle retries if a consumer is down, and buffer events if a consumer is slow. This does not scale.
A message broker (also called a message queue or messaging middleware) sits between producers and consumers. Producers send events to the broker. Consumers pull events from the broker. The broker handles durability, buffering, delivery guarantees, and fan-out.
There are two fundamentally different models for message brokers, and understanding the difference is critical for system design interviews.
The first model is the traditional message queue, used by systems like RabbitMQ, ActiveMQ, and Amazon SQS. The broker maintains a queue of messages. When a consumer acknowledges a message, the broker deletes it. Key properties:
| Property | Behavior |
|---|---|
| Delivery | Each message delivered to exactly one consumer (load balancing). If you have 3 consumers, each message goes to one of them. |
| Acknowledgment | Consumer sends ACK after processing. Broker deletes message on ACK. If consumer crashes before ACK, message is redelivered to another consumer. |
| Ordering | Best-effort. With multiple consumers, messages may be processed out of order (consumer A gets message 1, consumer B gets message 2, B finishes first). |
| Replay | Impossible. Messages are deleted after acknowledgment. If you need to reprocess, you must re-produce the messages. |
| Fan-out | Requires explicit configuration (exchanges/bindings in RabbitMQ). Each additional consumer group adds complexity. |
The second model is the log-based broker, used by systems like Apache Kafka, Amazon Kinesis, and Apache Pulsar. The broker is an append-only log. Messages are not deleted when they are consumed; they stay in the log until a configurable retention period expires. Key properties:
| Property | Behavior |
|---|---|
| Delivery | Each partition delivered to one consumer per group, but multiple groups can read the same topic independently. |
| Acknowledgment | Consumer commits its offset. Messages stay in the log regardless. If consumer crashes, it restarts from last committed offset. |
| Ordering | Guaranteed within a partition. If all events for a given key hash to the same partition, they are processed in order. |
| Replay | Trivial. Reset the consumer offset to any position. Replay all events from the beginning if needed. |
| Fan-out | Free. Any number of consumer groups can independently consume the same topic with zero additional broker configuration. |
The simulation below shows both models handling the same events. On the left, a traditional message queue delivers each message to one of two consumers and deletes it. On the right, a log-based broker appends each message and lets consumers track their own offset. Watch what happens when a consumer crashes and recovers.
Notice what happens after recovery. In the traditional queue, the message the crashed consumer had not yet acknowledged is either redelivered to another consumer (at-least-once) or lost (at-most-once), depending on configuration. In the log-based broker, the consumer simply resumes from its last committed offset. No message is lost, because the log retains messages regardless of consumer state (for as long as the retention period allows).
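To make the recovery difference concrete, here is a minimal in-memory sketch of both models. `AckQueue` and `Log` are hypothetical toy classes, not RabbitMQ or Kafka APIs: the queue deletes a message on ACK and, assuming an at-least-once configuration, re-queues a message whose consumer crashed before acknowledging it; the log never deletes, and each consumer group resumes from its committed offset.

```python
from collections import deque


class AckQueue:
    """Toy traditional queue: a message is deleted once acknowledged."""

    def __init__(self):
        self.ready = deque()   # messages waiting for delivery
        self.in_flight = {}    # consumer_id -> delivered but not yet ACKed

    def publish(self, msg):
        self.ready.append(msg)

    def deliver(self, consumer_id):
        msg = self.ready.popleft()
        self.in_flight[consumer_id] = msg
        return msg

    def ack(self, consumer_id):
        del self.in_flight[consumer_id]  # deleted for good: no replay possible

    def crash(self, consumer_id):
        # At-least-once setup: the unacknowledged message goes back on the queue.
        self.ready.appendleft(self.in_flight.pop(consumer_id))


class Log:
    """Toy log-based broker: messages persist; each group commits an offset."""

    def __init__(self):
        self.entries = []
        self.committed = {}    # group -> next offset to read

    def publish(self, msg):
        self.entries.append(msg)

    def poll(self, group):
        offset = self.committed.get(group, 0)
        return offset, self.entries[offset]

    def commit(self, group, offset):
        self.committed[group] = offset + 1


queue, log = AckQueue(), Log()
for m in ["m1", "m2", "m3"]:
    queue.publish(m)
    log.publish(m)

queue.deliver("A")
queue.ack("A")                      # m1 processed, then deleted
queue.deliver("B")
queue.crash("B")                    # B dies holding m2: m2 is re-queued
redelivered = queue.deliver("C")    # 'm2', now handled by another consumer

offset, _ = log.poll("indexer")
log.commit("indexer", offset)       # m1 processed and committed
log.poll("indexer")                 # m2 read, then a crash before commit
resumed = log.poll("indexer")       # (1, 'm2'): resume from the committed offset
history = list(log.entries)         # ['m1', 'm2', 'm3']: still there for replay
```

The queue only recovers the single in-flight message, and m1 is gone forever; the log can hand the whole history to a brand-new consumer group.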
```text
Use a traditional message queue (RabbitMQ, SQS) when:
- Task distribution: "process this image", "send this email"
- You need per-message routing (complex topic/exchange patterns)
- Message order does not matter
- You do not need replay
- Consumer count >> partition count (thousands of workers)

Use a log-based broker (Kafka, Kinesis) when:
- Event sourcing: "this thing happened"
- Multiple independent consumers need the same data
- You need replay (reprocess after a bug fix)
- Ordering within a key matters (all events for user X in order)
- You are building CDC, analytics, or materialized views
```
Kafka is the log-based broker that won. Originally built at LinkedIn in 2011 to handle their activity stream (page views, clicks, searches — 1.4 trillion messages per day at peak), it is now the backbone of event-driven architectures at Netflix, Uber, Airbnb, and thousands of other companies. Let us understand exactly how it works.
A topic is a named stream of events. "orders", "user-clicks", "sensor-readings" — each is a topic. But a single topic on a single machine would be a bottleneck. So Kafka splits each topic into partitions.
A partition is an ordered, append-only log stored on a single broker. Each event in a partition gets a monotonically increasing offset (0, 1, 2, 3...). Ordering is guaranteed within a partition but NOT across partitions.
When a producer sends an event, it must go to exactly one partition. Two strategies:
| Strategy | How it works | When to use |
|---|---|---|
| Key-based | hash(key) % num_partitions. All events with the same key go to the same partition. | When ordering per entity matters. E.g., all events for user_42 must be in order. |
| Round-robin | Distribute events evenly across partitions, ignoring keys. | When ordering does not matter and you want maximum throughput. |
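A minimal sketch of both strategies. Kafka's default partitioner hashes the serialized key with murmur2; this sketch swaps in `zlib.crc32` instead (Python's built-in `hash()` is randomized per process for strings, so it would route the same key differently after a restart). Partition numbers here will therefore not match a real Kafka cluster, and `key_partition` and `round_robin_partitioner` are illustrative names, not client APIs.

```python
import itertools
import zlib


def key_partition(key: str, num_partitions: int) -> int:
    """Key-based: hash(key) % num_partitions, using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def round_robin_partitioner(num_partitions: int):
    """Round-robin: ignore the key and cycle through the partitions."""
    counter = itertools.cycle(range(num_partitions))
    return lambda _key: next(counter)


# Every event for user_42 lands in the same partition, so their order is preserved.
p = key_partition("user_42", 6)
assert all(key_partition("user_42", 6) == p for _ in range(100))

rr = round_robin_partitioner(3)
print([rr("user_42") for _ in range(6)])  # [0, 1, 2, 0, 1, 2]
```

Because the partition is computed modulo `num_partitions`, adding partitions to an existing topic remaps existing keys, so per-key ordering only holds while the partition count stays fixed.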
As a sizing heuristic, start with `max(expected_throughput_MB / 10, expected_consumer_count)` partitions.

A consumer group is a set of consumers that cooperate to consume a topic. Kafka assigns each partition to exactly one consumer in the group. If you have 6 partitions and 3 consumers in a group, each consumer gets 2 partitions. If a consumer crashes, its partitions are reassigned to the remaining consumers; this is called a rebalance.
Multiple consumer groups can independently consume the same topic. Consumer group "fraud-detection" reads the "transactions" topic. Consumer group "analytics" reads the same topic. They do not interfere with each other. Each group tracks its own offsets.
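A minimal sketch of assignment and rebalancing within one group. It uses a simple round-robin over sorted consumer IDs; that is an illustrative assumption, not Kafka's exact algorithm (Kafka ships several pluggable assignors, such as range, round-robin, and cooperative-sticky). `assign` is a hypothetical helper.

```python
def assign(partitions, consumers):
    """Give each partition to exactly one consumer in the group (round-robin)."""
    members = sorted(consumers)
    assignment = {c: [] for c in members}
    for i, p in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(p)
    return assignment


print(assign(range(6), ["c1", "c2", "c3"]))
# {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}

# c2 crashes: a rebalance redistributes all 6 partitions over the survivors.
print(assign(range(6), ["c1", "c3"]))
# {'c1': [0, 2, 4], 'c3': [1, 3, 5]}

# More consumers than partitions: the extra consumer gets nothing and sits idle.
print(assign(range(2), ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': []}
```

A second group would call `assign` independently over the same partitions, which is why groups do not interfere with each other.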
The simulation below shows a Kafka cluster with 3 brokers and 1 topic split into 6 partitions. Send messages with keys and watch them hash to partitions. Add consumers to a group and watch partition assignment. Remove a consumer and watch rebalancing.
Each consumer tracks its offset in each partition, recording how far it has processed. Periodically (or after each event), the consumer commits its offset back to Kafka, which stores it in an internal topic called `__consumer_offsets`. By convention, the committed offset is the offset of the next event to read: one past the last processed event.
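If a consumer crashes and restarts, it resumes from the last committed offset. This creates a delivery guarantee trade-off:

- Commit the offset before processing an event: if the consumer crashes mid-processing, that event is skipped on restart (at-most-once).
- Commit the offset after processing an event: if the consumer crashes before committing, that event is processed again on restart (at-least-once).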
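The trade-off is easy to see in a toy loop. `run` is a hypothetical consumer that processes an in-memory log, commits after every event, and crashes exactly once at offset `crash_at`; on restart it resumes from whatever offset was last committed. Nothing here is a Kafka API.

```python
def run(log, commit_before: bool, crash_at: int):
    """Process `log`, crashing once at offset `crash_at`; return processed events."""
    committed = 0      # next offset to read, as stored by the broker
    processed = []     # side effects observed downstream
    crashed = False
    while committed < len(log):
        offset = committed
        event = log[offset]
        if commit_before:
            committed = offset + 1          # commit, then process
            if offset == crash_at and not crashed:
                crashed = True              # crash before processing: event skipped
                continue                    # restart resumes from `committed`
            processed.append(event)
        else:
            processed.append(event)         # process, then commit
            if offset == crash_at and not crashed:
                crashed = True              # crash before commit: event redone
                continue                    # restart resumes from the old offset
            committed = offset + 1
    return processed


events = ["e0", "e1", "e2", "e3"]
print(run(events, commit_before=True, crash_at=2))   # ['e0', 'e1', 'e3']
print(run(events, commit_before=False, crash_at=2))  # ['e0', 'e1', 'e2', 'e2', 'e3']
```

The first run loses `e2` (at-most-once); the second processes it twice (at-least-once).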
```python
from kafka import KafkaProducer, KafkaConsumer
import json

# --- Producer ---
producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',   # wait for all in-sync replicas to acknowledge
    retries=3,    # retry on transient failures
    linger_ms=5,  # batch messages for 5ms for throughput
)

# Key determines partition: all user_42 events go to the same partition
producer.send(
    topic='transactions',
    key='user_42',
    value={'amount': 99.99, 'merchant': 'coffee_shop'},
)
producer.flush()  # block until all messages are sent

# --- Consumer ---
consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers=['broker1:9092'],
    group_id='fraud-detection',    # consumer group name
    auto_offset_reset='earliest',  # start from beginning if no offset
    enable_auto_commit=False,      # manual commit for at-least-once
    key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for msg in consumer:
    # msg.topic, msg.partition, msg.offset, msg.key, msg.value
    # run_fraud_model and alert are application-defined (not shown)
    result = run_fraud_model(msg.value)
    if result.is_fraud:
        alert(msg.key, msg.value)
    consumer.commit()  # commit after successful processing
```
With `acks=1`, the producer considers a message "sent" once the partition leader acknowledges it. If the leader crashes before the message is replicated, the message is lost. With `acks=all`, the producer waits until all in-sync replicas (ISRs) acknowledge. This is slower but durable, provided the topic's `min.insync.replicas` is set above 1 (otherwise an ISR that has shrunk to just the leader still satisfies `acks=all`). For financial transactions, always use `acks=all`. For click tracking, `acks=1` is often acceptable.

Your application writes to PostgreSQL. Your search team wants every product update in Elasticsearch for full-text search. Your analytics team wants every order in the data warehouse. Your caching layer wants every user profile change in Redis. How do you keep all these systems in sync?
The naive approach: after writing to PostgreSQL, also write to Elasticsearch, the warehouse, and Redis. This is called dual writes, and it is a trap. If the PostgreSQL write succeeds but the Elasticsearch write fails, your systems are inconsistent. Even if you retry, the order of writes across systems is not guaranteed — one system might see update A then B, while another sees B then A. Dual writes are not atomic and do not preserve ordering.
Change Data Capture (CDC) is the solution. Instead of having the application write to multiple systems, you make the database the single source of truth and capture every change from its internal log. PostgreSQL has the Write-Ahead Log (WAL). MySQL has the binlog. These logs already record every INSERT, UPDATE, and DELETE — the database uses them for crash recovery and replication. CDC reads these logs and publishes the changes as events to a stream (usually Kafka).
Kafka topics have a retention period (default 7 days). After 7 days, old events are deleted. But what if a new consumer joins and needs the current state, not just the last 7 days of changes?
Log compaction solves this. Instead of deleting events by age, Kafka retains at least the latest event for each key. If user_42 has been updated 500 times, the compacted log eventually retains only the most recent update. A new consumer can read the compacted log to build a complete snapshot of current state, then keep reading the newest, not-yet-compacted part of the log for real-time updates.
To delete a record, the producer sends a tombstone — a message with the key but a null value. During compaction, the tombstone removes the key from the log.
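A minimal sketch of what compaction computes, treating the log as a list of `(key, value)` pairs and `None` as a tombstone. Real Kafka compaction runs in the background over inactive segments, preserves the original offsets of retained records, and keeps tombstones for a while (`delete.retention.ms`) so consumers can observe the delete; this sketch drops them immediately and ignores offsets. `compact` is a hypothetical helper.

```python
def compact(log):
    """Keep only the latest value per key; tombstones (None) delete the key."""
    latest = {}  # key -> (position, value) of the newest record
    for position, (key, value) in enumerate(log):
        latest[key] = (position, value)
    # Emit survivors in their original log order, dropping tombstoned keys.
    survivors = sorted(
        (pos, key, value) for key, (pos, value) in latest.items() if value is not None
    )
    return [(key, value) for _, key, value in survivors]


log = [
    ("user_42", {"name": "Ada"}),
    ("user_7", {"name": "Lin"}),
    ("user_42", {"name": "Ada L."}),
    ("user_7", None),              # tombstone: delete user_7
    ("user_9", {"name": "Sam"}),
]
print(compact(log))
# [('user_42', {'name': 'Ada L.'}), ('user_9', {'name': 'Sam'})]

# A new consumer can rebuild current state from the compacted log alone.
snapshot = dict(compact(log))
```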
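A typical CDC pipeline: application writes hit PostgreSQL, Debezium (an open-source CDC platform built on Kafka Connect) captures the changes from the WAL, and Kafka distributes them to Elasticsearch, Redis, and the warehouse. A Debezium PostgreSQL connector configuration for such a pipeline looks like this: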
```json
{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-primary",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "ecommerce",
    "database.server.name": "prod-db",
    "table.include.list": "public.orders,public.products",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "topic.prefix": "prod-db",
    "snapshot.mode": "initial"
  }
}
```
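Downstream, each consumer applies change events to its own store. A minimal sketch, assuming each Kafka value has already been deserialized and unwrapped to Debezium's change-event envelope (`before`, `after`, `op`), where the `op` codes handled here are `c` (create), `u` (update), `d` (delete), and `r` (read, emitted during the initial snapshot); other codes are rejected. A plain dict stands in for Redis or Elasticsearch, and keying rows by an `id` column is an assumption about the table. By default Debezium follows a delete with a tombstone (null value), which the sketch skips.

```python
def apply_change(store: dict, change):
    """Apply one Debezium-style change event to a key-value store."""
    if change is None:
        return  # tombstone that follows a delete: nothing left to do
    op = change["op"]
    if op in ("c", "r", "u"):       # create, snapshot read, update
        row = change["after"]
        store[row["id"]] = row      # upsert by primary key
    elif op == "d":                 # delete
        store.pop(change["before"]["id"], None)
    else:
        raise ValueError(f"unhandled op: {op}")


cache = {}
changes = [
    {"op": "r", "before": None, "after": {"id": 1, "status": "pending"}},
    {"op": "u", "before": {"id": 1, "status": "pending"},
     "after": {"id": 1, "status": "shipped"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "pending"}},
    {"op": "d", "before": {"id": 2, "status": "pending"}, "after": None},
    None,
]
for change in changes:
    apply_change(cache, change)
print(cache)  # {1: {'id': 1, 'status': 'shipped'}}
```

Because every operation is an upsert or delete keyed by primary key, replaying the same change events after a consumer restart leaves the store in the same state.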
CDC captures changes that the application made to a database. The application writes "current state" (UPDATE accounts SET balance = 150 WHERE id = 42), and CDC derives the event stream from those mutations. Event sourcing flips this: the application writes events directly ("user 42 deposited $50"), and current state is derived from the event log.
Think of it like accounting. A traditional database is like a balance sheet: it tells you "Account 42 has $150 right now." Event sourcing is like a general ledger: it records every transaction. "Account 42 opened with $0. Deposited $100. Withdrew $20. Deposited $70." The balance sheet is derived from the ledger, not the other way around.
| Benefit | Explanation |
|---|---|
| Complete audit trail | Every change is recorded forever. Who changed what, when, and what was the previous value. Required in finance, healthcare, and legal systems. |
| Temporal queries | "What was the account balance at 3 PM yesterday?" Just replay events up to that timestamp. With a state-based database, you would need point-in-time backups. |
| Debugging | A bug corrupted some user accounts. With event sourcing, replay the events through the fixed code to reconstruct correct state. With a state database, the corrupted state is all you have. |
| Multiple read models | Derive different views from the same events. One view optimized for querying balances. Another for fraud detection. Another for monthly statements. Each is a different projection of the same event log. |
| Decoupling | New features can subscribe to existing events without modifying the write path. "We want to send SMS notifications on large withdrawals" — just add a consumer, no code changes to the withdrawal service. |
Event sourcing naturally leads to CQRS (Command Query Responsibility Segregation): separate the write side (accepting commands, producing events) from the read side (querying derived views).
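A minimal CQRS sketch under assumptions of our own: events are plain `(event_type, account, amount)` tuples, command handlers validate and append to the log, and two independent read models are folded from the same log. None of these names come from a specific framework, and rebuilding a view on every command only keeps the sketch short; a real system would update projections incrementally, often asynchronously.

```python
from collections import defaultdict

event_log = []  # write side: the append-only source of truth


def balances_view():
    """Read model 1: current balance per account, folded from the log."""
    balances = defaultdict(float)
    for event_type, account, amount in event_log:
        balances[account] += amount if event_type == "deposited" else -amount
    return dict(balances)


def large_withdrawals_view(threshold):
    """Read model 2: withdrawals above a threshold (e.g., to drive SMS alerts)."""
    return [(account, amount) for event_type, account, amount in event_log
            if event_type == "withdrawn" and amount > threshold]


def handle_deposit(account, amount):
    """Command handler: validate the command, then record what happened."""
    if amount <= 0:
        raise ValueError("deposit must be positive")
    event_log.append(("deposited", account, amount))


def handle_withdraw(account, amount):
    if not 0 < amount <= balances_view().get(account, 0):
        raise ValueError("insufficient funds or invalid amount")
    event_log.append(("withdrawn", account, amount))


handle_deposit("acct_42", 100)
handle_withdraw("acct_42", 20)
handle_deposit("acct_42", 70)
print(balances_view())             # {'acct_42': 150.0}
print(large_withdrawals_view(10))  # [('acct_42', 20)]
```

Adding a third view (say, monthly statements) means writing one more fold over `event_log`; the command handlers do not change.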
The simulation below is an event-sourced bank account. Append events (deposits, withdrawals) to the log on the left, and watch the derived balance update on the right. Hit "Replay from Scratch" to rebuild state from the event log.
```python
from dataclasses import dataclass, field
from typing import List
import time


@dataclass(frozen=True)
class AccountEvent:
    event_type: str  # "created", "deposited", "withdrawn"
    amount: float
    timestamp: float = field(default_factory=time.time)


class EventSourcedAccount:
    def __init__(self, account_id: str):
        self.account_id = account_id
        self.events: List[AccountEvent] = []
        self._balance = 0.0  # derived state (cache)

    def deposit(self, amount: float):
        if amount <= 0:
            raise ValueError("deposit amount must be positive")
        event = AccountEvent("deposited", amount)
        self.events.append(event)
        self._balance += amount  # update cache

    def withdraw(self, amount: float):
        if not 0 < amount <= self._balance:
            raise ValueError("withdrawal must be positive and covered by the balance")
        event = AccountEvent("withdrawn", amount)
        self.events.append(event)
        self._balance -= amount

    def replay(self) -> float:
        """Rebuild balance from events. The source of truth."""
        balance = 0.0
        for e in self.events:
            if e.event_type == "deposited":
                balance += e.amount
            elif e.event_type == "withdrawn":
                balance -= e.amount
        self._balance = balance
        return balance

    def balance_at(self, timestamp: float) -> float:
        """Temporal query: what was the balance at a point in time?"""
        balance = 0.0
        for e in self.events:  # events are stored in the order they occurred
            if e.timestamp > timestamp:
                break
            if e.event_type == "deposited":
                balance += e.amount
            elif e.event_type == "withdrawn":
                balance -= e.amount
        return balance
```
In batch processing, aggregation is straightforward: "count all the clicks in yesterday's log." The dataset is bounded — you know when it starts and when it ends. In stream processing, the dataset is unbounded. Events keep arriving forever. You cannot "count all the clicks" because there is no "all." You need a way to slice the infinite stream into finite chunks for aggregation. This is called windowing.
A window is a time interval over which you compute an aggregate (count, sum, average, max). There are four types, and they differ in how they slice time.
| Window Type | Description | Example |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping. Each event belongs to exactly one window. | Count clicks per 5-minute window: [0:00-5:00), [5:00-10:00), [10:00-15:00)... |
| Hopping | Fixed-size, overlapping. Window size > hop size. Each event may belong to multiple windows. | 5-minute window, 1-minute hop: [0:00-5:00), [1:00-6:00), [2:00-7:00)... A click at 3:30 is in 4 windows. |
| Sliding | Triggered by events. Contains all events within a time duration of each other. | "Alert if more than 3 failed logins within 10 minutes of each other." No fixed grid — the window slides with each event. |
| Session | Grouped by activity gaps. A session ends when no events arrive for a configurable gap duration. | A user's website session: starts with first click, ends after 30 minutes of inactivity. Sessions can be arbitrarily long. |
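A minimal sketch of how each window type groups events, with integer timestamps in seconds (so 3:30 is 210). It assumes the stream starts at time 0, so no window begins before 0; that matches the hopping example in the table, where a click at 3:30 lands in 4 windows. An engine that aligns windows to the epoch would also count windows starting before 0. The function names are illustrative, not an engine API.

```python
from collections import deque


def tumbling(ts, size):
    """Tumbling: exactly one window [k*size, (k+1)*size) contains ts."""
    start = ts - ts % size
    return [(start, start + size)]


def hopping(ts, size, hop):
    """Hopping: every window [start, start+size) containing ts, start a multiple of hop."""
    windows = []
    start = ts - ts % hop             # latest window start at or before ts
    while start >= 0 and start + size > ts:
        windows.append((start, start + size))
        start -= hop
    return windows[::-1]


def sessions(timestamps, gap):
    """Session: a silence of at least `gap` seconds closes the current session."""
    result = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][-1] < gap:
            result[-1].append(ts)
        else:
            result.append([ts])
    return result


def sliding_alert(timestamps, size, threshold):
    """Sliding: True if more than `threshold` events occur within `size` seconds."""
    window = deque()
    for ts in sorted(timestamps):
        window.append(ts)
        while ts - window[0] > size:  # drop events too old to pair with ts
            window.popleft()
        if len(window) > threshold:
            return True
    return False


print(tumbling(210, 300))                            # [(0, 300)]
print(hopping(210, 300, 60))                         # [(0, 300), (60, 360), (120, 420), (180, 480)]
print(sessions([0, 60, 120, 2400, 2460], gap=1800))  # [[0, 60, 120], [2400, 2460]]
print(sliding_alert([0, 100, 200, 300], size=600, threshold=3))  # True
```

The last call mirrors the table's login example: four failures within 10 minutes (600 s) exceed the threshold of 3.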
A watermark is the system's estimate of "all events with timestamps up to time T have probably arrived." When the watermark advances past the end of a window, the system fires the window's computation and emits results.
Watermarks come in two kinds:
| Type | Behavior | Trade-off |
|---|---|---|
| Perfect | Wait until you are certain all events have arrived (e.g., all sources have advanced past T). | Correct but slow. Delays results until the slowest source catches up. |
| Heuristic | Estimate based on observed event times, allowing some tolerance for late events. | Fast but some late events may arrive after the watermark. These require special handling. |
Late events arrive after the watermark has passed their window. Strategies: (1) Drop them. (2) Refire the window computation with updated results. (3) Route them to a side output for manual handling. Most production systems use a combination: allow late events up to an allowed lateness threshold, and drop anything beyond that.
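A minimal sketch tying these pieces together for tumbling-window counts, with integer timestamps. It assumes a bounded-out-of-orderness watermark (the maximum event time seen minus a fixed `max_delay`, the same rule as the `WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND` clause in the Flink example below) and combines strategies (2) and (3): events within `allowed_lateness` refire their window with an updated count, and anything later goes to a side output. `WindowedCounter` is an illustrative name, not an engine API.

```python
from collections import defaultdict


class WindowedCounter:
    """Tumbling-window counts driven by a heuristic watermark."""

    def __init__(self, size, max_delay, allowed_lateness):
        self.size = size
        self.max_delay = max_delay                # tolerated out-of-orderness
        self.allowed_lateness = allowed_lateness  # grace period after firing
        self.max_ts = float("-inf")               # highest event time seen so far
        self.counts = defaultdict(int)            # window start -> count
        self.fired = set()                        # windows already emitted
        self.side_output = []                     # events beyond allowed lateness

    def watermark(self):
        # "All events up to this time have probably arrived."
        return self.max_ts - self.max_delay

    def on_event(self, ts):
        """Process one event; return any (window_start, count) results emitted."""
        start = ts - ts % self.size
        if start + self.size + self.allowed_lateness <= self.watermark():
            self.side_output.append(ts)           # too late: route to side output
            return []
        self.counts[start] += 1
        results = []
        if start in self.fired:                   # late but allowed: refire with update
            results.append((start, self.counts[start]))
        self.max_ts = max(self.max_ts, ts)
        wm = self.watermark()
        for s in sorted(self.counts):
            if s not in self.fired and s + self.size <= wm:
                self.fired.add(s)                 # watermark passed window end: fire
                results.append((s, self.counts[s]))
        for s in [s for s in self.counts if s + self.size + self.allowed_lateness <= wm]:
            del self.counts[s]                    # lateness expired: free the state
            self.fired.discard(s)
        return results


counter = WindowedCounter(size=10, max_delay=5, allowed_lateness=5)
emitted = [counter.on_event(ts) for ts in [1, 4, 12, 16, 7, 22, 3, 30]]
print(emitted)              # [[], [], [], [(0, 2)], [(0, 3)], [], [], [(10, 2)]]
print(counter.side_output)  # [3]
```

Event 7 arrives after window [0, 10) has fired but while the watermark (11) is still below 10 + 5, so the window refires with count 3. Event 3 arrives once the watermark has reached 17, past that limit of 15, so it is routed to the side output.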
This is the showcase simulation. Events arrive on a timeline. Choose a window type and size. Watch events get grouped into windows and aggregations computed per window. Late events (after the watermark) are highlighted.
```python
# Apache Flink Python API (PyFlink): tumbling window aggregation
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Tumble
from pyflink.table.expressions import col, lit

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Read from Kafka topic (requires the Flink Kafka SQL connector JAR on the classpath)
t_env.execute_sql("""
    CREATE TABLE clicks (
        user_id STRING,
        url STRING,
        click_time TIMESTAMP(3),
        WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-clicks',
        'properties.bootstrap.servers' = 'broker:9092',
        'format' = 'json'
    )
""")

# Tumbling window: count clicks per user per 5-minute window
result = (
    t_env.from_path('clicks')
    .window(Tumble.over(lit(5).minutes).on(col('click_time')).alias('w'))
    .group_by(col('user_id'), col('w'))
    .select(
        col('user_id'),
        col('w').start.alias('window_start'),
        col('w').end.alias('window_end'),
        col('url').count.alias('click_count'),
    )
)

# Submit the streaming job and print results as windows fire (runs until cancelled)
result.execute().print()
```
In batch processing, joining two datasets is straightforward: both are bounded, both are fully available, and you can sort-merge or hash-join them. In stream processing, data arrives continuously and you cannot "wait for all the data" because it never ends. Stream joins are fundamentally harder, and there are three types — each with different semantics and implementation costs.
You have two event streams and you want to match events from one with events from the other, within a time window. The canonical example: an ad platform wants to match ad impressions (user saw an ad) with ad clicks (user clicked the ad), where the click must happen within 1 hour of the impression.
Implementation: maintain a buffer for each stream. When an impression arrives, store it in the impression buffer. When a click arrives, search the impression buffer for a matching impression within the time window. Emit a joined result. Expire entries from the buffer when they fall outside the window.
You have an event stream and you want to enrich each event with data from a table. Example: a "user activity" stream where each event has a user_id, and you want to add the user's name, country, and subscription tier from the user profile table.
Two approaches:
| Approach | How it works | Pros | Cons |
|---|---|---|---|
| Remote lookup | For each event, query the database for the user profile. | Always fresh data. | Network latency per event. Database becomes a bottleneck at high throughput. |
| Local table copy | Use CDC to maintain a local copy of the user table in the stream processor. Join against the local copy. | No network latency. No database load. | Data may be slightly stale (CDC lag). Uses local memory/disk. |
In practice, the local table copy (via CDC) is almost always the right choice for high-throughput systems. The table is typically much smaller than the stream and fits in memory.
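A minimal sketch of the local-table approach: a dict plays the role of the local copy, `on_user_change` applies CDC updates to it, and `enrich` joins each activity event against it. The function names and the event and profile fields are assumptions for illustration.

```python
local_users = {}  # user_id -> profile, kept current by CDC events


def on_user_change(user_id, profile):
    """Apply a CDC update from the users table (None means the row was deleted)."""
    if profile is None:
        local_users.pop(user_id, None)
    else:
        local_users[user_id] = profile


def enrich(event):
    """Stream-table join: attach the user's current profile to the event."""
    profile = local_users.get(event["user_id"], {})
    return {**event,
            "name": profile.get("name"),
            "country": profile.get("country"),
            "tier": profile.get("tier")}


on_user_change(42, {"name": "Ada", "country": "UK", "tier": "pro"})
print(enrich({"user_id": 42, "action": "click"}))
# {'user_id': 42, 'action': 'click', 'name': 'Ada', 'country': 'UK', 'tier': 'pro'}

on_user_change(42, {"name": "Ada", "country": "UK", "tier": "free"})  # downgrade
print(enrich({"user_id": 42, "action": "click"})["tier"])  # free
```

Each event is enriched with whatever the local copy holds when it is processed, so an event handled before its user's CDC update arrives sees the older profile: the staleness trade-off from the table.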
Both inputs are CDC streams (representing tables), and the output is a continuously updated join. Example: an "orders" table and a "products" table. You want a materialized view that joins them: for each order, include the product name and price.
When an order is inserted, look up the product. When a product's price changes, update all orders that reference it. This is a live, continuously maintained join — the same thing a database does internally for a materialized view, but implemented at the application level using streams.
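A minimal sketch of such a continuously maintained join. Both inputs arrive as upserts (deletes are omitted for brevity), and the view keeps a secondary index from product to orders so that a price change re-joins exactly the affected rows. `OrderProductView` and its fields are hypothetical.

```python
from collections import defaultdict


class OrderProductView:
    """Orders joined with products, maintained incrementally from change events."""

    def __init__(self):
        self.orders = {}                           # order_id -> order row
        self.products = {}                         # product_id -> product row
        self.orders_by_product = defaultdict(set)  # product_id -> {order_id}
        self.view = {}                             # order_id -> joined row

    def _refresh(self, order_id):
        order = self.orders[order_id]
        product = self.products.get(order["product_id"], {})
        self.view[order_id] = {**order,
                               "product_name": product.get("name"),
                               "price": product.get("price")}

    def upsert_order(self, order):
        old = self.orders.get(order["order_id"])
        if old is not None:  # drop the old index entry (the product may have changed)
            self.orders_by_product[old["product_id"]].discard(order["order_id"])
        self.orders[order["order_id"]] = order
        self.orders_by_product[order["product_id"]].add(order["order_id"])
        self._refresh(order["order_id"])

    def upsert_product(self, product):
        self.products[product["product_id"]] = product
        for order_id in self.orders_by_product[product["product_id"]]:
            self._refresh(order_id)                # re-join every affected order


mv = OrderProductView()
mv.upsert_product({"product_id": "p1", "name": "Mug", "price": 12.0})
mv.upsert_order({"order_id": 1, "product_id": "p1", "qty": 2})
mv.upsert_order({"order_id": 2, "product_id": "p1", "qty": 1})
mv.upsert_product({"product_id": "p1", "name": "Mug", "price": 10.0})  # price change
print([row["price"] for row in mv.view.values()])  # [10.0, 10.0]
```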
```python
# Stream-stream join: match impressions with clicks within 1 hour
from collections import defaultdict
import time


class StreamStreamJoin:
    def __init__(self, window_seconds: int):
        self.window = window_seconds
        self.left_buffer = defaultdict(list)  # key -> [(timestamp, event)]
        self.right_buffer = defaultdict(list)

    def add_left(self, key, ts, event):
        self._evict(self.left_buffer, key, ts)
        self.left_buffer[key].append((ts, event))
        # Check right buffer for matches
        matches = []
        for rts, revt in self.right_buffer.get(key, []):
            if abs(ts - rts) <= self.window:
                matches.append((event, revt))
        return matches

    def add_right(self, key, ts, event):
        self._evict(self.right_buffer, key, ts)
        self.right_buffer[key].append((ts, event))
        matches = []
        for lts, levt in self.left_buffer.get(key, []):
            if abs(ts - lts) <= self.window:
                matches.append((levt, event))
        return matches

    def _evict(self, buffer, key, current_ts):
        """Remove entries outside the join window."""
        if key in buffer:
            buffer[key] = [(t, e) for t, e in buffer[key]
                           if current_ts - t <= self.window]


# Usage: match ad impressions with clicks within 1 hour
join = StreamStreamJoin(window_seconds=3600)
# Impression arrives
matches = join.add_left("ad_123", time.time(), {"type": "impression"})
# Later, click arrives
matches = join.add_right("ad_123", time.time(), {"type": "click"})
# matches = [(impression_event, click_event)]
```
A batch job that fails halfway through is easy to recover: delete the partial output and re-run the whole job. The input is immutable, the output is deterministic, and the job is bounded. Stream processing is harder. The job runs forever. It accumulates state (window aggregates, join buffers, counters). If it crashes, you cannot just re-run from the beginning; that would take hours or days. And you must guarantee that each event is processed correctly despite failures. Not zero times. Not twice. Exactly once.
| Guarantee | Meaning | How achieved | Risk |
|---|---|---|---|
| At-most-once | Events may be lost, but never duplicated. | Commit offset before processing. If crash, skipped events are gone. | Data loss. Acceptable for low-value metrics (page view counts). |
| At-least-once | Events are never lost, but may be duplicated. | Commit offset after processing. If crash, replay from last offset. Some events are reprocessed. | Duplicates. Fine if processing is idempotent (SET key=value). Dangerous if not (INCREMENT counter). |
| Exactly-once | Each event's effect is applied exactly once, even across failures. | Idempotent operations, transactional writes, or Chandy-Lamport-style checkpointing (paired with a replayable source and a transactional or idempotent sink for end-to-end guarantees). | Performance cost. Harder to implement. But required for financial systems. |
The first technique is idempotence: design your output operations so that re-processing an event produces the same result as processing it once (SET key=value rather than INCREMENT counter).
To make non-idempotent operations safe, include a deduplication key (event ID or offset) in the output. Before writing, check if that key already exists. If it does, skip the write.
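A minimal sketch contrasting these options when a batch of events is redelivered after a crash. The in-memory stores and the `event_id` field are assumptions for illustration. In a real system, the dedup check and the write must happen atomically (for example, in the same database transaction), or a crash between them reintroduces duplicates.

```python
events = [
    {"event_id": "e1", "account": 42, "amount": 50},
    {"event_id": "e2", "account": 42, "amount": 30},
]


def increment(store, e):
    """Not idempotent: replaying adds the amount again."""
    store[e["account"]] = store.get(e["account"], 0) + e["amount"]


def dedup_increment(store, seen, e):
    """Made safe with a deduplication key: skip event IDs already applied."""
    if e["event_id"] in seen:
        return
    increment(store, e)
    seen.add(e["event_id"])


def upsert_by_id(store, e):
    """Idempotent: SET key=value, keyed by event, so a replay overwrites itself."""
    store[e["event_id"]] = e["amount"]


replayed = events + events  # crash before the offset commit: both redelivered

naive = {}
for e in replayed:
    increment(naive, e)

deduped, seen = {}, set()
for e in replayed:
    dedup_increment(deduped, seen, e)

by_id = {}
for e in replayed:
    upsert_by_id(by_id, e)

print(naive)                # {42: 160}, double-counted
print(deduped)              # {42: 80}
print(sum(by_id.values()))  # 80
```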
Kafka provides transactional producers that atomically write to multiple partitions and commit consumer offsets in a single transaction. Either all writes succeed and the offset is committed, or none of them do.
```python
# Kafka transactional producer: exactly-once semantics
# batch, process, and current_offsets are application-defined (not shown);
# process is assumed to return bytes, since no value_serializer is configured.
# Transaction support and the exact send_offsets_to_transaction signature
# vary by client library and version; check your client's documentation.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    transactional_id='fraud-processor-1',  # unique per instance
    enable_idempotence=True,               # required for transactions
)
producer.init_transactions()

try:
    producer.begin_transaction()
    # Process input events and produce output
    for msg in batch:
        result = process(msg.value)
        producer.send('fraud-alerts', key=msg.key, value=result)
    # Commit input offsets as part of the transaction
    producer.send_offsets_to_transaction(
        offsets={tp: offset for tp, offset in current_offsets.items()},
        group_id='fraud-detection',
    )
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()  # rollback everything
```
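Apache Flink uses a distributed snapshotting algorithm based on the Chandy-Lamport protocol. The idea:

1. A checkpoint coordinator periodically injects a checkpoint barrier into each source, and each source records its current input offsets.
2. Barriers flow downstream with the data. When an operator has received the barrier on all of its inputs, it snapshots its state to durable storage and forwards the barrier.
3. Once every operator has acknowledged its snapshot, the checkpoint is complete: a consistent set of operator states plus the source offsets they correspond to.
4. On failure, every operator restores its state from the latest completed checkpoint, and the sources rewind to the recorded offsets and replay.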
The simulation below shows a Flink pipeline with 3 operators processing events. As a checkpoint barrier flows through the pipeline, each operator saves its state. Then a failure occurs: watch the system restore from the checkpoint and replay.
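A heavily simplified, single-process sketch of barrier-based checkpointing for a linear pipeline. With one input and no parallelism there is no barrier alignment, so "the barrier has passed every operator" reduces to snapshotting all operators right after a given event. A checkpoint stores the source offset together with each operator's state; recovery restores both and replays. `CountOperator`, `Pipeline`, and the every-3-events schedule are assumptions; Flink's real mechanism is asynchronous and distributed.

```python
import copy


class CountOperator:
    """Stateful operator: counts how many times each event value is seen."""

    def __init__(self):
        self.state = {}

    def process(self, event):
        self.state[event] = self.state.get(event, 0) + 1
        return event  # pass the event downstream unchanged


class Pipeline:
    """Linear pipeline over a replayable log with periodic checkpoints."""

    def __init__(self, log, operators, checkpoint_every):
        self.log = log                      # replayable source, like a Kafka partition
        self.operators = operators
        self.checkpoint_every = checkpoint_every
        self.offset = 0                     # next source offset to read
        self.checkpoint = self._snapshot()  # initial, empty checkpoint

    def _snapshot(self):
        # Source offset plus a copy of every operator's state, taken together.
        return self.offset, [copy.deepcopy(op.state) for op in self.operators]

    def step(self):
        event = self.log[self.offset]
        self.offset += 1
        for op in self.operators:
            event = op.process(event)
        if self.offset % self.checkpoint_every == 0:
            # A barrier injected after this event has now passed every operator.
            self.checkpoint = self._snapshot()

    def recover(self):
        """On failure: restore all operator state and rewind the source."""
        self.offset, states = self.checkpoint
        for op, state in zip(self.operators, states):
            op.state = copy.deepcopy(state)


log = ["a", "b", "a", "c", "a"]
counter = CountOperator()
pipe = Pipeline(log, [counter], checkpoint_every=3)
for _ in range(4):
    pipe.step()          # a, b, a (checkpoint at offset 3), then c
pipe.recover()           # crash: roll back to offset 3, discarding c's effect
while pipe.offset < len(log):
    pipe.step()          # replay c, then process the final a
print(counter.state)     # {'a': 3, 'b': 1, 'c': 1}
```

Event `c` was processed twice, yet it is counted once in the final state, which is what exactly-once means for framework-managed state. Side effects written outside the pipeline are not covered by this and still need the transactional or idempotent sinks described earlier.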
Design a real-time fraud detection system that processes credit card transactions from Kafka, applies a fraud model, and writes alerts to a "fraud-alerts" topic with exactly-once semantics. Consider: what happens if the fraud model has side effects (e.g., incrementing a "suspicious activity count" per user)?
```text
Design sketch:
1. Input: Kafka topic "transactions" (partitioned by card_id)
2. Flink job with:
   - Source: KafkaSource, with checkpointing in EXACTLY_ONCE mode
   - Operator 1: Enrich with user profile (stream-table join via CDC)
   - Operator 2: Windowed aggregation (count transactions per card in 5-min window)
   - Operator 3: Fraud model (flag if count > threshold or model score > 0.9)
   - Sink: KafkaSink with DeliveryGuarantee.EXACTLY_ONCE (transactional) to "fraud-alerts" topic
3. Checkpointing: every 5 seconds to S3
4. The "suspicious activity count" per user MUST be part of Flink's managed state
   (not an external database), so it is included in checkpoints and restored
   correctly on failure.
5. If count is in an external DB, use idempotent writes:
   SET suspicious_count[user] = computed_value (not INCREMENT)
```
This chapter compresses the core stream processing concepts into interview-ready form, organized by the five interview dimensions: concept, design, code, debug, and frontier.
| Concept | One-liner | Key detail |
|---|---|---|
| Event stream | Unbounded, append-only, ordered sequence of immutable events | Stream-table duality: stream = changelog of a table |
| Traditional broker | Message delivered to one consumer, deleted after ACK | RabbitMQ, SQS. Good for task distribution. No replay. |
| Log-based broker | Append-only log, consumers track offsets, messages persist | Kafka, Kinesis. Replay by resetting offset. Fan-out is free. |
| Partition | Unit of ordering and parallelism in Kafka | Max active consumers per group = num partitions (extra consumers sit idle). Key hash determines partition. |
| Consumer group | Set of consumers cooperating on a topic. Each partition -> one consumer. | Multiple groups independently consume the same topic. |
| CDC | Capture database mutations from the WAL/binlog as events | Debezium reads WAL, publishes to Kafka. Single source of truth. |
| Log compaction | Keep only latest value per key in the log | Enables new consumers to build full snapshot from compacted log. |
| Event sourcing | Store events as the source of truth, derive state. The reverse of CDC, which derives events from state. | Enables temporal queries, audit trails, multiple read models. |
| CQRS | Separate write model (events) from read model (materialized views) | Read models are rebuildable by replaying the event log. |
| Tumbling window | Fixed-size, non-overlapping time buckets | Each event in exactly one window. Simplest and cheapest. |
| Hopping window | Fixed-size, overlapping (window > hop) | Smooths spiky data. Events counted in multiple windows. |
| Session window | Grouped by activity gap (no events for N seconds = end) | Variable length. Good for user sessions, browsing activity. |
| Watermark | System's estimate: "all events up to time T have arrived" | Triggers window computation. Events with timestamps behind the watermark are late. |
| Event time vs. processing time | When it happened vs. when the system saw it | Window by event time for correct results; processing-time windows shift with network delays, backlogs, and replays. |
| Stream-stream join | Join two streams within a time window | Requires buffering both sides. Memory = f(window size, throughput). |
| Stream-table join | Enrich events with table lookups | Use CDC to keep local table copy. Avoids DB bottleneck. |
| Exactly-once | Observable output same as if each event processed once | Achieved via idempotence, Kafka transactions, or Flink checkpointing. |
| Chandy-Lamport | Distributed snapshot via barrier injection | Flink's approach. Barriers flow through pipeline. Each operator snapshots state. |
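The hopping and session rows above can be made concrete with two small window-assignment functions. This is a minimal sketch, assuming integer event times and windows aligned to multiples of the hop; `hopping_windows` and `session_windows` are hypothetical helpers, not a framework API. Setting `hop` equal to `size` gives the tumbling case (exactly one window per event). The session function groups an already-collected list for clarity, whereas a real stream processor merges sessions incrementally as out-of-order events arrive.

```python
def hopping_windows(event_time: int, size: int, hop: int) -> list:
    """Start times of every window [start, start + size) that contains event_time.

    Windows start at multiples of `hop`; with size > hop they overlap, so one
    event lands in size / hop windows (when hop divides size).
    """
    start = event_time - (event_time % hop)  # latest window containing the event
    starts = []
    while start > event_time - size:         # window still covers event_time
        starts.append(start)
        start -= hop
    return sorted(starts)

def session_windows(event_times: list, gap: int) -> list:
    """Group event times into sessions; a silence longer than `gap` ends a session.

    Returns (first_event, last_event) for each session.
    """
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][1] <= gap:
            sessions[-1][1] = t        # within the gap: extend the current session
        else:
            sessions.append([t, t])    # gap exceeded: start a new session
    return [tuple(s) for s in sessions]

print(hopping_windows(65, size=60, hop=20))            # [20, 40, 60]
print(hopping_windows(65, size=60, hop=60))            # [60] (tumbling)
print(session_windows([0, 10, 25, 100, 105], gap=30))  # [(0, 25), (100, 105)]
```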
| Symptom | Likely cause | Fix |
|---|---|---|
| Consumer lag growing unboundedly | Consumer is slower than producer. Could be: slow processing logic, GC pauses, insufficient parallelism (too few partitions), or partition rebalance storms (consumers joining/leaving too frequently). | Profile the consumer. Increase partitions and consumers. Tune max.poll.records and max.poll.interval.ms. Use static group membership to avoid rebalance storms. |
| Duplicate events in output | At-least-once delivery without idempotent sinks. The consumer wrote its output but crashed before committing its offset, so on restart it replayed those events and wrote them again. | Make sinks idempotent (UPSERT with dedup key) or use Kafka transactions / Flink exactly-once. |
| Out-of-order results | Events from different partitions interleaved. Or windowing uses processing time instead of event time. | Ensure events that need ordering share the same key (same partition). Switch to event-time windowing with watermarks. |
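| High checkpoint duration | Large operator state (big join buffers, wide windows) snapshotted in full on every checkpoint, or backpressure slowing barriers as they move through the pipeline. | Reduce window sizes. Enable incremental checkpointing with the RocksDB state backend. Use SSD for RocksDB. Consider unaligned checkpoints under backpressure. Increase checkpoint timeout. |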
| Missing events after schema change | Producer changed the event schema (added/removed fields). Consumer's deserializer fails and silently drops events. | Use a schema registry (Confluent, Apicurio). Enforce backward/forward compatibility. Use Avro or Protobuf with schema evolution rules. |
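One way to make a sink idempotent, as the duplicate-events row suggests, is to key every write on a dedup ID so that a replayed event becomes a no-op. A minimal sketch using Python's built-in `sqlite3`, assuming the producer attaches a stable `event_id` that stays the same across retries; the `fraud_alerts` table and `write_alert` helper are hypothetical. `INSERT OR IGNORE` is SQLite syntax; PostgreSQL spells it `INSERT ... ON CONFLICT (event_id) DO NOTHING`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fraud_alerts (event_id TEXT PRIMARY KEY, card_id TEXT, score REAL)"
)

def write_alert(conn: sqlite3.Connection, event_id: str, card_id: str, score: float) -> None:
    """Idempotent sink write: a replayed event hits the primary key and is ignored."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR IGNORE INTO fraud_alerts (event_id, card_id, score) VALUES (?, ?, ?)",
            (event_id, card_id, score),
        )

# At-least-once delivery: the consumer crashed before committing its offset,
# so the same event is delivered, and written, twice.
write_alert(conn, "evt-1001", "card-7", 0.97)
write_alert(conn, "evt-1001", "card-7", 0.97)

count = conn.execute("SELECT COUNT(*) FROM fraud_alerts").fetchone()[0]
print(count)  # 1
```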
```python
from collections import defaultdict

class TumblingWindowAggregator:
    """Count events per key per event-time tumbling window."""

    def __init__(self, window_size_seconds: int):
        self.window_size = window_size_seconds
        # (window_start, key) -> count, for windows not yet emitted
        self.windows = defaultdict(int)
        self.watermark = float('-inf')  # highest watermark seen so far

    def _window_start(self, event_time: float) -> float:
        """Assign an event to the start of its tumbling window."""
        return event_time - (event_time % self.window_size)

    def process(self, key: str, event_time: float) -> bool:
        ws = self._window_start(event_time)
        if ws + self.window_size <= self.watermark:
            return False  # late event: its window has already fired, so drop it
        self.windows[(ws, key)] += 1
        return True

    def fire_windows(self, watermark: float) -> list:
        """Emit, then discard, all windows whose end <= watermark."""
        self.watermark = max(self.watermark, watermark)
        results = []
        for (ws, key), count in list(self.windows.items()):
            window_end = ws + self.window_size
            if window_end <= self.watermark:
                results.append({
                    'window_start': ws,
                    'window_end': window_end,
                    'key': key,
                    'count': count,
                })
                del self.windows[(ws, key)]  # free state once emitted
        return results

# Example usage
agg = TumblingWindowAggregator(window_size_seconds=60)
agg.process("url_a", 1000.0)  # window [960, 1020)
agg.process("url_a", 1010.0)  # same window
agg.process("url_b", 1005.0)  # same window, different key
agg.process("url_a", 1025.0)  # next window [1020, 1080)

# Watermark advances past 1020 -> fire the first window
results = agg.fire_windows(watermark=1025.0)
# [{'window_start': 960.0, 'window_end': 1020.0, 'key': 'url_a', 'count': 2},
#  {'window_start': 960.0, 'window_end': 1020.0, 'key': 'url_b', 'count': 1}]
```
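The aggregator's `fire_windows` takes a watermark as input; a common way to produce one is a bounded-out-of-orderness heuristic, similar in spirit to Flink's `WatermarkStrategy.forBoundedOutOfOrderness`. The class below is a hypothetical sketch of that idea, assuming a fixed lateness bound in seconds: the watermark trails the highest event time seen by the bound, and older, out-of-order events never move it backwards.

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark = highest event time seen so far minus a fixed lateness bound."""

    def __init__(self, max_out_of_orderness: float):
        self.bound = max_out_of_orderness
        self.max_event_time = float("-inf")

    def on_event(self, event_time: float) -> None:
        # Out-of-order (older) events never move the watermark backwards.
        self.max_event_time = max(self.max_event_time, event_time)

    def current_watermark(self) -> float:
        # Heuristic claim: events older than this are no longer expected.
        return self.max_event_time - self.bound

wm = BoundedOutOfOrdernessWatermark(max_out_of_orderness=10.0)
for t in [1000.0, 1010.0, 1005.0, 1035.0]:  # 1005.0 arrives out of order
    wm.on_event(t)
print(wm.current_watermark())  # 1025.0
```

A larger bound tolerates more disorder but delays every window result by the same amount; a smaller bound fires sooner and classifies more events as late.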
```python
import hashlib
import json

class IdempotentProcessor:
    """Process events exactly once using a deduplication log."""

    def __init__(self):
        self.seen = set()  # IDs of processed events (grows without bound here)
        self.state = {}    # application state (e.g., user balances)

    def _event_id(self, event: dict) -> str:
        """Deterministic ID from event content.

        Caveat: two genuinely distinct events with identical content collide.
        Prefer a producer-assigned unique ID when one exists.
        """
        raw = json.dumps(event, sort_keys=True).encode()
        return hashlib.sha256(raw).hexdigest()

    def process(self, event: dict) -> bool:
        eid = self._event_id(event)
        if eid in self.seen:
            return False  # already processed, skip
        # Apply the event to state, then record its ID
        user = event['user_id']
        self.state[user] = self.state.get(user, 0) + event['amount']
        self.seen.add(eid)
        return True  # successfully processed

# Same event processed twice -> only applied once
proc = IdempotentProcessor()
event = {'user_id': '42', 'amount': 100, 'ts': 1234567890}
proc.process(event)  # True: applied, balance = 100
proc.process(event)  # False: skipped, balance still 100
```
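The `seen` set in `IdempotentProcessor` keeps every ID forever, which an unbounded stream cannot afford. A common compromise is to remember IDs only for a retention period, comparable to giving dedup state a time-to-live. The sketch below assumes redeliveries arrive within `retention` seconds of event time; `ExpiringDeduplicator` is a hypothetical helper, and it uses the highest event time seen as its clock. Duplicates that arrive after the retention period are no longer caught; that is the trade-off for bounded memory.

```python
import heapq

class ExpiringDeduplicator:
    """Remember event IDs for `retention` seconds of event time, then forget them."""

    def __init__(self, retention: float):
        self.retention = retention
        self.now = float("-inf")  # highest event time seen (simple event-time clock)
        self.seen = {}            # event_id -> event time it was first seen
        self.expiry = []          # min-heap of (event_time, event_id) for eviction

    def _evict(self) -> None:
        cutoff = self.now - self.retention
        while self.expiry and self.expiry[0][0] <= cutoff:
            _, event_id = heapq.heappop(self.expiry)
            del self.seen[event_id]  # each heap entry matches exactly one seen entry

    def is_duplicate(self, event_id: str, event_time: float) -> bool:
        self.now = max(self.now, event_time)
        self._evict()
        if event_id in self.seen:
            return True
        self.seen[event_id] = event_time
        heapq.heappush(self.expiry, (event_time, event_id))
        return False

dedup = ExpiringDeduplicator(retention=60)
results = [
    dedup.is_duplicate("evt-1", 0),    # False: first sighting
    dedup.is_duplicate("evt-1", 30),   # True: redelivery within retention
    dedup.is_duplicate("evt-2", 100),  # False; evt-1 (seen at 0) is evicted
    dedup.is_duplicate("evt-1", 101),  # False: redelivery too late to catch
]
print(results)  # [False, True, False, False]
```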
Stream processing does not exist in isolation. It connects to nearly every topic in distributed systems. Here is how this chapter relates to the rest of DDIA and beyond.
| Related Topic | Connection |
|---|---|
| Batch Processing (DDIA Ch 11) | Stream processing is "unbounded batch." Lambda Architecture runs both. Modern systems (Flink) unify them. Batch for backfills and retraining, stream for real-time. |
| Storage & Retrieval (DDIA Ch 4) | Kafka's log is an append-only segmented log whose compaction resembles LSM-tree compaction. Flink's RocksDB state backend is an LSM-tree store. Stream processing state management is a storage engine problem. |
| Replication (DDIA Ch 6) | CDC is database replication exposed as a public API. Kafka itself uses leader-follower replication for fault tolerance. Consumer groups are a form of partitioned processing. |
| Transactions (DDIA Ch 8) | Exactly-once semantics in stream processing is a transaction problem. Kafka transactions are lightweight distributed transactions. Flink checkpointing is a form of distributed snapshot. |
| Consistency (DDIA Ch 10) | Stream processing systems provide eventual consistency by default. Exactly-once makes the output consistent with processing each event once. Watermarks track event-time progress: a completeness heuristic, not an ordering guarantee. |
| Partitioning (DDIA Ch 7) | Kafka partitions are hash-partitioned. Key-based routing ensures related events are co-located. Repartitioning (changing key) in Flink is a shuffle operation. |
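Key-based routing is what keeps related events ordered: every event with the same key lands in the same partition log, in arrival order. A minimal sketch, assuming a hypothetical `partition_for` that uses CRC32 purely for illustration (Kafka's default partitioner hashes keys with murmur2, so real partition numbers will differ). Note that the mapping depends on `num_partitions`; changing the partition count remaps most keys, which is why repartitioning requires a shuffle.

```python
import zlib
from collections import defaultdict

def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in partitioner: CRC32 of the key modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def route(events, num_partitions: int) -> dict:
    """Append each (key, value) event to its partition's log, preserving arrival order."""
    logs = defaultdict(list)
    for key, value in events:
        logs[partition_for(key, num_partitions)].append((key, value))
    return logs

events = [("card-7", "tx1"), ("card-9", "tx1"), ("card-7", "tx2"), ("card-7", "tx3")]
logs = route(events, num_partitions=4)
card7_log = logs[partition_for("card-7", 4)]
print([v for k, v in card7_log if k == "card-7"])  # ['tx1', 'tx2', 'tx3']
```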
| Framework | Processing model | State backend | Exactly-once | Best for |
|---|---|---|---|---|
| Apache Kafka Streams | Library (no cluster) | RocksDB, in-memory | Via Kafka transactions | Kafka-native apps, microservices |
| Apache Flink | Distributed runtime | RocksDB, heap | Via Chandy-Lamport checkpointing | Complex event processing, large state |
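| Apache Spark Structured Streaming | Micro-batch | In-memory state store checkpointed to HDFS/S3, or RocksDB | Via checkpointed offsets (write-ahead log) + replayable sources + idempotent sinks | Teams already using Spark for batch |
| Amazon Kinesis Data Analytics (now Amazon Managed Service for Apache Flink) | Managed Flink | Managed | Flink-level | AWS-native, operational simplicity |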
| Google Dataflow (Beam) | Unified batch/stream | Managed | Via shuffle + dedup | GCP-native, portable pipelines |
| Paper/Resource | Why it matters |
|---|---|
| Kafka: a Distributed Messaging System for Log Processing (Kreps et al., 2011) | The original Kafka paper. Explains the log abstraction and why it works. |
| MillWheel: Fault-Tolerant Stream Processing at Internet Scale (Akidau et al., 2013) | Google's internal stream processor. Introduced per-key exactly-once and low watermarks. |
| The Dataflow Model (Akidau et al., 2015) | Unified model for batch and stream. Event time, watermarks, triggers, accumulation. Apache Beam's theoretical foundation. |
| Apache Flink: Stream and Batch Processing in a Single Engine (Carbone et al., 2015) | Flink's architecture. Chandy-Lamport checkpointing for exactly-once. |
| Questioning the Lambda Architecture (Kreps, 2014) | Jay Kreps argues that a single stream processing system can replace the dual batch+stream Lambda Architecture. The "Kappa Architecture." |
| Turning the database inside-out (Kleppmann, 2015) | Martin Kleppmann's talk connecting databases, caches, indexes, and streams as different views of the same data. The intellectual foundation for CDC and event sourcing. |