Designing Data-Intensive Applications — Chapter 10

Consistency & Consensus

Linearizability, Raft, Paxos — how distributed nodes agree on the truth.

Prerequisites: Replication basics + Network intuition. That's it.

Chapter 0: The Problem

You are running a distributed database with three nodes. Node A is the leader, handling all writes. Nodes B and C are followers, replicating data from A. Life is good. Then Node A's network interface goes down. It is still running, still accepting writes from local clients, but it cannot talk to B or C. And B and C cannot talk to A.

Here is the crisis. Nodes B and C need a leader to accept writes. They cannot reach A, so they assume A is dead. They elect Node B as the new leader. Now you have two nodes that both think they are the leader. A is still accepting writes. B is also accepting writes. Different clients are sending different writes to different leaders. The data on A and the data on B are diverging with every passing second.

This is split-brain: a situation where two parts of a distributed system operate independently, each believing it is the sole authority. When the network heals and A and B try to reconcile, you have conflicting data and no way to know which version is "correct." Money has been double-spent. Inventory has been double-sold. The damage is already done.

Split-brain is not hypothetical. In October 2018, GitHub had an outage lasting just over 24 hours that began with a 43-second network partition between data centers. During the partition, their MySQL failover tooling (Orchestrator) promoted replicas in another data center to primary, but the original primaries had already accepted writes that never reached the new ones. When connectivity returned, the two sets of conflicting writes had to be reconciled manually, and the site ran degraded for over a day while the databases were brought back into a consistent state.

The fundamental question of this chapter: in a network where messages can be delayed, dropped, or reordered, how do you get a group of nodes to agree on something? "Something" could be: who is the leader, what value a register holds, whether a transaction should commit, or what order events occurred in. All of these reduce to the same problem — consensus.

Consensus sounds simple. Just vote! If a majority agrees, the decision is final. But what happens when votes are delayed? What if a node votes, then crashes before learning the outcome? What if two candidates start elections simultaneously and split the vote? What if the network partitions right in the middle of voting?

Researchers long struggled with a more basic question: is consensus in an asynchronous system even possible? In 1985, Fischer, Lynch, and Paterson proved a devastating result: no deterministic algorithm can guarantee that consensus terminates in an asynchronous system where even one process may crash. This is the FLP impossibility result. Practical algorithms such as Raft and Paxos sidestep it by splitting their guarantees. They never violate safety (nodes never decide different values), and they rely on partial synchrony only for liveness: the network is usually well-behaved, so timeouts can detect failures and the algorithm eventually makes progress. It is a pragmatic compromise between theoretical impossibility and practical necessity.

Why Is Consensus So Hard?

At first glance, consensus seems trivial. Just pick a leader, right? But consider the failure modes:

| Failure | What Happens | Why It Breaks Naive Solutions |
|---|---|---|
| Message delay | A vote arrives after the election ends | Cannot distinguish "slow" from "lost." How long do you wait? |
| Message loss | A vote is dropped by the network | Cannot distinguish "lost" from "very delayed." Retries create duplicates. |
| Message duplication | A vote is received twice | Could double-count votes and elect a leader with fewer real supporters than thought. |
| Node crash | A node crashes after voting but before learning the outcome | When it recovers, its state is inconsistent with the cluster. |
| Network partition | Two groups of nodes can talk within groups but not across | Each group might independently elect a leader → split-brain. |
| Byzantine failure | A node sends intentionally wrong messages | Standard consensus (Raft, Paxos) doesn't handle this. BFT algorithms are needed. |

Every real consensus algorithm must handle the first five failures correctly. The sixth (Byzantine) is out of scope for this chapter — it's the domain of blockchain and military systems. The algorithms we will study (Raft, Paxos, ZAB) assume crash-fault only: nodes either work correctly or stop completely.

The Replicated State Machine Approach

The most common use of consensus is the replicated state machine (RSM). The idea: if every node starts in the same state and applies the same sequence of commands in the same order, they will all end up in the same state. Consensus ensures the "same sequence" part.

1. Client sends command
e.g., "SET x = 42" or "leader_id = node3"
2. Consensus orders it
All nodes agree: this is command #47 in the global sequence.
3. Every node applies
Every node applies command #47 to its local state machine, strictly in log order. All get the same result.
4. Response to client
Once a majority of nodes have durably stored command #47 (it is now committed), the leader applies it and returns the result. Followers apply it as soon as they learn it is committed.

This is how etcd, ZooKeeper, and CockroachDB work internally. The consensus algorithm (Raft or ZAB or Paxos) is the mechanism for step 2. The state machine in step 3 is the actual key-value store, lock service, or database engine. The separation of concerns is clean: the consensus layer knows nothing about keys or values — it just orders bytes. The state machine knows nothing about nodes or elections — it just applies commands.
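To make the replicated state machine idea concrete, here is a minimal Python sketch. It assumes the consensus layer (step 2) has already produced an agreed, ordered log; the `KVStateMachine` class and the `("SET", key, value)` command format are hypothetical and are not the internals of etcd, ZooKeeper, or CockroachDB. Three replicas apply the same log and end in the same state:

```python
from typing import Dict, List, Tuple

# Hypothetical command format: ("SET", key, value) or ("DEL", key, None).
Command = Tuple[str, str, object]


class KVStateMachine:
    """Deterministic key-value state machine: same log in, same state out."""

    def __init__(self) -> None:
        self.data: Dict[str, object] = {}
        self.last_applied = 0  # 1-based index of the last log entry applied

    def apply(self, index: int, command: Command) -> None:
        # Entries must be applied strictly in log order, with no gaps.
        if index != self.last_applied + 1:
            raise ValueError(f"expected entry {self.last_applied + 1}, got {index}")
        op, key, value = command
        if op == "SET":
            self.data[key] = value
        elif op == "DEL":
            self.data.pop(key, None)
        else:
            raise ValueError(f"unknown op {op!r}")
        self.last_applied = index


# The agreed order, as if produced by the consensus layer.
log: List[Command] = [("SET", "x", 42), ("SET", "leader_id", "node3"), ("DEL", "x", None)]

replicas = [KVStateMachine() for _ in range(3)]
for replica in replicas:
    for i, cmd in enumerate(log, start=1):
        replica.apply(i, cmd)

print([r.data for r in replicas])
# [{'leader_id': 'node3'}, {'leader_id': 'node3'}, {'leader_id': 'node3'}]
```

Determinism is what makes this work: a command such as "SET x = now()" would produce a different value on each replica, so values like that have to be fixed before the command enters the log.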

What We Will Cover

| Chapter | Topic | The Core Question |
|---|---|---|
| 1 | Linearizability | What does "consistent" even mean, formally? |
| 2 | Ordering Guarantees | How do you track which event happened before which? |
| 3 | Two-Phase Commit | Can we just coordinate with a single node? (No.) |
| 4-5 | Raft Consensus | The full algorithm: leader election + log replication |
| 6 | Paxos | The original (and harder) consensus algorithm |
| 7 | ZAB & Others | What do ZooKeeper, etcd, and CockroachDB actually use? |
| 8 | In Practice | Coordination services and real system design |
| 9 | Interview Arsenal | The cheat sheet for system design interviews |

The Numbers: How Many Nodes Do You Need?

| Cluster Size | Majority | Tolerated Failures | When to Use |
|---|---|---|---|
| 1 | 1 | 0 | Development. No fault tolerance at all. |
| 2 | 2 | 0 | Never. Worse than 1 node (both must be up). |
| 3 | 2 | 1 | Small production systems. Minimum for fault tolerance. |
| 4 | 3 | 1 | Rarely used. Same fault tolerance as 3, but more overhead. |
| 5 | 3 | 2 | Standard production. Survives 1 planned + 1 unplanned failure. |
| 6 | 4 | 2 | Rarely used. Same fault tolerance as 5. |
| 7 | 4 | 3 | Critical infrastructure. Cross-region with 3 data centers. |
The "rolling upgrade" argument for 5 nodes. With 3 nodes, you can tolerate 1 failure. But during a rolling upgrade, 1 node is down for maintenance. If an unexpected failure occurs during the upgrade, you've lost your quorum. With 5 nodes, 1 node down for the upgrade plus 1 unexpected failure makes 2 failures, which is within tolerance: 3 nodes (a majority) are still alive. This is why five-member etcd clusters, rather than three, are the usual recommendation for production Kubernetes.
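The table and the rolling-upgrade argument both follow from two formulas: a strict majority of n nodes is n // 2 + 1, and the cluster can lose the remaining n - (n // 2 + 1) = (n - 1) // 2 nodes. A short Python sketch (the function names are illustrative) reproduces both:

```python
def majority(n: int) -> int:
    """Smallest strict majority (> 50%) of an n-node cluster."""
    return n // 2 + 1


def tolerated_failures(n: int) -> int:
    """Nodes that can be lost while a majority is still alive."""
    return n - majority(n)  # equal to (n - 1) // 2


def survives(n: int, planned_down: int, unplanned_down: int) -> bool:
    """True if a majority remains after maintenance plus unexpected failures."""
    return n - planned_down - unplanned_down >= majority(n)


for n in range(1, 8):
    print(n, majority(n), tolerated_failures(n))

# Rolling upgrade (1 node down on purpose) plus 1 unexpected crash:
print(survives(3, 1, 1))  # False: 1 of 3 nodes left, majority is 2
print(survives(5, 1, 1))  # True: 3 of 5 nodes left, majority is 3
```

Note why even sizes add nothing: going from 3 to 4 nodes raises the majority from 2 to 3, so the extra node is consumed by the larger quorum.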
Concept check: A distributed system has 5 nodes. The network partitions into two groups: {A, B} and {C, D, E}. Both groups try to elect a leader. If the system requires a strict majority (>50%) to elect a leader, what happens?

Chapter 1: Linearizability

Before we can talk about how nodes agree, we need to define what "agreement" looks like from the outside. When a client reads a value from a distributed system, what should they see? The answer depends on the consistency model the system provides — a contract between the system and its clients about what behaviors are allowed.

The strongest consistency model is linearizability (also called atomic consistency or strong consistency). Here is the promise: the system behaves as if there is exactly one copy of the data, and every operation takes effect atomically at some single point in time between when the client sent the request and when the client received the response.

Let us make that precise. Every read or write operation has two timestamps: the invocation (when the client sends the request) and the response (when the client receives the reply). In a real system, there is some nonzero duration between these — the operation takes time. Linearizability says that you must be able to assign a single point in time within that duration where the operation "takes effect," and if you line up all operations by their effect points, the result must look like a sequential execution.

The "recency guarantee." Linearizability means: once a write completes and a client gets the acknowledgment, ALL subsequent reads (by any client, on any node) must see that write or a later one. You can never go back to seeing an old value. This is the property that makes linearizability so intuitive — and so expensive to implement.

Is This History Linearizable?

The classic technique for checking linearizability: draw operations as horizontal bars on a timeline (one row per client), then try to place a "linearization point" (a dot) within each bar such that the dots form a valid sequential order.
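For tiny histories, the dot-placement technique can be automated by brute force: try every ordering of the operations, discard orderings that break real-time order (an operation that finished before another started must come first), and accept the history if any remaining ordering is a legal sequence for a single register. This sketch assumes a register initialised to 0 with only reads and writes; `Op` and `is_linearizable` are hypothetical names. It runs in factorial time, so it is purely illustrative; dedicated checkers such as Knossos or Porcupine use much smarter search.

```python
from itertools import permutations
from typing import List, NamedTuple


class Op(NamedTuple):
    client: str
    kind: str     # "write" or "read"
    value: int    # value written, or value the read returned
    start: float  # invocation time
    end: float    # response time


def is_linearizable(history: List[Op], initial: int = 0) -> bool:
    """Brute-force check for a single register (tiny histories only)."""
    n = len(history)
    for order in permutations(range(n)):
        position = {op_index: slot for slot, op_index in enumerate(order)}
        # Real-time rule: if a completed before b was invoked, a must come first.
        if any(history[a].end < history[b].start and position[a] > position[b]
               for a in range(n) for b in range(n)):
            continue
        # Register rule: each read returns the latest write earlier in this order.
        current, legal = initial, True
        for op_index in order:
            op = history[op_index]
            if op.kind == "write":
                current = op.value
            elif op.value != current:
                legal = False
                break
        if legal:
            return True  # this order is a valid placement of linearization points
    return False


# The worked example later in this chapter: C starts after A's write is acknowledged.
history = [
    Op("A", "write", 1, 0, 2),
    Op("B", "read", 1, 1, 3),
    Op("C", "read", 0, 2.5, 3.5),
]
print(is_linearizable(history))  # False

# Had C returned 1, the order A, B, C would satisfy both rules.
print(is_linearizable(history[:2] + [Op("C", "read", 1, 2.5, 3.5)]))  # True
```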


Linearizability vs Serializability

These two terms are constantly confused — even in published papers. They are entirely different guarantees.

| | Linearizability | Serializability |
|---|---|---|
| Scope | Single object (a register, a key) | Multiple objects (a transaction touching many rows) |
| Guarantee | Real-time ordering: if op A finishes before op B starts, A's effect is visible to B | Equivalent to some serial order, but that order need not match real time |
| About | Recency: you never see stale data | Isolation: transactions don't interfere with each other |
| Used by | Consensus protocols, linearizable registers | Database transaction isolation levels |

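The combination of both, strict serializability, gives you transactions that are both serializable AND linearizable. Spanner provides it (Google calls the property external consistency), and it is the gold standard. CockroachDB comes close: it offers serializable transactions and linearizable single-key operations, but it does not guarantee strict serializability across unrelated keys. The guarantee comes at a cost: you need tightly synchronized clocks with bounded uncertainty (Spanner uses GPS and atomic clocks) or consensus on every transaction.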

Implementing Linearizability

| Approach | Linearizable? | Why |
|---|---|---|
| Single-leader + sync replication | Potentially | Only if reads go through the leader (and it really is still the leader) or through synchronously updated followers. Async replicas can return stale data. |
| Consensus algorithms (Raft, Paxos) | Yes | The whole point of consensus is to agree on a total order of operations. Reads must go through that order too (or be confirmed with a quorum); ZooKeeper's default reads, for example, can be stale. |
| Multi-leader replication | No | Concurrent writes to different leaders produce conflicts that violate real-time order. |
| Leaderless (Dynamo-style) | Usually no | Sloppy quorums, read-repair races, and clock skew break linearizability in practice. |

The Cost: CAP Theorem

The CAP theorem (Brewer, 2000; proved by Gilbert & Lynch, 2002) states: during a network partition, a distributed system must choose between Consistency (linearizability) and Availability (every non-crashed node responds). You cannot have both.

More precisely: if two nodes cannot communicate (partition), and a client writes to one node, the other node has two options: (1) refuse to answer reads until the partition heals (consistent but unavailable), or (2) answer reads with potentially stale data (available but inconsistent). There is no third option.
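The two options can be sketched as a replica that either refuses or serves a read while it is cut off from the rest of the cluster. The `Replica` class, its `mode` flag, and the `Unavailable` exception are hypothetical names chosen for illustration:

```python
class Unavailable(Exception):
    """Raised by a CP replica that cannot confirm it has the latest value."""


class Replica:
    def __init__(self, mode: str) -> None:
        if mode not in ("CP", "AP"):
            raise ValueError("mode must be 'CP' or 'AP'")
        self.mode = mode
        self.value = 0            # last value this replica has seen
        self.partitioned = False  # True while cut off from the other replicas

    def read(self) -> int:
        if self.partitioned and self.mode == "CP":
            # Option (1): stay consistent by refusing to answer.
            raise Unavailable("cannot reach the other replicas")
        # Option (2) during a partition: answer, possibly with stale data.
        return self.value


# A client wrote x = 1 on the other side of the partition; neither replica saw it.
cp, ap = Replica("CP"), Replica("AP")
cp.partitioned = ap.partitioned = True

print(ap.read())  # 0: available, but stale
try:
    cp.read()
except Unavailable as err:
    print("CP replica refused:", err)
```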

CAP is about partitions, not everyday operation. When there is no partition, you can have both consistency and availability; CAP only forces a choice during a partition. In practice, network partitions do happen: surveys of production incidents (for example, Bailis and Kingsbury's "The Network is Reliable") report them regularly, including inside single data centers. So the choice is real. But "AP vs CP" is a spectrum, not a binary, and most real systems sit somewhere in between.

Worked Example: Is This Linearizable?

```
// Client A writes x = 1 (starts at t=0, acknowledged at t=2)
A: write(x, 1)   [t=0 --------- t=2]

// Client B reads x, gets 1 (starts at t=1, returns at t=3)
B: read(x) → 1   [t=1 --------- t=3]

// Client C reads x, gets 0 (starts at t=2.5, returns at t=3.5)
C: read(x) → 0   [t=2.5 ------ t=3.5]

// Is this linearizable?
// B's read overlaps with A's write, so getting 1 is fine (the write's
// linearization point can be placed before B's read's linearization point).
// But C's read starts AFTER A's write is acknowledged (2.5 > 2).
// C should see x=1, but sees 0.
// VERDICT: NOT linearizable. The recency guarantee is violated.
```

The CAP Spectrum in Real Systems

CAP is often presented as a binary choice, but real systems exist on a spectrum. Here is where common databases land:

| System | CAP Choice | During Partition | Normal Operation |
|---|---|---|---|
| Spanner | CP | Minority partition becomes unavailable (no writes, no strong reads) | Linearizable reads, serializable transactions. Uses TrueTime (GPS + atomic clocks) for tight clock sync. |
| CockroachDB | CP | Same as Spanner: minority stalls | Serializable transactions via Raft consensus per range. No special hardware (uses hybrid logical clocks). |
| DynamoDB | AP (default) | All nodes remain available. May return stale data. | Eventually consistent by default. Can opt into strongly consistent reads (reads from leader). |
| Cassandra | AP (tunable) | Available as long as you can reach enough replicas for your chosen consistency level. | Tunable consistency: ONE (fast, stale) to ALL (slow, consistent). QUORUM is the common middle ground. |
| MongoDB | CP (primary reads) | Primary election stalls writes. Secondary reads may be stale. | Reads go to the primary by default; read concern "linearizable" gives the strongest guarantee. Causal consistency via causally consistent sessions. |
| Redis (single node) | CA | Single node: no partition to worry about (but no fault tolerance either). | Linearizable (single node = trivially consistent). Redis Cluster: AP (async replication). |
The real engineering question is not "CP or AP?" It is: "Which operations need linearizability, and which can tolerate eventual consistency?" A well-designed system uses strong consistency where it matters (account balances, leader election, distributed locks) and relaxes to eventual consistency for everything else (user timelines, analytics, caches). This is why many databases offer tunable consistency levels per query.
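Per-query levels such as Cassandra's ONE, QUORUM, and ALL come down to how many of the N replicas a write (W) and a read (R) must reach. When R + W > N, every read set overlaps every write set in at least one replica, so a read can observe the latest acknowledged write. A rough sketch, assuming a replication factor of 3 (the function names are illustrative). Overlap is what makes a fresh read possible, but on its own it does not make a leaderless system linearizable, for the reasons listed in the implementation table above:

```python
from typing import Dict


def levels(n: int) -> Dict[str, int]:
    """Replica counts for common consistency levels, given replication factor n."""
    return {"ONE": 1, "QUORUM": n // 2 + 1, "ALL": n}


def quorums_overlap(n: int, r: int, w: int) -> bool:
    """True if every read set of size r must intersect every write set of size w."""
    return r + w > n


N = 3
lv = levels(N)
for write_level in ("ONE", "QUORUM", "ALL"):
    for read_level in ("ONE", "QUORUM", "ALL"):
        w, r = lv[write_level], lv[read_level]
        print(f"W={write_level:<6} R={read_level:<6} overlap={quorums_overlap(N, r, w)}")
```

QUORUM writes plus QUORUM reads (2 + 2 > 3) is the usual middle ground: both sides tolerate one slow or dead replica and still overlap.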

When Do You Need Linearizability?

Not everything needs the strongest guarantee. Here is a decision framework:

| Use Case | Need Linearizability? | Why |
|---|---|---|
| Leader election | Yes | Two leaders = split-brain = data loss. Must agree on exactly one. |
| Distributed lock | Yes | Two processes holding the same lock = mutual exclusion violated. |
| Unique constraint (username) | Yes | Two users claiming the same username = conflict. |
| Bank account balance | Yes | Overdraft from stale read = financial loss. |
| Social media timeline | No | Seeing a post 2 seconds late is fine. |
| Analytics dashboard | No | Approximate counts are acceptable. |
| CDN cache | No | Stale content for seconds/minutes is the whole point of caching. |
| Shopping cart | Usually no | Eventually consistent with conflict resolution (last-write-wins or merge). |
CAP Tradeoff Space

Where do real systems sit in the CAP space? They tend to cluster near CP (strong consistency, unavailable during partitions) or AP (always available, eventually consistent).

Interview question: Your team runs a service with 3 replicas. A writes x=5 to the leader. The leader acknowledges. B immediately reads x from a follower and gets x=3 (old value). Is this system linearizable, and why?

Chapter 2: Ordering Guarantees

Linearizability is powerful but expensive. Can we get useful guarantees with something weaker? Yes — by focusing on causality. If event A caused event B, then A must be ordered before B. But if A and B are independent (neither caused the other), we don't care which comes first. This is causal consistency, and it is the strongest consistency model that does not sacrifice availability during partitions.

But how do you track "what caused what" in a distributed system? You cannot use wall clocks — we learned in the previous chapters that clocks drift, skew, and cannot be trusted. You need a logical clock.

Lamport Timestamps

Leslie Lamport (1978) invented a beautifully simple mechanism. Each node maintains a counter. The rules:

Rule 1: Internal Event
Before processing any event, increment your counter by 1.
Rule 2: Send Message
Increment your counter. Attach the counter value to the message.
Rule 3: Receive Message
Set your counter to max(local_counter, received_counter) + 1.

The result: if event A causally precedes event B, then A's timestamp is strictly less than B's. The contrapositive: if timestamp(A) ≥ timestamp(B), then A did NOT causally precede B.

The critical limitation. Lamport timestamps give you a total order: every event gets a unique number (break ties with node ID). But they cannot tell you which events are concurrent. If timestamp(A) < timestamp(B), it might be because A caused B, or it might be a coincidence — A and B happened independently on different nodes. This matters because concurrent events might represent conflicting writes that need resolution. Vector clocks solve this by maintaining a vector of counters (one per node), but at the cost of O(N) space per message.

Vector Clocks: Detecting Concurrency

Lamport timestamps are insufficient for detecting concurrent events. Vector clocks extend the idea: instead of a single counter, each node maintains a vector of counters — one for every node in the system.

```
// Vector clock for node A in a 3-node system {A, B, C}:
VC_A = [A_count, B_count, C_count]

// Rules (same spirit as Lamport, but richer):
// Internal event: increment own position
VC_A[A] += 1

// Send: increment own position, attach full vector
VC_A[A] += 1; send(message, VC_A)

// Receive: merge (element-wise max), then increment own
VC_A[i] = max(VC_A[i], received[i]) for all i
VC_A[A] += 1

// Causality detection:
// VC1 happens-before VC2 if every element of VC1 ≤ the corresponding element of VC2,
// AND at least one is strictly less.
// If neither VC1 ≤ VC2 nor VC2 ≤ VC1, the events are CONCURRENT.

// Example:
VC_A = [2, 1, 0]   ← A did 2 events, saw 1 from B, none from C
VC_B = [1, 3, 0]   ← B saw 1 from A, did 3 events, none from C
// VC_A[0]=2 > VC_B[0]=1, but VC_A[1]=1 < VC_B[1]=3 → CONCURRENT!
// Neither caused the other. These represent a conflict that needs resolution.
```
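The same rules translate directly into Python. This sketch assumes a fixed, known set of node IDs and stores each vector as a dict keyed by node; `VectorClock`, `happened_before`, and `concurrent` are illustrative names. The usage replays a small conflict: A writes and tells B, while C writes without hearing from anyone.

```python
from typing import Dict, Iterable


class VectorClock:
    def __init__(self, node_id: str, nodes: Iterable[str]) -> None:
        self.node_id = node_id
        self.clock: Dict[str, int] = {n: 0 for n in nodes}

    def local_event(self) -> Dict[str, int]:
        self.clock[self.node_id] += 1
        return dict(self.clock)

    def send(self) -> Dict[str, int]:
        # Increment own entry, then attach a copy of the whole vector.
        self.clock[self.node_id] += 1
        return dict(self.clock)

    def receive(self, received: Dict[str, int]) -> Dict[str, int]:
        # Element-wise max with the incoming vector, then increment own entry.
        for n, count in received.items():
            self.clock[n] = max(self.clock.get(n, 0), count)
        self.clock[self.node_id] += 1
        return dict(self.clock)


def happened_before(v1: Dict[str, int], v2: Dict[str, int]) -> bool:
    """v1 -> v2: every entry of v1 <= v2's, and at least one strictly less."""
    keys = v1.keys() | v2.keys()
    return (all(v1.get(k, 0) <= v2.get(k, 0) for k in keys)
            and any(v1.get(k, 0) < v2.get(k, 0) for k in keys))


def concurrent(v1: Dict[str, int], v2: Dict[str, int]) -> bool:
    return v1 != v2 and not happened_before(v1, v2) and not happened_before(v2, v1)


nodes = ["A", "B", "C"]
a, b, c = (VectorClock(n, nodes) for n in nodes)

a.local_event()         # A writes x=1: A=1
msg = a.send()          # A=2, message carries {A: 2, B: 0, C: 0}
vb = b.receive(msg)     # {A: 2, B: 1, C: 0}
vc = c.local_event()    # C writes x=2 independently: {A: 0, B: 0, C: 1}

print(concurrent(vb, vc))        # True: a real conflict
print(happened_before(msg, vb))  # True: the send precedes the receive
```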

Vector clocks are used in systems like Riak and Amazon's Dynamo (internally) to detect write conflicts. When two writes are concurrent, the system can either: (a) keep both versions (siblings) and let the application resolve, or (b) use a deterministic rule (last-write-wins by wall clock — but this loses data).

The scaling problem with vector clocks. A vector clock has one entry per node. With 1000 nodes, each timestamp is a 1000-element vector. This doesn't scale. Practical systems use dotted version vectors or interval tree clocks to reduce space, or simply limit the use of vector clocks to small coordination groups rather than the entire cluster.

Total Order Broadcast

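Total order broadcast (also called atomic broadcast) is a protocol that delivers messages to all nodes such that: (1) every non-crashed node delivers the same set of messages, and (2) every node delivers them in the same order. This is stronger than ordering by Lamport timestamps. Lamport timestamps define a total order only in hindsight: a node can never be sure that some other node has not generated an event with a smaller timestamp that it simply hasn't seen yet. Total order broadcast fixes the order at delivery time, so once a message is delivered, no other message can later be inserted before it.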

Here is the remarkable equivalence: total order broadcast is equivalent to consensus. If you can solve total order broadcast, you can solve consensus (decide on the first message delivered). If you can solve consensus, you can solve total order broadcast (use consensus to decide each next message). This equivalence is why we care so deeply about consensus algorithms — they are the universal primitive.

Formally, total order broadcast has two properties:

| Property | Definition | Why It Matters |
|---|---|---|
| Reliable delivery | If a non-crashed node delivers a message, all non-crashed nodes eventually deliver it. | No node can miss a message that others see. Prevents state divergence. |
| Total order | All nodes deliver messages in the exact same order. | If nodes apply messages as state transitions, they all end up in the same state. This is the replicated state machine approach. |
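One common way to get the total-order property is a sequencer: one node stamps each message with the next sequence number, and every receiver holds back out-of-order arrivals until the gap is filled. The sketch below shows only that ordering logic. It assumes a single sequencer that never fails and links that eventually deliver every message; that fixed sequencer is exactly the single point of failure consensus removes by electing and replacing it. `Sequencer` and `Receiver` are hypothetical names.

```python
import itertools
from typing import Dict, List, Tuple


class Sequencer:
    """Assigns a global sequence number to every broadcast message."""

    def __init__(self) -> None:
        self._next = itertools.count(1)

    def stamp(self, payload: str) -> Tuple[int, str]:
        return next(self._next), payload


class Receiver:
    """Delivers messages strictly in sequence-number order."""

    def __init__(self) -> None:
        self.expected = 1
        self.held_back: Dict[int, str] = {}
        self.delivered: List[str] = []

    def on_message(self, seq: int, payload: str) -> None:
        if seq < self.expected or seq in self.held_back:
            return  # duplicate: already delivered or already buffered
        self.held_back[seq] = payload
        # Deliver every message whose predecessors have all arrived.
        while self.expected in self.held_back:
            self.delivered.append(self.held_back.pop(self.expected))
            self.expected += 1


sequencer = Sequencer()
msgs = [sequencer.stamp(p) for p in ("SET x=1", "SET y=2", "DEL x")]

r1, r2 = Receiver(), Receiver()
for m in msgs:                                  # r1 sees messages in order
    r1.on_message(*m)
for m in [msgs[2], msgs[0], msgs[0], msgs[1]]:  # r2: reordered and duplicated
    r2.on_message(*m)

print(r1.delivered == r2.delivered)  # True: same messages, same order
```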
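A minimal Lamport clock in Python, following the three rules (internal event, send, receive):

```python
class LamportClock:
    def __init__(self, node_id):
        self.time = 0
        self.node_id = node_id

    def local_event(self):
        # Rule 1: increment before processing an internal event
        self.time += 1
        return self.time

    def send(self):
        # Rule 2: increment, then attach (time, node_id) to the message;
        # node_id breaks ties when building a total order
        self.time += 1
        return (self.time, self.node_id)

    def receive(self, msg_time):
        # Rule 3: move past the larger of the two clocks
        self.time = max(self.time, msg_time) + 1
        return self.time

# Usage
a = LamportClock("A")
b = LamportClock("B")

a.local_event()    # a.time = 1
ts = a.send()      # a.time = 2, msg carries (2, "A")
b.receive(ts[0])   # b.time = max(0, 2) + 1 = 3
b.local_event()    # b.time = 4
```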

Worked Example: Tracing Lamport Timestamps

```
// Three nodes: A, B, C. All start at counter = 0.

// Step 1: A does internal event
A: counter = 0 + 1 = 1

// Step 2: A sends message to B (attaches timestamp 2)
A: counter = 1 + 1 = 2   → sends msg(2) to B

// Step 3: C does internal event (independently)
C: counter = 0 + 1 = 1

// Step 4: B receives msg(2) from A
B: counter = max(0, 2) + 1 = 3

// Step 5: B sends message to C (attaches timestamp 4)
B: counter = 3 + 1 = 4   → sends msg(4) to C

// Step 6: C receives msg(4) from B
C: counter = max(1, 4) + 1 = 5

// Final counters: A=2, B=4, C=5
// Causal chain: A(1) → A(2) → B(3) → B(4) → C(5)
// C's internal event at time 1 is concurrent with A's events.
```

Worked Example: Vector Clocks Detect Conflict

```
// Three nodes: A, B, C. All vector clocks start at [0, 0, 0].

// Step 1: A writes x=1 (internal event)
A.vc = [1, 0, 0]

// Step 2: A sends the write to B
A.vc = [2, 0, 0]   (send: increment A's position)
message carries vc = [2, 0, 0]

// Step 3: B receives from A (merge, then increment B's position)
B.vc = [max(0,2), max(0,0)+1, max(0,0)] = [2, 1, 0]

// Step 4: CONCURRENTLY, C writes x=2 (C hasn't heard from anyone)
C.vc = [0, 0, 1]

// Now compare B's copy of the write x=1 with C's write x=2:
B.vc = [2, 1, 0]
C.vc = [0, 0, 1]

// B[0]=2 > C[0]=0, but B[2]=0 < C[2]=1
// Neither ≤ the other → CONCURRENT!
// This is a REAL CONFLICT that needs resolution.
// Lamport timestamps would just give B's receive ts=3 and C's write ts=1,
// which orders C's write before B's and hides the fact that
// neither causally precedes the other.
// They are genuinely independent, concurrent writes.
```
Concurrency ≠ simultaneous. "Concurrent" in distributed systems does NOT mean "at the same wall-clock time." It means "neither causally depends on the other." Two events can happen hours apart and still be concurrent — if neither node learned about the other's event before performing its own. Concurrency is about information flow, not time.
Interview question: Node X has Lamport timestamp 15. Node Y has Lamport timestamp 10. Can we conclude that X's event happened after Y's event?

Chapter 3: Two-Phase Commit

Before diving into real consensus algorithms, let's examine a simpler approach that almost works: two-phase commit (2PC). It guarantees atomicity across distributed nodes — all nodes commit or all nodes abort — but it is NOT consensus. Understanding why reveals exactly what consensus needs that 2PC lacks.

The Protocol

2PC uses a designated coordinator node that orchestrates a two-phase dance with the participants (the nodes that hold the data):

Phase 1: Prepare
Coordinator sends "prepare" to all participants. Each participant checks if it CAN commit (locks resources, writes data to durable log). If yes, responds "vote-yes." If no, responds "vote-no."
Decision Point
Coordinator collects all votes. If ALL participants voted yes, decision = commit. If ANY voted no, decision = abort.
Phase 2: Commit/Abort
Coordinator sends the decision to all participants. Each participant commits or rolls back accordingly and acknowledges.

Notice the critical rule: once a participant votes "yes," it has made an irrevocable promise. It has written all data to its log and it CANNOT unilaterally abort — it must wait for the coordinator's decision, no matter how long that takes.
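The protocol and its decision rule fit in a few lines. This in-memory sketch assumes no crashes and no network; `Participant`, `prepare`, `finish`, and `two_phase_commit` are hypothetical names, and a real participant would lock rows and force its vote and data to a durable log before answering.

```python
from typing import List


class Participant:
    def __init__(self, name: str, can_commit: bool) -> None:
        self.name = name
        self.can_commit = can_commit
        self.state = "INIT"  # INIT -> PREPARED -> COMMITTED, or -> ABORTED

    def prepare(self) -> bool:
        # Phase 1: a real participant would lock rows and fsync its log here.
        if self.can_commit:
            self.state = "PREPARED"  # irrevocable promise: may no longer abort alone
            return True
        self.state = "ABORTED"
        return False

    def finish(self, decision: str) -> None:
        # Phase 2: apply the coordinator's decision.
        self.state = "COMMITTED" if decision == "COMMIT" else "ABORTED"


def two_phase_commit(participants: List[Participant]) -> str:
    votes = [p.prepare() for p in participants]     # Phase 1: collect votes
    decision = "COMMIT" if all(votes) else "ABORT"  # unanimous YES, or abort
    for p in participants:                          # Phase 2: broadcast decision
        p.finish(decision)
    return decision


ok = [Participant("node1", True), Participant("node2", True)]
print(two_phase_commit(ok))    # COMMIT
bad = [Participant("node1", True), Participant("node2", False)]
print(two_phase_commit(bad))   # ABORT
print([p.state for p in bad])  # ['ABORTED', 'ABORTED']
```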

The Fatal Flaw

What happens if the coordinator crashes after sending "prepare" but before sending the decision? Every participant that voted "yes" is stuck. They have locks held on their data. They cannot commit (they don't know the decision). They cannot abort (they promised they could commit, and maybe the coordinator will come back and tell them to commit). They are blocked — holding locks, unable to proceed, potentially forever if the coordinator never recovers.

2PC is a blocking protocol. A coordinator failure can leave participants holding locks indefinitely. This is not just a theoretical concern — in production, 2PC coordinator failures cause cascading timeouts across services. Worse, other transactions that need those locked rows are also blocked. One crashed coordinator can freeze an entire system. This is why 2PC is NOT consensus: consensus requires progress even when nodes fail, as long as a majority is alive.

Worked Example: 2PC Timeline

```
// Distributed transaction: transfer $100 from Account A (Node 1) to Account B (Node 2)

// HAPPY PATH:
t=0: Client → Coordinator: "Transfer $100 from A to B"
t=1: Coordinator → Node1: "Prepare: debit A by $100"
t=1: Coordinator → Node2: "Prepare: credit B by $100"
t=2: Node1 checks A has sufficient balance. Writes debit to WAL. Locks row A.
t=2: Node1 → Coordinator: "Vote YES"
t=3: Node2 writes credit to WAL. Locks row B.
t=3: Node2 → Coordinator: "Vote YES"
t=4: Coordinator logs decision "COMMIT" to its own WAL.
t=4: Coordinator → Node1: "COMMIT"
t=4: Coordinator → Node2: "COMMIT"
t=5: Node1 applies debit, releases lock on A.
t=5: Node2 applies credit, releases lock on B.
t=6: Both ack. Transaction complete: two coordinator-participant round trips
     (prepare/vote, then commit/ack).

// FAILURE SCENARIO:
t=0-3: Same as above. Both nodes voted YES.
t=4: Coordinator CRASHES before writing decision or sending commit.
t=5: Node1 is stuck. It voted YES (promised to commit). It's holding a lock on A.
t=5: Node2 is stuck. Same situation. Lock on B held.
t=?: Neither node can unilaterally commit (what if coordinator decides abort?)
t=?: Neither node can unilaterally abort (what if coordinator decides commit?)
t=?: Other transactions wanting A or B are BLOCKED by the held locks.
// This is the "in-doubt" state. It persists until the coordinator recovers.
```
The "coordinator log write" is the point of no return. Once the coordinator writes "COMMIT" to its WAL, the transaction WILL commit, even if the coordinator crashes before sending the decision: upon recovery, it reads the log and resends the decision. If the coordinator crashes BEFORE writing the decision, the transaction aborts on recovery (no logged decision means abort, the safe default). In both cases, participants that voted YES stay in doubt, holding their locks, until the coordinator comes back and tells them the outcome. That wait is where 2PC is vulnerable.
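The recovery rule can be sketched as the answer a recovered coordinator gives to an in-doubt participant asking "what happened to this transaction?". The log format (a list of `(txid, decision)` records, forced to disk before any decision message is sent) and the `outcome` function are hypothetical stand-ins for a real write-ahead log:

```python
from typing import List, Tuple

# Hypothetical coordinator WAL: (transaction id, "COMMIT" or "ABORT") records,
# each made durable before the decision is sent to any participant.
DecisionLog = List[Tuple[str, str]]


def outcome(log: DecisionLog, txid: str) -> str:
    """Answer an in-doubt participant after the coordinator restarts."""
    for logged_txid, decision in log:
        if logged_txid == txid:
            return decision  # decision survived the crash: resend it
    return "ABORT"  # no decision was logged before the crash: abort is safe


wal: DecisionLog = [("t1", "COMMIT")]  # t1's decision was logged; t2's never was

print(outcome(wal, "t1"))  # COMMIT
print(outcome(wal, "t2"))  # ABORT
```

Aborting t2 is safe only because no participant can have committed it: participants commit only after receiving a decision, and none was ever sent for t2.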

2PC in the Real World

2PC is used in practice despite its limitations, because the alternatives are worse for many use cases:

| System | How It Uses 2PC | How It Mitigates the Blocking Problem |
|---|---|---|
| MySQL (XA Transactions) | Cross-shard transactions in Vitess, MySQL Cluster | Timeout-based abort (risky: can violate atomicity). Prefers single-shard transactions. |
| PostgreSQL | PREPARE TRANSACTION / COMMIT PREPARED | pg_prepared_xacts view for manual cleanup. DBA intervention for orphaned transactions. |
| Google Spanner | 2PC for cross-shard writes, BUT the coordinator is a Paxos group | The coordinator itself is replicated via Paxos, so a coordinator crash ≠ a lost decision. This is the gold standard. |
| CockroachDB | Parallel commits (optimized 2PC variant) | Transaction record is a Raft-replicated key. Amortizes consensus cost. |

From 2PC to Consensus

The fix for 2PC's blocking problem is to replicate the coordinator using a consensus algorithm. Instead of one coordinator, you have a group of nodes that agree on the decision using Raft or Paxos. If one node crashes, the others still know the decision. Three-phase commit (3PC) is an older attempt to make atomic commit non-blocking, but it assumes bounded network delays and can reach inconsistent outcomes under a partition, so in practice systems replicate the coordinator with consensus instead, as Spanner does.

The insight: 2PC's problem is that the coordinator is a single point of failure with irrecoverable state (the commit decision). Consensus eliminates single points of failure by replicating the decision across a majority of nodes. This is the bridge from "simple coordination" to "fault-tolerant agreement."

| | 2PC | Consensus (Raft/Paxos) |
|---|---|---|
| Coordinator | One fixed node | Elected leader, can change |
| Tolerates coordinator crash? | No (blocks) | Yes (new leader elected) |
| Guarantees | Atomicity | Atomicity + liveness (with partial synchrony) |
| Message rounds | 2 | 2+ (but pipelining helps) |
| Used for | Distributed transactions | Leader election, replicated state machines |
Debug scenario: Your microservice architecture uses 2PC for cross-service transactions. During a deploy, the coordinator service is restarted. Three participant services are stuck holding locks. What should you do, and how would you prevent this in the future?

Chapter 4: Raft Consensus

In 2014, Diego Ongaro and John Ousterhout published "In Search of an Understandable Consensus Algorithm." They were frustrated that Paxos — the dominant consensus algorithm — was so notoriously difficult to understand that most implementations got it wrong. Their goal was to create an algorithm that was equivalent in correctness and performance but dramatically easier to understand. The result was Raft.

Raft decomposes consensus into three independent sub-problems:

| Sub-problem | What it solves | Mechanism |
|---|---|---|
| Leader election | Who makes decisions? | Random timeouts + majority vote |
| Log replication | How do decisions propagate? | Leader appends entries, followers replicate |
| Safety | How do we prevent inconsistency? | Election restriction + commitment rules |

Three Roles

Every Raft node is in exactly one of three states at any time:

Follower: passive. Responds to requests from leaders and candidates. If it hears nothing from a leader for a random timeout period (the election timeout, typically 150-300ms), it becomes a candidate.

Candidate: actively seeking votes. Increments its term number (a logical clock for the election cycle), votes for itself, and sends RequestVote RPCs to all other nodes. If it gets votes from a majority, it becomes the leader. If it hears from a new leader, it steps down to follower. If the election times out with no winner, it starts a new election with a higher term.

Leader: handles all client requests. Appends entries to its log, sends AppendEntries RPCs (also used as heartbeats) to all followers, and tracks which entries have been committed. There is at most one leader per term.

The term is everything. Raft's correctness depends on the term number: a monotonically increasing integer that serves as a logical clock. Every RPC includes the sender's term. If a node receives a message with a higher term, it immediately updates its own term and steps down to follower. If a candidate or leader discovers that its term is stale, it stops what it's doing. This ensures that outdated leaders cannot cause damage — they are automatically dethroned by higher terms.
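A sketch of the term rule as it might sit on a node's RPC handling path. The fields mirror the Raft paper's `currentTerm` and `votedFor` (written here in snake_case); the `RaftNode` class and `observe_term` method are hypothetical:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RaftNode:
    node_id: str
    current_term: int = 0
    voted_for: Optional[str] = None
    role: str = "follower"  # "follower", "candidate", or "leader"

    def observe_term(self, rpc_term: int) -> bool:
        """Apply the term rule to any incoming RPC or RPC response.

        Returns False if the message comes from a stale term and must be rejected.
        """
        if rpc_term > self.current_term:
            # Newer term: adopt it, forget any vote from the old term, step down.
            self.current_term = rpc_term
            self.voted_for = None
            self.role = "follower"
        return rpc_term >= self.current_term


old_leader = RaftNode("A", current_term=3, role="leader")
print(old_leader.observe_term(5), old_leader.role, old_leader.current_term)  # True follower 5
print(old_leader.observe_term(4))  # False: a term-4 message is now stale
```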

Leader Election in Detail

1. Timeout Triggers
A follower hasn't heard from the leader within its election timeout (randomized, e.g., 250ms). It becomes a candidate.
2. Start Election
Candidate increments its term to T+1. Votes for itself. Sends RequestVote(term=T+1, lastLogIndex, lastLogTerm) to all other nodes.
3. Nodes Vote
A node grants its vote if: (a) it hasn't already voted in term T+1, AND (b) the candidate's log is at least as up-to-date as the voter's log. "Up-to-date" means: higher last log term, or same last log term but longer log.
4. Majority Wins
If the candidate gets votes from a majority (including itself), it becomes leader. It immediately sends heartbeats (empty AppendEntries) to prevent new elections.
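Step 3 is where safety lives. Below is a sketch of the voter's side of RequestVote, with each log summarised by its last index and last term as in the RPC arguments above. `Voter` and `handle_request_vote` are hypothetical names, and the requirement to persist the vote to stable storage before replying is noted but not implemented:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Voter:
    current_term: int
    last_log_index: int
    last_log_term: int
    voted_for: Optional[str] = None

    def log_ok(self, cand_last_index: int, cand_last_term: int) -> bool:
        # "At least as up-to-date": a higher last term wins outright;
        # with equal last terms, the candidate's log must be at least as long.
        if cand_last_term != self.last_log_term:
            return cand_last_term > self.last_log_term
        return cand_last_index >= self.last_log_index

    def handle_request_vote(self, term: int, candidate: str,
                            last_index: int, last_term: int) -> bool:
        if term < self.current_term:
            return False                  # stale candidate
        if term > self.current_term:
            self.current_term = term      # new term: no vote cast in it yet
            self.voted_for = None
        if self.voted_for not in (None, candidate):
            return False                  # already voted for someone else this term
        if not self.log_ok(last_index, last_term):
            return False                  # candidate may be missing committed entries
        self.voted_for = candidate        # a real node persists this before replying
        return True


v = Voter(current_term=4, last_log_index=10, last_log_term=4)
print(v.handle_request_vote(5, "B", last_index=8, last_term=4))   # False: shorter log
print(v.handle_request_vote(5, "C", last_index=12, last_term=4))  # True
print(v.handle_request_vote(5, "D", last_index=20, last_term=5))  # False: voted for C
```

Re-granting a vote to the same candidate in the same term is allowed, so a retried RequestVote does not lose a vote the candidate already had.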

The randomized timeout is key to preventing split votes. If two candidates start elections simultaneously, they might split the vote and neither gets a majority. In that case, both time out and start new elections — but with freshly randomized timeouts, so one will almost certainly start before the other and win. In practice, split votes are rare (Ongaro reports them happening less than once per thousand elections in testing).

State Transition Diagram

Every Raft node moves between exactly three states. The transitions are triggered by specific events:

Raft State Machine

The three states of a Raft node and the events that trigger transitions.

// State transitions:

Follower → Candidate: election timeout expires (no heartbeat from leader)
Candidate → Candidate: election timeout expires (split vote, retry)
Candidate → Leader: received votes from a majority
Candidate → Follower: received AppendEntries from a valid leader (term at least its own), or any message with a higher term
Leader → Follower: discovered a higher term (another leader was elected)

// NOTE: There is NO transition from Follower directly to Leader.
// You MUST go through the Candidate state first.
// This prevents a node from unilaterally claiming leadership.
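The transition list above can be encoded as a lookup table. In this sketch the event names (`"election_timeout"`, `"won_majority"`, and so on) are hypothetical labels rather than Raft terminology. The point is that no `(FOLLOWER, event)` pair maps directly to `LEADER`.

```python
from enum import Enum


class State(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"


# (current state, event) -> next state, mirroring the transition list above.
# Event names are illustrative labels chosen for this sketch.
TRANSITIONS = {
    (State.FOLLOWER, "election_timeout"): State.CANDIDATE,
    (State.CANDIDATE, "election_timeout"): State.CANDIDATE,  # split vote: retry
    (State.CANDIDATE, "won_majority"): State.LEADER,
    (State.CANDIDATE, "valid_leader_seen"): State.FOLLOWER,
    (State.CANDIDATE, "higher_term_seen"): State.FOLLOWER,
    (State.LEADER, "higher_term_seen"): State.FOLLOWER,
}


def next_state(state, event):
    """Return the next state; events with no entry leave the state unchanged."""
    return TRANSITIONS.get((state, event), state)
```

For example, `next_state(State.FOLLOWER, "won_majority")` stays `FOLLOWER`, because a follower never collects votes without first becoming a candidate.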

The Heartbeat Mechanism

A leader sends heartbeats (empty AppendEntries RPCs) to all followers at a regular interval — typically every 50-100ms. This serves two purposes: (1) it prevents followers from timing out and starting elections, and (2) it carries the leader's commitIndex, telling followers which entries are committed.

The election timeout must be significantly longer than the heartbeat interval. The Raft paper recommends:

broadcastTime << electionTimeout << MTBF

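Where broadcastTime is the average time for a server to send RPCs in parallel to every other server and receive their responses (roughly 0.5-20ms, depending mostly on how fast entries can be persisted to disk), electionTimeout is the randomized timeout (e.g., 150-300ms), and MTBF is the mean time between failures for a single node (months to years). This ensures elections are rare and fast, and that ordinary heartbeat delays never trigger one.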
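One way to sanity-check a configuration against this inequality is sketched below. The function name is hypothetical, and reading "<<" as "at least 10x smaller" (one order of magnitude) is an assumption of this sketch, not a fixed rule. It checks the minimum of the election-timeout range, since that is the earliest point at which a follower can give up on the leader.

```python
def timing_is_safe(broadcast_ms, election_timeout_min_ms, mtbf_ms, factor=10):
    """Check broadcastTime << electionTimeout << MTBF.

    "<<" is interpreted as "at least `factor` times smaller"; factor=10
    (one order of magnitude) is an assumption, not a value from the paper.
    """
    return (broadcast_ms * factor <= election_timeout_min_ms
            and election_timeout_min_ms * factor <= mtbf_ms)
```

With a 1ms broadcast time, a 150-300ms timeout passes easily. With slow disks pushing broadcast time to 20ms, this rule would call for a minimum timeout of at least 200ms.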

Log Replication in Detail

Once a leader is elected, it handles all client requests. Each request becomes an entry in the leader's log: an ordered sequence of (term, command) pairs. The leader then replicates each entry to followers:

1. Client Request
Client sends command to leader. Leader appends entry to its own log at the next index.
2. AppendEntries RPC
Leader sends AppendEntries(entries, prevLogIndex, prevLogTerm) to each follower. The prevLogIndex and prevLogTerm are consistency checks — the follower must have the previous entry before accepting the new one.
3. Followers Append
If the follower's log matches at prevLogIndex/prevLogTerm, it appends the new entries and responds success. If not, it responds failure — the leader will retry with earlier entries.
4. Commit
Once a majority of nodes have the entry, the leader marks it as committed. Committed entries are safe — they will never be lost (barring total cluster failure). The leader notifies followers of the commit index in subsequent AppendEntries.
Majority = fault tolerance. In a 5-node cluster, an entry is committed once 3 nodes have it. This means you can tolerate 2 node failures and the data is still safe. In general, a cluster of 2f+1 nodes tolerates f failures. So: 3 nodes tolerate 1 failure, 5 tolerate 2, 7 tolerate 3. Odd numbers are preferred because even numbers waste a node (4 nodes still only tolerate 1 failure, same as 3).
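The quorum arithmetic in the paragraph above is easy to tabulate. This is a small sketch, and the helper names are illustrative only.

```python
def majority(n):
    """Smallest number of nodes that forms a strict majority of n."""
    return n // 2 + 1


def tolerated_failures(n):
    """Nodes that can fail while a majority is still reachable: (n - 1) // 2."""
    return n - majority(n)


for n in range(3, 8):
    print(f"{n} nodes: majority {majority(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

For 3 through 7 nodes, the tolerated failures come out as 1, 1, 2, 2, 3. Going from 3 to 4 nodes raises the majority without raising fault tolerance.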

The Performance of Raft in Practice

How fast is Raft? The critical metric is commit latency: the time from when a client sends a request to when it receives a committed response. This depends on:

// Commit latency breakdown:
commit_latency =
   network_to_leader // client → leader (one way)
+ fsync_latency_leader // leader writes entry to disk (~0.1-1ms SSD)
+ network_rtt_to_majority // leader → followers → leader (parallel round trip)
+ fsync_latency_followers // followers write to disk before acking (parallel)
+ network_to_client // leader → client (one way)

// In a same-datacenter cluster:
// ~0.5ms (network) + ~0.2ms (fsync) + ~0.5ms (network) + ~0.2ms (fsync) + ~0.5ms
// ≈ 2ms total. etcd reports p99 of ~3-5ms in practice.

// In a cross-region cluster (US-East to US-West = ~60ms RTT):
// ~1ms + ~0.2ms + ~60ms + ~0.2ms + ~1ms
// ≈ 62ms total. This is why cross-region consensus is expensive.

// Throughput: etcd handles ~10,000-50,000 writes/sec.
// Raft can pipeline: leader sends new entries without waiting for
// previous ones to commit. Batching multiple client requests into
// a single AppendEntries RPC dramatically improves throughput.
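The latency arithmetic above can be rerun with your own numbers using a tiny calculator. The function name is hypothetical, and the inputs are the illustrative millisecond figures from the breakdown. Like the breakdown, it sums the steps sequentially. Real implementations can overlap some of them, for example by having the leader persist an entry while it is also sending it to followers.

```python
def commit_latency_ms(client_to_leader, leader_fsync, leader_follower_rtt,
                      follower_fsync, leader_to_client):
    """Sum the commit-latency components (all in milliseconds), sequentially."""
    return (client_to_leader + leader_fsync + leader_follower_rtt
            + follower_fsync + leader_to_client)


same_dc = commit_latency_ms(0.5, 0.2, 0.5, 0.2, 0.5)
cross_region = commit_latency_ms(1.0, 0.2, 60.0, 0.2, 1.0)
print(f"same datacenter: {same_dc:.1f} ms, cross-region: {cross_region:.1f} ms")
```

The cross-region total is dominated by the single leader-to-majority round trip. No amount of disk tuning recovers that time.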

The key insight for system design: consensus is fast enough for metadata (who is the leader? what's the partition map?) but usually too slow for high-volume data-plane operations (every user write, every analytics event). This is why the usual pattern is: consensus for coordination, something faster for data.

python
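from enum import Enum
import random

class State(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"

class RaftNode:  # single-process sketch: peers are RaftNode objects called directly
    def __init__(self, node_id, peers=None):
        self.id = node_id
        self.peers = peers if peers is not None else []  # other RaftNode objects
        self.state = State.FOLLOWER
        self.current_term = 0
        self.voted_for = None
        self.log = []  # list of (term, command)
        self.commit_index = -1
        self.next_index = {}  # leader only: peer id -> index of next entry to send
        self.election_timeout = random.uniform(0.15, 0.3)  # seconds; timers not modeled

    def observe_term(self, term):  # any higher term demotes this node to follower
        if term > self.current_term:
            self.current_term, self.voted_for, self.state = term, None, State.FOLLOWER

    def handle_request_vote(self, term, candidate_id, last_index, last_term):
        self.observe_term(term)
        my_last = (self.log[-1][0] if self.log else 0, len(self.log) - 1)
        granted = (term == self.current_term and self.voted_for in (None, candidate_id)
                   and (last_term, last_index) >= my_last)  # log at least as up-to-date
        if granted:
            self.voted_for = candidate_id
        return granted

    def start_election(self):
        self.state = State.CANDIDATE
        self.current_term += 1
        self.voted_for = self.id
        votes = 1  # vote for self
        last_term = self.log[-1][0] if self.log else 0
        for peer in self.peers:
            if peer.handle_request_vote(self.current_term, self.id, len(self.log) - 1, last_term):
                votes += 1
            self.observe_term(peer.current_term)  # step down if a peer has a higher term
        if self.state == State.CANDIDATE and votes > (len(self.peers) + 1) // 2:
            self.state = State.LEADER
            self.next_index = {p.id: len(self.log) for p in self.peers}
            for peer in self.peers:  # initial heartbeats (empty AppendEntries)
                self.replicate_to(peer)

    def handle_append_entries(self, term, prev_index, prev_term, entries, leader_commit):
        self.observe_term(term)
        if term < self.current_term:
            return False  # reject a stale leader
        self.state = State.FOLLOWER  # a valid leader exists for this term
        if prev_index >= 0 and (prev_index >= len(self.log) or self.log[prev_index][0] != prev_term):
            return False  # consistency check failed; leader will back off
        for i, entry in enumerate(entries, start=prev_index + 1):
            if i >= len(self.log) or self.log[i][0] != entry[0]:  # first new or conflicting entry
                self.log[i:] = entries[i - prev_index - 1:]  # overwrite from here on
                break
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, prev_index + len(entries))
        return True

    def replicate_to(self, peer):  # AppendEntries, backing off nextIndex until logs match
        while self.state == State.LEADER:
            nxt = self.next_index[peer.id]
            prev_term = self.log[nxt - 1][0] if nxt > 0 else 0
            ok = peer.handle_append_entries(self.current_term, nxt - 1, prev_term,
                                            self.log[nxt:], self.commit_index)
            self.observe_term(peer.current_term)
            if ok:
                self.next_index[peer.id] = len(self.log)
                return True
            self.next_index[peer.id] -= 1
        return False

    def append_entry(self, command):
        """Leader receives client request."""
        assert self.state == State.LEADER
        self.log.append((self.current_term, command))
        acks = 1 + sum(self.replicate_to(peer) for peer in self.peers)  # leader has it
        if self.state == State.LEADER and acks > (len(self.peers) + 1) // 2:
            self.commit_index = len(self.log) - 1  # current-term entry on a majority
            return True  # committed; followers learn it from the next AppendEntries
        return False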

Raft RPCs in Detail

Raft uses only two RPCs for the core protocol. This simplicity is intentional — fewer message types means fewer edge cases to reason about.

// RPC 1: RequestVote
// Sent by: Candidate → All other nodes
// Arguments:
  term: candidate's current term
  candidateId: who is asking for votes
  lastLogIndex: index of candidate's last log entry
  lastLogTerm: term of candidate's last log entry
// Response:
  term: receiver's current term (for candidate to update itself)
  voteGranted: true if voter gave its vote

// RPC 2: AppendEntries (also serves as heartbeat when entries is empty)
// Sent by: Leader → All followers
// Arguments:
  term: leader's current term
  leaderId: who is the leader (so followers can redirect clients)
  prevLogIndex: index of log entry immediately preceding new ones
  prevLogTerm: term of prevLogIndex entry
  entries[]: log entries to replicate (empty for heartbeat)
  leaderCommit: leader's commitIndex
// Response:
  term: receiver's current term
  success: true if follower matched prevLogIndex/prevLogTerm

// The beauty: AppendEntries handles THREE jobs with one RPC:
// 1. Heartbeat (empty entries)
// 2. Log replication (entries present)
// 3. Commit notification (leaderCommit field)

Per-Follower State Tracking

The leader maintains two indices for each follower:

Index | What It Tracks | How It Changes
nextIndex[i] | The next log entry to send to follower i | Initialized to leader's last log index + 1. Decremented on AppendEntries failure (follower's log is behind). Advanced on success.
matchIndex[i] | The highest log entry known to be replicated on follower i | Initialized to 0. Updated when follower successfully appends entries.

The commit rule: an entry at index N is committed if matchIndex[i] >= N for a majority of nodes, AND the entry at N has the current term. This last condition — the entry must be from the current term — prevents a subtle bug where a leader commits entries from a previous term based on replication count alone. (See the Raft paper Section 5.4.2 for the counterexample that motivates this rule.)

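Why the current-term restriction? Consider the scenario behind the Raft paper's counterexample. The term-2 leader replicates an entry to only one follower, then crashes. A node that lacks the entry wins term 3 and writes a different entry at the same index, but crashes before replicating it. The original leader returns, wins term 4, and finishes copying the term-2 entry to a majority. If it now declared that entry committed just by counting replicas and then crashed, the term-3 node could still win an election (its last log term, 3, beats 2) and overwrite the entry on the other nodes. So the term-4 leader must first commit a NEW entry from its own term, which implicitly commits all prior entries. Once a majority holds a term-4 entry, the term-3 node can no longer win. This prevents a committed entry from being overwritten by a future leader.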
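The commit rule with the current-term restriction can be sketched as follows. The function name is hypothetical, and indices are 1-based to match `matchIndex` starting at 0 in the table above. The leader counts itself as holding every entry in its own log.

```python
def advance_commit_index(commit_index, log_terms, match_index, current_term, cluster_size):
    """Return the leader's new commit index.

    log_terms[i - 1] is the term of the leader's entry at (1-based) index i.
    match_index maps follower id -> highest index known to be replicated there.
    """
    for n in range(len(log_terms), commit_index, -1):  # try the highest N first
        replicas = 1 + sum(1 for m in match_index.values() if m >= n)  # +1: leader
        # Only a current-term entry may be committed by counting replicas;
        # committing it implicitly commits every earlier entry.
        if replicas > cluster_size // 2 and log_terms[n - 1] == current_term:
            return n
    return commit_index
```

In the scenario above, the term-4 leader sees its term-2 entry at index 2 on a majority, but `advance_commit_index(1, [1, 2], {2: 2, 3: 2, 4: 1, 5: 1}, 4, 5)` still returns 1. Once a term-4 entry at index 3 reaches a majority, the same call with `[1, 2, 4]` and `{2: 3, 3: 3, 4: 1, 5: 1}` returns 3.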
Design question: In a 5-node Raft cluster, two nodes fail permanently. Can the cluster still make progress? What if a third node fails?

Chapter 5: Raft Deep Dive

The elegance of Raft lies in two safety properties that guarantee consistency even through crashes and elections. Let us examine each in detail.

Log Matching Property

If two nodes have a log entry with the same index and the same term, then: (1) that entry stores the same command, and (2) ALL preceding entries are identical in both logs. This property is maintained by the consistency check in AppendEntries: the leader sends (prevLogIndex, prevLogTerm) and the follower rejects the append if its log doesn't match at that position. The leader then decrements the index and retries until it finds the point where the logs agree, then replays entries from there.

Think of it like a blockchain. Each entry implicitly "hashes" all previous entries through the (index, term) pair. If two logs agree on entry 47, they must agree on entries 1 through 46. This makes log reconciliation efficient: you only need to find the first point of divergence and replay from there.
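The "find the first point of divergence and replay" idea can be sketched on two in-memory logs. The function name is hypothetical, indices are 0-based, and the leader's probing loop is simulated directly instead of being sent as AppendEntries RPCs. This sketch also truncates the follower eagerly. Real Raft deletes follower entries only when they conflict with entries it actually receives.

```python
def reconcile(leader_log, follower_log):
    """Return the follower's log after Raft-style repair.

    Logs are lists of (term, command). Mimics the leader decrementing nextIndex
    until the follower's entry just before it matches in both index and term.
    """
    next_index = len(leader_log)  # first entry the leader would send
    while next_index > 0:
        prev = next_index - 1
        if prev < len(follower_log) and follower_log[prev][0] == leader_log[prev][0]:
            break  # consistency check passes at prev
        next_index -= 1  # AppendEntries rejected: back off one entry
    # Log Matching: everything before next_index is identical, so keep it
    # and replace the rest with the leader's suffix.
    return follower_log[:next_index] + leader_log[next_index:]
```

For example, with `leader = [(1, "x=1"), (1, "y=2"), (3, "z=3")]` and a follower holding `[(1, "x=1"), (2, "a=9"), (2, "b=8")]`, the probe stops at index 0. The two term-2 entries, which were never committed, are replaced by the leader's suffix.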

Leader Completeness Property

If a log entry is committed in a given term, then that entry will be present in the logs of all leaders in all higher terms. This is enforced by the election restriction: a candidate includes its (lastLogIndex, lastLogTerm) in RequestVote, and a voter rejects candidates whose logs are less up-to-date. Since committed entries are present on a majority, and a candidate needs a majority to win, at least one voter in any winning majority must have the committed entry — and will not vote for a candidate that lacks it.

// Why the election restriction guarantees leader completeness:

// 1. Entry E is committed → E exists on a majority M1
// 2. New leader needs votes from a majority M2
// 3. M1 ∩ M2 ≠ ∅ (two majorities must overlap)
// 4. Some node in M2 has entry E
// 5. That node won't vote for a candidate without E
// 6. Therefore, the new leader must have E ∎
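Step 3 ("two majorities must overlap") can be checked by brute force for a small cluster. This sketch enumerates every majority of a 5-node cluster and tests every pair. The helper names are illustrative.

```python
from itertools import combinations


def majorities(nodes):
    """All subsets of `nodes` containing a strict majority."""
    need = len(nodes) // 2 + 1
    return [set(c) for k in range(need, len(nodes) + 1) for c in combinations(nodes, k)]


def every_pair_overlaps(quorums_a, quorums_b):
    """True if every quorum in quorums_a shares a node with every one in quorums_b."""
    return all(a & b for a in quorums_a for b in quorums_b)


cluster = [1, 2, 3, 4, 5]
print(every_pair_overlaps(majorities(cluster), majorities(cluster)))
```

The check fails if you replace "majority" with "half". In a 4-node cluster, `{1, 2}` and `{3, 4}` are disjoint. This is one reason quorums must be strict majorities.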

Worked Example: 5-Node Cluster Through Failures

Let us trace a complete scenario. Our cluster has nodes {1, 2, 3, 4, 5}. We will go through leader election, writes, a crash, re-election, and log reconciliation.

// INITIAL STATE: Node 1 is leader, term 1, all logs empty
Node 1 (L, t=1): []
Node 2 (F, t=1): []
Node 3 (F, t=1): []
Node 4 (F, t=1): []
Node 5 (F, t=1): []

// CLIENT WRITE "x=1": Leader appends, replicates to 2,3,4 (4 of 5 nodes have it; a majority is 3)
Node 1 (L, t=1): [(t1, x=1)]   committed ✓
Node 2 (F, t=1): [(t1, x=1)]
Node 3 (F, t=1): [(t1, x=1)]
Node 4 (F, t=1): [(t1, x=1)]
Node 5 (F, t=1): []   ← slow, hasn't received yet

// CLIENT WRITE "y=2": Replicated to 2,3 (3 total including leader)
Node 1 (L, t=1): [(t1, x=1), (t1, y=2)]   committed ✓
Node 2 (F, t=1): [(t1, x=1), (t1, y=2)]
Node 3 (F, t=1): [(t1, x=1), (t1, y=2)]
Node 4 (F, t=1): [(t1, x=1)]   ← hasn't received y=2 yet
Node 5 (F, t=1): [(t1, x=1)]   ← caught up on x=1, not y=2

// NODE 1 CRASHES. Election timeout fires on Node 3.
// Node 3 starts election: term 2, requests votes.
// Node 3's log: [(t1,x=1), (t1,y=2)] — lastLogIndex=1, lastLogTerm=1
// Node 4's log: [(t1,x=1)] — less up-to-date, grants vote
// Node 5's log: [(t1,x=1)] — less up-to-date, grants vote
// Node 2's log: [(t1,x=1),(t1,y=2)] — equally up-to-date, grants vote
// Node 3 wins with 4 votes (including self). Becomes leader of term 2.

Node 1 (DEAD)
Node 3 (L, t=2): [(t1, x=1), (t1, y=2)]
Node 2 (F, t=2): [(t1, x=1), (t1, y=2)]
Node 4 (F, t=2): [(t1, x=1)]   ← will receive y=2 from new leader
Node 5 (F, t=2): [(t1, x=1)]   ← will receive y=2 from new leader

// CLIENT WRITE "z=3": Node 3 appends and replicates
Node 3 (L, t=2): [(t1, x=1), (t1, y=2), (t2, z=3)]   committed ✓
Node 2 (F, t=2): [(t1, x=1), (t1, y=2), (t2, z=3)]
Node 4 (F, t=2): [(t1, x=1), (t1, y=2), (t2, z=3)]   ← caught up!
Node 5 (F, t=2): [(t1, x=1), (t1, y=2)]   ← still catching up

// KEY INSIGHT: y=2 was committed in term 1 and survived the leadership change.
// Leader completeness property in action.

How Raft Handles Edge Cases

Scenario | What Happens | Why It's Safe
Leader crashes | Followers time out, election begins. A candidate whose log is at least as up-to-date as those of a majority of voters wins. | Leader completeness: all committed entries are on the new leader.
Network partition | Majority partition can elect a leader and proceed. Minority partition stalls. | Only one partition has a majority, so only one leader can commit entries (a stale leader in the minority cannot).
Split vote | No candidate gets a majority. All candidates time out and retry with new randomized timeouts. | Eventually one candidate starts first and wins. No safety violation during split votes.
Log conflict | New leader's AppendEntries fails the consistency check. Leader decrements nextIndex and retries until logs match. | Conflicting entries are from uncommitted writes by a deposed leader. Overwriting them is safe.
Old leader comes back | Receives AppendEntries or RequestVote with a higher term. Steps down to follower. Its uncommitted entries are overwritten. | Only committed entries are guaranteed. Uncommitted entries from old leaders are expendable.

Raft Cluster Membership Changes

One of the trickiest aspects of running Raft in production: what happens when you need to add or remove a node? You cannot switch the configuration on all nodes atomically, so there will be a moment where some nodes think the cluster has 3 members and others think it has 5. During that overlap, a majority of the old configuration (2 of 3) and a majority of the new one (3 of 5) can be disjoint, so two leaders could be elected for the same term: split-brain.

Raft solves this with joint consensus: a two-phase approach where the cluster temporarily operates under both the old and new configurations simultaneously.

Phase 1: Joint Config
Leader appends a special log entry C_old,new containing both the old and new configurations. While it is in effect, every decision (electing a leader, committing an entry) requires a majority of BOTH the old cluster AND the new cluster.
Replicate C_old,new
Each server starts using C_old,new as soon as the entry reaches its log. Once it is committed, neither configuration can make decisions on its own. A server from either configuration can serve as leader, but it must get agreement from both.
Phase 2: New Config
Leader appends entry C_new with only the new configuration. Once C_new is committed, the old configuration is no longer relevant, and servers not in C_new can be shut down.
In practice, many systems use single-server changes. Instead of joint consensus, you add or remove one server at a time. This is simpler because when membership changes by only one server, every majority of the old configuration overlaps every majority of the new one, so two disjoint majorities can never form. etcd, for example, changes membership one member at a time. Adding a server: (1) the new node joins as a non-voting member (etcd calls this a learner), (2) it catches up on the log, (3) a configuration change entry promotes it to voting member. Each membership change is itself committed through Raft.
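Both membership rules can be checked mechanically on small clusters. The sketch below uses hypothetical helper names. It tests the joint-consensus rule (majorities of both configurations) and brute-forces whether a majority of the old configuration and a majority of the new one can ever be disjoint.

```python
from itertools import combinations


def has_majority(votes, config):
    """True if `votes` contains a strict majority of the servers in `config`."""
    return len(votes & config) > len(config) // 2


def joint_has_majority(votes, old_config, new_config):
    """Joint consensus (C_old,new): a decision needs majorities of BOTH configs."""
    return has_majority(votes, old_config) and has_majority(votes, new_config)


def can_split_brain(old_config, new_config):
    """Brute force: can some majority of old and some majority of new be disjoint?"""
    everyone = sorted(old_config | new_config)
    groups = [set(c) for k in range(len(everyone) + 1) for c in combinations(everyone, k)]
    old_q = [g for g in groups if g <= old_config and has_majority(g, old_config)]
    new_q = [g for g in groups if g <= new_config and has_majority(g, new_config)]
    return any(not (a & b) for a in old_q for b in new_q)
```

Jumping from `{1, 2, 3}` straight to `{1, 2, 3, 4, 5}` allows disjoint majorities (`{1, 2}` and `{3, 4, 5}`), so `can_split_brain` returns True. Single-server steps such as 3 to 4 or 5 to 4 members return False. Under joint consensus, `{3, 4, 5}` is not enough, because it holds only one of the three old members.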

Read Consistency in Raft

There is a subtle problem with reads in Raft. A client reads from the leader, but how does the leader know it is still the leader? It might have been deposed by a network partition and not know it yet. Two solutions:

Approach | How It Works | Cost
Read Index | Leader records its current commit index as the read index. Sends a heartbeat round to confirm it's still leader (majority ack). Waits until its state machine has applied at least that index, then serves the read locally. | One heartbeat round trip per read batch
Lease-based reads | Leader holds a "lease": a time window during which followers promise not to start elections. During the lease, the leader serves reads without extra heartbeats. | Depends on clock accuracy. Dangerous if clocks drift.

etcd uses the Read Index approach by default. CockroachDB uses lease-based reads with careful clock synchronization. The choice depends on whether you trust clocks (most engineers do not, and they are right).
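The Read Index steps can be sketched as a single leader-side function. Everything here is hypothetical: the function name, the `(term, (key, value))` log format, and passing the heartbeat ack count in directly rather than collecting acks over the network. It also assumes the leader has already committed an entry from its current term, which Raft requires (typically via a no-op entry) before a new leader's commit index can be trusted.

```python
def read_index_get(key, log, commit_index, applied_index, kv, heartbeat_acks, cluster_size):
    """Serve a linearizable read on the leader using the Read Index approach.

    log: list of (term, (key, value)) entries with 1-based indices.
    kv: the leader's state machine, updated in place as entries are applied.
    heartbeat_acks: followers that acked a heartbeat sent after this read arrived.
    Returns (value, new_applied_index).
    """
    read_index = commit_index  # step 1: remember the current commit index
    if 1 + heartbeat_acks <= cluster_size // 2:  # step 2: +1 counts the leader itself
        raise RuntimeError("leadership not confirmed; this node may have been deposed")
    while applied_index < read_index:  # step 3: catch the state machine up
        applied_index += 1
        _, (k, v) = log[applied_index - 1]
        kv[k] = v
    return kv.get(key), applied_index  # step 4: answer from local state
```

With `log = [(1, ("x", 1)), (1, ("x", 2))]`, commit index 2, applied index 1, and 2 follower acks in a 5-node cluster, the read first applies entry 2 and then returns 2. With only 1 ack, the leader cannot rule out that it has been deposed, so it refuses to answer.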

Debug scenario: In a 5-node Raft cluster, nodes 1 and 2 are partitioned from nodes 3, 4, and 5. Node 1 was the leader in term 3. Node 1 continues receiving client writes in its partition. Meanwhile, nodes 3-5 elect node 4 as leader of term 4. After 10 minutes, the partition heals. What happens to node 1's writes?

Chapter 6: Paxos

Before Raft, there was Paxos. Leslie Lamport published it in 1989 (in a paper so whimsical that reviewers rejected it; it was finally published in 1998). Paxos solves the same problem as Raft — getting a group of nodes to agree on a value — but it takes a fundamentally different approach. Where Raft is structured around a strong leader, Paxos is more symmetric: any node can be a proposer.

Paxos has three roles:

Role | What it does | Analogy
Proposer | Proposes a value for the group to agree on | A senator proposing a bill
Acceptor | Votes on proposals (a majority of acceptors must agree) | Members of the senate voting
Learner | Learns the decided value after acceptors agree | Citizens learning the new law

In practice, the same physical node often plays all three roles. But separating them conceptually makes the algorithm easier to reason about.

Single-Decree Paxos

The simplest form of Paxos agrees on a single value. It runs in two phases:

Phase 1a: Prepare
Proposer chooses a unique proposal number n (higher than any it has seen). Sends Prepare(n) to a majority of acceptors.
Phase 1b: Promise
Each acceptor checks whether n is higher than any proposal number it has already promised. If so, it promises to reject any proposal numbered lower than n, and replies with the highest-numbered proposal it has already accepted (both its number and its value), if any. Otherwise it rejects the Prepare.
Phase 2a: Accept
If proposer receives promises from a majority: if any acceptor already accepted a value, use the value with the highest proposal number. Otherwise, use its own proposed value. Sends Accept(n, value) to all acceptors.
Phase 2b: Accepted
Acceptor accepts (n, value) if it hasn't promised a higher number since Phase 1. Sends Accepted(n, value) to learners. When a learner sees a majority of Accepted messages with the same (n, value), the value is chosen.
Why the "already accepted" value rule? This is the heart of Paxos's safety. Imagine acceptor A has already accepted value V in a previous round. A new proposer starts a fresh round with a higher number. In Phase 1b, A tells the proposer "I already accepted V." The proposer MUST use V (not its own value) in Phase 2a. This ensures that once a value is accepted by a majority, all future proposals will converge on the same value. Without this rule, two proposers could get different values accepted, violating consensus.
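Single-decree Paxos fits in a short in-process sketch. The names `Acceptor` and `propose` are hypothetical. For simplicity the proposer sends Prepare and Accept to every acceptor rather than to a chosen majority, and the learner role is folded into counting successful Accept replies. Note the rule from the paragraph above: if any promise reports an accepted value, the proposer adopts the one with the highest proposal number.

```python
class Acceptor:
    def __init__(self):
        self.promised = 0     # highest proposal number promised so far
        self.accepted = None  # (n, value) of the last accepted proposal, if any

    def prepare(self, n):
        """Phase 1b: promise to ignore proposals below n; report any accepted value."""
        if n > self.promised:
            self.promised = n
            return True, self.accepted
        return False, None

    def accept(self, n, value):
        """Phase 2b: accept unless a higher number has been promised since."""
        if n >= self.promised:
            self.promised = n
            self.accepted = (n, value)
            return True
        return False


def propose(acceptors, n, my_value):
    """Run one round (Phases 1a and 2a). Returns the chosen value, or None."""
    majority = len(acceptors) // 2 + 1
    replies = [a.prepare(n) for a in acceptors]
    promises = [acc for ok, acc in replies if ok]
    if len(promises) < majority:
        return None  # preempted: another proposer holds a higher number
    prior = [acc for acc in promises if acc is not None]
    # Safety rule: adopt the highest-numbered previously accepted value, if any.
    value = max(prior, key=lambda p: p[0])[1] if prior else my_value
    accepted = sum(a.accept(n, value) for a in acceptors)
    return value if accepted >= majority else None
```

With five fresh acceptors, `propose(acceptors, 1, "red")` chooses "red". A later `propose(acceptors, 2, "blue")` learns about "red" during Phase 1 and re-proposes it. A stale `propose(acceptors, 1, "green")` gets no promises and returns None.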

The Livelock Problem

Paxos has a subtle liveness issue. Imagine two proposers, P1 and P2, competing:

// P1 sends Prepare(1), gets promises from majority
P1: Prepare(1) → majority promises

// P2 sends Prepare(2) — higher number! Majority promises to P2
// This invalidates P1's promises
P2: Prepare(2) → majority promises (now ignoring P1)

// P1 sends Accept(1, v1) — rejected! Acceptors promised a higher number
P1: Accept(1, v1) → REJECTED

// P1 retries with Prepare(3) — now P2's promises are invalidated
P1: Prepare(3) → majority promises (now ignoring P2)

// P2 sends Accept(2, v2) — rejected!
// This can repeat forever: each proposer preempts the other

This is called livelock: no value is ever decided because proposers keep interrupting each other. The solution is Multi-Paxos: elect a stable leader (a distinguished proposer) that skips Phase 1 for subsequent proposals. As long as the leader is stable, Paxos is fast. If the leader fails, a new one is elected — which temporarily risks livelock, but the randomized backoff makes it resolve quickly.

Multi-Paxos ≈ Raft. When you add leader election and log replication to Multi-Paxos, you get something very similar to Raft. The key difference is that Raft was designed with these components integrated from the start, while Multi-Paxos evolved by adding them on top of single-decree Paxos. This is why Raft is easier to implement: the components were designed to work together.

Why Is Paxos Hard?

Lamport's original Paxos paper is famous for being impenetrable. Even the "simplified" version confuses most readers. The difficulty comes from several sources:

Source of Confusion | Why | How Raft Fixes It
No clear leader | Any node can propose at any time. Reasoning about concurrent proposals is mind-bending. | Single leader handles all proposals. Concurrent proposals are impossible during normal operation.
Single-decree only | Basic Paxos decides ONE value. For a log of values, you need Multi-Paxos, which is underspecified. | Log replication is a first-class concept. The log is central to the algorithm.
Proposal numbers ≠ terms | Proposal numbers are per-proposal, not per-epoch. Multiple proposals can exist within a single "leadership period." | Term number cleanly separates epochs. One term = at most one leader.
Safety vs liveness intertwined | Proving safety requires reasoning about all possible interleavings of messages across multiple rounds. | Safety proofs decompose into independent properties (Log Matching, Leader Completeness).

Worked Example: A Complete Paxos Round

// Setup: 5 acceptors (A1-A5). All start with no promises, no accepted values.
// Proposer P1 wants to propose value "red".

// PHASE 1a: P1 sends Prepare(n=1) to A1, A2, A3 (majority)
P1 → A1: Prepare(1)
P1 → A2: Prepare(1)
P1 → A3: Prepare(1)

// PHASE 1b: Acceptors check. All have promised=0, so 1 > 0. They promise.
A1 → P1: Promise(1, no_accepted_value)
A2 → P1: Promise(1, no_accepted_value)
A3 → P1: Promise(1, no_accepted_value)

// P1 has 3 promises (majority of 5). None had previously accepted a value.
// So P1 can use its own value: "red".

// PHASE 2a: P1 sends Accept(n=1, value="red") to same majority
P1 → A1: Accept(1, "red")
P1 → A2: Accept(1, "red")
P1 → A3: Accept(1, "red")

// PHASE 2b: Acceptors check. All promised ≤ 1, so they accept.
A1: accepted = (1, "red")
A2: accepted = (1, "red")
A3: accepted = (1, "red")

// 3 out of 5 acceptors accepted (1, "red"). VALUE "red" IS CHOSEN. ✓
// From this point, no other value can ever be chosen.
// If P2 starts a new round with Prepare(2), any majority it contacts includes
// at least one of A1-A3, so it learns about "red" and must re-propose "red"
// (convergence guarantee).

Paxos vs Raft: The Key Differences

Aspect | Paxos | Raft
Published | 1989/1998 (Lamport) | 2014 (Ongaro & Ousterhout)
Approach | Any node can propose | Strong leader handles all requests
Leader election | Implicit (via proposal numbers) | Explicit (RequestVote RPC)
Log replication | Separate concern (Multi-Paxos) | Integrated from the start
Livelock risk | Yes (competing proposers) | No (only leader proposes)
Understandability | Notoriously hard | Designed for clarity
Correctness proofs | Decades of study | TLA+ verified
Real-world use | Google Chubby, Spanner | etcd, CockroachDB, TiKV
Concept check: In Paxos, a proposer sends Prepare(5) and gets promises from 3 out of 5 acceptors. Two of those acceptors already accepted value "blue" from a previous proposal. One accepted value "red" from an even earlier proposal. What value must the proposer use in its Accept message?

Chapter 7: ZAB & Other Algorithms

Raft and Paxos are the two most well-known consensus algorithms, but they are not the only ones in production. Let us survey the landscape.

ZAB: ZooKeeper Atomic Broadcast

ZAB (ZooKeeper Atomic Broadcast) is the consensus protocol used by Apache ZooKeeper. It was designed specifically for total order broadcast: delivering messages to all nodes in the same order. ZAB is similar to Raft but was developed independently (published 2008, predating Raft by 6 years).

Key differences from Raft:

Aspect | Raft | ZAB
Primary goal | Replicated state machine | Atomic broadcast (total order delivery)
Phases | Leader election + normal operation | Discovery + synchronization + broadcast
Recovery | New leader's log is authoritative | New leader adopts the most up-to-date history among a quorum of followers
Transaction IDs | Index + term | Epoch + counter (zxid)
Client reads | Leader reads or read index | Any node can serve reads (with sync for freshness)

ZAB in Detail

ZAB has three phases that each new leader goes through:

Phase 1: Discovery
Prospective leader contacts followers to learn the highest epoch (similar to Raft term) and the latest transaction any follower has seen. The leader adopts the highest epoch + 1.
Phase 2: Synchronization
New leader sends its full transaction history to followers that are behind. Followers update their state to match the leader. This ensures all followers have the same starting point before broadcast begins.
Phase 3: Broadcast
Normal operation. Leader proposes transactions (each with a unique zxid: epoch + counter), followers ack, and the leader commits once a majority acks. This is essentially the same pattern as Raft's log replication.

The zxid (ZooKeeper transaction ID) is a 64-bit number: the high 32 bits are the epoch (like Raft's term), and the low 32 bits are a counter within the epoch. This makes it trivial to compare zxids and determine which transactions are from which leader's reign.
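The zxid layout described above can be illustrated with plain bit arithmetic. The helper names `make_zxid` and `split_zxid` are hypothetical and are not part of ZooKeeper's API. The sketch only shows why ordinary integer comparison orders zxids by epoch first, then by counter.

```python
EPOCH_SHIFT = 32
COUNTER_MASK = (1 << 32) - 1  # low 32 bits


def make_zxid(epoch, counter):
    """Pack a 32-bit epoch (high bits) and a 32-bit counter (low bits) into 64 bits."""
    if not (0 <= epoch <= COUNTER_MASK and 0 <= counter <= COUNTER_MASK):
        raise ValueError("epoch and counter must each fit in 32 bits")
    return (epoch << EPOCH_SHIFT) | counter


def split_zxid(zxid):
    """Recover (epoch, counter) from a packed zxid."""
    return zxid >> EPOCH_SHIFT, zxid & COUNTER_MASK
```

For example, `make_zxid(3, 0) > make_zxid(2, 1_000_000)` is True. The first transaction of a new leader's epoch outranks everything from the previous reign, however many transactions that reign issued.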

ZAB vs Raft: the philosophical difference. Raft was designed for understandability: clear leader election, explicit log matching, minimal state transitions. ZAB was designed for performance in a specific use case: total order broadcast for ZooKeeper. ZAB's synchronization phase is more aggressive: the new leader makes sure a quorum of followers has caught up with its history before it broadcasts any new proposals. Raft allows the new leader to start serving immediately and lets followers catch up asynchronously via AppendEntries retries.

EPaxos: Leaderless Consensus

Egalitarian Paxos (EPaxos), published by Moraru, Andersen, and Kaminsky in 2013, takes a radical approach: no designated leader at all. Any node can process any command. Commands that don't conflict (they touch different keys) can be committed in one round trip to a fast-path quorum of replicas, without routing through a leader. Only conflicting commands require an extra round.

The appeal: in a geo-distributed system, a client in Tokyo can send a command to the Tokyo replica and get a response in one round trip to a majority, without routing through a leader in Virginia. For workloads with few conflicts (which describes most real workloads), this dramatically reduces latency.

The catch: EPaxos is significantly more complex than Raft. The dependency tracking, conflict detection, and execution ordering are all harder to implement correctly. As of 2024, no major production system uses EPaxos. But the ideas are influential and show up in research on geo-distributed databases.

Viewstamped Replication

Viewstamped Replication (VR), published by Oki and Liskov in 1988, is contemporaneous with Paxos and shares many ideas. Each "view" has a designated primary (leader). When the primary fails, a view change protocol selects a new one. The key insight: during a view change, the new primary collects logs from a majority and picks the one with the most entries. This is strikingly similar to Raft's election restriction.

The FLP Impossibility

In 1985, Fischer, Lynch, and Paterson proved the FLP impossibility result: in a purely asynchronous system (no timeouts, no clocks), where even a single process can crash, no deterministic algorithm can guarantee that consensus is always reached. This result shocked the distributed systems community.

How real algorithms work despite FLP. Every practical consensus algorithm assumes partial synchrony: the network is usually well-behaved, and messages are delivered within some (unknown) time bound. Timeouts are used to detect failures. This means: in periods of asynchrony (extreme network delays), the algorithms may temporarily stall (violating liveness), but they NEVER produce incorrect results (safety is always maintained). You trade guaranteed progress for guaranteed correctness. In practice, networks are synchronous enough that elections complete in milliseconds.

Comparison Table

Algorithm | Year | Key Innovation | Used By
Paxos | 1989 | Quorum-based agreement | Google Chubby, Spanner
Viewstamped Replication | 1988 | View change protocol | Academic reference
ZAB | 2008 | Atomic broadcast focus | Apache ZooKeeper
Raft | 2014 | Understandability, strong leader | etcd, CockroachDB, TiKV, Consul
EPaxos | 2013 | Leaderless, lower latency for non-conflicting commands | Mostly research

Performance Characteristics

Metric | Paxos | Raft | ZAB | EPaxos
Latency (no conflict) | 2 RTTs (Multi-Paxos with a stable leader: 1 RTT) | 1 RTT (leader to majority) | 1 RTT | 1 RTT (fast path)
Latency (with conflict) | 2+ RTTs (livelock risk) | 1 RTT (only leader proposes) | 1 RTT | 2 RTTs (slow path)
Leader bottleneck | Multi-Paxos: yes | Yes | Yes | No (any node)
Messages per decision (request/response rounds × N) | 2N (prepare + accept) | N (AppendEntries + ack) | N | N (fast), 2N (slow)
Disk writes per decision | 2 (proposer + acceptor) | 2 (leader + follower) | 2 | 1 (fast path)

In practice, Raft and ZAB dominate because the leader bottleneck is rarely the limiting factor for coordination workloads. The simplicity advantage of Raft and the battle-tested nature of ZAB far outweigh EPaxos's theoretical latency advantage for most teams.

Consensus and Blockchain

You may wonder: how does blockchain consensus relate to Raft and Paxos? The key distinction is the trust model:

Aspect | Raft/Paxos (CFT) | PBFT/Blockchain (BFT)
Trust model | All nodes are honest but may crash | Up to f nodes can be malicious (send wrong data)
Nodes needed | 2f+1 (f crashes) | 3f+1 (f Byzantine faults)
Throughput | 10,000+ TPS | 100-1,000 TPS (PBFT), 7-50 TPS (PoW Bitcoin)
Latency | Milliseconds | Seconds (PBFT) to minutes (PoW)
Use case | Data centers, internal infrastructure | Untrusted participants (public blockchains, multi-org consortia)

For infrastructure engineering (which is what DDIA and this lesson focus on), you almost always use crash-fault-tolerant (CFT) consensus. You trust your own data center's servers not to lie — they just might crash. BFT consensus is for environments where participants are adversarial, which is a fundamentally different (and much harder) problem.

What Consensus Gives You

All of these algorithms solve the same abstract problem. In practice, consensus enables:

Capability | How It Uses Consensus | Example
Leader election | Agree on who the leader is | Kafka controller election via ZooKeeper
Atomic broadcast | Agree on message ordering | ZooKeeper transaction log
Distributed locks | Agree on who holds the lock | etcd distributed mutex
Membership changes | Agree on who is in the cluster | Raft joint consensus for config changes
Linearizable storage | Agree on the value of each key | etcd key-value store

Frontier question: The FLP result says consensus is impossible in a fully asynchronous system. Yet etcd, ZooKeeper, and CockroachDB all use consensus and work in production. How is this not a contradiction?

Chapter 8: Consistency in Practice

You will rarely implement Raft or Paxos yourself. Instead, you will use coordination services — systems like ZooKeeper, etcd, and Consul that implement consensus internally so your application does not have to. They expose simple APIs (get, set, watch, lock) while hiding the complexity of leader election, log replication, and failure recovery.

The Big Three Coordination Services

| | ZooKeeper | etcd | Consul |
|---|---|---|---|
| Consensus | ZAB | Raft | Raft |
| Data model | Hierarchical (znodes, like a filesystem) | Flat key-value | Key-value + service catalog |
| Watch mechanism | One-time watches per znode | Watch streams (long-lived) | Blocking queries |
| Typical use | Kafka, HBase, Hadoop | Kubernetes, CoreDNS | Service mesh, HashiCorp stack |
| Language | Java | Go | Go |
| Linearizable reads | Via sync() call | Default (serializable reads are opt-in) | Via consistent mode |
These are NOT general-purpose databases. Coordination services are designed for small amounts of critical metadata: "who is the leader?", "what's the current cluster configuration?", "which worker holds this lock?" They store megabytes, not gigabytes. Writing to them is slow (a consensus round per write). Reads served locally by any replica are fast but may be stale; linearizable reads must be confirmed through the leader with a quorum round, which adds latency. If you try to use etcd as your primary data store, you will have a bad time.

Design Challenge: Distributed Task Scheduler

You are building a system where thousands of tasks arrive every second, and a pool of 50 worker nodes must claim and execute them. Each task must be executed exactly once. How do you ensure no two workers claim the same task?

// Approach 1: Distributed lock per task (BAD)
// For each task: worker acquires lock in etcd, processes task, releases lock.
// Problem: 1000 tasks/sec × 2 writes/task (acquire + release) = 2000 consensus rounds/sec.
// etcd can handle ~10,000 writes/sec max, so a few thousand tasks/sec (plus retries) hits the ceiling.
// Also: every worker races for every task, so contention grows as O(workers × tasks).

// Approach 2: Leader-assigned partitions (GOOD)
// 1. Use etcd to elect ONE scheduler leader (1 consensus round).
// 2. Leader assigns task partitions to workers (e.g., worker 1 gets tasks 0-99,
// worker 2 gets 100-199, etc.). Partition map stored in etcd.
// 3. Workers poll their assigned partition from a regular database (no consensus).
// 4. If a worker dies, leader detects via heartbeat timeout and reassigns.
// Consensus is used ONLY for leader election and partition map — O(1) writes,
// not O(tasks).

// Approach 3: Compare-and-swap (BEST for simplicity)
// Each task has a status key in etcd: /tasks/{id}/status = "pending"
// Worker claims a task via atomic CAS: set "pending" → "claimed:{worker_id}"
// Only one CAS succeeds per task (linearizable guarantee).
// Good for moderate throughput. For high throughput, use Approach 2.
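Approach 3 hinges on an atomic compare-and-swap. The sketch below models it with a hypothetical in-memory `CasStore` (not the etcd client API). Its single mutex stands in for the linearizability that a consensus store provides. In etcd itself, this would be a transaction (`Txn`) that compares the key's current value before writing. Here, fifty threads race to claim one task:

```python
import threading


class CasStore:
    """Hypothetical in-memory stand-in for a linearizable key-value store.
    One mutex serializes all operations, so compare_and_swap is atomic; a real
    coordination service gets the same property from consensus."""

    def __init__(self):
        self._data = {}
        self._mu = threading.Lock()

    def put(self, key, value):
        with self._mu:
            self._data[key] = value

    def get(self, key):
        with self._mu:
            return self._data.get(key)

    def compare_and_swap(self, key, expected, new):
        """Write `new` only if the current value equals `expected`."""
        with self._mu:
            if self._data.get(key) != expected:
                return False
            self._data[key] = new
            return True


def try_claim(store, task_id, worker_id):
    """Approach 3: flip /tasks/{id}/status from "pending" to "claimed:{worker_id}"."""
    key = f"/tasks/{task_id}/status"
    return store.compare_and_swap(key, "pending", f"claimed:{worker_id}")


store = CasStore()
store.put("/tasks/42/status", "pending")
wins = []


def worker(worker_id):
    if try_claim(store, 42, worker_id):
        wins.append(worker_id)  # only the single CAS winner reaches this line


threads = [threading.Thread(target=worker, args=(w,)) for w in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(wins), store.get("/tasks/42/status"))  # prints 1 and the winner's claim
```

Every losing worker sees `False` and moves on to another task. No lock is ever held, so a crashed worker leaves nothing to clean up except a task stuck in "claimed", which is where a lease on the claim (as in the service discovery pattern) would help.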

Debug Scenario: ZooKeeper Quorum Lost

You are on-call. At 3 AM, you get paged: "Kafka brokers are failing to produce/consume." You check and discover that 2 out of 3 ZooKeeper nodes are down. What happens?

// ZK has 3 nodes. 2 are down. Only 1 remains.
// Majority of 3 = 2. The remaining node cannot form a quorum.

// What breaks:
// 1. The lone ZK server stops serving clients (unless read-only mode is enabled,
// in which case it serves possibly stale reads). No writes either way.
// 2. Kafka controller cannot renew its ZK session → controller fails over
// 3. But controller election needs ZK → STUCK. No controller.
// 4. Without controller: no partition leader elections, no ISR updates
// 5. Producers can't write to partitions with dead leaders
// 6. Consumers can't commit offsets (offsets stored in __consumer_offsets,
// which needs functioning partition leaders)

// Immediate fix: bring at least 1 ZK node back online
// Long-term fix: run 5 ZK nodes (tolerates 2 failures instead of 1)
// Better: migrate to Kafka's KRaft mode (removes ZK dependency entirely)
Why Kafka is removing ZooKeeper. Kafka historically depended on ZooKeeper for controller election, topic/partition metadata, and broker membership. This meant operating two distributed systems instead of one, doubling operational complexity. KRaft (Kafka Raft) moves all metadata management into Kafka itself, using a built-in Raft implementation. The controller quorum is made of Kafka nodes, so there is no separate ensemble to deploy. One less system to manage, monitor, and debug at 3 AM.

Fencing Tokens: The Lock Safety Net

Even with a distributed lock service, there is a danger. Process A acquires a lock, then pauses (GC pause, swap, etc.). The lock expires (TTL runs out). Process B acquires the same lock. Now A resumes and thinks it still holds the lock. Both processes believe they have exclusive access. Data corruption ensues.

The solution is fencing tokens: every time a lock is acquired, the coordination service issues a monotonically increasing token. The resource (database, file, etc.) checks the token on every write and rejects writes with tokens older than the most recent one it has seen.

// Without fencing (DANGEROUS):
t=0: Process A acquires lock
t=5: Process A pauses (GC)
t=8: Lock expires (TTL=8s)
t=9: Process B acquires lock, starts writing
t=10: Process A resumes, thinks it has lock, ALSO writes
// RESULT: Corrupted data

// With fencing tokens (SAFE):
t=0: Process A acquires lock, gets token #33
t=5: Process A pauses (GC)
t=8: Lock expires (TTL=8s)
t=9: Process B acquires lock, gets token #34
t=9: Process B writes to DB with token #34. DB records: latest_token = 34
t=10: Process A resumes, tries to write with token #33
t=10: DB rejects: 33 < 34 (stale token). Write is blocked.
// RESULT: Data safe. A's stale write is prevented.
This is why Redlock is unsafe. Redis-based distributed locks (Redlock) cannot provide fencing tokens because Redis doesn't have a consensus-backed monotonic counter. Martin Kleppmann's famous analysis shows that without fencing tokens, any lock service is vulnerable to process pauses. If you need safe distributed locks, use etcd or ZooKeeper — they provide both consensus-backed locks and monotonic ordering for fencing.

Service Discovery Pattern

A common use case for coordination services: service discovery. In a microservices architecture, services need to find each other. Which instances of the "payment service" are currently healthy? What are their IP addresses?

1. Register
When a service instance starts, it writes a key in etcd, /services/payment/instances/{instance-id}, containing its address and health status. The key is attached to a lease with a TTL, which makes it behave like a ZooKeeper ephemeral node.
2. Heartbeat
The instance periodically renews its lease. If it crashes without renewing, the lease expires and the key is automatically deleted.
3. Discover
A client watches /services/payment/instances/ and gets real-time notifications when instances appear or disappear. It load-balances across healthy instances.
4. Deregister
On graceful shutdown, the instance deletes its key. On crash, the lease handles cleanup automatically.
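The four steps can be modeled with a toy registry. This is a sketch under stated assumptions: `LeaseRegistry` and `FakeClock` are hypothetical names, discovery polls instead of using a watch, and expiry is checked lazily. A real client would use etcd's lease grant, keep-alive, and watch APIs instead.

```python
import time


class LeaseRegistry:
    """Hypothetical toy model of lease-backed registration (not an etcd client).
    A key lives only while its lease is renewed within the TTL."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries = {}  # key -> (address, lease_expires_at)

    def _alive(self, key):
        return self._clock() < self._entries[key][1]

    def register(self, key, address, ttl):
        """Step 1: write the key under a fresh lease."""
        self._entries[key] = (address, self._clock() + ttl)

    def keepalive(self, key, ttl):
        """Step 2: renew the lease; an already-expired lease cannot be revived."""
        if key in self._entries and self._alive(key):
            address, _ = self._entries[key]
            self._entries[key] = (address, self._clock() + ttl)
            return True
        return False

    def discover(self, prefix):
        """Step 3 (polling instead of a watch): drop expired keys, list live ones."""
        for key in [k for k in self._entries if not self._alive(k)]:
            del self._entries[key]
        return sorted(addr for k, (addr, _) in self._entries.items()
                      if k.startswith(prefix))

    def deregister(self, key):
        """Step 4: graceful shutdown deletes the key immediately."""
        self._entries.pop(key, None)


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
reg = LeaseRegistry(clock)
prefix = "/services/payment/instances/"
reg.register(prefix + "a", "10.0.0.1:8080", ttl=10)
reg.register(prefix + "b", "10.0.0.2:8080", ttl=10)
clock.now = 8
reg.keepalive(prefix + "a", ttl=10)  # a heartbeats; b has "crashed" and stays silent
clock.now = 12
print(reg.discover(prefix))  # ['10.0.0.1:8080']
```

Instance b disappears without ever calling `deregister`, which is exactly the crash-cleanup property that leases provide.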

Consul adds a built-in health checking layer on top of this: it not only registers services but actively pings them (HTTP checks, TCP checks, gRPC checks) and removes unhealthy instances from the service catalog. This is why Consul is often preferred for service mesh architectures.

Anti-Pattern: Putting Everything in the Consensus Store

A common mistake in distributed system design: using etcd or ZooKeeper for too much. Here are the warning signs:

| Sign | Problem | Better Alternative |
|---|---|---|
| Storing MBs of data per key | Consensus stores are optimized for small keys/values (etcd: ~1.5MB max request size by default) | Store a pointer/URL in etcd, put the large data in S3 or a database |
| >1,000 writes/sec to consensus store | Approaching throughput ceiling | Batch writes, use local caches with watch-based invalidation |
| Using ZK watches as a pub/sub system | ZK watches are one-time triggers, not streams. Re-registering creates a thundering herd. | Use Kafka or NATS for high-throughput messaging |
| Storing session data in etcd | Session data changes on nearly every request, too often for consensus | Use Redis or server-local memory with sticky sessions |
| Storing per-user data in etcd | Millions of keys = memory pressure on all consensus nodes | Use a proper database (Postgres, DynamoDB) |
The golden rule: Put ONLY coordination metadata in the consensus store. Everything else goes in a system designed for higher throughput. If your etcd cluster is handling more than 5,000 writes/sec sustained, you're probably using it wrong.
Design question: You're building a rate limiter for an API that serves 10,000 requests per second across 20 servers. Each user is limited to 100 requests per minute. A colleague suggests using etcd to store and atomically increment the counter for each user. Why is this a bad idea, and what should you use instead?

Chapter 9: Interview Arsenal

Consistency Models: Strongest to Weakest

| Model | Guarantee | Cost | Example System |
|---|---|---|---|
| Strict Serializability | Transactions + real-time order | Very high (global consensus) | Spanner (CockroachDB comes close: serializable, with linearizable single-key operations) |
| Linearizability | Single-object recency | High (consensus per key) | etcd, ZooKeeper |
| Sequential Consistency | All nodes see same order (but may not match real-time) | Medium | ZooKeeper reads |
| Causal Consistency | Causally related ops ordered; concurrent ops can diverge | Low (vector clocks) | MongoDB (with causal sessions) |
| Eventual Consistency | All replicas converge eventually (no recency guarantee) | Minimal | DynamoDB, Cassandra |

Quick Decision Matrix for System Design Interviews

When the interviewer asks "what consistency do you need for this?", use this framework:

// "I'm designing a [system]. What consistency model should I use?"

// Step 1: Identify the operation
// - Is it a COORDINATION operation? (lock, leader election, config)
// → YES: You need linearizability. Use consensus (etcd, ZK).
// - Is it a UNIQUE CONSTRAINT check? (username, idempotency key)
// → YES: You need linearizability for the check. CAS operation.
// - Is it a MONEY operation? (transfer, balance check)
// → YES: You need serializable transactions. (Spanner, CockroachDB)
// - Is it a READ of user-facing data? (timeline, feed, product page)
// → Usually eventual consistency is fine. Causal if you need "read your writes."
// - Is it an ANALYTICS query?
// → Eventual consistency, batch processing, approximate counts. Cheapest.

// Step 2: Articulate the trade-off
// "I'm choosing [model] because [reason]. The cost is [cost].
// If we needed stronger guarantees, we'd pay [price]."
// This shows the interviewer you understand it's a spectrum, not binary.

// Step 3: Name a real system
// "Spanner does this with TrueTime."
// "DynamoDB offers this via consistent reads."
// "Kafka uses ZK for controller election but is moving to KRaft."
// Naming real systems signals you've operated these in production.

Raft Summary Card

// RAFT IN 60 SECONDS
// Cluster: 2f+1 nodes, tolerates f failures
// Roles: Leader (1), Followers (rest), Candidates (during election)
// Term: monotonic counter = logical clock for elections

// ELECTION:
// 1. Follower times out (150-300ms, randomized)
// 2. Becomes candidate, increments term, votes for self
// 3. Sends RequestVote(term, lastLogIndex, lastLogTerm)
// 4. Voter grants vote if: hasn't voted for another candidate this term AND candidate's log ≥ voter's
// 5. Majority wins → leader. Sends heartbeats immediately.

// LOG REPLICATION:
// 1. Client → leader: command
// 2. Leader appends (term, cmd) to log
// 3. Leader → followers: AppendEntries(entries, prevLogIdx, prevLogTerm)
// 4. Follower checks prevLog match. Accept or reject.
// 5. Majority ack → committed (only for entries from the leader's current term;
// earlier entries commit along with them). Leader advances commitIndex.

// SAFETY:
// Log Matching: same (index, term) → same command & all prior entries match
// Leader Completeness: committed entry exists in all future leaders' logs
// Election Restriction: voter rejects candidates with less up-to-date logs

System Design Talking Points

"Design a distributed lock service."

// Core: consensus group (etcd/ZK) stores lock state
// Acquire: CAS on /locks/{resource} from "free" to "{owner, lease_id}"
// Release: CAS from "{owner}" to "free"
// Lease: auto-expire lock if holder crashes (heartbeat-based TTL)
// Fencing: attach monotonic token to each lock grant. Resource checks token
// is not stale before accepting writes (prevents zombie lock holders).
// Scale: shard locks across multiple consensus groups by hash(resource)
// WATCH OUT: "redlock" (Redis-based) is NOT safe — Kleppmann's analysis
// shows it breaks under process pauses and clock skew.

"Design a configuration management system."

// Store config in etcd (linearizable, versioned, watchable)
// Key layout: /config/{service}/{environment}/{key}
// Reads: cached locally, refreshed via etcd watch stream
// Writes: through admin API → etcd CAS (prevents lost updates)
// Rollback: etcd keeps old revisions until compaction, so any uncompacted version can be restored
// Propagation: watches typically deliver changes to all subscribers in <100ms
// Scale bottleneck: etcd write throughput (~10K/sec). Fine for config
// (changes are rare). NOT fine for per-request data.
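The CAS-on-write and rollback bullets rely on per-key revisions. Below is a hypothetical in-memory model with etcd-like semantics: a global revision counter, a per-key modification revision, and a compare-before-write. It is not the etcd client API. It also skips compaction, which in etcd permanently discards old revisions.

```python
class VersionedConfigStore:
    """Hypothetical in-memory model of a revisioned KV store (etcd-like semantics,
    not the etcd client API). Every write bumps a global revision; old values stay
    readable by revision (this sketch never compacts)."""

    def __init__(self):
        self.revision = 0
        self._history = {}  # key -> list of (revision, value), oldest first

    def get(self, key, revision=None):
        """Return (value, mod_revision) as of `revision` (default: latest)."""
        for rev, value in reversed(self._history.get(key, [])):
            if revision is None or rev <= revision:
                return value, rev
        return None, 0

    def put_if_unchanged(self, key, value, expected_mod_revision):
        """CAS: write only if nobody modified the key since the caller read it."""
        _, mod_rev = self.get(key)
        if mod_rev != expected_mod_revision:
            return False  # lost update prevented: caller must re-read and retry
        self.revision += 1
        self._history.setdefault(key, []).append((self.revision, value))
        return True

    def rollback(self, key, to_revision):
        """Re-write the value the key had at `to_revision` as a new revision."""
        old_value, _ = self.get(key, revision=to_revision)
        _, mod_rev = self.get(key)
        return self.put_if_unchanged(key, old_value, mod_rev)


store = VersionedConfigStore()
key = "/config/payments/prod/timeout_ms"        # hypothetical key in the layout above
store.put_if_unchanged(key, "500", 0)           # create: 0 means "key must not exist yet"
_, rev = store.get(key)                         # rev == 1
store.put_if_unchanged(key, "800", rev)         # succeeds, revision 2
print(store.put_if_unchanged(key, "900", rev))  # False: another write landed after rev 1
store.rollback(key, to_revision=1)              # re-writes "500" as revision 3
print(store.get(key))                           # ('500', 3)
```

A rollback is just another forward write, so watchers see it like any other change, and the history stays append-only.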

"Why did Kafka use ZooKeeper, and why is it removing it?"

// Kafka needed consensus for: controller election, partition leader election,
// ISR management, topic metadata, ACLs, broker membership.
// ZooKeeper was the only mature consensus service in 2011.
//
// Problems with external dependency:
// 1. Operational burden: deploy, monitor, upgrade two systems
// 2. Scalability ceiling: ZK watches don't scale to millions of partitions
// 3. Controller bottleneck: all metadata funneled through one ZK connection
// 4. Recovery time: controller failover requires full metadata reload from ZK
//
// KRaft solution: built-in Raft consensus for metadata quorum
// - Controller nodes form a Raft group, store metadata in an internal topic
// - No external dependency, single system to operate
// - Aims at millions of partitions: brokers replicate one metadata log (__cluster_metadata)
//   and apply incremental updates instead of bulk reloads from ZK
// - Faster recovery: new controller reads from local Raft log, not ZK

Coding Drills

python
# Drill 1: Implement a vector clock (extends Lamport timestamps)

class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.id = node_id
        self.clock = [0] * num_nodes  # one counter per node

    def local_event(self):
        self.clock[self.id] += 1

    def send(self):
        self.clock[self.id] += 1
        return list(self.clock)  # copy to attach to the outgoing message

    def receive(self, other_clock):
        # Merge: element-wise max, then count the receive as a local event
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.clock[self.id] += 1

    @staticmethod
    def _hb(a, b):
        """True if clock a happens-before clock b (all entries ≤, at least one <)."""
        return (all(x <= y for x, y in zip(a, b))
                and any(x < y for x, y in zip(a, b)))

    def happens_before(self, other_clock):
        """Does self happen-before other?"""
        return VectorClock._hb(self.clock, other_clock)

    def is_concurrent(self, other_clock):
        """Neither happens-before the other (and the clocks are not identical)."""
        return (list(self.clock) != list(other_clock)
                and not VectorClock._hb(self.clock, other_clock)
                and not VectorClock._hb(other_clock, self.clock))

# Usage:
a = VectorClock(0, 3)  # Node 0 in a 3-node system
b = VectorClock(1, 3)
a.local_event()              # a.clock = [1, 0, 0]
msg = a.send()               # a.clock = [2, 0, 0], msg = [2, 0, 0]
b.receive(msg)               # b.clock = [2, 1, 0] (merged + incremented)
a.happens_before(b.clock)    # True: [2, 0, 0] precedes [2, 1, 0]
python
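# Drill 2: Raft leader election state machine

import random, time

class RaftElection:
    def __init__(self, node_id, cluster_size):
        self.id = node_id
        self.cluster_size = cluster_size
        self.state = "follower"
        self.term = 0
        self.voted_for = None          # candidate ID voted for in self.term
        self.votes_received = set()
        self.last_heartbeat = time.time()
        self.timeout = random.uniform(0.15, 0.3)
        self.log = []  # [(term, cmd), ...]

    def tick(self):
        """Called periodically. Triggers election if timeout expired."""
        if self.state == "leader":
            return None
        if time.time() - self.last_heartbeat > self.timeout:
            return self.start_election()
        return None

    def start_election(self):
        self.state = "candidate"
        self.term += 1
        self.voted_for = self.id
        self.votes_received = {self.id}
        self.timeout = random.uniform(0.15, 0.3)  # fresh randomized timeout
        self.last_heartbeat = time.time()
        last_idx = len(self.log) - 1
        last_term = self.log[last_idx][0] if self.log else 0
        return ("RequestVote", self.term, self.id, last_idx, last_term)

    def handle_vote_request(self, candidate_term, candidate_id,
                            cand_last_idx, cand_last_term):
        if candidate_term < self.term:
            return False  # candidate is from a stale term
        if candidate_term > self.term:
            # Newer term: adopt it, step down, forget any vote from the old term
            self.term = candidate_term
            self.state = "follower"
            self.voted_for = None
        # Election restriction: candidate's log must be at least as up-to-date
        my_last_term = self.log[-1][0] if self.log else 0
        my_last_idx = len(self.log) - 1
        log_ok = (cand_last_term > my_last_term or
                  (cand_last_term == my_last_term and cand_last_idx >= my_last_idx))
        # One vote per term (repeating a vote for the same candidate is fine)
        if self.voted_for in (None, candidate_id) and log_ok:
            self.voted_for = candidate_id
            self.last_heartbeat = time.time()  # granting a vote resets the election timer
            return True
        return False

    def receive_vote(self, voter_id, voter_term, granted):
        if voter_term > self.term:
            # A voter has seen a newer term: step down
            self.term = voter_term
            self.state = "follower"
            self.voted_for = None
            return False
        if self.state != "candidate" or voter_term < self.term:
            return False  # stale reply, or no longer campaigning
        if granted:
            self.votes_received.add(voter_id)
        if len(self.votes_received) > self.cluster_size // 2:
            self.state = "leader"
            return True  # became leader!
        return False

# Usage: node 0 campaigns in a 3-node cluster
n0, n1 = RaftElection(0, 3), RaftElection(1, 3)
_, term, cand_id, last_idx, last_term = n0.start_election()
granted = n1.handle_vote_request(term, cand_id, last_idx, last_term)  # True
n0.receive_vote(1, n1.term, granted)  # True: 2 of 3 votes, n0 is leader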
python
# Drill 3: Distributed lock with fencing token

import time

class DistributedLock:
    """Simplified lock with fencing token support."""

    def __init__(self):
        self.holder = None
        self.token_counter = 0  # monotonic: never reused, even after expiry
        self.expiry = 0

    def acquire(self, client_id, ttl_seconds=10):
        """Try to acquire lock. Returns fencing token or None."""
        now = time.time()
        # If lock is held and not expired, reject
        if self.holder and now < self.expiry:
            return None
        # Grant lock with monotonic fencing token
        self.holder = client_id
        self.token_counter += 1
        self.expiry = now + ttl_seconds
        return self.token_counter

    def release(self, client_id, token):
        """Release lock. Only succeeds if caller is the holder AND token is current."""
        if self.holder == client_id and token == self.token_counter:
            self.holder = None
            return True
        return False


class FencedResource:
    """A resource that rejects stale fencing tokens."""

    def __init__(self):
        self.data = None
        self.last_token = 0

    def write(self, value, fencing_token):
        """Write only if token is fresh (≥ last seen)."""
        if fencing_token < self.last_token:
            raise PermissionError(
                f"Stale token {fencing_token} < {self.last_token}")
        self.last_token = fencing_token
        self.data = value
        return True

# Demo: fencing prevents stale writes
lock = DistributedLock()
db = FencedResource()

token_a = lock.acquire("process_A", ttl_seconds=0.1)  # token=1
time.sleep(0.2)  # Process A pauses (GC) longer than its TTL; the lock expires
token_b = lock.acquire("process_B", ttl_seconds=5)    # token=2
db.write("B's data", token_b)  # succeeds, last_token=2
# Process A resumes, tries to write with stale token:
try:
    db.write("A's data", token_a)  # REJECTED: 1 < 2
except PermissionError as e:
    print(e)  # "Stale token 1 < 2"
print(lock.release("process_A", token_a))  # False: A no longer holds the lock
python
# Drill 4: Total order broadcast (simplified)

from collections import deque
from threading import Lock

class TotalOrderBroadcast:
    """Simplified TOB using a single sequencer (not fault-tolerant).
    In production, the sequencer would be a Raft/Paxos group."""

    def __init__(self, num_nodes):
        self.sequence_num = 0
        self.lock = Lock()
        self.queues = [deque() for _ in range(num_nodes)]

    def broadcast(self, message):
        """Assign a sequence number and deliver to all nodes."""
        with self.lock:
            self.sequence_num += 1
            ordered_msg = (self.sequence_num, message)
            for q in self.queues:
                q.append(ordered_msg)
        return self.sequence_num

    def deliver(self, node_id):
        """Node consumes next message in total order."""
        if self.queues[node_id]:
            return self.queues[node_id].popleft()
        return None

# All nodes see messages in the same order
tob = TotalOrderBroadcast(3)
tob.broadcast("set x=1")   # seq 1
tob.broadcast("set y=2")   # seq 2
tob.broadcast("set x=3")   # seq 3

# Every node delivers in order: (1,"set x=1"), (2,"set y=2"), (3,"set x=3")
# This is the foundation of replicated state machines.

Debug Scenarios

| Scenario | Root Cause | Fix |
|---|---|---|
| etcd cluster takes 30 seconds to elect a new leader during deploys | Election timeout too high, or disk latency causing fsync delays (Raft requires durable writes before responding) | Tune election timeout (10x heartbeat). Use SSDs. Separate etcd from noisy workloads. |
| ZooKeeper session timeout cascades across 200 Kafka brokers | ZK server GC pause exceeds session timeout. All brokers think ZK is dead simultaneously. | Increase session timeout. Tune ZK JVM GC. Use more ZK nodes. |
| Distributed lock held by a dead process; other processes blocked | Lock holder crashed without releasing. No lease/TTL on the lock. | Always use leased locks (auto-expire): etcd leases, ZK ephemeral nodes. |
| Raft leader elected but can't commit entries | Leader has only a minority of the cluster alive, so it can't get majority acks. | Restore failed nodes. If the loss is permanent, a normal membership change can't help (it also needs a majority to commit); fall back to the system's disaster-recovery path, e.g. restarting etcd from one surviving member with --force-new-cluster, accepting that writes missing from that member are lost. |
| Cross-region Raft cluster has 500ms commit latency | Every commit needs a majority ack, so each write pays at least one cross-region round trip from the leader to enough followers to form a majority (e.g., nodes in US-East, US-West, EU-West), plus fsync on each node and the client's own hop to the leader if it sits in another region. | Place 3 of 5 nodes in one region (majority acks stay local, so commits are fast) and 2 in a remote region (an off-site copy of the log). This survives node failures but not the loss of the primary region. Or use Raft learners for remote replicas (they don't vote). |
| etcd "mvcc: database space exceeded" | etcd has a configurable space quota (default 2GB). Accumulated revisions exceeded it. | Run etcd compaction to purge old revisions: etcdctl compact $REV. Then defrag: etcdctl defrag. Then clear the NOSPACE alarm: etcdctl alarm disarm. Set auto-compaction on. |
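A rough way to reason about the cross-region row: a majority includes the leader, so the leader waits for acks from (majority - 1) followers. Its commit latency is therefore bounded below by the (majority - 1)-th fastest follower round trip. The RTT figures below are hypothetical, chosen only to illustrate node placement. Client-to-leader hops are not modeled.

```python
def commit_latency_ms(follower_rtts_ms, fsync_ms=0.0):
    """Lower bound on commit latency as seen by the leader (client hop not included).

    A majority includes the leader, so the leader needs acks from
    (majority - 1) followers: it waits for the (majority - 1)-th fastest one.
    """
    cluster_size = len(follower_rtts_ms) + 1   # followers plus the leader
    acks_needed = cluster_size // 2            # (cluster_size // 2 + 1) - 1
    if acks_needed == 0:
        return fsync_ms                        # single-node "cluster": only the local fsync
    return sorted(follower_rtts_ms)[acks_needed - 1] + fsync_ms


# Hypothetical RTTs from a US-East leader: US-West ~70 ms, EU-West ~80 ms
print(commit_latency_ms([70, 80]))        # 70.0: one cross-region round trip
# 5 nodes, 3 in US-East (two local followers ~1 ms) and 2 in EU-West (~80 ms)
print(commit_latency_ms([1, 1, 80, 80]))  # 1.0: the majority stays local
```

If measured commits sit far above this bound, the extra time is usually fsync latency, leader placement, or clients crossing regions to reach the leader.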

Common Interview Traps

Interviewers love these subtle distinctions. Get them wrong and you look like you memorized definitions without understanding.

| Trap | Wrong Answer | Right Answer |
|---|---|---|
| "Is Raft Byzantine fault tolerant?" | "Yes, it handles node failures" | No. Raft (and Paxos) assume crash faults only: a node either works correctly or stops. A malicious node that sends incorrect messages can break Raft. Byzantine fault tolerance (BFT) requires algorithms like PBFT, which need 3f+1 nodes to tolerate f failures (vs Raft's 2f+1). BFT is used in blockchain, not in typical infrastructure. |
| "Can you have consensus with 2 nodes?" | "Yes, they just need to agree" | No meaningful fault tolerance. A 2-node cluster has a majority of 2, so both nodes must be alive to make progress. If either crashes, the other cannot form a majority. This is WORSE than a single node (a single node is at least available while it's up). The minimum useful cluster is 3 nodes. |
| "What consistency does a single-node database provide?" | "It depends on the isolation level" | A single-node database is trivially linearizable for single-object operations: there is only one copy, so reads always see the latest write. The question of consistency models only becomes interesting with replication. Isolation levels (serializable, read committed, etc.) are about transactions, which is a separate concern. |
| "When should you use 5 vs 3 Raft nodes?" | "Always use 5 for better availability" | 3 nodes tolerate 1 failure. 5 nodes tolerate 2 failures. The cost: 5 nodes means more consensus RPCs, more disk writes, more machines. Use 3 for non-critical services (e.g., dev/staging) and 5 for production systems where you need to survive failures during rolling deployments (1 node down for upgrade + 1 unexpected failure = 2 failures, need 5 nodes). |

Whiteboard Patterns

In a system design interview, these are the "consensus patterns" that recur:

// Pattern 1: CONSENSUS FOR METADATA, NOT DATA
// Store metadata (partition map, leader assignments, config) in etcd/ZK.
// Store actual data in a fast, partitioned data store (Kafka, MySQL, S3).
// Consensus throughput: ~10K writes/sec. Data throughput: ~100K+ writes/sec.
// Never put hot-path data in the consensus store.

// Pattern 2: LEADER ELECTION AS A SERVICE
// Multiple instances of a service. Only one should be active (e.g., scheduler).
// Use etcd election API: all instances try to become leader on the same key.
// Exactly one wins. Others block on watch until leader's lease expires.
// When active leader crashes, lease expires, next-in-line takes over.

// Pattern 3: CONSENSUS-FREE WHERE POSSIBLE
// CRDTs (Conflict-free Replicated Data Types) for counters, sets, maps.
// No coordination needed — all replicas can accept writes independently.
// Example: Redis CRDT (redis-enterprise), Riak.
// Trade-off: limited data structures, not general-purpose.

// Pattern 4: PARTITION MAP STORED IN CONSENSUS
// A database is sharded across 100 nodes. The mapping of "which shard is
// on which node" must be consistent — otherwise clients route to wrong nodes.
// Store the partition map in etcd/ZK. Workers watch for changes.
// When a node fails, the scheduler updates the map, all workers re-route.
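Here is Pattern 3 in miniature: a grow-only counter (G-Counter), one of the simplest CRDTs. This is a generic textbook sketch, not the Redis Enterprise or Riak implementation. Each replica increments only its own slot, and merging takes the element-wise max. As a result, replicas converge no matter how often or in what order they exchange state.

```python
class GCounter:
    """Grow-only counter CRDT: one slot per replica, merged by element-wise max."""

    def __init__(self, replica_id, num_replicas):
        self.id = replica_id
        self.counts = [0] * num_replicas

    def increment(self, amount=1):
        if amount < 0:
            raise ValueError("a G-Counter can only grow")
        self.counts[self.id] += amount  # a replica only ever writes its own slot

    def merge(self, other):
        # Element-wise max is commutative, associative, and idempotent,
        # so replicas converge regardless of merge order or repetition.
        self.counts = [max(a, b) for a, b in zip(self.counts, other.counts)]

    def value(self):
        return sum(self.counts)


r0, r1 = GCounter(0, 2), GCounter(1, 2)
r0.increment(3)   # accepted locally, no coordination
r1.increment(2)   # concurrently, on another replica
r0.merge(r1)
r1.merge(r0)
r0.merge(r1)      # repeating a merge changes nothing
print(r0.value(), r1.value())  # 5 5
```

The "limited data structures" trade-off shows up immediately: this counter cannot decrement. A PN-Counter pairs two G-Counters to allow that.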

The One-Slide Summary

If you had to put the entire chapter on one slide for a senior engineer audience, it would be this:

// CONSISTENCY & CONSENSUS — THE ESSENTIALS

// 1. PROBLEM: Distributed nodes must agree (leader, value, order).
// Without agreement: split-brain, data loss, double-spending.

// 2. LINEARIZABILITY: "One copy, atomic operations."
// Once a write is acked, all reads see it. Strongest guarantee.
// Cost: consensus per operation. CAP: sacrifices availability.

// 3. ORDERING: Lamport timestamps (total order, can't detect concurrency).
// Vector clocks (detect concurrency, O(N) space).
// Total order broadcast ≡ consensus.

// 4. 2PC: Atomicity across nodes, but BLOCKS on coordinator crash.
// Fix: replicate the coordinator with consensus.

// 5. RAFT: Leader election (random timeouts) + log replication
// (majority ack = committed) + safety (election restriction).
// 2f+1 nodes tolerate f failures. Used by etcd, CockroachDB, TiKV.

// 6. PAXOS: Same guarantees, harder to understand. Livelock risk.
// Multi-Paxos ≈ Raft. Used by Google internally.

// 7. PRACTICE: Use coordination services (etcd, ZK, Consul).
// Consensus for metadata only. Fast stores for data.
// Fencing tokens for lock safety. Leases for auto-cleanup.

// 8. FLP: No deterministic algorithm can guarantee consensus in a fully
// asynchronous system if even one node may crash. Real systems assume
// partial synchrony (timeouts). Safety always. Liveness usually.
System design: You're designing a distributed lock service that must survive data center failures. Your lock holders are in US-East and US-West. Where do you place your consensus nodes?

Chapter 10: Connections

Consistency and consensus sit at the top of the distributed systems pyramid. Everything we have studied builds toward this chapter.

How This Connects

| Topic | Relationship | Link |
|---|---|---|
| Replication (DDIA Ch6) | Replication creates the need for consistency. Leader-follower, multi-leader, and leaderless each offer different consistency guarantees. | Replication lesson |
| Transactions (DDIA Ch8) | Transactions provide isolation guarantees. Serializability is about transaction ordering; linearizability is about single-object recency. Strict serializability combines both. | Transactions lesson |
| Storage (DDIA Ch4) | Consensus algorithms need durable storage (WAL) for the commit log. The performance of fsync directly affects consensus latency. | Storage lesson |

The Hierarchy of Agreement

Eventual Consistency
Weakest. Replicas converge "eventually." No ordering guarantees. Cheapest to implement (async replication). Example: DynamoDB, Cassandra.
Causal Consistency
Preserves cause-and-effect ordering. Concurrent events can diverge. Tracked via vector clocks. No global coordination needed. Example: MongoDB causal sessions.
Sequential Consistency
All nodes see the same order, but that order may not match real-time. Cheaper than linearizability. Example: ZooKeeper reads.
Linearizability
Strongest single-object guarantee. Real-time ordering: once a write is acknowledged, all reads see it. Requires consensus. Example: etcd.
Strict Serializability
Linearizability + serializability. Transactions are both isolated and real-time ordered. The gold standard. Example: Spanner. (CockroachDB comes close: serializable, with linearizable single-key operations.)

The Evolution: What's Next After Raft?

Raft was published in 2014 and is now the dominant consensus algorithm in industry. But research continues:

| Development | What It Does | Status |
|---|---|---|
| Raft Learners | Non-voting replicas that receive log entries but don't participate in elections or commits. Used for cross-region read replicas. | Production (etcd, TiKV) |
| Multi-Raft | Run many independent Raft groups on the same cluster. Each data range gets its own Raft group. Enables horizontal scaling. | Production (CockroachDB, TiKV) |
| Parallel Raft | Allow out-of-order commit for non-conflicting entries. Improves throughput by removing the serial bottleneck. | Research / early production (PolarDB) |
| Flexible Paxos | Relaxes the quorum requirement: Phase 1 quorum + Phase 2 quorum > N (they don't each need a majority). Enables write-optimized configurations. | Research, some adoption |
| CASPaxos | Single-decree Paxos without a leader. Useful for individual distributed registers. | Research |
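The Flexible Paxos row reduces to a counting argument: if |Q1| + |Q2| > N, every Phase 1 quorum and every Phase 2 quorum must share a node. Below is a small sketch that checks the size rule and, for small clusters, confirms it by brute force. The function names are illustrative.

```python
from itertools import combinations


def quorums_intersect(n, q1, q2):
    """Flexible Paxos size rule: every Phase 1 quorum (size q1) overlaps every
    Phase 2 quorum (size q2) in an n-node cluster iff q1 + q2 > n."""
    return q1 + q2 > n


def brute_force_check(n, q1, q2):
    """Confirm the rule exhaustively for small n by trying every pair of quorums."""
    nodes = range(n)
    return all(set(a) & set(b)
               for a in combinations(nodes, q1)
               for b in combinations(nodes, q2))


print(quorums_intersect(5, 3, 3), brute_force_check(5, 3, 3))  # True True (classic majorities)
print(quorums_intersect(6, 5, 2), brute_force_check(6, 5, 2))  # True True
print(quorums_intersect(6, 4, 2), brute_force_check(6, 4, 2))  # False False
```

With N = 6, a Phase 2 quorum of just 2 nodes is safe if Phase 1 (leader election) uses 5. This trades slower, rarer elections for cheaper steady-state replication.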

The trend is clear: Raft as the baseline, with optimizations for specific workloads. Multi-Raft is particularly important — it's how CockroachDB and TiKV scale to millions of ranges across hundreds of nodes, each range with its own independent 3- or 5-node Raft group.

Recommended Reading

| Paper | Why It Matters |
|---|---|
| Ongaro & Ousterhout, "In Search of an Understandable Consensus Algorithm" (2014) | The Raft paper. Read the extended version: it's genuinely well-written and includes the core safety argument. |
| Lamport, "Paxos Made Simple" (2001) | Lamport's own simplified explanation of Paxos. 14 pages. Still hard, but the canonical reference. |
| Fischer, Lynch & Paterson, "Impossibility of Distributed Consensus with One Faulty Process" (1985) | The FLP result. Short and devastating. Read it to understand the theoretical limits of what we're doing. |
| Lamport, "Time, Clocks, and the Ordering of Events in a Distributed System" (1978) | The paper that introduced Lamport timestamps and the happens-before relation. One of the most cited CS papers of all time. |
| Kleppmann, "How to do Distributed Locking" (2016, blog) | Martin Kleppmann's analysis of Redlock, showing why Redis-based distributed locks are unsafe. Essential reading for anyone building lock services. |
| Jepsen.io by Kyle Kingsbury | The definitive testing framework for distributed systems. Every Jepsen report is a masterclass in understanding consistency failures. |

The Big Picture: A Decision Tree

When designing a distributed system, work through this decision tree for each piece of state:

Q1: Does this data need to survive node failures?
No → in-memory cache, no replication needed. Yes → continue.
Q2: Can you tolerate stale reads (seconds to minutes old)?
Yes → eventual consistency (async replication, DynamoDB, Cassandra with ONE). Cheapest. No → continue.
Q3: Do you need real-time ordering (linearizability)?
No, just causal ordering → causal consistency (vector clocks, MongoDB causal sessions). Yes → continue.
Q4: Single object or multi-object transactions?
Single object → linearizable register (etcd, consensus). Multi-object → strict serializability (Spanner) or serializable transactions (CockroachDB). Most expensive.
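The four questions can be written down as a plain function, which serves as a checklist when walking an interviewer through each piece of state. This is a sketch: the parameter names and return labels are illustrative shorthand for the answers above.

```python
def choose_consistency(survive_failures, stale_ok, need_real_time, multi_object):
    """Walk Q1-Q4 in order for one piece of state and return the cheapest fit."""
    if not survive_failures:                    # Q1
        return "in-memory cache (no replication)"
    if stale_ok:                                # Q2
        return "eventual consistency"
    if not need_real_time:                      # Q3: causal ordering is enough
        return "causal consistency"
    if not multi_object:                        # Q4
        return "linearizable register (consensus)"
    return "strict serializability"


# A lock or leader record: must survive failures, no stale reads, real-time, one key
print(choose_consistency(True, False, True, False))  # linearizable register (consensus)
# A product page view counter: stale reads are fine
print(choose_consistency(True, True, False, False))  # eventual consistency
```

The early returns mirror the tree: each "cheaper answer is good enough" exits before you pay for a stronger model.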
The fundamental lesson. Consensus is the most expensive coordination primitive in distributed systems. Every consensus round costs network round trips, disk fsyncs, and availability risk. The art of distributed system design is minimizing how often you need consensus. Use it for the critical stuff — leader election, config changes, distributed locks — and use weaker consistency models for everything else. As Pat Helland says: "Consensus is the bane of availability."

"We can solve any problem by introducing an extra level of indirection... except for the problem of too many levels of indirection." — David Wheeler. In distributed systems: we can solve any consistency problem by adding more consensus... except for the problem of consensus being too slow. The engineer's job is finding the minimum coordination that achieves the required guarantees.