Leader election, Raft, state machine replication, consistency models, and chain replication — how distributed nodes agree on the truth.
You are running a web application. All of your data lives on a single server. Users are happy. Latency is low. Life is good. Then, at 2:47 AM on a Saturday, the server's hard drive dies. Every user gets an error page. Every write since the last backup is gone. Your phone is buzzing with alerts.
The obvious fix: make copies. Run three servers instead of one. If one dies, the other two keep serving. This is replication — maintaining copies of the same data on multiple machines. It sounds simple. It is not.
The moment you have two copies, you have a new problem: the copies must agree. If a user writes "balance = $100" to server A, then reads from server B (which still says "balance = $200"), you have given the user wrong information. Replication without coordination is worse than useless — it gives the illusion of safety while introducing inconsistency.
And coordination itself is hard. How do the servers decide which copy is "right"? If server A goes down, which of the remaining servers takes over? What if they disagree about whether A is actually down? What if the network partitions — A can talk to B but not C, and C can talk to B but not A?
The question this entire lesson answers: how do you get multiple machines to agree on the same sequence of events, even when some machines crash and the network is unreliable?
The answer is a family of algorithms called consensus protocols. The most important one today is called Raft. By the end of this lesson, you will understand Raft well enough to implement it — and you will understand why it works, not just how.
The simplest replication strategy: pick one server as the leader. All writes go to the leader. The leader sends copies to the other servers (called followers). Reads can go to any server. This is called single-leader replication, and it works well — until the leader dies.
When the leader dies, the followers need to elect a new leader. But here is the crisis: how do they know the leader is dead? Maybe the leader is just slow. Maybe the network between them is down, but the leader is still alive and accepting writes from clients on its side of the partition. If the followers elect a new leader while the old one is still alive, you have split-brain: two leaders accepting conflicting writes.
At first glance, getting computers to agree seems trivial. Just vote, right? But consider what can go wrong in a distributed system:
| Failure Mode | What Happens | Why Naive Voting Breaks |
|---|---|---|
| Message delay | A vote arrives after the election ends | Cannot distinguish "slow" from "lost." How long do you wait? |
| Message loss | A vote is dropped by the network | Cannot distinguish "lost" from "very delayed." Retries create duplicates. |
| Node crash | A node crashes after voting but before learning the outcome | When it recovers, its state is inconsistent with the cluster. |
| Network partition | Two groups of nodes can talk within groups but not across | Each group might independently elect a leader. Split-brain. |
In 1985, Fischer, Lynch, and Paterson proved a devastating result: no deterministic algorithm can guarantee consensus in a purely asynchronous system where even one process can crash. This is the FLP impossibility result. Practical protocols such as Paxos and Raft sidestep it by assuming partial synchrony: the network is usually well-behaved, so timeouts can be used to detect failures. Their safety never depends on timing; only liveness (eventually electing a leader and making progress) does. Raft's randomized election timeouts are precisely this practical compromise.
The key idea behind every practical consensus protocol is the quorum: a majority of nodes must agree before any decision is final. In a 3-node cluster, you need 2 nodes to agree. In a 5-node cluster, you need 3. Why a majority? Because any two majorities must overlap — they share at least one node. This shared node prevents two groups from independently making conflicting decisions.
| Cluster Size | Majority Needed | Failures Tolerated | When to Use |
|---|---|---|---|
| 1 | 1 | 0 | Development only. No fault tolerance. |
| 3 | 2 | 1 | Minimum viable production cluster. |
| 5 | 3 | 2 | Standard production. Survives 1 planned + 1 unexpected failure. |
| 7 | 4 | 3 | Critical infrastructure. Cross-region deployments. |
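The arithmetic behind this table fits in a few lines. In the sketch below (function names are illustrative, not from any library), a quorum is a strict majority, and the overlap claim is checked by brute force for small clusters. Checking only minimum-size majorities is enough, since any larger majority contains one.

```python
from itertools import combinations


def quorum_size(n: int) -> int:
    """Smallest number of nodes that forms a strict majority of n."""
    return n // 2 + 1


def failures_tolerated(n: int) -> int:
    """Nodes that can fail while a majority is still reachable."""
    return n - quorum_size(n)


def majorities_always_overlap(n: int) -> bool:
    """Brute force: does every pair of minimum-size majorities share a node?"""
    q = quorum_size(n)
    majorities = [set(c) for c in combinations(range(n), q)]
    return all(a & b for a in majorities for b in majorities)


for n in (1, 3, 4, 5, 7):
    print(n, quorum_size(n), failures_tolerated(n), majorities_always_overlap(n))
```

The 4-node row shows why even cluster sizes are rare: it needs 3 votes but still tolerates only 1 failure, the same as a 3-node cluster.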
Before we dive into the mechanics of consensus, we need to understand the fundamental idea that makes consensus useful. The idea is old — Leslie Lamport described it in 1978 — and it is beautifully simple: if you have a deterministic state machine (a program that always produces the same output for the same input), and you feed it the same sequence of inputs, it will always end up in the same state.
This is State Machine Replication (SMR): run the same deterministic program on multiple servers, feed them the exact same commands in the exact same order, and they will all have the exact same data. The problem reduces to one thing: agreeing on the order of commands.
A deterministic state machine is one where the next state depends only on the current state and the input. No randomness, no reading the system clock, no consulting external services. Given state S and command C, the next state S' is always the same.
Examples of deterministic state machines:
| State Machine | State | Commands | Why Deterministic |
|---|---|---|---|
| Key-value store | Map of keys to values | GET(k), SET(k,v), DELETE(k) | SET always overwrites, GET always reads current map |
| Bank ledger | Map of account IDs to balances | DEPOSIT(id,amt), WITHDRAW(id,amt) | Arithmetic is deterministic. Same ops, same balances. |
| Lock service | Set of held locks | ACQUIRE(lock,owner), RELEASE(lock,owner) | Lock state depends only on acquire/release history. |
| Counter | A single integer | INCREMENT, DECREMENT, READ | Addition and subtraction are deterministic. |
The critical non-example: a state machine that reads System.currentTimeMillis(). Two replicas processing the same command at slightly different real-time moments will get different timestamps, and their states will diverge. If you need timestamps, the leader assigns them before replicating the command.
Real applications contain many sources of non-determinism. The SMR approach must eliminate all of them. Here is how production systems handle each one:
| Source of Non-Determinism | Problem | Solution |
|---|---|---|
| Current time | Replicas read different clock values | Leader embeds timestamp in the log entry. Replicas use the embedded timestamp, not their own clock. |
| Random numbers | Replicas generate different random values | Leader generates the random number and includes it in the log entry. Or use a deterministic PRNG seeded from the log index. |
| UUID generation | UUID v4 is random | Have the leader generate the UUID and include it in the log entry, or derive it deterministically from log data (for example, a name-based UUID v5 computed from the log index). |
| External I/O | HTTP calls to external services return different results at different times | Perform the external call on the leader BEFORE replicating. Include the result in the log entry. |
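| Floating point | Results can differ across CPU architectures, compilers, and math libraries | Enforce strict IEEE 754 semantics everywhere (no fast-math flags, no compiler-introduced fused multiply-add, the same math library on every replica), or avoid floating point in state machines (use fixed-point arithmetic). |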
| Hash map iteration order | Hash map ordering is non-deterministic in many languages | Use sorted maps, or never depend on iteration order in the state machine. |
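A minimal sketch of the "leader resolves, replicas replay" pattern from the table. The prepare_entry function and the SessionStore class are hypothetical names for illustration: prepare_entry is assumed to run only on the leader before replication, reading the clock and the random source once and freezing the results into the entry, while every replica's apply uses only what is in the entry.

```python
import random
import time


def prepare_entry(command: str, key: str) -> dict:
    """Leader-only step (hypothetical): resolve non-determinism up front.

    The clock and RNG are consulted exactly once, on the leader, and the
    results travel inside the replicated entry.
    """
    return {
        "command": command,
        "key": key,
        "timestamp_ms": int(time.time() * 1000),  # leader's clock, not the replica's
        "token": random.getrandbits(64),          # leader's randomness, embedded
    }


class SessionStore:
    """Replica state machine: apply() reads only the entry, never the clock."""

    def __init__(self):
        self.sessions = {}

    def apply(self, entry: dict) -> None:
        if entry["command"] == "CREATE_SESSION":
            self.sessions[entry["key"]] = {
                "created_at": entry["timestamp_ms"],
                "token": entry["token"],
            }


entry = prepare_entry("CREATE_SESSION", "user:42")
replicas = [SessionStore() for _ in range(3)]
for r in replicas:
    r.apply(entry)
assert replicas[0].sessions == replicas[1].sessions == replicas[2].sessions
```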
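Every replicated system built on SMR has two cleanly separated layers: a consensus layer, which agrees on the order of commands in a replicated log, and a state machine layer, which applies the committed commands in that order.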
The beauty of this separation: the consensus layer knows nothing about keys or values — it just orders opaque byte strings. The state machine knows nothing about nodes or elections — it just processes commands in order. You can swap out either layer independently.
Consider two commands arriving nearly simultaneously: "SET x = 10" and "SET x = 20". If replica A processes them in order (10, then 20), it ends with x = 20. If replica B processes them in the opposite order (20, then 10), it ends with x = 10. The replicas have diverged — and they will never converge unless you fix the ordering.
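The divergence takes only a few lines to reproduce. In this sketch (names are illustrative), the same two commands applied in different orders leave two replicas permanently different, while any replica that applies them in one agreed order ends up in the same state.

```python
def apply_all(commands):
    """Apply (key, value) SET commands to an empty store, in the order given."""
    state = {}
    for key, value in commands:
        state[key] = value
    return state


set_10 = ("x", 10)
set_20 = ("x", 20)

replica_a = apply_all([set_10, set_20])  # A sees 10, then 20
replica_b = apply_all([set_20, set_10])  # B sees 20, then 10

print(replica_a, replica_b)  # {'x': 20} {'x': 10}
```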
Total order broadcast (also called atomic broadcast) is the guarantee that all nodes deliver the same messages in the same order. It is equivalent to consensus — if you can solve one, you can solve the other. This equivalence was proven by Chandra and Toueg in 1996.
```python
import json


class KVStateMachine:
    """A deterministic key-value store state machine."""

    def __init__(self):
        self.store = {}          # The actual state: key -> value
        self.last_applied = 0    # Index of last applied log entry

    def apply(self, entry):
        """Apply a single log entry. MUST be deterministic."""
        cmd = entry["command"]
        if cmd == "SET":
            self.store[entry["key"]] = entry["value"]
            return "OK"
        elif cmd == "GET":
            return self.store.get(entry["key"], None)
        elif cmd == "DELETE":
            self.store.pop(entry["key"], None)
            return "OK"

    def apply_log(self, log, commit_index):
        """Apply all committed but unapplied entries."""
        result = None            # Stays None if there is nothing new to apply
        while self.last_applied < commit_index:
            self.last_applied += 1
            result = self.apply(log[self.last_applied])
        return result            # Result of the last applied entry

    def serialize(self):
        """Snapshot support: encode the full state deterministically."""
        return json.dumps({"store": self.store, "last_applied": self.last_applied},
                          sort_keys=True)

    @classmethod
    def deserialize(cls, data):
        """Rebuild a state machine from serialize() output."""
        decoded = json.loads(data)
        sm = cls()
        sm.store = decoded["store"]
        sm.last_applied = decoded["last_applied"]
        return sm


# Three replicas, same log, same state:
log = [
    None,  # Index 0 is unused (Raft logs are 1-indexed)
    {"command": "SET", "key": "x", "value": 10},
    {"command": "SET", "key": "y", "value": 20},
    {"command": "SET", "key": "x", "value": 30},
]

r1, r2, r3 = KVStateMachine(), KVStateMachine(), KVStateMachine()
for r in [r1, r2, r3]:
    r.apply_log(log, commit_index=3)

assert r1.store == r2.store == r3.store  # Always true!
# All replicas: {"x": 30, "y": 20}
```
This is the entire idea. The hard part is not the state machine — it is getting all replicas to agree on the same log. That is what Raft does, and it is the subject of the next two chapters.
Raft is remarkably compact. The entire protocol consists of just three remote procedure calls (RPCs):
| RPC | Purpose | Sender | When |
|---|---|---|---|
| RequestVote | Ask other nodes for their vote in a leader election. | Candidate | When a follower's election timeout fires and it becomes a candidate. |
| AppendEntries | Replicate log entries to followers. Also serves as heartbeat (empty entries). | Leader | Continuously: every heartbeat interval (well below the election timeout) and whenever new entries arrive. |
| InstallSnapshot | Send a state machine snapshot to a lagging follower whose needed entries have been compacted. | Leader | When a follower is so far behind that the leader has already discarded the entries it needs. |
That is the entire Raft protocol. Three messages. Everything else — leader election, log replication, commit tracking, log repair, snapshot transfer — is built from combinations of these three RPCs. This simplicity is Raft's greatest engineering achievement.
To implement SMR, you need to understand exactly what data moves where. Here is the concrete data flow for a write operation in a Raft-based key-value store:
| Step | From | To | Data | Size (typical) |
|---|---|---|---|---|
| 1. Client request | Client | Leader | {"op":"SET","key":"user:42","value":"Alice"} | ~50-500 bytes |
| 2. AppendEntries | Leader | Each follower | prevIdx=146, prevTerm=3, entries=[{idx:147,term:3,data:...}], leaderCommit=145 | ~100-600 bytes per follower |
| 3. AppendEntries response | Each follower | Leader | {term:3, success:true, matchIndex:147} | ~20 bytes |
| 4. Client response | Leader | Client | {status:"OK"} | ~10 bytes |
Total network traffic for one write in a 5-node cluster: ~50 bytes (client request) + 4 x ~300 bytes (AppendEntries to 4 followers) + 4 x ~20 bytes (responses) + ~10 bytes (client response) = approximately 1.3 KB. For 10,000 writes/sec, that is ~13 MB/s of consensus traffic — well within the capacity of a 1 Gbps network link. Consensus is not network-limited; it is latency-limited (waiting for fsync and round trips).
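The same back-of-the-envelope estimate as a small function, using the rough per-message sizes assumed in the paragraph above, so you can plug in a different cluster size or write rate. At 13.4 MB/s, the consensus traffic uses roughly a tenth of a 1 Gbps link (about 125 MB/s).

```python
# Rough per-write message sizes from the estimate above (bytes).
CLIENT_REQUEST = 50
APPEND_ENTRIES = 300   # per follower
AE_RESPONSE = 20       # per follower
CLIENT_RESPONSE = 10


def bytes_per_write(cluster_size: int) -> int:
    """Network bytes for one replicated write in a cluster of the given size."""
    followers = cluster_size - 1
    return (CLIENT_REQUEST
            + followers * (APPEND_ENTRIES + AE_RESPONSE)
            + CLIENT_RESPONSE)


per_write = bytes_per_write(5)                          # bytes per write
mb_per_sec = per_write * 10_000 / 1_000_000             # at 10,000 writes/sec
gbps_link_mb_per_sec = 1_000_000_000 / 8 / 1_000_000    # 1 Gbps in MB/s
print(per_write, mb_per_sec, round(mb_per_sec / gbps_link_mb_per_sec, 3))  # 1340 13.4 0.107
```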
We established that replicated state machines need a total-ordered log. In Raft, one node is responsible for ordering the log: the leader. Every other node is a follower. The leader receives all client requests, appends them to its log, and replicates them to followers. But what happens when the leader crashes?
The followers must detect the failure and elect a new leader. This is leader election — the first of two core mechanisms in Raft (the second is log replication, covered next chapter). The election protocol must guarantee one critical property: at most one leader per term.
Raft divides time into terms, numbered consecutively starting from 1. Each term begins with an election. If the election succeeds, the winner serves as leader for the rest of that term. If the election fails (no candidate gets a majority — a split vote), the term ends with no leader, and a new term begins immediately.
Terms serve as a logical clock. Every message in Raft carries the sender's current term. If a node receives a message with a higher term than its own, it immediately updates its term and becomes a follower. If a node receives a message with an older term, it rejects it. This mechanism ensures that stale leaders discover they have been replaced.
Every Raft node is always in exactly one of three states:
| State | Behavior | Transitions |
|---|---|---|
| Follower | Passive: responds to leader's heartbeats and AppendEntries RPCs. Does not initiate anything. | → Candidate (if election timeout fires without hearing from a leader) |
| Candidate | Active: requests votes from all other nodes. Trying to become leader. | → Leader (if gets majority votes) → Follower (if discovers higher term or existing leader) → Candidate (if election timeout fires again — split vote, retry) |
| Leader | Sends heartbeats and replicates log entries. The only node that proposes new entries. | → Follower (if discovers a higher term in any message) |
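Here is exactly what happens when a follower decides to start an election: it increments its current term, transitions to the candidate state, votes for itself, resets its election timer, and sends RequestVote RPCs (carrying its new term plus the index and term of its last log entry) to every other node. It remains a candidate until one of three things happens: it wins votes from a majority and becomes leader, it receives an AppendEntries from a leader whose term is at least as high as its own and steps down, or its election timeout fires again and it starts a new election.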
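The candidate side of an election, as a synchronous sketch with illustrative names. send_request_vote is a hypothetical transport callback that returns whether a peer granted its vote, and log entries are reduced to their terms. A real implementation sends the RPCs in parallel, persists current_term and voted_for before sending, and steps down if any reply carries a higher term; those parts are omitted here.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class RequestVoteArgs:
    term: int
    candidate_id: str
    last_log_index: int
    last_log_term: int


@dataclass
class Candidate:
    node_id: str
    current_term: int
    log_terms: List[int]              # term of each entry; index 0 is a placeholder
    voted_for: Optional[str] = None
    state: str = "follower"

    def start_election(self, peers: List[str],
                       send_request_vote: Callable[[str, RequestVoteArgs], bool]) -> bool:
        """Run one election round; return True if this node became leader."""
        self.current_term += 1            # 1. start a new term
        self.state = "candidate"          # 2. become a candidate
        self.voted_for = self.node_id     # 3. vote for itself (persist before sending)
        # 4. reset the randomized election timer (no timers in this sketch)
        last_index = len(self.log_terms) - 1
        args = RequestVoteArgs(self.current_term, self.node_id,
                               last_index, self.log_terms[last_index])
        votes = 1                         # its own vote
        for peer in peers:                # 5. ask every other node (in parallel, in practice)
            if send_request_vote(peer, args):
                votes += 1
        if votes > (len(peers) + 1) // 2:  # strict majority of the whole cluster
            self.state = "leader"
        return self.state == "leader"


node = Candidate("n1", current_term=3, log_terms=[0, 1, 2, 3])
replies = {"n2": True, "n3": False, "n4": True, "n5": False}
won = node.start_election(list(replies), lambda peer, args: replies[peer])
print(won, node.current_term)  # True 4
```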
When a node receives a RequestVote, it applies two rules:
Rule 1: One vote per term. Each node can vote for at most one candidate in any given term. Votes are persistent (written to stable storage before responding). This ensures that at most one candidate can receive a majority in any term.
Rule 2: Log up-to-date check. A node refuses to vote for a candidate whose log is less "up-to-date" than its own. Raft defines "up-to-date" as: higher last-log term wins; if terms are equal, longer log wins. This critical check ensures that the leader always has all committed entries. We will see why in the next chapter.
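The voter side of RequestVote, applying both rules, as a sketch with illustrative names (log entries are again reduced to their terms, with index 0 as a placeholder). In a real node, the updated current_term and voted_for are fsynced before the reply is sent, and a node that adopts a higher term also reverts to follower.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Voter:
    current_term: int
    log_terms: List[int]              # term of each log entry; index 0 is a placeholder
    voted_for: Optional[str] = None

    def handle_request_vote(self, term: int, candidate_id: str,
                            last_log_index: int, last_log_term: int) -> Tuple[int, bool]:
        if term < self.current_term:
            return self.current_term, False        # stale candidate
        if term > self.current_term:
            self.current_term = term               # newer term: adopt it, forget old vote
            self.voted_for = None
        # Rule 1: at most one vote per term (re-granting to the same candidate is fine).
        if self.voted_for not in (None, candidate_id):
            return self.current_term, False
        # Rule 2: the candidate's log must be at least as up-to-date as ours.
        my_last_index = len(self.log_terms) - 1
        my_last_term = self.log_terms[my_last_index]
        up_to_date = (last_log_term > my_last_term or
                      (last_log_term == my_last_term and last_log_index >= my_last_index))
        if not up_to_date:
            return self.current_term, False
        self.voted_for = candidate_id              # must be fsynced before replying
        return self.current_term, True


v = Voter(current_term=2, log_terms=[0, 1, 1, 2])
print(v.handle_request_vote(3, "A", last_log_index=2, last_log_term=1))  # (3, False): A's log is behind
print(v.handle_request_vote(3, "B", last_log_index=3, last_log_term=2))  # (3, True)
print(v.handle_request_vote(3, "C", last_log_index=9, last_log_term=5))  # (3, False): already voted for B
```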
Three pieces of state in Raft must be written to stable storage (disk) before responding to any RPC. If a node crashes and restarts, it recovers these from disk:
| State | Why Persistent |
|---|---|
| currentTerm | Without it, a recovering node could vote twice in the same term (it forgot it already voted). This could allow two leaders in the same term. |
| votedFor | Same reason. Must remember who it voted for to enforce one-vote-per-term. |
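| log[] | Committed entries must not be lost. This node's copy may be part of the majority that made an entry committed; if a restart erased it, the entry could end up on only a minority and be overwritten by a later leader. The leader's repair mechanism fills in entries a node never received, but it cannot replace copies the commit decision already counted on. |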
Everything else (commit index, last applied index, the next/match index arrays) can be reconstructed from the persistent state and communication with the cluster. To make the persistent state durable, Raft implementations typically use a Write-Ahead Log (WAL): before responding to any RPC, the node fsyncs the relevant state changes to its WAL, and only after the fsync completes does it send the reply.
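A minimal sketch of the persist-then-respond rule for currentTerm and votedFor, assuming a single JSON file (the PersistentState class and the file name are hypothetical). Log entries would normally go to an append-only WAL with the same fsync discipline; that part is omitted here.

```python
import json
import os
import tempfile


class PersistentState:
    """Durably stores currentTerm and votedFor (hypothetical file layout).

    save() writes a temp file, fsyncs it, and atomically renames it over the
    old file, so a crash leaves either the old or the new state, never a mix.
    """

    def __init__(self, path: str):
        self.path = path

    def save(self, current_term: int, voted_for) -> None:
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            json.dump({"current_term": current_term, "voted_for": voted_for}, f)
            f.flush()
            os.fsync(f.fileno())          # data reaches the disk before we continue
        os.replace(tmp_path, self.path)   # atomic rename
        # (fsyncing the directory would also make the rename durable; omitted)

    def load(self):
        try:
            with open(self.path) as f:
                data = json.load(f)
            return data["current_term"], data["voted_for"]
        except FileNotFoundError:
            return 0, None                # fresh node: term 0, no vote


with tempfile.TemporaryDirectory() as d:
    state = PersistentState(os.path.join(d, "raft-state.json"))
    print(state.load())                             # (0, None)
    state.save(current_term=7, voted_for="n3")      # must finish BEFORE replying
    print(state.load())                             # (7, 'n3')
```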
What if two followers time out simultaneously and both start elections? Take a 5-node cluster whose leader has just crashed, leaving four live nodes. Two followers time out at nearly the same moment; each votes for itself and sends RequestVote to the other nodes. If the two remaining followers each vote for a different candidate, both candidates end with 2 votes, neither reaches the majority of 3, and the term ends with no leader.
Raft handles this with randomized election timeouts. Each node picks a random timeout, for example between 150ms and 300ms. Because the timeouts differ, it is unlikely that two nodes will start elections at the same instant. When a split vote does happen, each candidate draws a fresh random timeout before retrying, so repeated split votes become increasingly unlikely.
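Why randomization helps can be estimated with a quick Monte Carlo sketch. The model is deliberately crude and is an assumption, not Raft's exact behavior: after the leader dies, each remaining follower draws a timeout uniformly from 150 to 300ms, and a "collision" is counted when the second-earliest timeout fires within window_ms of the earliest, i.e., before the first candidate's RequestVote could have arrived. Not every collision becomes a split vote, so under this model the rate is a rough upper bound.

```python
import random


def collision_rate(nodes: int, window_ms: float, trials: int = 20_000,
                   lo_ms: float = 150.0, hi_ms: float = 300.0, seed: int = 1) -> float:
    """Fraction of trials in which two surviving followers time out within
    window_ms of each other (a rough proxy for 'two candidates at once')."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(trials):
        # The dead leader draws nothing; the nodes - 1 followers each draw a timeout.
        timeouts = sorted(rng.uniform(lo_ms, hi_ms) for _ in range(nodes - 1))
        if timeouts[1] - timeouts[0] < window_ms:
            hits += 1
    return hits / trials


for window in (1, 5, 20):
    print(window, collision_rate(nodes=5, window_ms=window))
```

The rate falls as the network delay (window_ms) shrinks relative to the width of the timeout range, which is the intuition behind sizing timeouts well above the round-trip time.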
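The timeout range matters. The Raft paper recommends that the election timeout be an order of magnitude larger than the broadcast time (the time to send RPCs to every node in parallel and receive their replies). If your network RTT is 1ms (same data center), an election timeout of 150-300ms is appropriate. If your RTT is 50ms (cross-region), you need election timeouts on the order of 500-1000ms. The heartbeat interval must be comfortably smaller than the minimum election timeout, so that a single delayed heartbeat does not trigger an election; etcd, for example, defaults to a 100ms heartbeat with a 1000ms election timeout.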
Raft has an optional extension called Pre-Vote that solves a subtle problem. Imagine a node that gets partitioned from the cluster. Its election timeout fires repeatedly, and it increments its term each time — from 5 to 6, 7, 8, 50, 100... When the partition heals, it reconnects with a very high term. Every other node sees this high term and immediately steps down to follower, disrupting the cluster even though the rejoining node has a stale log and cannot win the election.
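Pre-Vote adds a preliminary round: before incrementing its term, a candidate sends a pre-vote request carrying the term it would use. A node grants a pre-vote only if the candidate's log is at least as up-to-date as its own and it has not heard from a current leader within the minimum election timeout; granting records no vote and changes no term. Only if the candidate gets a pre-vote majority does it proceed to a real election. A partitioned node fails the pre-vote, because the rest of the cluster is still hearing from its leader (and the returning node's log is usually stale), so its term never inflates and it never disrupts the cluster.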
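A sketch of the receiving side of a pre-vote, with hypothetical names and explicit millisecond timestamps instead of real timers. The key property is that the handler only answers "would I vote?"; it never modifies current_term or records a vote.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PreVoteVoter:
    current_term: int
    log_terms: List[int]               # term of each entry; index 0 is a placeholder
    last_heard_from_leader_ms: float
    min_election_timeout_ms: float = 150.0

    def handle_pre_vote(self, proposed_term: int, last_log_index: int,
                        last_log_term: int, now_ms: float) -> bool:
        """Would I vote for this candidate? Changes no state either way."""
        if proposed_term < self.current_term:
            return False
        # A leader was heard from recently: the cluster looks healthy, refuse.
        if now_ms - self.last_heard_from_leader_ms < self.min_election_timeout_ms:
            return False
        my_last_index = len(self.log_terms) - 1
        my_last_term = self.log_terms[my_last_index]
        return (last_log_term > my_last_term or
                (last_log_term == my_last_term and last_log_index >= my_last_index))


voter = PreVoteVoter(current_term=5, log_terms=[0, 1, 5, 5],
                     last_heard_from_leader_ms=1000.0)
# A node returning from a partition asks, proposing term 6 with a stale log:
print(voter.handle_pre_vote(6, last_log_index=2, last_log_term=1, now_ms=1050.0))  # False
# Leader silent for 1s and the candidate's log is current: the pre-vote is granted.
print(voter.handle_pre_vote(6, last_log_index=3, last_log_term=5, now_ms=2000.0))  # True
print(voter.current_term)  # 5: a pre-vote never changes the voter's term
```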
The proof is elegant. Suppose two candidates, A and B, both claim to win election in term T. Each must have received votes from a majority of the N nodes. But any two majorities overlap (they share at least one node). That shared node voted for both A and B in the same term T. But Rule 1 says a node can vote for at most one candidate per term. Contradiction. Therefore, at most one candidate can win per term.
The leader has been elected. Now it must do its job: accept commands from clients, append them to its log, replicate the log to followers, and tell followers when entries are safe to apply. This is log replication — the second core mechanism in Raft, and the one that actually moves data.
Each entry in the Raft log has three fields:
| Field | Type | Purpose |
|---|---|---|
| Index | Integer (1, 2, 3, ...) | The position in the log. Monotonically increasing. |
| Term | Integer | The term of the leader that created this entry. Used to detect inconsistencies. |
| Command | Bytes | The actual operation (e.g., "SET x=42"). Opaque to the consensus layer. |
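The combination of (index, term) uniquely identifies a log entry across the entire cluster. If two logs contain an entry with the same index and term, that entry holds the same command, and the two logs are also identical in all preceding entries. This is the Log Matching Property, and it is what lets the consistency check below compare a single entry instead of the whole log.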
The leader sends AppendEntries RPCs to each follower. This is the workhorse of Raft — it handles both replication and heartbeats (an AppendEntries with no new entries is a heartbeat). Here is what the message contains:
| Field | Purpose |
|---|---|
| term | Leader's current term. Followers reject messages from stale leaders. |
| leaderId | So followers know who the leader is (for redirecting clients). |
| prevLogIndex | Index of the entry immediately before the new ones. The consistency check anchor. |
| prevLogTerm | Term of the entry at prevLogIndex. The consistency check value. |
| entries[] | New entries to append (empty for heartbeats). |
| leaderCommit | Leader's commit index. Tells followers how far they can safely apply. |
When a follower receives AppendEntries, it checks: "Do I have an entry at prevLogIndex with term prevLogTerm?" If yes, the follower's log is consistent with the leader's up to that point, and it can safely append the new entries. If no, the follower rejects the request.
When the leader gets a rejection, it decrements prevLogIndex and retries. It keeps backing up until it finds a point where the logs agree, then it overwrites the follower's log from that point forward. This is how Raft repairs inconsistent logs after crashes and leader changes.
An entry is committed once the leader has replicated it to a majority of nodes. The leader tracks a commit index — the highest log index known to be committed. It advances the commit index each time a new majority is reached, and piggybacks the commit index on every AppendEntries message. Followers apply committed entries to their state machines.
There is a subtle safety rule: a leader commits an entry by counting replicas only if that entry is from its own current term. Why not count replicas for older entries too? Because an entry from a previous term can sit on a majority of nodes and still be overwritten: a node whose log lacks that entry, but ends with an entry from a newer term, counts as more up-to-date and can win a later election, after which it would replace the entry on its followers. The current leader therefore commits previous-term entries indirectly, by committing a new entry from its current term; by the Log Matching Property, that implicitly commits all preceding entries.
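The leader's commit rule, including the current-term restriction, fits in one function. This sketch (the function name and argument layout are illustrative) assumes the leader tracks each follower's highest replicated index in a match_index map and reduces log entries to their terms.

```python
from typing import Dict, List


def advance_commit_index(commit_index: int, current_term: int,
                         log_terms: List[int], match_index: Dict[str, int],
                         cluster_size: int) -> int:
    """Return the highest N > commit_index that is stored on a majority AND
    whose entry was written in the leader's current term.

    log_terms[i] is the term of entry i (index 0 is a placeholder); match_index
    maps follower id -> highest index known to be replicated there.
    """
    last_index = len(log_terms) - 1
    for n in range(last_index, commit_index, -1):
        replicas = 1 + sum(1 for m in match_index.values() if m >= n)  # +1: the leader
        if replicas > cluster_size // 2 and log_terms[n] == current_term:
            return n                   # commits n and everything before it
    return commit_index


# Leader in term 3; entry 2 is from term 2 and is already on a majority.
log_terms = [0, 1, 2, 3]
print(advance_commit_index(1, 3, log_terms, {"f1": 2, "f2": 1}, cluster_size=3))  # 1
print(advance_commit_index(1, 3, log_terms, {"f1": 3, "f2": 1}, cluster_size=3))  # 3
```

In the first call, entry 2 is on a majority but cannot be committed by counting because it belongs to term 2. In the second, the term-3 entry reaches a majority, and committing it commits entry 2 along with it.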
```python
def handle_append_entries(self, leader_term, prev_idx, prev_term,
                          entries, leader_commit):
    """Follower side of AppendEntries (a method of a Raft node).

    self.log[0] is a placeholder, so real entries start at index 1,
    and every entry has a .term attribute.
    """
    # Rule: reject if leader's term is stale
    if leader_term < self.current_term:
        return False, self.current_term

    # Newer term: adopt it and clear the vote cast in the old term
    if leader_term > self.current_term:
        self.current_term = leader_term
        self.voted_for = None
    # A valid leader exists for this term, so a candidate steps down too
    self.state = "follower"

    # Reset election timer (leader is alive)
    self.reset_election_timer()

    # Consistency check: do I have the expected entry?
    if prev_idx > 0:
        if prev_idx >= len(self.log):
            return False, self.current_term  # My log is too short
        if self.log[prev_idx].term != prev_term:
            return False, self.current_term  # Term mismatch

    # Append new entries, truncating only at the first real conflict
    idx = prev_idx + 1
    for entry in entries:
        if idx < len(self.log):
            if self.log[idx].term != entry.term:
                self.log = self.log[:idx]    # Truncate conflicting tail
                self.log.append(entry)
            # else: already have this entry, skip
        else:
            self.log.append(entry)
        idx += 1

    # Update commit index, but never past the last entry this RPC vouched for:
    # anything beyond it may be a stale suffix the leader has not checked yet.
    last_new_index = prev_idx + len(entries)
    if leader_commit > self.commit_index:
        self.commit_index = min(leader_commit, last_new_index)

    return True, self.current_term
```
Let us trace through a concrete scenario. We have a 3-node cluster (Leader, F1, F2). The leader, in term 1, has committed entries 1-3; F2 was down while entry 3 was replicated, so entry 3 is stored only on the leader and F1. Then the leader crashes, F2 recovers in time to vote, F1 becomes the new leader in term 2, and F1 appends entry 4. Here is the state:
| Node | Log | Status |
|---|---|---|
| Old Leader | [1, 2, 3] | Dead |
| F1 (new leader, term 2) | [1, 2, 3, 4] | Leader |
| F2 | [1, 2] | Follower, missed entries 3-4 |
F1 sends AppendEntries to F2 with prevLogIndex=3, prevLogTerm=1, entries=[entry 4]. F2 checks: "Do I have an entry at index 3?" No — F2's log only has entries 1-2. F2 rejects.
F1 backs up: sends AppendEntries with prevLogIndex=2, prevLogTerm=1, entries=[entry 3, entry 4]. F2 checks: "Do I have entry at index 2 with term 1?" Yes! F2 accepts, appends entries 3 and 4. Now F2's log matches F1's log. The repair is complete.
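The back-up-and-retry loop can be replayed on exactly this scenario. In this simplified sketch (hypothetical helper names; each entry is a (term, command) pair and index 0 is a placeholder), the follower truncates everything after prevLogIndex on success. That shortcut is only safe here because messages are never reordered; the full handler shown earlier truncates only on a real conflict.

```python
from typing import List, Tuple

Entry = Tuple[int, str]   # (term, command); list position = log index


def follower_append(log: List[Entry], prev_idx: int, prev_term: int,
                    entries: List[Entry]) -> bool:
    """Simplified follower consistency check followed by an append."""
    if prev_idx >= len(log) or (prev_idx > 0 and log[prev_idx][0] != prev_term):
        return False                   # missing entry or term mismatch
    del log[prev_idx + 1:]             # simplification: drop everything after the anchor
    log.extend(entries)
    return True


def repair(leader_log: List[Entry], follower_log: List[Entry], next_index: int) -> int:
    """Decrement next_index until the follower accepts; return the rejections seen."""
    rejections = 0
    while True:
        prev_idx = next_index - 1
        prev_term = leader_log[prev_idx][0] if prev_idx > 0 else 0
        if follower_append(follower_log, prev_idx, prev_term, leader_log[next_index:]):
            return rejections
        rejections += 1
        next_index -= 1


leader_log = [(0, ""), (1, "SET a"), (1, "SET b"), (1, "SET c"), (2, "SET d")]
f2_log = [(0, ""), (1, "SET a"), (1, "SET b")]
rejections = repair(leader_log, f2_log, next_index=4)   # F1 first sends only entry 4
print(rejections, f2_log == leader_log)  # 1 True
```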
nextIndex[i] is the index of the next entry to send to follower i (initially the leader's last log index + 1). matchIndex[i] is the highest index known to be replicated on follower i (initially 0). When a follower rejects, the leader decrements nextIndex[i] and retries. When a follower accepts, the leader updates matchIndex[i]. The commit index advances to an index N once a majority of nodes (counting the leader itself) have matchIndex at least N and the entry at N is from the leader's current term.

The log grows forever: every command ever executed is stored, and eventually it will consume all available disk space. The solution is snapshotting: periodically, a node serializes its current state machine state to disk, then discards all log entries up to the snapshot point. If a lagging follower needs entries that have been discarded, the leader sends the snapshot instead of individual entries via an InstallSnapshot RPC.
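```python
class RaftNodeWithSnapshots:
    """Snapshot bookkeeping only. Assumes a KVStateMachine that provides
    serialize()/deserialize() and log entries with a .term attribute."""

    def __init__(self):
        self.current_term = 0
        self.log = [None]            # log[0] stands for the snapshot point
        self.snapshot = None         # Serialized state machine state
        self.snapshot_index = 0      # Last log index included in the snapshot
        self.snapshot_term = 0       # Term of that entry
        self.state_machine = KVStateMachine()

    def _pos(self, index):
        """Map an absolute log index to a position in self.log."""
        return index - self.snapshot_index

    def take_snapshot(self):
        """Compact the log by snapshotting current state."""
        new_index = self.state_machine.last_applied
        if new_index <= self.snapshot_index:
            return                   # Nothing new to compact
        new_term = self.log[self._pos(new_index)].term
        self.snapshot = self.state_machine.serialize()
        # Keep only entries after new_index (position 0 stays a placeholder)
        self.log = [None] + self.log[self._pos(new_index) + 1:]
        self.snapshot_index = new_index
        self.snapshot_term = new_term

    def handle_install_snapshot(self, leader_term, snapshot_data,
                                last_included_index, last_included_term):
        """Follower receives a snapshot from the leader."""
        if leader_term < self.current_term:
            return False
        self.current_term = leader_term
        if last_included_index <= self.snapshot_index:
            return True              # Already have an equal or newer snapshot

        pos = self._pos(last_included_index)
        if pos < len(self.log) and self.log[pos].term == last_included_term:
            # Our log already extends past the snapshot: keep the suffix
            self.log = [None] + self.log[pos + 1:]
        else:
            self.log = [None]        # Snapshot supersedes the entire log

        if self.state_machine.last_applied < last_included_index:
            # Replace state machine with snapshot
            self.state_machine = KVStateMachine.deserialize(snapshot_data)
            self.state_machine.last_applied = last_included_index

        self.snapshot = snapshot_data
        self.snapshot_index = last_included_index
        self.snapshot_term = last_included_term
        return True
```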
We have been talking about "replicas agreeing" — but what exactly does "agree" mean from a client's perspective? If I write a value and then immediately read it, will I see my own write? If someone else reads it a millisecond later, will they see it too? The answers depend on the consistency model the system provides.
A consistency model is a contract between the distributed system and its clients. It defines which read results are "legal" given the writes that have occurred. Stronger models are easier for programmers to reason about but more expensive to implement. Weaker models allow more flexibility (and higher performance) but force programmers to handle strange behaviors.
There are four main consistency models you need to know, ordered from strongest to weakest:
| Model | Promise | Cost | Example Systems |
|---|---|---|---|
| Linearizability | Every operation appears to take effect at a single instant between its invocation and response. Real-time ordering is preserved. | Highest. Every operation must coordinate with a leader or a quorum (typically via consensus). | etcd, Spanner, ZooKeeper writes |
| Sequential Consistency | All nodes see the same order of operations, and each client's operations appear in the order it submitted them. But this order need not match real time. | High. Total order, but no real-time constraint. | ZooKeeper reads (within a session) |
| Causal Consistency | If operation A causally precedes B (A happened-before B), then everyone sees A before B. Concurrent operations can be seen in any order. | Medium. Must track causal dependencies. | MongoDB (causally consistent sessions with "majority" read and write concerns), COPS |
| Eventual Consistency | If no new writes occur, all replicas will eventually converge. No ordering guarantee. | Lowest. No coordination needed for reads. | DynamoDB (default reads), Cassandra (low consistency levels), DNS |
Linearizability means the system behaves as if there is a single copy of the data, and every operation is atomic. Formally: you can assign a "linearization point" to each operation (a single instant between invocation and response) such that the results are consistent with a sequential execution in that order.
The practical consequence: once a write completes and the client gets an acknowledgment, ALL subsequent reads (by ANY client, on ANY node) must see that write or a later one. You can never go back to seeing a stale value.
Sequential consistency is weaker than linearizability in one key way: it does not require the global order to match real time. If client A writes x=1, and later (in wall-clock time) client B writes x=2, a sequentially consistent system could order B's write before A's — as long as all nodes see the same order, and each individual client's operations appear in the order submitted.
Causal consistency only orders operations that are causally related. If I post a message and you reply to it, everyone must see my message before your reply (causal relationship). But if two users independently post messages with no causal link, different nodes can show them in different orders.
Causal consistency is attractive because it needs no consensus on individual operations: it can stay available during network partitions, and it is among the strongest models that can. You can implement it with vector clocks and dependency tracking, without a single leader. This makes it much faster than linearizability in geo-distributed systems.
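Vector clocks are the usual way to detect causality. Here is a minimal sketch with illustrative function names, applied to the post-and-reply example above; a full causally consistent store would also buffer each incoming write until all of its dependencies have been applied.

```python
from typing import Dict

VectorClock = Dict[str, int]   # node id -> number of events seen from that node


def increment(clock: VectorClock, node: str) -> VectorClock:
    """Local event on `node`: bump its own counter (returns a new clock)."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    """Element-wise max, used when a node receives a message."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}


def happened_before(a: VectorClock, b: VectorClock) -> bool:
    """a -> b iff a <= b element-wise and a < b somewhere."""
    nodes = set(a) | set(b)
    return (all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
            and any(a.get(n, 0) < b.get(n, 0) for n in nodes))


def concurrent(a: VectorClock, b: VectorClock) -> bool:
    """Neither event happened before the other (for two distinct events)."""
    return not happened_before(a, b) and not happened_before(b, a)


post = increment({}, "alice")                 # Alice posts
reply = increment(merge({}, post), "bob")     # Bob saw the post, then replies
unrelated = increment({}, "carol")            # Carol posts independently

print(happened_before(post, reply))   # True: every replica must show the post first
print(concurrent(post, unrelated))    # True: any order is allowed
```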
Eventual consistency says: if you stop writing, all replicas will eventually converge to the same value. That is the entire guarantee. While writes are ongoing, different clients can see different values. Reads can go backward. Two clients reading at the same time can get different results. It is the wild west — but it is fast and available.
Let us trace through a concrete scenario. Client A writes x=1 at time T=0, gets ACK at T=5. Client B reads x at T=6. What are the legal read results under each model?
| Model | Legal Results for B's Read | Why |
|---|---|---|
| Linearizable | x=1 (or later write) | A's write completed at T=5. B's read started at T=6. Real-time ordering requires B to see A's write. |
| Sequential | x=0 OR x=1 | Sequential consistency does not require real-time ordering. The system can order B's read before A's write, as long as all observers see the same order. |
| Causal | x=0 OR x=1 | A's write and B's read are not causally related (B did not know about A's write). So they are concurrent and can be ordered either way. |
| Eventual | x=0 OR x=1 OR any old value | No ordering guarantee at all. B might read from a stale replica that has not received A's write yet. Eventually it will converge. |
Now change the scenario: Client A writes x=1, gets ACK, then tells Client B "I just wrote x=1, please read it." This creates a causal dependency. Under causal consistency, B must now see x=1 (because B's read causally depends on A's write via the out-of-band communication). Under eventual consistency, B can still see x=0.
| Use Case | Required Model | Why |
|---|---|---|
| Leader election / distributed locks | Linearizability | If two nodes both think they hold the lock, catastrophic conflicts ensue. |
| Unique constraint enforcement | Linearizability | "Only one user can claim username 'alice'" requires real-time ordering. |
| Social media feed | Causal | Replies must appear after their parent posts, but unrelated posts can be reordered. |
| Shopping cart | Eventual | Temporary inconsistency is tolerable. Merge conflicts are resolvable. |
| DNS records | Eventual | Propagation delay of minutes is acceptable. No need for consensus. |
| Bank account balance | Linearizability | Double-spending must be impossible. Must see latest balance before allowing withdrawal. |
```python
from dataclasses import dataclass
from itertools import permutations


@dataclass
class Op:
    client: str
    op_type: str   # "write" or "read"
    value: int     # value written or read
    start: float   # invocation time
    end: float     # response time


def is_linearizable(ops):
    """Brute-force linearizability check (NP-hard in general)."""
    # Try every possible ordering of operations
    for perm in permutations(ops):
        if _valid_sequential(perm) and _respects_realtime(perm):
            return True
    return False


def _valid_sequential(perm):
    """Check if this ordering produces valid read results."""
    register = None
    for op in perm:
        if op.op_type == "write":
            register = op.value
        elif op.op_type == "read":
            if op.value != register:
                return False  # Read got wrong value
    return True


def _respects_realtime(perm):
    """Check that ordering preserves real-time precedence."""
    for i in range(len(perm)):
        for j in range(i + 1, len(perm)):
            # If perm[j] ended before perm[i] started in real time,
            # then perm[j] must come before perm[i] in the ordering.
            if perm[j].end < perm[i].start:
                return False
    return True


# Example: is this linearizable?
history = [
    Op("A", "write", 1, 0.0, 0.5),
    Op("B", "read", 1, 0.3, 0.8),
    Op("A", "write", 2, 0.6, 1.0),
    Op("B", "read", 2, 0.9, 1.3),
]
print(is_linearizable(history))  # True: order W(1), R(1), W(2), R(2) works
```
Raft is not the only way to build a replicated system with strong consistency. There is an alternative that is conceptually simpler, has better throughput under certain workloads, and is used in production at Microsoft (Azure Storage) and Amazon (EBS, S3 internal components). It is called chain replication.
The core idea: instead of a leader broadcasting to all followers in parallel (Raft's approach), arrange the replicas in a chain. Writes enter at the head and flow through each node in sequence until they reach the tail. Reads are served exclusively from the tail. A write is committed once it reaches the tail, and because writes flow in order, every node before the tail already has it. The tail's state is therefore exactly the committed state, so reading from it is always safe.
| Property | Raft | Chain Replication |
|---|---|---|
| Write path | Leader → all followers in parallel | Head → Node 2 → ... → Tail (sequential) |
| Write latency | 1 round trip (leader waits for majority) | N-1 sequential hops (higher latency for N > 2) |
| Write throughput | Leader bottlenecks on outbound bandwidth | Each node only sends to one successor (better bandwidth distribution) |
| Read path | Leader only (for linearizability) or any node (for weaker models) | Tail only. The tail forwards no writes, so its outbound bandwidth is free for read replies. |
| Read throughput | Leader handles reads on top of replicating every write | Tail applies writes but does not replicate them, leaving more capacity for reads |
| Failure handling | Built-in leader election via Raft protocol | Requires an external configuration manager (e.g., ZooKeeper, etcd) |
| Consistency | Linearizable (with read-from-leader) | Linearizable by construction (reads from tail see all committed writes) |
When a node in the chain fails, the chain must be repaired:
| Failed Node | Repair Action | Complexity |
|---|---|---|
| Head | Successor becomes new head. In-flight writes that the head received but did not forward are lost (the client times out and retries). | Simple |
| Middle node | Predecessor links directly to successor. Each node keeps a list of writes it has forwarded that the tail has not yet acknowledged; the predecessor re-sends that list, since the failed node may have dropped some of them. | Moderate |
| Tail | Predecessor becomes new tail. Writes it had forwarded but the old tail never acknowledged become committed immediately, because the new tail already has them. | Simple |
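The middle-node repair is the subtle case, so here is a minimal sketch of it. It assumes a single-threaded, in-memory chain in which the tail's acknowledgment arrives instantly; the names `Replica`, `deliver`, and `remove_middle` are hypothetical. Each non-tail replica keeps a Sent list of updates it has forwarded but the tail has not yet acknowledged, and on repair the predecessor replays that list to its new successor.

```python
class Replica:
    def __init__(self, name):
        self.name = name
        self.store = {}  # key -> value applied at this replica
        self.sent = {}   # seq -> (key, value): forwarded, not yet acked by the tail


def deliver(chain, start, seq, key, value, crash_before=None):
    """Push one update down the chain starting at position `start`.

    If `crash_before` is set, the update is dropped just before reaching that
    position, simulating the previous node crashing before it forwarded.
    Returns True if the update reached the tail (i.e. was committed).
    """
    for pos in range(start, len(chain)):
        if pos == crash_before:
            return False
        node = chain[pos]
        node.store[key] = value
        if pos < len(chain) - 1:
            node.sent[seq] = (key, value)
    # The tail has the update: it is committed, and the ack clears every Sent list.
    for node in chain[:-1]:
        node.sent.pop(seq, None)
    return True


def remove_middle(chain, pos):
    """Splice out a failed middle node; its predecessor replays unacked updates."""
    pred = chain[pos - 1]
    del chain[pos]
    for seq, (key, value) in sorted(pred.sent.items()):  # replay in write order
        deliver(chain, pos, seq, key, value)


a, b, c, d = (Replica(n) for n in "ABCD")
chain = [a, b, c, d]
deliver(chain, 0, 1, "x", 1)                  # fully committed
deliver(chain, 0, 2, "x", 2, crash_before=2)  # B applies it, then crashes before forwarding
remove_middle(chain, 1)                       # A re-sends seq 2 to C
print([n.name for n in chain], d.store["x"], a.sent)  # ['A', 'C', 'D'] 2 {}
```

Replaying the whole Sent list is safe in this sketch: a successor that already has an update just rewrites the same value, and replaying in sequence order leaves each key at its newest value.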
The critical point: chain replication does not handle reconfiguration itself. It relies on an external configuration manager, typically a consensus-based service such as ZooKeeper (ZAB) or etcd (Raft), to detect failures, decide the new chain membership, and notify all nodes. This is why chain replication and Raft are complementary, not competing: you use Raft for the small, critical metadata service, and chain replication for the large, high-throughput data path.
CRAQ (Chain Replication with Apportioned Queries; 2009, Jeff Terrace and Michael Freedman at Princeton) extends chain replication to allow reads from any node, not just the tail. Each node can hold multiple versions of an object, each marked "clean" (committed) or "dirty" (received but not yet acknowledged by the tail). When a non-tail node receives a read and its latest version is clean, it serves it directly. If the latest version is dirty, it asks the tail for the latest committed version number and returns that version.
CRAQ dramatically improves read throughput for read-mostly workloads because reads are spread across all N nodes instead of concentrated at the tail. Under write-heavy workloads more objects are dirty, so more reads pay for a version query to the tail. The consistency guarantee remains linearizable.
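A minimal single-threaded sketch of the CRAQ read rule follows. It assumes the head assigns increasing version numbers and that the tail's acknowledgment arrives as an explicit `mark_clean` call; `CraqNode` and its methods are hypothetical names, not the paper's API.

```python
class CraqNode:
    def __init__(self, tail=None):
        self.tail = tail    # the chain's tail node; None if this node *is* the tail
        self.versions = {}  # key -> {version: value}
        self.clean = {}     # key -> highest version known to be committed

    def apply(self, key, version, value):
        """A write propagating down the chain arrives (dirty until the tail has it)."""
        self.versions.setdefault(key, {})[version] = value
        if self.tail is None:
            self.clean[key] = version  # at the tail, arrival means committed

    def mark_clean(self, key, version):
        """The tail's ack arrives: `version` is committed, older versions can go."""
        self.clean[key] = version
        self.versions[key] = {v: val for v, val in self.versions[key].items() if v >= version}

    def committed_version(self, key):
        return self.clean.get(key)

    def read(self, key):
        if key not in self.versions:
            return None
        latest = max(self.versions[key])
        if latest == self.clean.get(key):
            return self.versions[key][latest]  # only clean data: serve locally
        # Latest version is dirty: ask the tail which version is committed.
        committed = self.tail.committed_version(key)
        if committed is None:
            return None  # nothing committed for this key yet
        return self.versions[key][committed]


tail = CraqNode()
head = CraqNode(tail=tail)
for node in (head, tail):
    node.apply("x", 1, "a")
head.mark_clean("x", 1)
head.apply("x", 2, "b")   # v2 is dirty at the head, not yet at the tail
before = head.read("x")   # "a": the tail still reports v1 as committed
tail.apply("x", 2, "b")   # the tail commits v2; its ack has not reached the head yet
after = head.read("x")    # "b": the version query now returns v2
```

A non-tail node never returns a version the tail has not committed, which is what keeps reads from any node linearizable.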
Consider a cluster of 5 nodes, each with 1 Gbps network bandwidth. A client writes 1 MB objects.
| Metric | Raft (Leader-Based) | Chain Replication |
|---|---|---|
| Leader/Head outbound per write | 4 MB (1 MB to each of 4 followers) | 1 MB (to one successor) |
| Max write throughput | ~31 writes/sec (1 Gbps is about 125 MB/s; 125 MB/s / 4 MB per write) | ~125 writes/sec (125 MB/s / 1 MB per write; with pipelining, every link stays busy) |
| Write latency | ~1 RTT (parallel replication, wait for majority) | 4 sequential hops (head → 2 → 3 → 4 → tail) before the tail can reply |
| Read throughput | Limited by leader (same node handles reads + writes) | Tail is dedicated to reads (no write forwarding overhead) |
The key insight: Raft's leader is a bandwidth bottleneck because it sends every entry to every follower. The leader's outbound bandwidth scales as O(N) where N is the cluster size. Chain replication's head sends each entry to only one successor — O(1) outbound per node. For large objects (like the 256 KB extent blocks in Azure Storage), this difference is enormous.
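The throughput row is simple arithmetic, sketched below under the same assumptions as the example (5 nodes, 1 Gbps links, 1 MB objects, outbound bandwidth as the only bottleneck, protocol overhead ignored). The function name is ours, for illustration only.

```python
def max_writes_per_sec(link_gbps, object_mb, copies_sent_per_write):
    """Upper bound on writes/sec when a node's outbound link is the bottleneck."""
    link_bytes_per_sec = link_gbps * 1e9 / 8      # 1 Gbps = 125 MB/s
    bytes_per_write = object_mb * 1e6 * copies_sent_per_write
    return link_bytes_per_sec / bytes_per_write

nodes = 5
raft = max_writes_per_sec(1, 1, copies_sent_per_write=nodes - 1)  # leader -> 4 followers
chain = max_writes_per_sec(1, 1, copies_sent_per_write=1)         # head -> 1 successor
print(f"Raft leader: ~{raft:.0f} writes/sec, chain: ~{chain:.0f} writes/sec")
# Raft leader: ~31 writes/sec, chain: ~125 writes/sec
```

The ratio, not the absolute numbers, is the point: the leader's cost per write grows with N - 1, while every chain node forwards exactly one copy.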
Microsoft Azure Storage uses a variant of chain replication called extent-based replication for its blob, table, and queue services. Each storage extent (a contiguous chunk of data, typically 1-3 GB) is replicated across 3 nodes in a chain. The configuration manager (a Paxos-based service called the "Stream Manager") handles chain membership and failure detection.
Azure Storage processes hundreds of billions of transactions per day across millions of chains. Each chain independently replicates its data without coordinating with other chains. This design gives them both strong consistency (per-chain) and massive scalability (millions of independent chains).
A 4-node chain. Writes enter at the head (left) and flow to the tail (right). Reads are served from the tail. When a middle node crashes, the chain is repaired around it.
```python
class ChainNode:
    def __init__(self, node_id, successor=None, is_tail=False):
        self.id = node_id
        self.store = {}    # key -> value applied at this node
        self.pending = {}  # key -> value (dirty: forwarded, not yet confirmed by the tail)
        self.successor = successor
        self.is_tail = is_tail

    def handle_write(self, key, value, client_callback=None):
        """Process a write. Forward to successor, or commit if this is the tail."""
        if self.is_tail:
            # Tail: commit and ACK the client
            self.store[key] = value
            if client_callback:
                client_callback("OK")
        else:
            # Non-tail: apply locally, mark as pending, forward to successor
            self.pending[key] = value
            self.store[key] = value  # optimistic local update
            self.successor.handle_write(key, value, client_callback)
            # In this synchronous sketch, the call returning means the tail has
            # committed the write; this stands in for the ACK flowing back.
            self.pending.pop(key, None)

    def handle_read(self, key):
        """Only the tail serves reads in basic chain replication."""
        assert self.is_tail, "Reads must go to the tail!"
        return self.store.get(key, None)


# Build a 4-node chain
tail = ChainNode("tail", is_tail=True)
n3 = ChainNode("n3", successor=tail)
n2 = ChainNode("n2", successor=n3)
head = ChainNode("head", successor=n2)

# Write flows: head -> n2 -> n3 -> tail
head.handle_write("x", 42, lambda r: print(f"Client got: {r}"))

# Read from tail
print(tail.handle_read("x"))  # 42
```
Time to put everything together. Below is a full 5-node Raft cluster simulation. You can send client requests, kill nodes, create network partitions, and watch the system respond in real time. This is Raft in action: leader election, log replication, commit advancement, and failure recovery.
A live 5-node Raft cluster. Nodes show their state (L=Leader, F=Follower, C=Candidate, X=Dead). The log entries are shown as colored blocks below each node. Committed entries are solid; uncommitted entries are striped.
Compare: a 4-node chain. Writes flow left-to-right through the chain. Reads come from the tail (rightmost). Notice the sequential flow vs Raft's parallel broadcast.
| Experiment | What to Watch | Key Insight |
|---|---|---|
| Send 5 requests | Log entries appear on leader first, then replicate to followers | Leader is the source of truth for ordering |
| Kill 1 follower, send requests | Requests still commit (4 alive, majority = 3) | Cluster tolerates minority failures |
| Kill leader | Election timeout fires on a follower, new leader elected | Automatic failover without human intervention |
| Kill 3 nodes | Remaining 2 cannot form majority — cluster stalls | Safety over liveness: better to stop than to split-brain |
| Partition: {0,1} vs {2,3,4} | Majority side (3 nodes) elects leader and continues. Minority side stalls. | Quorum prevents split-brain during partitions |
| Heal partition | Minority nodes catch up from new leader | Raft's log repair mechanism fixes divergent logs |
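Every row of this experiment table reduces to one rule: a group of nodes can elect a leader and commit only if it contains a majority of the full cluster. A small sketch of that rule (the function names are ours):

```python
def majority(cluster_size):
    """Smallest number of nodes that forms a majority: floor(N/2) + 1."""
    return cluster_size // 2 + 1

def can_make_progress(reachable_nodes, cluster_size):
    """Can this set of alive, mutually reachable nodes elect a leader and commit?"""
    return len(reachable_nodes) >= majority(cluster_size)

def tolerated_failures(cluster_size):
    return cluster_size - majority(cluster_size)

N = 5
print(can_make_progress({1, 2, 3, 4}, N))  # True: kill 1 follower
print(can_make_progress({3, 4}, N))        # False: kill 3 nodes
print(can_make_progress({0, 1}, N), can_make_progress({2, 3, 4}, N))  # False True: partition
print(tolerated_failures(3), tolerated_failures(4), tolerated_failures(5))  # 1 1 2
```

The last line also shows why an even-sized cluster buys nothing: 4 nodes tolerate the same single failure as 3.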
After experimenting with both simulations above, here is the decision framework:
| Choose Raft When | Choose Chain Replication When |
|---|---|
| You need the coordination service itself (metadata, locks, config) | You need a high-throughput data store with strong consistency |
| Write latency matters more than write throughput | Write throughput matters more than write latency |
| The dataset is small (megabytes) | The dataset is large (gigabytes to terabytes) |
| You want a self-contained system (no external dependencies) | You already have a Raft/Paxos service for configuration |
| You are fine serving linearizable reads from the leader (followers can serve only possibly stale reads) | You want to separate read and write workloads (writes at the head, reads at the tail) |
| Examples: etcd, ZooKeeper, Consul | Examples: Azure Storage, Amazon EBS, HDFS (with modifications) |
To ground your understanding, here are real-world performance numbers from production consensus systems:
| System | Write Latency (p50) | Write Latency (p99) | Throughput | Notes |
|---|---|---|---|---|
| etcd 3.5 | 2-5 ms | 10-25 ms | 15-25K writes/sec | 3-node cluster, SSD, same-region |
| ZooKeeper 3.8 | 2-4 ms | 8-20 ms | 10-20K writes/sec | 3-node cluster, SSD |
| CockroachDB | 3-8 ms | 20-50 ms | 30-50K writes/sec (per range) | Multi-Raft, per-range latency |
| TiKV | 1-3 ms | 5-15 ms | 100K+ writes/sec (aggregate) | Multi-Raft, hundreds of ranges |
| Spanner | 5-10 ms (same region); 50-200 ms (cross-region) | Not listed | Varies | TrueTime + Paxos. Cross-region commits are slow. |
Notice the pattern: all consensus systems have write latencies in the low-millisecond range within a single region (dominated by fsync + network RTT) and tens-to-hundreds of milliseconds cross-region (dominated by speed-of-light latency). This is the fundamental performance ceiling of consensus — you cannot beat the speed of light and the cost of durable writes.
Now you understand the theory: state machine replication, leader election, log replication, consistency models, chain replication. But where does consensus actually show up in production systems? And equally important: when should you NOT use consensus?
| System | Protocol | Language | Used By | Key Features |
|---|---|---|---|---|
| etcd | Raft | Go | Kubernetes (control plane), CoreDNS | Simple key-value API, watch streams, leases for TTL, linearizable reads |
| ZooKeeper | ZAB (ZooKeeper Atomic Broadcast) | Java | Kafka (metadata, before KRaft), Hadoop, HBase, Solr | Hierarchical namespace (like a filesystem), ephemeral nodes, watches, sequential znodes |
| Consul | Raft | Go | HashiCorp ecosystem, service mesh | Service discovery, health checking, KV store, prepared queries |
Consensus is expensive. Every write must be replicated to a majority before it is committed. Latency is at least one network round trip (more for cross-region). Throughput is limited by the leader's capacity. You should use consensus for small, critical data — not for bulk storage.
| Good Uses (Small, Critical) | Bad Uses (Large, High-Throughput) |
|---|---|
| Cluster membership (which nodes are alive) | User-generated content |
| Configuration (feature flags, routing rules) | Logs and metrics |
| Leader election (who is the primary for shard X) | Session data |
| Distributed locks (only one worker processes job Y) | Shopping carts |
| Schemas and metadata (what tables exist) | Time-series data |
| Epoch numbers (fencing tokens for preventing split-brain) | Blob/object storage |
Every Kubernetes cluster runs etcd as its single source of truth. When you run kubectl apply -f deployment.yaml, the API server writes the deployment spec to etcd. The scheduler watches etcd for unscheduled pods. The kubelet watches for pods assigned to its node. All state transitions go through etcd.
etcd stores the entire Kubernetes cluster state: pods, services, config maps, secrets, RBAC rules, custom resources. etcd's default storage quota is 2 GB, and its documentation recommends staying at or below 8 GB. This small size is by design: consensus does not scale to terabytes.
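```python
# Using etcd via the Python client (python-etcd3 library)
import etcd3

client = etcd3.client(host="localhost", port=2379)

# Write (goes through Raft consensus)
client.put("/services/web/leader", "node-42")

# Linearizable read (the default; confirmed through the leader)
value, metadata = client.get("/services/web/leader")
print(value)  # b'node-42'

# Distributed lock using a lease (TTL-based).
# The transaction creates the key only if it does not exist yet
# (create revision == 0), so at most one client can hold it.
# If this process crashes and stops refreshing the lease,
# the key (and therefore the lock) is deleted after ~10 s.
lease = client.lease(ttl=10)
acquired, _ = client.transaction(
    compare=[client.transactions.create("/locks/job-processor") == 0],
    success=[client.transactions.put("/locks/job-processor", "me", lease=lease)],
    failure=[],
)
print("lock acquired" if acquired else "someone else holds the lock")

# Watch for changes under a prefix (streaming; this loop blocks)
events_iterator, cancel = client.watch_prefix("/services/web/")
for event in events_iterator:
    print(f"Key: {event.key}, Value: {event.value}")
```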
Understanding etcd's performance envelope is critical for production deployments. Here are the numbers you need to know:
| Metric | Typical Value | Notes |
|---|---|---|
| Write latency (same region) | 2-10 ms | Consensus + fsync. Dominated by disk latency. |
| Write latency (cross-region) | 50-200 ms | Network RTT between data centers dominates. |
| Read latency (linearizable) | 1-5 ms | Must go through leader. ReadIndex optimization avoids full log append. |
| Read latency (serializable) | < 1 ms | Served from any node's local state. May be stale. |
| Write throughput | 10,000-30,000 ops/sec | Depends on value size and hardware. SSD required. |
| Max recommended DB size | 8 GB | Larger sizes cause slow snapshots and long recovery. |
| Max request size | 1.5 MiB (default) | Set by --max-request-bytes. Larger requests are possible but slow down other requests; etcd is for small metadata, not blobs. |
Raft guarantees linearizable writes by default (they go through the leader and are committed to a majority). But reads are trickier. Here are three approaches, from safest to fastest:
| Approach | How It Works | Latency | Correctness |
|---|---|---|---|
| Read through log | Treat reads as log entries. Replicate to majority before returning. | Same as write (~5ms) | Fully linearizable. Overkill for reads. |
| ReadIndex | Leader records current commit index, confirms it is still leader (heartbeat round), then serves read from that index. | ~1 heartbeat RTT | Linearizable. etcd's default. |
| Lease-based | Leader holds a time-based lease. Serves reads locally without confirmation as long as the lease is valid. | Local read (~0.1ms) | Linearizable only if clock drift between nodes is bounded (timers must run at similar rates). Breaks under unbounded drift. |
etcd uses ReadIndex by default. When a client issues a linearizable read, the leader (1) records the current commit index, (2) sends a heartbeat to confirm it is still the leader, and (3) waits for its state machine to reach the recorded commit index before serving the read. This avoids the cost of replicating reads through the log while still guaranteeing linearizability.
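A single-process sketch of those three steps follows. It assumes the caller has already run the heartbeat round and passes in how many followers acknowledged it, and that this leader has already committed an entry from its current term (Raft requires that before ReadIndex is safe). `LeaderReplica`, `read_index_read`, and `NotLeaderError` are hypothetical names, not etcd's API.

```python
class NotLeaderError(Exception):
    pass


class LeaderReplica:
    def __init__(self, cluster_size):
        self.cluster_size = cluster_size
        self.log = []          # commands (key, value), committed or not
        self.commit_index = 0  # number of log entries known to be committed
        self.last_applied = 0  # number of log entries applied to `state`
        self.state = {}        # the replicated key-value state machine

    def apply_committed(self):
        """Apply committed entries to the state machine, in log order."""
        while self.last_applied < self.commit_index:
            key, value = self.log[self.last_applied]
            self.state[key] = value
            self.last_applied += 1

    def read_index_read(self, key, heartbeat_acks):
        # (1) Record the commit index at the moment the read arrives.
        read_index = self.commit_index
        # (2) Confirm leadership: this node plus the acking followers must be a majority.
        if heartbeat_acks + 1 < self.cluster_size // 2 + 1:
            raise NotLeaderError("could not confirm leadership; retry on the real leader")
        # (3) Wait until the state machine has caught up to read_index.
        #     (Applied synchronously here; a real server waits asynchronously.)
        self.apply_committed()
        assert self.last_applied >= read_index
        return self.state.get(key)


leader = LeaderReplica(cluster_size=5)
leader.log = [("x", 1), ("x", 2), ("y", 7)]
leader.commit_index = 2                                # ("y", 7) is not committed yet
print(leader.read_index_read("x", heartbeat_acks=2))  # 2
print(leader.read_index_read("y", heartbeat_acks=2))  # None: uncommitted write is invisible
```

No read is written to the log: the only extra cost over a local read is the heartbeat round that proves this node is still the leader.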
ZooKeeper predates etcd and Consul. It uses a hierarchical namespace (like a filesystem) where each node is called a znode. Two special znode types power most use cases:
| Znode Type | Behavior | Use Case |
|---|---|---|
| Ephemeral | Auto-deleted when the session that created it ends (client disconnects or crashes). | Service discovery: each server creates an ephemeral znode. When it dies, the znode disappears. Watchers are notified. |
| Sequential | ZooKeeper appends a monotonically increasing counter to the name. E.g., "/locks/lock-0000000042". | Distributed locks: create a sequential ephemeral znode under /locks/. The client with the lowest sequence number holds the lock. |
The ZooKeeper distributed lock recipe uses sequential ephemeral znodes. Here is how it works:
```python
from kazoo.client import KazooClient

def do_critical_work():
    # Placeholder for the work that needs exclusive access (hypothetical).
    print("doing critical work")

zk = KazooClient(hosts="localhost:2181")
zk.start()

# Acquire a distributed lock
lock = zk.Lock("/locks/my-resource", "worker-42")
with lock:
    # This block runs with exclusive access.
    # If this process crashes, its ephemeral znode is deleted when the
    # session expires, and the next waiter gets the lock.
    print("I hold the lock!")
    do_critical_work()
# The lock is released when exiting the 'with' block:
# kazoo deletes this client's ephemeral sequential znode.

zk.stop()
```
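The kazoo `Lock` recipe hides the ordering logic. The decision each waiting client makes can be sketched without a ZooKeeper server: sort the lock's children by sequence number; the lowest holds the lock, and every other client watches only the znode immediately before its own, so a release wakes one waiter instead of all of them. The function name is hypothetical, and the sketch assumes child names like "lock-0000000042" (kazoo's own recipe uses a different naming scheme).

```python
def lock_decision(children, my_znode):
    """Return (holds_lock, znode_to_watch) for a client in the lock recipe.

    `children` are the sequential znodes under the lock path, e.g.
    "lock-0000000042"; ZooKeeper appends the 10-digit counter.
    """
    ordered = sorted(children, key=lambda name: int(name.rsplit("-", 1)[1]))
    pos = ordered.index(my_znode)
    if pos == 0:
        return True, None              # lowest sequence number: we hold the lock
    return False, ordered[pos - 1]     # watch only our immediate predecessor


children = ["lock-0000000043", "lock-0000000041", "lock-0000000042"]
print(lock_decision(children, "lock-0000000042"))  # (False, 'lock-0000000041')
# The holder finishes (or its session dies), so its ephemeral znode disappears:
children.remove("lock-0000000041")
print(lock_decision(children, "lock-0000000042"))  # (True, None)
```

When the watch fires, a client re-runs this check instead of assuming it now holds the lock, because its predecessor may simply have crashed while still waiting.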
Consul by HashiCorp combines a Raft-based KV store with built-in service discovery and health checking. Unlike etcd (which is a pure KV store) or ZooKeeper (which is a hierarchical namespace), Consul is purpose-built for microservice coordination.
| Feature | etcd | ZooKeeper | Consul |
|---|---|---|---|
| Data model | Flat key-value | Hierarchical (tree) | Key-value + service catalog |
| Service discovery | Manual (store endpoints as values) | Ephemeral znodes | Built-in with DNS and HTTP API |
| Health checking | None (use external tool) | Session heartbeats | Built-in (HTTP, TCP, script, gRPC) |
| Multi-datacenter | Single cluster | Single cluster | WAN federation between clusters |
| Language | Go | Java | Go |
| Protocol | Raft | ZAB | Raft (KV) + Serf/gossip (membership) |
Consensus is the wrong tool when:
| Situation | Why Consensus is Wrong | Better Alternative |
|---|---|---|
| High-throughput data path | Consensus adds latency (cross-region: 100ms+). Every write waits for majority. | Sharded database with per-shard consensus (CockroachDB, TiKV) |
| Temporary or lossy data | Consensus is durable by design. No point making session data durable. | Redis, Memcached (in-memory, no consensus) |
| Cross-region with low latency | Consensus across regions means 100-300ms write latency. | Eventual consistency (DynamoDB global tables) or causal consistency (MongoDB) |
| More than ~50 nodes | Consensus performance degrades with cluster size (more messages per commit). | Gossip protocols (Serf, Memberlist) for membership, consensus for metadata only |
This chapter is your cheat sheet for system design interviews. It covers the comparison table every interviewer expects, the coding drills, and the design questions that come up repeatedly.
| | Raft | Paxos | ZAB | Chain Replication |
|---|---|---|---|---|
| Year | 2014 | 1989 (published 1998) | 2011 | 2004 |
| Leader | Strong leader. All writes through leader. | Proposer (not a fixed leader). Any node can propose. | Strong leader (primary). Similar to Raft. | Head (writes). Tail (reads). Not a "leader" in the Raft sense. |
| Election | Randomized timeout + majority vote | Prepare/Promise phase (proposers can compete) | Followers send CEPOCH to the prospective leader, which proposes NEWEPOCH and needs ACK-E from a majority | External config manager (ZooKeeper/etcd) |
| Log | Append-only log. Leader's log is authoritative. | Per-slot consensus. Holes possible. | Transaction log. Similar to Raft. | Implicit in forward chain. No explicit log. |
| Commit rule | Majority of nodes have the entry (current term only) | Majority accepted the proposal for that slot | Majority ACK. Leader broadcasts COMMIT. | Tail has the entry (all preceding nodes also have it) |
| Complexity | Simple. Two core RPCs (RequestVote, AppendEntries), plus InstallSnapshot for log compaction. | Notoriously complex. Multi-Paxos papers vary. | Moderate. ZooKeeper-specific. | Simple replication. Complex failure recovery. |
| Used in | etcd, Consul, TiKV, CockroachDB | Google Chubby, Spanner (modified) | Apache ZooKeeper only | Azure Storage, Amazon EBS (chain variant) |
| Term | One-Sentence Definition |
|---|---|
| Consensus | Multiple nodes agreeing on a single value despite failures. |
| SMR | Replicate a deterministic state machine by replicating its input log. |
| Quorum | A majority of nodes (floor(N/2) + 1) needed for any decision. |
| Term/Epoch | A logical time period with at most one leader. |
| Split-brain | Two leaders in the same cluster accepting conflicting writes. |
| Linearizability | System behaves as if there is one copy; operations are atomic and real-time ordered. |
| Eventual consistency | If writes stop, all replicas eventually converge; there is no ordering guarantee in the meantime. |
| Causal consistency | Causally related operations are ordered; concurrent operations are not. |
| FLP impossibility | No deterministic algorithm can guarantee consensus in a purely asynchronous system if even one node may crash. |
| CAP theorem | During a partition, choose Consistency or Availability (not both). |
| Fencing token | Monotonically increasing number attached to a lock to prevent stale holders from doing damage. |
Q: How does Raft guarantee that a new leader has all committed entries?
A: The "log up-to-date" voting rule. A follower refuses to vote for a candidate whose log is less up-to-date (lower last term, or same last term but shorter log). Since committed entries are on a majority, and the new leader must get votes from a majority, the intersection contains at least one node with every committed entry. The candidate must be at least as up-to-date as that node.
Q: What happens if the Raft leader commits an entry and then crashes before broadcasting the commit?
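A: The entry is on a majority of nodes. The new leader will have it (by the voting rule above) and re-replicates it to any followers that are missing it. Because Raft never commits an entry from an earlier term by counting replicas, the new leader commits it indirectly: once an entry from its own term (typically a no-op appended at the start of the term) reaches a majority, every preceding entry is committed too. No committed entry is ever lost.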
Q: Design a distributed lock service.
A: Use a consensus-based system (etcd, ZooKeeper). Store locks as keys. Attach a lease/TTL so locks auto-release if the holder crashes. Include a fencing token (monotonically increasing number) with each lock grant. The token is checked by the resource being protected — if the resource sees a stale token, it rejects the operation. This prevents the "process pauses during GC, lock expires, another process acquires, first process resumes and does damage" problem.
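A minimal sketch of fencing, assuming the lock service hands out a strictly increasing token with every grant and the protected resource remembers the highest token it has accepted. `LockService` and `FencedStorage` are hypothetical names.

```python
class LockService:
    """Stand-in for a consensus-backed lock service (e.g. built on etcd or ZooKeeper)."""
    def __init__(self):
        self.last_token = 0

    def grant(self):
        self.last_token += 1  # strictly increasing fencing token per grant
        return self.last_token


class FencedStorage:
    """The protected resource: rejects writes carrying a stale token."""
    def __init__(self):
        self.highest_token = 0
        self.writes = []

    def write(self, token, value):
        if token < self.highest_token:
            return False  # a newer lock holder exists; this holder is stale
        self.highest_token = token
        self.writes.append(value)
        return True


locks, storage = LockService(), FencedStorage()
t1 = locks.grant()                         # client 1 gets token 1, then pauses (GC)
t2 = locks.grant()                         # lease expires; client 2 gets token 2
print(storage.write(t2, "from client 2"))  # True
print(storage.write(t1, "from client 1"))  # False: stale token rejected
```

The check lives in the resource, not the client, because a paused client cannot know that its lease expired while it was paused.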
An animated timeline showing Raft terms, elections, and leader tenures through a sequence of elections, crashes, and recoveries.
```python
from dataclasses import dataclass

@dataclass
class LogEntry:
    term: int
    command: object

class RaftNode:
    def __init__(self, node_id, total_nodes):
        self.id = node_id
        self.current_term = 0
        self.voted_for = None   # Persisted: who did I vote for this term?
        self.log = [None]       # 1-indexed; log[0] is a sentinel; entries are LogEntry
        self.state = "follower"
        self.total_nodes = total_nodes

    def last_log_term(self):
        return self.log[-1].term if len(self.log) > 1 else 0

    def last_log_index(self):
        return len(self.log) - 1

    def handle_request_vote(self, candidate_term, candidate_id,
                            candidate_last_log_idx, candidate_last_log_term):
        """Handle a RequestVote RPC. Return (term, vote_granted)."""
        # Rule 0: If the candidate's term < mine, reject
        if candidate_term < self.current_term:
            return self.current_term, False
        # If the candidate's term > mine, adopt it and step down (no vote cast yet)
        if candidate_term > self.current_term:
            self.current_term = candidate_term
            self.voted_for = None
            self.state = "follower"
        # Rule 1: One vote per term
        if self.voted_for is not None and self.voted_for != candidate_id:
            return self.current_term, False
        # Rule 2: Log up-to-date check (compare last term first, then length)
        my_last_term = self.last_log_term()
        my_last_idx = self.last_log_index()
        log_ok = (candidate_last_log_term > my_last_term or
                  (candidate_last_log_term == my_last_term and
                   candidate_last_log_idx >= my_last_idx))
        if not log_ok:
            return self.current_term, False
        # Grant vote (a real implementation persists current_term and voted_for first)
        self.voted_for = candidate_id
        return self.current_term, True
```
```python
import threading
import time


class RaftLeader:
    def __init__(self, peers, rpc, current_term, heartbeat_interval=0.1):
        self.peers = peers               # List of follower addresses
        # Transport hook (assumed, not a real library call):
        # rpc(peer, term, prev_idx, prev_term, entries, leader_commit) -> bool
        self.rpc = rpc
        self.current_term = current_term
        self.interval = heartbeat_interval
        self.running = False
        self.lock = threading.Lock()     # Guards log, indexes, and commit_index
        self.log = [None]                # 1-indexed; log[0] is a sentinel; entries have .term
        self.commit_index = 0
        self.next_index = {p: len(self.log) for p in peers}  # peer -> next log index to send
        self.match_index = {p: 0 for p in peers}             # peer -> highest replicated index

    def start_heartbeats(self):
        self.running = True
        self._thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
        self._thread.start()

    def _heartbeat_loop(self):
        while self.running:
            for peer in self.peers:
                self._send_append_entries(peer)
            time.sleep(self.interval)

    def _send_append_entries(self, peer):
        with self.lock:
            ni = self.next_index[peer]
            prev_idx = ni - 1
            prev_term = self.log[prev_idx].term if prev_idx > 0 else 0
            entries = self.log[ni:]      # Entries to send (empty = heartbeat)
            leader_commit = self.commit_index
        # Send the RPC outside the lock (simplified; real implementations use async I/O)
        success = self.rpc(peer, self.current_term, prev_idx, prev_term, entries, leader_commit)
        with self.lock:
            if success:
                # Advance by what was actually sent: the log may have grown meanwhile.
                self.match_index[peer] = max(self.match_index[peer], prev_idx + len(entries))
                self.next_index[peer] = self.match_index[peer] + 1
                self._try_advance_commit()
            else:
                # Consistency check failed: back up and retry on the next heartbeat
                self.next_index[peer] = max(1, ni - 1)

    def _try_advance_commit(self):
        """Advance commit index if a majority has replicated. Caller holds self.lock."""
        for n in range(len(self.log) - 1, self.commit_index, -1):
            # Only entries from the current term are committed by counting replicas;
            # earlier-term entries become committed along with them.
            if self.log[n].term != self.current_term:
                continue
            count = 1  # self
            for p in self.peers:
                if self.match_index[p] >= n:
                    count += 1
            if count > (len(self.peers) + 1) / 2:
                self.commit_index = n
                break
```
Interviewers love to ask "prove that Raft is safe" or "why can't committed entries be lost?" Here is the argument in three steps:
Step 1: Leader Completeness. If an entry is committed in term T, then every leader in terms T+1, T+2, ... has that entry in its log. Why? Because the entry is committed = replicated to a majority. The new leader got votes from a majority. These two majorities overlap (share at least one node). The shared node has the committed entry. The voting rule ensures the new leader is at least as up-to-date as this node. Therefore the new leader has the entry.
Step 2: Log Matching. If two logs have an entry with the same index and term, they are identical up to that point. This is maintained inductively by the AppendEntries consistency check.
Step 3: State Machine Safety. If a server applies entry at index I to its state machine, no other server will ever apply a different entry at index I. This follows from Leader Completeness + Log Matching: the entry at index I was committed, so every future leader has it, so it can never be overwritten.
How do you add or remove nodes from a running Raft cluster without downtime? This is one of the hardest problems in consensus. The original Raft paper proposed joint consensus: a transitional configuration where both the old and new membership must agree. But this is complex to implement.
In practice, most Raft implementations (including etcd) use a simpler approach: single-server changes. You add or remove one server at a time. The key insight: when you change the cluster by exactly one node, the old majority and the new majority always overlap. This means there is no point in time where two independent majorities can exist.
| Change | Old Config | New Config | Old Majority | New Majority | Overlap? |
|---|---|---|---|---|---|
| Add 1 node (3→4) | {A,B,C} | {A,B,C,D} | 2 | 3 | Yes (always) |
| Remove 1 node (5→4) | {A,B,C,D,E} | {A,B,C,D} | 3 | 3 | Yes (always) |
| Add 2 at once (3→5) | {A,B,C} | {A,B,C,D,E} | 2 | 3 | Not guaranteed! |
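The overlap column can be checked by brute force: enumerate every minimal majority of the old and new configurations and test whether any pair is disjoint. A sketch (the function names are ours); checking minimal majorities is enough, since any larger majority contains one.

```python
from itertools import combinations

def minimal_majorities(members):
    need = len(members) // 2 + 1
    return [set(group) for group in combinations(sorted(members), need)]

def majorities_always_overlap(old, new):
    """True if every majority of `old` shares a node with every majority of `new`."""
    return all(a & b for a in minimal_majorities(old) for b in minimal_majorities(new))

print(majorities_always_overlap(set("ABC"), set("ABCD")))    # True  (add 1)
print(majorities_always_overlap(set("ABCDE"), set("ABCD")))  # True  (remove 1)
print(majorities_always_overlap(set("ABC"), set("ABCDE")))   # False (add 2: {A,B} vs {C,D,E})
```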
Interviewers sometimes ask "how would you make Raft faster?" Here are the standard optimizations:
| Optimization | What It Does | Speedup |
|---|---|---|
| Batching | Accumulate multiple client requests and replicate them as a single AppendEntries batch. | 10-100x throughput |
| Pipelining | Send the next AppendEntries before the previous one is ACKed. Use the nextIndex to track what each follower has. | 2-5x throughput |
| ReadIndex | Serve reads after one heartbeat round instead of replicating through the log. | 2-3x read throughput |
| Lease-based reads | Leader serves reads locally during its lease period (no heartbeat round). | ~10x lower read latency (but clock-dependent) |
| Parallel apply | Apply committed entries to the state machine in parallel with replicating the next batch. | ~2x throughput |
| Witness replicas | Some replicas participate in voting but do not store the full log — just enough to vote correctly. | Reduced storage cost |
| Question | Key Points |
|---|---|
| Design a distributed configuration service | Raft-based, small dataset, watch API for push-based updates, leases for TTL, consistent reads from leader |
| Design a distributed lock service | Consensus for lock state, leases for automatic release, fencing tokens for safety, leader reads for linearizability |
| Design the Kubernetes control plane | etcd for state, API server as the only etcd client, controllers watch for changes, optimistic concurrency with resource versions |
| When would you use chain replication? | Read-heavy workloads, need strong consistency, acceptable write latency. Azure Storage uses it. Need external config manager (Raft/Paxos) for reconfiguration. |
| How does CockroachDB use Raft? | One Raft group per range (a contiguous chunk of the keyspace; 512 MiB maximum by default in recent versions, 64 MiB in older ones). Each range independently elects leaders. Multi-range transactions use 2PC across ranges, Raft within each range. |
| Statement | Answer |
|---|---|
| A 4-node Raft cluster tolerates 2 failures. | False. 4 nodes need 3 for majority. Can only tolerate 1 failure (same as 3 nodes). |
| Paxos requires a fixed leader. | False. Any node can propose. Multi-Paxos optimizes by having a stable leader, but it is not required. |
| Chain replication provides linearizability without consensus. | True for the data path. False for reconfiguration (needs external consensus for chain membership). |
| Eventual consistency means data will be lost. | False. "Eventual" means replicas will converge. Data is durable — just temporarily inconsistent across replicas. |
| Raft can handle Byzantine faults. | False. Raft assumes crash-fault only. Byzantine faults require BFT algorithms (PBFT, Tendermint). |
| ZooKeeper's reads are linearizable by default. | False. ZooKeeper reads can be served by any node (sequentially consistent). For linearizable reads, use the sync command first. |
You now understand the core of distributed consensus: why replication needs coordination, how Raft structures that coordination with leader election and log replication, what consistency models define the client-visible guarantees, and how chain replication offers an alternative topology. Let us place this knowledge in context.
| Covered | Skipped (Future Lessons) |
|---|---|
| State machine replication | Paxos (the original, harder algorithm) |
| Raft leader election | Byzantine fault tolerance (BFT, used in blockchain) |
| Raft log replication | Raft membership changes (joint consensus) |
| Consistency models (4 levels) | Log compaction and snapshots |
| Chain replication | Multi-Raft (CockroachDB, TiKV architecture) |
| Practical coordination services | Conflict-free replicated data types (CRDTs) |
Understanding the historical progression helps you see why Raft exists and what problem each algorithm was solving:
| Year | Algorithm / Result | Contribution |
|---|---|---|
| 1978 | Lamport's State Machine Replication | The foundational idea: replicate a deterministic state machine by replicating its input log. |
| 1985 | FLP Impossibility | Proved consensus is impossible in purely asynchronous systems with even one crash. Every practical algorithm must assume partial synchrony. |
| 1989 | Paxos (Lamport) | First correct consensus algorithm. Notoriously difficult to understand and implement. The "parliament" metaphor confused generations of engineers. |
| 1998 | Paxos published (Lamport) | After being rejected by reviewers for years ("too whimsical"), the Paxos paper was finally published. Google later built Chubby on it. |
| 2004 | Chain Replication (van Renesse & Schneider) | An alternative to leader-based replication. Simpler, better throughput for large objects, but needs external reconfiguration. |
| 2007 | Paxos Made Live (Google) | Google's experience building Chubby on Paxos. Revealed the enormous gap between the algorithm and a production implementation. |
| 2011 | ZAB (Yahoo) | ZooKeeper's protocol. Designed to be easier than Paxos while providing the same guarantees. Adopted by the entire Hadoop ecosystem. |
| 2014 | Raft (Ongaro & Ousterhout) | Designed from scratch for understandability. The paper was accompanied by a user study showing students learned Raft faster than Paxos. Now the dominant consensus algorithm in industry. |
| 2016+ | Multi-Raft (CockroachDB, TiKV) | Run thousands of independent Raft groups, one per data shard. Scales consensus to large datasets by partitioning. |
Everything in this lesson assumes crash-fault tolerance: nodes either work correctly or stop. In the real world, there are situations where this is not enough:
| Domain | Fault Model | Algorithm Family |
|---|---|---|
| Cloud infrastructure (etcd, CockroachDB) | Crash-fault | Raft, Paxos, ZAB |
| Blockchain (Bitcoin, Ethereum) | Byzantine (any behavior, including malicious) | Nakamoto consensus, PBFT-style protocols (e.g., Tendermint) |
| Safety-critical (aviation, medical) | Byzantine (hardware faults such as cosmic-ray bit flips) | Byzantine agreement protocols, redundant hardware with voting |
| CDNs, DNS, caching | Crash-fault (but eventual consistency suffices) | Gossip, CRDTs, vector clocks |
Byzantine fault tolerance (BFT) requires 3f+1 nodes to tolerate f Byzantine faults, compared with 2f+1 for crash faults. Classic PBFT also exchanges O(n²) messages per agreement, versus O(n) for a leader-based crash-fault protocol such as Raft. Together these make BFT roughly 3-10x more expensive than crash-fault consensus, which is why cloud systems generally do not use it: operators trust their own hardware.
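The 3f+1 versus 2f+1 figures come from quorum overlap: any two quorums must share enough nodes that the intersection contains at least one correct node. The short Python sketch below checks that arithmetic and gives rough per-round message counts. The function names are hypothetical, and the all-to-all count is a simplified PBFT-style model (one broadcast plus two all-to-all phases), not an exact protocol trace.

```python
"""Quorum arithmetic for crash-fault vs Byzantine-fault consensus (illustrative)."""


def crash_fault_cluster(f: int) -> tuple[int, int]:
    """Smallest cluster that tolerates f crashed nodes, plus its quorum size."""
    n = 2 * f + 1
    quorum = f + 1  # strict majority: any two quorums share >= 1 node
    return n, quorum


def byzantine_cluster(f: int) -> tuple[int, int]:
    """Smallest cluster that tolerates f Byzantine nodes, plus its quorum size."""
    n = 3 * f + 1
    quorum = 2 * f + 1  # any two quorums share >= f + 1 nodes, so >= 1 honest one
    return n, quorum


def min_overlap(n: int, quorum: int) -> int:
    """Fewest nodes that any two quorums of this size must have in common."""
    return 2 * quorum - n


def leader_based_messages(n: int) -> int:
    """One replication round: leader -> n-1 followers, plus n-1 replies. O(n)."""
    return 2 * (n - 1)


def all_to_all_messages(n: int) -> int:
    """Simplified PBFT-style round: one broadcast plus two all-to-all phases. O(n^2)."""
    return (n - 1) + 2 * n * (n - 1)


if __name__ == "__main__":
    for f in (1, 2, 3):
        cn, cq = crash_fault_cluster(f)
        bn, bq = byzantine_cluster(f)
        print(f"f={f}: crash n={cn} quorum={cq} | byzantine n={bn} quorum={bq} | "
              f"messages {leader_based_messages(cn)} vs {all_to_all_messages(bn)}")
```

For f = 1 this gives 3 nodes and 4 messages for crash faults versus 4 nodes and 27 messages for Byzantine faults, and the gap widens quadratically as n grows.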
| Lesson | Connection |
|---|---|
| Consistency & Consensus (DDIA Ch10) | Covers the same material from the DDIA perspective, including Paxos, 2PC, and the full linearizability proof. More depth on ordering guarantees and Lamport timestamps. |
| Transactions (DDIA Ch8) | How serializability relates to linearizability. Two-phase locking, optimistic concurrency, and serializable snapshot isolation (SSI). |
| Sharding (DDIA Ch7) | How to split data across nodes. Each partition can run its own Raft group (the CockroachDB pattern). |
| Replication (DDIA Ch6) | Single-leader, multi-leader, and leaderless replication. The foundation for understanding why consensus is needed. |
How the pieces fit together: Raft provides total order broadcast, which enables state machine replication, which in turn enables linearizable reads and writes (provided reads are served by a leader that has confirmed it is still the leader, not by a possibly stale replica). Chain replication is an alternative data path that relies on an external Raft/Paxos service for reconfiguration.
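To make the "total order broadcast enables state machine replication" step concrete, here is a minimal Python sketch of the state-machine half only. It assumes the consensus layer has already produced a committed, totally ordered log; `Command`, `KVStateMachine`, and `replay` are hypothetical names for illustration, not part of any Raft library, and the two-operation command set is invented for the example.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Command:
    op: str     # "set" or "add": a hypothetical command set for this sketch
    key: str
    value: int


@dataclass
class KVStateMachine:
    """Deterministic key-value state machine: same commands, same order, same state."""
    data: dict[str, int] = field(default_factory=dict)
    last_applied: int = 0  # index of the last applied log entry (Raft indices start at 1)

    def apply(self, index: int, cmd: Command) -> None:
        # Entries must be applied strictly in log order, with no gaps.
        if index != self.last_applied + 1:
            raise ValueError(f"expected entry {self.last_applied + 1}, got {index}")
        if cmd.op == "set":
            self.data[cmd.key] = cmd.value
        elif cmd.op == "add":
            self.data[cmd.key] = self.data.get(cmd.key, 0) + cmd.value
        else:
            raise ValueError(f"unknown op {cmd.op!r}")
        self.last_applied = index


def replay(log: list[Command]) -> KVStateMachine:
    """Apply a committed log, in order, to a fresh replica."""
    sm = KVStateMachine()
    for i, cmd in enumerate(log, start=1):
        sm.apply(i, cmd)
    return sm


# In a real system the consensus layer (e.g. Raft) would deliver this committed log.
committed_log = [
    Command("set", "balance", 200),
    Command("add", "balance", -100),
    Command("set", "owner", 7),
]

replicas = [replay(committed_log) for _ in range(3)]
print(replicas[0].data)  # {'balance': 100, 'owner': 7}
```

Every replica ends in the same state because it applies the same deterministic commands in the same order. Swapping the first two entries yields a balance of 200 instead of 100, which is why replicas need an agreed order, not just the same set of commands.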
| Paper | Year | Why Read It |
|---|---|---|
| "In Search of an Understandable Consensus Algorithm" (Ongaro & Ousterhout) | 2014 | THE Raft paper. Clearly written, backed by a formal TLA+ specification, and accompanied by a user study. Read this first. |
| "Chain Replication for Supporting High Throughput and Availability" (van Renesse & Schneider) | 2004 | The original chain replication paper. Only 12 pages. Clean and practical. |
| "Paxos Made Simple" (Lamport) | 2001 | Lamport's attempt to simplify Paxos. At 14 pages, it is the most accessible Paxos description. |
| "Paxos Made Live" (Chandra, Griesemer, Redstone — Google) | 2007 | The reality of implementing Paxos in production. Eye-opening on the gap between theory and practice. |
| "Impossibility of Distributed Consensus with One Faulty Process" (Fischer, Lynch, Paterson) | 1985 | The FLP impossibility result. Foundational. Only nine pages. |
| "Object Storage on CRAQ: High-throughput chain replication for read-mostly workloads" (Terrace & Freedman) | 2009 | Chain Replication with Apportioned Queries. Any node can serve reads while preserving strong consistency; a node holding an uncommitted version checks with the tail first. |
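After this lesson, you should have a clear mental picture of how consensus fits into distributed systems: a small, strongly consistent control plane (etcd, ZooKeeper, or a Raft group) decides critical metadata such as leadership, membership, and chain configuration, while the data plane replicates the bulk of the data with cheaper mechanisms.
This separation (control plane vs data plane, consensus vs replication, metadata vs data) is the organizing principle of most well-designed distributed systems. Master it, and most distributed systems papers, blog posts, and interview questions will make far more sense.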
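The control-plane/data-plane split can be sketched with chain replication, where consensus is needed only for reconfiguration. The Python below is a single-process toy under loud assumptions: `ChainNode`, `ConfigService`, `write`, and `read` are hypothetical names; `ConfigService` merely stands in for a consensus-backed store such as etcd or ZooKeeper; failures are injected by hand; and there is a single writer with no concurrency.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ChainNode:
    """A data-plane replica. `alive` is flipped by hand to simulate a crash."""
    name: str
    store: dict[str, str] = field(default_factory=dict)
    alive: bool = True


@dataclass
class ConfigService:
    """Control-plane stand-in (hypothetical). In a real deployment this state would
    live in a consensus-backed service; here it is a plain in-memory object."""
    chain: list[ChainNode]
    epoch: int = 1

    def remove(self, failed: ChainNode) -> None:
        # Reconfiguration is the only step that needs consensus in this design:
        # drop the failed node and bump the epoch so stale configs can be detected.
        self.chain = [n for n in self.chain if n is not failed]
        self.epoch += 1


def write(config: ConfigService, key: str, value: str) -> bool:
    """Data plane: forward the write from head to tail; success means the tail has it."""
    for node in config.chain:
        if not node.alive:
            return False  # stalled; a real client would retry after reconfiguration
        node.store[key] = value
    return True  # the tail applied it, so every replica in the chain has it


def read(config: ConfigService, key: str) -> Optional[str]:
    """Reads go to the tail, which holds only fully replicated writes."""
    return config.chain[-1].store.get(key)


# Demo: a three-node chain a -> b -> c survives the failure of b.
a, b, c = ChainNode("a"), ChainNode("b"), ChainNode("c")
cfg = ConfigService(chain=[a, b, c])
write(cfg, "x", "1")
b.alive = False
stalled = not write(cfg, "x", "2")  # the head has "2", but the tail still serves "1"
cfg.remove(b)                       # control plane reconfigures: chain is now a -> c
write(cfg, "x", "2")
print(stalled, read(cfg, "x"), cfg.epoch)  # True 2 2
```

The design choice to notice: the per-write path never touches consensus. Only `remove`, which changes membership, would go through the Raft/Paxos-backed service, so the expensive agreement happens rarely while the bytes flow down the chain.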
After completing this lesson, test yourself. You should be able to:
| # | Skill | Test Yourself |
|---|---|---|
| 1 | Explain why replication needs consensus | Describe the split-brain problem in 2 sentences. |
| 2 | Explain State Machine Replication | Draw the two-layer architecture (consensus + state machine) on a whiteboard. |
| 3 | Walk through a Raft election | Trace the flow: election timeout fires, increment term, vote for self, send RequestVote, win votes from a majority, become leader. |
| 4 | Walk through log replication | Trace AppendEntries: prevLogIndex/prevLogTerm consistency check, append, commit once a majority stores an entry from the leader's current term. |
| 5 | Rank consistency models | Given a scenario (bank, social media, DNS), choose the appropriate model and justify. |
| 6 | Compare Raft and chain replication | State when to use each (latency vs throughput, self-contained vs external config). |
| 7 | Design with consensus | Sketch a system using etcd/ZK for metadata + data stores for throughput. Identify what goes in consensus and what does not. |
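| 8 | Prove Raft safety informally | The 5-step argument: a committed entry is on a majority, a new leader needs votes from a majority, the two majorities overlap, the voting rule rejects candidates with less up-to-date logs, and leaders never overwrite their own log entries. |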
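To check your answers for skills 3, 4, and 8, here is a minimal Python sketch of three rules summarized in the Raft paper's Figure 2: the RequestVote "at least as up-to-date" check, the follower's AppendEntries consistency check, and the leader's commit rule. It deliberately omits RPC term checks, persistence, networking, and timers. The function names are hypothetical, and note that Python lists are 0-based while Raft log indices start at 1.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    term: int
    command: str


def candidate_is_up_to_date(my_log: list[Entry], cand_last_index: int,
                            cand_last_term: int) -> bool:
    """Voting rule: grant a vote only if the candidate's log is at least as up-to-date."""
    my_last_term = my_log[-1].term if my_log else 0
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term  # later last term wins
    return cand_last_index >= len(my_log)      # same last term: longer (or equal) log wins


def append_entries(log: list[Entry], prev_index: int, prev_term: int,
                   entries: list[Entry]) -> bool:
    """Follower side of AppendEntries (term and leaderCommit handling omitted).
    Mutates `log` in place and returns whether the request was accepted."""
    # Consistency check: we must hold an entry at prev_index whose term is prev_term.
    if prev_index > len(log):
        return False
    if prev_index > 0 and log[prev_index - 1].term != prev_term:
        return False
    for offset, entry in enumerate(entries):
        pos = prev_index + offset  # 0-based list position of this entry
        if pos < len(log):
            if log[pos].term != entry.term:
                del log[pos:]       # conflict: drop this entry and everything after it
                log.append(entry)
            # same term at same index: already present, so skip (idempotent retries)
        else:
            log.append(entry)
    return True


def leader_commit_index(log: list[Entry], match_index: list[int],
                        current_term: int, commit_index: int) -> int:
    """Highest index N stored on a majority whose entry is from the current term.
    `match_index` has one value per server, including the leader (len(log))."""
    for n in range(len(log), commit_index, -1):
        replicated = sum(1 for m in match_index if m >= n)
        if replicated * 2 > len(match_index) and log[n - 1].term == current_term:
            return n
    return commit_index


# A follower whose entry 3 came from a deposed leader gets repaired by the new leader.
follower = [Entry(1, "x"), Entry(1, "y"), Entry(2, "z")]
ok = append_entries(follower, prev_index=2, prev_term=1, entries=[Entry(3, "w")])
print(ok, [e.term for e in follower])  # True [1, 1, 3]
```

The current-term condition in `leader_commit_index` matters: an entry from an earlier term that happens to sit on a majority is not committed by counting replicas alone; it becomes committed only indirectly, once an entry from the leader's own term is committed after it.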
"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable."
— Leslie Lamport