Distributed Systems

Consensus & Replication

Leader election, Raft, state machine replication, consistency models, and chain replication — how distributed nodes agree on the truth.

Prerequisites: Basic networking intuition + What a server is. That's it.
10
Chapters
11
Simulations
0
Assumed Knowledge

Chapter 0: The Problem

You are running a web application. All of your data lives on a single server. Users are happy. Latency is low. Life is good. Then, at 2:47 AM on a Saturday, the server's hard drive dies. Every user gets an error page. Every write from the last backup is gone. Your phone is buzzing with alerts.

The obvious fix: make copies. Run three servers instead of one. If one dies, the other two keep serving. This is replication — maintaining copies of the same data on multiple machines. It sounds simple. It is not.

The moment you have two copies, you have a new problem: the copies must agree. If a user writes "balance = $100" to server A, then reads from server B (which still says "balance = $200"), you have given the user wrong information. Replication without coordination is worse than useless — it gives the illusion of safety while introducing inconsistency.

And coordination itself is hard. How do the servers decide which copy is "right"? If server A goes down, which of the remaining servers takes over? What if they disagree about whether A is actually down? What if the network partitions — A can talk to B but not C, and C can talk to B but not A?

This is not hypothetical. In 2012, a network partition at Amazon caused two groups of DynamoDB nodes to each believe they were the sole authority. Both accepted writes. When the partition healed, conflicting writes had to be reconciled — some data was permanently lost. In 2018, GitHub's MySQL cluster experienced split-brain during a network failure, causing a 24-hour outage. The fundamental problem: without consensus, replication is a source of bugs, not safety.

The question this entire lesson answers: how do you get multiple machines to agree on the same sequence of events, even when some machines crash and the network is unreliable?

The answer is a family of algorithms called consensus protocols. The most important one today is called Raft. By the end of this lesson, you will understand Raft well enough to implement it — and you will understand why it works, not just how.

Why Not Just Pick a Leader?

The simplest replication strategy: pick one server as the leader. All writes go to the leader. The leader sends copies to the other servers (called followers). Reads can go to any server. This is called single-leader replication, and it works well — until the leader dies.

When the leader dies, the followers need to elect a new leader. But here is the crisis: how do they know the leader is dead? Maybe the leader is just slow. Maybe the network between them is down, but the leader is still alive and accepting writes from clients on its side of the partition. If the followers elect a new leader while the old one is still alive, you have split-brain: two leaders accepting conflicting writes.

Split-Brain: When Replicas Disagree

A 3-node cluster with one leader. Click "Kill Leader" to simulate a crash. Watch the followers try to elect a new leader. Then click "Partition" to separate the cluster and see split-brain emerge.

All nodes healthy. Node A is leader.

Why Is Consensus So Hard?

At first glance, getting computers to agree seems trivial. Just vote, right? But consider what can go wrong in a distributed system:

Failure ModeWhat HappensWhy Naive Voting Breaks
Message delayA vote arrives after the election endsCannot distinguish "slow" from "lost." How long do you wait?
Message lossA vote is dropped by the networkCannot distinguish "lost" from "very delayed." Retries create duplicates.
Node crashA node crashes after voting but before learning the outcomeWhen it recovers, its state is inconsistent with the cluster.
Network partitionTwo groups of nodes can talk within groups but not acrossEach group might independently elect a leader. Split-brain.

In 1985, Fischer, Lynch, and Paterson proved a devastating result: it is impossible to guarantee consensus in a purely asynchronous system where even one process can crash. This is the FLP impossibility result. Every real consensus algorithm circumvents it by assuming partial synchrony: the network is usually well-behaved, and you can use timeouts to detect failures. Raft's randomized election timeouts are precisely this practical compromise.

Crash-fault vs Byzantine. The algorithms we study (Raft, Paxos, ZAB) assume crash-fault tolerance: nodes either work correctly or stop completely. They do NOT handle Byzantine faults, where a node sends intentionally wrong messages. Byzantine tolerance requires algorithms like PBFT and is the domain of blockchain — it is much more expensive (O(n²) messages vs O(n)) and outside our scope.

The Core Insight: Majority Rules

The key idea behind every practical consensus protocol is the quorum: a majority of nodes must agree before any decision is final. In a 3-node cluster, you need 2 nodes to agree. In a 5-node cluster, you need 3. Why a majority? Because any two majorities must overlap — they share at least one node. This shared node prevents two groups from independently making conflicting decisions.

Cluster SizeMajority NeededFailures ToleratedWhen to Use
110Development only. No fault tolerance.
321Minimum viable production cluster.
532Standard production. Survives 1 planned + 1 unexpected failure.
743Critical infrastructure. Cross-region deployments.
Why always odd numbers? Even numbers waste a node. A 4-node cluster tolerates 1 failure (needs 3 for a majority) — exactly the same as a 3-node cluster. The extra node costs money and network bandwidth but buys zero additional safety. Always use 3, 5, or 7.
Concept check: A 5-node cluster partitions into two groups: {A, B} and {C, D, E}. Both groups try to elect a leader. With a strict majority requirement, what happens?

Chapter 1: State Machine Replication

Before we dive into the mechanics of consensus, we need to understand the fundamental idea that makes consensus useful. The idea is old — Leslie Lamport described it in 1978 — and it is beautifully simple: if you have a deterministic state machine (a program that always produces the same output for the same input), and you feed it the same sequence of inputs, it will always end up in the same state.

This is State Machine Replication (SMR): run the same deterministic program on multiple servers, feed them the exact same commands in the exact same order, and they will all have the exact same data. The problem reduces to one thing: agreeing on the order of commands.

The SMR insight. You do not need to replicate data. You replicate the log of commands. If every replica processes the same log in the same order, they will all converge to the same state. This is how etcd, ZooKeeper, CockroachDB, and TiKV all work internally. The consensus algorithm orders the log. The state machine interprets it.

What Makes a State Machine Deterministic?

A deterministic state machine is one where the next state depends only on the current state and the input. No randomness, no reading the system clock, no consulting external services. Given state S and command C, the next state S' is always the same.

Examples of deterministic state machines:

State MachineStateCommandsWhy Deterministic
Key-value storeMap of keys to valuesGET(k), SET(k,v), DELETE(k)SET always overwrites, GET always reads current map
Bank ledgerMap of account IDs to balancesDEPOSIT(id,amt), WITHDRAW(id,amt)Arithmetic is deterministic. Same ops, same balances.
Lock serviceSet of held locksACQUIRE(lock,owner), RELEASE(lock,owner)Lock state depends only on acquire/release history.
CounterA single integerINCREMENT, DECREMENT, READAddition and subtraction are deterministic.

The critical non-example: a state machine that reads System.currentTimeMillis(). Two replicas processing the same command at slightly different real-time moments will get different timestamps, and their states will diverge. If you need timestamps, the leader assigns them before replicating the command.

Taming Non-Determinism

Real applications contain many sources of non-determinism. The SMR approach must eliminate all of them. Here is how production systems handle each one:

Source of Non-DeterminismProblemSolution
Current timeReplicas read different clock valuesLeader embeds timestamp in the log entry. Replicas use the embedded timestamp, not their own clock.
Random numbersReplicas generate different random valuesLeader generates the random number and includes it in the log entry. Or use a deterministic PRNG seeded from the log index.
UUID generationUUID v4 is randomUse UUID v1 (time-based) with leader-assigned timestamps, or include the UUID in the log entry.
External I/OHTTP calls to external services return different results at different timesPerform the external call on the leader BEFORE replicating. Include the result in the log entry.
Floating pointDifferent CPU architectures may compute different resultsUse IEEE 754 strict mode, or avoid floating point in state machines (use fixed-point arithmetic).
Hash map iteration orderHash map ordering is non-deterministic in many languagesUse sorted maps, or never depend on iteration order in the state machine.
The golden rule: all non-determinism is resolved by the leader, before replication. If a command requires a timestamp, the leader generates it. If it requires randomness, the leader generates it. If it requires an external API call, the leader makes it. The replicated log entry contains the fully-determined command — replicas never need to make any non-deterministic choices. This is why the leader is sometimes called the "oracle" of the system: it resolves all uncertainty before broadcasting decisions.

The Two-Part Architecture

Every replicated system built on SMR has two cleanly separated layers:

1. Client sends command
"SET user:42 name='Alice'" arrives at the leader node.
2. Consensus layer orders it
The leader proposes this as entry #147 in the log. A majority of nodes agree.
3. Log entry is committed
Entry #147 is now durable. Every node will eventually have it.
4. State machine applies it
Each node's state machine processes entry #147: update key "user:42".
5. Response to client
Leader returns "OK" to the client after the majority has committed.

The beauty of this separation: the consensus layer knows nothing about keys or values — it just orders opaque byte strings. The state machine knows nothing about nodes or elections — it just processes commands in order. You can swap out either layer independently.

State Machine Replication

Three replicas processing the same commands in the same order. Click "Send Command" to add a new command to the log. Watch all three replicas process it and converge to the same state.

All replicas idle. Send a command to begin.

Why Total Order Matters

Consider two commands arriving nearly simultaneously: "SET x = 10" and "SET x = 20". If replica A processes them in order (10, then 20), it ends with x = 20. If replica B processes them in the opposite order (20, then 10), it ends with x = 10. The replicas have diverged — and they will never converge unless you fix the ordering.

Total order broadcast (also called atomic broadcast) is the guarantee that all nodes deliver the same messages in the same order. It is equivalent to consensus — if you can solve one, you can solve the other. This equivalence was proven by Chandra and Toueg in 1996.

Total order broadcast = consensus. They are mathematically equivalent. A total order broadcast delivers messages in the same order to all nodes. A consensus algorithm picks a single value that all nodes agree on. You can implement total order broadcast by running consensus on each message in sequence (this is what Raft does). And you can implement consensus by broadcasting a proposal and accepting the first one delivered.

Code: A Minimal Replicated State Machine

python
class KVStateMachine:
    """A deterministic key-value store state machine."""
    def __init__(self):
        self.store = {}        # The actual state: key -> value
        self.last_applied = 0  # Index of last applied log entry

    def apply(self, entry):
        """Apply a single log entry. MUST be deterministic."""
        cmd = entry["command"]
        if cmd == "SET":
            self.store[entry["key"]] = entry["value"]
            return "OK"
        elif cmd == "GET":
            return self.store.get(entry["key"], None)
        elif cmd == "DELETE":
            self.store.pop(entry["key"], None)
            return "OK"

    def apply_log(self, log, commit_index):
        """Apply all committed but unapplied entries."""
        while self.last_applied < commit_index:
            self.last_applied += 1
            result = self.apply(log[self.last_applied])
        return result  # Return result of last applied entry

# Three replicas, same log, same state:
log = [
    None,  # Index 0 is unused (Raft logs are 1-indexed)
    {"command": "SET", "key": "x", "value": 10},
    {"command": "SET", "key": "y", "value": 20},
    {"command": "SET", "key": "x", "value": 30},
]

r1, r2, r3 = KVStateMachine(), KVStateMachine(), KVStateMachine()
for r in [r1, r2, r3]:
    r.apply_log(log, commit_index=3)

assert r1.store == r2.store == r3.store  # Always true!
# All replicas: {"x": 30, "y": 20}

This is the entire idea. The hard part is not the state machine — it is getting all replicas to agree on the same log. That is what Raft does, and it is the subject of the next two chapters.

Raft in Three RPCs

Raft is remarkably compact. The entire protocol consists of just three remote procedure calls (RPCs):

RPCPurposeSenderWhen
RequestVoteAsk other nodes for their vote in a leader election.CandidateWhen a follower's election timeout fires and it becomes a candidate.
AppendEntriesReplicate log entries to followers. Also serves as heartbeat (empty entries).LeaderContinuously: every heartbeat interval (100ms) and whenever new entries arrive.
InstallSnapshotSend a state machine snapshot to a lagging follower whose needed entries have been compacted.LeaderWhen a follower is so far behind that the leader has already discarded the entries it needs.

That is the entire Raft protocol. Three messages. Everything else — leader election, log replication, commit tracking, log repair, snapshot transfer — is built from combinations of these three RPCs. This simplicity is Raft's greatest engineering achievement.

Compare with Paxos. Paxos has at minimum two phases per consensus decision: Prepare/Promise and Accept/Accepted. Multi-Paxos adds a leader election phase, a log compaction mechanism, and membership changes — each described differently in different papers. There is no canonical "the Paxos protocol" — there are dozens of variants, each with its own paper. Raft gives you one paper, one protocol, three RPCs, and a reference implementation. This is why industry adopted Raft.

Data Flow: What Moves Over the Wire

To implement SMR, you need to understand exactly what data moves where. Here is the concrete data flow for a write operation in a Raft-based key-value store:

StepFromToDataSize (typical)
1. Client requestClientLeader{"op":"SET","key":"user:42","value":"Alice"}~50-500 bytes
2. AppendEntriesLeaderEach followerprevIdx=146, prevTerm=3, entries=[{idx:147,term:3,data:...}], leaderCommit=145~100-600 bytes per follower
3. AppendEntries responseEach followerLeader{term:3, success:true, matchIndex:147}~20 bytes
4. Client responseLeaderClient{status:"OK"}~10 bytes

Total network traffic for one write in a 5-node cluster: ~50 bytes (client request) + 4 x ~300 bytes (AppendEntries to 4 followers) + 4 x ~20 bytes (responses) + ~10 bytes (client response) = approximately 1.3 KB. For 10,000 writes/sec, that is ~13 MB/s of consensus traffic — well within the capacity of a 1 Gbps network link. Consensus is not network-limited; it is latency-limited (waiting for fsync and round trips).

Bandwidth vs Latency. Consensus throughput is limited by latency (round trips + fsync), not bandwidth. A single Raft leader can handle ~30,000 small writes/sec because each write takes ~2-5ms and the leader pipelines requests. But each individual write must wait for a majority of followers to fsync and respond — that 2-5ms is the minimum per-write latency. This is why you cannot use consensus for sub-millisecond operations, regardless of network bandwidth.
Concept check: Two replicas receive commands "SET x=5" and "SET x=8". Replica A processes them in order [5, 8]. Replica B processes them in order [8, 5]. What is the result?

Chapter 2: Leader Election

We established that replicated state machines need a total-ordered log. In Raft, one node is responsible for ordering the log: the leader. Every other node is a follower. The leader receives all client requests, appends them to its log, and replicates them to followers. But what happens when the leader crashes?

The followers must detect the failure and elect a new leader. This is leader election — the first of two core mechanisms in Raft (the second is log replication, covered next chapter). The election protocol must guarantee one critical property: at most one leader per term.

Terms: Raft's Logical Clock

Raft divides time into terms, numbered consecutively starting from 1. Each term begins with an election. If the election succeeds, the winner serves as leader for the rest of that term. If the election fails (no candidate gets a majority — a split vote), the term ends with no leader, and a new term begins immediately.

Terms serve as a logical clock. Every message in Raft carries the sender's current term. If a node receives a message with a higher term than its own, it immediately updates its term and becomes a follower. If a node receives a message with an older term, it rejects it. This mechanism ensures that stale leaders discover they have been replaced.

Terms are the immune system of Raft. They prevent stale leaders from doing damage. Imagine a leader that gets partitioned, misses two elections, and then reconnects. Its term is 3, but the cluster is now on term 5. Every message it sends is rejected because its term is outdated. It learns the new term from the rejection, steps down to follower, and catches up. No split-brain.

The Three States

Every Raft node is always in exactly one of three states:

StateBehaviorTransitions
FollowerPassive: responds to leader's heartbeats and AppendEntries RPCs. Does not initiate anything.→ Candidate (if election timeout fires without hearing from a leader)
CandidateActive: requests votes from all other nodes. Trying to become leader.→ Leader (if gets majority votes)
→ Follower (if discovers higher term or existing leader)
→ Candidate (if election timeout fires again — split vote, retry)
LeaderSends heartbeats and replicates log entries. The only node that proposes new entries.→ Follower (if discovers a higher term in any message)

The Election Protocol, Step by Step

Here is exactly what happens when a follower decides to start an election:

1. Follower's election timeout fires
Each follower has a randomized timeout (150-300ms). If no heartbeat arrives before it fires, the follower suspects the leader is dead.
2. Increment term, become Candidate
The follower increments its current term from T to T+1, transitions to Candidate state, and votes for itself.
3. Send RequestVote to all other nodes
The message contains: candidate's term, candidate's ID, index and term of the candidate's last log entry.
4. Wait for responses
Three possible outcomes: win (majority votes), lose (discover higher term or existing leader), or timeout (split vote — restart election with T+2).

Voting Rules

When a node receives a RequestVote, it applies two rules:

Rule 1: One vote per term. Each node can vote for at most one candidate in any given term. Votes are persistent (written to stable storage before responding). This ensures that at most one candidate can receive a majority in any term.

Rule 2: Log up-to-date check. A node refuses to vote for a candidate whose log is less "up-to-date" than its own. Raft defines "up-to-date" as: higher last-log term wins; if terms are equal, longer log wins. This critical check ensures that the leader always has all committed entries. We will see why in the next chapter.

Why the log check matters. Without it, a node with an outdated log could win the election and become leader. It would then try to replicate its stale log to followers, overwriting committed entries. The log up-to-date check guarantees the election winner has every committed entry — it may have some uncommitted entries too, but it never misses committed ones.

Persistent State: What Survives a Crash

Three pieces of state in Raft must be written to stable storage (disk) before responding to any RPC. If a node crashes and restarts, it recovers these from disk:

StateWhy Persistent
currentTermWithout it, a recovering node could vote twice in the same term (it forgot it already voted). This could allow two leaders in the same term.
votedForSame reason. Must remember who it voted for to enforce one-vote-per-term.
log[]Committed entries must not be lost. If a recovering node's log has a gap, the leader's repair mechanism fills it — but the committed entries that were already on this node must survive.

Everything else — commit index, last applied index, next/match index arrays — can be reconstructed from the persistent state and communication with the cluster. This is why Raft implementations use a Write-Ahead Log (WAL): before accepting any RPC, the node fsync's the relevant state changes to its WAL. Only after the fsync completes does it respond.

Split Votes and Randomized Timeouts

What if two followers time out simultaneously and both start elections? They each vote for themselves and send RequestVote to the other three nodes. If the votes split evenly (each candidate gets 2 votes including their own, but neither reaches the majority of 3), neither wins. The term ends with no leader.

Raft handles this with randomized election timeouts. Each node picks a random timeout between 150ms and 300ms. Because the timeouts differ, it is unlikely that two nodes will start elections at the same instant. In practice, split votes are rare — they occur in about 5-10% of elections in a healthy cluster, and almost never recur twice in a row.

The timeout range matters. The Raft paper recommends that the election timeout be at least 10x the network round-trip time. If your network RTT is 1ms (same data center), an election timeout of 150-300ms is appropriate. If your RTT is 50ms (cross-region), you need 500ms-1000ms election timeouts. The heartbeat interval should be much smaller than the election timeout — typically 100ms for same-DC deployments.

heartbeat_interval << election_timeout << MTBF
MTBF = Mean Time Between Failures. If election timeout ≈ MTBF, the system spends too much time electing.

Pre-Vote: Preventing Disruptive Rejoiners

Raft has an optional extension called Pre-Vote that solves a subtle problem. Imagine a node that gets partitioned from the cluster. Its election timeout fires repeatedly, and it increments its term each time — from 5 to 6, 7, 8, 50, 100... When the partition heals, it reconnects with a very high term. Every other node sees this high term and immediately steps down to follower, disrupting the cluster even though the rejoining node has a stale log and cannot win the election.

Pre-Vote adds a preliminary round: before incrementing its term, a candidate sends a "pre-vote" request. Other nodes check if they would vote for this candidate, but without actually recording a vote or changing their term. Only if the candidate gets a pre-vote majority does it proceed to a real election. A partitioned node will fail the pre-vote (its log is stale), so it never disrupts the cluster.

Raft Leader Election

A 5-node Raft cluster. Watch election timeouts, vote requests (orange arrows), and vote grants (teal arrows). Click "Trigger Election" to force a timeout on a random follower. Click "Kill Leader" to crash the current leader and watch a new election happen.

Term 1. Node 0 is leader. All followers healthy.

Election Correctness: Why At Most One Leader

The proof is elegant. Suppose two candidates, A and B, both claim to win election in term T. Each must have received votes from a majority of the N nodes. But any two majorities overlap (they share at least one node). That shared node voted for both A and B in the same term T. But Rule 1 says a node can vote for at most one candidate per term. Contradiction. Therefore, at most one candidate can win per term.

|majority(A)| + |majority(B)| > N ⇒ majority(A) ∩ majority(B) ≠ ∅
But one vote per term ⇒ the shared node cannot vote for both ⇒ contradiction.
Concept check: In a 5-node Raft cluster, the leader (term 3) gets partitioned. It keeps incrementing its term during the partition: 4, 5, 6, 7... When it reconnects, what happens WITHOUT Pre-Vote?

Chapter 3: Log Replication

The leader has been elected. Now it must do its job: accept commands from clients, append them to its log, replicate the log to followers, and tell followers when entries are safe to apply. This is log replication — the second core mechanism in Raft, and the one that actually moves data.

Anatomy of a Log Entry

Each entry in the Raft log has three fields:

FieldTypePurpose
IndexInteger (1, 2, 3, ...)The position in the log. Monotonically increasing.
TermIntegerThe term of the leader that created this entry. Used to detect inconsistencies.
CommandBytesThe actual operation (e.g., "SET x=42"). Opaque to the consensus layer.

The combination of (index, term) uniquely identifies a log entry across the entire cluster. Two entries with the same index and term are guaranteed to contain the same command. This is the Log Matching Property.

AppendEntries: The Replication RPC

The leader sends AppendEntries RPCs to each follower. This is the workhorse of Raft — it handles both replication and heartbeats (an AppendEntries with no new entries is a heartbeat). Here is what the message contains:

FieldPurpose
termLeader's current term. Followers reject messages from stale leaders.
leaderIdSo followers know who the leader is (for redirecting clients).
prevLogIndexIndex of the entry immediately before the new ones. The consistency check anchor.
prevLogTermTerm of the entry at prevLogIndex. The consistency check value.
entries[]New entries to append (empty for heartbeats).
leaderCommitLeader's commit index. Tells followers how far they can safely apply.

The Consistency Check

When a follower receives AppendEntries, it checks: "Do I have an entry at prevLogIndex with term prevLogTerm?" If yes, the follower's log is consistent with the leader's up to that point, and it can safely append the new entries. If no, the follower rejects the request.

When the leader gets a rejection, it decrements prevLogIndex and retries. It keeps backing up until it finds a point where the logs agree, then it overwrites the follower's log from that point forward. This is how Raft repairs inconsistent logs after crashes and leader changes.

The Log Matching Property — an inductive argument. Initially, all logs are empty (base case). If entry at index I matches (same term), then all entries before I also match (inductive step, enforced by the consistency check). Therefore, if the consistency check passes at prevLogIndex, ALL entries from 1 to prevLogIndex are identical on the leader and follower. This means the leader never needs to check the entire log — just one entry at the boundary.

Commit Index: When Is It Safe to Apply?

An entry is committed once the leader has replicated it to a majority of nodes. The leader tracks a commit index — the highest log index known to be committed. It advances the commit index each time a new majority is reached, and piggybacks the commit index on every AppendEntries message. Followers apply committed entries to their state machines.

There is a subtle safety rule: a leader can only commit entries from its own term. It cannot commit entries from previous terms by counting replicas alone. Why? Because an entry from a previous term might have been replicated to a majority, but that majority might not overlap with the majority that elected the current leader. The current leader commits previous-term entries indirectly, by committing a new entry from its current term (which then implicitly commits all preceding entries).

The "commit from current term only" rule. This is the most subtle safety property in Raft. Without it, here is a scenario that breaks safety: Leader of term 2 creates entry at index 3 and replicates it to 2 of 5 nodes. Leader crashes. Leader of term 3 is elected (has not seen entry at index 3). Leader of term 3 creates its own entry at index 3 with a different command. Leader of term 3 commits it. Now the two nodes that had the term-2 entry at index 3 have conflicting committed entries. Disaster.

Step-by-Step: A Replication Round

1. Client sends "SET x=42" to leader
Leader appends entry {index:4, term:2, cmd:"SET x=42"} to its log.
2. Leader sends AppendEntries to all followers
prevLogIndex=3, prevLogTerm=2, entries=[{index:4, term:2, cmd:"SET x=42"}]
3. Followers check consistency
"Do I have entry at index 3 with term 2?" If yes, append. If no, reject.
4. Followers reply success/failure
Leader counts successes. Needs majority (including itself).
5. Leader advances commit index
If 3+ nodes (majority of 5) have entry 4, commitIndex advances to 4.
6. Leader applies entry and responds to client
State machine processes "SET x=42". Client gets "OK".
Raft Log Replication

A leader and 2 followers. Click "Send Command" to add a new entry. Watch the AppendEntries flow: leader proposes, followers check consistency, leader commits when majority agrees. Click "Crash Follower" to introduce a lagging node, then watch the leader repair it.

Leader ready. Commit index: 0.

Code: AppendEntries Handler

python
def handle_append_entries(self, leader_term, prev_idx, prev_term,
                           entries, leader_commit):
    # Rule: reject if leader's term is stale
    if leader_term < self.current_term:
        return False, self.current_term

    # Become follower if leader's term is newer
    if leader_term > self.current_term:
        self.current_term = leader_term
        self.state = "follower"
        self.voted_for = None

    # Reset election timer (leader is alive)
    self.reset_election_timer()

    # Consistency check: do I have the expected entry?
    if prev_idx > 0:
        if prev_idx >= len(self.log):
            return False, self.current_term  # My log is too short
        if self.log[prev_idx].term != prev_term:
            return False, self.current_term  # Term mismatch

    # Append new entries (overwrite conflicts)
    idx = prev_idx + 1
    for entry in entries:
        if idx < len(self.log):
            if self.log[idx].term != entry.term:
                self.log = self.log[:idx]  # Truncate conflicting tail
                self.log.append(entry)
            # else: already have this entry, skip
        else:
            self.log.append(entry)
        idx += 1

    # Update commit index
    if leader_commit > self.commit_index:
        self.commit_index = min(leader_commit, len(self.log) - 1)

    return True, self.current_term

Worked Example: Log Repair After a Crash

Let us trace through a concrete scenario. We have a 3-node cluster (Leader, F1, F2). The leader has committed entries 1-3. Then the leader crashes, F1 becomes the new leader, and adds entry 4. Meanwhile, F2 was down and missed entry 3. Here is the state:

NodeLogStatus
Old Leader[1, 2, 3]Dead
F1 (new leader, term 2)[1, 2, 3, 4]Leader
F2[1, 2]Follower, missed entries 3-4

F1 sends AppendEntries to F2 with prevLogIndex=3, prevLogTerm=1, entries=[entry 4]. F2 checks: "Do I have an entry at index 3?" No — F2's log only has entries 1-2. F2 rejects.

F1 backs up: sends AppendEntries with prevLogIndex=2, prevLogTerm=1, entries=[entry 3, entry 4]. F2 checks: "Do I have entry at index 2 with term 1?" Yes! F2 accepts, appends entries 3 and 4. Now F2's log matches F1's log. The repair is complete.

Optimization: nextIndex and matchIndex. The leader maintains two arrays. nextIndex[i] is the index of the next entry to send to follower i (initially = leader's last log index + 1). matchIndex[i] is the highest index known to be replicated on follower i (initially = 0). When a follower rejects, the leader decrements nextIndex[i] and retries. When a follower accepts, the leader updates matchIndex[i]. The commit index advances when a majority of matchIndex values reach a given index.

Snapshots: When the Log Gets Too Long

The log grows forever — every command ever executed is stored. Eventually it will consume all available disk space. The solution is snapshotting: periodically, a node serializes its current state machine state to disk, then discards all log entries up to the snapshot point. If a lagging follower needs entries that have been discarded, the leader sends the snapshot instead of individual entries via an InstallSnapshot RPC.

python
class RaftNodeWithSnapshots:
    def __init__(self):
        self.log = [None]    # 1-indexed
        self.snapshot = None  # Serialized state machine state
        self.snapshot_index = 0  # Last index included in snapshot
        self.snapshot_term = 0   # Term of last snapshot entry
        self.state_machine = KVStateMachine()

    def take_snapshot(self):
        """Compact the log by snapshotting current state."""
        self.snapshot = self.state_machine.serialize()
        self.snapshot_index = self.state_machine.last_applied
        self.snapshot_term = self.log[self.snapshot_index].term
        # Discard log entries up to snapshot
        self.log = [None] + self.log[self.snapshot_index + 1:]
        # Log now starts at snapshot_index + 1

    def handle_install_snapshot(self, leader_term, snapshot_data,
                                 last_included_index, last_included_term):
        """Follower receives a snapshot from the leader."""
        if leader_term < self.current_term:
            return False
        # Replace state machine with snapshot
        self.state_machine = KVStateMachine.deserialize(snapshot_data)
        self.snapshot = snapshot_data
        self.snapshot_index = last_included_index
        self.snapshot_term = last_included_term
        # Discard entire log (snapshot is more recent)
        self.log = [None]
        self.state_machine.last_applied = last_included_index
        return True
When to snapshot. A common heuristic: snapshot when the log exceeds a configurable size (e.g., 10,000 entries or 64 MB). etcd snapshots every 10,000 entries by default. The snapshot is taken asynchronously on a background thread to avoid blocking the main consensus loop. The key trade-off: frequent snapshots keep the log small but cost CPU/IO; infrequent snapshots keep a long log but make recovery of lagging followers faster (no need to send a snapshot if the log still has their missing entries).
Concept check: A leader has committed entries 1-5. A follower crashed at entry 3 and just came back online. The leader sends AppendEntries with prevLogIndex=5. What happens?

Chapter 4: Consistency Models

We have been talking about "replicas agreeing" — but what exactly does "agree" mean from a client's perspective? If I write a value and then immediately read it, will I see my own write? If someone else reads it a millisecond later, will they see it too? The answers depend on the consistency model the system provides.

A consistency model is a contract between the distributed system and its clients. It defines which read results are "legal" given the writes that have occurred. Stronger models are easier for programmers to reason about but more expensive to implement. Weaker models allow more flexibility (and higher performance) but force programmers to handle strange behaviors.

The Hierarchy

There are four main consistency models you need to know, ordered from strongest to weakest:

ModelPromiseCostExample Systems
LinearizabilityEvery operation appears to take effect at a single instant between its invocation and response. Real-time ordering is preserved.Highest. Requires consensus on every operation.etcd, ZooKeeper, Spanner
Sequential ConsistencyAll nodes see the same order of operations, and each client's operations appear in the order it submitted them. But this order need not match real time.High. Total order, but no real-time constraint.Zookeeper reads (within a session)
Causal ConsistencyIf operation A causally precedes B (A happened-before B), then everyone sees A before B. Concurrent operations can be seen in any order.Medium. Must track causal dependencies.MongoDB (with read concern "majority"), COPS
Eventual ConsistencyIf no new writes occur, all replicas will eventually converge. No ordering guarantee.Lowest. No coordination needed for reads.DynamoDB, Cassandra, DNS

Linearizability: The Gold Standard

Linearizability means the system behaves as if there is a single copy of the data, and every operation is atomic. Formally: you can assign a "linearization point" to each operation (a single instant between invocation and response) such that the results are consistent with a sequential execution in that order.

The practical consequence: once a write completes and the client gets an acknowledgment, ALL subsequent reads (by ANY client, on ANY node) must see that write or a later one. You can never go back to seeing a stale value.

Sequential Consistency: Same Order, But Maybe Not Real Time

Sequential consistency is weaker than linearizability in one key way: it does not require the global order to match real time. If client A writes x=1, and later (in wall-clock time) client B writes x=2, a sequentially consistent system could order B's write before A's — as long as all nodes see the same order, and each individual client's operations appear in the order submitted.

The practical difference. Linearizability: "I wrote x=1 and got ACK. You read x right after. You MUST see 1 (or later)." Sequential consistency: "I wrote x=1 and got ACK. You read x right after. You might see an older value, as long as you eventually see 1 and never go backward." The "right after" real-time constraint is what separates them.

Causal Consistency: Respecting "Happened Before"

Causal consistency only orders operations that are causally related. If I post a message and you reply to it, everyone must see my message before your reply (causal relationship). But if two users independently post messages with no causal link, different nodes can show them in different orders.

Causal consistency is attractive because it is the strongest consistency model that does not require consensus for every operation. You can implement it with vector clocks and dependency tracking, without a single leader. This makes it much faster than linearizability in geo-distributed systems.

Eventual Consistency: The Bare Minimum

Eventual consistency says: if you stop writing, all replicas will eventually converge to the same value. That is the entire guarantee. While writes are ongoing, different clients can see different values. Reads can go backward. Two clients reading at the same time can get different results. It is the wild west — but it is fast and available.

Consistency Models Visualizer

Two clients, three replicas. Click "Write" to issue a write from Client 1, then "Read" to read from Client 2. Toggle the consistency model to see which read results are legal under each model.

No operations yet. Write a value to start.

Worked Example: Same Operations, Four Models

Let us trace through a concrete scenario. Client A writes x=1 at time T=0, gets ACK at T=5. Client B reads x at T=6. What are the legal read results under each model?

ModelLegal Results for B's ReadWhy
Linearizablex=1 (or later write)A's write completed at T=5. B's read started at T=6. Real-time ordering requires B to see A's write.
Sequentialx=0 OR x=1Sequential consistency does not require real-time ordering. The system can order B's read before A's write, as long as all observers see the same order.
Causalx=0 OR x=1A's write and B's read are not causally related (B did not know about A's write). So they are concurrent and can be ordered either way.
Eventualx=0 OR x=1 OR any old valueNo ordering guarantee at all. B might read from a stale replica that has not received A's write yet. Eventually it will converge.

Now change the scenario: Client A writes x=1, gets ACK, then tells Client B "I just wrote x=1, please read it." This creates a causal dependency. Under causal consistency, B must now see x=1 (because B's read causally depends on A's write via the out-of-band communication). Under eventual consistency, B can still see x=0.

The real-time gap is everything. The only difference between linearizability and sequential consistency is the real-time constraint. If operation A completes before operation B starts (in wall-clock time), linearizability guarantees A's effect is visible to B. Sequential consistency makes no such guarantee. This single constraint is what makes linearizability expensive — the system must actually know which operations completed before which, which requires either a centralized authority (leader) or synchronized clocks (Spanner's TrueTime).

When to Use What

Use CaseRequired ModelWhy
Leader election / distributed locksLinearizabilityIf two nodes both think they hold the lock, catastrophic conflicts ensue.
Unique constraint enforcementLinearizability"Only one user can claim username 'alice'" requires real-time ordering.
Social media feedCausalReplies must appear after their parent posts, but unrelated posts can be reordered.
Shopping cartEventualTemporary inconsistency is tolerable. Merge conflicts are resolvable.
DNS recordsEventualPropagation delay of minutes is acceptable. No need for consensus.
Bank account balanceLinearizabilityDouble-spending must be impossible. Must see latest balance before allowing withdrawal.

Code: Testing Linearizability

python
from dataclasses import dataclass
from itertools import permutations

@dataclass
class Op:
    client: str
    op_type: str     # "write" or "read"
    value: int       # value written or read
    start: float     # invocation time
    end: float       # response time

def is_linearizable(ops):
    """Brute-force linearizability check (NP-hard in general)."""
    # Try every possible ordering of operations
    for perm in permutations(ops):
        if _valid_sequential(perm) and _respects_realtime(perm):
            return True
    return False

def _valid_sequential(perm):
    """Check if this ordering produces valid read results."""
    register = None
    for op in perm:
        if op.op_type == "write":
            register = op.value
        elif op.op_type == "read":
            if op.value != register:
                return False  # Read got wrong value
    return True

def _respects_realtime(perm):
    """Check that ordering preserves real-time precedence."""
    for i in range(len(perm)):
        for j in range(i + 1, len(perm)):
            # If perm[j] ended before perm[i] started in real time,
            # then perm[j] must come before perm[i] in the ordering.
            if perm[j].end < perm[i].start:
                return False
    return True

# Example: is this linearizable?
history = [
    Op("A", "write", 1, 0.0, 0.5),
    Op("B", "read",  1, 0.3, 0.8),
    Op("A", "write", 2, 0.6, 1.0),
    Op("B", "read",  2, 0.9, 1.3),
]
print(is_linearizable(history))  # True: order W(1), R(1), W(2), R(2) works
Concept check: Client A writes x=1 and gets an ACK at time T=5. Client B starts a read of x at time T=6 and gets back x=0 (the old value). Under which consistency model(s) is this legal?

Chapter 5: Chain Replication

Raft is not the only way to build a replicated system with strong consistency. There is an alternative that is conceptually simpler, has better throughput under certain workloads, and is used in production at Microsoft (Azure Storage) and Amazon (EBS, S3 internal components). It is called chain replication.

The core idea: instead of a leader broadcasting to all followers in parallel (Raft's approach), arrange the replicas in a chain. Writes enter at the head and flow through each node in sequence until they reach the tail. Reads are served exclusively from the tail. When the tail has a write, every node before it also has that write — so the tail always has the most committed state.

How It Works

1. Client sends write to Head
WRITE(x=42) arrives at the first node in the chain.
2. Head appends and forwards to next
Head stores x=42 locally, then sends it to Node 2.
3. Each node forwards to its successor
Node 2 stores and forwards to Node 3, etc.
4. Tail stores and ACKs the client
The tail is the last node. When it has the write, all prior nodes also have it. Tail sends ACK back to the client.
5. Reads served from tail
All reads go to the tail. Since the tail only ACKs writes it has stored, every read at the tail sees all committed writes. Strong consistency for free.

Chain Replication vs Raft

PropertyRaftChain Replication
Write pathLeader → all followers in parallelHead → Node 2 → ... → Tail (sequential)
Write latency1 round trip (leader waits for majority)N-1 sequential hops (higher latency for N > 2)
Write throughputLeader bottlenecks on outbound bandwidthEach node only sends to one successor (better bandwidth distribution)
Read pathLeader only (for linearizability) or any node (for weaker models)Tail only. Tail does no write forwarding — dedicated read server.
Read throughputLeader handles both reads and writesTail handles only reads — better read throughput
Failure handlingBuilt-in leader election via Raft protocolRequires an external configuration manager (e.g., ZooKeeper, etcd)
ConsistencyLinearizable (with read-from-leader)Linearizable by construction (reads from tail see all committed writes)
Chain replication's secret weapon: separating reads and writes. In Raft, the leader handles both reads and writes, making it the bottleneck. In chain replication, the head handles writes and the tail handles reads. This separation means you can scale reads and writes independently. For read-heavy workloads (which most workloads are), the tail can serve reads at full network bandwidth while the head processes writes.

Failure Handling

When a node in the chain fails, the chain must be repaired:

Failed NodeRepair ActionComplexity
HeadSuccessor becomes new head. In-flight writes that the head received but did not forward are lost (client retries).Simple
Middle nodePredecessor links directly to successor. In-flight writes at the failed node must be tracked and re-sent.Moderate
TailPredecessor becomes new tail. Uncommitted writes that were in transit to the tail are now committed when the predecessor becomes tail.Simple

The critical point: chain replication does not handle reconfiguration itself. It relies on an external configuration manager — typically a Raft or Paxos-based system like ZooKeeper — to detect failures, decide the new chain membership, and notify all nodes. This is why chain replication and Raft are complementary, not competing: you use Raft for the small, critical metadata service, and chain replication for the large, high-throughput data path.

CRAQ: Chain Replication with Apportioned Queries

CRAQ (2009, Jeff Terrace and Michael Freedman at Princeton) extends chain replication to allow reads from any node, not just the tail. Each node stores two versions of each object: "clean" (committed) and "dirty" (received but not yet confirmed by the tail). When a non-tail node receives a read and has only a clean version, it serves it directly. If it has a dirty version, it asks the tail for the latest committed version number and serves accordingly.

CRAQ dramatically improves read throughput because reads are distributed across all N nodes instead of concentrated at the tail. The consistency guarantee remains linearizable.

Throughput Analysis: Why Chain Replication Wins for Large Objects

Consider a cluster of 5 nodes, each with 1 Gbps network bandwidth. A client writes 1 MB objects.

MetricRaft (Leader-Based)Chain Replication
Leader/Head outbound4 Gbps (sends 1 MB to each of 4 followers)1 Gbps (sends 1 MB to one successor)
Max write throughput~250 writes/sec (leader bandwidth limited: 1 Gbps / 4 = 250 MB/s)~1000 writes/sec (each node forwards to one successor, pipeline fills)
Write latency~1 RTT (parallel replication, wait for majority)~4 RTTs (sequential: head → 2 → 3 → 4 → tail)
Read throughputLimited by leader (same node handles reads + writes)Tail is dedicated to reads (no write forwarding overhead)

The key insight: Raft's leader is a bandwidth bottleneck because it sends every entry to every follower. The leader's outbound bandwidth scales as O(N) where N is the cluster size. Chain replication's head sends each entry to only one successor — O(1) outbound per node. For large objects (like the 256 KB extent blocks in Azure Storage), this difference is enormous.

The trade-off is latency vs throughput. Chain replication has higher write latency (N-1 sequential hops vs 1 parallel broadcast) but higher write throughput (no single-node bandwidth bottleneck). For workloads where throughput matters more than latency — blob storage, log-structured systems, append-heavy workloads — chain replication wins. For workloads where latency matters — metadata, locks, leader election — Raft wins.

Real-World Chain Replication: Azure Storage

Microsoft Azure Storage uses a variant of chain replication called extent-based replication for its blob, table, and queue services. Each storage extent (a contiguous chunk of data, typically 1-3 GB) is replicated across 3 nodes in a chain. The configuration manager (a Paxos-based service called the "Stream Manager") handles chain membership and failure detection.

Azure Storage processes hundreds of billions of transactions per day across millions of chains. Each chain independently replicates its data without coordinating with other chains. This design gives them both strong consistency (per-chain) and massive scalability (millions of independent chains).

Chain Replication

A 4-node chain. Writes enter at the head (left) and flow to the tail (right). Reads are served from the tail. Click "Write" to send a write, "Read" to read from the tail. Click "Kill Node" to crash a random middle node and watch the chain repair.

Chain: Head → N2 → N3 → Tail. Ready.

Code: Chain Replication Node

python
class ChainNode:
    def __init__(self, node_id, successor=None, is_tail=False):
        self.id = node_id
        self.store = {}          # key -> value
        self.pending = {}        # key -> value (dirty, not yet confirmed)
        self.successor = successor
        self.is_tail = is_tail

    def handle_write(self, key, value, client_callback=None):
        """Process a write. Forward to successor or commit if tail."""
        if self.is_tail:
            # Tail: commit and ACK client
            self.store[key] = value
            if client_callback:
                client_callback("OK")
        else:
            # Non-tail: store as pending, forward to successor
            self.pending[key] = value
            self.store[key] = value  # optimistic local update
            self.successor.handle_write(key, value, client_callback)

    def handle_read(self, key):
        """Only the tail serves reads in basic chain replication."""
        assert self.is_tail, "Reads must go to the tail!"
        return self.store.get(key, None)

# Build a 4-node chain
tail = ChainNode("tail", is_tail=True)
n3   = ChainNode("n3", successor=tail)
n2   = ChainNode("n2", successor=n3)
head = ChainNode("head", successor=n2)

# Write flows: head -> n2 -> n3 -> tail
head.handle_write("x", 42, lambda r: print(f"Client got: {r}"))
# Read from tail
print(tail.handle_read("x"))  # 42
Concept check: In a 5-node chain, the middle node (node 3) crashes. A write was in transit — the head, node 2, and node 3 all have it, but node 4 and the tail do not. Is this write committed?

Chapter 6: Consensus Showcase

Time to put everything together. Below is a full 5-node Raft cluster simulation. You can send client requests, kill nodes, create network partitions, and watch the system respond in real time. This is Raft in action: leader election, log replication, commit advancement, and failure recovery.

How to use this simulation. Start by clicking "Send Request" a few times to see normal operation. Then kill a follower — notice nothing changes (the cluster tolerates 2 failures). Kill another follower — still fine. Now kill the leader — watch a new election happen automatically. Try creating a partition to split the cluster and observe which side can still make progress. Reset and try different scenarios.
5-Node Raft Cluster

A live 5-node Raft cluster. Nodes show their state (L=Leader, F=Follower, C=Candidate, X=Dead). The log entries are shown as colored blocks below each node. Committed entries are solid; uncommitted entries are striped.

Term 1. Node 0 is leader. Cluster healthy. Send a request to begin.
4-Node Chain Replication

Compare: a 4-node chain. Writes flow left-to-right through the chain. Reads come from the tail (rightmost). Notice the sequential flow vs Raft's parallel broadcast.

Chain ready. Head → N2 → N3 → Tail.

Observations to Make

ExperimentWhat to WatchKey Insight
Send 5 requestsLog entries appear on leader first, then replicate to followersLeader is the source of truth for ordering
Kill 1 follower, send requestsRequests still commit (4 alive, majority = 3)Cluster tolerates minority failures
Kill leaderElection timeout fires on a follower, new leader electedAutomatic failover without human intervention
Kill 3 nodesRemaining 2 cannot form majority — cluster stallsSafety over liveness: better to stop than to split-brain
Partition: {0,1} vs {2,3,4}Majority side (3 nodes) elects leader and continues. Minority side stalls.Quorum prevents split-brain during partitions
Heal partitionMinority nodes catch up from new leaderRaft's log repair mechanism fixes divergent logs

Raft vs Chain: When to Use Which

After experimenting with both simulations above, here is the decision framework:

Choose Raft WhenChoose Chain Replication When
You need the coordination service itself (metadata, locks, config)You need a high-throughput data store with strong consistency
Write latency matters more than write throughputWrite throughput matters more than write latency
The dataset is small (megabytes)The dataset is large (gigabytes to terabytes)
You want a self-contained system (no external dependencies)You already have a Raft/Paxos service for configuration
You need leader-based reads (strong consistency from any node)You want to separate read and write workloads
Examples: etcd, ZooKeeper, ConsulExamples: Azure Storage, Amazon EBS, HDFS (with modifications)
The hybrid architecture. Most real systems use both patterns. CockroachDB uses Raft for consensus within each data range and an architecture similar to chain replication for cross-range data flow. Azure Storage uses Paxos for its Stream Manager (metadata) and chain replication for the actual data path. Kafka used to use ZooKeeper (ZAB consensus) for metadata and its own replication protocol (ISR-based, similar to chain) for messages — now it uses KRaft (Raft) for metadata internally.

Production Performance Numbers

To ground your understanding, here are real-world performance numbers from production consensus systems:

SystemWrite Latency (p50)Write Latency (p99)ThroughputNotes
etcd 3.52-5 ms10-25 ms15-25K writes/sec3-node cluster, SSD, same-region
ZooKeeper 3.82-4 ms8-20 ms10-20K writes/sec3-node cluster, SSD
CockroachDB3-8 ms20-50 ms30-50K writes/sec (per range)Multi-Raft, per-range latency
TiKV1-3 ms5-15 ms100K+ writes/sec (aggregate)Multi-Raft, hundreds of ranges
Spanner5-10 ms (same region)50-200 ms (cross-region)VariesTrueTime + Paxos. Cross-region commits are slow.

Notice the pattern: all consensus systems have write latencies in the low-millisecond range within a single region (dominated by fsync + network RTT) and tens-to-hundreds of milliseconds cross-region (dominated by speed-of-light latency). This is the fundamental performance ceiling of consensus — you cannot beat the speed of light and the cost of durable writes.

Chapter 7: Consensus in Practice

Now you understand the theory: state machine replication, leader election, log replication, consistency models, chain replication. But where does consensus actually show up in production systems? And equally important: when should you NOT use consensus?

The Big Three Coordination Services

SystemProtocolLanguageUsed ByKey Features
etcdRaftGoKubernetes (control plane), CoreDNSSimple key-value API, watch streams, leases for TTL, linearizable reads
ZooKeeperZAB (Zookeeper Atomic Broadcast)JavaKafka (metadata), Hadoop, HBase, SolrHierarchical namespace (like a filesystem), ephemeral nodes, watches, sequential znodes
ConsulRaftGoHashiCorp ecosystem, service meshService discovery, health checking, KV store, prepared queries

What to Store in Consensus

Consensus is expensive. Every write must be replicated to a majority before it is committed. Latency is at least one network round trip (more for cross-region). Throughput is limited by the leader's capacity. You should use consensus for small, critical data — not for bulk storage.

Good Uses (Small, Critical)Bad Uses (Large, High-Throughput)
Cluster membership (which nodes are alive)User-generated content
Configuration (feature flags, routing rules)Logs and metrics
Leader election (who is the primary for shard X)Session data
Distributed locks (only one worker processes job Y)Shopping carts
Schemas and metadata (what tables exist)Time-series data
Epoch numbers (fencing tokens for preventing split-brain)Blob/object storage
The "metadata vs data" rule. Consensus is for metadata — the small amount of information that tells you WHERE the data is and WHO is in charge. The actual data goes in a system optimized for throughput: a distributed file system, an object store, a sharded database. Kubernetes uses etcd for cluster state (a few MB) but stores container images in a registry and logs in Elasticsearch. Kafka uses ZooKeeper for broker membership and topic metadata (KB) but stores actual messages on broker disks (TB).

etcd: The Kubernetes Brain

Every Kubernetes cluster runs etcd as its single source of truth. When you run kubectl apply -f deployment.yaml, the API server writes the deployment spec to etcd. The scheduler watches etcd for unscheduled pods. The kubelet watches for pods assigned to its node. All state transitions go through etcd.

etcd stores the entire Kubernetes cluster state: pods, services, config maps, secrets, RBAC rules, custom resources. A typical production cluster's etcd holds 2-8 GB of data. This small size is by design — consensus does not scale to terabytes.

python
# Using etcd via Python client (etcd3 library)
import etcd3

client = etcd3.client(host="localhost", port=2379)

# Write (goes through Raft consensus)
client.put("/services/web/leader", "node-42")

# Linearizable read (goes through leader)
value, metadata = client.get("/services/web/leader")
print(value)  # b'node-42'

# Watch for changes (streaming)
events_iterator, cancel = client.watch("/services/web/", prefix=True)
for event in events_iterator:
    print(f"Key: {event.key}, Value: {event.value}")

# Distributed lock using lease (TTL-based)
lease = client.lease(ttl=10)  # 10-second lease
# If this process crashes, the lock auto-releases in 10s
client.put("/locks/job-processor", "me", lease=lease)

etcd Performance Characteristics

Understanding etcd's performance envelope is critical for production deployments. Here are the numbers you need to know:

MetricTypical ValueNotes
Write latency (same region)2-10 msConsensus + fsync. Dominated by disk latency.
Write latency (cross-region)50-200 msNetwork RTT between data centers dominates.
Read latency (linearizable)1-5 msMust go through leader. ReadIndex optimization avoids full log append.
Read latency (serializable)< 1 msServed from any node's local state. May be stale.
Write throughput10,000-30,000 ops/secDepends on value size and hardware. SSD required.
Max recommended DB size8 GBLarger sizes cause slow snapshots and long recovery.
Max value size1.5 MBHard limit. etcd is for small metadata, not blobs.
The SSD requirement. etcd (and Raft in general) writes to a Write-Ahead Log (WAL) on every commit. The WAL must be fsynced to guarantee durability. On spinning disks, fsync takes 5-15ms. On SSDs, it takes 0.1-0.5ms. This 10-50x difference directly impacts commit latency. Every production Raft deployment uses SSDs. If your etcd is slow, check if it is running on a spinning disk — this is the number one performance issue in the field.

Linearizable Reads: Three Approaches

Raft guarantees linearizable writes by default (they go through the leader and are committed to a majority). But reads are trickier. Here are three approaches, from safest to fastest:

ApproachHow It WorksLatencyCorrectness
Read through logTreat reads as log entries. Replicate to majority before returning.Same as write (~5ms)Fully linearizable. Overkill for reads.
ReadIndexLeader records current commit index, confirms it is still leader (heartbeat round), then serves read from that index.~1 heartbeat RTTLinearizable. etcd's default.
Lease-basedLeader holds a time-based lease. Serves reads locally without confirmation as long as lease is valid.Local read (~0.1ms)Linearizable IF clocks are well-synchronized. Breaks under clock skew.

etcd uses ReadIndex by default. When a client issues a linearizable read, the leader (1) records the current commit index, (2) sends a heartbeat to confirm it is still the leader, and (3) waits for its state machine to reach the recorded commit index before serving the read. This avoids the cost of replicating reads through the log while still guaranteeing linearizability.

ZooKeeper: The Original Coordination Service

ZooKeeper predates etcd and Consul. It uses a hierarchical namespace (like a filesystem) where each node is called a znode. Two special znode types power most use cases:

Znode TypeBehaviorUse Case
EphemeralAuto-deleted when the session that created it ends (client disconnects or crashes).Service discovery: each server creates an ephemeral znode. When it dies, the znode disappears. Watchers are notified.
SequentialZooKeeper appends a monotonically increasing counter to the name. E.g., "/locks/lock-0000000042".Distributed locks: create a sequential ephemeral znode under /locks/. The client with the lowest sequence number holds the lock.

ZooKeeper Distributed Lock: Step by Step

The ZooKeeper distributed lock recipe uses sequential ephemeral znodes. Here is how it works:

1. Create sequential ephemeral znode
Client creates /locks/mylock/lock-000000042 (ZooKeeper assigns the sequence number).
2. List children of /locks/mylock/
Get all znodes: lock-000000040, lock-000000041, lock-000000042.
3. Check if I am the smallest
If my znode (042) has the lowest sequence number, I hold the lock. If not, watch the znode just before mine (041) and wait.
4. On notification, re-check
When the watched znode is deleted (previous holder released or crashed), re-list children and check again. If I am now the smallest, I hold the lock.
python
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")
zk.start()

# Acquire a distributed lock
lock = zk.Lock("/locks/my-resource", "worker-42")
with lock:
    # This block runs with exclusive access.
    # If this process crashes, the ephemeral znode
    # auto-deletes, and the next waiter gets the lock.
    print("I hold the lock!")
    do_critical_work()

# Lock is automatically released when exiting the 'with' block.
# Under the hood: ZK deletes the ephemeral sequential znode.
The fencing token problem. A distributed lock is necessary but not sufficient for safety. Here is why: Process A acquires the lock, then pauses for a long GC (garbage collection). The lock's TTL expires. Process B acquires the lock and starts writing. Process A resumes, unaware its lock expired, and also writes. Both processes believe they hold the lock — mutual exclusion is violated. The solution: attach a fencing token (a monotonically increasing number) to every lock grant. The resource being protected checks the token — if it receives a request with a lower token than the last one it saw, it rejects it. ZooKeeper's sequential znode numbers serve as natural fencing tokens.

Consul: Service Discovery + Consensus

Consul by HashiCorp combines a Raft-based KV store with built-in service discovery and health checking. Unlike etcd (which is a pure KV store) or ZooKeeper (which is a hierarchical namespace), Consul is purpose-built for microservice coordination.

FeatureetcdZooKeeperConsul
Data modelFlat key-valueHierarchical (tree)Key-value + service catalog
Service discoveryManual (store endpoints as values)Ephemeral znodesBuilt-in with DNS and HTTP API
Health checkingNone (use external tool)Session heartbeatsBuilt-in (HTTP, TCP, script, gRPC)
Multi-datacenterSingle clusterSingle clusterWAN federation between clusters
LanguageGoJavaGo
ProtocolRaftZABRaft (KV) + Serf/gossip (membership)

When NOT to Use Consensus

Consensus is the wrong tool when:

SituationWhy Consensus is WrongBetter Alternative
High-throughput data pathConsensus adds latency (cross-region: 100ms+). Every write waits for majority.Sharded database with per-shard consensus (CockroachDB, TiKV)
Temporary or lossy dataConsensus is durable by design. No point making session data durable.Redis, Memcached (in-memory, no consensus)
Cross-region with low latencyConsensus across regions means 100-300ms write latency.Eventual consistency (DynamoDB global tables) or causal consistency (MongoDB)
More than ~50 nodesConsensus performance degrades with cluster size (more messages per commit).Gossip protocols (Serf, Memberlist) for membership, consensus for metadata only
The architecture pattern. Use a small consensus cluster (3-7 nodes) for metadata and coordination. Use a separate, purpose-built system for the data path. This is how every major system works: Kafka uses ZooKeeper for metadata + broker disks for messages. Kubernetes uses etcd for state + container registries for images. CockroachDB uses Raft per range (small groups) but not globally.
Concept check: A team is building a real-time multiplayer game. They consider using a 5-node etcd cluster to store player positions (updated 60 times per second per player). Why is this a terrible idea?

Chapter 8: Interview Arsenal

This chapter is your cheat sheet for system design interviews. It covers the comparison table every interviewer expects, the coding drills, and the design questions that come up repeatedly.

The Consensus Protocol Comparison

RaftPaxosZABChain Replication
Year20141989 (published 1998)20112004
LeaderStrong leader. All writes through leader.Proposer (not a fixed leader). Any node can propose.Strong leader (primary). Similar to Raft.Head (writes). Tail (reads). Not a "leader" in the Raft sense.
ElectionRandomized timeout + majority votePrepare/Promise phase (can compete)Prospective leader sends CEPOCH, gets ACK-E from majorityExternal config manager (ZooKeeper/etcd)
LogAppend-only log. Leader's log is authoritative.Per-slot consensus. Holes possible.Transaction log. Similar to Raft.Implicit in forward chain. No explicit log.
Commit ruleMajority of nodes have the entry (current term only)Majority accepted the proposal for that slotMajority ACK. Leader broadcasts COMMIT.Tail has the entry (all preceding nodes also have it)
ComplexitySimple. 3 RPCs total.Notoriously complex. Multi-Paxos papers vary.Moderate. ZooKeeper-specific.Simple replication. Complex failure recovery.
Used inetcd, Consul, TiKV, CockroachDBGoogle Chubby, Spanner (modified)Apache ZooKeeper onlyAzure Storage, Amazon EBS (chain variant)

Key Concepts Quick Reference

TermOne-Sentence Definition
ConsensusMultiple nodes agreeing on a single value despite failures.
SMRReplicate a deterministic state machine by replicating its input log.
QuorumA majority of nodes (N/2 + 1) needed for any decision.
Term/EpochA logical time period with at most one leader.
Split-brainTwo leaders in the same cluster accepting conflicting writes.
LinearizabilitySystem behaves as if there is one copy; operations are atomic and real-time ordered.
Eventual consistencyAll replicas converge eventually, but no ordering guarantee during writes.
Causal consistencyCausally related operations are ordered; concurrent operations are not.
FLP impossibilityConsensus is impossible in a purely asynchronous system with even one crash.
CAP theoremDuring a partition, choose Consistency or Availability (not both).
Fencing tokenMonotonically increasing number attached to a lock to prevent stale holders from doing damage.

Common Interview Questions

Q: How does Raft guarantee that a new leader has all committed entries?

A: The "log up-to-date" voting rule. A follower refuses to vote for a candidate whose log is less up-to-date (lower last term, or same last term but shorter log). Since committed entries are on a majority, and the new leader must get votes from a majority, the intersection contains at least one node with every committed entry. The candidate must be at least as up-to-date as that node.

Q: What happens if the Raft leader commits an entry and then crashes before broadcasting the commit?

A: The entry is on a majority of nodes. The new leader will have it (by the voting rule above). The new leader will re-replicate it to any followers that are missing it, and then commit it as part of its own term. No committed entry is ever lost.

Q: Design a distributed lock service.

A: Use a consensus-based system (etcd, ZooKeeper). Store locks as keys. Attach a lease/TTL so locks auto-release if the holder crashes. Include a fencing token (monotonically increasing number) with each lock grant. The token is checked by the resource being protected — if the resource sees a stale token, it rejects the operation. This prevents the "process pauses during GC, lock expires, another process acquires, first process resumes and does damage" problem.

Raft Terms & Elections Timeline

An animated timeline showing Raft terms, elections, and leader tenures. Click "Next Event" to step through a sequence of elections, crashes, and recoveries.

Timeline: Term 1. Node A elected leader.

Coding Drill: Implement RequestVote

python
class RaftNode:
    def __init__(self, node_id, total_nodes):
        self.id = node_id
        self.current_term = 0
        self.voted_for = None   # Persisted: who did I vote for this term?
        self.log = [None]       # 1-indexed, log[0] is a sentinel
        self.state = "follower"
        self.total_nodes = total_nodes

    def last_log_term(self):
        return self.log[-1].term if len(self.log) > 1 else 0

    def last_log_index(self):
        return len(self.log) - 1

    def handle_request_vote(self, candidate_term, candidate_id,
                             candidate_last_log_idx, candidate_last_log_term):
        """Handle a RequestVote RPC. Return (term, vote_granted)."""

        # Rule 0: If candidate's term < mine, reject
        if candidate_term < self.current_term:
            return self.current_term, False

        # If candidate's term > mine, update my term, reset vote
        if candidate_term > self.current_term:
            self.current_term = candidate_term
            self.voted_for = None
            self.state = "follower"

        # Rule 1: One vote per term
        if self.voted_for is not None and self.voted_for != candidate_id:
            return self.current_term, False

        # Rule 2: Log up-to-date check
        my_last_term = self.last_log_term()
        my_last_idx = self.last_log_index()
        log_ok = (candidate_last_log_term > my_last_term or
                  (candidate_last_log_term == my_last_term and
                   candidate_last_log_idx >= my_last_idx))

        if not log_ok:
            return self.current_term, False

        # Grant vote
        self.voted_for = candidate_id
        return self.current_term, True

Coding Drill: Leader Heartbeat Loop

python
import threading
import time

class RaftLeader:
    def __init__(self, peers, heartbeat_interval=0.1):
        self.peers = peers            # List of follower addresses
        self.interval = heartbeat_interval
        self.running = False
        self.next_index = {}          # peer -> next log index to send
        self.match_index = {}         # peer -> highest replicated index
        self.log = [None]
        self.commit_index = 0
        for p in peers:
            self.next_index[p] = len(self.log)
            self.match_index[p] = 0

    def start_heartbeats(self):
        self.running = True
        self._thread = threading.Thread(target=self._heartbeat_loop)
        self._thread.daemon = True
        self._thread.start()

    def _heartbeat_loop(self):
        while self.running:
            for peer in self.peers:
                self._send_append_entries(peer)
            time.sleep(self.interval)

    def _send_append_entries(self, peer):
        ni = self.next_index[peer]
        prev_idx = ni - 1
        prev_term = self.log[prev_idx].term if prev_idx > 0 else 0
        entries = self.log[ni:]  # Entries to send (empty = heartbeat)

        # Send RPC (simplified — real impl uses async I/O)
        success = self._rpc(peer, prev_idx, prev_term, entries)

        if success:
            self.next_index[peer] = len(self.log)
            self.match_index[peer] = len(self.log) - 1
            self._try_advance_commit()
        else:
            # Back up and retry next heartbeat
            self.next_index[peer] = max(1, self.next_index[peer] - 1)

    def _try_advance_commit(self):
        """Advance commit index if a majority has replicated."""
        for n in range(len(self.log) - 1, self.commit_index, -1):
            # Count nodes that have entry n (including self)
            count = 1  # self
            for p in self.peers:
                if self.match_index[p] >= n:
                    count += 1
            if count > (len(self.peers) + 1) / 2:
                self.commit_index = n
                break

The Raft Safety Argument — How to Explain It in an Interview

Interviewers love to ask "prove that Raft is safe" or "why can't committed entries be lost?" Here is the argument in three steps:

Step 1: Leader Completeness. If an entry is committed in term T, then every leader in terms T+1, T+2, ... has that entry in its log. Why? Because the entry is committed = replicated to a majority. The new leader got votes from a majority. These two majorities overlap (share at least one node). The shared node has the committed entry. The voting rule ensures the new leader is at least as up-to-date as this node. Therefore the new leader has the entry.

Step 2: Log Matching. If two logs have an entry with the same index and term, they are identical up to that point. This is maintained inductively by the AppendEntries consistency check.

Step 3: State Machine Safety. If a server applies entry at index I to its state machine, no other server will ever apply a different entry at index I. This follows from Leader Completeness + Log Matching: the entry at index I was committed, so every future leader has it, so it can never be overwritten.

The short version for interviews. "Committed entries are safe because (1) they are on a majority, (2) any new leader got votes from a majority, (3) those two majorities overlap, (4) the voting rule ensures the new leader has all committed entries, and (5) the leader never overwrites its own log — it only appends. So committed entries survive all leader changes."

Advanced Topic: Membership Changes

How do you add or remove nodes from a running Raft cluster without downtime? This is one of the hardest problems in consensus. The original Raft paper proposed joint consensus: a transitional configuration where both the old and new membership must agree. But this is complex to implement.

In practice, most Raft implementations (including etcd) use a simpler approach: single-server changes. You add or remove one server at a time. The key insight: when you change the cluster by exactly one node, the old majority and the new majority always overlap. This means there is no point in time where two independent majorities can exist.

ChangeOld ConfigNew ConfigOld MajorityNew MajorityOverlap?
Add 1 node (3→4){A,B,C}{A,B,C,D}23Yes (always)
Remove 1 node (5→4){A,B,C,D,E}{A,B,C,D}33Yes (always)
Add 2 at once (3→5){A,B,C}{A,B,C,D,E}23Not guaranteed!
Never change more than one node at a time. If you need to go from 3 nodes to 5 nodes, do it in two steps: 3→4, wait for the new node to catch up, then 4→5. If you change two at once, the old majority (2 of 3) and the new majority (3 of 5) may not overlap, allowing two leaders to exist simultaneously. etcd enforces single-node changes — it rejects configuration changes that would add or remove more than one node.

Raft Optimizations for Interviews

Interviewers sometimes ask "how would you make Raft faster?" Here are the standard optimizations:

OptimizationWhat It DoesSpeedup
BatchingAccumulate multiple client requests and replicate them as a single AppendEntries batch.10-100x throughput
PipeliningSend the next AppendEntries before the previous one is ACKed. Use the nextIndex to track what each follower has.2-5x throughput
ReadIndexServe reads after one heartbeat round instead of replicating through the log.2-3x read throughput
Lease-based readsLeader serves reads locally during its lease period (no heartbeat round).10x read latency (but clock-dependent)
Parallel applyApply committed entries to the state machine in parallel with replicating the next batch.~2x throughput
Witness replicasSome replicas participate in voting but do not store the full log — just enough to vote correctly.Reduced storage cost

Design Questions to Practice

QuestionKey Points
Design a distributed configuration serviceRaft-based, small dataset, watch API for push-based updates, leases for TTL, consistent reads from leader
Design a distributed lock serviceConsensus for lock state, leases for automatic release, fencing tokens for safety, leader reads for linearizability
Design the Kubernetes control planeetcd for state, API server as the only etcd client, controllers watch for changes, optimistic concurrency with resource versions
When would you use chain replication?Read-heavy workloads, need strong consistency, acceptable write latency. Azure Storage uses it. Need external config manager (Raft/Paxos) for reconfiguration.
How does CockroachDB use Raft?One Raft group per range (64MB chunk). Each range independently elects leaders. Multi-range transactions use 2PC across ranges, Raft within each range.

Quick-Fire: True or False

StatementAnswer
A 4-node Raft cluster tolerates 2 failures.False. 4 nodes need 3 for majority. Can only tolerate 1 failure (same as 3 nodes).
Paxos requires a fixed leader.False. Any node can propose. Multi-Paxos optimizes by having a stable leader, but it is not required.
Chain replication provides linearizability without consensus.True for the data path. False for reconfiguration (needs external consensus for chain membership).
Eventual consistency means data will be lost.False. "Eventual" means replicas will converge. Data is durable — just temporarily inconsistent across replicas.
Raft can handle Byzantine faults.False. Raft assumes crash-fault only. Byzantine faults require BFT algorithms (PBFT, Tendermint).
ZooKeeper's reads are linearizable by default.False. ZooKeeper reads can be served by any node (sequentially consistent). For linearizable reads, use the sync command first.
Concept check: You are designing a system where 1000 microservices need to discover each other. Should you use a 5-node Raft cluster for service registration, or a gossip protocol like Serf?

Chapter 9: Connections

You now understand the core of distributed consensus: why replication needs coordination, how Raft structures that coordination with leader election and log replication, what consistency models define the client-visible guarantees, and how chain replication offers an alternative topology. Let us place this knowledge in context.

What We Covered vs What We Skipped

CoveredSkipped (Future Lessons)
State machine replicationPaxos (the original, harder algorithm)
Raft leader electionByzantine fault tolerance (BFT, used in blockchain)
Raft log replicationRaft membership changes (joint consensus)
Consistency models (4 levels)Log compaction and snapshots
Chain replicationMulti-Raft (CockroachDB, TiKV architecture)
Practical coordination servicesConflict-free replicated data types (CRDTs)

The Evolution of Consensus

Understanding the historical progression helps you see why Raft exists and what problem each algorithm was solving:

YearAlgorithm / ResultContribution
1978Lamport's State Machine ReplicationThe foundational idea: replicate a deterministic state machine by replicating its input log.
1985FLP ImpossibilityProved consensus is impossible in purely asynchronous systems with even one crash. Every practical algorithm must assume partial synchrony.
1989Paxos (Lamport)First correct consensus algorithm. Notoriously difficult to understand and implement. The "parliament" metaphor confused generations of engineers.
1998Paxos published (Lamport)After being rejected by reviewers for years ("too whimsical"), the Paxos paper was finally published. Google later built Chubby on it.
2004Chain Replication (van Renesse & Schneider)An alternative to leader-based replication. Simpler, better throughput for large objects, but needs external reconfiguration.
2007Paxos Made Live (Google)Google's experience building Chubby on Paxos. Revealed the enormous gap between the algorithm and a production implementation.
2011ZAB (Yahoo)ZooKeeper's protocol. Designed to be easier than Paxos while providing the same guarantees. Adopted by the entire Hadoop ecosystem.
2014Raft (Ongaro & Ousterhout)Designed from scratch for understandability. The paper was accompanied by a user study showing students learned Raft faster than Paxos. Now the dominant consensus algorithm in industry.
2016+Multi-Raft (CockroachDB, TiKV)Run thousands of independent Raft groups, one per data shard. Scales consensus to large datasets by partitioning.
Why Raft won. Paxos and Raft are equivalent in terms of safety and performance. The difference is understandability. In the original Raft paper's user study, students scored 4.9/5 on Raft questions vs 2.7/5 on Paxos. When engineers can understand the algorithm, they can implement it correctly, debug it effectively, and extend it confidently. Raft's dominance is a triumph of clarity over cleverness.

The Landscape Beyond Crash Faults

Everything in this lesson assumes crash-fault tolerance: nodes either work correctly or stop. In the real world, there are situations where this is not enough:

DomainFault ModelAlgorithm Family
Cloud infrastructure (etcd, CockroachDB)Crash-faultRaft, Paxos, ZAB
Blockchain (Bitcoin, Ethereum)Byzantine (any behavior, including malicious)Nakamoto consensus, PBFT, Tendermint
Safety-critical (aviation, medical)Byzantine (hardware faults, cosmic rays)PBFT variants, agreement protocols
CDNs, DNS, cachingCrash-fault (but eventual consistency suffices)Gossip, CRDTs, vector clocks

Byzantine fault tolerance (BFT) requires 3f+1 nodes to tolerate f Byzantine faults, compared to 2f+1 for crash faults. The message complexity is O(n²) instead of O(n). This makes BFT 3-10x more expensive than crash-fault consensus, which is why cloud systems do not use it — they trust their own hardware.

Related Lessons

LessonConnection
Consistency & Consensus (DDIA Ch10)Covers the same material from the DDIA perspective, including Paxos, 2PC, and the full linearizability proof. More depth on ordering guarantees and Lamport timestamps.
Transactions (DDIA Ch8)How serializability relates to linearizability. Two-phase locking, optimistic concurrency, and SSI.
Sharding (DDIA Ch7)How to split data across nodes. Each partition can run its own Raft group (the CockroachDB pattern).
Replication (DDIA Ch6)Single-leader, multi-leader, and leaderless replication. The foundation for understanding why consensus is needed.

The One Diagram to Remember

Consensus Landscape

How the pieces fit together. Raft provides total order broadcast, which enables state machine replication, which enables linearizable reads and writes. Chain replication is an alternative data path that relies on an external Raft/Paxos service for reconfiguration.

The key insight to carry forward. Consensus is expensive and should be used sparingly — for metadata, coordination, and leader election. The actual data path should use cheaper mechanisms (sharding, eventual consistency, CRDTs) with consensus only at the boundaries. Every well-architected distributed system separates the "control plane" (consensus) from the "data plane" (throughput-optimized). Kubernetes, Kafka, Cassandra, CockroachDB — they all follow this pattern.

Seminal Papers Worth Reading

PaperYearWhy Read It
"In Search of an Understandable Consensus Algorithm" (Ongaro & Ousterhout)2014THE Raft paper. Clearly written, includes a formal specification, and accompanied by a user study. Read this first.
"Chain Replication for Supporting High Throughput and Availability" (van Renesse & Schneider)2004The original chain replication paper. Only 12 pages. Clean and practical.
"Paxos Made Simple" (Lamport)2001Lamport's attempt to simplify Paxos. At 14 pages, it is the most accessible Paxos description.
"Paxos Made Live" (Chandra, Griesemer, Redstone — Google)2007The reality of implementing Paxos in production. Eye-opening on the gap between theory and practice.
"Impossibility of Distributed Consensus with One Faulty Process" (Fischer, Lynch, Paterson)1985The FLP impossibility result. Foundational. Only 7 pages.
"Object Storage on CRAQ" (Terrace & Freedman)2009Chain Replication with Apportioned Queries. Reads from any node with linearizability.

The Mental Model

After this lesson, you should have a clear mental picture of how consensus fits into distributed systems:

Clients send requests
Writes and reads arrive at the distributed system.
Control plane orders them
A small Raft/Paxos cluster ensures total ordering of commands. 3-7 nodes, SSD-backed, low-latency.
Data plane applies them
State machines or data stores apply the ordered commands. Can be sharded across thousands of nodes.
Consistency model determines reads
Linearizable reads go through the leader. Eventual reads go to any replica. Causal reads track dependencies.

This separation — control plane vs data plane, consensus vs replication, metadata vs data — is the organizing principle of every well-designed distributed system. Master it, and every distributed systems paper, blog post, and interview question will make sense.

Checklist: What You Should Be Able to Do

After completing this lesson, test yourself. You should be able to:

#SkillTest Yourself
1Explain why replication needs consensusDescribe the split-brain problem in 2 sentences.
2Explain State Machine ReplicationDraw the two-layer architecture (consensus + state machine) on a whiteboard.
3Walk through a Raft electionTrace the flow: timeout fires, increment term, send RequestVote, receive votes, become leader.
4Walk through log replicationTrace AppendEntries: prevLogIndex check, append, commit when majority ACKs.
5Rank consistency modelsGiven a scenario (bank, social media, DNS), choose the appropriate model and justify.
6Compare Raft and chain replicationState when to use each (latency vs throughput, self-contained vs external config).
7Design with consensusSketch a system using etcd/ZK for metadata + data stores for throughput. Identify what goes in consensus and what does not.
8Prove Raft safety informallyThe 5-step argument: committed on majority, new leader from majority, overlap, voting rule, append-only log.
Final concept check: You are building a new database that needs to replicate data across 3 data centers (US-East, US-West, EU). Each write must be acknowledged as committed before returning to the client. What is the minimum write latency your system can achieve?

"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable."
— Leslie Lamport