Leader-follower, multi-leader, leaderless — keeping copies consistent across machines.
Your startup's database runs on a single server in Virginia. A PostgreSQL instance on a beefy 64-core machine with 256 GB of RAM and NVMe storage. It handles 100,000 reads per second at peak and serves users across three continents. Everything is fine. Until it isn't.
At 2:47 AM on a Tuesday, the server's power supply fails. The database goes offline. For 34 minutes, every user sees "Service Unavailable." Your SLA promised 99.99% uptime — that's 52 minutes of allowed downtime per year. You just burned 65% of your annual budget in one incident. The business impact: $50,000 in lost revenue, three enterprise customers threatening to leave, and your VP of Engineering scheduling a post-mortem that will consume your entire week.
Even when the server is up, there's a subtler problem. A user in Tokyo sends a request. The packet travels 11,000 kilometers across the Pacific, through undersea fiber, through multiple routing hops. Round-trip latency: 180-220 milliseconds. Your competitor, who runs servers in Tokyo, responds in 8 milliseconds. Your user notices.
The solution seems obvious: make copies. Run the same database on multiple machines in multiple locations. A copy in Virginia, a copy in Frankfurt, a copy in Tokyo. Reads can be served from the nearest copy. If one machine dies, the others keep serving. Problem solved.
Except now you have a much harder problem. When a user in Virginia writes a new record, how do the copies in Frankfurt and Tokyo learn about it? What happens if two users update the same record from different locations at the same time? What does a reader in Tokyo see during the milliseconds (or seconds, or minutes) before the write from Virginia arrives?
This is replication: keeping copies of the same data on multiple machines connected via a network. And the tension between availability (more copies = more resilience) and consistency (more copies = harder to keep them all in sync) is the core drama of distributed systems.
Let's be precise about the challenge. The difficulty of replication lies almost entirely in handling changes to data. If the data never changed after it was written, replication would be trivial — copy the files once, done. But real data changes constantly: users update profiles, place orders, delete messages, transfer money. Every change must propagate to every copy, in the right order, despite network delays, node failures, and concurrent modifications. The harder you try to keep copies perfectly in sync, the more you sacrifice speed and availability. The more you relax consistency, the more anomalies your users experience.
Every replication strategy must answer three questions:
| Question | Easy answer | Why it's hard |
|---|---|---|
| Where do writes go? | Pick one node (the "leader") | That node becomes a bottleneck and single point of failure |
| How fast do copies update? | Immediately (synchronous) | A single slow replica blocks all writes |
| What if copies disagree? | Keep the latest write | "Latest" requires perfect clocks — which don't exist in distributed systems |
The rest of this lesson is about the different ways real systems navigate these three questions. There is no perfect answer — only trade-offs. Every database you will ever use has made a specific choice in this trade-off space, and understanding that choice is what separates a developer who uses databases from an engineer who can design systems.
Every motivation for replication falls into one of three categories:
| Reason | What it gives you | Example |
|---|---|---|
| High availability | System keeps working when one node fails | PostgreSQL primary fails, standby takes over in 30 seconds |
| Reduced latency | Data is geographically close to users | Replicas in Virginia, Frankfurt, and Tokyo serve local reads |
| Read scalability | More nodes handle more read queries | 1 leader + 10 read replicas handle 10x the read throughput |
Notice that NONE of these reasons is "increase write throughput." Replication does not help with writes — every write must still be processed by the leader (or by the quorum, in leaderless systems). Writes are always the bottleneck. If you need more write throughput, you need partitioning (sharding), which is a different technique covered in DDIA Chapter 6.
A common misconception: "We have nightly backups, so we don't need replication." Backups and replication solve different problems:
| Property | Backup | Replication |
|---|---|---|
| Purpose | Disaster recovery (restore from snapshot) | High availability (automatic failover) |
| Data loss (RPO) | Hours (since last backup) | Seconds (sync) to minutes (async lag) |
| Recovery time (RTO) | Minutes to hours (restore snapshot) | Seconds (automated failover) |
| Protects against | Accidental deletion, corruption, ransomware | Hardware failure, network issues |
| Does NOT protect against | Hardware failure (unless offsite) | Accidental deletion (replicated instantly to all!) |
If someone accidentally runs DELETE FROM users WHERE id > 0 on the leader, that deletion is faithfully replicated to all followers within milliseconds. You now have N copies of an empty table. Backups are your last line of defense against human error. Always have both replication AND backups.
The simplest and most common replication strategy: designate one replica as the leader (also called primary or master). All writes go through the leader. The other replicas are followers (also called read replicas, secondaries, or slaves). They receive a stream of changes from the leader and apply them in the same order.
This is not a theoretical construct. It's how PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, and most relational databases work by default. It's also how some message brokers (Kafka, RabbitMQ) and some non-relational databases (RethinkDB, Espresso) work.
Think of it like a newsroom. The editor-in-chief (leader) decides what goes into the newspaper. Correspondents (followers) receive the final copy and distribute it in their regions. Nobody else writes the headlines — that power is centralized. This makes coordination simple but creates a bottleneck at the top.
When a client sends a write to the leader, the leader has a choice: does it wait for followers to confirm they received the change before telling the client "write succeeded"?
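Synchronous replication: The leader waits until the synchronous follower confirms it has written the data to disk. Only then does the leader report success to the client. Advantage: that follower is guaranteed to have an up-to-date copy. If the leader dies, you know the follower has everything. Disadvantage: if the follower is slow or down, the write is blocked. One laggy follower can stall your entire write path. For this reason, making every follower synchronous is impractical. In practice, one follower is synchronous and the rest are asynchronous, a configuration called semi-synchronous. If the synchronous follower becomes unavailable, one of the asynchronous followers is made synchronous.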
Asynchronous replication: The leader writes locally and immediately tells the client "success." The change is sent to followers in the background. Advantage: writes are fast — the leader never blocks. Disadvantage: if the leader dies before the change reaches any follower, that write is permanently lost. The client got a "success" acknowledgment for data that no longer exists.
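You can't just copy the leader's data files while it's running. The data is constantly in flux, and you'd get a corrupted, half-written snapshot. Here's the correct procedure:

1. Take a consistent snapshot of the leader's database without locking it (e.g., PostgreSQL pg_basebackup, MySQL mysqldump --single-transaction). Record the snapshot's position in the replication log.
2. Copy the snapshot to the new follower.
3. The follower connects to the leader and requests every change made since that recorded position. PostgreSQL calls this position the log sequence number; MySQL calls it the binlog coordinates.
4. Once the follower has worked through that backlog, it has caught up. From then on it applies changes from the leader as they happen.

Consider a single write flowing through a leader-follower cluster with 1 leader and 2 followers using semi-synchronous replication. The leader appends the write to its own log and waits for the synchronous follower to confirm. It then acknowledges the client and ships the change to the asynchronous follower in the background.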
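The Python sketch below simulates that write path in a single process. The `Node` and `SemiSyncLeader` classes, the follower names, and the rule that "appending to the log counts as an acknowledgment" are all illustrative assumptions. This is not any database's API.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Node:
    name: str
    log: List[str] = field(default_factory=list)
    online: bool = True

    def append(self, record: str) -> bool:
        """Append a record to this node's log; returning True plays the role of an ack."""
        if not self.online:
            return False
        self.log.append(record)
        return True


class SemiSyncLeader(Node):
    """Hypothetical leader: waits for ONE synchronous follower, ships to the rest later."""

    def __init__(self, name: str, sync_follower: Node, async_followers: List[Node]):
        super().__init__(name)
        self.sync_follower = sync_follower
        self.async_followers = async_followers
        self.pending: List[Tuple[Node, str]] = []    # (follower, record) not yet shipped

    def write(self, record: str) -> str:
        self.append(record)                          # 1. append to the leader's own log
        if not self.sync_follower.append(record):    # 2. wait for the synchronous follower
            # The record sits in the leader's log, but the client gets no "OK"
            return "waiting: synchronous follower unavailable"
        for f in self.async_followers:               # 3. queue background shipping
            self.pending.append((f, record))
        return "OK"                                  # 4. acknowledge the client

    def ship_async(self) -> None:
        """Background step: deliver queued records to asynchronous followers."""
        still_pending = []
        for f, record in self.pending:
            if not f.append(record):
                still_pending.append((f, record))
        self.pending = still_pending


f1, f2 = Node("follower1"), Node("follower2")
leader = SemiSyncLeader("leader", sync_follower=f1, async_followers=[f2])

status = leader.write("UPDATE users SET name = 'Bob' WHERE id = 42")
before_ship = (len(leader.log), len(f1.log), len(f2.log))   # follower2 not updated yet
leader.ship_async()
after_ship = (len(leader.log), len(f1.log), len(f2.log))
print(status, before_ship, after_ship)
```

The client receives "OK" while follower2 still lacks the write. That gap is exactly what asynchronous followers trade for speed. Real semi-synchronous setups promote follower2 to synchronous when follower1 fails, instead of waiting indefinitely; this sketch omits that step.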
How much data could we lose in a leader failure? With asynchronous replication, the answer depends on replication lag — the delay between a write hitting the leader and that write being applied on the follower.
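Here is a back-of-the-envelope way to turn lag into an answer, sketched in Python. It assumes three inputs:

- how far behind the follower is in WAL bytes (PostgreSQL reports this per follower),
- your workload's average WAL generation rate,
- your workload's average write rate.

The function name and the example figures are hypothetical.

```python
def writes_at_risk(lag_bytes: int, wal_bytes_per_sec: float, writes_per_sec: float) -> float:
    """Rough estimate of acknowledged writes lost if the leader dies now
    and this asynchronous follower is promoted."""
    lag_seconds = lag_bytes / wal_bytes_per_sec     # how far behind, in time
    return lag_seconds * writes_per_sec              # writes generated in that window


# Hypothetical workload: 5 MB/s of WAL from 2,000 writes/s; follower is 10 MB behind
estimate = writes_at_risk(lag_bytes=10_000_000, wal_bytes_per_sec=5_000_000, writes_per_sec=2_000)
print(estimate)
```

A 10 MB backlog at 5 MB/s is 2 seconds of writes, or about 4,000 acknowledged writes at risk. A synchronous follower has every acknowledged write by definition, which is why synchronous replication drives this number to zero.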
Here's what the actual configuration looks like. These are the key parameters in postgresql.conf on the leader and recovery configuration on the follower:
```bash
# === On the LEADER (primary) ===

# postgresql.conf
wal_level = replica              # Enable replication-grade WAL
max_wal_senders = 5              # Max concurrent replication connections
synchronous_standby_names = 'first 1 (follower1, follower2)'   # Semi-sync: wait for 1 of these

# pg_hba.conf (authentication for replication)
host  replication  repl_user  10.0.0.0/8  md5

# === On the FOLLOWER ===

# Create base backup from leader:
pg_basebackup -h leader-host -U repl_user -D /var/lib/pgsql/data -P

# standby.signal (tells PG this is a follower). Just create an empty file:
touch /var/lib/pgsql/data/standby.signal

# postgresql.conf on follower
# application_name must match a name listed in the leader's synchronous_standby_names,
# otherwise this follower is never treated as synchronous
primary_conninfo = 'host=leader-host user=repl_user password=secret application_name=follower1'
hot_standby = on                 # Allow read queries on follower
```
```sql
-- Monitor replication lag (run on leader):
SELECT client_addr,
       state,
       sent_lsn,
       write_lsn,
       flush_lsn,
       replay_lsn,
       (sent_lsn - replay_lsn) AS replication_lag_bytes
FROM pg_stat_replication;

-- Example output (selected columns):
--  client_addr | state     | sent_lsn  | replay_lsn | replication_lag_bytes
--  10.0.1.2    | streaming | 0/16B3A40 | 0/16B3A40  | 0     (caught up)
--  10.0.1.3    | streaming | 0/16B3A40 | 0/16B2F20  | 2848  (2.8 KB behind)
```
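Adding read replicas has diminishing returns. Every follower must replay every write the leader accepts. As total traffic grows, each follower spends more of its capacity keeping up and has less left for reads. Meanwhile, all writes still funnel through the single leader.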
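One simplified way to quantify this uses a model of our own, not any database's. It assumes:

- every node handles C operations per second,
- replaying a write costs a follower as much as serving a read,
- the leader handles only writes,
- a fixed fraction f of all traffic T is writes.

Reads, (1 - f)·T, must then fit in the followers' spare capacity, n·(C - f·T). That gives T ≤ nC / (1 - f + nf), which approaches C / f however many followers you add.

```python
def max_throughput(n_followers: int, capacity: float, write_fraction: float) -> float:
    """Upper bound on total ops/s for 1 leader + n followers under the simple model
    above: reads (1-f)*T must fit into the followers' spare capacity n*(C - f*T)."""
    f, c, n = write_fraction, capacity, n_followers
    return n * c / (1 - f + n * f)


# Hypothetical cluster: 10,000 ops/s per node, 10% of traffic is writes
for n in (1, 2, 5, 10, 50, 100):
    print(n, round(max_throughput(n, capacity=10_000, write_fraction=0.1)))
```

Under these assumptions, 10 followers give about 52,600 ops/s, and 100 followers still stay below the 100,000 ops/s ceiling (C / f). Each additional follower adds less than the one before it.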
Nodes go down. Hardware fails, kernels panic, power grids hiccup, network cables get unplugged by overzealous cleaning crews. The entire point of replication is to keep the system running despite individual node failures. But the details of how you handle failure determine whether you get a smooth failover or a data-loss catastrophe.
A follower crashes and restarts. This is the easy case. The follower knows the last transaction it processed (recorded in its local log). It connects to the leader and requests all changes since that position. It applies them sequentially until it catches up. Then it resumes processing the live stream. No data loss. No downtime for the rest of the cluster. This "just works" in every major database.
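The leader crashes. This is the hard case. The system needs to:

1. Detect that the leader has failed. There is no foolproof test, so most systems rely on a timeout: if the leader doesn't respond for some period (say, 30 seconds), it's presumed dead.
2. Choose a new leader, ideally the follower with the most up-to-date data, to minimize lost writes. This can be an election among the remaining replicas or a decision by a previously designated controller node.
3. Reconfigure the system to use the new leader. Clients send writes to it and the other followers replicate from it. If the old leader comes back, it must step down and become a follower.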
Failover is one of the most dangerous operations in a database cluster. Here's a catalog of disasters:
Lost writes. If the old leader had accepted writes that weren't yet replicated to any follower (because replication was asynchronous), those writes are permanently lost when the new leader takes over. When the old leader comes back, it has writes that conflict with the new leader's history. Common resolution: discard the old leader's unreplicated writes. This violates the client's durability expectation — they got "OK" for writes that vanished.
Split brain. Both the old leader and the new leader think they're the leader. Both accept writes. Now you have two diverging histories of the same data. Merging them is often impossible without data loss. Prevention: use a fencing mechanism — when a new leader is elected, the old leader is forcibly shut down (via STONITH: "Shoot The Other Node In The Head") before the new leader starts accepting writes.
Autoincrement conflicts. The old leader assigned autoincrement IDs 1001-1005 to writes that weren't replicated. The new leader starts assigning IDs from 1001. Now you have two different rows with ID 1001. If a downstream system (like a Redis cache or a search index) processes both, it will overwrite one with the other.
Stale reads during failover. During the detection window (while the system is deciding whether the leader is actually dead), clients may be reading from followers that are serving increasingly stale data. If the detection timeout is 30 seconds, reads could be 30+ seconds stale before the failover even begins.
Cascading failures. The leader goes down because of high load (not hardware failure). The remaining followers suddenly get more read traffic (clients that were reading from the leader are redistributed). The increased load on followers causes them to slow down their WAL replay, increasing replication lag. If you then promote a lagging follower, you lose even more data. Meanwhile, the new leader is also under heavy load and might itself become unresponsive.
Here's what a production-grade failover system needs:
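```python
# Failover decision pseudocode (based on Patroni's approach)
import logging
import time

DETECTION_TIMEOUT = 30                 # seconds without leader heartbeat (illustrative value)
MAX_LAG_FOR_FAILOVER = 1024 * 1024     # max bytes behind for a promotable follower (illustrative)


# Hypothetical hooks: wire these to your consensus store (etcd, Consul, ZooKeeper),
# fencing agent, and service discovery. Follower objects are assumed to expose
# replication_lag_bytes(), wait_for_replay_complete(), promote(), follow(),
# is_accepting_writes(), and .address.
def can_reach_distributed_consensus_store():
    raise NotImplementedError

def fence_node(node):
    raise NotImplementedError

def update_write_endpoint(address):
    raise NotImplementedError

def alert(message):
    logging.critical(message)


def should_failover(leader_last_seen, followers):
    # 1. Don't failover too quickly (avoid false positives)
    if time.time() - leader_last_seen < DETECTION_TIMEOUT:
        return False, None

    # 2. Don't failover if we can't reach the DCS (split brain risk)
    if not can_reach_distributed_consensus_store():
        return False, None

    # 3. Find eligible candidates
    candidates = []
    for f in followers:
        lag = f.replication_lag_bytes()
        if lag <= MAX_LAG_FOR_FAILOVER:
            candidates.append((f, lag))
    if not candidates:
        alert("CRITICAL: No follower eligible for promotion!")
        return False, None

    # 4. Pick the most up-to-date candidate
    best = min(candidates, key=lambda x: x[1])
    return True, best[0]


def execute_failover(old_leader, new_leader, all_followers):
    # 1. FENCE the old leader (STONITH)
    # This is CRITICAL and must happen BEFORE promotion
    fence_node(old_leader)            # Power off, revoke network, etc.

    # 2. Wait for new leader to finish replaying its pending WAL
    new_leader.wait_for_replay_complete()

    # 3. Promote
    new_leader.promote()

    # 4. Reconfigure remaining followers
    for f in all_followers:
        if f is not new_leader:
            f.follow(new_leader)

    # 5. Update DNS / service discovery / proxy config
    update_write_endpoint(new_leader.address)

    # 6. Verify (explicit check: a bare assert is stripped under python -O)
    if not new_leader.is_accepting_writes():
        raise RuntimeError("Promoted node is not accepting writes")
    logging.info("Failover complete")
```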
Let's trace a real-world failover step by step. The cluster has Leader L, Follower F1 (1 second behind), and Follower F2 (3 seconds behind). Leader L crashes at t=0.
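The trace can be sketched as a small simulation under hypothetical assumptions:

- the leader accepts a steady 10 writes per second,
- each acknowledged write gets the next log sequence number (LSN),
- a follower that is "k seconds behind" is missing the last k × 10 writes.

Election promotes the follower with the highest replayed position, as in the failover pseudocode earlier.

```python
WRITES_PER_SECOND = 10      # hypothetical steady write rate
UPTIME_SECONDS = 60         # leader ran this long before crashing at t=0

# Every acknowledged write gets an LSN: 1, 2, 3, ...
leader_lsn = WRITES_PER_SECOND * UPTIME_SECONDS

# Followers' replayed positions at the moment of the crash
followers = {
    "F1": leader_lsn - 1 * WRITES_PER_SECOND,   # 1 second behind
    "F2": leader_lsn - 3 * WRITES_PER_SECOND,   # 3 seconds behind
}

# Election: promote the most up-to-date follower
new_leader = max(followers, key=followers.get)

# Writes the old leader acknowledged but the new leader never received
lost = list(range(followers[new_leader] + 1, leader_lsn + 1))

# F2 now catches up from the new leader, not from the dead one
f2_catch_up = followers[new_leader] - followers["F2"]

print(f"Promote {new_leader}; writes {lost[0]}..{lost[-1]} ({len(lost)} total) are lost")
print(f"F2 replays {f2_catch_up} writes from {new_leader} to catch up")
```

Promoting F2 instead would lose 30 writes rather than 10, which is why the election prefers the smallest lag. If L later comes back, its writes 591 to 600 conflict with F1's history and are typically discarded.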
Those 10 lost writes are the price of asynchronous replication. If you need zero data loss, you must use synchronous replication — but then your write latency includes the round-trip to at least one follower, and a follower failure can block writes entirely.
| System | Detection time | Election time | Total failover | Data loss risk |
|---|---|---|---|---|
| PostgreSQL + Patroni | 10-30s (configurable TTL) | 1-5s (DCS-based) | 15-35s | Async: up to lag duration. Sync: 0. |
| MySQL + Orchestrator | 10-30s | 1-3s | 15-35s | Semi-sync reduces to ~1 transaction |
| MongoDB Replica Set | 10s (electionTimeoutMillis) | 2-12s | 12-22s | w:majority: 0. w:1: up to oplog lag. |
| CockroachDB (per range) | 9s (default liveness) | Sub-second (Raft) | ~10s per range | 0 (Raft guarantees no data loss) |
| Amazon Aurora | Instant (shared storage) | ~30s (DNS propagation) | ~30s | 0 (shared storage, write acknowledged = durable) |
The leader needs to tell followers what changed. But how does it describe the change? This seemingly simple question has four distinct answers, each with different trade-offs. The choice of replication log format determines what you can and can't do with your replicas.
Statement-based replication: The leader logs the actual SQL statement and sends it to followers. Each follower re-executes the same statement.
Simple and intuitive. But broken in subtle ways:
| Problem | Example | Why it breaks |
|---|---|---|
| Non-deterministic functions | NOW(), RAND(), UUID() | Returns different values on leader vs follower |
| Auto-incrementing columns | INSERT ... (auto_id) | Must execute in exactly the same order on all replicas |
| Side effects | Triggers, stored procedures, UDFs | May depend on local state that differs between nodes |
| Execution order | Concurrent UPDATEs | Different execution order → different final state |
MySQL used statement-based replication exclusively before version 5.1. It kept NOW() deterministic by logging the leader's timestamp alongside each statement, so followers evaluate it to the same value. But the edge cases were endless. MySQL 5.1 added row-based replication, and since version 5.7.7 it has been the default.
Write-ahead log (WAL) shipping: Every write to a database first goes to an append-only log on disk before it modifies the actual data structures (B-trees, pages). This is the Write-Ahead Log, and it ensures durability even if the process crashes mid-write. WAL shipping sends this raw log to followers.
The follower replays the same low-level disk operations. This is exactly what PostgreSQL streaming replication does.
Advantage: Byte-for-byte identical replicas. No ambiguity.
Disadvantage: The WAL is tightly coupled to the storage engine's internal format: it describes which bytes changed in which disk pages. If the leader runs PostgreSQL 15, the follower must run the same major version to interpret it. You cannot do a rolling major-version upgrade (run the new version on followers first, then failover) because the new version's WAL format might be incompatible. This makes zero-downtime upgrades painful.
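Logical (row-based) log replication: Instead of the raw bytes, send a logical description of what changed at the row level:

- For an inserted row, the new values of all columns.
- For a deleted row, enough information to identify it, typically the primary key.
- For an updated row, enough information to identify it, plus the new values of the columns (all of them, or at least the ones that changed).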
This is what MySQL's binlog uses in row-based mode. It's also what PostgreSQL's logical replication (pgoutput) uses.
Advantage: Decoupled from storage engine internals. The leader and follower can run different versions, different storage engines, or even different database systems entirely. This is how you build change data capture (CDC) — streaming database changes to a data warehouse, search index, or cache.
Disadvantage: Slightly more data to transmit (row values vs. raw page diffs). But network bandwidth is rarely the bottleneck.
Trigger-based replication: Database triggers fire custom application code when data changes. You can use triggers to write change records to a separate table. An external process then reads that table and applies the changes to other databases.

Tools like Databus for Oracle and Bucardo for PostgreSQL use this approach. It's the most flexible: you can filter, transform, or route changes with arbitrary logic. But it's also the slowest (trigger overhead on every write) and the most error-prone (your replication logic is application code that can have bugs).
Consider this simple operation on a table with schema users(id INT PK, name TEXT, email TEXT, updated_at TIMESTAMP):
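As a sketch, assume the operation is the hypothetical UPDATE users SET name = 'Bob', updated_at = NOW() WHERE id = 42. The Python below contrasts what a statement-based log and a row-based logical log would record for it. The record layout, the timestamp, and the starting row values are our own illustration, not MySQL's binlog or PostgreSQL's pgoutput wire format.

```python
from datetime import datetime, timezone

# Statement-based: the SQL text itself. Each follower re-runs it, so NOW() would be
# evaluated again on every follower, at a slightly different moment.
statement_log_entry = "UPDATE users SET name = 'Bob', updated_at = NOW() WHERE id = 42"

# Logical (row-based): the leader evaluates the statement once and logs the result:
# which row changed (by primary key) and the new column values.
leader_now = datetime(2024, 1, 15, 9, 30, 0, tzinfo=timezone.utc)   # hypothetical leader time
row_log_entry = {
    "op": "UPDATE",
    "table": "users",
    "key": {"id": 42},
    "new_values": {"name": "Bob", "updated_at": leader_now.isoformat()},
}


def apply_row_change(table: dict, entry: dict) -> None:
    """Follower side: apply a logical UPDATE deterministically, with no re-evaluation."""
    row = table[entry["key"]["id"]]
    row.update(entry["new_values"])


follower_users = {42: {"id": 42, "name": "Alice", "email": "alice@example.com",
                       "updated_at": "2023-12-01T00:00:00+00:00"}}
apply_row_change(follower_users, row_log_entry)
print(follower_users[42])
```

The follower ends up with exactly the leader's updated_at value. Re-running the statement-based entry would not guarantee that.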
The logical (row-based) log has become the industry standard for modern data architectures because it enables Change Data Capture (CDC) — streaming every database change as an event to downstream systems.
```python
# Reading a PostgreSQL logical replication stream with psycopg2
# (conceptually what CDC tools like Debezium do internally)
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

# Prerequisites on the leader: wal_level = logical, and a publication, e.g.
#   CREATE PUBLICATION my_publication FOR TABLE users;
conn = psycopg2.connect(
    "host=leader dbname=mydb",
    connection_factory=LogicalReplicationConnection,
)
cur = conn.cursor()

# Create a replication slot (persistent position tracker).
# This fails if the slot already exists, so run it once.
cur.create_replication_slot('my_cdc_slot', output_plugin='pgoutput')

# Start consuming changes. pgoutput requires both proto_version and publication_names.
cur.start_replication(
    slot_name='my_cdc_slot',
    options={'proto_version': '1', 'publication_names': 'my_publication'},
)

def consume(msg):
    # msg.payload holds the binary pgoutput change record.
    # Parse it and send to Elasticsearch, Kafka, a data warehouse, etc.
    print(f"Change at LSN {msg.data_start}: {msg.payload!r}")
    # Confirm the position so the slot can release WAL up to this point
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)
```
Leader-follower replication works great for read-heavy workloads: send reads to followers, writes to the leader, and scale out your read capacity by adding followers. But followers are always at least slightly behind the leader. This delay is replication lag.
If you stop writing, the followers eventually catch up. All replicas converge to the same state. This guarantee is called eventual consistency — a deliberately vague promise. "Eventually" might mean 1 millisecond or 10 seconds, depending on network conditions, follower load, and the volume of writes.
The word "eventually" hides three specific anomalies that will bite you in production. Each one has caused real outages at real companies.
A user submits a comment on a blog post. Your application writes it to the leader. The user's browser immediately refreshes to show the page. This read request is load-balanced to a follower that hasn't yet received the new comment. The user sees the page without their comment. They click "submit" again, creating a duplicate. Or worse, they think the system is broken and leave.
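Solutions (the guarantee we want here is called read-after-write, or read-your-own-writes, consistency):

- Read anything the user may have modified from the leader, and everything else from followers. For example, a user's own profile is read from the leader; other users' profiles come from followers.
- Track the time of the user's last write, and send their reads to the leader for a short window afterward (e.g., one minute).
- Have the client remember the log position (or timestamp) of its most recent write. Serve its reads only from a follower that has replayed at least that far; otherwise, read from the leader.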
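Here is one way to implement read-after-write consistency by log position, sketched in Python. The application remembers the LSN returned by each user's latest write. It reads from any follower whose replayed position has reached that LSN, and falls back to the leader otherwise. The `Replica` and `Router` classes, the replica names, and the positions are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Replica:
    name: str
    replayed_lsn: int        # how far this node has applied the replication log


@dataclass
class Router:
    leader: Replica
    followers: List[Replica]
    last_write_lsn: Dict[str, int] = field(default_factory=dict)   # user_id -> LSN

    def record_write(self, user_id: str, lsn: int) -> None:
        self.last_write_lsn[user_id] = max(lsn, self.last_write_lsn.get(user_id, 0))

    def replica_for_read(self, user_id: str) -> Replica:
        needed = self.last_write_lsn.get(user_id, 0)
        for f in self.followers:
            if f.replayed_lsn >= needed:     # follower already has this user's writes
                return f
        return self.leader                   # nobody has caught up: read from the leader


leader = Replica("leader", replayed_lsn=120)
router = Router(leader, [Replica("f1", 100), Replica("f2", 118)])
router.record_write("alice", lsn=115)
print(router.replica_for_read("alice").name)   # f2 has replayed past 115
print(router.replica_for_read("bob").name)     # bob never wrote: any follower will do
```

In PostgreSQL, the two positions could come from pg_current_wal_lsn() on the leader after the write commits and from pg_last_wal_replay_lsn() on each follower.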
A user refreshes a page and sees 10 comments. They refresh again and see only 9 — the 10th comment disappeared. They refresh a third time and it's back. What happened? Their first read hit Follower-A (caught up), their second read hit Follower-B (lagging), and their third read hit Follower-A again.
Seeing data go backward in time is disorienting and feels like a bug. The guarantee we want is monotonic reads: once you've seen data at a certain point in time, you should never subsequently see older data.
Solution: Sticky sessions. Hash the user ID to a consistent follower. The same user always reads from the same follower, so their view of the data only moves forward. If that follower dies, they're reassigned to another. That switch can still show a one-time step backward if the new follower is further behind, but it's a single event rather than data flickering on every refresh.
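Here is a sketch of hash-based stickiness, assuming a fixed list of follower names (hypothetical). It uses a stable hash (SHA-256 of the user ID) rather than Python's built-in hash(). For strings, hash() is randomized per process, so it would send the same user to a different follower after every restart.

```python
import hashlib


def follower_for(user_id: str, followers: list) -> str:
    """Map the same user to the same follower for as long as the list is unchanged."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return followers[int.from_bytes(digest[:8], "big") % len(followers)]


FOLLOWERS = ["follower-a", "follower-b", "follower-c"]
choice = follower_for("user-1234", FOLLOWERS)
print(choice)
```

With plain modulo hashing, adding or removing a follower reassigns most users. The consistent hashing described later in this lesson reduces that churn.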
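Two users have a conversation:

- User 1 asks, "How much is the item?" This message is stored on Partition A.
- User 2 reads the question and replies, "It's $42." This reply is stored on Partition B.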
A third user observes this conversation. But Partition B replicates faster than Partition A. The observer sees "It's $42" arrive before "How much is the item?" — the answer appears before the question. Causal ordering is violated.
This is the consistent prefix reads problem. It only occurs when data is spread across multiple partitions that replicate independently. If everything is on one partition, the replication log preserves order.
Solution: Ensure causally related writes go to the same partition (e.g., all messages in a conversation go to the same partition). Or use a global ordering mechanism like a logical timestamp.
You can't fix what you can't measure. Here's how to monitor replication lag in PostgreSQL from application code, using both the leader's statistics view and an end-to-end canary write:
```python
# Monitoring replication lag programmatically
import logging
import time

import psycopg2


def alert(message):
    # Hypothetical hook: replace with your paging/alerting integration
    logging.warning(message)


def check_pg_replication_lag(leader_conn, follower_conn, timeout_s=30.0):
    """Check lag between PostgreSQL leader and follower."""
    # Method 1: Query pg_stat_replication on the leader
    with leader_conn.cursor() as cur:
        cur.execute("""
            SELECT client_addr,
                   pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lag_bytes,
                   EXTRACT(EPOCH FROM replay_lag) AS lag_seconds
            FROM pg_stat_replication
        """)
        for addr, lag_bytes, lag_sec in cur.fetchall():
            # replay_lag is NULL when an idle follower is fully caught up
            if lag_sec is not None and lag_sec > 5.0:
                alert(f"Follower {addr}: {lag_sec:.1f}s behind!")

    # Method 2: Write a canary value and measure time-to-appear
    # (assumes a one-row table: CREATE TABLE canary (val TEXT); INSERT INTO canary VALUES ('');)
    canary = str(time.time())
    with leader_conn.cursor() as cur:
        cur.execute("UPDATE canary SET val = %s", (canary,))
    leader_conn.commit()
    t0 = time.time()

    # Poll follower until canary appears, giving up after timeout_s
    while time.time() - t0 < timeout_s:
        with follower_conn.cursor() as cur:
            cur.execute("SELECT val FROM canary")
            if cur.fetchone()[0] == canary:
                lag = time.time() - t0
                print(f"Measured lag: {lag*1000:.0f}ms")
                return lag
        time.sleep(0.01)
    alert(f"Canary not visible on follower after {timeout_s:.0f}s")
    return None
```
| Lag Duration | Typical Cause | Impact | Action |
|---|---|---|---|
| < 100ms | Normal async operation | Most apps won't notice | Monitor, no action needed |
| 100ms - 1s | Follower under load, minor network issue | Read-your-own-writes failures for fast UIs | Route a user's reads to the leader right after they write |
| 1s - 10s | Large transaction, follower disk I/O saturated | Visible stale data, customer complaints | Route time-sensitive reads to leader |
| 10s - 60s | Schema migration replaying, vacuum, batch import on leader | Significant data staleness, monotonic reads violations widespread | Consider pausing non-critical reads from followers |
| > 60s | Follower network partition, disk failure, bug | Follower is essentially a different database | Remove from rotation, investigate immediately |
Leader-follower replication has a fundamental limitation: every write must go through the single leader. If the leader is in Virginia and your user is in Tokyo, every write incurs a trans-Pacific round trip. If the leader goes down, writes stop until failover completes.
Multi-leader replication (also called master-master or active-active) allows more than one node to accept writes. Each leader processes writes locally and replicates changes to the other leaders asynchronously.
Multi-datacenter operation. One leader per datacenter. Users write to their local datacenter's leader. Replication between datacenters happens in the background. Each datacenter can operate independently if the network between datacenters goes down.
Offline-capable clients. Your calendar app, notes app, or document editor needs to work without internet. Each device is effectively a "leader" — it accepts writes locally and syncs when connectivity is restored. CouchDB was designed for exactly this model.
Collaborative editing. Google Docs lets multiple users edit the same document simultaneously. Each user's keystrokes are local writes that must be merged. This is multi-leader replication at the keystroke level.
Two users simultaneously edit the same row from different datacenters:
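Here is a minimal sketch of how such a conflict surfaces, under illustrative assumptions:

- a row starts with title "A",
- the Virginia leader sets it to "B" while the Frankfurt leader concurrently sets it to "C",
- each replicated change carries the value it expected to overwrite, so the receiving leader can notice a mismatch.

That detection rule is a simplification we chose; real systems typically compare version metadata.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Change:
    key: str
    old: str       # value the origin leader saw before writing
    new: str
    origin: str


class Leader:
    def __init__(self, name, data):
        self.name, self.data, self.conflicts = name, dict(data), []

    def local_write(self, key, value):
        change = Change(key, self.data[key], value, self.name)
        self.data[key] = value          # committed locally; client gets "OK" immediately
        return change                   # replicated to the other leader later

    def apply_remote(self, change):
        if self.data[change.key] != change.old:
            # This row changed here after the remote leader read it: conflict
            self.conflicts.append((self.data[change.key], change.new))
        else:
            self.data[change.key] = change.new


va = Leader("VA", {"title": "A"})
eu = Leader("EU", {"title": "A"})
c1 = va.local_write("title", "B")
c2 = eu.local_write("title", "C")
va.apply_remote(c2)     # VA gets EU's change expecting "A", but holds "B"
eu.apply_remote(c1)     # EU gets VA's change expecting "A", but holds "C"
print(va.data, va.conflicts, eu.data, eu.conflicts)
```

Both users already received "OK", and the conflict is only noticed asynchronously, too late to ask either of them. The leaders are left diverged until a resolution strategy decides the outcome.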
In single-leader replication, this can't happen — all writes go through one node, which serializes them. In multi-leader replication, conflicts are inevitable.
| Strategy | How it works | Trade-off |
|---|---|---|
| Last-Write-Wins (LWW) | Attach a timestamp to each write. When two writes conflict, keep the one with the higher timestamp. Discard the other. | Simple but lossy. One write silently disappears. If clocks are skewed (and they always are), "last" is arbitrary. Cassandra uses this by default. |
| Merge values | Concatenate or union the conflicting values. E.g., if one leader sets title="A" and another sets title="B", store "A/B" and let the application resolve later. | No data loss, but the merged value may be nonsensical. Works for sets (add items to a shopping cart) but not scalars. |
| On-write handler | Call a custom conflict resolution function when the conflict is detected during replication. | Flexible, but runs in the database layer, which limits what logic you can use. Bucardo uses this. |
| On-read resolution | Store all conflicting values. When a client reads, present all versions and let the application (or user) pick. | CouchDB does this. The application must handle conflicts, which is often forgotten. |
| CRDTs | Conflict-free Replicated Data Types. Mathematical data structures that automatically merge without conflicts. E.g., a G-Counter (grow-only counter) can be safely merged by taking the max of each node's count. | Only works for specific data types. Can't express arbitrary business logic. Riak uses these. |
To understand why collaborative editing needs special treatment, consider this scenario. Two users are editing the string "HELLO" simultaneously:
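One classic treatment is operational transformation (OT). Below is a deliberately minimal sketch. It assumes single-character inserts and deletes and exactly one concurrent operation per user; production editors must handle long histories of concurrent edits. The edits themselves are hypothetical: user 1 inserts "X" after the "H" while user 2 deletes the final "O".

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Op:
    kind: str        # "ins" or "del"
    pos: int         # index in the string the op was generated against
    char: str = ""   # character to insert (unused for deletes)
    site: int = 0    # tie-breaker when two inserts target the same index


def apply(text: str, op: Optional[Op]) -> str:
    if op is None:                      # op was transformed into a no-op
        return text
    if op.kind == "ins":
        return text[:op.pos] + op.char + text[op.pos:]
    return text[:op.pos] + text[op.pos + 1:]


def transform(op: Op, other: Op) -> Optional[Op]:
    """Adjust `op` so it has its intended effect after concurrent `other` was applied."""
    if op.kind == "ins" and other.kind == "ins":
        if other.pos < op.pos or (other.pos == op.pos and other.site < op.site):
            return replace(op, pos=op.pos + 1)
        return op
    if op.kind == "ins":                                # other is a delete
        return replace(op, pos=op.pos - 1) if other.pos < op.pos else op
    if other.kind == "ins":                             # op is a delete
        return replace(op, pos=op.pos + 1) if other.pos <= op.pos else op
    if other.pos == op.pos:                             # both deleted the same character
        return None
    return replace(op, pos=op.pos - 1) if other.pos < op.pos else op


doc = "HELLO"
op1 = Op("ins", 1, "X", site=1)    # user 1 inserts "X" after the "H"
op2 = Op("del", 4, site=2)         # user 2 deletes the final "O"

# Naive: each site applies its own op, then the remote op unchanged
naive_site1 = apply(apply(doc, op1), op2)
naive_site2 = apply(apply(doc, op2), op1)

# OT: transform the remote op against the local one before applying it
ot_site1 = apply(apply(doc, op1), transform(op2, op1))
ot_site2 = apply(apply(doc, op2), transform(op1, op2))

print(naive_site1, naive_site2)    # HXELO HXELL  (diverged)
print(ot_site1, ot_site2)          # HXELL HXELL  (converged)
```

Naively, user 2's "delete index 4" lands on the wrong character at site 1 because the insert shifted everything right. Transforming the operation before applying it is what restores convergence.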
How do the leaders exchange changes? Three options:
Circular topology: Each leader sends its changes to the next in a ring. If one node fails, the ring breaks. MySQL supports this.
Star topology: One designated root leader forwards changes to all others. Simpler routing, but the root is a single point of failure.
All-to-all topology: Every leader sends to every other leader. Most fault-tolerant but requires more network connections. PostgreSQL BDR uses this.
You run an e-commerce site with leaders in Virginia (VA) and Frankfurt (EU). A customer has a shipping address on record.
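Suppose, hypothetically, the customer updates the address from a phone (routed to EU). Three seconds later, they correct it from a laptop (routed to VA). Meanwhile, the EU leader's clock runs five seconds fast. A minimal last-write-wins merge in Python shows what happens. The addresses, timestamps, and skew are invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Write:
    value: str
    timestamp: float   # seconds, as read from the accepting leader's (possibly skewed) clock
    origin: str


def lww_merge(a: Write, b: Write) -> Write:
    """Last-write-wins: keep the higher timestamp (origin name breaks exact ties)."""
    return max(a, b, key=lambda w: (w.timestamp, w.origin))


EU_CLOCK_SKEW = 5.0                    # hypothetical: EU leader's clock is 5 s fast

# True order of events: EU write at t=0 s, VA correction at t=3 s
eu_write = Write("9 Elm St", timestamp=0.0 + EU_CLOCK_SKEW, origin="EU")
va_write = Write("12 Oak St", timestamp=3.0, origin="VA")

# Both leaders run the same merge when they exchange changes, so they converge...
kept_on_va = lww_merge(va_write, eu_write)
kept_on_eu = lww_merge(eu_write, va_write)
print(kept_on_va.value, kept_on_eu.value)    # ...on the OLDER address
```

The replicas agree, which is all LWW promises. But the customer's correction is silently discarded, and the package ships to the old address.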
For specific data types, Conflict-free Replicated Data Types (CRDTs) can automatically merge concurrent updates without any data loss. The most common CRDT is the G-Counter (grow-only counter):
```python
class GCounter:
    """Grow-only counter CRDT.
    Each replica has its own counter.
    Global count = sum of all replicas' counters.
    Merge = component-wise maximum."""

    def __init__(self, replica_id, n_replicas):
        self.replica_id = replica_id
        self.counts = [0] * n_replicas

    def increment(self):
        # Only increment OUR counter
        self.counts[self.replica_id] += 1

    def value(self):
        return sum(self.counts)

    def merge(self, other):
        # Component-wise max: NEVER loses an increment
        for i in range(len(self.counts)):
            self.counts[i] = max(self.counts[i], other.counts[i])


# Demo: two replicas count page views independently
va = GCounter(replica_id=0, n_replicas=2)
eu = GCounter(replica_id=1, n_replicas=2)

# VA gets 5 page views, EU gets 3
for _ in range(5):
    va.increment()
for _ in range(3):
    eu.increment()

print(va.counts)   # [5, 0]
print(eu.counts)   # [0, 3]

# Merge: no conflicts, no data loss
va.merge(eu)
eu.merge(va)
print(va.value())  # 8, correct!
print(eu.value())  # 8, both replicas agree
```
What if we got rid of the leader entirely? No single node is special. Any replica can accept reads AND writes. The client sends each write to multiple replicas in parallel. This is leaderless replication, popularized by Amazon's Dynamo paper (2007) and used by Cassandra, Riak, and Voldemort.
With N replicas, the client sends each write to all N replicas but considers it successful once W of them acknowledge. It sends each read to at least R replicas. The key insight: if W + R > N, every set of W replicas that acknowledged a write overlaps every set of R replicas you read from.

So at least one of the R replicas you read from must have the latest write. This is a quorum.

If you choose W + R ≤ N, you sacrifice the quorum guarantee. Reads might miss the latest write entirely. But you gain availability: you can tolerate more node failures. Some applications choose W=1, R=1 (fastest possible, no consistency guarantee) for use cases where stale data is acceptable (e.g., analytics counters).
| N | W | R | Quorum? | Tolerate failures | Trade-off |
|---|---|---|---|---|---|
| 3 | 2 | 2 | Yes (4>3) | 1 write failure, 1 read failure | Balanced |
| 3 | 3 | 1 | Yes (4>3) | 0 write failures, 2 read failures | Fast reads, slow writes |
| 3 | 1 | 3 | Yes (4>3) | 2 write failures, 0 read failures | Fast writes, slow reads |
| 5 | 3 | 3 | Yes (6>5) | 2 write failures, 2 read failures | High availability |
| 3 | 1 | 1 | No (2≤3) | 2 of each, but may read stale | Maximum speed, no consistency |
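The "Quorum?" column can be checked by brute force. Enumerate every set of W replicas that could acknowledge a write and every set of R replicas a read could contact, then test whether each pair shares at least one replica. A small Python sketch follows; the function name is ours, and the search is exhaustive, so it is only practical for small N.

```python
from itertools import combinations


def every_read_sees_latest_write(n: int, w: int, r: int) -> bool:
    """True if every possible W-replica write set intersects every possible
    R-replica read set among n replicas."""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


# The (N, W, R) rows from the table above
for n, w, r in [(3, 2, 2), (3, 3, 1), (3, 1, 3), (5, 3, 3), (3, 1, 1)]:
    print(n, w, r, every_read_sees_latest_write(n, w, r))
```

Only the last row reports False: with W=1 and R=1, the write can land on one replica while the read asks a different one. The exhaustive check agrees with the W + R > N rule for every combination, as the pigeonhole argument predicts.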
Even with quorums, replicas can diverge. Two mechanisms bring them back in sync:
Read repair: When a client reads from multiple replicas and discovers that one has stale data, it writes the fresh value back to the stale replica. Repairs happen on every read, but only for data that's actually being read. Cold data (rarely read) can remain stale forever.
Anti-entropy process: A background daemon constantly compares replicas and copies missing data. Unlike read repair, this catches all stale data eventually, not just data being read. But it adds background I/O.
What happens when more than N-W nodes are down? Strict quorum writes fail. But if your priority is availability over consistency, you can use a sloppy quorum: accept writes on nodes that aren't the "home" replicas for this key. When the home nodes come back, the data is handed off to them. This is hinted handoff.
Sloppy quorums increase availability but weaken consistency — a read quorum might now miss a write that was stored on a "wrong" node.
Let's trace a complete quorum write and read with N=3, W=2, R=2, showing exactly how read repair works.
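Here is a minimal in-memory sketch of that trace. It assumes integer versions assigned by the writer (real Dynamo-style systems use timestamps or version vectors), and all names are hypothetical. Replica C is down during the second write, so the write succeeds with acknowledgments from A and B. A later read that happens to contact C and B then repairs C.

```python
from dataclasses import dataclass, field

MISSING = (0, None)    # (version, value) reported for a key a replica has never stored


@dataclass
class Replica:
    name: str
    up: bool = True
    store: dict = field(default_factory=dict)    # key -> (version, value)


def quorum_write(replicas, key, value, version, w):
    """Send the write to every replica; succeed if at least w acknowledge.
    Replicas that did store it keep it even if the write fails overall."""
    acks = 0
    for rep in replicas:
        if rep.up:
            rep.store[key] = (version, value)
            acks += 1
    return acks >= w


def quorum_read(replicas, key, r):
    """Ask the first r reachable replicas, return the newest value, and
    write it back to any of them that answered with an older version."""
    contacted = [rep for rep in replicas if rep.up][:r]
    if len(contacted) < r:
        raise RuntimeError("read quorum not reached")
    version, value = max((rep.store.get(key, MISSING) for rep in contacted),
                         key=lambda vv: vv[0])
    for rep in contacted:
        if rep.store.get(key, MISSING)[0] < version:
            rep.store[key] = (version, value)    # read repair
    return value


a, b, c = Replica("A"), Replica("B"), Replica("C")
cluster = [a, b, c]
quorum_write(cluster, "user:42", "v1", version=1, w=2)        # all three store v1

c.up = False                                                  # C misses the next write
write_ok = quorum_write(cluster, "user:42", "v2", version=2, w=2)
c.up = True                                                   # C is back, still holding v1

result = quorum_read([c, b, a], "user:42", r=2)               # contacts C (v1) and B (v2)
print(write_ok, result, c.store["user:42"])                   # C now holds (2, 'v2')
```

The read returns v2 because B is in both the write quorum and the read quorum. The stale replica C is fixed as a side effect of being read.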
Even with W + R > N, quorums do NOT guarantee linearizability (the strongest consistency model). Here's why:
| Scenario | Why quorum fails |
|---|---|
| Sloppy quorum | Writes go to "wrong" nodes during network partition. The W nodes that ACK'd are not in the R set for the next read. |
| Concurrent writes | Two writes arrive at the same time. Different replicas might process them in different orders. No single "latest" value. |
| Write + read concurrency | A write is in progress. Some replicas have v2, others still have v1. R replicas might all return v1 if the write hasn't propagated to R nodes yet. |
| Failed write rollback | Write reaches 1 of W=2 needed replicas, then coordinator crashes. One replica has v2, two have v1. A read with R=2 might miss v2. |
Here's what leaderless replication looks like in practice with Apache Cassandra:
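```sql
-- Create a keyspace with 3 replicas in each datacenter
CREATE KEYSPACE my_app WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'us-east': 3,   -- 3 replicas in US-East
  'eu-west': 3    -- 3 replicas in EU-West
};

CREATE TABLE my_app.users (id uuid PRIMARY KEY, name text, email text);

-- CQL3 has no per-statement consistency clause (USING CONSISTENCY was removed).
-- The client sets it: in cqlsh with the CONSISTENCY command, in drivers per session or query.
CONSISTENCY LOCAL_QUORUM;

-- Write with LOCAL_QUORUM (majority in the local datacenter):
-- waits for 2 of the 3 local replicas; the remote DC is updated without waiting
INSERT INTO my_app.users (id, name, email)
VALUES (uuid(), 'Alice', 'alice@example.com');

-- Read with LOCAL_QUORUM: reads from 2 of the 3 local replicas.
-- Guarantees seeing LOCAL_QUORUM writes made in this DC, but NOT writes from the other DC.
-- (? is a bind marker: run this as a prepared statement from a driver.)
SELECT * FROM my_app.users WHERE id = ?;
```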
QUORUM requires a majority across ALL replicas globally (4 out of 6 in the example above). This means cross-datacenter round trips for every read and write. LOCAL_QUORUM only requires a majority in the local datacenter (2 out of 3). It is much faster, but a write in US-East might not yet be visible to a read in EU-West. Choose LOCAL_QUORUM for performance, QUORUM for global consistency.

Leaderless replication shines for append-heavy workloads where writes are far more frequent than reads and eventual consistency is acceptable. Consider an IoT sensor network, where devices continuously append readings and never modify them.
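A small sketch of why append-only sensor data sidesteps conflicts, assuming each reading is keyed by a hypothetical (sensor_id, timestamp) pair and written exactly once. It models the data, not Cassandra's storage engine or repair process:

```python
# Each reading is keyed by (sensor_id, timestamp) and is never updated,
# so two replicas can only differ in which readings they have seen so far.
replica_a = {
    ("sensor-1", 1700000000): 21.5,
    ("sensor-2", 1700000000): 19.0,
}
replica_b = {
    ("sensor-1", 1700000000): 21.5,
    ("sensor-1", 1700000060): 21.7,  # arrived at B first
}


def anti_entropy(x, y):
    """Merge two replicas. Because keys are write-once, a key present on both
    sides always has the same value, so the merge is a plain union with no
    conflict-resolution rule at all."""
    merged = dict(x)
    for key, value in y.items():
        assert merged.get(key, value) == value, "write-once key diverged"
        merged[key] = value
    return merged


# Merge order does not matter: both directions converge to the same state.
print(anti_entropy(replica_a, replica_b) == anti_entropy(replica_b, replica_a))  # True
print(len(anti_entropy(replica_a, replica_b)))  # 3
```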
This is why Cassandra is a common choice for IoT, time-series, and event logging. The data model is append-only (sensor readings are never updated), conflicts don't arise (each sensor writes each timestamp exactly once), and eventual consistency is fine (dashboards can be a few seconds behind).
In a leaderless system, how does the coordinator know WHICH N replicas should store a given key? The answer is consistent hashing. Each replica is assigned a position on a hash ring. Each key is hashed to a position on the same ring. The N replicas responsible for a key are the first N nodes clockwise from the key's position.
This is where replication gets truly interesting. Two clients write to the same key at roughly the same time. Neither knows about the other's write. In distributed systems terminology, these writes are concurrent — there is no causal relationship between them (neither "happened before" the other).
How do you even detect that two writes are concurrent, as opposed to one happening after the other?
Two events A and B have one of three relationships:
| Relationship | Meaning | How to detect |
|---|---|---|
| A → B | A happened before B. B knew about A. | B's version vector is ≥ A's in every entry and greater in at least one (B dominates A). |
| B → A | B happened before A. A knew about B. | A's version vector dominates B's. |
| A ∥ B | Concurrent. Neither knew about the other. | Neither version vector dominates the other. |
You cannot use wall-clock timestamps to determine this ordering. Clocks are unreliable in distributed systems (NTP drift, leap seconds, VM clock pauses). Instead, we use version vectors.
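Each replica keeps its own version number for each key, incrementing it whenever it processes a write, and also tracks the version numbers it has seen from the other replicas. Together these numbers form the key's version vector. When a client writes, it sends back the version vector from its last read, so the server can tell whether the new write supersedes, is superseded by, or is concurrent with the data it already holds.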
Amazon's Dynamo paper used a shopping cart as the motivating example. When two concurrent writes add different items to the same cart, the system stores both versions (siblings). When the client next reads, it receives all siblings and must merge them. For a shopping cart, merging means taking the union of all items.
But what about removing items? If one sibling has {milk, eggs, butter} and another has {milk, eggs} (butter was removed), taking the union puts butter back. The removed item resurrects. This is the "resurrection bug."
The fix: don't remove items; mark them as deleted with a tombstone. The merge function is: union all items, then remove items that have a tombstone in any sibling.
```python
class VersionVector:
    def __init__(self):
        self.clock = {}  # {replica_id: version_number}

    def increment(self, replica_id):
        self.clock[replica_id] = self.clock.get(replica_id, 0) + 1

    def dominates(self, other):
        """Does self happen-after other?"""
        # Every entry in other must be <= the corresponding entry in self,
        # AND at least one must be strictly less.
        all_keys = set(self.clock) | set(other.clock)
        at_least_one_greater = False
        for k in all_keys:
            s = self.clock.get(k, 0)
            o = other.clock.get(k, 0)
            if s < o:
                return False  # other has a higher version somewhere
            if s > o:
                at_least_one_greater = True
        return at_least_one_greater

    def concurrent_with(self, other):
        """Neither dominates the other."""
        return not self.dominates(other) and not other.dominates(self)

    def merge(self, other):
        """Take the component-wise max."""
        merged = VersionVector()
        all_keys = set(self.clock) | set(other.clock)
        for k in all_keys:
            merged.clock[k] = max(self.clock.get(k, 0), other.clock.get(k, 0))
        return merged

    def __repr__(self):
        return str(self.clock)


# Demo: detecting concurrent writes
vv_a = VersionVector()
vv_a.increment("A")                # {A:1}
vv_b = VersionVector()
vv_b.increment("B")                # {B:1}
print(vv_a.concurrent_with(vv_b))  # True: neither dominates

merged = vv_a.merge(vv_b)          # {A:1, B:1}
merged.increment("A")              # {A:2, B:1}
print(merged.dominates(vv_a))      # True: merged happened after vv_a
print(merged.dominates(vv_b))      # True: merged happened after vv_b
```
Here's a simplified sketch that ties version vectors to actual data storage, following the sibling-based approach Riak uses:
```python
# Uses the VersionVector class defined above.
class DynamoStyleStore:
    """A single replica in a Dynamo-style leaderless system."""

    def __init__(self, replica_id):
        self.id = replica_id
        # key -> list of (value, VersionVector) pairs.
        # Multiple entries = siblings (concurrent writes).
        self.data = {}

    def put(self, key, value, context=None):
        """Write a value. context = version vector from the client's prior read."""
        if context is None:
            context = VersionVector()
        # Start from the client's context and increment this replica's entry
        new_vv = VersionVector()
        new_vv.clock = dict(context.clock)
        new_vv.increment(self.id)

        # Drop entries the new write supersedes; keep the rest as siblings
        surviving = [(v, vv) for (v, vv) in self.data.get(key, [])
                     if not new_vv.dominates(vv)]
        surviving.append((value, new_vv))
        self.data[key] = surviving
        return new_vv

    def get(self, key):
        """Read all values (may return siblings) plus their merged context."""
        entries = self.data.get(key, [])
        merged_vv = VersionVector()
        for (_, vv) in entries:
            merged_vv = merged_vv.merge(vv)
        return [v for (v, _) in entries], merged_vv

    def sync_from(self, other):
        """Anti-entropy: pull another replica's versions. Keep every version that
        no other version dominates; identical vectors are stored once."""
        for key, theirs in other.data.items():
            combined = self.data.get(key, []) + theirs
            surviving = []
            for (v, vv) in combined:
                superseded = any(o.dominates(vv) for (_, o) in combined)
                duplicate = any(vv.clock == s.clock for (_, s) in surviving)
                if not superseded and not duplicate:
                    surviving.append((v, vv))
            self.data[key] = surviving


# Demo: the shopping cart scenario
store_A = DynamoStyleStore("A")
store_B = DynamoStyleStore("B")

# Client X writes "milk" to store A
store_A.put("cart", ["milk"])    # store_A: cart = [(["milk"], {A:1})]

# Client Y writes "eggs" to store B (concurrently!)
store_B.put("cart", ["eggs"])    # store_B: cart = [(["eggs"], {B:1})]

# Anti-entropy: sync A's data to B.
# B sees {A:1} vs {B:1}: neither dominates, so both are kept as siblings.
store_B.sync_from(store_A)       # store_B: cart = [(["eggs"], {B:1}), (["milk"], {A:1})]

# Client reads from B and merges the siblings
vals, ctx = store_B.get("cart")  # vals = [["eggs"], ["milk"]], ctx = {A:1, B:1}
merged_cart = sorted({item for v in vals for item in v})  # ["eggs", "milk"]

# Client writes the merged value back with the merged context
store_B.put("cart", merged_cart, context=ctx)
# store_B: cart = [(["eggs", "milk"], {A:1, B:2})]
# Sibling resolved! {A:1, B:2} dominates both {A:1} and {B:1}.
```
Siblings (concurrent values stored together) must eventually be merged. For a shopping cart:
```python
def merge_shopping_carts(siblings):
    """Merge concurrent shopping cart siblings using tombstones.
    Each sibling is a dict {item: True/False}, where False = tombstone."""
    all_items = set()
    tombstoned = set()
    for sibling in siblings:
        for item, alive in sibling.items():
            if alive:
                all_items.add(item)
            else:
                tombstoned.add(item)
    # Result: all items that appear alive in ANY sibling,
    # minus items tombstoned in ANY sibling.
    # (In this simple scheme, a tombstoned item can never be re-added.)
    return all_items - tombstoned


# Example:
sibling_1 = {"milk": True, "eggs": True, "butter": True}
sibling_2 = {"milk": True, "eggs": True, "butter": False}  # tombstone
result = merge_shopping_carts([sibling_1, sibling_2])
print(sorted(result))  # ['eggs', 'milk']: butter correctly removed
```
LWW is the simplest conflict resolution strategy: attach a timestamp to each write, keep the highest. Many engineers default to it because it's easy to implement. But it has a fundamental problem: it silently discards writes. Here's a complete analysis:
| Scenario | LWW behavior | Is this acceptable? |
|---|---|---|
| Cache warming | Two servers populate the same cache key. Latest wins. | Yes — both values are equivalent, losing one is fine. |
| Sensor readings | Two sensors report the same metric. Latest reading wins. | Yes — we want the freshest reading. |
| User profile updates | User updates name from two devices. One edit is silently lost. | No — user explicitly made both changes. |
| Shopping cart additions | User adds item on phone and laptop. One item silently disappears. | No — both additions were intentional. |
| Bank balance transfers | Two transfers happen concurrently. One is silently lost. | Absolutely not — money disappears. |
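A minimal sketch of LWW's silent data loss, combined with the clock-skew problem described earlier. The `Write` type, `lww_merge`, and the timestamps are hypothetical; the point is that the merge drops a write without any error, and that a fast clock decides the winner:

```python
from dataclasses import dataclass


@dataclass
class Write:
    value: str
    timestamp: float  # wall-clock time on the node that accepted the write
    source: str


def lww_merge(writes):
    """Last-Write-Wins: keep only the write with the highest timestamp.
    Every other write is dropped without any error or warning."""
    return max(writes, key=lambda w: w.timestamp)


# A user renames their profile from two devices. The laptop's edit really
# happened second, but the node that handled the phone's edit runs 50 ms fast.
phone = Write("Alice Smith", timestamp=1000.050, source="phone (clock +50 ms)")
laptop = Write("Alice Jones", timestamp=1000.020, source="laptop")

winner = lww_merge([phone, laptop])
print(winner.value)  # Alice Smith: the later edit from the laptop is lost
```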
Happens-before is a partial order, not a total order. In a total order, every pair of events can be compared (like numbers on a number line). In a partial order, some pairs are incomparable — that's what "concurrent" means. Think of it like a family tree: your parents are your ancestors (they "happened before" you), but two cousins are neither ancestor nor descendant of each other — they're concurrent.
Theory is clean. Production is messy. Let's look at how real databases implement replication and what trade-offs they've made.
PostgreSQL uses physical streaming replication by default. The leader streams its WAL (Write-Ahead Log) to followers, which replay the exact same disk operations.
| Property | PostgreSQL |
|---|---|
| Topology | Leader-follower (1 primary, N standbys) |
| Log format | Physical WAL (tied to storage engine). Also supports logical replication (pgoutput) since v10. |
| Sync mode | Configurable: async (default), sync, or remote_apply (commit waits until the standby has replayed the write, so it is visible to queries there) |
| Failover | Manual (pg_ctl promote) or automated via Patroni/pg_auto_failover |
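| Read from follower? | Yes (hot standby). But WAL replay can conflict with long-running standby queries and cancel them (tunable via max_standby_streaming_delay and hot_standby_feedback). |
| Gotcha | Physical WAL is tied to the major version. Zero-downtime major-version upgrades require logical replication; otherwise use pg_upgrade or dump/restore with downtime. |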
| Property | MySQL (InnoDB) |
|---|---|
| Topology | Leader-follower. Also supports Group Replication (multi-leader with Paxos). |
| Log format | Binlog: statement-based (legacy), row-based (default since 5.7), or mixed. |
| Sync mode | Semi-synchronous plugin available (wait for at least 1 replica ACK). |
| Failover | Orchestrator, MHA, or built-in Group Replication with automatic failover. |
| Read from follower? | Yes. ProxySQL or MySQL Router handle routing. |
| Gotcha | GTID (Global Transaction ID) is critical for reliable failover. Without it, finding the correct binlog position after failover is a nightmare. |
| Property | MongoDB |
|---|---|
| Topology | Replica sets (1 primary, N-1 secondaries, optional arbiters). |
| Log format | Oplog (operations log): logical format (similar to row-based binlog). |
| Sync mode | Configurable write concern: w=1 (leader only), w=majority, w=N. |
| Failover | Automatic. Built-in Raft-inspired election protocol. Election takes 2-12 seconds. |
| Read from follower? | Yes, with read preference: primary, primaryPreferred, secondary, nearest. |
| Gotcha | Election during network partitions can cause rollback of unreplicated writes on the old primary. |
| Property | Cassandra |
|---|---|
| Topology | Leaderless (Dynamo-style). Any node can coordinate reads/writes. |
| Log format | Mutations sent to replicas, each with a timestamp. Merged by LWW per column. |
| Sync mode | Tunable consistency: ONE, QUORUM, LOCAL_QUORUM, ALL. |
| Failover | No leader to fail over. Nodes join/leave the ring via gossip protocol. |
| Read from follower? | No leaders/followers distinction. All replicas serve reads and writes. |
| Gotcha | LWW conflict resolution means concurrent writes silently lose data. Deleted data can resurrect if a replica misses a delete and the tombstone is purged (after gc_grace_seconds) before repair runs. Counter columns need special handling. |
Choosing between a leader-follower database such as PostgreSQL and a leaderless one such as Cassandra is one of the most common system design decisions.
In practice, many systems use BOTH: PostgreSQL for transactional data (user accounts, orders, payments) and Cassandra for high-volume event data (clickstreams, sensor readings, logs). CDC (change data capture) streams changes out of PostgreSQL into other systems, such as a search index or an analytics store.
A less-discussed but dangerous failure mode in Cassandra: tombstone accumulation. When you delete a row in Cassandra, it doesn't physically remove the data. Instead, it writes a tombstone — a marker that says "this row was deleted." Tombstones are necessary because the delete must propagate to all replicas, and without a tombstone, a replica that missed the delete would "resurrect" the row on the next read repair.
The problem: tombstones stick around until compaction (merging SSTables) purges them, and compaction may only purge a tombstone once it is older than gc_grace_seconds (10 days by default). If your workload involves heavy deletes (e.g., a TTL-based cache, or a queue that dequeues by deleting), you can accumulate millions of tombstones. When a read query scans a range, it must read through all tombstones to find live rows. A query that should take 5 ms can take 30 seconds because it scans 2 million tombstones.
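A toy model of the queue-by-deleting problem, assuming Cassandra's default gc_grace_seconds of 864000 seconds (10 days). The partition layout and function names are hypothetical, and real compaction has more conditions (for example, it only drops a tombstone when no older data it shadows remains in other SSTables); the sketch only shows that read cost scales with the tombstones scanned and that compaction cannot help until the grace period passes.

```python
GC_GRACE_SECONDS = 10 * 24 * 3600  # Cassandra's default gc_grace_seconds (10 days)


def queue_partition(n_dequeued, n_live, deleted_at):
    """A queue-like partition: the first n_dequeued rows were 'dequeued' by
    deleting them, so each is now a tombstone; the rest are live rows."""
    cells = [{"id": i, "tombstone": True, "deleted_at": deleted_at} for i in range(n_dequeued)]
    cells += [{"id": n_dequeued + i, "tombstone": False} for i in range(n_live)]
    return cells


def read_first_live(cells, limit):
    """Return up to `limit` live rows, counting how many cells had to be scanned."""
    scanned, rows = 0, []
    for cell in cells:
        scanned += 1
        if not cell["tombstone"]:
            rows.append(cell["id"])
            if len(rows) == limit:
                break
    return rows, scanned


def compact(cells, now):
    """Compaction may only purge tombstones older than gc_grace_seconds."""
    return [c for c in cells
            if not (c["tombstone"] and now - c["deleted_at"] > GC_GRACE_SECONDS)]


cells = queue_partition(n_dequeued=200_000, n_live=10, deleted_at=0)
print(read_first_live(cells, limit=1))            # ([200000], 200001)
cells = compact(cells, now=3600)                  # 1 hour later: nothing purged
print(read_first_live(cells, limit=1)[1])         # 200001
cells = compact(cells, now=GC_GRACE_SECONDS + 1)  # past gc_grace: tombstones dropped
print(read_first_live(cells, limit=1)[1])         # 1
```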
| Property | CockroachDB |
|---|---|
| Topology | Ranges (key ranges) each replicated via Raft consensus. Leader per range (not per cluster). |
| Log format | Raft log entries (logical operations). |
| Sync mode | Synchronous by design (Raft requires majority ACK for commit). |
| Failover | Automatic Raft leader election per range. Recovery after a node failure typically takes seconds, since range leases on the failed node must expire first. |
| Read from follower? | Follower reads available (with timestamp check for staleness bound). |
| Gotcha | Cross-region writes incur multi-RTT latency (Raft requires majority across regions). Design tables with locality in mind. |
Twitter (now X) is a fascinating case study in replication strategy. The core challenge: when a user with 50 million followers tweets, that tweet must appear in 50 million timelines. How?
This is replication at the application level. The same concepts apply: fan-out-on-write is like synchronous replication (write is not "done" until all copies exist), fan-out-on-read is like lazy evaluation (compute the view on demand). The hybrid approach mirrors semi-synchronous replication: eagerly replicate to most targets, lazily handle the expensive ones.
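The hybrid strategy can be sketched in a few lines. This is an illustrative model, not Twitter's actual implementation: the data structures, function names, and the `CELEBRITY_THRESHOLD` cutoff are hypothetical, and the cutoff is set absurdly low so the demo exercises both paths.

```python
from collections import defaultdict

CELEBRITY_THRESHOLD = 2  # hypothetical cutoff; tiny so the demo shows both paths

followers = defaultdict(set)          # author -> follower ids
following = defaultdict(set)          # user -> authors they follow
timelines = defaultdict(list)         # user -> precomputed tweet ids (fan-out-on-write)
celebrity_tweets = defaultdict(list)  # author -> tweet ids (merged at read time)


def follow(user, author):
    followers[author].add(user)
    following[user].add(author)


def post(author, tweet_id):
    if len(followers[author]) >= CELEBRITY_THRESHOLD:
        # Fan-out-on-read: store once, merge into timelines lazily
        celebrity_tweets[author].append(tweet_id)
    else:
        # Fan-out-on-write: copy into every follower's timeline now
        for user in followers[author]:
            timelines[user].append(tweet_id)


def read_timeline(user):
    """Precomputed entries plus celebrity tweets pulled in at read time,
    newest first (tweet ids stand in for timestamps)."""
    pulled = [t for author in following[user] for t in celebrity_tweets[author]]
    return sorted(timelines[user] + pulled, reverse=True)


follow("ann", "bob")                            # bob: 1 follower, regular user
follow("ann", "star"); follow("cat", "star")    # star: 2 followers, "celebrity"
post("bob", 101)
post("star", 102)
print(timelines["ann"], read_timeline("ann"), read_timeline("cat"))
# [101] [102, 101] [102]
```

Bob's tweet is copied eagerly (like synchronous replication), while the celebrity's tweet is stored once and merged on demand (like lazy evaluation).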
"Replication lag is growing steadily." Check: (1) Is the follower's disk I/O saturated? (long-running queries on the follower, vacuum, or compaction), (2) Is the network between leader and follower degraded? (packet loss, bandwidth saturation), (3) Is the write workload spiking? (large batch imports), (4) Is the follower CPU-bound on replay? (too many indexes to update per write).
"Reads return stale data intermittently." Check: (1) Are reads going to followers? (verify load balancer routing), (2) What's the current replication lag? (pg_stat_replication in PostgreSQL; SHOW REPLICA STATUS in MySQL 8.0.22+, SHOW SLAVE STATUS in older versions), (3) Is it always the same follower? (that follower might be unhealthy), (4) Is the application using connection pooling that routes some reads to followers? (PgBouncer config).
You can't trust your replication strategy until you've broken it deliberately. Here's how production teams validate their replication setup:
```python
# Chaos engineering tests for replication.
# Run these in staging, NOT production (unless you're Netflix).
# `leader`, `followers`, `follower_1`, and the helpers get_new_leader(),
# read_from_cluster(), block_traffic(), restore_traffic(), and measure_lag()
# are hypothetical fixtures from your own test harness. Each test assumes a
# freshly provisioned cluster.
import time
import uuid


class ReplicationChaosTests:
    def test_leader_failure(self):
        """Kill the leader and verify failover."""
        # 1. Write a canary value to the leader
        canary = f"chaos-test-{uuid.uuid4()}"
        leader.write("canary", canary)
        # 2. Wait for replication (async replication gives no hard bound)
        time.sleep(2)
        # 3. Kill the leader
        leader.kill()
        # 4. Wait for failover
        time.sleep(60)  # max expected failover time
        # 5. Verify: can we still read the canary?
        result = get_new_leader().read("canary")
        assert result == canary, "DATA LOSS during failover!"
        # 6. Verify: can we write?
        get_new_leader().write("post-failover", "alive")
        print("✓ Leader failover: PASS")

    def test_follower_failure(self):
        """Kill a follower and verify reads continue."""
        follower_1.kill()
        time.sleep(5)
        # Reads should still work (served by the remaining followers)
        result = read_from_cluster("any-key")
        assert result is not None
        print("✓ Follower failure: reads continue")

    def test_network_partition(self):
        """Simulate a network partition between leader and followers."""
        # Use iptables or tc to block traffic
        block_traffic(leader, followers)
        # Leader should still accept writes
        leader.write("during-partition", "test")
        # Followers should serve (stale) reads,
        # but NOT accept writes (no split brain)
        # Heal the partition
        restore_traffic(leader, followers)
        time.sleep(10)
        # Followers should catch up
        result = follower_1.read("during-partition")
        assert result == "test"
        print("✓ Network partition recovery: PASS")

    def test_replication_lag_under_load(self):
        """Write heavily and measure the lag increase."""
        initial_lag = measure_lag()
        # Write 100K rows rapidly
        for i in range(100_000):
            leader.write(f"load-test-{i}", "x" * 1000)
        post_load_lag = measure_lag()
        print(f"Lag increase under load: {initial_lag}ms → {post_load_lag}ms")
        assert post_load_lag < 30_000, "Lag exceeded 30s threshold!"
```
An interviewer says: "You're building an e-commerce platform with users in North America, Europe, and Asia. Design the database replication strategy."
A strong answer demonstrates staff-level thinking: different strategies for different data, clear trade-off reasoning, concrete numbers, and operational awareness.
Common mistakes that cause production incidents:
| Anti-pattern | Why it seems reasonable | Why it fails | What to do instead |
|---|---|---|---|
| Running heavy analytics queries on followers | "Followers have the same data, might as well use them" | Long-running queries conflict with WAL replay (for example, replay of vacuum cleanup removing rows the query still needs). PostgreSQL either delays replay, so replication lag spikes, or cancels the query ("canceling statement due to conflict with recovery"). | Use a dedicated analytics replica with max_standby_streaming_delay = -1 (wait indefinitely), or use a separate OLAP system (ClickHouse, BigQuery) |
| Too many followers on one leader | "More replicas = more read capacity" | Each follower needs its own replication connection, and the leader spends CPU and network bandwidth sending WAL to every one. As followers multiply, that overhead eats into the leader's capacity to serve writes. | Use cascading replication: leader → 2-3 followers → each feeds 2-3 more. Or use a CDC pipeline to fan out. |
| Synchronous replication across regions | "We need zero data loss globally" | Cross-region RTT is 50-200ms. Every write now takes 50-200ms minimum. Write throughput collapses. | Synchronous within the same region (same AZ or nearby AZ). Asynchronous across regions. Accept region-level RPO. |
| No monitoring of replication lag | "Replication just works, right?" | Lag drifts unnoticed. Then a failover promotes a follower that's 30 minutes behind. 30 minutes of data lost. | Alert if lag exceeds 10 seconds. Page if lag exceeds 60 seconds. Dashboard with per-follower lag. |
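The alert and page thresholds in the last row can be turned into a small check. A sketch, assuming per-follower lag values in seconds come from your database's monitoring (for example, the replay_lag column of PostgreSQL's pg_stat_replication view); the function names and the choice to page when a follower reports no lag value at all are assumptions.

```python
WARN_AFTER_S = 10  # alert threshold
PAGE_AFTER_S = 60  # paging threshold


def lag_severity(lag_seconds):
    """Classify one follower's replication lag."""
    if lag_seconds is None:
        return "page"  # assumption: no reading at all means replication is broken
    if lag_seconds > PAGE_AFTER_S:
        return "page"
    if lag_seconds > WARN_AFTER_S:
        return "alert"
    return "ok"


def check_followers(lag_by_follower):
    """Per-follower severities plus the worst one, for a dashboard row."""
    order = {"ok": 0, "alert": 1, "page": 2}
    result = {name: lag_severity(lag) for name, lag in lag_by_follower.items()}
    worst = max(result.values(), key=order.__getitem__, default="ok")
    return result, worst


print(check_followers({"replica-1": 0.4, "replica-2": 14.0, "replica-3": None}))
# ({'replica-1': 'ok', 'replica-2': 'alert', 'replica-3': 'page'}, 'page')
```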
Everything you've learned, condensed into the frameworks and talking points that win system design interviews.
| Strategy | Writes go to | Conflict? | Consistency | Failover | Best for |
|---|---|---|---|---|---|
| Leader-Follower | 1 leader | No (serialized) | Eventual (async) or strong (sync) | Manual or automated election | Most OLTP workloads |
| Multi-Leader | N leaders | Yes — must resolve | Eventual | Each DC independent | Multi-DC, offline clients |
| Leaderless | W of N replicas | Yes — siblings/LWW | Tunable (quorum) | No leader to fail | High availability, immutable data |
| Consensus (Raft) | Elected leader per shard | No (serialized by Raft) | Strong (linearizable) | Automatic, sub-second | Strong consistency requirements |
"Design a multi-region database for a social network."
"Design a collaborative document editor."
```python
import random


class Replica:
    def __init__(self, id):
        self.id = id
        self.store = {}  # key -> (value, version)

    def write(self, key, value, version):
        self.store[key] = (value, version)
        return True  # ACK

    def read(self, key):
        return self.store.get(key, (None, 0))


class QuorumClient:
    """Minimal quorum read/write with read repair (single client, no failures)."""

    def __init__(self, replicas, w, r):
        self.replicas = replicas  # list of Replica
        self.N = len(replicas)
        self.W = w
        self.R = r
        # Simplification: one client-local counter. Real systems need versions
        # that are comparable across clients (timestamps or version vectors).
        self.version = 0

    def quorum_write(self, key, value):
        self.version += 1
        # Send to all replicas; the write succeeds if at least W ACK
        acks = sum(1 for replica in self.replicas
                   if replica.write(key, value, self.version))
        return acks >= self.W

    def quorum_read(self, key):
        # Read from R randomly chosen replicas
        chosen = random.sample(self.replicas, self.R)
        results = [(*replica.read(key), replica) for replica in chosen]
        # Pick the value with the highest version
        best_val, best_ver, _ = max(results, key=lambda x: x[1])
        # Read repair: update stale replicas in the read set
        for val, ver, replica in results:
            if ver < best_ver:
                replica.write(key, best_val, best_ver)
        return best_val


# Demo
replicas = [Replica(i) for i in range(3)]
client = QuorumClient(replicas, w=2, r=2)
client.quorum_write("user:42", "Alice")
print(client.quorum_read("user:42"))  # Alice
```
In every system design interview, when you reach the database/storage section, use this framework:
| Mistake | Why it's wrong | What to say instead |
|---|---|---|
| "We'll use master-master replication for high availability" | Multi-leader creates write conflicts that most interviewers expect you to address | "We'll use leader-follower with automated failover. Multi-leader only if we need multi-region writes, and then we'll discuss conflict resolution strategy." |
| "We'll replicate everything synchronously for safety" | Sync replication to 3+ followers blocks on the slowest one | "Semi-synchronous: 1 sync follower for durability, rest async for read scalability." |
| "We'll use Cassandra because it's distributed" | Cassandra uses LWW by default, which silently loses data on conflicts | "Cassandra is a good fit for append-only data like event logs. For mutable data needing strong consistency, we'd use PostgreSQL or CockroachDB." |
| Not mentioning replication lag | The interviewer knows this is the core challenge | "With async replication, we'll have 1-5 seconds of lag. For user-facing reads, we'll use sticky sessions and read-your-own-writes routing." |
| Symptom | Likely cause | Investigation |
|---|---|---|
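| Replication lag growing linearly | Write volume exceeds follower replay speed | Check follower CPU, disk IOPS. Consider parallel apply (multi-threaded replicas in MySQL 5.7+; PostgreSQL physical replay is single-threaded, though PG 16 added parallel apply for logical replication). |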
| Intermittent stale reads | Read-your-own-writes violation | Verify read routing. Add sticky sessions or read-from-leader after writes. |
| Data appears then disappears | Monotonic reads violation (different followers) | Add session affinity. Check load balancer health checks. |
| Duplicate key errors after failover | Autoincrement ID collision | Fence old leader. Advance sequence. Switch to UUIDs. |
| Split-brain: both nodes accepting writes | Fencing failure during failover | Immediate: shut down one node. Long-term: STONITH + Raft. |
```python
import random
import time


class ReplicationAwareRouter:
    """Routes reads to the leader or a follower based on a
    read-your-own-writes consistency requirement."""

    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers
        # Track the last write LSN per user
        self.user_write_lsn = {}  # user_id -> (lsn, timestamp)
        self.consistency_window = 5.0  # seconds

    def record_write(self, user_id, lsn):
        """Called after every write. Records the LSN for routing."""
        self.user_write_lsn[user_id] = (lsn, time.time())

    def route_read(self, user_id, key) -> "Connection":
        """Route a read to the best replica for this user.
        Leader/follower objects are assumed to expose current_lsn()."""
        entry = self.user_write_lsn.get(user_id)
        if entry:
            required_lsn, write_time = entry
            # If the write was recent, make sure we read fresh data
            if time.time() - write_time < self.consistency_window:
                # Try to find a follower that has caught up
                for f in self.followers:
                    if f.current_lsn() >= required_lsn:
                        return f  # This follower has the user's write
                # No follower has caught up: fall back to the leader
                return self.leader
        # No recent write: any follower is fine (leader if there are none)
        return random.choice(self.followers) if self.followers else self.leader
```
```python
import threading
import time


class FailoverMonitor:
    """Monitors leader health and triggers failover.
    Node objects are assumed to expose ping(), fence(), replication_lag_bytes(),
    promote_to_leader(), set_replication_source(), and an id attribute."""

    def __init__(self, leader, followers, heartbeat_interval=1.0,
                 timeout=10.0, max_lag_for_promotion=1000):
        self.leader = leader
        self.followers = followers
        self.heartbeat_interval = heartbeat_interval
        self.timeout = timeout
        self.max_lag = max_lag_for_promotion  # max bytes behind
        self.last_heartbeat = time.time()
        self.is_leader_alive = True

    def start(self):
        """Run heartbeat checks on a background thread."""
        def loop():
            while True:
                self.check_heartbeat()
                time.sleep(self.heartbeat_interval)
        threading.Thread(target=loop, daemon=True).start()

    def check_heartbeat(self):
        try:
            self.leader.ping()
            self.last_heartbeat = time.time()
            self.is_leader_alive = True
        except ConnectionError:
            elapsed = time.time() - self.last_heartbeat
            if elapsed > self.timeout:
                self.is_leader_alive = False
                self.trigger_failover()

    def trigger_failover(self):
        # 1. Fence the old leader (STONITH) to prevent split brain
        self.leader.fence()
        # 2. Find the best candidate (most up to date, within the lag limit)
        lags = [(f, f.replication_lag_bytes()) for f in self.followers]
        candidates = [(f, lag) for (f, lag) in lags if lag <= self.max_lag]
        if not candidates:
            raise RuntimeError("No eligible follower for promotion!")
        # Pick the one with the least lag
        new_leader, lag = min(candidates, key=lambda x: x[1])
        # 3. Promote
        new_leader.promote_to_leader()
        # 4. Reconfigure the remaining followers
        self.followers = [f for f in self.followers if f is not new_leader]
        for f in self.followers:
            f.set_replication_source(new_leader)
        # 5. Monitor the new leader from now on, so we don't fail over again
        self.leader = new_leader
        self.last_heartbeat = time.time()
        print(f"Failover complete. New leader: {new_leader.id}, lag was {lag} bytes")
```
| Paper/Resource | Key contribution |
|---|---|
| Dynamo: Amazon's Key-Value Store (2007) | Leaderless replication, quorums, vector clocks, consistent hashing |
| Raft: In Search of an Understandable Consensus Algorithm (2014) | Leader election + log replication made understandable |
| CAP Theorem (Brewer, 2000) | During a network partition, a system must give up either Consistency or Availability (often oversimplified as "pick 2 of 3": Consistency, Availability, Partition tolerance) |
| PACELC (Abadi, 2012) | Extension of CAP: even without partitions, you trade latency vs consistency |
| CRDTs: Comprehensive Study (Shapiro et al., 2011) | Data structures that converge without coordination |
```python
import queue
import threading


class ReplicationLog:
    """Write-ahead log used for replication."""

    def __init__(self):
        self.entries = []  # [(lsn, operation)]
        self.lsn_counter = 0
        self.lock = threading.Lock()

    def append(self, operation):
        with self.lock:
            self.lsn_counter += 1
            self.entries.append((self.lsn_counter, operation))
            return self.lsn_counter

    def get_entries_since(self, from_lsn):
        """Return all entries with LSN > from_lsn."""
        return [(lsn, op) for (lsn, op) in self.entries if lsn > from_lsn]


class Leader:
    """Leader node that accepts writes and streams them to followers."""

    def __init__(self):
        self.data = {}
        self.wal = ReplicationLog()
        self.followers = []

    def write(self, key, value):
        op = {"op": "SET", "key": key, "value": value}
        # 1. Write to the WAL
        lsn = self.wal.append(op)
        # 2. Apply to local state
        self.data[key] = value
        # 3. Stream to followers (async: they only enqueue it)
        for f in self.followers:
            f.receive(lsn, op)
        return lsn

    def read(self, key):
        return self.data.get(key)


class Follower:
    """Follower node that replays changes from the leader."""

    def __init__(self, follower_id):
        self.id = follower_id
        self.data = {}
        self.applied_lsn = 0
        self.pending = queue.Queue()

    def receive(self, lsn, operation):
        """Receive a change from the leader (async)."""
        self.pending.put((lsn, operation))

    def apply_pending(self):
        """Apply pending changes (called by the replay loop)."""
        while not self.pending.empty():
            lsn, op = self.pending.get()
            if lsn <= self.applied_lsn:
                continue  # Already applied (idempotent)
            if op["op"] == "SET":
                self.data[op["key"]] = op["value"]
            self.applied_lsn = lsn

    def read(self, key):
        self.apply_pending()  # Catch up before reading
        return self.data.get(key)

    def lag_behind(self, leader):
        """How many LSNs behind the leader?"""
        return leader.wal.lsn_counter - self.applied_lsn


# Demo: trace writes through the replication stream
leader = Leader()
f1 = Follower("follower-1")
f2 = Follower("follower-2")
leader.followers = [f1, f2]

leader.write("user:1", "Alice")  # LSN=1
leader.write("user:2", "Bob")    # LSN=2

# Before followers apply:
print(f1.lag_behind(leader))  # 2 (two writes pending)
print(f1.read("user:1"))      # Alice (apply_pending runs first)
print(f1.lag_behind(leader))  # 0 (caught up after read)
```
```python
import bisect
import hashlib


class ConsistentHashRing:
    """Assigns keys to N replicas using consistent hashing. The same idea
    underlies Cassandra and Riak (which use different hash functions and details)."""

    def __init__(self, nodes, virtual_nodes=150, replication_factor=3):
        self.ring = {}         # hash_position -> node_id
        self.sorted_keys = []  # sorted hash positions
        self.rf = replication_factor
        self.vnodes = virtual_nodes
        self.nodes = set()
        # Each physical node gets multiple positions on the ring
        # (virtual nodes) for a more even distribution
        for node in nodes:
            self.add_node(node)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_replicas(self, key):
        """Return the N replica nodes responsible for this key."""
        if not self.sorted_keys:
            return []
        replicas = []
        # Walk clockwise from the key's hash position, skipping repeat nodes
        idx = self._find_start(self._hash(key))
        while len(replicas) < min(self.rf, len(self.nodes)):
            node = self.ring[self.sorted_keys[idx % len(self.sorted_keys)]]
            if node not in replicas:
                replicas.append(node)
            idx += 1
        return replicas

    def _find_start(self, h):
        """Index of the first position >= h (past the end wraps to 0 via modulo)."""
        return bisect.bisect_left(self.sorted_keys, h)

    def add_node(self, node):
        """Add a node: only keys in the ring ranges it takes over need to move."""
        self.nodes.add(node)
        for i in range(self.vnodes):
            h = self._hash(f"{node}:{i}")
            self.ring[h] = node
            bisect.insort(self.sorted_keys, h)


# Demo
ring = ConsistentHashRing(["nodeA", "nodeB", "nodeC", "nodeD", "nodeE"])
print(ring.get_replicas("user:42"))  # e.g., ['nodeC', 'nodeA', 'nodeE']
print(ring.get_replicas("user:99"))  # e.g., ['nodeB', 'nodeD', 'nodeA']

# Add a new node: only a small fraction of keys get a new replica set
ring.add_node("nodeF")
print(ring.get_replicas("user:42"))  # May or may not change
```
| # | Question | One-line answer |
|---|---|---|
| 1 | Sync vs async replication? | Sync: no data loss, slower writes. Async: fast writes, possible data loss on leader failure. |
| 2 | What is replication lag? | The delay between a write on the leader and that write being applied on a follower. |
| 3 | What is split brain? | Two nodes both believe they are the leader. Both accept writes. Data diverges. |
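| 4 | What is a quorum? | The number of replicas that must acknowledge a write (W) or answer a read (R). Choosing W + R > N makes every read set overlap every write set, so reads normally see the latest acknowledged write. |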
| 5 | What is read repair? | When a quorum read finds a stale replica, it writes the fresh value back to fix it. |
| 6 | LWW stands for? | Last-Write-Wins — resolve conflicts by keeping the write with the highest timestamp. Simple but lossy. |
| 7 | What is a version vector? | A set of per-replica counters that tracks causality: it detects whether two writes are concurrent or causally ordered. |
| 8 | What is STONITH? | Shoot The Other Node In The Head — forcibly shut down the old leader to prevent split brain. |
| 9 | What is CDC? | Change Data Capture — streaming the replication log as events to downstream systems (search, cache, analytics). |
| 10 | CAP theorem? | During a network partition, choose Consistency (reject requests) or Availability (serve possibly stale data). |
Every interview question about replication tests one of these dimensions. For each, here's a question you should be able to answer without hesitation:
| Dimension | Question | Key points in your answer |
|---|---|---|
| CONCEPT | "Explain the trade-off between sync and async replication." | Sync: no data loss but blocks on slowest replica. Async: fast writes but data loss on leader failure. Semi-sync: best of both — one guaranteed replica, rest async. |
| DESIGN | "Design a multi-region user profile service." | Leader-follower per region. Writes to home region. Cross-region reads from local follower. Read-your-own-writes via sticky sessions or LSN tracking. |
| CODE | "Implement quorum read/write with read repair." | Write to W replicas. Read from R replicas. Compare versions. Write fresh value back to stale replicas. |
| DEBUG | "Users see stale data after updating their profile." | Read-your-own-writes violation. Check: which replica served the read? What's the replication lag? Fix: route that user's reads to the leader for a window longer than the typical replication lag. |
| FRONTIER | "What's the state of the art in conflict resolution?" | CRDTs (Automerge, Yjs for text). OT still used by Google Docs. Raft consensus for strong consistency (CockroachDB, TiDB). Deterministic simulation testing (FoundationDB). |
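The CODE row fits in a few dozen lines. This is a minimal in-memory sketch under stated assumptions. `Replica` and `QuorumStore` are hypothetical names. Replicas are local objects rather than network peers. The client supplies an increasing integer version, and the highest version wins on read. A real Dynamo-style store would send requests in parallel over the network and would usually track versions with version vectors rather than a single integer.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Replica:
    """In-memory stand-in for one storage node: key -> (version, value)."""
    name: str
    data: Dict[str, Tuple[int, str]]
    up: bool = True


class QuorumStore:
    """Leaderless store with N replicas, write quorum W, read quorum R."""

    def __init__(self, replicas: List[Replica], w: int, r: int):
        if w + r <= len(replicas):
            raise ValueError("need W + R > N for read/write overlap")
        self.replicas, self.w, self.r = replicas, w, r

    def write(self, key: str, version: int, value: str) -> bool:
        # Send to every reachable replica; succeed once W have acknowledged
        acks = 0
        for rep in self.replicas:
            if rep.up:
                rep.data[key] = (version, value)
                acks += 1
        return acks >= self.w

    def read(self, key: str) -> Optional[str]:
        live = [rep for rep in self.replicas if rep.up][: self.r]
        if len(live) < self.r:
            raise RuntimeError("read quorum not available")
        responses = [(rep, rep.data.get(key)) for rep in live]
        found = [resp for _, resp in responses if resp is not None]
        if not found:
            return None
        latest = max(found)  # highest version wins
        # Read repair: push the freshest value to any stale replica we read from
        for rep, resp in responses:
            if resp != latest:
                rep.data[key] = latest
        return latest[1]


a, b, c = Replica("A", {}), Replica("B", {}), Replica("C", {})
store = QuorumStore([a, b, c], w=2, r=2)

c.up = False                                # C misses the write
print(store.write("user:42", 1, "Tokyo"))   # True: A and B acknowledged
c.up, a.up = True, False                    # C returns, A goes down
print(store.read("user:42"))                # Tokyo: B overlaps the write set
print(c.data["user:42"])                    # (1, 'Tokyo'): read repair fixed C
```

Note that a write returning `False` is not rolled back: the replicas that did accept it keep the value. This is one reason quorums alone do not give linearizability.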
Replication is one of three pillars of distributed data systems. Here's how it connects to the other DDIA chapters and the broader landscape.
| Topic | Relationship to Replication | DDIA Chapter |
|---|---|---|
| Partitioning (Sharding) | Replication copies data across nodes. Partitioning splits data across nodes. Most systems use BOTH: each partition is replicated to N nodes. | Chapter 6 (Partitioning) |
| Transactions | Replication gives you copies. Transactions give you ACID guarantees within those copies. Distributed transactions (2PC) coordinate atomic writes across multiple nodes or partitions. | Chapter 7 (Transactions) |
| Consistency & Consensus | Replication creates the problem (multiple copies can diverge). Consensus algorithms (Raft, Paxos) are one solution: they ensure all replicas agree on the order of operations. | Chapter 9 (Consistency and Consensus) |
| Batch & Stream Processing | Change data capture (CDC) uses the replication log as a stream of events. Tools such as Debezium read the binlog/WAL and publish the changes to Kafka, where consumers build derived datasets. | Chapters 10-11 (Batch & Stream) |
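A minimal sketch of the CDC idea in the last row, assuming a simplified event shape. `ChangeEvent` carries a log sequence number, an operation, a key, and the full row image. This is a hypothetical format, not Debezium's actual message schema. The derived index records the highest LSN it has applied, so events redelivered after a consumer crash are skipped.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ChangeEvent:
    """One entry from a hypothetical change stream derived from the WAL/binlog."""
    lsn: int                      # log sequence number: position in the source log
    op: str                       # "insert", "update", or "delete"
    key: str
    row: Optional[dict] = None    # full row image (absent for deletes)


class DerivedIndex:
    """Toy search index kept in sync by replaying change events in log order."""

    def __init__(self):
        self.docs: Dict[str, dict] = {}
        self.applied_lsn = 0      # highest LSN applied so far (the consumer offset)

    def apply(self, events: List[ChangeEvent]) -> None:
        for ev in sorted(events, key=lambda e: e.lsn):
            if ev.lsn <= self.applied_lsn:
                continue          # already applied: redelivery is harmless
            if ev.op == "delete":
                self.docs.pop(ev.key, None)
            else:                 # insert and update both upsert the row image
                self.docs[ev.key] = ev.row
            self.applied_lsn = ev.lsn


index = DerivedIndex()
batch = [
    ChangeEvent(1, "insert", "user:42", {"name": "Ana", "city": "Lisbon"}),
    ChangeEvent(2, "update", "user:42", {"name": "Ana", "city": "Tokyo"}),
    ChangeEvent(3, "insert", "user:99", {"name": "Bo", "city": "Oslo"}),
    ChangeEvent(4, "delete", "user:99"),
]
index.apply(batch)
index.apply(batch[1:])     # simulated redelivery: skipped by the LSN check
print(index.docs)          # {'user:42': {'name': 'Ana', 'city': 'Tokyo'}}
print(index.applied_lsn)   # 4
```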
Replication sits at the heart of the CAP/PACELC trade-off space, and several neighboring topics build directly on it:
| Topic | Why it matters | Where to learn it |
|---|---|---|
| Consensus algorithms (Raft, Paxos) | How to make replicas agree on a single value. The foundation of strong consistency. | DDIA Ch. 9, the Raft paper |
| Distributed transactions (2PC, Saga) | How to coordinate writes across multiple nodes or partitions, either atomically (2PC) or with compensating steps (Saga). Much harder than single-node transactions. | DDIA Ch. 7, Ch. 9 |
| CRDTs in depth | Mathematical structures for conflict-free merging. G-Counter, PN-Counter, LWW-Register, OR-Set. | Shapiro et al. 2011 survey paper |
| Change Data Capture | Using the replication log as an event stream for building derived systems. | DDIA Ch. 11 |
If you're using a cloud provider, you don't configure replication from scratch — but you still need to understand the trade-offs behind the options:
| Service | Replication Model | What you configure | Hidden trade-off |
|---|---|---|---|
| AWS RDS (PostgreSQL/MySQL) | Leader-follower. Multi-AZ = sync standby. Read replicas = async followers. | Number of read replicas (up to 15). Multi-AZ on/off. | Multi-AZ failover takes 60-120 seconds and can cause brief connection drops even with DNS updates. |
| AWS Aurora | Shared storage. 6-way replication across 3 AZs at the storage layer. Up to 15 read replicas. | Number of readers. Reader endpoint for load balancing. | Reader instances share storage but have independent buffer caches. After failover, the new writer starts with a cold cache. |
| Google Cloud Spanner | Paxos consensus per split. Synchronous across zones/regions. | Instance size. Regional vs multi-regional. | Multi-regional writes must reach a Paxos majority spread across regions, so they are far slower than regional writes (~5ms), and multi-regional instances cost considerably more. |
| Azure Cosmos DB | Configurable: strong, bounded staleness, session, consistent prefix, eventual. | Consistency level. Regions. Conflict resolution policy. | Strong consistency requires single-region writes. Multi-region writes only with weaker consistency levels. |
| MongoDB Atlas | Replica sets. Configurable read/write concern. | Cluster tier. Read preference. Write concern. | w:majority adds ~2-5ms per write. Many developers use w:1 for speed and are surprised by data loss on failover. |
Replication is not always the answer. Consider these alternatives:
| If your problem is... | Replication helps? | Better solution |
|---|---|---|
| Write throughput exceeds single node | No, every replica still applies every write, so extra replicas add no write capacity | Partitioning (sharding) |
| Data doesn't fit on one node (disk space) | No — every replica stores a full copy | Partitioning (sharding) |
| Protecting against accidental deletion | No — deletes replicate too | Point-in-time backups, soft deletes |
| Complex cross-table queries are slow | No — followers have the same slow queries | Read-optimized derived views (materialized views, CQRS) |
| Need to serve data close to users globally | Yes — this is exactly what replication is for | Multi-region read replicas |
| Need to survive hardware failure | Yes — this is exactly what replication is for | Multi-AZ or multi-region replication |
The CAP theorem is often misunderstood. Here's the nuanced version:
| Scenario | What you choose | What you sacrifice | Real system |
|---|---|---|---|
| No partition (normal operation) | You can have BOTH consistency and availability | Latency (PACELC trade-off) | All systems in normal mode |
| Partition + choose C | Refuse requests on the side of the partition that cannot reach a majority | Availability (some users get errors) | CockroachDB, Spanner, etcd |
| Partition + choose A | Continue serving from all nodes | Consistency (stale reads, conflicts) | Cassandra, DynamoDB, CouchDB |
In practice, network partitions are the exception rather than the rule, so the PACELC extension is more useful day-to-day: even without partitions, you're constantly trading Latency vs. Consistency. Every time you choose async replication for speed, you're making a PACELC trade-off.
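The two "Partition +" rows can be modeled with a toy replica that knows how many cluster members it can reach. The class name and the `mode` flag are hypothetical. A real CP system such as etcd makes this decision through its consensus protocol, where a write commits only once a majority acknowledges it. Even with a majority, a linearizable read needs more than returning the local copy.

```python
from typing import Optional


class PartitionedReplica:
    """One replica in an N-node cluster that may be cut off by a partition."""

    def __init__(self, cluster_size: int, mode: str):
        if mode not in ("CP", "AP"):
            raise ValueError("mode must be 'CP' or 'AP'")
        self.cluster_size = cluster_size
        self.mode = mode
        self.value: Optional[str] = None
        self.reachable = cluster_size  # nodes it can talk to, itself included

    def has_majority(self) -> bool:
        return self.reachable > self.cluster_size // 2

    def _check(self) -> None:
        if self.mode == "CP" and not self.has_majority():
            # Choose consistency: refuse rather than risk stale reads or conflicts
            raise RuntimeError("unavailable: cannot reach a majority")

    def write(self, value: str) -> None:
        self._check()
        self.value = value  # AP: accepted locally, reconciled after the partition heals

    def read(self) -> Optional[str]:
        self._check()
        return self.value   # AP: may be stale relative to the other side


for mode in ("CP", "AP"):
    node = PartitionedReplica(cluster_size=5, mode=mode)
    node.write("v1")
    node.reachable = 2      # partition: this node is on the minority side
    try:
        node.write("v2")
        print(mode, "accepted v2; read ->", node.read())
    except RuntimeError as err:
        print(mode, "rejected:", err)
```

In this sketch, the CP node returns errors while it is on the minority side, and the AP node keeps answering. If the majority side also accepted a write to the same key, the AP node's `v2` becomes a conflict that must be resolved later.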
There is no "best" replication strategy. There are only trade-offs. Every system you build or evaluate sits somewhere in this trade-off space.
The engineer who understands WHERE a system sits in this space, and WHY it was designed that way, is the one who gets the staff-level offer.
A fascinating frontier in replication: what if there is no server at all? Local-first software runs entirely on the user's device, stores data locally, and uses CRDTs to sync between devices peer-to-peer.
| Property | Cloud-first (traditional) | Local-first |
|---|---|---|
| Primary copy | Server (cloud database) | User's device |
| Offline support | Limited or none | Full — local copy is authoritative |
| Sync mechanism | Server mediates all writes | CRDT merge between devices |
| Conflict resolution | Server decides (or rejects) | Automatic CRDT merge — no conflicts by construction |
| Latency | Network RTT per operation | Zero (local read/write), eventual sync |
| Privacy | Data stored on third-party servers | Data stays on user devices |
| Examples | Google Docs, Notion, Figma | Automerge, Yjs, Ink & Switch research |
In this model, every user's device is effectively a "leader" in a multi-leader replication system. CRDTs ensure that all devices converge to the same state regardless of the order in which changes are applied. The challenge: CRDTs work well for text, counters, and sets, but poorly for structured data like relational tables or complex business objects. There, invariants such as uniqueness or a non-negative balance require coordination that merging alone cannot provide.
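To see why CRDTs converge regardless of order, consider one of the simplest: a grow-only counter (G-Counter). This is a minimal sketch, not how Automerge or Yjs implement their data types, and the class and device names are hypothetical. Each replica increments only its own slot, and merge takes the element-wise max. Max is commutative, associative, and idempotent, so sync order and duplicate deliveries do not change the result.

```python
from typing import Dict


class GCounter:
    """Grow-only counter CRDT: one slot per replica, merge = element-wise max."""

    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.counts: Dict[str, int] = {}

    def increment(self, n: int = 1) -> None:
        # Each replica only ever bumps its own slot, so slots never conflict
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + n

    def merge(self, other: "GCounter") -> None:
        # Element-wise max: order and repeated merges don't matter
        for rid, c in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), c)

    def value(self) -> int:
        return sum(self.counts.values())


phone, laptop = GCounter("phone"), GCounter("laptop")
phone.increment(3)     # edits made offline on the phone
laptop.increment(2)    # concurrent offline edits on the laptop

phone.merge(laptop)    # sync in one direction...
laptop.merge(phone)    # ...and the other
laptop.merge(phone)    # duplicate delivery changes nothing
print(phone.value(), laptop.value())  # 5 5
```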
| Era | System | Innovation |
|---|---|---|
| 1970s | IBM System R | First replicated databases. Manual failover. Statement-based replication. |
| 1990s | Oracle RAC, MySQL Replication | Leader-follower becomes standard. Async replication over TCP. |
| 2007 | Amazon Dynamo | Leaderless replication, consistent hashing, vector clocks, quorums. Spawned Cassandra, Riak, Voldemort. |
| 2012 | Google Spanner | Globally consistent replication using TrueTime (atomic clocks + GPS). First system to offer linearizability across continents. |
| 2014 | Raft consensus (etcd, CockroachDB) | Designed as a more understandable alternative to Paxos. Leader election + log replication in one algorithm. CockroachDB showed Raft can power SQL databases. |
| 2018+ | Automerge, Yjs (CRDTs) | Conflict-free replication for collaborative apps. Local-first software movement. No server needed for consistency. |
| 2020+ | FoundationDB, TiDB, Neon | Deterministic simulation testing. Disaggregated storage (compute replicas share storage). Serverless replication. |
If this lesson gave you the conceptual foundation, here's the path to deeper mastery:
| Next step | Resource | What you'll learn |
|---|---|---|
| Implement Raft from scratch | MIT 6.824 Lab 2 (free, online) | Leader election, log replication, and the safety rules that keep them correct, learned by writing the code yourself. |
| Read the Dynamo paper | DeCandia et al., SOSP 2007 | The original leaderless design. Every section is quotable in interviews. |
| Study FoundationDB | "Testing Distributed Systems w/ Deterministic Simulation" (Will Wilson, Strange Loop 2014) | How deterministic simulation testing catches bugs that real-world testing misses. |
| Build a CDC pipeline | Debezium tutorial + Kafka Connect | Stream PostgreSQL changes to Elasticsearch. See logical replication in action. |
| Try CRDTs | Automerge library (JavaScript/Rust) | Build a collaborative text editor with conflict-free merge. Local-first architecture. |
"Design the replication strategy for a global payments system."
This is arguably the hardest replication design challenge because financial data has the strictest consistency requirements.
This answer demonstrates the key insight: even in a "zero data loss" system, you sometimes accept async replication for cross-region operations. The trick is designing the business logic to handle the async window gracefully (saga pattern, pending states, idempotent operations).
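The idempotent-operation part of that answer can be sketched as follows, assuming the client generates one idempotency key per logical payment. The `PaymentService` class, its in-memory dicts, and the `"pending"` status are hypothetical illustrations. A retry after a timeout or failover reuses the same key and gets back the stored result instead of moving money twice.

```python
import uuid
from typing import Dict


class PaymentService:
    """Hypothetical payment handler that is safe to retry."""

    def __init__(self):
        self.results: Dict[str, dict] = {}   # idempotency_key -> stored outcome
        self.balances: Dict[str, int] = {"alice": 10_000, "bob": 0}  # in cents

    def transfer(self, idempotency_key: str, src: str, dst: str, cents: int) -> dict:
        if idempotency_key in self.results:
            return self.results[idempotency_key]  # replay: return the original outcome
        if self.balances[src] < cents:
            result = {"status": "rejected", "reason": "insufficient funds"}
        else:
            self.balances[src] -= cents
            self.balances[dst] += cents
            # "pending" until cross-region replication confirms (the async window)
            result = {"status": "pending", "id": str(uuid.uuid4())}
        self.results[idempotency_key] = result
        return result


svc = PaymentService()
key = "order-1234"                                  # hypothetical client-generated key
first = svc.transfer(key, "alice", "bob", 2_500)
retry = svc.transfer(key, "alice", "bob", 2_500)    # client retried after a timeout
print(first == retry, svc.balances)  # True {'alice': 7500, 'bob': 2500}
```

In a real system, the balance change and the stored result must commit in the same transaction. Otherwise, a crash between the two steps reintroduces the double charge.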