Linearizability, Raft, Paxos — how distributed nodes agree on the truth.
You are running a distributed database with three nodes. Node A is the leader, handling all writes. Nodes B and C are followers, replicating data from A. Life is good. Then Node A's network interface goes down. It is still running, still accepting writes from local clients, but it cannot talk to B or C. And B and C cannot talk to A.
Here is the crisis. Nodes B and C need a leader to accept writes. They cannot reach A, so they assume A is dead. They elect Node B as the new leader. Now you have two nodes that both think they are the leader. A is still accepting writes. B is also accepting writes. Different clients are sending different writes to different leaders. The data on A and the data on B are diverging with every passing second.
This is split-brain: a situation where two parts of a distributed system operate independently, each believing it is the sole authority. When the network heals and A and B try to reconcile, you have conflicting data and no way to know which version is "correct." Money has been double-spent. Inventory has been double-sold. The damage is already done.
The fundamental question of this chapter: in a network where messages can be delayed, dropped, or reordered, how do you get a group of nodes to agree on something? "Something" could be: who is the leader, what value a register holds, whether a transaction should commit, or what order events occurred in. All of these reduce to the same problem — consensus.
Consensus sounds simple. Just vote! If a majority agrees, the decision is final. But what happens when votes are delayed? What if a node votes, then crashes before learning the outcome? What if two candidates start elections simultaneously and split the vote? What if the network partitions right in the middle of voting?
Whether consensus is even solvable in an asynchronous system remained open until 1985, when Fischer, Lynch, and Paterson proved a devastating result: no deterministic algorithm can guarantee that consensus terminates in an asynchronous system where even one process can crash. This is the FLP impossibility result. Practical algorithms sidestep it by assuming partial synchrony: the network is usually well-behaved, so timeouts can be used to detect failures. They never violate safety (two nodes never decide different values), but they only guarantee progress while the network behaves. It is a pragmatic compromise between theoretical impossibility and practical necessity.
At first glance, consensus seems trivial. Just pick a leader, right? But consider the failure modes:
| Failure | What Happens | Why It Breaks Naive Solutions |
|---|---|---|
| Message delay | A vote arrives after the election ends | Cannot distinguish "slow" from "lost." How long do you wait? |
| Message loss | A vote is dropped by the network | Cannot distinguish "lost" from "very delayed." Retries create duplicates. |
| Message duplication | A vote is received twice | Could double-count votes and elect a leader that lacks a real majority. |
| Node crash | A node crashes after voting but before learning the outcome | When it recovers, its state is inconsistent with the cluster. |
| Network partition | Two groups of nodes can talk within groups but not across | Each group might independently elect a leader → split-brain. |
| Byzantine failure | A node sends intentionally wrong messages | Standard consensus (Raft, Paxos) doesn't handle this. Need BFT algorithms. |
Every real consensus algorithm must handle the first five failures correctly. The sixth (Byzantine) is out of scope for this chapter — it's the domain of blockchain and military systems. The algorithms we will study (Raft, Paxos, ZAB) assume crash-fault only: nodes either work correctly or stop completely.
The most common use of consensus is the replicated state machine (RSM). The idea: if every node starts in the same state and applies the same sequence of commands in the same order, they will all end up in the same state. Consensus ensures the "same sequence" part.
This is how etcd, ZooKeeper, and CockroachDB work internally. The consensus algorithm (Raft, ZAB, or Paxos) is the mechanism that agrees on the order of commands. The state machine that applies them is the actual key-value store, lock service, or database engine. The separation of concerns is clean: the consensus layer knows nothing about keys or values; it just orders bytes. The state machine knows nothing about nodes or elections; it just applies commands.
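Here is a minimal sketch of the idea. It assumes the consensus layer has already produced an agreed log. `KVStateMachine` and the `(op, key, value)` command format are hypothetical. They exist only to show that deterministic replay of the same log leaves every replica in the same state.

```python
class KVStateMachine:
    """Hypothetical deterministic key-value state machine."""

    def __init__(self):
        self.data = {}

    def apply(self, command):
        # Commands are (op, key, value) tuples; "del" ignores the value.
        op, key, value = command
        if op == "set":
            self.data[key] = value
        elif op == "del":
            self.data.pop(key, None)
        else:
            raise ValueError(f"unknown op {op!r}")


# The output of consensus: one agreed order of commands.
agreed_log = [("set", "x", 1), ("set", "y", 2), ("del", "x", None), ("set", "y", 3)]

replicas = [KVStateMachine() for _ in range(3)]
for replica in replicas:
    for command in agreed_log:
        replica.apply(command)

print([r.data for r in replicas])  # every replica ends with {'y': 3}
```

Replaying the same commands in a different order (for example, reversed) produces a different final state. That is why consensus must fix the order, not just the set of commands.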
| Chapter | Topic | The Core Question |
|---|---|---|
| 1 | Linearizability | What does "consistent" even mean, formally? |
| 2 | Ordering Guarantees | How do you track which event happened before which? |
| 3 | Two-Phase Commit | Can we just coordinate with a single node? (No.) |
| 4-5 | Raft Consensus | The full algorithm: leader election + log replication |
| 6 | Paxos | The original (and harder) consensus algorithm |
| 7 | ZAB & Others | What do ZooKeeper, etcd, and CockroachDB actually use? |
| 8 | In Practice | Coordination services and real system design |
| 9 | Interview Arsenal | The cheat sheet for system design interviews |
| Cluster Size | Majority | Tolerated Failures | When to Use |
|---|---|---|---|
| 1 | 1 | 0 | Development. No fault tolerance at all. |
| 2 | 2 | 0 | Never. Worse than 1 node (both must be up). |
| 3 | 2 | 1 | Small production systems. Minimum for fault tolerance. |
| 4 | 3 | 1 | Rarely used. Same fault tolerance as 3, but more overhead. |
| 5 | 3 | 2 | Standard production. Survives 1 planned + 1 unplanned failure. |
| 6 | 4 | 2 | Rarely used. Same fault tolerance as 5. |
| 7 | 4 | 3 | Critical infrastructure. Cross-region with 3 data centers. |
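The Majority and Tolerated Failures columns follow from two formulas. A cluster of $n$ nodes needs $\lfloor n/2 \rfloor + 1$ votes, so it can lose $\lfloor (n-1)/2 \rfloor$ nodes and still form a majority. A short sketch that regenerates those columns:

```python
def majority(n):
    """Smallest number of nodes that is more than half of n."""
    return n // 2 + 1


def tolerated_failures(n):
    """Nodes that can crash while the survivors still form a majority."""
    return n - majority(n)


for n in range(1, 8):
    print(n, majority(n), tolerated_failures(n))
```

Going from an odd $n$ to $n + 1$ raises the majority by one without raising the number of tolerated failures. That is why even-sized clusters are rarely used.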
Before we can talk about how nodes agree, we need to define what "agreement" looks like from the outside. When a client reads a value from a distributed system, what should they see? The answer depends on the consistency model the system provides — a contract between the system and its clients about what behaviors are allowed.
The strongest consistency model is linearizability (also called atomic consistency or strong consistency). Here is the promise: the system behaves as if there is exactly one copy of the data, and every operation takes effect atomically at some single point in time between when the client sent the request and when the client received the response.
Let us make that precise. Every read or write operation has two timestamps: the invocation (when the client sends the request) and the response (when the client receives the reply). In a real system, there is some nonzero duration between these — the operation takes time. Linearizability says that you must be able to assign a single point in time within that duration where the operation "takes effect," and if you line up all operations by their effect points, the result must look like a sequential execution.
The classic technique for checking linearizability: draw operations as horizontal bars on a timeline (one row per client), then try to place a "linearization point" (a dot) within each bar such that the dots form a valid sequential order.
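For tiny histories, the check can be done by brute force. The sketch below models a single register, and all of its names are hypothetical. It tries every ordering of the operations and keeps only orderings that respect real time: an operation that finished before another started must come first. It accepts the history if some such ordering is a legal sequential execution. The search is exponential, so it is only meant to make the definition concrete. Practical checkers prune it heavily.

```python
from dataclasses import dataclass
from itertools import permutations


@dataclass(frozen=True)
class Op:
    kind: str        # "write" or "read"
    value: int       # value written, or value the read returned
    invoke: float    # when the client sent the request
    response: float  # when the client received the reply


def respects_real_time(order):
    """An op that completed before another op started must come first."""
    for i, later in enumerate(order):
        for earlier in order[:i]:
            if later.response < earlier.invoke:
                return False
    return True


def is_legal_register(order, initial=0):
    """Replay against one register; every read must see the latest write."""
    value = initial
    for op in order:
        if op.kind == "write":
            value = op.value
        elif op.value != value:
            return False
    return True


def is_linearizable(history, initial=0):
    # Exponential brute force: fine for a handful of operations only.
    return any(
        respects_real_time(order) and is_legal_register(order, initial)
        for order in permutations(history)
    )


# A read overlapping a write may return the old value...
print(is_linearizable([Op("write", 1, 0, 2), Op("read", 0, 1, 3)]))  # True
# ...but a read that starts after the write finished must see it.
print(is_linearizable([Op("write", 1, 0, 1), Op("read", 0, 2, 3)]))  # False
```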
Linearizability and serializability are constantly confused, even in published papers. They are entirely different guarantees.
| | Linearizability | Serializability |
|---|---|---|
| Scope | Single object (a register, a key) | Multiple objects (a transaction touching many rows) |
| Guarantee | Real-time ordering: if op A finishes before op B starts, A's effect is visible to B | Equivalent to some serial order — but that order need not match real-time |
| About | Recency — you never see stale data | Isolation — transactions don't interfere with each other |
| Used by | Consensus protocols, linearizable registers | Database transaction isolation levels |
The combination of both, strict serializability, gives you transactions that are both serializable AND linearizable. Spanner provides it, and it is the gold standard. CockroachDB stops just short of it: it guarantees serializability plus linearizability for individual keys, without Spanner's special clock hardware. Strict serializability comes at a cost: you need tightly synchronized clocks (Spanner uses GPS and atomic clocks) or consensus on every transaction.
| Approach | Linearizable? | Why |
|---|---|---|
| Single-leader + sync replication | Potentially | Only if reads go through the leader or use sync followers. Async replicas can return stale data. |
| Consensus algorithms (Raft, Paxos) | Yes | The whole point of consensus is to agree on a total order of operations. |
| Multi-leader replication | No | Concurrent writes to different leaders produce conflicts that violate real-time order. |
| Leaderless (Dynamo-style) | Usually no | Sloppy quorums, read-repair races, and clock skew break linearizability in practice. |
The CAP theorem (Brewer, 2000; proved by Gilbert & Lynch, 2002) states: during a network partition, a distributed system must choose between Consistency (linearizability) and Availability (every non-crashed node responds). You cannot have both.
More precisely: if two nodes cannot communicate (partition), and a client writes to one node, the other node has two options: (1) refuse to answer reads until the partition heals (consistent but unavailable), or (2) answer reads with potentially stale data (available but inconsistent). There is no third option.
CAP is often presented as a binary choice, but real systems exist on a spectrum. Here is where common databases land:
| System | CAP Choice | During Partition | Normal Operation |
|---|---|---|---|
| Spanner | CP | Minority partition becomes unavailable (no writes, no strong reads) | Linearizable reads, serializable transactions. Uses TrueTime (GPS + atomic clocks) for tight clock sync. |
| CockroachDB | CP | Same as Spanner: minority stalls | Serializable transactions via Raft consensus per range. No special hardware (uses hybrid logical clocks). |
| DynamoDB | AP (default) | All nodes remain available. May return stale data. | Eventually consistent by default. Can opt into strongly-consistent reads (reads from leader). |
| Cassandra | AP (tunable) | Available as long as you can reach enough replicas for your chosen consistency level. | Tunable consistency: ONE (fast, stale) to ALL (slow, consistent). QUORUM is the common middle ground. |
| MongoDB | CP (primary reads) | Primary election stalls writes. Secondary reads may be stale. | Strong reads from primary. Causal consistency within causally consistent sessions. |
| Redis (single) | CA | Single-node: no partition to worry about (but no fault tolerance either). | Linearizable (single node = trivially consistent). Redis Cluster: AP (async replication). |
Not everything needs the strongest guarantee. Here is a decision framework:
| Use Case | Need Linearizability? | Why |
|---|---|---|
| Leader election | Yes | Two leaders = split-brain = data loss. Must agree on exactly one. |
| Distributed lock | Yes | Two processes holding same lock = mutual exclusion violated. |
| Unique constraint (username) | Yes | Two users claiming same username = conflict. |
| Bank account balance | Yes | Overdraft from stale read = financial loss. |
| Social media timeline | No | Seeing a post 2 seconds late is fine. |
| Analytics dashboard | No | Approximate counts are acceptable. |
| CDN cache | No | Stale content for seconds/minutes is the whole point of caching. |
| Shopping cart | Usually no | Eventually consistent with conflict resolution (last-write-wins or merge). |
Linearizability is powerful but expensive. Can we get useful guarantees with something weaker? Yes — by focusing on causality. If event A caused event B, then A must be ordered before B. But if A and B are independent (neither caused the other), we don't care which comes first. This is causal consistency, and it is the strongest consistency model that does not sacrifice availability during partitions.
But how do you track "what caused what" in a distributed system? You cannot use wall clocks — we learned in the previous chapters that clocks drift, skew, and cannot be trusted. You need a logical clock.
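Leslie Lamport (1978) invented a beautifully simple mechanism. Each node maintains a counter. The rules:

1. Before each local event, the node increments its counter.
2. When sending a message, the node increments its counter and attaches the new value to the message.
3. When receiving a message carrying timestamp t, the node sets its counter to max(counter, t) + 1.

Ties between equal timestamps are broken by node ID, which turns the timestamps into a total order.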
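The result: if event A causally precedes event B, then A's timestamp is strictly less than B's. The contrapositive: if timestamp(A) ≥ timestamp(B), then A did NOT causally precede B. The converse does not hold: timestamp(A) < timestamp(B) does not tell you whether A caused B or the two were concurrent.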
Lamport timestamps are insufficient for detecting concurrent events. Vector clocks extend the idea: instead of a single counter, each node maintains a vector of counters — one for every node in the system.
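Here is a minimal vector clock sketch. It assumes a fixed, known set of node IDs; real systems also have to handle the vector growing and shrinking as membership changes. The class and function names are hypothetical. `happened_before` and `concurrent` answer the question Lamport timestamps cannot: whether two events are causally ordered or concurrent.

```python
class VectorClock:
    def __init__(self, node_id, nodes):
        self.node_id = node_id
        self.clock = {n: 0 for n in nodes}

    def local_event(self):
        self.clock[self.node_id] += 1
        return dict(self.clock)

    def send(self):
        # A send is an event; the message carries a copy of the whole vector.
        return self.local_event()

    def receive(self, msg_clock):
        # Element-wise max, then count the receive as a local event.
        for n, t in msg_clock.items():
            self.clock[n] = max(self.clock[n], t)
        return self.local_event()


def happened_before(a, b):
    """True if the event stamped `a` causally precedes the event stamped `b`."""
    return all(a[n] <= b[n] for n in a) and a != b


def concurrent(a, b):
    return a != b and not happened_before(a, b) and not happened_before(b, a)


nodes = ["A", "B", "C"]
a, b, c = (VectorClock(n, nodes) for n in nodes)
m = a.send()             # {'A': 1, 'B': 0, 'C': 0}
b_recv = b.receive(m)    # {'A': 1, 'B': 1, 'C': 0}
c_evt = c.local_event()  # {'A': 0, 'B': 0, 'C': 1}
print(happened_before(m, b_recv))  # True
print(concurrent(b_recv, c_evt))   # True
```

With Lamport timestamps, the same two events `b_recv` and `c_evt` would get 2 and 1. That looks like an ordering even though the events are concurrent.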
Vector clocks are used in systems like Riak and Amazon's Dynamo (internally) to detect write conflicts. When two writes are concurrent, the system can either: (a) keep both versions (siblings) and let the application resolve, or (b) use a deterministic rule (last-write-wins by wall clock — but this loses data).
Total order broadcast (also called atomic broadcast) is a protocol that delivers messages to all nodes such that: (1) every non-crashed node delivers the same set of messages, and (2) every node delivers them in the same order. This is stronger than Lamport timestamps. Lamport timestamps with node-ID tiebreaks do define a total order, but only in hindsight: a node can never be sure that a message with a smaller timestamp is not still in flight, so it cannot safely act on that order. Total order broadcast fixes the order at the moment each message is delivered.
Here is the remarkable equivalence: total order broadcast is equivalent to consensus. If you can solve total order broadcast, you can solve consensus (decide on the first message delivered). If you can solve consensus, you can solve total order broadcast (use consensus to decide each next message). This equivalence is why we care so deeply about consensus algorithms — they are the universal primitive.
Formally, total order broadcast has two properties:
| Property | Definition | Why It Matters |
|---|---|---|
| Reliable delivery | If a non-crashed node delivers a message, all non-crashed nodes eventually deliver it. | No node can miss a message that others see. Prevents state divergence. |
| Total order | All nodes deliver messages in the exact same order. | If nodes apply messages as state transitions, they all end up in the same state. This is the replicated state machine approach. |
```python
class LamportClock:
    def __init__(self, node_id):
        self.time = 0
        self.node_id = node_id

    def local_event(self):
        self.time += 1
        return self.time

    def send(self):
        self.time += 1
        return (self.time, self.node_id)  # attach to message

    def receive(self, msg_time):
        self.time = max(self.time, msg_time) + 1
        return self.time


# Usage
a = LamportClock("A")
b = LamportClock("B")
a.local_event()   # a.time = 1
ts = a.send()     # a.time = 2, msg carries (2, "A")
b.receive(ts[0])  # b.time = max(0, 2) + 1 = 3
b.local_event()   # b.time = 4
```
Before diving into real consensus algorithms, let's examine a simpler approach that almost works: two-phase commit (2PC). It guarantees atomicity across distributed nodes — all nodes commit or all nodes abort — but it is NOT consensus. Understanding why reveals exactly what consensus needs that 2PC lacks.
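2PC uses a designated coordinator node that orchestrates a two-phase dance with the participants (the nodes that hold the data):

1. Phase 1 (prepare): the coordinator sends a prepare request to every participant. Each participant durably writes the transaction's data to its log and votes "yes", or votes "no" if it cannot commit.
2. Phase 2 (commit or abort): if every participant voted "yes", the coordinator records a commit decision and sends commit to all participants. If any participant voted "no" or failed to answer, it sends abort.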
Notice the critical rule: once a participant votes "yes," it has made an irrevocable promise. It has written all data to its log and it CANNOT unilaterally abort — it must wait for the coordinator's decision, no matter how long that takes.
What happens if the coordinator crashes after sending "prepare" but before sending the decision? Every participant that voted "yes" is stuck. They have locks held on their data. They cannot commit (they don't know the decision). They cannot abort (they promised they could commit, and maybe the coordinator will come back and tell them to commit). They are blocked — holding locks, unable to proceed, potentially forever if the coordinator never recovers.
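The blocking problem can be seen in a toy, in-memory simulation. The class and method names are hypothetical, and nothing is written to a durable log, as a real participant must do. A participant that never voted may safely abort on timeout. One that voted "yes" has no safe choice of its own once the coordinator disappears.

```python
from enum import Enum


class PState(Enum):
    INIT = "init"
    PREPARED = "prepared"    # voted yes: bound by its promise
    COMMITTED = "committed"
    ABORTED = "aborted"


class Participant:
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = PState.INIT

    def prepare(self):
        """Phase 1: vote. A real participant would first force its data to disk."""
        if self.can_commit:
            self.state = PState.PREPARED
            return True
        self.state = PState.ABORTED  # a "no" voter may abort on its own
        return False

    def decide(self, commit):
        """Phase 2: apply the coordinator's decision."""
        self.state = PState.COMMITTED if commit else PState.ABORTED

    def on_coordinator_timeout(self):
        if self.state is PState.INIT:
            self.state = PState.ABORTED  # never voted, so aborting is safe
        # A PREPARED participant can neither commit nor abort: it stays blocked.
        return self.state


def two_phase_commit(participants, crash_before_phase2=False):
    votes = [p.prepare() for p in participants]  # Phase 1
    if crash_before_phase2:
        return None  # coordinator crashes: no decision reaches anyone
    decision = all(votes)
    for p in participants:  # Phase 2
        p.decide(decision)
    return decision


stuck = [Participant("p1"), Participant("p2"), Participant("p3")]
two_phase_commit(stuck, crash_before_phase2=True)
print([p.on_coordinator_timeout().value for p in stuck])  # ['prepared', 'prepared', 'prepared']
```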
2PC is used in practice despite its limitations, because the alternatives are worse for many use cases:
| System | How It Uses 2PC | How It Mitigates the Blocking Problem |
|---|---|---|
| MySQL (XA Transactions) | Cross-shard transactions in Vitess, MySQL Cluster | Timeout-based abort (risky — can violate atomicity). Prefers single-shard transactions. |
| PostgreSQL | PREPARE TRANSACTION / COMMIT PREPARED | pg_prepared_xacts view for manual cleanup. DBA intervention for orphaned transactions. |
| Google Spanner | 2PC for cross-shard writes, BUT coordinator is a Paxos group | The coordinator itself is replicated via Paxos, so coordinator crash ≠ lost decision. This is the gold standard. |
| CockroachDB | Parallel commits (optimized 2PC variant) | Transaction record is a Raft-replicated key. Amortizes consensus cost. |
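The fix for 2PC's blocking problem is to replicate the coordinator using a consensus algorithm. Instead of one coordinator, you have a group of nodes that agree on the decision using Raft or Paxos. If one node crashes, the others still know the decision. Gray and Lamport formalized this approach as Paxos Commit. Three-phase commit (3PC) was an earlier attempt at non-blocking atomic commit. It adds an extra round but assumes bounded network delays, so it can violate atomicity under partitions. In practice, systems use consensus directly.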
The insight: 2PC's problem is that the coordinator is a single point of failure with irrecoverable state (the commit decision). Consensus eliminates single points of failure by replicating the decision across a majority of nodes. This is the bridge from "simple coordination" to "fault-tolerant agreement."
| | 2PC | Consensus (Raft/Paxos) |
|---|---|---|
| Coordinator | One fixed node | Elected leader, can change |
| Tolerates coordinator crash? | No — blocks | Yes — new leader elected |
| Guarantees | Atomicity | Atomicity + liveness (with partial synchrony) |
| Message rounds | 2 | 2+ (but pipelining helps) |
| Used for | Distributed transactions | Leader election, replicated state machines |
In 2014, Diego Ongaro and John Ousterhout published "In Search of an Understandable Consensus Algorithm." They were frustrated that Paxos — the dominant consensus algorithm — was so notoriously difficult to understand that most implementations got it wrong. Their goal was to create an algorithm that was equivalent in correctness and performance but dramatically easier to understand. The result was Raft.
Raft decomposes consensus into three independent sub-problems:
| Sub-problem | What it solves | Mechanism |
|---|---|---|
| Leader election | Who makes decisions? | Random timeouts + majority vote |
| Log replication | How do decisions propagate? | Leader appends entries, followers replicate |
| Safety | How do we prevent inconsistency? | Election restriction + commitment rules |
Every Raft node is in exactly one of three states at any time:
Follower: passive. Responds to requests from leaders and candidates. If it hears nothing from a leader for a random timeout period (the election timeout, typically 150-300ms), it becomes a candidate.
Candidate: actively seeking votes. Increments its term number (a logical clock for the election cycle), votes for itself, and sends RequestVote RPCs to all other nodes. If it gets votes from a majority, it becomes the leader. If it hears from a new leader, it steps down to follower. If the election times out with no winner, it starts a new election with a higher term.
Leader: handles all client requests. Appends entries to its log, sends AppendEntries RPCs (also used as heartbeats) to all followers, and tracks which entries have been committed. There is at most one leader per term.
The randomized timeout is key to preventing split votes. If two candidates start elections simultaneously, they might split the vote and neither gets a majority. In that case, both time out and start new elections — but with freshly randomized timeouts, so one will almost certainly start before the other and win. In practice, split votes are rare (Ongaro reports them happening less than once per thousand elections in testing).
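Every Raft node moves between exactly three states. The transitions are triggered by specific events:

- Follower → Candidate: the election timeout expires without hearing from a leader.
- Candidate → Candidate: the election times out with no winner, so the node starts a new election in a higher term.
- Candidate → Leader: the node receives votes from a majority.
- Candidate → Follower: the node discovers the current leader or a higher term.
- Leader → Follower: the node discovers a higher term.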
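The transitions can be written as a lookup table. This is a sketch with hypothetical event names; a real node also updates its term and vote when these events fire.

```python
from enum import Enum


class State(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"


TRANSITIONS = {
    (State.FOLLOWER, "election_timeout"): State.CANDIDATE,
    (State.CANDIDATE, "election_timeout"): State.CANDIDATE,  # retry with term + 1
    (State.CANDIDATE, "majority_votes"): State.LEADER,
    (State.CANDIDATE, "leader_or_higher_term_seen"): State.FOLLOWER,
    (State.LEADER, "higher_term_seen"): State.FOLLOWER,
}


def next_state(state, event):
    """Events with no entry (e.g. a heartbeat at a follower) leave the state unchanged."""
    return TRANSITIONS.get((state, event), state)


print(next_state(State.FOLLOWER, "election_timeout"))  # State.CANDIDATE
print(next_state(State.LEADER, "election_timeout"))    # State.LEADER: leaders never time out
```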
A leader sends heartbeats (empty AppendEntries RPCs) to all followers at a regular interval — typically every 50-100ms. This serves two purposes: (1) it prevents followers from timing out and starting elections, and (2) it carries the leader's commitIndex, telling followers which entries are committed.
The election timeout must be significantly longer than the heartbeat interval. The Raft paper states the timing requirement as:

$$\text{broadcastTime} \ll \text{electionTimeout} \ll \text{MTBF}$$
Where broadcastTime is the average time for an RPC round trip (~1-20ms), electionTimeout is the random timeout (150-300ms), and MTBF is the mean time between failures for a single node (months to years). This ensures elections are rare, fast, and don't overlap with heartbeats.
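The inequality can be turned into a quick sanity check for a configuration. This is a sketch with a hypothetical function name. Reading "much less than" as one order of magnitude is an assumption made here, not a number from the Raft paper.

```python
def check_raft_timing(broadcast_ms, election_timeout_min_ms, mtbf_ms, margin=10):
    """True if each quantity is at least `margin` times smaller than the next.

    "Much less than" is read as one order of magnitude (margin=10), an assumed
    rule of thumb. The lower end of the randomized election timeout range is
    used because it is the tightest case.
    """
    return (broadcast_ms * margin <= election_timeout_min_ms
            and election_timeout_min_ms * margin <= mtbf_ms)


MONTH_MS = 30 * 24 * 3600 * 1000
print(check_raft_timing(10, 150, MONTH_MS))  # True
print(check_raft_timing(20, 150, MONTH_MS))  # False: 20 ms RTTs need a longer timeout
```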
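Once a leader is elected, it handles all client requests. Each request becomes an entry in the leader's log: an ordered sequence of (term, command) pairs. The leader then replicates each entry to followers:

1. The leader appends the entry to its own log.
2. It sends AppendEntries RPCs carrying the entry to all followers in parallel. Each follower appends the entry to its log and acknowledges.
3. Once a majority of the cluster (counting the leader) has stored the entry, the leader marks it committed, applies it to its state machine, and returns the result to the client.
4. Followers learn the new commit index from subsequent AppendEntries RPCs (including heartbeats) and apply the entry to their own state machines.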
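How fast is Raft? The critical metric is commit latency: the time from when a client sends a request to when it receives a committed response. In the common case, it is dominated by one network round trip from the leader to a majority of the cluster, plus the time to durably write (fsync) the entry to disk on those nodes.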
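Here is a back-of-the-envelope model of that latency under simplifying assumptions. The leader's own fsync overlaps with replication. Each follower's cost is its round trip plus its fsync. Batching and the client-to-leader hop are ignored. The function name and numbers are illustrative.

```python
def estimate_commit_latency_ms(leader_fsync_ms, follower_costs_ms):
    """follower_costs_ms: per follower, round trip + fsync time in milliseconds."""
    cluster_size = len(follower_costs_ms) + 1
    needed = cluster_size // 2  # follower acks needed besides the leader itself
    if needed == 0:
        return leader_fsync_ms  # single-node cluster
    kth_fastest = sorted(follower_costs_ms)[needed - 1]
    return max(leader_fsync_ms, kth_fastest)


# 5 nodes: two nearby followers, two in a distant region.
print(estimate_commit_latency_ms(1, [2, 3, 40, 80]))  # 3
```

Only the fastest majority matters. Slow remote replicas do not slow commits as long as a nearby majority is healthy. If one nearby follower fails, latency jumps to the remote round trip.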
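The key insight for system design: a single consensus group is fast enough for metadata (who is the leader? what's the partition map?) but too slow for high-volume data-plane traffic (every user write, every analytics event). Every operation pays a majority round trip plus disk writes, and everything is serialized through one leader. Systems that do replicate data through consensus, such as CockroachDB and TiKV, split the data into many independent Raft groups. The common pattern is: consensus for coordination, something faster or more sharded for data.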
```python
from enum import Enum
import random


class State(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"


class RaftNode:
    """Simplified sketch: RPCs are synchronous calls, nothing is persisted,
    and replies carrying a higher term are ignored."""

    def __init__(self, node_id, peers):
        self.id = node_id
        self.peers = peers                  # ids of the other nodes
        self.state = State.FOLLOWER
        self.current_term = 0
        self.voted_for = None
        self.log = []                       # list of (term, command)
        self.commit_index = -1              # index of highest committed entry
        self.election_timeout = random.uniform(0.15, 0.3)  # seconds

    def majority(self):
        return (len(self.peers) + 1) // 2 + 1

    # Transport hooks: a real implementation sends RPCs over the network.
    def request_vote(self, peer):
        raise NotImplementedError

    def replicate_to(self, peer):
        raise NotImplementedError

    def send_heartbeats(self):
        pass

    def start_election(self):
        self.state = State.CANDIDATE
        self.current_term += 1
        self.voted_for = self.id
        votes = 1  # vote for self
        for peer in self.peers:
            if self.request_vote(peer):
                votes += 1
        if votes >= self.majority():
            self.state = State.LEADER
            self.send_heartbeats()

    def append_entry(self, command):
        """Leader receives a client request."""
        assert self.state == State.LEADER
        self.log.append((self.current_term, command))
        acks = 1  # the leader's own copy counts
        for peer in self.peers:
            if self.replicate_to(peer):
                acks += 1
        if acks >= self.majority():
            self.commit_index = len(self.log) - 1
            return True  # committed
        return False
```
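Raft uses only two RPCs for the core protocol: RequestVote (sent by candidates during elections) and AppendEntries (sent by the leader to replicate log entries and as heartbeats). This simplicity is intentional: fewer message types means fewer edge cases to reason about.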
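The arguments and replies of the two RPCs can be transcribed as Python dataclasses. The fields follow Figure 2 of the Raft paper. The snake_case spelling and the `(term, command)` entry tuples are choices made for this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class RequestVote:
    term: int             # candidate's term
    candidate_id: str
    last_log_index: int   # used by voters for the election restriction
    last_log_term: int


@dataclass
class RequestVoteReply:
    term: int             # voter's current term, so a stale candidate can step down
    vote_granted: bool


@dataclass
class AppendEntries:
    term: int             # leader's term
    leader_id: str
    prev_log_index: int   # entry immediately preceding the new ones
    prev_log_term: int
    entries: List[Tuple[int, Any]] = field(default_factory=list)  # empty = heartbeat
    leader_commit: int = 0  # leader's commitIndex


@dataclass
class AppendEntriesReply:
    term: int
    success: bool         # False if the follower's log failed the consistency check


heartbeat = AppendEntries(term=3, leader_id="n1", prev_log_index=7,
                          prev_log_term=3, leader_commit=7)
```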
The leader maintains two indices for each follower:
| Index | What It Tracks | How It Changes |
|---|---|---|
| nextIndex[i] | The next log entry to send to follower i | Initialized to leader's last log index + 1. Decremented on AppendEntries failure (follower's log is behind). Advanced on success. |
| matchIndex[i] | The highest log entry known to be replicated on follower i | Initialized to 0. Updated when follower successfully appends entries. |
The commit rule: an entry at index N is committed if matchIndex[i] >= N for a majority of nodes, AND the entry at N has the current term. This last condition — the entry must be from the current term — prevents a subtle bug where a leader commits entries from a previous term based on replication count alone. (See the Raft paper Section 5.4.2 for the counterexample that motivates this rule.)
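The commit rule translates almost directly into code. This sketch assumes 0-based log indices, a log of `(term, command)` tuples, and a `match_index` dict keyed by follower ID; `advance_commit_index` is a hypothetical helper name. Scanning from the newest entry downward finds the highest committable index. Committing it implicitly commits every earlier entry too.

```python
def advance_commit_index(log, match_index, current_term, commit_index):
    """Return the new commitIndex for a leader (0-based indices, -1 = nothing)."""
    cluster_size = len(match_index) + 1  # followers plus the leader
    majority = cluster_size // 2 + 1
    for n in range(len(log) - 1, commit_index, -1):
        # The leader always holds its own entries, so it counts as one replica.
        replicas = 1 + sum(1 for m in match_index.values() if m >= n)
        # Only entries from the current term are committed by counting replicas.
        if replicas >= majority and log[n][0] == current_term:
            return n
    return commit_index


log = [(1, "a"), (1, "b"), (2, "c")]  # the leader is in term 2
# Index 1 is on a majority, but it is from term 1, so it cannot commit this way.
print(advance_commit_index(log, {"f1": 2, "f2": 1, "f3": -1, "f4": -1}, 2, -1))  # -1
# Once the term-2 entry reaches a majority, it and everything before it commit.
print(advance_commit_index(log, {"f1": 2, "f2": 2, "f3": -1, "f4": -1}, 2, -1))  # 2
```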
The elegance of Raft lies in two safety properties that guarantee consistency even through crashes and elections. Let us examine each in detail.
Log Matching: if two logs contain an entry with the same index and the same term, then (1) that entry stores the same command, and (2) ALL preceding entries are identical in both logs. This property is maintained by the consistency check in AppendEntries. The leader sends (prevLogIndex, prevLogTerm), and the follower rejects the append if its log doesn't match at that position. The leader then decrements the index and retries until it finds the point where the logs agree, then replays entries from there.
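This sketch shows both sides of the consistency check. It assumes 0-based indices, logs as lists of `(term, command)` tuples, and hypothetical function names. The follower truncates only at the first conflicting entry, as the Raft paper specifies, so a delayed duplicate RPC cannot erase entries it already has. The leader backs up one index at a time; the Raft paper notes that this search can be optimized.

```python
def handle_append_entries(log, prev_log_index, prev_log_term, entries):
    """Follower side. Returns (success, new_log); -1 means 'before the first entry'."""
    if prev_log_index >= 0 and (
        prev_log_index >= len(log) or log[prev_log_index][0] != prev_log_term
    ):
        return False, log  # logs disagree at prev_log_index: reject
    new_log = list(log)
    for offset, entry in enumerate(entries):
        idx = prev_log_index + 1 + offset
        if idx < len(new_log) and new_log[idx][0] == entry[0]:
            continue  # already have this entry
        del new_log[idx:]  # conflict or end of log: drop the rest, then append
        new_log.append(entry)
    return True, new_log


def reconcile(leader_log, follower_log):
    """Leader side: decrement nextIndex until AppendEntries succeeds."""
    next_index = len(leader_log)
    while True:
        prev = next_index - 1
        prev_term = leader_log[prev][0] if prev >= 0 else 0
        ok, new_log = handle_append_entries(
            follower_log, prev, prev_term, leader_log[next_index:]
        )
        if ok:
            return new_log
        next_index -= 1


leader = [(1, "a"), (1, "b"), (3, "x")]
follower = [(1, "a"), (2, "y"), (2, "z")]  # uncommitted entries from a deposed leader
print(reconcile(leader, follower))  # [(1, 'a'), (1, 'b'), (3, 'x')]
```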
Leader Completeness: if a log entry is committed in a given term, then that entry will be present in the logs of all leaders in all higher terms. This is enforced by the election restriction. A candidate includes its (lastLogIndex, lastLogTerm) in RequestVote, and a voter rejects candidates whose logs are less up-to-date than its own. A log is more up-to-date if its last entry has a higher term, or the same term and a higher index. Committed entries are present on a majority, and a candidate needs a majority to win. So at least one voter in any winning majority must have the committed entry, and it will not vote for a candidate that lacks it.
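The "up-to-date" comparison a voter applies is short enough to show in full. This sketch uses a hypothetical function name. A real RequestVote handler also checks the term and whether the voter has already voted in it.

```python
def log_is_up_to_date(cand_last_term, cand_last_index, my_last_term, my_last_index):
    """Raft's election restriction, evaluated by a voter."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term  # later last term wins
    return cand_last_index >= my_last_index   # same term: longer or equal log wins


print(log_is_up_to_date(3, 5, 2, 10))  # True: a higher last term beats a longer log
print(log_is_up_to_date(2, 4, 2, 5))   # False: same term, shorter log
```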
Let us trace a complete scenario. Our cluster has nodes {1, 2, 3, 4, 5}. We will go through leader election, writes, a crash, re-election, and log reconciliation.
| Scenario | What Happens | Why It's Safe |
|---|---|---|
| Leader crashes | Followers time out, election begins. Only a candidate whose log is at least as up-to-date as a majority of voters can win. | Leader completeness: all committed entries are on the new leader. |
| Network partition | Majority partition can elect leader and proceed. Minority partition stalls. | Only one partition has a majority, so at most one leader exists. |
| Split vote | No candidate gets majority. All candidates time out and retry with new randomized timeouts. | Eventually one candidate starts first and wins. No safety violation during split votes. |
| Log conflict | New leader's AppendEntries fails consistency check. Leader decrements nextIndex and retries until logs match. | Conflicting entries are from uncommitted writes by a deposed leader. Overwriting them is safe. |
| Old leader comes back | Receives AppendEntries or RequestVote with higher term. Steps down to follower. Its uncommitted entries are overwritten. | Only committed entries are guaranteed. Uncommitted entries from old leaders are expendable. |
One of the trickiest aspects of running Raft in production: what happens when you need to add or remove nodes? You cannot just change the configuration on all nodes simultaneously. There will be a moment where some nodes think the cluster is {1, 2, 3} and others think it is {1, 2, 3, 4, 5}. During that overlap, {1, 2} could form a majority of the old configuration while {3, 4, 5} forms a majority of the new one. Two leaders could then be elected in the same term, causing split-brain.
Raft solves this with joint consensus: a two-phase approach where the cluster temporarily operates under both the old and new configurations simultaneously.
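Here is a sketch of the quorum rules, with configurations modeled as Python sets of node IDs; the function names are hypothetical. `disjoint_majorities` searches for the dangerous case: a majority of the old configuration and a majority of the new one with no node in common. `joint_quorum` is the rule that closes that gap. During the joint phase, every decision needs a majority of the old configuration and, separately, a majority of the new one.

```python
from itertools import combinations


def is_majority(acks, config):
    return len(acks & config) > len(config) // 2


def joint_quorum(acks, old, new):
    # In the joint configuration, a decision needs majorities of BOTH configurations.
    return is_majority(acks, old) and is_majority(acks, new)


def disjoint_majorities(old, new):
    """Return (majority of old, majority of new) with no common node, or None."""
    for group in combinations(sorted(old), len(old) // 2 + 1):
        rest = new - set(group)  # nodes of `new` outside this old majority
        if is_majority(rest, new):
            return set(group), rest
    return None


old, new = {1, 2, 3}, {1, 2, 3, 4, 5}
print(disjoint_majorities(old, new))      # ({1, 2}, {3, 4, 5})
print(joint_quorum({1, 2}, old, new))     # False: no majority of the new config
print(joint_quorum({1, 2, 3}, old, new))  # True
```

`disjoint_majorities({1, 2, 3, 4, 5}, {1, 2, 3, 4, 5, 6})` returns `None`. When a single node is added, any majority of the old configuration and any majority of the new one must overlap.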
There is a subtle problem with reads in Raft. A client reads from the leader, but how does the leader know it is still the leader? It might have been deposed by a network partition and not know it yet. Two solutions:
| Approach | How It Works | Cost |
|---|---|---|
| Read Index | Leader notes the current commit index. Sends a heartbeat to confirm it's still leader (majority ack). Then serves the read against the committed state at that index. | One heartbeat round trip per read batch |
| Lease-based reads | Leader holds a "lease" — a time window during which followers promise not to start elections. During the lease, leader serves reads without extra heartbeats. | Depends on clock accuracy. Dangerous if clocks drift. |
etcd uses the Read Index approach by default. CockroachDB uses lease-based reads with careful clock synchronization. The choice depends on whether you trust clocks (most engineers do not, and they are right).
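This sketch walks through the Read Index steps on the leader. `ReadIndexLeader` and its fields are hypothetical. The heartbeat round is faked with a callback that returns the number of follower acknowledgments, and catching up the state machine is a placeholder. The first check reflects a detail from the Raft paper: a new leader must commit an entry from its own term (typically a no-op) before it knows the latest commit index.

```python
class ReadIndexLeader:
    """Hypothetical leader state: only what the Read Index path needs."""

    def __init__(self, cluster_size, heartbeat_round):
        self.cluster_size = cluster_size
        self.heartbeat_round = heartbeat_round  # returns follower acks for one round
        self.commit_index = 0
        self.applied_index = 0
        self.committed_entry_in_current_term = True  # e.g. its no-op entry
        self.kv = {}

    def wait_until_applied(self, index):
        # Placeholder: a real node blocks until its state machine reaches `index`.
        self.applied_index = max(self.applied_index, index)

    def linearizable_get(self, key):
        if not self.committed_entry_in_current_term:
            raise RuntimeError("commit an entry in this term before serving reads")
        read_index = self.commit_index        # 1. remember the commit point
        acks = 1 + self.heartbeat_round()     # 2. leader + followers still following
        if acks <= self.cluster_size // 2:
            raise RuntimeError("leadership not confirmed; retry on the current leader")
        self.wait_until_applied(read_index)   # 3. catch up to read_index
        return self.kv.get(key)               # 4. answer from local state


leader = ReadIndexLeader(cluster_size=5, heartbeat_round=lambda: 2)
leader.kv["x"] = 1
print(leader.linearizable_get("x"))  # 1
```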
Before Raft, there was Paxos. Leslie Lamport first wrote it up in 1989, in a paper so whimsical that reviewers rejected it; it was not published until 1998. Paxos solves the same problem as Raft (getting a group of nodes to agree on a value), but it takes a fundamentally different approach. Where Raft is structured around a strong leader, Paxos is more symmetric: any node can be a proposer.
Paxos has three roles:
| Role | What it does | Analogy |
|---|---|---|
| Proposer | Proposes a value for the group to agree on | A senator proposing a bill |
| Acceptor | Votes on proposals (a majority of acceptors must agree) | Members of the senate voting |
| Learner | Learns the decided value after acceptors agree | Citizens learning the new law |
In practice, the same physical node often plays all three roles. But separating them conceptually makes the algorithm easier to reason about.
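The simplest form of Paxos (single-decree or Basic Paxos) agrees on a single value. It runs in two phases:

1. Phase 1 (Prepare/Promise): a proposer picks a proposal number n and sends Prepare(n) to the acceptors. An acceptor that has not already promised a higher number promises to ignore proposals numbered below n. It replies with the highest-numbered proposal it has already accepted, if any.
2. Phase 2 (Accept/Accepted): if a majority promised, the proposer sends Accept(n, v). Here v is the value of the highest-numbered proposal reported in the promises, or the proposer's own value if none was reported. Acceptors accept unless they have since promised a higher number. Once a majority accepts, v is chosen, and learners are told the decided value.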
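Here is a single-process sketch of one Basic Paxos round. Acceptors are in-memory objects, and direct method calls stand in for messages. The names are hypothetical, and the sketch assumes callers supply unique, increasing proposal numbers; real systems typically combine a round counter with the proposer's ID. The key line is the one that adopts a previously accepted value. It is what keeps a later proposer from overwriting a value that may already be chosen.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Acceptor:
    promised: int = -1         # highest proposal number promised so far
    accepted_n: int = -1       # number of the proposal last accepted (-1 = none)
    accepted_value: Any = None

    def prepare(self, n):
        """Phase 1: promise to ignore proposals numbered below n."""
        if n > self.promised:
            self.promised = n
            return True, self.accepted_n, self.accepted_value
        return False, -1, None

    def accept(self, n, value):
        """Phase 2: accept unless a higher-numbered Prepare was promised."""
        if n >= self.promised:
            self.promised = n
            self.accepted_n = n
            self.accepted_value = value
            return True
        return False


def propose(acceptors, n, value):
    """Run one round with proposal number n; return the chosen value or None."""
    majority = len(acceptors) // 2 + 1
    replies = [a.prepare(n) for a in acceptors]
    promises = [(acc_n, acc_v) for ok, acc_n, acc_v in replies if ok]
    if len(promises) < majority:
        return None
    highest_n, highest_v = max(promises, key=lambda p: p[0])
    if highest_n >= 0:
        value = highest_v  # must adopt the highest-numbered accepted value
    accepted = sum(a.accept(n, value) for a in acceptors)
    return value if accepted >= majority else None


acceptors = [Acceptor() for _ in range(5)]
print(propose(acceptors, 1, "A"))  # A
print(propose(acceptors, 2, "B"))  # A: the chosen value survives a later proposal
print(propose(acceptors, 1, "C"))  # None: number 1 is now stale
```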
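Paxos has a subtle liveness issue. Imagine two proposers, P1 and P2, competing:

1. P1 completes Phase 1 with proposal number 1.
2. Before P1's Accept messages arrive, P2 completes Phase 1 with number 2, so acceptors now reject P1's Accept(1).
3. P1 retries Phase 1 with number 3, which causes P2's Accept(2) to be rejected.
4. P2 retries with number 4, and the cycle repeats.

This is called livelock: no value is ever decided because proposers keep interrupting each other. The solution is Multi-Paxos: elect a stable leader (a distinguished proposer) that skips Phase 1 for subsequent proposals. As long as the leader is stable, Paxos is fast. If the leader fails, a new one is elected. That temporarily risks livelock, but having competing proposers wait a randomized backoff before retrying makes it resolve quickly.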
Lamport's original Paxos paper is famous for being impenetrable. Even the "simplified" version confuses most readers. The difficulty comes from several sources:
| Source of Confusion | Why | How Raft Fixes It |
|---|---|---|
| No clear leader | Any node can propose at any time. Reasoning about concurrent proposals is mind-bending. | Single leader handles all proposals. Concurrent proposals are impossible during normal operation. |
| Single-decree only | Basic Paxos decides ONE value. For a log of values, you need Multi-Paxos, which is underspecified. | Log replication is a first-class concept. The log is central to the algorithm. |
| Proposal numbers ≠ terms | Proposal numbers are per-proposal, not per-epoch. Multiple proposals can exist within a single "leadership period." | Term number cleanly separates epochs. One term = at most one leader. |
| Safety vs liveness intertwined | Proving safety requires reasoning about all possible interleavings of messages across multiple rounds. | Safety proofs decompose into independent properties (Log Matching, Leader Completeness). |
| | Paxos | Raft |
|---|---|---|
| Published | 1989/1998 (Lamport) | 2014 (Ongaro & Ousterhout) |
| Approach | Any node can propose | Strong leader handles all requests |
| Leader election | Implicit (via proposal numbers) | Explicit (RequestVote RPC) |
| Log replication | Separate concern (Multi-Paxos) | Integrated from the start |
| Livelock risk | Yes (competing proposers) | No (only leader proposes) |
| Understandability | Notoriously hard | Designed for clarity |
| Correctness proofs | Decades of study | TLA+ specification with a safety proof |
| Real-world use | Google Chubby, Spanner | etcd, CockroachDB, TiKV |
Raft and Paxos are the two most well-known consensus algorithms, but they are not the only ones in production. Let us survey the landscape.
ZAB (ZooKeeper Atomic Broadcast) is the consensus protocol used by Apache ZooKeeper. It was designed specifically for total order broadcast: delivering messages to all nodes in the same order. ZAB is similar to Raft but was developed independently (published 2008, predating Raft by 6 years).
Key differences from Raft:
| Aspect | Raft | ZAB |
|---|---|---|
| Primary goal | Replicated state machine | Atomic broadcast (total order delivery) |
| Phases | Leader election + normal operation | Discovery + synchronization + broadcast |
| Recovery | New leader's log is authoritative | New leader learns the most up-to-date history from a quorum of followers, then syncs everyone to it |
| Transaction IDs | Index + term | Epoch + counter (zxid) |
| Client reads | Leader reads or read index | Any node can serve reads (with sync for freshness) |
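ZAB has three phases that each new leader goes through: discovery (the prospective leader learns the latest epoch and transaction history from a quorum of followers and establishes a new epoch), synchronization (followers are brought up to date with the leader's history before the new epoch accepts writes), and broadcast (normal operation: propose, collect acks from a quorum, commit).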
The zxid (ZooKeeper transaction ID) is a 64-bit number: the high 32 bits are the epoch (like Raft's term), and the low 32 bits are a counter within the epoch. This makes it trivial to compare zxids and determine which transactions are from which leader's reign.
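A small Python sketch of the zxid layout just described. `make_zxid` and `split_zxid` are hypothetical helpers for illustration, not ZooKeeper's (Java) API; the point is that plain integer comparison orders transactions first by epoch, then by counter.

```python
def make_zxid(epoch: int, counter: int) -> int:
    """Pack a zxid: high 32 bits = epoch, low 32 bits = counter within the epoch."""
    if not (0 <= epoch < 2**32 and 0 <= counter < 2**32):
        raise ValueError("epoch and counter must each fit in 32 bits")
    return (epoch << 32) | counter


def split_zxid(zxid: int) -> tuple[int, int]:
    """Unpack a zxid into (epoch, counter)."""
    return zxid >> 32, zxid & 0xFFFFFFFF


# The first transaction of a new epoch outranks every transaction of the old
# epoch, no matter how large the old counter grew.
old_leader_last = make_zxid(epoch=4, counter=1_000_000)
new_leader_first = make_zxid(epoch=5, counter=1)
assert new_leader_first > old_leader_last
assert split_zxid(new_leader_first) == (5, 1)
print(hex(new_leader_first))  # 0x500000001
```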
Egalitarian Paxos (EPaxos), published by Moraru, Andersen, and Kaminsky in 2013, takes a radical approach: no designated leader at all. Any node can process any command. Commands that don't conflict (they touch different keys) can be committed after a single round trip to a fast-path quorum, without routing through a leader. Only conflicting commands may require an extra round.
The appeal: in a geo-distributed system, a client in Tokyo can send a command to the Tokyo replica and get a response after one round trip from that replica to a quorum, without routing through a leader in Virginia. For workloads with few conflicts (which describes many real workloads), this can dramatically reduce latency.
The catch: EPaxos is significantly more complex than Raft. The dependency tracking, conflict detection, and execution ordering are all harder to implement correctly. As of 2024, no major production system uses EPaxos. But the ideas are influential and show up in research on geo-distributed databases.
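The fast-path/slow-path decision can be sketched in a few lines of Python. This is a heavily simplified model: it assumes two commands conflict exactly when they touch the same key (as described above) and ignores EPaxos's sequence numbers, instance identifiers, recovery, and execution ordering; all names are hypothetical. The fast-quorum size F + ⌊(F+1)/2⌋ for N = 2F + 1 replicas follows the optimized quorum from the EPaxos paper.

```python
def fast_quorum_size(n: int) -> int:
    """Fast-path quorum for N = 2F + 1 replicas (command leader included)."""
    f = (n - 1) // 2
    return f + (f + 1) // 2


def pre_accept(seen_commands: list[dict], cmd: dict) -> frozenset:
    """One replica's reply: IDs of commands it already knows that conflict with cmd.

    Conflict rule (assumption): same key.
    """
    return frozenset(c["id"] for c in seen_commands if c["key"] == cmd["key"])


def commit_path(quorum_replicas: list[list[dict]], cmd: dict) -> str:
    """Fast path (one round trip) if every fast-quorum replica reports the same
    dependencies; otherwise an extra Accept round is needed (slow path)."""
    replies = {pre_accept(seen, cmd) for seen in quorum_replicas}
    return "fast" if len(replies) == 1 else "slow"


n = 5
q = fast_quorum_size(n)  # 3 replicas for N = 5
cmd = {"id": "c9", "key": "user:42"}

# No replica has seen another command on user:42: identical (empty) deps.
quiet = [[{"id": "c1", "key": "cart:7"}], [], [{"id": "c2", "key": "cart:8"}]]
print(commit_path(quiet[:q], cmd))  # fast

# One replica already saw a concurrent command on user:42: replies disagree.
busy = [[{"id": "c3", "key": "user:42"}], [], []]
print(commit_path(busy[:q], cmd))  # slow
```

Note that a conflict alone does not force the slow path: if every replica in the fast quorum had seen `c3`, they would report identical dependencies and the command would still commit in one round trip. Disagreement about dependencies is what costs the extra round.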
Viewstamped Replication (VR), published by Oki and Liskov in 1988, is contemporaneous with Paxos and shares many ideas. Each "view" has a designated primary (leader). When the primary fails, a view change protocol selects a new one. The key insight: during a view change, the new primary collects logs from a majority and adopts the most up-to-date one (the log from the most recent view, and among those, the longest). This is strikingly similar to Raft's election restriction, which likewise compares the last log term before comparing log length.
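A Python sketch of the new primary's log choice. It follows the rule from the later "Viewstamped Replication Revisited" description (Liskov and Cowling, 2012) rather than the 1988 paper: prefer the log from the most recent view in which a replica was operating normally, and only then the log with more entries. The `DoViewChange` record and `choose_log` function are simplified, hypothetical stand-ins.

```python
from dataclasses import dataclass, field


@dataclass
class DoViewChange:
    replica_id: int
    last_normal_view: int  # latest view in which this replica had normal status
    log: list = field(default_factory=list)

    @property
    def op_number(self) -> int:
        return len(self.log)  # number of operations in the log


def choose_log(messages: list[DoViewChange], cluster_size: int) -> list:
    """New primary picks the most up-to-date log from a majority of replicas."""
    majority = cluster_size // 2 + 1
    if len(messages) < majority:
        raise ValueError("must hear from a majority before starting the new view")
    best = max(messages, key=lambda m: (m.last_normal_view, m.op_number))
    return list(best.log)


msgs = [
    DoViewChange(1, last_normal_view=3, log=["a", "b", "c", "d"]),  # long but stale
    DoViewChange(2, last_normal_view=4, log=["a", "b", "x"]),       # from a newer view
    DoViewChange(3, last_normal_view=3, log=["a", "b"]),
]
print(choose_log(msgs, cluster_size=5))  # ['a', 'b', 'x']
```

The stale replica's log is longer, but entries `c` and `d` were never confirmed in view 4, so simply taking the log with the most entries would resurrect them.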
In 1985, Fischer, Lynch, and Paterson proved the FLP impossibility result: in a purely asynchronous system (no timeouts, no clocks), where even a single process can crash, there is NO deterministic algorithm that guarantees consensus is always reached. This result shocked the distributed systems community.
| Algorithm | Year | Key Innovation | Used By |
|---|---|---|---|
| Paxos | 1989 | Quorum-based agreement | Google Chubby, Spanner |
| Viewstamped Replication | 1988 | View change protocol | Academic reference |
| ZAB | 2008 | Atomic broadcast focus | Apache ZooKeeper |
| Raft | 2014 | Understandability, strong leader | etcd, CockroachDB, TiKV, Consul |
| EPaxos | 2013 | Leaderless, lower latency for non-conflicting commands | Mostly research |
| Metric | Paxos | Raft | ZAB | EPaxos |
|---|---|---|---|---|
| Latency (no conflict) | 2 RTTs (basic Paxos); 1 RTT (Multi-Paxos with a stable leader) | 1 RTT (leader to majority) | 1 RTT | 1 RTT (fast path) |
| Latency (with conflict) | 2+ RTTs (livelock risk) | 1 RTT (only leader proposes) | 1 RTT | 2 RTTs (slow path) |
| Leader bottleneck | Multi-Paxos: yes | Yes | Yes | No (any node) |
| Request/response exchanges per decision (N nodes) | 2N (prepare + accept) | N (AppendEntries + ack) | N | N (fast), 2N (slow) |
| Durable writes per node per decision | 2 (promise + accepted value, basic Paxos) | 1 (log append) | 1 (log append) | 1 (fast path), 2 (slow path) |
In practice, Raft and ZAB dominate because the leader bottleneck is rarely the limiting factor for coordination workloads. The simplicity advantage of Raft and the battle-tested nature of ZAB far outweigh EPaxos's theoretical latency advantage for most teams.
You may wonder: how does blockchain consensus relate to Raft and Paxos? The key distinction is the trust model:
| | Raft/Paxos (CFT) | PBFT/Blockchain (BFT) |
|---|---|---|
| Trust model | All nodes are honest but may crash | Up to f nodes can be malicious (send wrong data) |
| Nodes needed | 2f+1 (f crashes) | 3f+1 (f Byzantine faults) |
| Throughput | 10,000+ TPS | 100-1,000 TPS (PBFT), ~7 TPS (Bitcoin PoW) |
| Latency | Milliseconds | Seconds (PBFT) to minutes (PoW) |
| Use case | Data centers, internal infrastructure | Untrusted participants (public blockchains, multi-org consortia) |
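The node counts in the table come from quorum-intersection arithmetic. This Python sketch (helper names are illustrative) computes the cluster and quorum sizes for each model and the minimum overlap between any two quorums. In the crash-fault case two majorities always share at least one node; in the Byzantine case they share at least f + 1 nodes, so the overlap still contains an honest node even if f of its members lie.

```python
def cft_sizes(f: int) -> tuple[int, int]:
    """Crash faults: n = 2f + 1 nodes, quorum = majority = f + 1."""
    return 2 * f + 1, f + 1


def bft_sizes(f: int) -> tuple[int, int]:
    """Byzantine faults: n = 3f + 1 nodes, quorum = 2f + 1."""
    return 3 * f + 1, 2 * f + 1


def min_overlap(n: int, q: int) -> int:
    """Any two quorums of size q drawn from n nodes share at least 2q - n nodes."""
    return 2 * q - n


for f in (1, 2, 3):
    n_c, q_c = cft_sizes(f)
    n_b, q_b = bft_sizes(f)
    print(f"f={f}: CFT n={n_c} quorum={q_c} overlap={min_overlap(n_c, q_c)} | "
          f"BFT n={n_b} quorum={q_b} overlap={min_overlap(n_b, q_b)}")
```

The CFT overlap is always 1, which is enough when every node tells the truth; the BFT overlap grows as f + 1, which is exactly why tolerating f liars costs f extra nodes.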
For infrastructure engineering (which is what DDIA and this lesson focus on), you almost always use crash-fault-tolerant (CFT) consensus. You trust your own data center's servers not to lie — they just might crash. BFT consensus is for environments where participants are adversarial, which is a fundamentally different (and much harder) problem.
All of these algorithms solve the same abstract problem. In practice, consensus enables:
| Capability | How It Uses Consensus | Example |
|---|---|---|
| Leader election | Agree on who the leader is | Kafka controller election via ZooKeeper |
| Atomic broadcast | Agree on message ordering | ZooKeeper transaction log |
| Distributed locks | Agree on who holds the lock | etcd distributed mutex |
| Membership changes | Agree on who is in the cluster | Raft joint consensus for config changes |
| Linearizable storage | Agree on the value of each key | etcd key-value store |
You will rarely implement Raft or Paxos yourself. Instead, you will use coordination services — systems like ZooKeeper, etcd, and Consul that implement consensus internally so your application does not have to. They expose simple APIs (get, set, watch, lock) while hiding the complexity of leader election, log replication, and failure recovery.
| | ZooKeeper | etcd | Consul |
|---|---|---|---|
| Consensus | ZAB | Raft | Raft |
| Data model | Hierarchical (znodes, like a filesystem) | Flat key-value | Key-value + service catalog |
| Watch mechanism | One-time watches per znode | Watch streams (long-lived) | Blocking queries |
| Typical use | Kafka, HBase, Hadoop | Kubernetes, CoreDNS | Service mesh, HashiCorp stack |
| Language | Java | Go | Go |
| Linearizable reads | Via sync() before the read | Default (serializable reads are opt-in) | Via consistent mode |
You are building a system where thousands of tasks arrive every second, and a pool of 50 worker nodes must claim and execute them. Each task must be executed exactly once. How do you ensure no two workers claim the same task?
You are on-call. At 3 AM, you get paged: "Kafka brokers are failing to produce/consume." You check and discover that 2 out of 3 ZooKeeper nodes are down. What happens?
Even with a distributed lock service, there is a danger. Process A acquires a lock, then pauses (GC pause, swap, etc.). The lock expires (TTL runs out). Process B acquires the same lock. Now A resumes and thinks it still holds the lock. Both processes believe they have exclusive access. Data corruption ensues.
The solution is fencing tokens: every time a lock is acquired, the coordination service issues a monotonically increasing token. The resource (database, file, etc.) checks the token on every write and rejects writes with tokens older than the most recent one it has seen.
A common use case for coordination services: service discovery. In a microservices architecture, services need to find each other. Which instances of the "payment service" are currently healthy? What are their IP addresses?
Consul adds a built-in health checking layer on top of this: it not only registers services but actively pings them (HTTP checks, TCP checks, gRPC checks) and removes unhealthy instances from the service catalog. This is why Consul is often preferred for service mesh architectures.
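The lease/TTL pattern underneath service discovery can be sketched in Python. The `ServiceRegistry` class and its methods are hypothetical and keep state in memory; a real registry would store it in the consensus-backed store (roughly what etcd leases or Consul TTL checks provide) so that every client sees the same membership. Each instance registers with a TTL and must heartbeat before it expires; silent instances drop out of the healthy set.

```python
import time


class ServiceRegistry:
    """Hypothetical TTL-based registry (in memory, single process)."""

    def __init__(self, ttl_seconds: float = 10.0):
        self.ttl = ttl_seconds
        self.instances = {}  # (service, instance_id) -> (address, expires_at)

    def register(self, service, instance_id, address, now=None):
        now = time.time() if now is None else now
        self.instances[(service, instance_id)] = (address, now + self.ttl)

    def heartbeat(self, service, instance_id, now=None):
        """Extend the lease. Returns False if it already expired (must re-register)."""
        now = time.time() if now is None else now
        entry = self.instances.get((service, instance_id))
        if entry is None or now >= entry[1]:
            self.instances.pop((service, instance_id), None)
            return False
        self.instances[(service, instance_id)] = (entry[0], now + self.ttl)
        return True

    def healthy(self, service, now=None):
        """Addresses of instances whose lease has not expired."""
        now = time.time() if now is None else now
        return sorted(addr for (svc, _), (addr, expires) in self.instances.items()
                      if svc == service and now < expires)


reg = ServiceRegistry(ttl_seconds=10)
reg.register("payment", "p1", "10.0.0.5:8080", now=0)
reg.register("payment", "p2", "10.0.0.6:8080", now=0)
reg.heartbeat("payment", "p1", now=8)   # p1 renews; p2 goes silent
print(reg.healthy("payment", now=12))  # ['10.0.0.5:8080']
```

The explicit `now` parameter exists only to make the demo deterministic. Consul's active HTTP/TCP/gRPC checks invert the direction (the registry probes the service), but they feed the same healthy/unhealthy decision.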
A common mistake in distributed system design: using etcd or ZooKeeper for too much. Here are the warning signs:
| Sign | Problem | Better Alternative |
|---|---|---|
| Storing MBs of data per key | Consensus stores are optimized for small keys/values (etcd: 1.5 MiB default max request size) | Store a pointer/URL in etcd, put the large data in S3 or a database |
| >1000 writes/sec to consensus store | Approaching throughput ceiling | Batch writes, use local caches with watch-based invalidation |
| Using ZK watches as a pub/sub system | ZK watches are one-time triggers, not streams. Re-registering creates thundering herd. | Use Kafka or NATS for high-throughput messaging |
| Storing session data in etcd | Session data is per-request, too frequent for consensus | Use Redis or server-local memory with sticky sessions |
| Storing per-user data in etcd | Millions of keys = memory pressure on all consensus nodes | Use a proper database (Postgres, DynamoDB) |
| Model | Guarantee | Cost | Example System |
|---|---|---|---|
| Strict Serializability | Transactions + real-time order | Very high (global consensus) | Spanner |
| Linearizability | Single-object recency | High (consensus per key) | etcd, ZooKeeper |
| Sequential Consistency | All nodes see same order (but may not match real-time) | Medium | ZooKeeper reads |
| Causal Consistency | Causally related ops ordered; concurrent ops can diverge | Low (vector clocks) | MongoDB (with causal sessions) |
| Eventual Consistency | All replicas converge eventually (no recency guarantee) | Minimal | DynamoDB, Cassandra |
When the interviewer asks "what consistency do you need for this?", use this framework:
"Design a distributed lock service."
"Design a configuration management system."
"Why did Kafka use ZooKeeper, and why is it removing it?"
```python
# Drill 1: Implement a vector clock (extends Lamport timestamps)
class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.id = node_id
        self.clock = [0] * num_nodes  # one counter per node

    def local_event(self):
        self.clock[self.id] += 1

    def send(self):
        self.clock[self.id] += 1
        return list(self.clock)  # copy, so later local events don't mutate the message

    def receive(self, other_clock):
        # Merge: element-wise max, then count the receive as a local event
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.clock[self.id] += 1

    @staticmethod
    def _hb(c1, c2):
        """c1 happens-before c2: all entries <=, at least one <."""
        return (all(x <= y for x, y in zip(c1, c2))
                and any(x < y for x, y in zip(c1, c2)))

    def happens_before(self, other_clock):
        """Does self happen-before other?"""
        return VectorClock._hb(self.clock, other_clock)

    def is_concurrent(self, other_clock):
        """Neither happens-before the other."""
        return (not VectorClock._hb(self.clock, other_clock)
                and not VectorClock._hb(other_clock, self.clock))

# Usage:
a = VectorClock(0, 3)  # Node 0 in a 3-node system
b = VectorClock(1, 3)
a.local_event()        # a.clock = [1, 0, 0]
msg = a.send()         # a.clock = [2, 0, 0], msg = [2, 0, 0]
b.receive(msg)         # b.clock = [2, 1, 0] (merged + incremented)
```
```python
# Drill 2: Raft leader election state machine
import random, time

class RaftElection:
    def __init__(self, node_id, cluster_size):
        self.id = node_id
        self.cluster_size = cluster_size
        self.state = "follower"
        self.term = 0
        self.voted_for = None  # candidate ID voted for in the current term
        self.votes_received = set()
        self.last_heartbeat = time.time()
        self.timeout = random.uniform(0.15, 0.3)  # randomized to avoid split votes
        self.log = []  # [(term, cmd), ...]

    def tick(self):
        """Called periodically. Triggers election if timeout expired."""
        if self.state == "leader":
            return None
        if time.time() - self.last_heartbeat > self.timeout:
            return self.start_election()
        return None

    def start_election(self):
        self.state = "candidate"
        self.term += 1
        self.voted_for = self.id
        self.votes_received = {self.id}
        self.timeout = random.uniform(0.15, 0.3)
        self.last_heartbeat = time.time()
        last_idx = len(self.log) - 1
        last_term = self.log[last_idx][0] if self.log else 0
        return ("RequestVote", self.term, self.id, last_idx, last_term)

    def handle_vote_request(self, candidate_term, candidate_id, cand_last_idx, cand_last_term):
        if candidate_term < self.term:
            return False
        if candidate_term > self.term:
            # Newer term: step down and forget any vote cast in the old term
            self.term = candidate_term
            self.state = "follower"
            self.voted_for = None
        # Election restriction: the candidate's log must be at least as up to date
        my_last_term = self.log[-1][0] if self.log else 0
        my_last_idx = len(self.log) - 1
        log_ok = (cand_last_term > my_last_term or
                  (cand_last_term == my_last_term and cand_last_idx >= my_last_idx))
        if self.voted_for in (None, candidate_id) and log_ok:
            self.voted_for = candidate_id      # at most one vote per term
            self.last_heartbeat = time.time()  # granting a vote resets the election timer
            return True
        return False

    def receive_vote(self, voter_id, granted):
        # Ignore replies once no longer a candidate
        # (a full implementation would also check the reply's term)
        if self.state != "candidate" or not granted:
            return False
        self.votes_received.add(voter_id)
        if len(self.votes_received) > self.cluster_size // 2:
            self.state = "leader"
            return True  # became leader!
        return False
```
```python
# Drill 3: Distributed lock with fencing token
import time

class DistributedLock:
    """Simplified lock with fencing token support."""
    def __init__(self):
        self.holder = None
        self.token_counter = 0
        self.expiry = 0

    def acquire(self, client_id, ttl_seconds=10, now=None):
        """Try to acquire lock. Returns fencing token or None.
        `now` lets the demo simulate the passage of time."""
        now = time.time() if now is None else now
        # If lock is held and not expired, reject
        if self.holder and now < self.expiry:
            return None
        # Grant lock with monotonic fencing token
        self.holder = client_id
        self.token_counter += 1
        self.expiry = now + ttl_seconds
        return self.token_counter

    def release(self, client_id, token):
        """Release lock. Only succeeds if the caller holds it AND the token is current."""
        if self.holder == client_id and token == self.token_counter:
            self.holder = None
            return True
        return False

class FencedResource:
    """A resource that rejects stale fencing tokens."""
    def __init__(self):
        self.data = None
        self.last_token = 0

    def write(self, value, fencing_token):
        """Write only if token is fresh (>= last seen)."""
        if fencing_token < self.last_token:
            raise PermissionError(
                f"Stale token {fencing_token} < {self.last_token}")
        self.last_token = fencing_token
        self.data = value
        return True

# Demo: fencing prevents stale writes
lock = DistributedLock()
db = FencedResource()
token_a = lock.acquire("process_A", ttl_seconds=5, now=0)  # token=1
# Process A pauses (GC) past its TTL, which expires at t=5...
token_b = lock.acquire("process_B", ttl_seconds=5, now=6)  # token=2
db.write("B's data", token_b)  # succeeds, last_token=2
# Process A resumes, tries to write with stale token:
try:
    db.write("A's data", token_a)  # REJECTED: 1 < 2
except PermissionError as e:
    print(e)  # Stale token 1 < 2
```
```python
# Drill 4: Total order broadcast (simplified)
from collections import deque
from threading import Lock

class TotalOrderBroadcast:
    """Simplified TOB using a single sequencer (not fault-tolerant).
    In production, the sequencer would be a Raft/Paxos group."""
    def __init__(self, num_nodes):
        self.sequence_num = 0
        self.lock = Lock()
        self.queues = [deque() for _ in range(num_nodes)]

    def broadcast(self, message):
        """Assign a sequence number and deliver to all nodes."""
        with self.lock:
            self.sequence_num += 1
            ordered_msg = (self.sequence_num, message)
            for q in self.queues:
                q.append(ordered_msg)
            return self.sequence_num

    def deliver(self, node_id):
        """Node consumes next message in total order."""
        if self.queues[node_id]:
            return self.queues[node_id].popleft()
        return None

# All nodes see messages in the same order
tob = TotalOrderBroadcast(3)
tob.broadcast("set x=1")  # seq 1
tob.broadcast("set y=2")  # seq 2
tob.broadcast("set x=3")  # seq 3
# Every node delivers in order: (1, "set x=1"), (2, "set y=2"), (3, "set x=3")
# This is the foundation of replicated state machines.
```
| Scenario | Root Cause | Fix |
|---|---|---|
| etcd cluster takes 30 seconds to elect a new leader during deploys | Election timeout too high, or disk latency causing fsync delays (Raft requires durable writes before responding) | Tune election timeout (10x heartbeat). Use SSDs. Separate etcd from noisy workloads. |
| ZooKeeper session timeout cascades across 200 Kafka brokers | ZK server GC pause exceeds session timeout. All brokers think ZK is dead simultaneously. | Increase session timeout. Tune ZK JVM GC. Use more ZK nodes. |
| Distributed lock held by a dead process; other processes blocked | Lock holder crashed without releasing. No lease/TTL on the lock. | Always use leased locks (auto-expire). etcd leases, ZK ephemeral nodes. |
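| Raft leader elected but can't commit entries | Only a minority of the cluster is still alive, so the leader cannot collect majority acks. | Restore the failed nodes. If they are gone for good, a normal membership change will not help (it also needs a majority to commit), so fall back to manual disaster recovery, e.g. restore from a snapshot or restart etcd with --force-new-cluster, accepting that writes not replicated to the survivors may be lost. |
| Cross-region Raft cluster has 500ms commit latency | Every commit waits for a majority of acks. With nodes in US-East, US-West, and EU-West, each write pays at least one cross-region round trip from the leader to the followers needed for a majority, plus the client's trip to the leader if the client is in another region. | Place 3 of 5 nodes in the same region (fast, local majority) and 2 in a remote region (a copy survives a regional outage, though losing the primary region also loses quorum). Or use Raft learners for remote replicas (they receive the log but don't vote). |
| etcd "mvcc: database space exceeded" | etcd has a configurable space quota (default 2GB). Accumulated revisions exceeded it. | Run etcd compaction to purge old revisions: etcdctl compact $REV. Then defrag: etcdctl defrag. Then clear the space alarm: etcdctl alarm disarm. Enable auto-compaction (--auto-compaction-retention). |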
Interviewers love these subtle distinctions. Get them wrong and you look like you memorized definitions without understanding.
| Trap | Wrong Answer | Right Answer |
|---|---|---|
| "Is Raft Byzantine fault tolerant?" | "Yes, it handles node failures" | No. Raft (and Paxos) assume crash-fault only — a node either works correctly or stops. A malicious node that sends incorrect messages can break Raft. Byzantine fault tolerance (BFT) requires algorithms like PBFT, which need 3f+1 nodes to tolerate f failures (vs Raft's 2f+1). BFT is used in blockchain, not in typical infrastructure. |
| "Can you have consensus with 2 nodes?" | "Yes, they just need to agree" | No meaningful fault tolerance. A 2-node cluster has a majority of 2, so both nodes must be alive to make progress. If either crashes, the other cannot form a majority. This is WORSE than a single node (single node is at least available while it's up). The minimum useful cluster is 3 nodes. |
| "What consistency does a single-node database provide?" | "It depends on the isolation level" | A single-node database is trivially linearizable for single-object operations — there is only one copy, so reads always see the latest write. The question of consistency models only becomes interesting with replication. Isolation levels (serializable, read committed, etc.) are about transactions, which is a separate concern. |
| "When should you use 5 vs 3 Raft nodes?" | "Always use 5 for better availability" | 3 nodes tolerate 1 failure. 5 nodes tolerate 2 failures. The cost: 5 nodes means more consensus RPCs, more disk writes, more machines. Use 3 for non-critical services (e.g., dev/staging) and 5 for production systems where you need to survive failures during rolling deployments (1 node down for upgrade + 1 unexpected failure = 2 failures, need 5 nodes). |
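The arithmetic behind the 2-node and 3-vs-5 answers fits in a few lines of Python (helper names are illustrative). It also shows why even-sized clusters are rarely chosen: 4 nodes tolerate the same single failure as 3, while every commit needs one more acknowledgment.

```python
def majority(n: int) -> int:
    """Smallest group that is more than half of n nodes."""
    return n // 2 + 1


def tolerated_failures(n: int) -> int:
    """How many nodes can fail while the survivors can still form a majority."""
    return n - majority(n)


for n in range(1, 8):
    print(f"{n} nodes: majority={majority(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

For the rolling-deploy case: with 5 nodes and one down for an upgrade, 4 remain against a majority of 3, so one more unexpected failure is still survivable. With 3 nodes, the upgrade alone uses up the single tolerated failure.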
In a system design interview, these are the "consensus patterns" that recur:
If you had to put the entire chapter on one slide for a senior engineer audience, it would be this:
Consistency and consensus sit at the top of the distributed systems pyramid. Everything we have studied builds toward this chapter.
| Topic | Relationship | Link |
|---|---|---|
| Replication (DDIA Ch6) | Replication creates the need for consistency. Leader-follower, multi-leader, and leaderless each offer different consistency guarantees. | Replication lesson |
| Transactions (DDIA Ch8) | Transactions provide isolation guarantees. Serializability is about transaction ordering; linearizability is about single-object recency. Strict serializability combines both. | Transactions lesson |
| Storage (DDIA Ch4) | Consensus algorithms need durable storage (WAL) for the commit log. The performance of fsync directly affects consensus latency. | Storage lesson |
Raft was published in 2014 and is now the dominant consensus algorithm in industry. But research continues:
| Development | What It Does | Status |
|---|---|---|
| Raft Learners | Non-voting replicas that receive log entries but don't participate in elections or commits. Used for cross-region read replicas. | Production (etcd, TiKV) |
| Multi-Raft | Run many independent Raft groups on the same cluster. Each data range gets its own Raft group. Enables horizontal scaling. | Production (CockroachDB, TiKV) |
| Parallel Raft | Allow out-of-order commit for non-conflicting entries. Improves throughput by removing the serial bottleneck. | Research / early production (PolarDB) |
| Flexible Paxos | Relaxes the quorum requirement: Phase 1 quorum + Phase 2 quorum > N (they don't each need majority). Enables write-optimized configurations. | Research, some adoption |
| CASPaxos | Single-decree Paxos without a leader. Useful for individual distributed registers. | Research |
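Flexible Paxos's condition is easy to check numerically. In this Python sketch (helper names are illustrative, and quorums are assumed to be defined purely by size), `quorums_intersect` applies the Phase 1 + Phase 2 > N rule and `brute_force_intersect` confirms it by enumerating every pair of quorums on a small cluster.

```python
from itertools import combinations


def quorums_intersect(n: int, q1: int, q2: int) -> bool:
    """Flexible Paxos condition for size-based quorums: q1 + q2 > n."""
    return q1 + q2 > n


def brute_force_intersect(n: int, q1: int, q2: int) -> bool:
    """Check every (phase-1 quorum, phase-2 quorum) pair explicitly."""
    nodes = range(n)
    return all(set(a) & set(b)
               for a in combinations(nodes, q1)
               for b in combinations(nodes, q2))


n = 10
print(quorums_intersect(n, q1=6, q2=6))  # True: classic majorities
print(quorums_intersect(n, q1=8, q2=3))  # True: cheap replication, costly election
print(quorums_intersect(n, q1=7, q2=3))  # False: a new leader could miss a committed value

# Cross-check the size rule against exhaustive enumeration on a 6-node cluster.
assert all(quorums_intersect(6, a, b) == brute_force_intersect(6, a, b)
           for a in range(1, 7) for b in range(1, 7))
```

The "write-optimized" configuration from the table is the q2 = 3 case: each log entry needs only 3 of 10 acknowledgments, and the cost moves to leader changes, which must hear from 8 nodes.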
The trend is clear: Raft as the baseline, with optimizations for specific workloads. Multi-Raft is particularly important — it's how CockroachDB and TiKV scale to millions of ranges across hundreds of nodes, each range with its own independent 3- or 5-node Raft group.
| Paper | Why It Matters |
|---|---|
| Ongaro & Ousterhout, "In Search of an Understandable Consensus Algorithm" (2014) | The Raft paper. Read the extended version — it's genuinely well-written and includes all proofs. |
| Lamport, "Paxos Made Simple" (2001) | Lamport's own simplified explanation of Paxos. 14 pages. Still hard, but the canonical reference. |
| Fischer, Lynch & Paterson, "Impossibility of Distributed Consensus with One Faulty Process" (1985) | The FLP result. Short and devastating. Read it to understand the theoretical limits of what we're doing. |
| Lamport, "Time, Clocks, and the Ordering of Events in a Distributed System" (1978) | The paper that introduced Lamport timestamps and the happens-before relation. One of the most cited CS papers of all time. |
| Kleppmann, "How to do Distributed Locking" (2016, blog) | Martin Kleppmann's analysis of Redlock, showing why Redis-based distributed locks are unsafe. Essential reading for anyone building lock services. |
| Jepsen.io by Kyle Kingsbury | The definitive testing framework for distributed systems. Every Jepsen report is a masterclass in understanding consistency failures. |
When designing a distributed system, work through this decision tree for each piece of state:
"We can solve any problem by introducing an extra level of indirection... except for the problem of too many levels of indirection." — David Wheeler. In distributed systems: we can solve any consistency problem by adding more consensus... except for the problem of consensus being too slow. The engineer's job is finding the minimum coordination that achieves the required guarantees.