Designing Data-Intensive Applications — Chapter 6

Database Replication

Leader-follower, multi-leader, leaderless — keeping copies consistent across machines.

Prerequisites: Basic networking + Key-value stores. That's it.

Chapter 0: The Problem

Your startup's database runs on a single server in Virginia. A PostgreSQL instance on a beefy 64-core machine with 256 GB of RAM and NVMe storage. It handles 100,000 reads per second at peak and serves users across three continents. Everything is fine. Until it isn't.

At 2:47 AM on a Tuesday, the server's power supply fails. The database goes offline. For 34 minutes, every user sees "Service Unavailable." Your SLA promised 99.99% uptime — that's 52 minutes of allowed downtime per year. You just burned 65% of your annual budget in one incident. The business impact: $50,000 in lost revenue, three enterprise customers threatening to leave, and your VP of Engineering scheduling a post-mortem that will consume your entire week.

Even when the server is up, there's a subtler problem. A user in Tokyo sends a request. The packet travels 11,000 kilometers across the Pacific, through undersea fiber, through multiple routing hops. Round-trip latency: 180-220 milliseconds. Your competitor, who runs servers in Tokyo, responds in 8 milliseconds. Your user notices.

The solution seems obvious: make copies. Run the same database on multiple machines in multiple locations. A copy in Virginia, a copy in Frankfurt, a copy in Tokyo. Reads can be served from the nearest copy. If one machine dies, the others keep serving. Problem solved.

Except now you have a much harder problem. When a user in Virginia writes a new record, how do the copies in Frankfurt and Tokyo learn about it? What happens if two users update the same record from different locations at the same time? What does a reader in Tokyo see during the milliseconds (or seconds, or minutes) before the write from Virginia arrives?

This is replication: keeping copies of the same data on multiple machines connected via a network. And the tension between availability (more copies = more resilience) and consistency (more copies = harder to keep them all in sync) is the core drama of distributed systems.

Let's be precise about the challenge. The difficulty of replication lies almost entirely in handling changes to data. If the data never changed after it was written, replication would be trivial — copy the files once, done. But real data changes constantly: users update profiles, place orders, delete messages, transfer money. Every change must propagate to every copy, in the right order, despite network delays, node failures, and concurrent modifications. The harder you try to keep copies perfectly in sync, the more you sacrifice speed and availability. The more you relax consistency, the more anomalies your users experience.

Why not just use a bigger server? Vertical scaling (a bigger machine) has a hard ceiling. The largest single machine you can rent from a cloud provider has roughly 24 TB of RAM and 448 vCPUs. That sounds like a lot, but it's still a single point of failure. And it can't solve the speed-of-light problem. Even in a vacuum, a signal from Virginia to Tokyo needs at least 37 ms one way (74 ms round trip). Light in fiber is about a third slower than in a vacuum, and real routes are longer than the great-circle path, which is how you end up at the 180-220 ms round trip above. Replication is the only way to serve data close to users AND survive hardware failures.

The Fundamental Trade-off

Every replication strategy must answer three questions:

Question | Easy answer | Why it's hard
Where do writes go? | Pick one node (the "leader") | That node becomes a bottleneck and single point of failure
How fast do copies update? | Immediately (synchronous) | A single slow replica blocks all writes
What if copies disagree? | Keep the latest write | "Latest" requires perfect clocks, which don't exist in distributed systems

The rest of this lesson is about the different ways real systems navigate these three questions. There is no perfect answer — only trade-offs. Every database you will ever use has made a specific choice in this trade-off space, and understanding that choice is what separates a developer who uses databases from an engineer who can design systems.

The Three Reasons to Replicate

Every motivation for replication falls into one of three categories:

Reason | What it gives you | Example
High availability | System keeps working when one node fails | PostgreSQL primary fails, standby takes over in 30 seconds
Reduced latency | Data is geographically close to users | Replicas in Virginia, Frankfurt, and Tokyo serve local reads
Read scalability | More nodes handle more read queries | 1 leader + 10 read replicas handle 10x the read throughput

Notice that NONE of these reasons is "increase write throughput." Replication does not help with writes. Every write must still be applied on every replica, and with a single leader every write also has to pass through that one node, so adding copies adds no write capacity. If you need more write throughput, you need partitioning (sharding). That is a different technique, covered in the next DDIA chapter.

The Numbers: Availability Math

// Single node with 99.9% uptime (8.76 hours of downtime/year):
P(available) = 0.999

// Two replicas, each 99.9% uptime, independent failures:
P(both down) = 0.001 × 0.001 = 0.000001
P(at least one up) = 1 - 0.000001 = 0.999999 = 99.9999%
Downtime: 31.5 seconds/year

// Three replicas:
P(all three down) = 0.001^3 = 10^-9
P(at least one up) = 0.999999999 = nine 9's
Downtime: 0.03 seconds/year (theoretical — correlated failures break this)

// CAVEAT: This assumes INDEPENDENT failures.
// In reality, failures correlate: power outage hits all nodes in a rack,
// software bug crashes all replicas running the same code,
// network partition isolates an entire datacenter.
// This is why you replicate ACROSS failure domains (racks, AZs, regions).
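
The availability arithmetic above fits in a few lines. This is a minimal sketch with illustrative function names, not from any library. It bakes in the same independence assumption the caveat warns about, so treat its outputs as an upper bound.

```python
SECONDS_PER_YEAR = 365 * 24 * 3600  # 31,536,000 seconds


def availability(node_uptime: float, replicas: int) -> float:
    """P(at least one replica is up), ASSUMING replicas fail independently."""
    return 1.0 - (1.0 - node_uptime) ** replicas


def downtime_seconds_per_year(node_uptime: float, replicas: int) -> float:
    """Expected seconds per year with every replica down (same assumption).

    Computed directly from P(all down) to avoid the cancellation error
    of evaluating 1 - 0.999999999 in floating point.
    """
    return (1.0 - node_uptime) ** replicas * SECONDS_PER_YEAR


for n in (1, 2, 3):
    print(f"{n} replica(s): {availability(0.999, n):.9f} available, "
          f"{downtime_seconds_per_year(0.999, n):.3f} s/year down")
```

Once failures are correlated (a shared rack, power feed, or software bug), the probabilities can no longer be multiplied like this.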

Replication vs. Backup: They're Not the Same

A common misconception: "We have nightly backups, so we don't need replication." Backups and replication solve different problems:

Property | Backup | Replication
Purpose | Disaster recovery (restore from snapshot) | High availability (automatic failover)
Data loss (RPO) | Hours (since last backup) | Zero (sync) to seconds or minutes (async lag)
Recovery time (RTO) | Minutes to hours (restore snapshot) | Seconds (automated failover)
Protects against | Accidental deletion, corruption, ransomware | Hardware failure, network issues
Does NOT protect against | Hardware failure (unless offsite) | Accidental deletion (replicated instantly to all!)
Replication replicates mistakes too. If a developer accidentally runs DELETE FROM users WHERE id > 0 on the leader, that deletion is faithfully replicated to all followers within milliseconds. You now have N copies of an empty table. Backups are your last line of defense against human error. Always have both replication AND backups.
Interview warm-up: A single database server handles 50K reads/sec with 5ms p99 latency. You add two read replicas. Under ideal conditions (no replication lag, perfect load balancing), what is the theoretical maximum read throughput of the system?

Chapter 1: Leaders and Followers

The simplest and most common replication strategy: designate one replica as the leader (also called primary or master). All writes go through the leader. The other replicas are followers (also called read replicas, secondaries, or slaves). They receive a stream of changes from the leader and apply them in the same order.

This is not a theoretical construct. It's how PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, and most relational databases work by default. It's also how some message brokers (Kafka, RabbitMQ) and some non-relational databases (RethinkDB, Espresso) work.

Think of it like a newsroom. The editor-in-chief (leader) decides what goes into the newspaper. Correspondents (followers) receive the final copy and distribute it in their regions. Nobody else writes the headlines — that power is centralized. This makes coordination simple but creates a bottleneck at the top.

Terminology note. The industry is moving away from "master/slave" terminology. This lesson uses leader/follower throughout. You'll see "primary/replica" in PostgreSQL docs, "primary/secondary" in MongoDB, and "source/replica" in MySQL 8.0+. The concept is identical regardless of naming.

Synchronous vs. Asynchronous

When a client sends a write to the leader, the leader has a choice: does it wait for followers to confirm they received the change before telling the client "write succeeded"?

Synchronous replication: The leader waits until at least one follower confirms it has written the data to disk. Only then does the leader report success to the client. Advantage: the follower is guaranteed to have an up-to-date copy. If the leader dies, you know the follower has everything. Disadvantage: if the follower is slow or down, the write is blocked. One laggy follower can stall your entire write path.

Asynchronous replication: The leader writes locally and immediately tells the client "success." The change is sent to followers in the background. Advantage: writes are fast — the leader never blocks. Disadvantage: if the leader dies before the change reaches any follower, that write is permanently lost. The client got a "success" acknowledgment for data that no longer exists.

Semi-synchronous: the practical compromise. In practice, making ALL followers synchronous is impractical — any single follower failure blocks all writes. Most production deployments use semi-synchronous replication: one follower is synchronous (guaranteed up-to-date), and the rest are asynchronous. If the synchronous follower becomes unavailable, one of the async followers is promoted to synchronous. This guarantees that at least two nodes (leader + one follower) always have the latest data. PostgreSQL calls this "synchronous standby."
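
The three acknowledgment policies can be made concrete with a toy model of when the client hears "OK". The function and parameter names are hypothetical, and all times are in milliseconds. The model ignores the timeouts and fallback behavior real systems add. Its semi-synchronous branch loosely mirrors the idea behind PostgreSQL's FIRST 1 (...) priority list.

```python
from typing import Dict, List, Optional


def commit_latency_ms(mode: str, local_write_ms: float,
                      follower_ack_ms: Dict[str, Optional[float]],
                      priority: List[str]) -> Optional[float]:
    """Time until the client is told "OK", or None if the write blocks forever.

    follower_ack_ms maps follower name -> time its ACK reaches the leader
    (None means that follower is down). Times are measured from the request.
    """
    if mode == "async":
        # Leader acknowledges after its own durable write; followers catch up later.
        return local_write_ms
    if mode == "sync":
        acks = list(follower_ack_ms.values())
        if any(a is None for a in acks):
            return None  # one dead follower blocks every write
        return max([local_write_ms, *acks])
    if mode == "semi":
        # Wait only for the highest-priority follower that is still up.
        for name in priority:
            ack = follower_ack_ms.get(name)
            if ack is not None:
                return max(local_write_ms, ack)
        return None  # no follower left that could confirm the write
    raise ValueError(f"unknown mode: {mode}")


acks = {"follower1": 5.0, "follower2": 12.0}
order = ["follower1", "follower2"]
for mode in ("async", "semi", "sync"):
    print(mode, commit_latency_ms(mode, 1.0, acks, order))
```

If follower1 goes down, "sync" returns None (writes stall), while "semi" falls back to waiting for follower2.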

Setting Up a New Follower

You can't just copy the leader's data files while it's running — the data is constantly in flux, and you'd get a corrupted half-written snapshot. Here's the correct procedure:

1. Consistent Snapshot
Take a point-in-time snapshot of the leader's data without locking the entire database. Most databases can do this (PostgreSQL pg_basebackup, MySQL mysqldump --single-transaction). Record the snapshot's position in the replication log.
2. Copy to Follower
Transfer the snapshot to the new follower node and restore it. The follower now has the leader's data as of the snapshot timestamp.
3. Catch Up
The follower connects to the leader and requests all changes that occurred since the snapshot's replication log position. It applies these changes in order until it is caught up.
4. Streaming
The follower now processes new changes as they arrive from the leader in real-time. It has joined the replication cluster.
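
The four steps above can be sketched as a toy in-memory model. The Leader and Follower classes are hypothetical, and a real snapshot relies on MVCC rather than copying a dict. The key idea is that the snapshot is tagged with its log position, so catch-up replays exactly the changes made after it.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

LogEntry = Tuple[int, str, str]  # (log position, key, value)


@dataclass
class Leader:
    data: Dict[str, str] = field(default_factory=dict)
    log: List[LogEntry] = field(default_factory=list)

    def write(self, key: str, value: str) -> None:
        self.log.append((len(self.log) + 1, key, value))
        self.data[key] = value

    def snapshot(self) -> Tuple[Dict[str, str], int]:
        # Step 1: a consistent copy plus the log position it corresponds to.
        return dict(self.data), len(self.log)

    def changes_since(self, pos: int) -> List[LogEntry]:
        return self.log[pos:]  # entries with position > pos


@dataclass
class Follower:
    data: Dict[str, str] = field(default_factory=dict)
    applied_pos: int = 0

    def restore(self, snap: Dict[str, str], pos: int) -> None:
        # Step 2: load the snapshot and remember where it ends in the log.
        self.data, self.applied_pos = dict(snap), pos

    def catch_up(self, leader: Leader) -> None:
        # Steps 3-4: apply every later change, in log order.
        for pos, key, value in leader.changes_since(self.applied_pos):
            self.data[key] = value
            self.applied_pos = pos


leader = Leader()
leader.write("a", "1")
leader.write("b", "2")
snap, pos = leader.snapshot()
leader.write("a", "3")  # arrives while the snapshot is being copied
follower = Follower()
follower.restore(snap, pos)
follower.catch_up(leader)
print(follower.data, follower.applied_pos)
```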

Worked Example: Tracing a Write

Let's trace a single write through a leader-follower cluster with 1 leader and 2 followers, semi-synchronous replication.

// t=0ms: Client sends INSERT INTO users (name) VALUES ('Alice')
Client → Leader

// t=1ms: Leader writes to its WAL (Write-Ahead Log)
Leader WAL: LSN=1042, INSERT users ('Alice')

// t=2ms: Leader sends change to Follower-1 (synchronous) and Follower-2 (async)
Leader → Follower-1: LSN=1042 data
Leader → Follower-2: LSN=1042 data

// t=5ms: Follower-1 applies change, sends ACK back to leader
Follower-1 → Leader: ACK LSN=1042

// t=6ms: Leader has 1 sync ACK — responds to client
Leader → Client: OK (write committed)

// t=12ms: Follower-2 applies change (no one was waiting for this)
Follower-2: applied LSN=1042

// Result: Leader and Follower-1 hold the data when the client hears OK (t=6ms). Follower-2 applies it 6ms later (t=12ms).

The Replication Lag Equation

How much data could we lose in a leader failure? With asynchronous replication, the answer depends on replication lag — the delay between a write hitting the leader and that write being applied on the follower.

// Data at risk = write throughput × replication lag

Write throughput: 1,000 writes/sec × 500 bytes/write = 500 KB/sec
Replication lag: 2 seconds (normal operation)

Data at risk = 500 KB/sec × 2 sec = 1 MB of data

// In a failure scenario with 10-second lag spike:
Data at risk = 500 KB/sec × 10 sec = 5 MB = ~10,000 writes lost

// This is your RPO (Recovery Point Objective):
// The maximum amount of data loss you can tolerate.
// Sync replication: RPO = 0 (no data loss, but higher latency).
// Async replication: RPO = replication lag.
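
Here is the equation as a small helper. It uses decimal units (1 MB = 1,000,000 bytes) like the figures above. It assumes steady write throughput, which real lag spikes rarely respect. The inverse helper, also illustrative, answers the design-time question: how much lag can you tolerate for a given RPO?

```python
def data_at_risk(writes_per_sec: int, bytes_per_write: int, lag_sec: float):
    """Return (bytes, writes) acknowledged by the leader but not yet on a follower."""
    writes = writes_per_sec * lag_sec
    return writes * bytes_per_write, writes


def max_lag_for_rpo(rpo_writes: int, writes_per_sec: int) -> float:
    """Largest replication lag (seconds) that keeps potential loss within rpo_writes."""
    return rpo_writes / writes_per_sec


print(data_at_risk(1_000, 500, 2))    # normal operation
print(data_at_risk(1_000, 500, 10))   # during a 10-second lag spike
```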

Hands-on: Setting Up Replication in PostgreSQL

Here's what the actual configuration looks like. These are the key parameters in postgresql.conf on the leader and recovery configuration on the follower:

```bash
# === On the LEADER (primary) ===
# postgresql.conf
wal_level = replica              # Enable replication-grade WAL
max_wal_senders = 5              # Max concurrent replication connections
synchronous_standby_names = 'FIRST 1 (follower1, follower2)'
                                 # Semi-sync: wait for the highest-priority
                                 # connected standby in this list

# pg_hba.conf (authentication for replication)
host replication repl_user 10.0.0.0/8 md5

# === On the FOLLOWER ===
# Create base backup from leader:
pg_basebackup -h leader-host -U repl_user -D /var/lib/pgsql/data -P

# standby.signal (PostgreSQL 12+; tells PG this is a follower)
# Just create an empty file:
touch /var/lib/pgsql/data/standby.signal

# postgresql.conf on follower
# application_name must match a name in the leader's synchronous_standby_names
primary_conninfo = 'host=leader-host user=repl_user password=secret application_name=follower1'
hot_standby = on                 # Allow read queries on follower
```

```sql
-- Monitor replication lag (run on leader):
SELECT client_addr,
       state,
       sent_lsn,
       write_lsn,
       flush_lsn,
       replay_lsn,
       (sent_lsn - replay_lsn) AS lag_bytes
FROM pg_stat_replication;

-- Example output (write_lsn and flush_lsn omitted):
--  client_addr  | state     | sent_lsn    | replay_lsn  | lag_bytes
--  10.0.1.2     | streaming | 0/16B3A40   | 0/16B3A40   | 0        (caught up)
--  10.0.1.3     | streaming | 0/16B3A40   | 0/16B2F20   | 2848     (2.8 KB behind)
```
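
If you export these LSNs to an external monitor, you can compute the same byte lag client-side. The textual LSN form X/Y holds the high and low 32 bits of a 64-bit WAL byte position, in hex. On the server, pg_wal_lsn_diff() performs the same subtraction. A minimal sketch, with illustrative function names:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '0/16B3A40' to a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(sent_lsn: str, replay_lsn: str) -> int:
    """Bytes of WAL sent to a follower but not yet replayed there."""
    return lsn_to_int(sent_lsn) - lsn_to_int(replay_lsn)


print(lag_bytes("0/16B3A40", "0/16B2F20"))  # the lagging follower above
```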

Read Scaling: The Math Behind Adding Followers

Adding read replicas has diminishing returns. Here's why:

// With 1 leader and F followers:
// Each follower replays ALL writes from the leader.
// So each follower spends some CPU on write replay.

Total read capacity = (Leader read capacity after writes)
+ F × (Follower read capacity after replay)

// If leader uses 50% CPU for writes:
// Leader reads: 50% capacity → 25K reads/sec
// If each follower uses 30% CPU for write replay:
// Follower reads: 70% capacity → 35K reads/sec

F=0: Total = 25K reads/sec
F=1: Total = 25K + 35K = 60K (2.4x improvement)
F=3: Total = 25K + 105K = 130K (5.2x)
F=10: Total = 25K + 350K = 375K (15x)

// But: each follower adds load on the leader (replication connections).
// Past ~15-20 followers, leader CPU for replication streams
// starts eating into write and read capacity.
// Use cascading replication beyond this point.
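
The capacity formula can be checked with a short helper. It assumes, as the numbers above imply, that one node delivers 50K reads/sec at full CPU. It uses integer percentages to avoid floating-point noise. It does not model the extra load each replication stream puts on the leader, so it overstates capacity at large F.

```python
def total_read_capacity(followers: int, node_capacity: int = 50_000,
                        leader_write_pct: int = 50,
                        follower_replay_pct: int = 30) -> int:
    """Reads/sec across 1 leader + `followers` replicas (simplified model)."""
    leader_reads = node_capacity * (100 - leader_write_pct) // 100
    follower_reads = node_capacity * (100 - follower_replay_pct) // 100
    return leader_reads + followers * follower_reads


for f in (0, 1, 3, 10):
    print(f, total_read_capacity(f))
```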
System design question: You run a leader-follower PostgreSQL cluster with fully synchronous replication to 3 followers. One follower's network card fails. What happens to your write throughput?

Chapter 2: Handling Node Outages

Nodes go down. Hardware fails, kernels panic, power grids hiccup, network cables get unplugged by overzealous cleaning crews. The entire point of replication is to keep the system running despite individual node failures. But the details of how you handle failure determine whether you get a smooth failover or a data-loss catastrophe.

Follower Failure: Catch-Up Recovery

A follower crashes and restarts. This is the easy case. The follower knows the last transaction it processed (recorded in its local log). It connects to the leader and requests all changes since that position. It applies them sequentially until it catches up. Then it resumes processing the live stream. No data loss. No downtime for the rest of the cluster. This "just works" in every major database.

Leader Failure: Failover

The leader crashes. This is the hard case. The system needs to:

1. Detect Failure
Other nodes notice the leader isn't responding. Usually done with heartbeat timeouts: if the leader hasn't sent a heartbeat in 30 seconds, it's presumed dead. Too short = false alarms (leader was just busy under load). Too long = unnecessary downtime.
2. Choose New Leader
Among the remaining followers, pick the one with the most up-to-date data (highest replication log position). This can be done by consensus (election), or by a controller node deciding. The new leader must be accepted by all remaining nodes.
3. Reconfigure Clients
All clients must now send writes to the new leader. All remaining followers must start replicating from the new leader. The old leader, if it comes back, must become a follower (or be fenced off).

What Can Go Wrong During Failover

Failover is one of the most dangerous operations in a database cluster. Here's a catalog of disasters:

Lost writes. If the old leader had accepted writes that weren't yet replicated to any follower (because replication was asynchronous), those writes are permanently lost when the new leader takes over. When the old leader comes back, it has writes that conflict with the new leader's history. Common resolution: discard the old leader's unreplicated writes. This violates the client's durability expectation — they got "OK" for writes that vanished.

Split brain. Both the old leader and the new leader think they're the leader. Both accept writes. Now you have two diverging histories of the same data. Merging them is often impossible without data loss. Prevention: use a fencing mechanism — when a new leader is elected, the old leader is forcibly shut down (via STONITH: "Shoot The Other Node In The Head") before the new leader starts accepting writes.

Autoincrement conflicts. The old leader assigned autoincrement IDs 1001-1005 to writes that weren't replicated. The new leader starts assigning IDs from 1001. Now you have two different rows with ID 1001. If a downstream system (like a Redis cache or a search index) processes both, it will overwrite one with the other.

Stale reads during failover. During the detection window (while the system is deciding whether the leader is actually dead), clients may be reading from followers that are serving increasingly stale data. If the detection timeout is 30 seconds, reads could be 30+ seconds stale before the failover even begins.

Cascading failures. The leader goes down because of high load (not hardware failure). The remaining followers suddenly get more read traffic (clients that were reading from the leader are redistributed). The increased load on followers causes them to slow down their WAL replay, increasing replication lag. If you then promote a lagging follower, you lose even more data. Meanwhile, the new leader is also under heavy load and might itself become unresponsive.
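
For the split-brain case, STONITH fences the old node physically. A complementary software-level guard is epoch-based fencing, which is a different mechanism from STONITH rather than a replacement for it. Each newly elected leader receives a higher epoch (term) number from whatever runs the election. Every write carries that number, and replicas refuse writes from an older epoch. A minimal sketch with hypothetical class names:

```python
from typing import Dict


class StaleLeaderError(Exception):
    """Raised when a write arrives from a leader that has been superseded."""


class FencedReplica:
    """Accepts a write only if its epoch is at least the newest epoch seen."""

    def __init__(self) -> None:
        self.highest_epoch = 0
        self.data: Dict[str, str] = {}

    def apply(self, epoch: int, key: str, value: str) -> None:
        if epoch < self.highest_epoch:
            raise StaleLeaderError(
                f"write from epoch {epoch} rejected; current epoch is {self.highest_epoch}")
        self.highest_epoch = epoch
        self.data[key] = value


replica = FencedReplica()
replica.apply(1, "x", "from old leader")
replica.apply(2, "x", "from new leader")   # after failover, new leader has epoch 2
try:
    replica.apply(1, "y", "zombie write")  # old leader wakes up and keeps writing
except StaleLeaderError as err:
    print(err)
```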

Designing a Safe Failover System

Here's what a production-grade failover system needs:

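```python
# Failover decision pseudocode, loosely modeled on Patroni's approach.
# Helpers such as can_reach_distributed_consensus_store(), alert(), fence_node(),
# update_write_endpoint(), log(), and the follower/leader methods are hypothetical
# placeholders for your orchestration layer. The constants are example values.
import time

DETECTION_TIMEOUT = 30               # seconds of leader silence before acting
MAX_LAG_FOR_FAILOVER = 1024 * 1024   # bytes; never promote a follower further behind

def should_failover(leader_last_seen, followers):
    # leader_last_seen must come from time.monotonic(), which (unlike
    # time.time()) cannot jump when the wall clock is adjusted.
    # 1. Don't failover too quickly (avoid false positives)
    if time.monotonic() - leader_last_seen < DETECTION_TIMEOUT:
        return False, None

    # 2. Don't failover if we can't reach the DCS (split brain risk)
    if not can_reach_distributed_consensus_store():
        return False, None

    # 3. Find eligible candidates
    candidates = []
    for f in followers:
        lag = f.replication_lag_bytes()
        if lag <= MAX_LAG_FOR_FAILOVER:
            candidates.append((f, lag))

    if not candidates:
        alert("CRITICAL: No follower eligible for promotion!")
        return False, None

    # 4. Pick the most up-to-date candidate
    best = min(candidates, key=lambda x: x[1])
    return True, best[0]

def execute_failover(old_leader, new_leader, all_followers):
    # 1. FENCE the old leader (STONITH)
    # This is CRITICAL: it must happen BEFORE promotion
    fence_node(old_leader)  # Power off, revoke network, etc.

    # 2. Wait for new leader to finish replaying its pending WAL
    new_leader.wait_for_replay_complete()

    # 3. Promote
    new_leader.promote()

    # 4. Reconfigure remaining followers
    for f in all_followers:
        if f != new_leader:
            f.follow(new_leader)

    # 5. Update DNS / service discovery / proxy config
    update_write_endpoint(new_leader.address)

    # 6. Verify (an explicit check, since `assert` is skipped under python -O)
    if not new_leader.is_accepting_writes():
        raise RuntimeError("Failover failed: new leader is not accepting writes")
    log("Failover complete")
```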

Worked Example: The Failover Timeline

Let's trace a real-world failover step by step. The cluster has Leader L, Follower F1 (1 second behind), and Follower F2 (3 seconds behind). Leader L crashes at t=0.

// t=0s: Leader L crashes. L had processed writes up to LSN=5000.
// F1 has replicated up to LSN=4990 (10 writes behind).
// F2 has replicated up to LSN=4970 (30 writes behind).

// t=0 to t=30s: Detection phase.
// F1 and F2 notice heartbeats from L have stopped.
// They wait for the configured timeout (30 seconds).
// During this time: ALL WRITES ARE REJECTED. Reads still work from F1/F2.

// t=30s: Timeout expires. F1 and F2 agree L is dead.
// Election: F1 has LSN=4990, F2 has LSN=4970.
// F1 wins (most up-to-date). F1 becomes new leader.

// t=31s: Reconfiguration.
// F2 starts replicating from F1 instead of L.
// Client connection strings are updated (via DNS, proxy, or service discovery).

// t=32s: F1 starts accepting writes at LSN=4991.
// PROBLEM: LSNs 4991-5000 from old leader L are LOST.
// 10 writes that got "OK" from the client are permanently gone.

// Total downtime for writes: ~32 seconds.
// Total data loss: 10 writes (the async replication gap).

Those 10 lost writes are the price of asynchronous replication. If you need zero data loss, you must use synchronous replication — but then your write latency includes the round-trip to at least one follower, and a follower failure can block writes entirely.

GitHub's 2012 MySQL failover incident. An automated failover promoted a follower that was several seconds behind the leader. Because the new leader's autoincrement counter lagged behind the old leader's, it reused primary keys that the old leader had already assigned. Those keys were also used in a Redis store, so MySQL and Redis became inconsistent, and some private repository data was exposed to the wrong users. GitHub was partially down for hours. The root cause: their failover system did not check how far behind the promoted follower was. Lesson: automated failover without a lag threshold is a loaded gun.

Failover in Production: How Long Does It Take?

System | Detection time | Election time | Total failover | Data loss risk
PostgreSQL + Patroni | 10-30s (configurable TTL) | 1-5s (DCS-based) | 15-35s | Async: up to lag duration. Sync: 0.
MySQL + Orchestrator | 10-30s | 1-3s | 15-35s | Semi-sync reduces to ~1 transaction
MongoDB Replica Set | 10s (electionTimeoutMillis) | 2-12s | 12-22s | w:majority: 0. w:1: up to oplog lag.
CockroachDB (per range) | 9s (default liveness) | Sub-second (Raft) | ~10s per range | 0 (Raft guarantees no data loss)
Amazon Aurora | Instant (shared storage) | ~30s (DNS propagation) | ~30s | 0 (shared storage, write acknowledged = durable)
The detection-election trade-off. A short detection timeout (e.g., 5 seconds) catches failures fast but causes false positives: a leader that's just busy under heavy load gets dethroned. A long timeout (e.g., 60 seconds) avoids false positives but means longer downtime during real failures. Most production systems use 10-30 seconds. Some use adaptive timeouts that derive the threshold from recently observed heartbeat timing. When heartbeats arrive later or more erratically (for example, under load), the threshold stretches instead of firing. This is the idea behind the phi-accrual failure detector used in Cassandra and Akka.
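
A heartbeat timeout that adapts to observed timing can be sketched in a few lines. This is a deliberately simplified stand-in for accrual-style detectors, not the phi-accrual algorithm itself. The timeout is the mean plus k standard deviations of recent heartbeat gaps, clamped to a floor and ceiling. The class name and all constants are illustrative assumptions.

```python
import statistics
from collections import deque
from typing import Deque, Optional


class AdaptiveTimeout:
    """Timeout = mean + k * stddev of recent heartbeat gaps, clamped to [floor, ceiling]."""

    def __init__(self, k: float = 4.0, floor_s: float = 5.0,
                 ceiling_s: float = 60.0, window: int = 100) -> None:
        self.k, self.floor_s, self.ceiling_s = k, floor_s, ceiling_s
        self.gaps: Deque[float] = deque(maxlen=window)
        self.last_beat: Optional[float] = None

    def heartbeat(self, now: float) -> None:
        if self.last_beat is not None:
            self.gaps.append(now - self.last_beat)
        self.last_beat = now

    def timeout(self) -> float:
        if len(self.gaps) < 2:
            return self.ceiling_s  # too little history: be conservative
        estimate = statistics.mean(self.gaps) + self.k * statistics.pstdev(self.gaps)
        return min(self.ceiling_s, max(self.floor_s, estimate))

    def suspect(self, now: float) -> bool:
        """True if the leader has been silent longer than the current timeout."""
        return self.last_beat is not None and now - self.last_beat > self.timeout()


steady = AdaptiveTimeout()
for t in range(11):            # a heartbeat every second
    steady.heartbeat(float(t))
jittery = AdaptiveTimeout()
for t in (0, 1, 10, 11, 20, 21, 30):  # gaps alternate between 1s and 9s
    jittery.heartbeat(float(t))
print(steady.timeout(), jittery.timeout())
```

A leader with steady heartbeats gets the 5-second floor. One whose heartbeats are erratic under load gets a much longer timeout, which trades slower detection for fewer false failovers.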
Debug scenario: After an automated failover, your application starts throwing "duplicate key" errors on INSERT statements. The old leader was down for 90 seconds. What happened, and what's your immediate remediation?

Chapter 3: Replication Logs

The leader needs to tell followers what changed. But how does it describe the change? This seemingly simple question has four distinct answers, each with different trade-offs. The choice of replication log format determines what you can and can't do with your replicas.

Method 1: Statement-Based Replication

The leader logs the actual SQL statement and sends it to followers. The follower re-executes the same statement.

// Leader sends:
INSERT INTO users (name, created_at) VALUES ('Alice', NOW())

// Follower executes the same statement

Simple and intuitive. But broken in subtle ways:

Problem | Example | Why it breaks
Non-deterministic functions | NOW(), RAND(), UUID() | Returns different values on leader vs follower
Auto-incrementing columns | INSERT ... (auto_id) | Must execute in exactly the same order on all replicas
Side effects | Triggers, stored procedures, UDFs | May depend on local state that differs between nodes
Execution order | Concurrent UPDATEs | Different execution order → different final state

MySQL used statement-based replication exclusively before version 5.1. For NOW(), it recorded the leader's timestamp alongside each statement in the binlog, so the follower evaluated NOW() to the same value. But the edge cases were endless, and MySQL eventually made row-based replication the default (in 5.7.7).
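
One common fix for non-deterministic functions is for the leader to substitute a literal value before logging the statement. Here is a toy version that only handles NOW(); the function name is hypothetical. It is a simplification: a naive regex would also rewrite "NOW()" inside a string literal, and RAND(), UUID(), and UDFs would need their own handling.

```python
import re
from datetime import datetime

NOW_CALL = re.compile(r"\bNOW\(\)", re.IGNORECASE)


def make_deterministic(sql: str, leader_now: datetime) -> str:
    """Replace NOW() with the leader's timestamp as a SQL string literal."""
    literal = "'" + leader_now.strftime("%Y-%m-%d %H:%M:%S") + "'"
    # A function replacement avoids re.sub's backslash/group-reference handling.
    return NOW_CALL.sub(lambda _match: literal, sql)


stmt = "INSERT INTO users (name, created_at) VALUES ('Alice', NOW())"
print(make_deterministic(stmt, datetime(2024, 1, 15, 9, 30, 1)))
```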

Method 2: Write-Ahead Log (WAL) Shipping

Every write to a database first goes to an append-only log on disk before it modifies the actual data structures (B-trees, pages). This is the Write-Ahead Log — it ensures durability even if the process crashes mid-write. WAL shipping sends this raw log to followers.

// PostgreSQL WAL record (simplified):
LSN: 0/16B3A40 | Transaction: 1042 | Table: users
Block: 231 | Offset: 148 | Op: INSERT
Data: [raw binary tuple bytes...]

The follower replays the same low-level disk operations. This is exactly what PostgreSQL streaming replication does.

Advantage: Byte-for-byte identical replicas. No ambiguity.

Disadvantage: The WAL is tightly coupled to the storage engine's internal format. If the leader runs PostgreSQL 15 with its specific on-disk page layout, the follower must run PostgreSQL 15 as well. Minor releases can differ, but major versions cannot. You cannot do a rolling major-version upgrade (run the new version on followers first, then failover) because the WAL format can change between major versions. This makes zero-downtime major upgrades painful.

Method 3: Logical (Row-Based) Log Replication

Instead of the raw bytes, send a logical description of what changed at the row level:

// For an INSERT:
Table: users | Op: INSERT
new_row: {id: 42, name: 'Alice', created_at: '2024-01-15 09:30:00'}

// For an UPDATE:
Table: users | Op: UPDATE | Key: {id: 42}
changed_columns: {name: 'Alice Smith'}

// For a DELETE:
Table: users | Op: DELETE | Key: {id: 42}

This is what MySQL's binlog uses in row-based mode. It's also what PostgreSQL's logical replication (pgoutput) uses.

Advantage: Decoupled from storage engine internals. The leader and follower can run different versions, different storage engines, or even different database systems entirely. This is how you build change data capture (CDC) — streaming database changes to a data warehouse, search index, or cache.

Disadvantage: Slightly more data to transmit (row values vs. raw page diffs). But network bandwidth is rarely the bottleneck.
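
Logical records are decoupled from storage internals, so a consumer can apply them to almost anything. Here is a minimal sketch that applies records shaped like the INSERT/UPDATE/DELETE examples above to an in-memory table. It assumes every table's primary key is a single "id" column, and the function name is hypothetical.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_change(table: Dict[Any, Row], change: Dict[str, Any]) -> None:
    """Apply one logical change record to a table keyed by primary key 'id'."""
    op = change["op"]
    if op == "INSERT":
        row = change["new_row"]
        table[row["id"]] = dict(row)
    elif op == "UPDATE":
        # Only the changed columns are sent; the others keep their values.
        table[change["key"]["id"]].update(change["changed_columns"])
    elif op == "DELETE":
        table.pop(change["key"]["id"], None)
    else:
        raise ValueError(f"unknown op {op!r}")


users: Dict[Any, Row] = {}
apply_change(users, {"op": "INSERT", "new_row": {
    "id": 42, "name": "Alice", "created_at": "2024-01-15 09:30:00"}})
apply_change(users, {"op": "UPDATE", "key": {"id": 42},
                     "changed_columns": {"name": "Alice Smith"}})
print(users)
```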

Method 4: Trigger-Based Replication

Database triggers fire custom application code when data changes. You can use triggers to write change records to a separate table, which an external process reads and applies to other databases.

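Tools like Bucardo (for PostgreSQL) and LinkedIn's Databus (for Oracle) use this approach. Oracle GoldenGate, by contrast, reads the database log rather than relying on triggers. Trigger-based replication is the most flexible option: you can filter, transform, or route changes with arbitrary logic. But it's also the slowest (trigger overhead on every write) and most error-prone (your replication logic is application code that can have bugs).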

When to use trigger-based replication. Almost never, unless you need to replicate a subset of data, replicate between different database systems, or add conflict resolution logic that the database doesn't support natively. For anything else, use the database's built-in replication.
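
The trigger-plus-change-table pattern can be reproduced end to end with Python's built-in sqlite3 module. Here SQLite stands in for both the leader and the follower, and the schema, trigger names, and replicate() function are hypothetical. Tools like Bucardo are far more elaborate, but the shape is the same: triggers append to a change table, and a separate process reads it in order and applies it elsewhere.

```python
import sqlite3

# Leader: triggers append every change to audit_log.
leader = sqlite3.connect(":memory:")
leader.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE audit_log (seq INTEGER PRIMARY KEY AUTOINCREMENT,
                        op TEXT, id INTEGER, name TEXT);
CREATE TRIGGER users_ins AFTER INSERT ON users BEGIN
    INSERT INTO audit_log (op, id, name) VALUES ('INSERT', NEW.id, NEW.name);
END;
CREATE TRIGGER users_upd AFTER UPDATE ON users BEGIN
    INSERT INTO audit_log (op, id, name) VALUES ('UPDATE', NEW.id, NEW.name);
END;
CREATE TRIGGER users_del AFTER DELETE ON users BEGIN
    INSERT INTO audit_log (op, id, name) VALUES ('DELETE', OLD.id, NULL);
END;
""")

# Follower: same table, no triggers; only the replicator writes to it.
follower = sqlite3.connect(":memory:")
follower.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")


def replicate(last_seq: int) -> int:
    """The external process: copy audit_log entries after last_seq to the follower."""
    rows = leader.execute(
        "SELECT seq, op, id, name FROM audit_log WHERE seq > ? ORDER BY seq",
        (last_seq,),
    ).fetchall()
    for seq, op, row_id, name in rows:
        if op == "DELETE":
            follower.execute("DELETE FROM users WHERE id = ?", (row_id,))
        else:  # INSERT or UPDATE: write the row's new state
            follower.execute(
                "INSERT OR REPLACE INTO users (id, name) VALUES (?, ?)", (row_id, name))
        last_seq = seq
    follower.commit()
    return last_seq  # a real replicator would persist this position


leader.execute("INSERT INTO users (id, name) VALUES (42, 'Alice')")
leader.execute("UPDATE users SET name = 'Alice Smith' WHERE id = 42")
leader.commit()
position = replicate(0)
print(follower.execute("SELECT id, name FROM users").fetchall())
```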

Worked Example: Same Operation, Four Representations

Consider this simple operation on a table with schema users(id INT PK, name TEXT, email TEXT, updated_at TIMESTAMP):

// The SQL statement:
UPDATE users SET name='Alice Smith', updated_at=NOW() WHERE id=42

// 1. STATEMENT-BASED: send the SQL verbatim
→ "UPDATE users SET name='Alice Smith', updated_at=NOW() WHERE id=42"
Problem: NOW() returns 2024-01-15 09:30:01 on leader
but 2024-01-15 09:30:03 on follower (2s later).
Fix: Replace NOW() with literal: updated_at='2024-01-15 09:30:01'

// 2. WAL (PHYSICAL): raw page-level change
→ LSN=0/16B3A40 | Relation: 16384/1663/users | Block: 231
Offset: 148 | Old tuple: [42, 'Alice', 'alice@...', ...]
New tuple: [42, 'Alice Smith', 'alice@...', '2024-01-15 09:30:01']
+ index updates for name column B-tree
Problem: includes internal page numbers, offsets, tuple format.
Follower MUST have identical disk layout.

// 3. LOGICAL (ROW-BASED): structured change description
→ {table: "public.users", op: "UPDATE",
key: {id: 42},
before: {name: "Alice", updated_at: "2024-01-15 08:00:00"},
after: {name: "Alice Smith", updated_at: "2024-01-15 09:30:01"}}
Clean, portable, version-agnostic.

// 4. TRIGGER-BASED: custom audit trail
→ The UPDATE fires a trigger that runs:
INSERT INTO audit_log (table_name, pk, old_data, new_data, ts)
VALUES ('users', 42, '{"name":"Alice"}'::jsonb,
'{"name":"Alice Smith"}'::jsonb, now());
An external process reads audit_log and applies changes to followers.

Change Data Capture: Why Logical Logs Won

The logical (row-based) log has become the industry standard for modern data architectures because it enables Change Data Capture (CDC) — streaming every database change as an event to downstream systems.

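```python
# Reading a PostgreSQL logical replication stream with psycopg2.
# Debezium's PostgreSQL connector consumes the same kind of stream (it is
# written in Java, not psycopg2). Assumed prerequisites: wal_level = logical,
# a role with the REPLICATION attribute, and an existing publication, e.g.
#   CREATE PUBLICATION my_publication FOR TABLE users;
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

conn = psycopg2.connect(
    "host=leader dbname=mydb",
    connection_factory=LogicalReplicationConnection
)
cur = conn.cursor()

# Create a replication slot (persistent position tracker).
# This fails if the slot already exists, so create it only once.
cur.create_replication_slot('my_cdc_slot', output_plugin='pgoutput')

# Start consuming changes. pgoutput requires proto_version and publication_names.
cur.start_replication(
    slot_name='my_cdc_slot',
    options={'proto_version': '1', 'publication_names': 'my_publication'}
)

def consume(msg):
    # msg.payload is pgoutput's binary-encoded change message (bytes).
    # Decode it and send to Elasticsearch, Kafka, a data warehouse, etc.
    print(f"Change at LSN {msg.data_start}: {msg.payload!r}")
    # Confirm the position so the server can discard WAL up to this point.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)
```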
The CDC ecosystem. Debezium (open source, backed by Red Hat) is the most popular CDC tool. It reads the logical replication log from PostgreSQL, MySQL, MongoDB, SQL Server, and others, and publishes changes to Kafka topics. From there, Kafka consumers can: update a search index (Elasticsearch), populate a cache (Redis), sync to a data warehouse (Snowflake), feed a real-time analytics dashboard, or trigger application-level events. The replication log becomes the single source of truth, and all derived systems are eventually consistent views of it.

Design decision: You need to replicate data from PostgreSQL to Elasticsearch for full-text search. Which replication log method would you use, and why?

Chapter 4: Replication Lag Problems

Leader-follower replication works great for read-heavy workloads: send reads to followers, writes to the leader, and scale out your read capacity by adding followers. But followers are always at least slightly behind the leader. This delay is replication lag.

If you stop writing, the followers eventually catch up. All replicas converge to the same state. This guarantee is called eventual consistency — a deliberately vague promise. "Eventually" might mean 1 millisecond or 10 seconds, depending on network conditions, follower load, and the volume of writes.

The word "eventually" hides three specific anomalies that will bite you in production whenever reads go to lagging followers.

Anomaly 1: Read-Your-Own-Writes

A user submits a comment on a blog post. Your application writes it to the leader. The user's browser immediately refreshes to show the page. This read request is load-balanced to a follower that hasn't yet received the new comment. The user sees the page without their comment. They click "submit" again, creating a duplicate. Or worse, they think the system is broken and leave.

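Solutions:

Read anything the user may have modified from the leader, and everything else from followers. For example, always read a user's own profile from the leader and other users' profiles from followers.

Remember when each user last wrote, and for a short window afterward (say, one minute) send all of that user's reads to the leader.

Have the client remember the timestamp (or log position) of its most recent write, and serve its reads only from a follower that has caught up at least that far; otherwise try another replica or wait.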
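A common way to provide read-your-own-writes is to remember when each user last wrote and send that user's reads to the leader for a short window afterward. The sketch below is a minimal illustration under stated assumptions: a single router process keeps the timestamps in memory (a real deployment would keep them somewhere shared, such as the user's session), the window is chosen to exceed your worst expected lag, and `ReadYourWritesRouter` and its methods are hypothetical names, not a library API.

```python
import random
import time


class ReadYourWritesRouter:
    """Route each user's reads to the leader for a while after they write.

    Hypothetical helper: the class name, node names, and the 60-second
    default window are illustrative assumptions, not a library API.
    """

    def __init__(self, leader, followers, window_seconds=60.0, clock=time.monotonic):
        self.leader = leader
        self.followers = list(followers)
        self.window = window_seconds
        self.clock = clock
        self.last_write = {}  # user_id -> clock() value at that user's latest write

    def record_write(self, user_id):
        # Call once the leader has acknowledged the user's write
        self.last_write[user_id] = self.clock()

    def node_for_read(self, user_id):
        wrote_at = self.last_write.get(user_id)
        if wrote_at is not None and self.clock() - wrote_at < self.window:
            return self.leader  # recent writer: followers may not have the write yet
        return random.choice(self.followers)  # everyone else: any follower will do


router = ReadYourWritesRouter("leader", ["follower-1", "follower-2"])
router.record_write("alice")
print(router.node_for_read("alice"))  # leader (she just wrote)
print(router.node_for_read("bob"))    # one of the followers
```

The injectable `clock` is only there to make the window easy to test; the default uses a monotonic clock so wall-clock adjustments can't shorten or extend it.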

Anomaly 2: Monotonic Reads

A user refreshes a page and sees 10 comments. They refresh again and see only 9 — the 10th comment disappeared. They refresh a third time and it's back. What happened? Their first read hit Follower-A (caught up), their second read hit Follower-B (lagging), and their third read hit Follower-A again.

Seeing data go backward in time is disorienting and feels like a bug. The guarantee we want is monotonic reads: once you've seen data at a certain point in time, you should never subsequently see older data.

Solution: Sticky sessions. Hash the user ID to pick a follower, so the same user always reads from the same follower and their view of the data only moves forward. If that follower dies, the user is reassigned to another one; if the new follower is further behind, the user can briefly see older data at the moment of the switch.
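Sticky reads can be as simple as hashing the user ID to choose a follower. A minimal sketch, with hypothetical follower names; note that with plain modulo hashing, adding or removing a follower remaps many users at once (consistent hashing, covered in the leaderless replication chapter, reduces that churn).

```python
import hashlib


def follower_for_user(user_id, followers):
    """Map a user to the same follower on every request (sticky reads)."""
    # Use a stable hash: Python's built-in hash() for str is randomized per
    # process, so different app servers would disagree on the follower.
    digest = hashlib.sha256(str(user_id).encode("utf-8")).digest()
    return followers[int.from_bytes(digest[:8], "big") % len(followers)]


followers = ["follower-a", "follower-b", "follower-c"]
first = follower_for_user(42, followers)
assert all(follower_for_user(42, followers) == first for _ in range(100))
print(f"user 42 always reads from {first}")
```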

Anomaly 3: Consistent Prefix Reads

Two users have a conversation:

// Partition A (stores User-1's messages):
User-1: "How much is the item?"

// Partition B (stores User-2's messages):
User-2: "It's $42."

A third user observes this conversation. But Partition B replicates faster than Partition A. The observer sees "It's $42" arrive before "How much is the item?" — the answer appears before the question. Causal ordering is violated.

This is the consistent prefix reads problem. It only occurs when data is spread across multiple partitions that replicate independently. If everything is on one partition, the replication log preserves order.

Solution: Ensure causally related writes go to the same partition (e.g., all messages in a conversation go to the same partition). Or use a global ordering mechanism like a logical timestamp.
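Keying partitions by conversation rather than by sender is one way to keep causally related writes together. In this sketch the CRC32 hash, the partition count, and the message fields are assumptions for illustration:

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 8  # illustrative assumption


def partition_for(conversation_id, num_partitions=NUM_PARTITIONS):
    """All messages in one conversation share a partition (stable CRC32 hash)."""
    return zlib.crc32(conversation_id.encode("utf-8")) % num_partitions


def route(messages):
    """Append each message to its partition's log, in arrival order."""
    logs = defaultdict(list)
    for msg in messages:
        logs[partition_for(msg["conversation_id"])].append(msg["text"])
    return logs


chat = [
    {"conversation_id": "conv-7", "sender": "User-1", "text": "How much is the item?"},
    {"conversation_id": "conv-7", "sender": "User-2", "text": "It's $42."},
]
logs = route(chat)
# Both messages land in one partition's log, question first, so every
# follower of that partition replays them in causal order.
print(dict(logs))
```

Had the messages been partitioned by sender instead, they could land in two logs that replicate independently, which is exactly the situation described above.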

Measuring Replication Lag in Production

You can't fix what you can't measure. Here's how to monitor lag in PostgreSQL, both from the leader's own statistics and with an end-to-end canary write:

python
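# Monitoring PostgreSQL replication lag programmatically
import time

import psycopg2


def alert(message):
    # Placeholder: wire this up to your paging/alerting system
    print(f"ALERT: {message}")


def check_pg_replication_lag(leader_conn, follower_conn, timeout=30.0):
    """Check lag between a PostgreSQL leader and one follower."""
    # Method 1: Query pg_stat_replication on the leader (one row per follower)
    with leader_conn.cursor() as cur:
        cur.execute("""
            SELECT client_addr,
                   pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS lag_bytes,
                   EXTRACT(EPOCH FROM replay_lag) AS lag_seconds
            FROM pg_stat_replication
        """)
        for addr, lag_bytes, lag_sec in cur.fetchall():
            # replay_lag is NULL when the follower is idle and caught up
            if lag_sec is not None and lag_sec > 5.0:
                alert(f"Follower {addr}: {lag_sec:.1f}s ({lag_bytes} bytes) behind!")

    # Method 2: Write a canary value on the leader and time how long it
    # takes to appear on the follower. Assumes a one-row table:
    #   CREATE TABLE canary (val text); INSERT INTO canary VALUES ('');
    canary = str(time.time())
    with leader_conn.cursor() as cur:
        cur.execute("UPDATE canary SET val = %s", (canary,))
    leader_conn.commit()
    t0 = time.time()

    # Poll the follower until the canary appears (or give up)
    while time.time() - t0 < timeout:
        with follower_conn.cursor() as cur:
            cur.execute("SELECT val FROM canary")
            val = cur.fetchone()[0]
        # End the read transaction so none is left open on the follower
        follower_conn.rollback()
        if val == canary:
            lag = time.time() - t0
            print(f"Measured lag: {lag * 1000:.0f}ms")
            return lag
        time.sleep(0.01)
    alert(f"Canary not visible on follower after {timeout:.0f}s")
    return None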

The "Replication Window" — How Long Is Dangerous?

| Lag duration | Typical cause | Impact | Action |
|---|---|---|---|
| < 100ms | Normal async operation | Most apps won't notice | Monitor, no action needed |
| 100ms - 1s | Follower under load, minor network issue | Read-your-own-writes failures for fast UIs | Route a user's reads to the leader right after they write |
| 1s - 10s | Large transaction, follower disk I/O saturated | Visible stale data, customer complaints | Route time-sensitive reads to the leader |
| 10s - 60s | Schema migration replaying, vacuum, batch import on leader | Significant staleness, widespread monotonic-read violations | Consider pausing non-critical reads from followers |
| > 60s | Follower network partition, disk failure, bug | Follower is essentially a different database | Remove from rotation, investigate immediately |
Debug scenario: Users report that after updating their profile picture, they sometimes see the old picture when they reload the page. The bug is intermittent. Your setup: PostgreSQL leader with 4 read replicas behind a round-robin load balancer. What is the most likely cause and the minimal fix?

Chapter 5: Multi-Leader Replication

Leader-follower replication has a fundamental limitation: every write must go through the single leader. If the leader is in Virginia and your user is in Tokyo, every write incurs a trans-Pacific round trip. If the leader goes down, writes stop until failover completes.

Multi-leader replication (also called master-master or active-active) allows more than one node to accept writes. Each leader processes writes locally and replicates changes to the other leaders asynchronously.

Use Cases

Multi-datacenter operation. One leader per datacenter. Users write to their local datacenter's leader. Replication between datacenters happens in the background. Each datacenter can operate independently if the network between datacenters goes down.

Offline-capable clients. Your calendar app, notes app, or document editor needs to work without internet. Each device is effectively a "leader" — it accepts writes locally and syncs when connectivity is restored. CouchDB was designed for exactly this model.

Collaborative editing. Google Docs lets multiple users edit the same document simultaneously. Each user's keystrokes are local writes that must be merged. This is multi-leader replication at the keystroke level.

The BIG Problem: Write Conflicts

Two users simultaneously edit the same row from different datacenters:

// Leader-Virginia receives at t=0:
UPDATE users SET name = 'Alice Smith' WHERE id = 42

// Leader-Frankfurt receives at t=0:
UPDATE users SET name = 'Alice Jones' WHERE id = 42

// Both succeed locally. When they exchange changes... conflict!
// What should user 42's name be?

In single-leader replication, this can't happen: all writes go through one node, which serializes them. In multi-leader replication, conflicts are unavoidable unless every write to a given record is routed through the same leader.

Conflict Resolution Strategies

| Strategy | How it works | Trade-off |
|---|---|---|
| Last-Write-Wins (LWW) | Attach a timestamp to each write. When two writes conflict, keep the one with the higher timestamp and discard the other. | Simple but lossy: one write silently disappears. If clocks are skewed (and they always are), "last" is arbitrary. Cassandra uses this by default. |
| Merge values | Concatenate or union the conflicting values. E.g., if one leader sets title="A" and another sets title="B", store "A/B" and let the application resolve it later. | No data loss, but the merged value may be nonsensical. Works for sets (add items to a shopping cart) but not scalars. |
| On-write handler | Call a custom conflict resolution function when the conflict is detected during replication. | Flexible, but runs in the database layer, which limits what logic you can use. Bucardo uses this. |
| On-read resolution | Store all conflicting values. When a client reads, present all versions and let the application (or user) pick. | CouchDB does this. The application must handle conflicts, which is often forgotten. |
| CRDTs | Conflict-free Replicated Data Types: data structures that automatically merge without conflicts. E.g., a G-Counter (grow-only counter) can be safely merged by taking the max of each node's count. | Only works for specific data types. Can't express arbitrary business logic. Riak uses these. |
LWW is more dangerous than it looks. Consider: User-A creates a customer record on one leader, stamped t=100. A moment later, User-B deletes it on a different leader whose clock runs more than a second behind, so the delete is stamped t=99. With LWW, the creation "wins" because 100 > 99, and the delete is silently dropped. Now the system has a customer record that someone explicitly deleted. If the clocks were perfectly synced, the delete would have carried the higher timestamp and won. Your data integrity depends on clock accuracy, a terrifying proposition in a distributed system where NTP-synchronized clocks can still disagree by tens or hundreds of milliseconds.
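The failure mode above takes only a few lines to reproduce. A minimal LWW merge, assuming each write carries a (timestamp, replica ID) pair and a deletion is represented as a value of None (both are illustrative choices, not a particular database's format):

```python
def lww_merge(a, b):
    """Last-write-wins: keep the write with the larger (timestamp, replica_id).

    Each write is (timestamp, replica_id, value); value None means deleted.
    The replica ID only breaks exact timestamp ties, so every replica picks
    the same winner regardless of merge order.
    """
    return max(a, b, key=lambda write: (write[0], write[1]))


create = (100.0, "leader-a", {"name": "Acme Corp"})  # stamped by an accurate clock
delete = (99.0, "leader-b", None)  # happened later, but leader-b's clock is slow

winner = lww_merge(create, delete)
print(winner)  # (100.0, 'leader-a', {'name': 'Acme Corp'}): the delete is lost
```

The merge is deterministic and order-independent, which is why replicas converge; it just converges on whichever write had the larger timestamp, not the one that actually happened last.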

Worked Example: Google Docs-Style Conflict Resolution

To understand why collaborative editing needs special treatment, consider this scenario. Two users are editing the string "HELLO" simultaneously:

// Initial state: "HELLO"
// Positions: 01234

// User A: insert "X" at position 1 → "HXELLO"
// User B: delete position 4 ("O") → "HELL"

// If we apply A first, then B:
"HELLO" → insert X@1 → "HXELLO" → delete @4 → "HXELO"
// Position 4 is now "L", not "O". We deleted the wrong character!

// Operational Transformation (OT) fixes this by TRANSFORMING operations:
// When A inserted at position 1, all positions after 1 shift by +1.
// So B's "delete @4" becomes "delete @5" (the original "O" is now at 5).
"HELLO" → insert X@1 → "HXELLO" → delete @5 → "HXELL"
// Correct! Both edits are preserved.

// The transform function: T(op_B, op_A) adjusts B given A happened first.
// If A inserted before B's position: B.position += 1
// If A deleted before B's position: B.position -= 1
// This is the core of OT. It gets much more complex with real text.
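The two transform rules above fit in a short function. This is a sketch for single-character inserts and deletes only; it deliberately ignores cases real OT systems must handle, such as two inserts at the same position (which need a tie-breaker like a site ID) and two deletes of the same character (the second should become a no-op).

```python
def apply_op(text, op):
    """Apply a single-character op: ("insert", pos, char) or ("delete", pos, None)."""
    kind, pos, char = op
    if kind == "insert":
        return text[:pos] + char + text[pos:]
    return text[:pos] + text[pos + 1:]


def transform(op_b, op_a):
    """T(op_b, op_a): rewrite op_b so it applies correctly after op_a."""
    kind_b, pos_b, char_b = op_b
    kind_a, pos_a, _ = op_a
    if kind_a == "insert" and pos_a <= pos_b:
        pos_b += 1  # A's insert shifted B's target one place right
    elif kind_a == "delete" and pos_a < pos_b:
        pos_b -= 1  # A's delete shifted B's target one place left
    return (kind_b, pos_b, char_b)


op_a = ("insert", 1, "X")
op_b = ("delete", 4, None)

print(apply_op(apply_op("HELLO", op_a), op_b))                    # HXELO (wrong char deleted)
print(apply_op(apply_op("HELLO", op_a), transform(op_b, op_a)))  # HXELL
print(apply_op(apply_op("HELLO", op_b), transform(op_a, op_b)))  # HXELL (other order, same result)
```

The last line is the property OT is after: each user applies their own edit first and the other's transformed edit second, and both end up with the same text.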

Multi-Leader Topology

How do the leaders exchange changes? Three options:

Circular topology: Each leader sends its changes to the next in a ring. If one node fails, the ring breaks. MySQL supports this.

Star topology: One designated root leader forwards changes to all others. Simpler routing, but the root is a single point of failure.

All-to-all topology: Every leader sends to every other leader. Most fault-tolerant but requires more network connections. PostgreSQL BDR uses this.

Worked Example: Multi-Leader Conflict in an E-Commerce System

You run an e-commerce site with leaders in Virginia (VA) and Frankfurt (EU). A customer has a shipping address on record.

// Initial state (both leaders agree):
customer_42.address = "123 Main St, NYC"

// t=0: Customer logs in from NYC, updates address via VA leader:
VA Leader: UPDATE customer SET address="456 Oak Ave, NYC" WHERE id=42
timestamp = 1705312000.001
VA_LSN = 8001

// t=0: Customer support in Berlin updates address via EU leader
// (customer called to correct a typo):
EU Leader: UPDATE customer SET address="123 Main Street, NYC" WHERE id=42
timestamp = 1705312000.003
EU_LSN = 5501

// t=150ms: VA replicates to EU. EU sees conflict:
EU already has: "123 Main Street, NYC" (ts=.003)
VA sends: "456 Oak Ave, NYC" (ts=.001)

// Resolution depends on strategy:
LWW (by timestamp): KEEP "123 Main Street, NYC" (ts=.003 > .001)
Problem: Customer's NEW address "456 Oak Ave" is silently lost!
The customer moved, but the DB kept the old address.

LWW (by replica ID): If VA has higher priority, KEEP "456 Oak Ave"
Problem: The typo fix from support is lost.

On-read resolution: Store BOTH. Next time customer views profile,
show: "We found conflicting addresses. Which is correct?"
This is the safest but requires UI support.

CRDT: The Mathematical Solution

For specific data types, Conflict-free Replicated Data Types (CRDTs) can automatically merge concurrent updates without any data loss. One of the simplest CRDTs is the G-Counter (grow-only counter):

python
class GCounter:
    """Grow-only counter CRDT. Each replica has its own counter.
    Global count = sum of all replicas' counters.
    Merge = component-wise maximum."""

    def __init__(self, replica_id, n_replicas):
        self.replica_id = replica_id
        self.counts = [0] * n_replicas

    def increment(self):
        # Only increment OUR counter
        self.counts[self.replica_id] += 1

    def value(self):
        return sum(self.counts)

    def merge(self, other):
        # Component-wise max — NEVER loses an increment
        for i in range(len(self.counts)):
            self.counts[i] = max(self.counts[i], other.counts[i])

# Demo: two replicas count page views independently
va = GCounter(replica_id=0, n_replicas=2)
eu = GCounter(replica_id=1, n_replicas=2)

# VA gets 5 page views, EU gets 3
for _ in range(5): va.increment()
for _ in range(3): eu.increment()

print(va.counts)  # [5, 0]
print(eu.counts)  # [0, 3]

# Merge: no conflicts, no data loss
va.merge(eu)
eu.merge(va)
print(va.value())  # 8 — correct!
print(eu.value())  # 8 — both replicas agree
Why CRDTs can't solve everything. G-Counters work because their merge (component-wise max) is commutative, associative, and idempotent: replicas can merge in any order, any number of times, and still agree. But "set address to X" has no natural merge. Two concurrent overwrites can only be resolved by picking a winner (an LWW register) or by keeping both values for someone to choose (a multi-value register); neither knows which address is actually correct. CRDTs work for: counters, sets (add/remove), flags (enable/disable), registers (with LWW or multi-value semantics), and certain text editing operations (via sequence CRDTs). For complex business logic like "update shipping address," you need application-level conflict resolution.
Design question: You're building a collaborative note-taking app (like Notion). Two users edit the same paragraph simultaneously from different devices. Which conflict resolution strategy would you choose, and why?

Chapter 6: Leaderless Replication

What if we got rid of the leader entirely? No single node is special. Any replica can accept reads AND writes. The client (or a coordinator node acting on its behalf) sends each write to multiple replicas in parallel. This is leaderless replication, popularized by Amazon's Dynamo paper (2007) and used by Cassandra, Riak, and Voldemort.

Quorum Reads and Writes

With N replicas, the client sends each write to W replicas and each read to R replicas. The key insight:

If W + R > N, the read set and write set must overlap.

At least one of the R replicas you read from must have the latest write. This is a quorum.

Worked Example: The Quorum Math

// Setup: N=3 replicas, W=2, R=2
// W + R = 4 > 3 = N ✓ — quorum condition satisfied

// Step 1: Client writes value="Alice" to replicas A, B, C
// Write goes to W=2 replicas (say A and B succeed, C is slow)
Replica A: value = "Alice" (version 1) ✓
Replica B: value = "Alice" (version 1) ✓
Replica C: value = "Bob" (version 0, stale) ✗

// Step 2: Client reads from R=2 replicas (say B and C)
Replica B returns: "Alice" (version 1)
Replica C returns: "Bob" (version 0)

// Step 3: Client sees two different values. It picks the one with
// the highest version number: "Alice" (version 1). Correct!

// Why this works: with W=2 and R=2, out of N=3 replicas,
// at least 1 replica must be in BOTH the write set and read set.
// (Pigeonhole principle: 2+2=4 slots for 3 replicas → at least 1 overlap)
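The pigeonhole argument can also be checked by brute force: enumerate every possible write set and read set and confirm that they always intersect. A small sketch, with replicas simply numbered 0 to N-1:

```python
from itertools import combinations


def every_read_sees_write(n, w, r):
    """True if every possible W-replica write set overlaps every R-replica read set."""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)  # empty intersection is falsy
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


print(every_read_sees_write(3, 2, 2))  # True:  2 + 2 > 3
print(every_read_sees_write(3, 1, 2))  # False: e.g. write to {0}, read from {1, 2}
```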

What if W + R ≤ N?

You sacrifice the quorum guarantee. Reads might miss the latest write entirely. But you gain availability: you can tolerate more node failures. Some applications choose W=1, R=1 (fastest possible, no consistency guarantee) for use cases where stale data is acceptable (e.g., analytics counters).

Common Quorum Configurations

| N | W | R | Quorum? | Failures tolerated | Trade-off |
|---|---|---|---|---|---|
| 3 | 2 | 2 | Yes (4>3) | 1 write failure, 1 read failure | Balanced |
| 3 | 3 | 1 | Yes (4>3) | 0 write failures, 2 read failures | Fast reads, slow writes |
| 3 | 1 | 3 | Yes (4>3) | 2 write failures, 0 read failures | Fast writes, slow reads |
| 5 | 3 | 3 | Yes (6>5) | 2 write failures, 2 read failures | High availability |
| 3 | 1 | 1 | No (2≤3) | 2 of each, but may read stale | Maximum speed, no consistency |
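The quorum and failure-tolerance columns of the table above follow directly from N, W, and R. A sketch that recomputes them (the function name is illustrative):

```python
def quorum_profile(n, w, r):
    """Recompute one row of the quorum configuration table."""
    return {
        "quorum": w + r > n,                # read and write sets must overlap
        "write_failures_tolerated": n - w,  # writes still reach W live replicas
        "read_failures_tolerated": n - r,   # reads still reach R live replicas
    }


for n, w, r in [(3, 2, 2), (3, 3, 1), (3, 1, 3), (5, 3, 3), (3, 1, 1)]:
    print((n, w, r), quorum_profile(n, w, r))
```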

Repairing Stale Replicas

Even with quorums, replicas can diverge. Two mechanisms bring them back in sync:

Read repair: When a client reads from multiple replicas and discovers that one has stale data, it writes the fresh value back to the stale replica. Repair happens only for data that's actually being read, so cold (rarely read) data can stay stale indefinitely unless something else fixes it.

Anti-entropy process: A background daemon constantly compares replicas and copies missing data. Unlike read repair, this catches all stale data eventually, not just data being read. But it adds background I/O.
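A minimal anti-entropy pass between two replicas, assuming each replica is a dict mapping key to (version, value) and that a higher version number always means newer data (so concurrent writes are ignored here). Production systems such as Cassandra and Riak avoid comparing every key by exchanging Merkle trees (hash trees) first; this sketch skips that step and compares keys directly:

```python
def anti_entropy(replica_a, replica_b):
    """One sync pass: for every key, copy the higher-versioned entry to the
    replica that is missing it or holds an older version."""
    for key in set(replica_a) | set(replica_b):
        a = replica_a.get(key)  # (version, value) or None
        b = replica_b.get(key)
        if b is None or (a is not None and a[0] > b[0]):
            replica_b[key] = a
        elif a is None or b[0] > a[0]:
            replica_a[key] = b


node_1 = {"user:1": (2, "Alice"), "user:2": (1, "Bob")}
node_2 = {"user:1": (1, "Alicia"), "user:3": (4, "Carol")}
anti_entropy(node_1, node_2)
print(node_1 == node_2)  # True
```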

Sloppy Quorums and Hinted Handoff

What happens when more than N-W nodes are down? Strict quorum writes fail. But if your priority is availability over consistency, you can use a sloppy quorum: accept writes on nodes that aren't the "home" replicas for this key. When the home nodes come back, the data is handed off to them. This is hinted handoff.

Sloppy quorums increase availability but weaken consistency — a read quorum might now miss a write that was stored on a "wrong" node.
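A sketch of a sloppy-quorum write with hinted handoff. The `Node` class, the split into home and fallback nodes, and counting a stand-in's hint as an acknowledgment are simplifying assumptions for illustration, not any particular database's implementation:

```python
class Node:
    """A replica that can be up or down (hypothetical, for illustration)."""

    def __init__(self, name):
        self.name = name
        self.up = True
        self.data = {}   # key -> value stored as a home replica
        self.hints = []  # (intended_owner, key, value) held on another node's behalf


def sloppy_write(key, value, home_nodes, fallback_nodes, w):
    """Write to the key's home nodes; for each one that is down, store a
    hint on a live fallback node instead. Succeeds if W nodes accepted it."""
    spares = iter([n for n in fallback_nodes if n.up])
    acks = 0
    for home in home_nodes:
        if home.up:
            home.data[key] = value
            acks += 1
        else:
            stand_in = next(spares, None)
            if stand_in is not None:
                stand_in.hints.append((home, key, value))
                acks += 1
    return acks >= w


def hand_off(node):
    """Deliver hinted writes to owners that are back up; keep the rest."""
    still_waiting = []
    for owner, key, value in node.hints:
        if owner.up:
            owner.data[key] = value
        else:
            still_waiting.append((owner, key, value))
    node.hints = still_waiting


a, b, c, d = Node("A"), Node("B"), Node("C"), Node("D")
c.up = False  # home replica C is unreachable
print(sloppy_write("user:42", "Alice", [a, b, c], [d], w=3))  # True, thanks to D's hint
c.up = True   # C comes back
hand_off(d)
print(c.data)  # {'user:42': 'Alice'}
```

Until `hand_off` runs, a read that contacts only A, B, and C may not count D's copy, which is the consistency cost of sloppy quorums described above.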

Worked Example: Quorum Write and Read Repair

Let's trace a complete quorum write and read with N=3, W=2, R=2, showing exactly how read repair works.

// Setup: 3 replicas (A, B, C). Key "user:42" currently has value "Bob" v1 on all.

// Step 1: Client writes "Alice" v2 to W=2 replicas.
// Coordinator sends write to A, B, C in parallel.
// A: receives, writes. ACK. (1 ACK)
// B: receives, writes. ACK. (2 ACKs — W=2 met!)
// C: network delay, write arrives late.

Replica A: user:42 = "Alice" (v2) ✓ written
Replica B: user:42 = "Alice" (v2) ✓ written
Replica C: user:42 = "Bob" (v1) ✗ stale

// Step 2: Coordinator returns success to client (W=2 met).
// Shortly after, C finally receives the write and applies it.
// But what if C had crashed before receiving it?

// Step 3: Another client reads key "user:42" with R=2.
// Coordinator sends read to A and C (randomly chosen).
// A returns: "Alice" (v2)
// C returns: "Bob" (v1) ← stale!

// Step 4: Client picks the highest-version value: "Alice" (v2). Correct!

// Step 5: READ REPAIR — client writes "Alice" (v2) back to C.
Replica C: user:42 = "Alice" (v2) ✓ repaired!

// All replicas now agree. Read repair fixed the inconsistency.
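Steps 3 to 5 as code: a sketch of a quorum read with read repair, assuming each replica is a dict of key to (version, value) and taking the first R replicas in the list in place of a random choice:

```python
def quorum_read_with_repair(replicas, key, r):
    """Read key from R replicas, return the newest value, and repair stale ones."""
    contacted = replicas[:r]
    responses = [rep.get(key, (0, None)) for rep in contacted]  # (version, value)
    newest = max(responses, key=lambda resp: resp[0])
    for rep, resp in zip(contacted, responses):
        if resp[0] < newest[0]:
            rep[key] = newest  # read repair: push the newer version back
    return newest[1]


replica_a = {"user:42": (2, "Alice")}
replica_b = {"user:42": (2, "Alice")}
replica_c = {"user:42": (1, "Bob")}  # missed the v2 write

print(quorum_read_with_repair([replica_a, replica_c, replica_b], "user:42", r=2))  # Alice
print(replica_c)  # {'user:42': (2, 'Alice')}
```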

The Limits of Quorums

Even with W + R > N, quorums do NOT guarantee linearizability (the strongest consistency model). Here's why:

| Scenario | Why quorum fails |
|---|---|
| Sloppy quorum | During a network partition, writes go to "wrong" (fallback) nodes. The W nodes that ACKed may not overlap with the R nodes read next. |
| Concurrent writes | Two writes arrive at the same time. Different replicas might process them in different orders, so there is no single "latest" value. |
| Write + read concurrency | A write is in progress. Some replicas have v2, others still have v1. The R replicas you read might all return v1 if the write hasn't reached any of them yet. |
| Failed write, not rolled back | A write reaches only 1 of the W=2 needed replicas, then the coordinator crashes. The write is reported as failed but stays on the replica that accepted it, so later reads with R=2 may or may not return v2. |
Quorums give you probabilistic consistency, not guaranteed consistency. If you need strong consistency (linearizability), you need a consensus algorithm like Raft or Paxos, which adds coordination overhead. Dynamo-style quorums are designed for availability and partition tolerance, accepting weaker consistency.

Leaderless in the Wild: Cassandra Configuration

Here's what leaderless replication looks like in practice with Apache Cassandra:

sql
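-- Create a keyspace with 3 replicas in each of two datacenters.
-- The datacenter names must match the names the cluster's snitch reports.
CREATE KEYSPACE my_app
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'us-east': 3,     -- 3 replicas in US-East
    'eu-west': 3      -- 3 replicas in EU-West
};

USE my_app;
CREATE TABLE users (id uuid PRIMARY KEY, name text, email text);

-- CQL statements don't carry a consistency level. In cqlsh it is a
-- session setting; client drivers set it per statement or per session.
CONSISTENCY LOCAL_QUORUM;

-- Write with LOCAL_QUORUM: the write is sent to all 6 replicas, but the
-- coordinator only waits for 2 of the 3 replicas in the local datacenter
INSERT INTO users (id, name, email)
VALUES (uuid(), 'Alice', 'alice@example.com');

-- Read with LOCAL_QUORUM: reads from 2 of the 3 local replicas.
-- Guarantees seeing earlier LOCAL_QUORUM writes made in this datacenter,
-- but NOT writes that are still replicating from the other DC.
-- (Example id; substitute a real one.)
SELECT * FROM users WHERE id = 123e4567-e89b-12d3-a456-426614174000;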
LOCAL_QUORUM vs QUORUM. QUORUM requires a majority across ALL replicas globally (4 out of 6 in the example above). This means cross-datacenter round trips for every read and write. LOCAL_QUORUM only requires a majority in the local datacenter (2 out of 3). Much faster, but a write in US-East might not yet be visible to a read in EU-West. Choose LOCAL_QUORUM for performance, QUORUM for global consistency.
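The 4-of-6 and 2-of-3 figures come from the majority formula floor(n / 2) + 1, which Cassandra applies to the total replication factor across all datacenters for QUORUM and to the local datacenter's replication factor for LOCAL_QUORUM. A quick check:

```python
def majority(replica_count):
    """Smallest number of replicas that forms a majority: floor(n / 2) + 1."""
    return replica_count // 2 + 1


replication = {"us-east": 3, "eu-west": 3}  # the keyspace above
total = sum(replication.values())
print("QUORUM:", majority(total), "of", total)  # QUORUM: 4 of 6
print("LOCAL_QUORUM:", majority(replication["us-east"]), "of", replication["us-east"])  # LOCAL_QUORUM: 2 of 3
```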

Why Leaderless Databases Are Popular for Time-Series and IoT

Leaderless replication shines for append-heavy workloads where writes are far more frequent than reads, and eventual consistency is acceptable. Consider an IoT sensor network:

// 10,000 sensors, each reporting every second:
Write load: 10,000 writes/sec
Read load: ~100 queries/sec (dashboards, alerts)
Write:Read ratio = 100:1

// With leader-follower (PostgreSQL):
// All 10K writes/sec hit the single leader.
// The leader is the bottleneck.
// Adding followers helps reads, NOT writes.

// With leaderless (Cassandra, N=3, W=2, R=1):
// Writes are distributed across ALL nodes.
// Each node handles ~10K/num_nodes writes/sec as coordinator,
// plus ~10K * (3/num_nodes) writes/sec as a replica.
// Adding nodes scales writes linearly.
// R=1 reads are fast (any one replica responds).
// Stale reads are acceptable — 2-second-old sensor data is fine.
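The per-node arithmetic above as a quick calculation. It assumes writes are spread evenly across nodes and that each write eventually lands on all 3 replicas, even though the coordinator only waits for W=2 of them:

```python
def per_node_writes(total_per_sec, num_nodes, replication_factor=3):
    """Approximate writes/sec each node handles in an evenly balanced ring."""
    as_coordinator = total_per_sec / num_nodes
    as_replica = total_per_sec * replication_factor / num_nodes
    return as_coordinator, as_replica


for nodes in (3, 6, 12):
    coord, replica = per_node_writes(10_000, nodes)
    print(f"{nodes:>2} nodes: ~{coord:,.0f} coordinated, ~{replica:,.0f} replica writes/sec per node")
```

Doubling the node count halves both figures, which is the linear write scaling described above.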

This is why Cassandra is widely used for IoT, time-series, and event logging. The data model is append-only (sensor readings are never updated), write conflicts essentially never arise (each reading has its own sensor-and-timestamp key), and eventual consistency is fine (dashboards can be a few seconds behind).

Consistent Hashing: How Leaderless Systems Assign Data to Replicas

In a leaderless system, how does the coordinator know WHICH N replicas should store a given key? The answer is consistent hashing. Each replica is assigned a position on a hash ring. Each key is hashed to a position on the same ring. The N replicas responsible for a key are the first N nodes clockwise from the key's position.

// Hash ring with 5 nodes (A through E), N=3:

A(hash=10) B(hash=30) C(hash=55)
D(hash=75) E(hash=90)

// Key "user:42" hashes to position 40.
// Walk clockwise: first 3 nodes are C(55), D(75), E(90).
// "user:42" is stored on replicas C, D, and E.

// Key "user:99" hashes to position 80.
// Walk clockwise: E(90), A(10), B(30).
// "user:99" is stored on replicas E, A, and B.

// If node C dies, "user:42" is now on D, E, and A (next clockwise).
// Only keys that were on C need to move. Other keys are unaffected.
// This is why it's called "consistent" — minimal redistribution.
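A minimal hash ring that reproduces the example. To match the numbers above, nodes are placed at the given positions rather than at hashed ones; `key_position` shows how a real key could be mapped (SHA-256 modulo a 100-slot ring is an arbitrary choice here), and virtual nodes, which production systems such as Cassandra use to spread load, are left out:

```python
import bisect
import hashlib


class HashRing:
    """Consistent-hash ring without virtual nodes."""

    def __init__(self, positions):
        # positions: node name -> position on the ring, e.g. {"A": 10, ...}
        self.ring = sorted((pos, name) for name, pos in positions.items())

    def replicas_for(self, key_pos, n):
        """First n nodes at or clockwise after key_pos, wrapping past the top."""
        points = [pos for pos, _ in self.ring]
        start = bisect.bisect_left(points, key_pos)
        count = min(n, len(self.ring))
        return [self.ring[(start + i) % len(self.ring)][1] for i in range(count)]

    def remove(self, name):
        self.ring = [(pos, node) for pos, node in self.ring if node != name]


def key_position(key, ring_size=100):
    """Map a key onto the ring with a stable hash."""
    return int(hashlib.sha256(key.encode("utf-8")).hexdigest(), 16) % ring_size


ring = HashRing({"A": 10, "B": 30, "C": 55, "D": 75, "E": 90})
print(ring.replicas_for(40, 3))  # ['C', 'D', 'E']  ("user:42" in the example)
print(ring.replicas_for(80, 3))  # ['E', 'A', 'B']  ("user:99", wraps around)
ring.remove("C")
print(ring.replicas_for(40, 3))  # ['D', 'E', 'A']
```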
Quorum math: You have N=5 replicas. You want writes to be fast (low W) but reads MUST always return the latest value. What is the minimum R you need if W=2?

Chapter 7: Detecting Concurrent Writes

This is where replication gets truly interesting. Two clients write to the same key at roughly the same time. Neither knows about the other's write. In distributed systems terminology, these writes are concurrent — there is no causal relationship between them (neither "happened before" the other).

How do you even detect that two writes are concurrent, as opposed to one happening after the other?

The Happens-Before Relationship

Two events A and B have one of three relationships:

| Relationship | Meaning | How to detect |
|---|---|---|
| A → B | A happened before B. B knew about A. | B's version vector dominates A's: every entry ≥ A's, at least one strictly greater. |
| B → A | B happened before A. A knew about B. | A's version vector dominates B's. |
| A ∥ B | Concurrent. Neither knew about the other. | Neither version vector dominates the other. |

You cannot use wall-clock timestamps to determine this ordering. Clocks are unreliable in distributed systems (NTP drift, leap seconds, VM clock pauses). Instead, we use version vectors.

Version Vectors: The Algorithm

Each replica maintains a version number for each key. When a client writes, it includes the version numbers it last read. The server can then tell whether the new write supersedes, is superseded by, or is concurrent with existing data.

// System: 2 replicas (A, B), 2 clients (X, Y), key="cart"

// Step 1: Client X writes "milk" to Replica A
Replica A: cart = ["milk"], version = {A:1, B:0}
Client X receives: context = {A:1, B:0}

// Step 2: Client Y writes "eggs" to Replica B (doesn't know about "milk")
Replica B: cart = ["eggs"], version = {A:0, B:1}
Client Y receives: context = {A:0, B:1}

// Step 3: Replica B receives the "milk" write via replication
Compare versions: {A:1, B:0} vs {A:0, B:1}
Neither dominates → CONCURRENT! Store both as siblings:
Replica B: cart = [["milk"], ["eggs"]], version = {A:1, B:1}

// Step 4: Client X reads and sees siblings. It merges: ["milk", "eggs"]
Client X writes the merged value to Replica A with context {A:1, B:1}:
cart = ["milk", "eggs"], version = {A:2, B:1}

// Now both replicas converge to ["milk", "eggs"]. Conflict resolved!

The Shopping Cart Example (from the Dynamo Paper)

Amazon's Dynamo paper used a shopping cart as the motivating example. When two concurrent writes add different items to the same cart, the system stores both versions (siblings). When the client next reads, it receives all siblings and must merge them. For a shopping cart, merging means taking the union of all items.

But what about removing items? If one sibling has {milk, eggs, butter} and another has {milk, eggs} (butter was removed), taking the union puts butter back. The removed item resurrects. This is the "resurrection bug."

The fix: don't remove items, mark them as deleted with a tombstone. The merge function is: union all items, then remove items that have a tombstone in any sibling.

Version Vector Implementation

python
class VersionVector:
    def __init__(self):
        self.clock = {}  # {replica_id: version_number}

    def increment(self, replica_id):
        self.clock[replica_id] = self.clock.get(replica_id, 0) + 1

    def dominates(self, other):
        """Does self happen-after other?"""
        # Every entry in other must be <= corresponding entry in self
        # AND at least one must be strictly less
        all_keys = set(self.clock) | set(other.clock)
        at_least_one_greater = False
        for k in all_keys:
            s = self.clock.get(k, 0)
            o = other.clock.get(k, 0)
            if s < o:
                return False  # other has a higher version somewhere
            if s > o:
                at_least_one_greater = True
        return at_least_one_greater

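    def equals(self, other):
        """Same entries, treating missing replicas as 0."""
        all_keys = set(self.clock) | set(other.clock)
        return all(self.clock.get(k, 0) == other.clock.get(k, 0) for k in all_keys)

    def concurrent_with(self, other):
        """Neither dominates the other, and the two are not identical."""
        return (not self.dominates(other) and not other.dominates(self)
                and not self.equals(other))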

    def merge(self, other):
        """Take component-wise max."""
        merged = VersionVector()
        all_keys = set(self.clock) | set(other.clock)
        for k in all_keys:
            merged.clock[k] = max(self.clock.get(k, 0),
                                  other.clock.get(k, 0))
        return merged

    def __repr__(self):
        return str(self.clock)

# Demo: detecting concurrent writes
vv_a = VersionVector()
vv_a.increment("A")         # {A:1}
vv_b = VersionVector()
vv_b.increment("B")         # {B:1}

print(vv_a.concurrent_with(vv_b))  # True — neither dominates

merged = vv_a.merge(vv_b)         # {A:1, B:1}
merged.increment("A")             # {A:2, B:1}
print(merged.dominates(vv_a))     # True — merged happened after vv_a
print(merged.dominates(vv_b))     # True — merged happened after vv_b

Complete Leaderless Key-Value Store with Version Vectors

Here's a small implementation that ties version vectors to actual data storage (it reuses the VersionVector class above). It's a simplified take on the sibling pattern Riak uses:

python
class DynamoStyleStore:
    """A single replica in a Dynamo-style leaderless system."""

    def __init__(self, replica_id):
        self.id = replica_id
        # key -> list of (value, VersionVector) pairs
        # Multiple entries = siblings (concurrent writes)
        self.data = {}

    def put(self, key, value, context=None):
        """Write a value. context = version vector from prior read."""
        if context is None:
            context = VersionVector()

        # Increment this replica's version
        new_vv = VersionVector()
        new_vv.clock = dict(context.clock)
        new_vv.increment(self.id)

        if key not in self.data:
            self.data[key] = [(value, new_vv)]
            return new_vv

        # Remove entries that the new write supersedes
        surviving = []
        for (v, vv) in self.data[key]:
            if not new_vv.dominates(vv):
                surviving.append((v, vv))  # keep as sibling
        surviving.append((value, new_vv))
        self.data[key] = surviving
        return new_vv

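    def get(self, key):
        """Read all values (may return siblings)."""
        if key not in self.data:
            return [], VersionVector()
        entries = self.data[key]
        merged_vv = VersionVector()
        for (v, vv) in entries:
            merged_vv = merged_vv.merge(vv)
        values = [v for (v, vv) in entries]
        return values, merged_vv

    def apply_remote(self, key, value, vv):
        """Apply a (value, version vector) pair replicated from another
        replica, e.g. by anti-entropy. Kept as a sibling if concurrent."""
        entries = self.data.get(key, [])
        for (_, existing) in entries:
            if existing.dominates(vv) or existing.clock == vv.clock:
                return  # already have this version or a newer one
        # Drop local entries the incoming version supersedes, keep the rest
        surviving = [(v, e) for (v, e) in entries if not vv.dominates(e)]
        surviving.append((value, vv))
        self.data[key] = surviving

# Demo: the shopping cart scenario
store_A = DynamoStyleStore("A")
store_B = DynamoStyleStore("B")

# Client X writes "milk" to store A
vv_milk = store_A.put("cart", ["milk"])
# store_A: cart = [(["milk"], {A:1})]

# Client Y writes "eggs" to store B (concurrent!)
store_B.put("cart", ["eggs"])
# store_B: cart = [(["eggs"], {B:1})]

# Anti-entropy: replicate A's write to B.
# B sees {A:1} vs {B:1}: neither dominates, so it keeps a sibling
store_B.apply_remote("cart", ["milk"], vv_milk)
# store_B: cart = [(["eggs"],{B:1}), (["milk"],{A:1})]

# Client reads from B and merges the siblings
vals, ctx = store_B.get("cart")
# vals = [["eggs"], ["milk"]], ctx = {A:1, B:1}
merged_cart = sorted(set(item for v in vals for item in v))
# merged_cart = ["eggs", "milk"]

# Client writes merged value back with the merged context
store_B.put("cart", merged_cart, context=ctx)
# store_B: cart = [(["eggs","milk"], {A:1, B:2})]
# Sibling resolved! {A:1,B:2} dominates both {A:1} and {B:1}.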

Hand-Worked Exercise: Version Vector Comparison

Practice determining the relationship between version vectors. For each pair, decide: does A dominate B? Does B dominate A? Or are they concurrent?

// Exercise 1:
A = {X:3, Y:2, Z:1}
B = {X:2, Y:2, Z:1}
// A dominates B: A.X > B.X (3>2), A.Y = B.Y (2=2), A.Z = B.Z (1=1)
// A has at least one component strictly greater, and no component less.
// Result: B → A (B happened before A)

// Exercise 2:
A = {X:3, Y:2}
B = {X:2, Y:4}
// A.X > B.X (3>2), but A.Y < B.Y (2<4)
// Neither dominates the other.
// Result: A || B (concurrent — must resolve conflict)

// Exercise 3:
A = {X:1, Y:1}
B = {X:1, Y:1, Z:1}
// A.X = B.X, A.Y = B.Y, but A has no Z (treat as 0), B.Z = 1 > 0
// B dominates A.
// Result: A → B (A happened before B; replica Z has since accepted a write)

// Exercise 4:
A = {X:5, Y:3, Z:2}
B = {X:5, Y:3, Z:2}
// All components equal. Neither dominates (need at least one strictly greater).
// But they're not concurrent either — they're the SAME version.
// Result: A = B (identical state, no conflict)

The Sibling Merge Problem: Deletes and Tombstones

Siblings (concurrent values stored together) must eventually be merged. For a shopping cart:

// Sibling 1 (from client X): cart = {milk, eggs, butter}
// Sibling 2 (from client Y): cart = {milk, eggs} ← Y removed butter

// Naive union merge: {milk, eggs, butter} ← butter is RESURRECTED!
// Y explicitly removed butter, but the merge brought it back.

// Fix: use tombstones (mark deletions)
// Sibling 2 becomes: cart = {milk, eggs, butter:TOMBSTONE}
// Merge: union items, remove anything with a tombstone in ANY sibling
// Result: {milk, eggs} ← correct!
python
def merge_shopping_carts(siblings):
    """Merge concurrent shopping cart siblings using tombstones.
    Each sibling is a dict: {item: True/False} where False = tombstone."""
    all_items = set()
    tombstoned = set()

    for sibling in siblings:
        for item, alive in sibling.items():
            if alive:
                all_items.add(item)
            else:
                tombstoned.add(item)

    # Result: all items that appear alive in ANY sibling,
    # minus items tombstoned in ANY sibling.
    return all_items - tombstoned

# Example:
sibling_1 = {"milk": True, "eggs": True, "butter": True}
sibling_2 = {"milk": True, "eggs": True, "butter": False}  # tombstone

result = merge_shopping_carts([sibling_1, sibling_2])
print(sorted(result))  # ['eggs', 'milk']: butter correctly removed (sorted because set order varies)
Version vectors vs. vector clocks. A vector clock tracks one version number per client. A version vector tracks one version number per replica. In practice, Dynamo-style databases use version vectors (per replica) because the number of replicas is bounded (N is fixed), while the number of clients can be unbounded. The terms are often conflated, but the distinction matters for memory usage: a vector clock that grows with the number of clients can become enormous.
Version Vector Simulation

Two clients send concurrent writes to a leaderless cluster. Follow how the version vectors evolve, how the cluster detects that the writes are concurrent, and how the resulting siblings are resolved.

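The same concurrent-write scenario can be traced in code. This is a minimal sketch that assumes one counter per replica. `VersionedReplica` and its `get`/`put`/`receive` methods are hypothetical names chosen to mirror the `store_B.get`/`store_B.put` calls earlier in the chapter; they are not a real database API.

The two concurrent writes go through different replicas on purpose. With per-replica counters, two clients writing through the *same* replica with stale contexts can make one write look dominated. Real systems need extra machinery for that case, such as dotted version vectors.

```python
def descends(a, b):
    """True if version vector a has seen everything b has (a >= b in every entry)."""
    return all(a.get(r, 0) >= n for r, n in b.items())


class VersionedReplica:
    """One replica's siblings per key: a list of (value, version_vector) pairs."""

    def __init__(self, rid):
        self.rid = rid
        self.data = {}

    def get(self, key):
        sibs = self.data.get(key, [])
        context = {}
        for _, vv in sibs:  # context = entry-wise max over all sibling vectors
            for r, n in vv.items():
                context[r] = max(context.get(r, 0), n)
        return [v for v, _ in sibs], context

    def put(self, key, value, context):
        sibs = self.data.get(key, [])
        vv = dict(context)
        # Bump this replica's counter past anything it has already issued for this key
        vv[self.rid] = max([context.get(self.rid, 0)] + [s.get(self.rid, 0) for _, s in sibs]) + 1
        # Siblings covered by the client's context were seen and replaced; keep the rest
        self.data[key] = [(v, s) for v, s in sibs if not descends(context, s)] + [(value, vv)]

    def receive(self, key, value, vv):
        """Anti-entropy: merge a version shipped from another replica."""
        sibs = self.data.get(key, [])
        if any(descends(s, vv) for _, s in sibs):
            return  # we already have this version or a newer one
        self.data[key] = [(v, s) for v, s in sibs if not descends(vv, s)] + [(value, vv)]


store_A, store_B = VersionedReplica("A"), VersionedReplica("B")

# Two clients write concurrently with empty contexts, through different replicas
store_A.put("cart", ["milk"], context={})  # version {A:1}
store_B.put("cart", ["eggs"], context={})  # version {B:1}

# Anti-entropy ships A's version to B; neither vector dominates, so both are kept
store_B.receive("cart", ["milk"], {"A": 1})
vals, ctx = store_B.get("cart")
print(vals, ctx)  # [['eggs'], ['milk']] {'B': 1, 'A': 1}

# A client merges the siblings and writes back with the merged context
merged = sorted({item for v in vals for item in v})
store_B.put("cart", merged, context=ctx)
print(store_B.get("cart"))  # ([['eggs', 'milk']], {'B': 2, 'A': 1})
```

The final version `{A:1, B:2}` dominates both siblings, matching the resolution shown at the top of this section.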

Last-Write-Wins: When It Works and When It Kills

LWW is the simplest conflict resolution strategy: attach a timestamp to each write and keep the highest. Many engineers default to it because it's easy to implement. But it has a fundamental problem: it silently discards writes. Here's how that plays out across common scenarios:

| Scenario | LWW behavior | Is this acceptable? |
|---|---|---|
| Cache warming | Two servers populate the same cache key. Latest wins. | Yes: both values are equivalent, so losing one is fine. |
| Sensor readings | Two sensors report the same metric. Latest reading wins. | Yes: we want the freshest reading. |
| User profile updates | User updates their name from two devices. One edit is silently lost. | No: the user explicitly made both changes. |
| Shopping cart additions | User adds one item on a phone and another on a laptop. One item silently disappears. | No: both additions were intentional. |
| Bank balance transfers | Two transfers happen concurrently. One is silently lost. | Absolutely not: money disappears. |
The rule: LWW is safe ONLY when (1) all concurrent writes to the same key produce equivalent values, so discarding any of them loses nothing, OR (2) losing a concurrent write has no business impact. For anything involving user-facing mutations, counters, or financial data, LWW is unacceptable. Use version vectors + sibling merging instead.
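A minimal sketch of an LWW register makes the silent loss concrete. The `Write` record and `lww_merge` function are illustrative names. Ties on the timestamp are broken by node name, which is one common convention, so every replica picks the same winner.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Write:
    value: object
    timestamp: float  # wall-clock time on the node that accepted the write
    node: str         # tie-breaker when timestamps are equal


def lww_merge(a: Write, b: Write) -> Write:
    """Keep the write with the highest (timestamp, node); the other is dropped silently."""
    return max(a, b, key=lambda w: (w.timestamp, w.node))


# One user adds a different item to the same cart from two devices, almost simultaneously
phone = Write(value=["milk", "bread"], timestamp=1000.002, node="phone")
laptop = Write(value=["milk", "eggs"], timestamp=1000.001, node="laptop")

print(lww_merge(phone, laptop).value)                        # ['milk', 'bread']: "eggs" is gone
print(lww_merge(phone, laptop) == lww_merge(laptop, phone))  # True: replicas converge anyway
```

Both merge orders return the same result, which is why LWW converges so easily. The cost is that the losing write vanishes without any error.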

The Happens-Before Partial Order: Visualizing Causality

Happens-before is a partial order, not a total order. In a total order, every pair of events can be compared (like numbers on a number line). In a partial order, some pairs are incomparable — that's what "concurrent" means. Think of it like a family tree: your parents are your ancestors (they "happened before" you), but two cousins are neither ancestor nor descendant of each other — they're concurrent.

// A partial order of events across 3 nodes. Each row is one node's
// timeline; → within a row means "happens next on the same node".

// Node A: a1 → a2 → a3 → a4
// Node B: b1 → b2 → b3 → b4 → b5
// Node C: c1 → c2 → c3 → c4

// Cross-node arrows are messages between nodes: a2 → b2 (A sends, B receives),
// b2 → a3 (B replies to A), plus two messages from C to B.
// a1 → a2 → b2 → a3: causal chain, so a1 happened before a3.
// b3 and c2: no causal path between them → CONCURRENT.
// This is the partial order that version vectors capture.
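Vector clocks compute this partial order mechanically. Every event ticks its own node's counter, and a receive first takes the element-wise max with the sender's clock.

The sketch below is illustrative: `vector_clocks` and `happened_before` are hypothetical helpers. It models only the two messages the text names, a2 → b2 and b2 → a3. C's messages are left out because the diagram does not pin down their endpoints.

```python
def vector_clocks(nodes, events):
    """Assign a vector timestamp to each event.

    events: (name, node, received_from) tuples in an order consistent with causality;
    received_from names the send event whose message this event receives, else None.
    """
    clock = {n: {m: 0 for m in nodes} for n in nodes}
    stamps = {}
    for name, node, received_from in events:
        c = clock[node]
        if received_from is not None:  # receive: merge the sender's knowledge first
            for m, t in stamps[received_from].items():
                c[m] = max(c[m], t)
        c[node] += 1                   # every event ticks its own node's counter
        stamps[name] = dict(c)         # snapshot, since c keeps changing
    return stamps


def happened_before(x, y):
    """x happened before y iff x <= y in every entry and x != y."""
    return all(x[n] <= y[n] for n in x) and x != y


trace = [
    ("a1", "A", None), ("b1", "B", None), ("c1", "C", None),
    ("a2", "A", None),   # a2 sends a message to B
    ("b2", "B", "a2"),   # b2 receives it, then replies
    ("a3", "A", "b2"),
    ("b3", "B", None), ("c2", "C", None),
]
s = vector_clocks("ABC", trace)
print(s["a3"])                                                               # {'A': 3, 'B': 2, 'C': 0}
print(happened_before(s["a1"], s["a3"]))                                     # True
print(happened_before(s["b3"], s["c2"]), happened_before(s["c2"], s["b3"]))  # False False
```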
Version vector exercise: Replica A has version vector {A:3, B:2}. Replica B has {A:2, B:4}. Are these concurrent, or did one happen before the other?

Chapter 8: Replication in Practice

Theory is clean. Production is messy. Let's look at how real databases implement replication and what trade-offs they've made.

PostgreSQL: Leader-Follower with WAL Shipping

PostgreSQL uses physical streaming replication by default. The leader streams its WAL (Write-Ahead Log) to followers, which replay the exact same disk operations.

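| Property | PostgreSQL |
|---|---|
| Topology | Leader-follower (1 primary, N standbys) |
| Log format | Physical WAL (tied to the storage engine). Logical replication (pgoutput) is also built in since v10. |
| Sync mode | Configurable: async (default), sync, or remote_apply (sync, and the standby has replayed the change so its queries can see it) |
| Failover | Manual (pg_ctl promote) or automated via Patroni/pg_auto_failover |
| Read from follower? | Yes (hot standby). But WAL replay can conflict with long-running standby queries (for example, when vacuum on the primary removes row versions a standby query still needs), and the standby then cancels those queries. |
| Gotcha | Physical WAL is tied to the PostgreSQL major version: a standby cannot replay WAL from a different major version. A rolling major-version upgrade therefore requires logical replication; the alternative is dump/restore. |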

MySQL: Leader-Follower with Binlog

| Property | MySQL (InnoDB) |
|---|---|
| Topology | Leader-follower. Also supports Group Replication (Paxos-based; single-primary by default, with an optional multi-primary mode). |
| Log format | Binlog: statement-based (legacy), row-based (default since 5.7), or mixed. |
| Sync mode | Asynchronous by default. A semi-synchronous plugin can make the leader wait for at least 1 replica ACK. |
| Failover | MySQL Orchestrator, MHA, or built-in Group Replication with automatic failover. |
| Read from follower? | Yes. ProxySQL or MySQL Router handle routing. |
| Gotcha | GTIDs (Global Transaction IDs) are critical for reliable failover. Without them, finding the correct binlog position after failover is a nightmare. |

MongoDB: Replica Sets with Raft-like Election

| Property | MongoDB |
|---|---|
| Topology | Replica sets (1 primary, N-1 secondaries, optional arbiters). |
| Log format | Oplog (operations log): logical format (similar to row-based binlog). |
| Sync mode | Configurable write concern: w=1 (primary only), w="majority", or w=N. Since 5.0, the implicit default is majority. |
| Failover | Automatic. Built-in Raft-inspired election protocol. Elections typically take 2-12 seconds. |
| Read from follower? | Yes, via read preference: primary, primaryPreferred, secondary, secondaryPreferred, nearest. |
| Gotcha | An election during a network partition can roll back writes on the old primary that had not yet replicated to a majority. |

Cassandra: Leaderless with Tunable Quorums

| Property | Cassandra |
|---|---|
| Topology | Leaderless (Dynamo-style). Any node can coordinate reads and writes. |
| Log format | Mutations sent to replicas, each with a timestamp. Merged by LWW per column. |
| Sync mode | Tunable consistency per request: ONE, QUORUM, LOCAL_QUORUM, ALL, among others. |
| Failover | No leader to fail over. Nodes join/leave the ring via a gossip protocol. |
| Read from follower? | No leader/follower distinction. All replicas serve reads and writes. |
| Gotcha | LWW conflict resolution means concurrent writes silently lose data. Deleted data can resurrect if a tombstone is purged (after gc_grace_seconds) before every replica has received the delete. Counter columns need special handling. |

How to Choose Between PostgreSQL and Cassandra: A Decision Framework

This is one of the most common system design decisions. Here's one structured way to think about it:

// Questions to weigh (not a strict tree: several may apply at once, and each pushes toward one system):

// Q1: Do you need strong consistency (ACID transactions)?
// YES → PostgreSQL (or CockroachDB for multi-region).
// NO → Continue.

// Q2: Is your write throughput > 50K writes/sec?
// YES → Cassandra (horizontal write scaling).
// NO → PostgreSQL handles this fine with leader-follower.

// Q3: Do you need multi-region writes (each region writes locally)?
// YES → Cassandra (leaderless) or CockroachDB (Raft).
// NO → PostgreSQL with regional followers.

// Q4: Is your data model primarily key-value or wide-column?
// YES → Cassandra (built for this).
// NO (relational joins, complex queries) → PostgreSQL.

// Q5: Can you tolerate eventual consistency?
// YES → Cassandra with tunable consistency.
// NO → PostgreSQL with synchronous replication.

In practice, many systems use BOTH: PostgreSQL for transactional data (user accounts, orders, payments) and Cassandra for high-volume event data (clickstreams, sensor readings, logs). Change data capture (CDC) can then stream changes out of PostgreSQL into downstream systems, such as a search index for full-text search or an analytics store.

Real Incident: Cassandra Tombstone Storms

A less-discussed but dangerous failure mode in Cassandra: tombstone accumulation. When you delete a row in Cassandra, it doesn't physically remove the data. Instead, it writes a tombstone — a marker that says "this row was deleted." Tombstones are necessary because the delete must propagate to all replicas, and without a tombstone, a replica that missed the delete would "resurrect" the row on the next read repair.

The problem: tombstones are purged only by compaction (merging SSTables), and only once they are older than gc_grace_seconds. If your workload involves heavy deletes (e.g., a TTL-based cache, or a queue that dequeues by deleting), you can accumulate millions of tombstones. When a read query scans a range, it must read through all tombstones to find live rows. A query that should take 5ms now takes 30 seconds because it's scanning 2 million tombstones.

// The tombstone death spiral:
1. Application deletes 1M rows in partition "hot-key".
2. 1M tombstones are written (fast — just append to SSTable).
3. Client reads partition "hot-key".
4. Cassandra scans ALL 1M tombstones to find live rows.
5. Read takes 30 seconds. Client times out.
6. Client retries. Multiple retries pile up.
7. Cassandra node runs out of heap memory processing tombstones.
8. Node goes down. Other nodes get more load. Cascade.

// Prevention:
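// - Set gc_grace_seconds appropriately (default 864000 = 10 days), and run repair more often than that,
//   so no replica misses a delete before its tombstone is purged.
// - Choose a compaction strategy that purges expired tombstones promptly (e.g. TimeWindowCompactionStrategy
//   for TTL-heavy time series). Forcing a major compaction (nodetool compact) is a heavy last resort.
// - Don't use Cassandra as a queue (delete-heavy workload).
// - Watch for warnings and aborted reads triggered by tombstone_warn_threshold (default 1,000)
//   and tombstone_failure_threshold (default 100,000).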
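A toy model shows why a tombstone-heavy partition hurts reads: the scan cannot skip a tombstone without reading it. This sketch is a simplification, not Cassandra's read path. `read_live_rows` and `TombstoneOverwhelming` are hypothetical names, and the two default thresholds mirror the defaults of `tombstone_warn_threshold` and `tombstone_failure_threshold`.

```python
class TombstoneOverwhelming(Exception):
    """Raised when a read would scan more tombstones than allowed (hypothetical name)."""


def read_live_rows(partition, limit, warn_threshold=1_000, failure_threshold=100_000):
    """Scan a partition's cells in order until `limit` live rows are found.

    partition: list of (key, value) pairs, where value None marks a tombstone.
    Each tombstone must be read to know that row is dead, so it costs scan time.
    """
    live, tombstones = [], 0
    for key, value in partition:
        if value is None:
            tombstones += 1
            if tombstones > failure_threshold:
                raise TombstoneOverwhelming(f"aborted after {tombstones} tombstones")
            continue
        live.append((key, value))
        if len(live) == limit:
            break
    if tombstones > warn_threshold:
        print(f"WARN: read {len(live)} live rows and {tombstones} tombstones")
    return live


# A queue-like partition: the first 50,000 jobs were "dequeued" by deleting them
partition = [(i, None) for i in range(50_000)]
partition += [(i, f"job-{i}") for i in range(50_000, 50_010)]

rows = read_live_rows(partition, limit=5)  # WARN: read 5 live rows and 50000 tombstones
print(rows[0])                             # (50000, 'job-50000')
```

Asking for just 5 live rows forced a scan past 50,000 dead ones. With four times as many deletes, the same read would exceed the failure threshold and be aborted.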

CockroachDB: Raft-Based, Strongly Consistent

| Property | CockroachDB |
|---|---|
| Topology | Ranges (contiguous key ranges), each replicated via Raft consensus. Leader per range, not per cluster. |
| Log format | Raft log entries (logical operations). |
| Sync mode | Synchronous by design (Raft requires a majority ACK to commit). |
| Failover | Automatic Raft leader election per range. Sub-second for each range. |
| Read from follower? | Follower reads available (with a timestamp check that bounds staleness). |
| Gotcha | A write must reach a majority of its range's replicas. If those replicas span regions, every write pays at least one cross-region round trip. Design tables with locality in mind. |

Design Challenge: Global Chat App

"You're building a global chat app with users on every continent. Which replication topology do you choose?"

Considerations: Chat is write-heavy (every message is a write). Users expect < 100ms delivery. Messages have causal ordering within a conversation. Users sometimes go offline (mobile).

Strong answer: Per-conversation partitioning with leader-follower replication per partition. Each conversation is assigned to a datacenter based on where the majority of participants are. The leader for that conversation is in that datacenter. Followers in other regions serve reads for participants who are far away, accepting slightly delayed message delivery (replication lag). For offline users, queue messages locally and sync on reconnect (effectively a multi-leader model with conflict resolution based on message timestamps — which works because chat messages are append-only, not updateable). Cassandra-style leaderless would also work, using QUORUM writes and LOCAL_QUORUM reads, but you lose causal ordering unless you add logical timestamps.
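A Lamport clock is one common kind of logical timestamp. In this minimal sketch with a hypothetical `ChatClient`, each sent message carries a counter larger than any counter the sender has seen, so a reply always sorts after the message it answers.

Lamport timestamps give a total order consistent with causality, but they cannot tell you which messages were concurrent. The tie-break on user name is arbitrary.

```python
class ChatClient:
    """Hypothetical client that stamps each message with a Lamport timestamp."""

    def __init__(self, user):
        self.user = user
        self.clock = 0

    def send(self, text):
        self.clock += 1                       # each send moves the clock forward
        return (self.clock, self.user, text)  # (lamport_time, tie-breaker, body)

    def receive(self, message):
        self.clock = max(self.clock, message[0])  # never fall behind what we've seen


def display_order(messages):
    """Sort by (lamport_time, user): the same order on every replica."""
    return [text for _, _, text in sorted(messages)]


alice, bob = ChatClient("alice"), ChatClient("bob")
m1 = alice.send("Lunch at noon?")
bob.receive(m1)
m2 = bob.send("Sure, see you there")  # causally after m1, so it gets a larger stamp
m3 = alice.send("Or 12:30?")          # alice hasn't seen m2: m2 and m3 are concurrent

# Replicas may receive the messages in any order; the display order is the same
print(display_order([m2, m3, m1]))
# ['Lunch at noon?', 'Or 12:30?', 'Sure, see you there']
```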

System Design Deep Dive: Twitter's Timeline Replication

Twitter (now X) is a fascinating case study in replication strategy. The core challenge: when a user with 50 million followers tweets, that tweet must appear in 50 million timelines. How?

// Approach 1: Fan-out-on-read (pull model)
// When User opens their timeline:
// 1. Look up everyone they follow.
// 2. Query each followed user's tweet list.
// 3. Merge and sort by timestamp.
// Latency: O(num_following) × per-query-time
// If you follow 500 people: 500 queries per timeline load.
// Too slow for a product serving billions of timeline loads/day.

// Approach 2: Fan-out-on-write (push model)
// When a user tweets:
// 1. Look up all their followers.
// 2. Write the tweet to each follower's pre-computed timeline cache.
// Timeline read: just fetch the pre-computed list. O(1).
// But: a celebrity with 50M followers → 50M writes per tweet.
// At 100 bytes per timeline entry: 50M × 100B = 5 GB of writes.
// This is replication: each tweet is "replicated" to N follower timelines.

// Twitter's actual solution: HYBRID
// Regular users (< 10K followers): fan-out-on-write.
// Celebrities (> 10K followers): fan-out-on-read.
// When you load your timeline:
// 1. Fetch your pre-computed timeline (non-celebrity tweets).
// 2. Fetch recent tweets from celebrities you follow (separate query).
// 3. Merge.
// Best of both worlds: fast for most tweets, bounded for celebrities.

This is replication at the application level, and the same concepts apply. Fan-out-on-write is like eager replication: the copies are materialized when the write happens, not when someone reads. Fan-out-on-read is like lazy evaluation: the view is computed on demand. The hybrid is similar in spirit to semi-synchronous replication: eagerly replicate to most targets, lazily handle the expensive ones.
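The hybrid can be sketched with in-memory dicts. Everything here is hypothetical: the names, the data layout, and a `CELEBRITY_THRESHOLD` of 3 (instead of the 10K in the text) so the example stays small. The sketch also ignores what happens when an author crosses the threshold, which a real system must handle to avoid duplicate timeline entries.

```python
from collections import defaultdict
from heapq import merge
from itertools import islice

CELEBRITY_THRESHOLD = 3  # illustrative; the text uses 10K followers

followers = defaultdict(set)   # author -> users who follow them
following = defaultdict(set)   # user -> authors they follow
timelines = defaultdict(list)  # user -> precomputed entries (fan-out-on-write)
tweets = defaultdict(list)     # author -> own tweets (read at fan-out-on-read time)


def follow(user, author):
    followers[author].add(user)
    following[user].add(author)


def is_celebrity(author):
    return len(followers[author]) > CELEBRITY_THRESHOLD


def post(author, ts, text):
    entry = (-ts, author, text)  # negated time: ascending sort = newest first
    tweets[author].append(entry)
    if not is_celebrity(author):  # push: copy into every follower's timeline
        for user in followers[author]:
            timelines[user].append(entry)


def read_timeline(user, limit=10):
    pushed = sorted(timelines[user])  # 1. precomputed (non-celebrity) tweets
    pulled = sorted(e for a in following[user] if is_celebrity(a) for e in tweets[a])  # 2. pull
    merged = merge(pushed, pulled)    # 3. merge two newest-first lists
    return [(author, text) for _, author, text in islice(merged, limit)]


for u in ["u1", "u2", "u3", "u4"]:
    follow(u, "celeb")  # 4 followers > threshold: pulled at read time
follow("u1", "friend")  # 1 follower: pushed at write time

post("friend", 1, "hello")
post("celeb", 2, "big news")
post("friend", 3, "lunch?")
print(read_timeline("u1"))   # [('friend', 'lunch?'), ('celeb', 'big news'), ('friend', 'hello')]
print(len(timelines["u2"]))  # 0: the celebrity tweet was never copied
```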

Debug Scenarios

"Replication lag is growing steadily." Check: (1) Is the follower's disk I/O saturated? (long-running queries on the follower, vacuum, or compaction), (2) Is the network between leader and follower degraded? (packet loss, bandwidth saturation), (3) Is the write workload spiking? (large batch imports), (4) Is the follower CPU-bound on replay? (too many indexes to update per write).

"Reads return stale data intermittently." Check: (1) Are reads going to followers? (verify load balancer routing), (2) What's the current replication lag? (pg_stat_replication in PostgreSQL; SHOW SLAVE STATUS in MySQL, renamed SHOW REPLICA STATUS in 8.0.22+), (3) Is it always the same follower? (that follower might be unhealthy), (4) Is the application using connection pooling that routes some reads to followers? (PgBouncer config).

Testing Replication: Chaos Engineering

You can't trust your replication strategy until you've broken it deliberately. Here's how production teams validate their replication setup:

python
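# Chaos engineering tests for replication
# Run these in staging, NOT production (unless you're Netflix)
# leader, follower_1, followers, get_new_leader, read_from_cluster, block_traffic,
# restore_traffic, and measure_lag are hypothetical fixtures for your own cluster.
import time
import uuid


class ReplicationChaosTests: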

    def test_leader_failure(self):
        """Kill the leader and verify failover."""
        # 1. Write a canary value to the leader
        canary = f"chaos-test-{uuid.uuid4()}"
        leader.write("canary", canary)

        # 2. Wait for replication
        time.sleep(2)

        # 3. Kill the leader
        leader.kill()

        # 4. Wait for failover
        time.sleep(60)  # max expected failover time

        # 5. Verify: can we still read the canary?
        result = get_new_leader().read("canary")
        assert result == canary, "DATA LOSS during failover!"

        # 6. Verify: can we write?
        get_new_leader().write("post-failover", "alive")
        print("✓ Leader failover: PASS")

    def test_follower_failure(self):
        """Kill a follower and verify reads continue."""
        follower_1.kill()
        time.sleep(5)
        # Reads should still work (from remaining followers)
        result = read_from_cluster("any-key")
        assert result is not None
        print("✓ Follower failure: reads continue")

    def test_network_partition(self):
        """Simulate network partition between leader and followers."""
        # Use iptables or tc to block traffic
        block_traffic(leader, followers)

        # Leader should still accept writes
        leader.write("during-partition", "test")

        # Followers should serve (stale) reads
        # But NOT serve writes (no split brain)

        # Heal partition
        restore_traffic(leader, followers)
        time.sleep(10)

        # Followers should catch up
        result = follower_1.read("during-partition")
        assert result == "test"
        print("✓ Network partition recovery: PASS")

    def test_replication_lag_under_load(self):
        """Write heavily and measure lag increase."""
        initial_lag = measure_lag()
        # Write 100K rows rapidly
        for i in range(100000):
            leader.write(f"load-test-{i}", "x" * 1000)
        post_load_lag = measure_lag()
        print(f"Lag increase under load: {initial_lag}ms → {post_load_lag}ms")
        assert post_load_lag < 30000, "Lag exceeded 30s threshold!"
FoundationDB's approach: deterministic simulation testing. Instead of running chaos tests against real hardware (slow, flaky, expensive), FoundationDB simulates the entire distributed system in a single process. They inject every possible failure — network partitions, disk errors, clock skew, process crashes — in a deterministic, reproducible way. They've simulated the equivalent of millions of machine-hours of failures. This is why FoundationDB is considered one of the most reliable distributed databases ever built. Apple uses it as the metadata store for iCloud.
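The core idea of deterministic simulation can be shown in miniature. Route every source of nondeterminism through one seeded random generator, and any failing run can be replayed exactly from its seed.

This toy is not FoundationDB's framework. `simulate` is a hypothetical function that models async leader-to-follower replication with lost messages and a leader crash.

```python
import random


def simulate(seed, steps=20, drop_rate=0.3):
    """Replicate writes leader -> follower over a lossy simulated network.

    Every random choice (which messages are lost, when the leader crashes) comes
    from one generator seeded with `seed`, so a seed always replays the same run.
    Returns the event trace and the writes lost when the follower takes over.
    """
    rng = random.Random(seed)
    leader_log, follower_log, trace = [], [], []
    crash_at = rng.randrange(steps)
    for step in range(steps):
        if step == crash_at:
            trace.append(("crash", step))
            break
        leader_log.append(step)           # leader commits write #step locally
        if rng.random() < drop_rate:
            trace.append(("drop", step))  # replication message lost, never retried
        else:
            follower_log.append(step)
    # After the crash the follower is promoted; anything it never received is gone
    lost = [w for w in leader_log if w not in follower_log]
    return trace, lost


print(simulate(seed=42) == simulate(seed=42))  # True: same seed, same run

# Search many seeds for a run that loses data, then replay it to debug
bad_seed = next((s for s in range(1000) if simulate(s)[1]), None)
if bad_seed is not None:
    print(bad_seed, simulate(bad_seed))
```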

Full Design Walkthrough: Multi-Region E-Commerce

An interviewer says: "You're building an e-commerce platform with users in North America, Europe, and Asia. Design the database replication strategy."

Step 1: Classify the data
Not all data needs the same replication strategy. Product catalog (read-heavy, rarely updated) is different from shopping carts (write-heavy, per-user) which is different from order records (must be strongly consistent).
Step 2: Assign strategies per data type
Product catalog: Leader-follower. Leader in US-East, followers in EU and Asia. Read from nearest. Updates are infrequent (catalog team in US). Tolerate 5-10s lag for price changes.

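Shopping carts: Leaderless (Dynamo-style). Carts are per-user and rarely conflict. Use W=2, R=1 (with N=3) for fast reads while each write still survives one replica failure. Because W + R = N rather than > N, a read can occasionally miss the latest write. If cart items conflict (rare), merge by union, with tombstones so removed items stay removed.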

Orders: Leader-follower with sync replication. Orders must NOT be lost. Leader in the user's home region. Sync replicate to one follower for durability. Accept higher write latency (50-100ms) for durability guarantee.

User sessions: Leaderless with W=1, R=1. Speed over consistency. Session loss = user re-logs-in, not a catastrophe.
Step 3: Handle cross-region writes
For orders: user in Tokyo writes to a Tokyo leader. But order verification needs inventory data from the US-East product catalog leader. Use two-phase approach: (1) reserve inventory synchronously via RPC to US-East, (2) create order locally in Tokyo. If US-East is unreachable, queue the order with a timeout.
Step 4: Monitoring
Dashboard showing: replication lag per follower (alert if >5s), write throughput per leader, read distribution across replicas, failover events per week. On-call runbook for common scenarios: lag spike, follower crash, leader crash.

This answer demonstrates staff-level thinking: different strategies for different data, clear trade-off reasoning, concrete numbers, and operational awareness.

Replication Anti-Patterns

Common mistakes that cause production incidents:

| Anti-pattern | Why it seems reasonable | Why it fails | What to do instead |
|---|---|---|---|
| Running heavy analytics queries on followers | "Followers have the same data, might as well use them" | Long-running queries conflict with WAL replay (e.g. vacuum cleanup of rows they still read). Replay stalls, so replication lag spikes, and once max_standby_streaming_delay expires the standby cancels the query. | Use a dedicated analytics replica with max_standby_streaming_delay = -1 (replay waits indefinitely, so this replica can fall far behind), or use a separate OLAP system (ClickHouse, BigQuery). |
| Too many followers on one leader | "More replicas = more read capacity" | Each follower needs a replication connection, plus leader CPU and network to read and send WAL. Past roughly 10-15 followers, replication work can start crowding out the leader's write workload. | Use cascading replication: leader → 2-3 followers → each feeds 2-3 more. Or use a CDC pipeline to fan out. |
| Synchronous replication across regions | "We need zero data loss globally" | Cross-region RTT is 50-200ms, so every write now takes at least 50-200ms. Write throughput collapses. | Replicate synchronously within a region (same or nearby AZ) and asynchronously across regions. Accept a nonzero region-level RPO. |
| No monitoring of replication lag | "Replication just works, right?" | Lag drifts unnoticed. Then a failover promotes a follower that's 30 minutes behind, and 30 minutes of data are lost. | Alert if lag exceeds 10 seconds. Page if lag exceeds 60 seconds. Keep a dashboard with per-follower lag. |

Capacity Planning: How Many Replicas?

// Given:
Read throughput needed: 200,000 reads/sec
Write throughput: 5,000 writes/sec
Single node capacity: 50,000 reads/sec (at target latency)
Single node capacity: 10,000 writes/sec

// Leader handles all writes + some reads:
Leader write load: 5,000 / 10,000 = 50% of capacity used for writes
Leader remaining read capacity: 50,000 × (1 - 0.5) = 25,000 reads/sec

// Remaining reads to distribute: 200,000 - 25,000 = 175,000 reads/sec
// Each follower handles: ~50,000 reads/sec (no write overhead)
// But each follower also replays writes: 5,000 writes/sec of replay
// Replay overhead: ~5,000 / 10,000 = 50% → real read capacity: ~25,000/follower

Followers needed: 175,000 / 25,000 = 7 followers
Total nodes: 1 leader + 7 followers = 8 nodes

// Add 1 for headroom + failover spare = 9 nodes
// Note: this assumes uniform load balancing across followers.
// In practice, hot keys (celebrity profiles, viral posts) create skew.
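The arithmetic above can be wrapped in a small function to try other inputs. This is a sketch under the same assumptions as the worked example: replaying a write costs a follower as much as executing it costs the leader, and reads spread evenly. `replicas_needed` is a hypothetical helper.

```python
import math


def replicas_needed(read_qps, write_qps, node_read_cap, node_write_cap, spares=1):
    """Back-of-envelope sizing for one leader plus read followers.

    Assumes, like the worked example, that every node (leader or follower) spends
    write_qps / node_write_cap of its capacity on writes or write replay, and that
    reads are spread evenly. Returns (followers, total_nodes_including_spares).
    """
    if write_qps >= node_write_cap:
        raise ValueError("a single leader cannot absorb this write load")
    # Read capacity left on each node after paying for writes
    reads_per_node = node_read_cap * (node_write_cap - write_qps) / node_write_cap
    remaining = max(0, read_qps - reads_per_node)  # reads the leader can't serve
    followers = math.ceil(remaining / reads_per_node)
    return followers, 1 + followers + spares


print(replicas_needed(200_000, 5_000, 50_000, 10_000))  # (7, 9)
print(replicas_needed(200_000, 8_000, 50_000, 10_000))  # (19, 21)
```

The second call shows how sensitive the result is to write load. A 60% increase in writes more than doubles the follower count, because every follower pays the replay cost.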

Design question: You're migrating from a single-region PostgreSQL to a multi-region setup. You need strong consistency (no stale reads), automatic failover, and SQL compatibility. Your current replication lag tolerance is zero. Which system is the best fit?

Chapter 9: Interview Arsenal

Everything you've learned, condensed into the frameworks and talking points that win system design interviews.

The Replication Decision Tree

Do you need multi-region writes?
If no → leader-follower (simplest, most battle-tested). If yes → continue.
Can you tolerate write conflicts?
If yes → multi-leader or leaderless. If no → consensus-based (Raft/Paxos), accept higher write latency.
Is your data model conflict-friendly?
Append-only (logs, events) → leaderless with LWW is fine. Mutable rows → need CRDTs or manual resolution. Collaborative text → OT or CRDT.
What's your read consistency requirement?
Eventual → any async replication. Read-your-writes → sticky sessions or read-from-leader. Strong → synchronous replication or consensus.

Cheat Sheet: All Strategies at a Glance

| Strategy | Writes go to | Conflicts? | Consistency | Failover | Best for |
|---|---|---|---|---|---|
| Leader-Follower | 1 leader | No (serialized) | Eventual (async) or strong (sync) | Manual or automated election | Most OLTP workloads |
| Multi-Leader | N leaders | Yes, must resolve | Eventual | Each DC independent | Multi-DC, offline clients |
| Leaderless | W of N replicas | Yes (siblings/LWW) | Tunable (quorum) | No leader to fail | High availability, immutable data |
| Consensus (Raft) | Elected leader per shard | No (serialized by Raft) | Strong (linearizable) | Automatic, sub-second | Strong consistency requirements |

System Design Talking Points

"Design a multi-region database for a social network."

"Design a collaborative document editor."

Coding Drill: Quorum Read/Write

python
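import random

class Replica:
    def __init__(self, id):
        self.id = id
        self.up = True   # set to False to simulate a crashed replica
        self.store = {}  # key -> (value, version)

    def write(self, key, value, version):
        if not self.up:
            return False  # a crashed replica never ACKs
        # Keep only the newest version, so a late or repeated write can't roll a key back
        _, current = self.store.get(key, (None, 0))
        if version > current:
            self.store[key] = (value, version)
        return True

    def read(self, key):
        return self.store.get(key, (None, 0))

class QuorumClient:
    def __init__(self, replicas, w, r):
        self.replicas = replicas  # list of Replica
        self.N = len(replicas)
        self.W = w
        self.R = r
        self.version = 0  # single-writer demo: this client issues all versions

    def quorum_write(self, key, value):
        self.version += 1
        # Send to ALL N replicas (sequentially here; a real client sends in parallel
        # and returns once W ACKs arrive). Success means at least W replicas ACKed.
        acks = sum(1 for replica in self.replicas
                   if replica.write(key, value, self.version))
        return acks >= self.W

    def quorum_read(self, key):
        # Read from R live replicas, return the highest version
        live = [replica for replica in self.replicas if replica.up]
        if len(live) < self.R:
            raise RuntimeError("Read quorum not available")
        results = []
        for replica in random.sample(live, self.R):
            val, ver = replica.read(key)
            results.append((val, ver, replica))

        # Pick the value with highest version
        best_val, best_ver, _ = max(results, key=lambda x: x[1])

        # Read repair: update stale replicas
        for val, ver, replica in results:
            if ver < best_ver:
                replica.write(key, best_val, best_ver)

        return best_val

# Demo: N=3, W=2, R=2, so W + R > N and every read set overlaps every write set
replicas = [Replica(i) for i in range(3)]
client = QuorumClient(replicas, w=2, r=2)
client.quorum_write("user:42", "Alice")
print(client.quorum_read("user:42"))  # Alice

replicas[2].up = False                        # one replica crashes
print(client.quorum_write("user:42", "Bob"))  # True: 2 ACKs still meet W=2
print(client.quorum_read("user:42"))          # Bob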

System Design Template: Replication Section

In every system design interview, when you reach the database/storage section, use this framework:

// 1. Classify your data by consistency requirement:
// Strong consistency → Raft/Paxos (CockroachDB, Spanner)
// Read-your-writes → Leader-follower with LSN tracking
// Eventual consistency → Leaderless or async leader-follower

// 2. State your replication topology:
// Single region → 1 leader + 2 followers (3-node, 2 AZs)
// Multi-region → 1 leader per region (multi-leader) OR
// 1 global leader + regional followers OR
// leaderless with tunable quorums

// 3. Quantify your RPO/RTO:
// RPO (Recovery Point Objective) = max data loss tolerable
// Sync replication: RPO = 0
// Async replication: RPO = replication lag (typ. 1-10s)
// RTO (Recovery Time Objective) = max downtime tolerable
// Manual failover: RTO = 5-30 minutes
// Automated failover: RTO = 10-60 seconds

// 4. Address the failure modes:
// What happens if the leader dies?
// What happens if a follower dies?
// What happens if the network partitions?
// What happens during a rolling upgrade?

Common Mistakes in System Design Interviews

| Mistake | Why it's wrong | What to say instead |
|---|---|---|
| "We'll use master-master replication for high availability" | Multi-leader creates write conflicts, which most interviewers expect you to address | "We'll use leader-follower with automated failover. Multi-leader only if we need multi-region writes, and then we'll discuss conflict resolution strategy." |
| "We'll replicate everything synchronously for safety" | Sync replication to 3+ followers blocks on the slowest one | "Semi-synchronous: 1 sync follower for durability, rest async for read scalability." |
| "We'll use Cassandra because it's distributed" | Cassandra uses LWW by default, which silently loses data on conflicts | "Cassandra is a good fit for append-only data like event logs. For mutable data needing strong consistency, we'd use PostgreSQL or CockroachDB." |
| Not mentioning replication lag | The interviewer knows this is the core challenge | "With async replication, we'll have 1-5 seconds of lag. For user-facing reads, we'll use sticky sessions and read-your-own-writes routing." |

Debug Scenario Drills

| Symptom | Likely cause | Investigation |
|---|---|---|
| Replication lag growing linearly | Write volume exceeds follower replay speed | Check follower CPU and disk IOPS. Consider parallel apply: MySQL 5.7+ supports multi-threaded replicas. PostgreSQL replays physical WAL in a single process; PG 16 added parallel apply only for large transactions in logical replication. |
| Intermittent stale reads | Read-your-own-writes violation | Verify read routing. Add sticky sessions or read-from-leader after writes. |
| Data appears then disappears | Monotonic reads violation (different followers) | Add session affinity. Check load balancer health checks. |
| Duplicate key errors after failover | Autoincrement ID collision | Fence the old leader. Advance the sequence. Switch to UUIDs. |
| Split-brain: both nodes accepting writes | Fencing failure during failover | Immediate: shut down one node. Long-term: STONITH + Raft. |

Coding Drill: Implementing Read-Your-Own-Writes

python
import random
import time

class ReplicationAwareRouter:
    """Routes reads to leader or follower based on
    read-your-own-writes consistency requirement."""

    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers
        # Track last write LSN per user
        self.user_write_lsn = {}  # user_id -> (lsn, timestamp)
        self.consistency_window = 5.0  # seconds; must exceed worst-case replication lag

    def record_write(self, user_id, lsn):
        """Called after every write. Records the LSN for routing."""
        self.user_write_lsn[user_id] = (lsn, time.time())

    def route_read(self, user_id, key) -> "Connection":
        """Route a read to the best replica for this user."""
        entry = self.user_write_lsn.get(user_id)

        if entry:
            required_lsn, write_time = entry
            # If write was recent, ensure we read fresh data
            if time.time() - write_time < self.consistency_window:
                # Try to find a follower that's caught up
                for f in self.followers:
                    if f.current_lsn() >= required_lsn:
                        return f  # This follower has the user's write
                # No follower is caught up — fall back to leader
                return self.leader

        # No recent write — any follower is fine
        return random.choice(self.followers)

Coding Drill: Failover Health Check

python
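import threading
import time

class FailoverMonitor:
    """Monitors leader health and triggers failover.

    leader/followers are hypothetical node handles exposing ping(), fence(),
    replication_lag_bytes(), promote_to_leader(), and set_replication_source().
    """

    def __init__(self, leader, followers, heartbeat_interval=1.0,
                 timeout=10.0, max_lag_for_promotion=1000):
        self.leader = leader
        self.followers = followers
        self.heartbeat_interval = heartbeat_interval
        self.timeout = timeout
        self.max_lag = max_lag_for_promotion  # max bytes behind
        self.last_heartbeat = time.time()
        self.is_leader_alive = True

    def check_heartbeat(self):
        try:
            self.leader.ping()
            self.last_heartbeat = time.time()
            self.is_leader_alive = True
        except ConnectionError:
            elapsed = time.time() - self.last_heartbeat
            if elapsed > self.timeout:
                self.is_leader_alive = False
                self.trigger_failover()

    def run(self, stop: threading.Event):
        """Heartbeat loop; run it in a background thread until `stop` is set."""
        while not stop.is_set():
            self.check_heartbeat()
            stop.wait(self.heartbeat_interval)

    def trigger_failover(self):
        # 1. Fence old leader (STONITH) so it can't keep accepting writes: prevents split-brain.
        #    In practice this goes through an out-of-band path (e.g. power control),
        #    since the old leader is unreachable.
        self.leader.fence()

        # 2. Find best candidate (most up-to-date, within lag limit).
        #    Measure each follower's lag once so the filter and the choice agree.
        lags = [(f, f.replication_lag_bytes()) for f in self.followers]
        candidates = [(f, lag) for f, lag in lags if lag <= self.max_lag]
        if not candidates:
            raise RuntimeError("No eligible follower for promotion!")

        # Pick the one with least lag
        new_leader, lag = min(candidates, key=lambda x: x[1])

        # 3. Promote
        new_leader.promote_to_leader()

        # 4. Reconfigure remaining followers to replicate from the new leader
        self.followers = [f for f in self.followers if f is not new_leader]
        for f in self.followers:
            f.set_replication_source(new_leader)

        # 5. Start monitoring the new leader
        self.leader = new_leader
        self.last_heartbeat = time.time()
        self.is_leader_alive = True

        print(f"Failover complete. New leader: {new_leader.id}, lag was {lag} bytes")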

Recommended Reading

| Paper/Resource | Key contribution |
|---|---|
| Dynamo: Amazon's Highly Available Key-value Store (2007) | Leaderless replication, quorums, vector clocks, consistent hashing |
| Raft: In Search of an Understandable Consensus Algorithm (2014) | Leader election + log replication made understandable |
| CAP Theorem (Brewer, 2000) | During a network partition, a system must give up either consistency (linearizability) or availability; often summarized as "pick 2 of 3" |
| PACELC (Abadi, 2012) | Extension of CAP: even without partitions, you trade latency vs. consistency |
| A Comprehensive Study of Convergent and Commutative Replicated Data Types (Shapiro et al., 2011) | CRDTs: data structures that converge without coordination |

Coding Drill: Implementing a Simple Replication Stream

python
import threading, queue

class ReplicationLog:
    """Write-ahead log used for replication."""

    def __init__(self):
        self.entries = []     # [(lsn, operation)]
        self.lsn_counter = 0
        self.lock = threading.Lock()

    def append(self, operation):
        with self.lock:
            self.lsn_counter += 1
            self.entries.append((self.lsn_counter, operation))
            return self.lsn_counter

    def get_entries_since(self, from_lsn):
        """Return all entries with LSN > from_lsn."""
        return [(lsn, op) for (lsn, op) in self.entries if lsn > from_lsn]


class Leader:
    """Leader node that accepts writes and streams to followers."""

    def __init__(self):
        self.data = {}
        self.wal = ReplicationLog()
        self.followers = []

    def write(self, key, value):
        # 1. Write to WAL
        lsn = self.wal.append({"op": "SET", "key": key, "value": value})
        # 2. Apply to local state
        self.data[key] = value
        # 3. Stream to followers (async)
        for f in self.followers:
            f.receive(lsn, {"op": "SET", "key": key, "value": value})
        return lsn

    def read(self, key):
        return self.data.get(key)


class Follower:
    """Follower node that replays changes from the leader."""

    def __init__(self, follower_id):
        self.id = follower_id
        self.data = {}
        self.applied_lsn = 0
        self.pending = queue.Queue()

    def receive(self, lsn, operation):
        """Receive a change from the leader (async)."""
        self.pending.put((lsn, operation))

    def apply_pending(self):
        """Apply pending changes (called by replay loop)."""
        while not self.pending.empty():
            lsn, op = self.pending.get()
            if lsn <= self.applied_lsn:
                continue  # Already applied (idempotent)
            if op["op"] == "SET":
                self.data[op["key"]] = op["value"]
            self.applied_lsn = lsn

    def read(self, key):
        self.apply_pending()  # Catch up before reading
        return self.data.get(key)

    def lag_behind(self, leader):
        """How many LSNs behind the leader?"""
        return leader.wal.lsn_counter - self.applied_lsn


# Demo: trace writes through the replication stream
leader = Leader()
f1 = Follower("follower-1")
f2 = Follower("follower-2")
leader.followers = [f1, f2]

leader.write("user:1", "Alice")    # LSN=1
leader.write("user:2", "Bob")      # LSN=2

# Before followers apply:
print(f1.lag_behind(leader))     # 2 (two writes pending)
print(f1.read("user:1"))         # "Alice" (apply_pending runs first)
print(f1.lag_behind(leader))     # 0 (caught up after read)

Coding Drill: Consistent Hashing for Replica Assignment

python
import bisect
import hashlib

class ConsistentHashRing:
    """Assigns keys to N replicas using consistent hashing.
    Dynamo-style stores such as Cassandra and Riak use this idea
    to determine which nodes store a given key."""

    def __init__(self, nodes, virtual_nodes=150, replication_factor=3):
        self.ring = {}          # hash_position -> node_id
        self.sorted_keys = []   # sorted hash positions
        self.rf = replication_factor
        self.vnodes = virtual_nodes
        self.nodes = set()

        # Each physical node gets multiple positions on the ring
        # (virtual nodes) for better distribution
        for node in nodes:
            self.add_node(node)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_replicas(self, key):
        """Return the N replica nodes responsible for this key."""
        if not self.sorted_keys:
            return []

        h = self._hash(key)
        replicas = []
        seen_nodes = set()

        # Walk clockwise from the key's hash position, skipping virtual
        # nodes that belong to a physical node we already picked
        idx = self._find_start(h)
        while len(replicas) < self.rf and len(seen_nodes) < len(self.nodes):
            pos = self.sorted_keys[idx % len(self.sorted_keys)]
            node = self.ring[pos]
            if node not in seen_nodes:
                replicas.append(node)
                seen_nodes.add(node)
            idx += 1
        return replicas

    def _find_start(self, h):
        """Index of the first position >= h, wrapping to 0 past the end of the ring."""
        return bisect.bisect_left(self.sorted_keys, h) % len(self.sorted_keys)

    def add_node(self, node):
        """Add a node. Only keys whose clockwise walk now reaches one of the
        new node's positions change placement; every other key keeps its replicas."""
        if node in self.nodes:
            return
        self.nodes.add(node)
        for i in range(self.vnodes):
            h = self._hash(f"{node}:{i}")
            self.ring[h] = node
            bisect.insort(self.sorted_keys, h)

# Demo
ring = ConsistentHashRing(["nodeA", "nodeB", "nodeC", "nodeD", "nodeE"])
print(ring.get_replicas("user:42"))   # e.g., ['nodeC', 'nodeA', 'nodeE']
print(ring.get_replicas("user:99"))   # e.g., ['nodeB', 'nodeD', 'nodeA']

# Add a new node — minimal key redistribution
ring.add_node("nodeF")
print(ring.get_replicas("user:42"))   # May or may not change
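The next sketch checks the "minimal key redistribution" claim. It rebuilds only the primary-owner lookup as standalone functions: the first node clockwise, using the same MD5 hash and 150 virtual nodes as the drill. It then counts how many of 10,000 synthetic keys change owner when nodeF joins. The helper names and the key set are illustrative assumptions and are not part of the drill's class.

```python
import bisect
import hashlib

VNODES = 150  # same virtual-node count as the drill above


def ring_hash(key: str) -> int:
    # MD5 is used only to spread positions evenly, not for security
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


def build_ring(nodes, vnodes=VNODES):
    """Return (sorted positions, position -> node) for the given nodes."""
    ring = {}
    for node in nodes:
        for i in range(vnodes):
            ring[ring_hash(f"{node}:{i}")] = node
    return sorted(ring), ring


def primary_owner(positions, ring, key):
    """Owner of the first virtual node clockwise from the key, wrapping past the end."""
    idx = bisect.bisect_left(positions, ring_hash(key)) % len(positions)
    return ring[positions[idx]]


keys = [f"user:{i}" for i in range(10_000)]
old_nodes = ["nodeA", "nodeB", "nodeC", "nodeD", "nodeE"]
old_pos, old_ring = build_ring(old_nodes)
new_pos, new_ring = build_ring(old_nodes + ["nodeF"])

moved = [k for k in keys
         if primary_owner(old_pos, old_ring, k) != primary_owner(new_pos, new_ring, k)]
print(f"{len(moved) / len(keys):.1%} of keys changed primary owner")
# With consistent hashing, every key that moved should now live on the new node
print({primary_owner(new_pos, new_ring, k) for k in moved})
```

You should see roughly one sixth of the keys move, which is the new node's fair share of a six-node ring, and every one of them should land on nodeF. With naive `hash(key) % N` placement, going from 5 to 6 nodes would move most keys instead.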

Interview Rapid-Fire: 10 Questions, 10 One-Line Answers

# | Question | One-line answer
1 | Sync vs async replication? | Sync: no acknowledged write is lost on leader failure, but writes are slower. Async: fast writes, possible data loss on leader failure.
2 | What is replication lag? | The delay between a write on the leader and that write being applied on a follower.
3 | What is split brain? | Two nodes both believe they are the leader. Both accept writes. Data diverges.
4 | What is a quorum? | The minimum number of replicas that must acknowledge a write (W) or answer a read (R). With W + R > N, every read set overlaps every write set, so at least one replica read holds the latest write (sloppy quorums and concurrent writes weaken this).
5 | What is read repair? | When a quorum read finds a stale replica, it writes the fresh value back to fix it.
6 | What does LWW stand for? | Last-Write-Wins: resolve conflicts by keeping the write with the highest timestamp. Simple but lossy.
7 | What is a version vector? | A set of counters, one per replica, that tracks causality: it detects whether two writes are concurrent or causally ordered.
8 | What is STONITH? | Shoot The Other Node In The Head: forcibly shut down the old leader to prevent split brain.
9 | What is CDC? | Change Data Capture: streaming the replication log as events to downstream systems (search, cache, analytics).
10 | CAP theorem? | During a network partition, choose Consistency (reject requests) or Availability (serve possibly stale data).
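Questions 4 and 5 fit together in code. Below is a minimal single-process sketch of a leaderless store, under several assumptions:

- Replicas are in-memory `Replica` objects.
- An `up` flag simulates a crash.
- A single hypothetical global version counter replaces the per-key versions or version vectors a real Dynamo-style store would use.

Writes need W acknowledgments, and reads contact R live replicas. Any stale replica found during a read is repaired.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    name: str
    store: dict = field(default_factory=dict)  # key -> (version, value)
    up: bool = True


class QuorumClient:
    """Toy leaderless client for N replicas with write quorum W and read quorum R."""

    def __init__(self, replicas, w, r):
        if w + r <= len(replicas):
            raise ValueError(f"W + R = {w + r} must exceed N = {len(replicas)}")
        self.replicas, self.w, self.r = replicas, w, r
        self.version = 0  # hypothetical counter standing in for real versioning

    def write(self, key, value):
        self.version += 1
        acks = 0
        for rep in self.replicas:
            if rep.up:  # a down replica simply misses the write
                rep.store[key] = (self.version, value)
                acks += 1
        if acks < self.w:
            # As in Dynamo-style stores, replicas that accepted the write keep it
            raise RuntimeError(f"only {acks} acks, need {self.w}")
        return self.version

    def read(self, key):
        contacted = [rep for rep in self.replicas if rep.up][: self.r]
        if len(contacted) < self.r:
            raise RuntimeError("not enough live replicas for a read quorum")
        responses = [(rep, rep.store.get(key, (0, None))) for rep in contacted]
        best_version, best_value = max((resp for _, resp in responses), key=lambda vv: vv[0])
        # Read repair: push the freshest value back to any stale replica we contacted
        for rep, (version, _) in responses:
            if version < best_version:
                rep.store[key] = (best_version, best_value)
        return best_value


a, b, c = Replica("a"), Replica("b"), Replica("c")
client = QuorumClient([a, b, c], w=2, r=2)
a.up = False                   # replica a is down and misses the write
client.write("user:1", "Alice")
a.up = True
print(a.store.get("user:1"))   # None: a is stale
print(client.read("user:1"))   # Alice: b's newer version wins over a's missing value
print(a.store["user:1"])       # (1, 'Alice'): a was repaired during the read
```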

The Five Dimensions of Replication Knowledge

Every interview question about replication tests one of these dimensions. For each, here's a question you should be able to answer without hesitation:

Dimension | Question | Key points in your answer
CONCEPT | "Explain the trade-off between sync and async replication." | Sync: no data loss, but every write blocks on the slowest replica. Async: fast writes, but recent writes can be lost on leader failure. Semi-sync: a middle ground, with one follower synchronous and the rest async, so each acknowledged write exists on at least two nodes.
DESIGN | "Design a multi-region user profile service." | Leader-follower per region. Writes go to the user's home region. Cross-region reads come from the local follower. Read-your-own-writes via sticky sessions or LSN tracking.
CODE | "Implement quorum read/write with read repair." | Write to W replicas. Read from R replicas. Compare versions. Write the fresh value back to stale replicas.
DEBUG | "Users see stale data after updating their profile." | Read-your-own-writes violation. Check: which replica served the read? What's the replication lag? Fix: route post-write reads to the leader for N seconds.
FRONTIER | "What's the state of the art in conflict resolution?" | CRDTs (Automerge, Yjs for text). OT still used by Google Docs. Raft consensus for strong consistency (CockroachDB, TiDB). Deterministic simulation testing (FoundationDB).

Staff-level question: Your team runs a leader-follower PostgreSQL cluster with 1 leader and 3 async followers. During peak hours, replication lag spikes to 30 seconds. The product team wants read-your-own-writes consistency. An engineer proposes "just read from the leader after every write." What are the problems with this approach, and what would you propose instead?
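For the staff-level question, one alternative to sending every post-write read to the leader is LSN tracking, which the DESIGN row mentions. The router remembers the log position of each user's latest write. It then sends that user's reads to any follower that has already applied that position.

The sketch below is minimal. The router class and its inputs are hypothetical, and follower positions are plain integers. In PostgreSQL, comparable positions could be derived from `pg_current_wal_lsn()` on the leader and `pg_last_wal_replay_lsn()` on a standby.

```python
class ReadYourWritesRouter:
    """Sends a user's reads to a follower only once it has applied that user's latest write."""

    def __init__(self, follower_applied_lsn):
        # follower name -> highest LSN it has applied (assumed to be refreshed elsewhere)
        self.follower_applied_lsn = follower_applied_lsn
        self.last_write_lsn = {}  # user id -> LSN of that user's most recent write

    def record_write(self, user, lsn):
        self.last_write_lsn[user] = max(lsn, self.last_write_lsn.get(user, 0))

    def choose_replica(self, user):
        needed = self.last_write_lsn.get(user, 0)
        caught_up = [name for name, applied in self.follower_applied_lsn.items()
                     if applied >= needed]
        # Fall back to the leader only when no follower has caught up yet
        return caught_up[0] if caught_up else "leader"


router = ReadYourWritesRouter({"f1": 100, "f2": 140})
router.record_write("alice", 130)
print(router.choose_replica("alice"))  # f2: the only follower at or past LSN 130
print(router.choose_replica("bob"))    # f1: bob has no pending writes
router.record_write("alice", 150)
print(router.choose_replica("alice"))  # leader: no follower has reached LSN 150
```

With this design, the leader serves reads only for users whose latest write has not yet reached any follower. It no longer serves every post-write read.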

Chapter 10: Connections

Replication is one of three pillars of distributed data systems. Here's how it connects to the other DDIA chapters and the broader landscape.

Where Replication Fits

Topic | Relationship to Replication | DDIA Chapter
Partitioning (Sharding) | Replication copies data across nodes. Partitioning splits data across nodes. Most systems use BOTH: each partition is replicated to N nodes. | Chapter 6 (Partitioning)
Transactions | Replication gives you copies. Transactions give you ACID guarantees within those copies. Distributed transactions (2PC) coordinate atomic writes across multiple nodes, such as different partitions. | Chapter 7 (Transactions)
Consistency & Consensus | Replication creates the problem (multiple copies can diverge). Consensus algorithms (Raft, Paxos) are one solution: they ensure all replicas agree on the order of operations. | Chapter 9 (Consistency and Consensus)
Batch & Stream Processing | Change data capture (CDC) uses the replication log as a stream of events: a connector reads the binlog/WAL and publishes the changes (for example, to Kafka), and consumers build derived datasets from them. | Chapters 10-11 (Batch & Stream)
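The CDC row can be made concrete with a small consumer. It reads change events in log order and applies them to a derived store. It also records the last LSN it applied, so it can resume after a restart or safely re-read events it has already seen.

In the sketch below, an in-memory list stands in for a Kafka topic or the WAL. The `ChangeEvent` shape is a hypothetical simplification, not Debezium's actual event format.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChangeEvent:
    lsn: int
    op: str                      # "SET" or "DELETE" (hypothetical event shape)
    key: str
    value: Optional[str] = None


class SearchIndexConsumer:
    """Maintains a derived word -> keys index from a stream of change events."""

    def __init__(self):
        self.checkpoint = 0              # last applied LSN; durable in a real consumer
        self.docs = {}                   # key -> current value
        self.index = defaultdict(set)    # lowercase word -> keys containing it

    def consume(self, events):
        for event in events:
            if event.lsn <= self.checkpoint:
                continue                 # already applied, so replays are harmless
            old = self.docs.pop(event.key, None)
            if old is not None:          # un-index the previous version of the row
                for word in old.lower().split():
                    self.index[word].discard(event.key)
            if event.op == "SET":
                self.docs[event.key] = event.value
                for word in event.value.lower().split():
                    self.index[word].add(event.key)
            self.checkpoint = event.lsn


log = [
    ChangeEvent(1, "SET", "user:1", "Alice Smith"),
    ChangeEvent(2, "SET", "user:2", "Bob Smith"),
    ChangeEvent(3, "SET", "user:1", "Alice Jones"),
    ChangeEvent(4, "DELETE", "user:2"),
]
consumer = SearchIndexConsumer()
consumer.consume(log[:2])    # consumer falls behind after LSN 2
consumer.consume(log)        # re-reading from the start skips LSNs 1-2
print(sorted(consumer.index["smith"]))  # []
print(sorted(consumer.index["jones"]))  # ['user:1']
```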

The Bigger Picture

Replication sits at the heart of the CAP/PACELC trade-off space:

CAP (Brewer, 2000): In the presence of a network Partition, you must choose between Consistency (every read returns the latest write) and Availability (every request receives a response). You can't have both. Leader-follower with sync replication chooses CP. Leaderless with sloppy quorums chooses AP.
PACELC (Abadi, 2012): Even when there is No Partition (normal operation), you still trade Latency vs Consistency. Synchronous replication gives consistency but adds latency (wait for replica ACK). Asynchronous replication gives low latency but sacrifices consistency (stale reads). This is the trade-off you navigate every day, not just during failures.
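A little arithmetic shows the PACELC latency cost. Suppose a leader ships each write to its followers in parallel. It must then wait for the k-th fastest acknowledgment before confirming, and k depends on the replication mode.

The sketch below makes three simplifying assumptions. Each acknowledgment takes one round trip, the leader's own disk write is ignored, and the RTTs are made up.

```python
def replication_wait_ms(follower_rtts_ms, acks_required):
    """Extra time the leader waits before acknowledging a write.

    Assumes the write goes to all followers in parallel and each follower
    acks after one round trip. acks_required = 0 is async, 1 is semi-sync,
    and len(follower_rtts_ms) is fully synchronous.
    """
    if not 0 <= acks_required <= len(follower_rtts_ms):
        raise ValueError("acks_required must be between 0 and the follower count")
    if acks_required == 0:
        return 0.0
    # Acks arrive in RTT order, so we wait for the k-th fastest follower
    return sorted(follower_rtts_ms)[acks_required - 1]


rtts = [1.0, 2.0, 80.0]  # hypothetical RTTs in ms: same AZ, other AZ, other region
modes = [
    ("async", 0),
    ("semi-sync", 1),
    ("majority (leader + 2 of 3)", 2),
    ("sync to all", 3),
]
for label, k in modes:
    print(f"{label:<28} +{replication_wait_ms(rtts, k):g} ms")
```

Adding one distant follower to the set the leader must wait for raises write latency from about one intra-region round trip to one cross-region round trip. That is the latency-vs-consistency dial PACELC describes.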

What This Lesson Did NOT Cover

Topic | Why it matters | Where to learn it
Consensus algorithms (Raft, Paxos) | How to make replicas agree on a single value. The foundation of strong consistency. | DDIA Ch. 9, the Raft paper
Distributed transactions (2PC, Saga) | How to coordinate writes across nodes: atomically with 2PC, or as a sequence of local steps with compensating actions in a Saga. Much harder than single-node transactions. | DDIA Ch. 7, Ch. 9
CRDTs in depth | Mathematical structures for conflict-free merging. G-Counter, PN-Counter, LWW-Register, OR-Set. | Shapiro et al. 2011 survey paper
Change Data Capture | Using the replication log as an event stream for building derived systems. | DDIA Ch. 11

The Replication Landscape: A Visual Summary

// The spectrum of replication strategies, ordered by consistency strength:

WEAKEST CONSISTENCY                                              STRONGEST CONSISTENCY
|----------------------------------------------------------------------------------|
Async            Leaderless       Semi-sync          Raft/Paxos          Sync
single-leader    W=1, R=1         leader-follower    consensus           leader-follower
(Redis)          (no quorum)      (PostgreSQL)       (CockroachDB,       (rare)
                 (Cassandra)                         etcd, ZK)
|                                                                                  |
FASTEST WRITES                                                       SLOWEST WRITES
HIGHEST AVAILABILITY                                                 LOWEST AVAILABILITY
MOST DATA LOSS RISK                                                  ZERO DATA LOSS

Replication in Managed Cloud Services

If you're using a cloud provider, you don't configure replication from scratch — but you still need to understand the trade-offs behind the options:

Service | Replication Model | What you configure | Hidden trade-off
AWS RDS (PostgreSQL/MySQL) | Leader-follower. Multi-AZ = sync standby. Read replicas = async followers. | Number of read replicas (up to 15). Multi-AZ on/off. | Multi-AZ failover typically takes 60-120 seconds, and existing connections drop; clients must reconnect, picking up the DNS change to the new primary.
AWS Aurora | Shared storage. 6-way replication across 3 AZs at the storage layer. Up to 15 read replicas. | Number of readers. Reader endpoint for load balancing. | Reader instances share storage but have independent buffer caches. After failover, the promoted instance's cache may not hold the old writer's working set, so performance can dip until it warms up.
Google Cloud Spanner | Paxos consensus per split. Synchronous across zones/regions. | Instance size. Regional vs. multi-regional. | Regional writes take ~5ms. Multi-regional writes must reach a Paxos quorum across regions, so they are much slower (how much depends on where the voting replicas sit), and multi-regional instances cost considerably more.
Azure Cosmos DB | Configurable: strong, bounded staleness, session, consistent prefix, eventual. | Consistency level. Regions. Conflict resolution policy. | Strong consistency requires single-region writes. Multi-region writes work only with weaker consistency levels.
MongoDB Atlas | Replica sets. Configurable read/write concern. | Cluster tier. Read preference. Write concern. | w:majority adds a replication round trip to every write (often a few ms). Developers who choose w:1 for speed can lose acknowledged writes, which are rolled back after a failover.

The Cost of Replication

// Financial cost of different replication strategies (AWS, us-east-1, 2024):

// Single db.r6g.xlarge (4 vCPU, 32 GB): ~$0.50/hour = $365/month

// Leader + 2 async read replicas: $365 × 3 = $1,095/month
// + data transfer between replicas: ~$50/month
// Total: ~$1,145/month (3x single node)

// Multi-AZ (sync standby for HA): ~$730/month
// + 2 read replicas: $730 + $365 × 2 = $1,460/month

// Multi-region (leader in us-east, replica in eu-west):
// $365 × 2 + cross-region data transfer ($0.02/GB)
// At 100 GB/day replication: $365 × 2 + $60/month = $790/month

// Google Spanner multi-regional (for comparison):
// 1 node (2TB storage): ~$2,000/month
// The price of strong consistency across continents.
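The estimates above are simple multiplication, and this sketch reproduces them so you can change the inputs. The hourly rate, the ~$50 transfer charge, and the $0.02/GB cross-region rate are the document's approximations, not current AWS prices. The 730-hour month is the usual 24 × 365 / 12 approximation.

```python
HOURS_PER_MONTH = 730      # 24 * 365 / 12, the usual monthly approximation
INSTANCE_PER_HOUR = 0.50   # approximate db.r6g.xlarge rate used in the estimate


def monthly_cost(instances, extra=0.0):
    """Instance cost for a month plus any extra charges such as data transfer."""
    return instances * INSTANCE_PER_HOUR * HOURS_PER_MONTH + extra


cross_region_gb = 100 * 30                     # 100 GB/day over a 30-day month
estimates = {
    "single node": monthly_cost(1),
    "leader + 2 async replicas": monthly_cost(3, extra=50),          # ~$50 transfer
    "Multi-AZ + 2 read replicas": monthly_cost(2) + monthly_cost(2),  # Multi-AZ ~ 2 instances
    "leader + cross-region replica": monthly_cost(2, extra=cross_region_gb * 0.02),
}
for name, cost in estimates.items():
    print(f"{name:<30} ${cost:,.0f}/month")
```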

When NOT to Replicate

Replication is not always the answer. Consider these alternatives:

If your problem is... | Replication helps? | Better solution
Write throughput exceeds a single node | No: all writes still go to the leader | Partitioning (sharding)
Data doesn't fit on one node (disk space) | No: every replica stores a full copy | Partitioning (sharding)
Protecting against accidental deletion | No: deletes replicate too | Point-in-time backups, soft deletes
Complex cross-table queries are slow | No: followers run the same slow queries | Read-optimized derived views (materialized views, CQRS)
Need to serve data close to users globally | Yes: this is exactly what replication is for | Multi-region read replicas
Need to survive hardware failure | Yes: this is exactly what replication is for | Multi-AZ or multi-region replication

How to Think About CAP in Practice

The CAP theorem is often misunderstood. Here's the nuanced version:

Scenario | What you choose | What you sacrifice | Real system
No partition (normal operation) | You can have BOTH consistency and availability | Latency (PACELC trade-off) | All systems in normal mode
Partition + choose C | Refuse requests to partitioned nodes | Availability (some users get errors) | CockroachDB, Spanner, etcd
Partition + choose A | Continue serving from all nodes | Consistency (stale reads, conflicts) | Cassandra, DynamoDB, CouchDB

In practice, network partitions are the exception in a well-run datacenter (though they happen more often than many engineers expect), so the PACELC extension is more useful day-to-day: even without partitions, you're constantly trading latency vs. consistency. Every time you choose async replication for speed, you're making a PACELC trade-off.
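The two partition rows of the table can be expressed as a decision a single node makes when it cannot reach all of its peers. This toy function is not any particular database's logic. It assumes the node knows how many nodes it can currently reach, counting itself. The "reconcile later" result stands in for a conflict-resolution mechanism such as LWW, version vectors, or CRDTs.

```python
from enum import Enum


class Mode(Enum):
    CP = "prefer consistency"
    AP = "prefer availability"


def handle_write(mode, reachable_nodes, cluster_size):
    """What a node does with a write, given how many nodes (itself included) it can reach."""
    if reachable_nodes > cluster_size // 2:
        return "accept"                        # majority side: normal operation
    if mode is Mode.CP:
        return "reject"                        # minority side gives up availability
    return "accept locally, reconcile later"   # minority side gives up consistency


# A 5-node cluster split into a 3-node side and a 2-node side
print(handle_write(Mode.CP, 3, 5))  # accept
print(handle_write(Mode.CP, 2, 5))  # reject
print(handle_write(Mode.AP, 2, 5))  # accept locally, reconcile later
```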

Key Takeaway

There is no "best" replication strategy. There are only trade-offs. Every system you build or evaluate sits somewhere in this space:

Strong consistency ←————————→ High availability
Low write latency ←————————→ Durability guarantee
Simple operations ←————————→ Rich conflict resolution
Single-region perf ←————————→ Multi-region presence

The engineer who understands WHERE a system sits in this space, and WHY it was designed that way, is the one who gets the staff-level offer.

The Local-First Movement: Replication Without Servers

A fascinating frontier in replication: what if there is no server at all? Local-first software runs entirely on the user's device, stores data locally, and uses CRDTs to sync between devices peer-to-peer.

Property | Cloud-first (traditional) | Local-first
Primary copy | Server (cloud database) | User's device
Offline support | Limited or none | Full: the local copy is authoritative
Sync mechanism | Server mediates all writes | CRDT merge between devices
Conflict resolution | Server decides (or rejects) | Automatic CRDT merge: concurrent edits always merge deterministically, with no manual resolution
Latency | Network RTT per operation | Zero network latency for local reads/writes; sync happens eventually
Privacy | Data stored on third-party servers | Data stays on user devices
Examples | Google Docs, Notion, Figma | Automerge, Yjs, Ink & Switch research

In this model, every user's device is effectively a "leader" in a multi-leader replication system. CRDTs ensure that all devices converge to the same state regardless of the order in which changes are applied. The challenge: CRDTs work well for text, counters, sets, and JSON-like documents, but they cannot enforce invariants that span records, such as uniqueness constraints or a balance that must never go negative, which makes them a poor fit for relational tables or complex business objects.
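The convergence property is easiest to see in the simplest CRDT, a grow-only counter (G-Counter). Each device increments only its own slot, and merging takes the per-slot maximum. This is a minimal in-memory sketch with hypothetical device names, not Automerge's or Yjs's API.

```python
class GCounter:
    """Grow-only counter CRDT: one slot per replica; merge takes the element-wise max."""

    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.counts = {}  # replica id -> total increments made at that replica

    def increment(self, amount=1):
        if amount < 0:
            raise ValueError("a G-Counter can only grow")
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + amount

    def merge(self, other):
        # Element-wise max is commutative, associative and idempotent,
        # so replicas converge whatever the order (or number) of merges
        for rid, n in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), n)

    def value(self):
        return sum(self.counts.values())


laptop, phone = GCounter("laptop"), GCounter("phone")
laptop.increment(3)       # both devices edit while offline
phone.increment(2)
laptop.merge(phone)       # sync in one direction...
phone.merge(laptop)       # ...then the other
phone.merge(laptop)       # a duplicate sync changes nothing
print(laptop.value(), phone.value())  # 5 5
```

The same design shows why invariants are hard. A G-Counter cannot enforce a rule such as "at most 10 increments in total," because each device increments without coordinating with the others.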

Why this matters for system design interviews. If an interviewer asks "Design a collaborative document editor that works offline," the local-first approach with CRDTs is a state-of-the-art answer. It's more sophisticated than "use a server as the single source of truth" and demonstrates awareness of cutting-edge distributed systems research. Key names to drop: Martin Kleppmann (co-creator of Automerge, also the author of DDIA), Ink & Switch (research lab), Yjs (popular CRDT library for text editing).

How Replication Evolved: A Brief History

Era | System | Innovation
1970s | IBM System R | First replicated databases. Manual failover. Statement-based replication.
1990s-2000s | Oracle RAC, MySQL Replication | Leader-follower becomes standard. Async replication over TCP.
2007 | Amazon Dynamo | Leaderless replication, consistent hashing, vector clocks, quorums. Spawned Cassandra, Riak, Voldemort.
2012 | Google Spanner | Globally consistent replication using TrueTime (atomic clocks + GPS). Described by its authors as the first system to distribute data at global scale while supporting externally consistent distributed transactions.
2014 | Raft consensus (etcd, CockroachDB) | Designed as a more understandable alternative to Paxos. Leader election + log replication in one algorithm. CockroachDB shows Raft can power SQL databases.
2018+ | Automerge, Yjs (CRDTs) | Conflict-free replication for collaborative apps. Local-first software movement. No server needed for consistency.
2020+ | FoundationDB, TiDB, Neon | Deterministic simulation testing. Disaggregated storage (compute replicas share storage). Serverless replication.
Closing thought. "A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable." — Leslie Lamport. Replication is our attempt to tame this reality. It never fully succeeds, but the alternatives — single nodes, manual backups, prayer — are worse.

What to Read Next

If this lesson gave you the conceptual foundation, here's the path to deeper mastery:

Next step | Resource | What you'll learn
Implement Raft from scratch | MIT 6.824 (now 6.5840) Raft lab (free, online) | Leader election, log replication, and safety properties, by writing the code yourself.
Read the Dynamo paper | DeCandia et al., SOSP 2007 | The original leaderless design. Every section is quotable in interviews.
Study FoundationDB | "Testing Distributed Systems" (Will Wilson, Strange Loop talk) | How deterministic simulation testing catches bugs that real-world testing misses.
Build a CDC pipeline | Debezium tutorial + Kafka Connect | Stream PostgreSQL changes to Elasticsearch. See logical replication in action.
Try CRDTs | Automerge library (JavaScript/Rust) | Build a collaborative text editor with conflict-free merge. Local-first architecture.

One More Design Challenge

"Design the replication strategy for a global payments system."

This is arguably the hardest replication design challenge because financial data has the strictest consistency requirements:

// Requirements:
// - Zero data loss (RPO = 0)
// - Sub-5-second failover (RTO < 5s)
// - Strong consistency (no double-spending)
// - Multi-region for disaster recovery
// - < 100ms write latency for single-region operations

// Approach:
// 1. Use Raft-based consensus (CockroachDB or Google Spanner).
// Strong consistency without manual conflict resolution.

// 2. Place Raft replicas across 3 AZs in the primary region.
// Raft majority quorum = 2 of 3 AZs. Survives 1 AZ failure.
// Intra-region RTT: 1-3ms. Write latency: ~5-10ms. Acceptable.

// 3. Async replication to a disaster recovery region.
// "But you said RPO=0!" — yes, for AZ failures (covered by Raft).
// For entire-region failures (extremely rare), accept RPO = async lag.
// OR: 5 Raft replicas across 3 regions (e.g., 2+2+1). RPO=0 even for a region loss, but every write waits for a cross-region quorum: 50ms+.

// 4. For cross-region payments (user in EU paying US merchant):
// Debit EU account (local Raft commit, 5ms).
// Async message to US region for credit (eventual, 100ms-1s).
// Use saga pattern: if credit fails, compensating debit reversal.
// The transfer is "in progress" during the async window.

This answer demonstrates the key insight: even in a "zero data loss" system, you sometimes accept async replication for cross-region operations. The trick is designing the business logic to handle the async window gracefully (saga pattern, pending states, idempotent operations).
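The saga in step 4 can be sketched in a few dozen lines. This is a single-process illustration built on several assumptions:

- `RegionLedger`, `run_transfer`, and the step names are hypothetical.
- The cross-region credit is a direct call standing in for an asynchronous, retried message.
- A missing account simulates a credit that fails.

Each step is idempotent per (transfer_id, step), so a redelivered message cannot apply the same money movement twice.

```python
from dataclasses import dataclass


@dataclass
class Transfer:
    transfer_id: str
    amount: int               # in cents, to avoid floating-point money
    state: str = "PENDING"


class RegionLedger:
    """One region's balances; each (transfer_id, step) is applied at most once."""

    def __init__(self, balances):
        self.balances = dict(balances)
        self.applied = set()

    def apply(self, transfer_id, step, account, delta):
        if (transfer_id, step) in self.applied:
            return                        # a retried message: already applied
        if account not in self.balances:
            raise KeyError(account)
        if self.balances[account] + delta < 0:
            raise ValueError("insufficient funds")
        self.balances[account] += delta
        self.applied.add((transfer_id, step))


def run_transfer(t, eu, us, payer, payee):
    """Saga: local debit, cross-region credit, compensating refund on failure."""
    eu.apply(t.transfer_id, "debit", payer, -t.amount)   # step 1: local EU commit
    t.state = "IN_PROGRESS"                              # visible during the async window
    try:
        # Step 2: in a real system this is an async, retried message to the US region
        us.apply(t.transfer_id, "credit", payee, t.amount)
        t.state = "COMPLETED"
    except (KeyError, ValueError):
        eu.apply(t.transfer_id, "refund", payer, t.amount)  # compensating action
        t.state = "COMPENSATED"
    return t.state


eu = RegionLedger({"alice": 10_000})
us = RegionLedger({"shop": 0})
print(run_transfer(Transfer("t1", 3_000), eu, us, "alice", "shop"))         # COMPLETED
print(run_transfer(Transfer("t2", 2_000), eu, us, "alice", "closed-shop"))  # COMPENSATED
us.apply("t1", "credit", "shop", 3_000)   # duplicate delivery is ignored
print(eu.balances["alice"], us.balances["shop"])  # 7000 3000
```

In a real deployment, the IN_PROGRESS state would be persisted together with the debit. A crash between steps would then leave a record that a recovery process can push forward or compensate.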

Final synthesis: Rank these four replication approaches from MOST to LEAST write latency (assuming replicas are in different regions): (A) Synchronous leader-follower, (B) Asynchronous leader-follower, (C) Leaderless with W=N, (D) Leaderless with W=1.