Designing Data-Intensive Applications — Chapter 7

Data Sharding

Hash partitioning, range partitioning, rebalancing — splitting data across machines.

Prerequisites: Hash functions + Key-value stores. That's it.
11 chapters · 9+ simulations · 5 interview dimensions

Chapter 0: The Problem

Your e-commerce data has grown to 10 TB, but a single PostgreSQL server has only 2 TB of SSD storage and can handle about 10,000 queries per second at peak. Today is Black Friday. Traffic spikes to 50,000 queries per second. The database falls over. Users see spinning wheels. Revenue flatlines.

You already know about replication from Chapter 6 — making copies of the same data on multiple machines. Replication helps with read throughput (spread reads across replicas) and fault tolerance (if one copy dies, others survive). But replication does not help with the storage problem. Each replica holds the full 10 TB. And replication does not help with write throughput — in a leader-based setup, all writes still go to one machine.

You need a fundamentally different strategy: instead of copying all the data everywhere, split the data so each machine holds only a fraction. Five machines, each holding 2 TB. Ten machines, each handling 5,000 queries per second. This is partitioning. Each piece goes by a different name depending on the system: a shard in MongoDB and MySQL (hence "sharding"), a region in HBase, a tablet in Bigtable, a vBucket in Couchbase, and a vnode in Cassandra. Different names, same idea: break a large dataset into smaller pieces and spread them across nodes.

The concept sounds simple. The devil is in the details. If you split badly, one machine gets all the traffic while the others idle. If you split by a key that changes over time, data migrates constantly. If a machine fails, you need to redistribute its data without taking the whole system offline. If a client wants to query across partition boundaries, you need to scatter the request to all machines and gather the results — slow and expensive.

Partitioning vs. Replication. These are orthogonal techniques. Partitioning splits data so each machine stores a subset. Replication copies data so each piece has multiple copies. Real systems use both: partition the data across 5 machines for capacity, and replicate each partition to 3 machines for fault tolerance. That's 15 machines total. Chapter 6 covered replication. This chapter covers partitioning. Chapter 8 covers how they interact with transactions.

What Goes Wrong Without Partitioning

Let's be precise about the bottlenecks a single machine hits:

Bottleneck | Typical limit | What happens when exceeded
Storage capacity | 2-16 TB per machine | Disk full. Writes fail. No more data.
Write throughput | 5K-50K writes/sec (SSD) | Write latency climbs from ms to seconds. Timeouts.
Read throughput | 10K-100K reads/sec | CPU saturated. All queries slow down.
Memory (working set) | 64-512 GB RAM | Hot data spills to disk. Latency jumps 100x.

Replication alone can solve read throughput (add more read replicas) but cannot solve storage capacity or write throughput. Partitioning solves all four: each partition is small enough to fit on one machine, and writes are distributed across partitions.

The Central Goal: Even Distribution

The ideal partition scheme distributes data and query load evenly across nodes. If you have 10 nodes and each handles 10% of the data and 10% of the queries, you have achieved fair partitioning. If one node handles 50% of the queries while the other nine handle about 5.6% each, that node is a hotspot. The system's throughput is limited by the hotspot: once that node saturates, the cluster can serve at most twice what a single node can, so scaling from 1 to 10 machines bought you only a 2x improvement instead of 10x.
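The 2x figure follows from a one-line bound. Here is a minimal Python sketch, assuming identical nodes and that traffic cannot be rerouted away from the busiest node; `max_speedup` is a hypothetical helper name, not from any library.

```python
def max_speedup(num_nodes: int, hottest_share: float) -> float:
    """Upper bound on cluster throughput, in units of one node's capacity.

    Assumes identical nodes and that load cannot be shifted away from the
    hottest node: it saturates first, which caps the whole cluster.
    """
    if not 0 < hottest_share <= 1:
        raise ValueError("hottest_share must be in (0, 1]")
    return min(float(num_nodes), 1 / hottest_share)


print(max_speedup(10, 0.10))  # even split: 10.0
print(max_speedup(10, 0.50))  # one node receives 50% of queries: 2.0
```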

Picture five servers processing incoming requests. At first the requests distribute evenly, and each server handles about 20% of traffic. Then a bad partition key causes a hotspot: one server gets crushed while the others idle.

The overloaded server's latency climbs, its queue fills, and eventually it starts dropping requests. Meanwhile, the other four servers are barely working. This is the worst-case outcome of bad partitioning — you paid for 5 machines but got the performance of 1.5.

The partitioning problem, stated formally. Given N nodes and a set of keys K, assign each key k ∈ K to a node n(k) ∈ {1, ..., N} such that: (1) each node stores roughly |K|/N keys, (2) each node handles roughly 1/N of the query load, (3) adding or removing a node requires moving as few keys as possible, and (4) clients can efficiently determine n(k) without contacting all nodes. No scheme perfectly satisfies all four. The rest of this lesson explores the trade-offs.

A Concrete Worked Example

Let's make this tangible. You run an e-commerce platform. Here's your current situation:

// Current state: single PostgreSQL server
Storage: 8 TB used of 10 TB available
Write throughput: 40,000 writes/sec (near limit of ~50,000)
Read throughput: 80,000 reads/sec (handled by 3 read replicas)
Growth rate: 200 GB/week

// In 10 weeks: storage full. Writes fail. Game over.

// Partitioning plan: 5 partitions
Each partition: 8 TB / 5 = 1.6 TB (fits on one machine)
Each partition handles: 40K / 5 = 8K writes/sec (well within limits)
Each partition handles: 80K / 5 = 16K reads/sec (comfortable)
Growth per partition: 200 GB / 5 = 40 GB/week
Time until full: (10 TB - 1.6 TB) / 40 GB = 210 weeks = 4 years!

// And when you DO fill up? Add a 6th partition and rebalance, ideally without downtime.

The math is clear. Partitioning extends your runway from 10 weeks to 4 years, and when you eventually need more capacity, you add partitions rather than replacing the entire server.
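The runway arithmetic above can be replayed in a few lines of Python. This sketch makes the same assumptions as the example: every partition machine has the same 10 TB of disk as the original server, and data, writes, reads, and growth all split evenly across partitions. The helper name `weeks_until_full` is illustrative.

```python
GB_PER_TB = 1000  # decimal units, as in the example


def weeks_until_full(capacity_gb: float, used_gb: float, growth_gb_per_week: float) -> float:
    """Weeks until the free space on one machine is used up."""
    return (capacity_gb - used_gb) / growth_gb_per_week


partitions = 5
single = weeks_until_full(10 * GB_PER_TB, 8 * GB_PER_TB, 200)
# Assumption: each partition gets its own machine with the same 10 TB disk,
# and data and growth divide evenly across partitions.
per_partition = weeks_until_full(10 * GB_PER_TB, 8 * GB_PER_TB / partitions, 200 / partitions)
writes_per_partition = 40_000 / partitions
reads_per_partition = 80_000 / partitions

print(f"single server: {single:.0f} weeks")  # 10 weeks
print(f"{partitions} partitions: {per_partition:.0f} weeks (~{per_partition / 52:.1f} years)")  # 210 weeks (~4.0 years)
print(writes_per_partition, reads_per_partition)  # 8000.0 16000.0
```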

The Terminology Zoo

Every database calls partitions something different. This is pure historical accident, but you will encounter all of these terms in interviews and documentation:

Database | Name for "partition" | Name for "partition key"
MongoDB | Shard / Chunk | Shard key
Cassandra | Partition (virtual node) | Partition key
DynamoDB | Partition | Partition key (hash key)
HBase | Region | Row key
Bigtable | Tablet | Row key
Elasticsearch | Shard | Document _id (or routing field)
Kafka | Partition | Message key
CockroachDB | Range | Primary key prefix
MySQL (Vitess) | Shard | Vindex / sharding key

Do not let the names confuse you. They all mean the same thing: a subset of the data that lives on one machine (or one group of replica machines).

Your database has 8 TB of data and handles 40K writes/sec. You add 3 read replicas (4 machines total). Which bottleneck have you NOT solved?

Chapter 1: Partitioning by Key Range

The simplest partitioning scheme is the one you already know from encyclopedias. Volume A-D gets one shelf, E-K gets another, L-R another, S-Z another. Each partition owns a continuous range of keys, and the ranges are chosen so that data distributes roughly evenly.

This is key range partitioning. Assign each partition a contiguous range of the key space. Given a key, you determine its partition by binary-searching through the range boundaries. Keys within each partition are kept in sorted order, which makes range queries extremely efficient — if you want all records with keys between "2024-01-01" and "2024-01-31", you find the partition(s) covering that range and do a single sequential scan within each.

How It Works, Step by Step

Say you have 1000 user IDs from 1 to 1000 and 4 partitions. The naive approach: partition 1 gets keys 1-250, partition 2 gets 251-500, partition 3 gets 501-750, partition 4 gets 751-1000. Boundaries are [250, 500, 750]. To find which partition owns key 637, binary search the boundaries: 637 > 500 and 637 ≤ 750, so it goes to partition 3.

// Range partitioning lookup
boundaries = [250, 500, 750] // 4 partitions, 3 boundaries
key = 637

// Binary search: O(log N) where N = number of partitions. Start at the middle boundary.
637 > 500? yes → 637 > 750? no → partition 3

// Range query: "all keys 400-600"
400 > 500? no → 400 > 250? yes → starts in partition 2
600 > 500? yes → 600 > 750? no → ends in partition 3
→ query partitions 2 and 3 only (not all 4)
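Here is the same lookup as a small Python sketch, using the standard library's `bisect` module for the binary search. It assumes the upper-inclusive boundaries of the example above. `partition_for` and `partitions_for_range` are hypothetical helper names.

```python
from bisect import bisect_left

# Upper-inclusive boundaries, as in the example: partition 1 owns keys <= 250,
# partition 2 owns 251-500, partition 3 owns 501-750, partition 4 owns > 750.
BOUNDARIES = [250, 500, 750]


def partition_for(key: int, boundaries: list[int] = BOUNDARIES) -> int:
    """Return the 1-based partition that owns `key` (O(log N) binary search)."""
    return bisect_left(boundaries, key) + 1


def partitions_for_range(lo: int, hi: int, boundaries: list[int] = BOUNDARIES) -> list[int]:
    """Return every partition a range query [lo, hi] must touch, in order."""
    first, last = partition_for(lo, boundaries), partition_for(hi, boundaries)
    return list(range(first, last + 1))


print(partition_for(637))              # 3
print(partitions_for_range(400, 600))  # [2, 3]
```

`bisect_left` is what makes a key equal to a boundary (250, 500, 750) belong to the lower partition. If each boundary were instead the first key of the next partition, `bisect_right` would be the right call.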

The boundaries do not have to be evenly spaced. If keys 1-100 are accessed 10x more than keys 901-1000, you might set boundaries at [100, 300, 600] to balance load. Real systems like HBase and Bigtable adapt boundaries dynamically: when a partition grows too large, it splits.

Who Uses Key Range Partitioning

System | Name for partition | Key range behavior
HBase | Region | Auto-splits regions when they exceed a configurable size (default 10 GB)
Bigtable | Tablet | Auto-splits tablets, merges small adjacent ones
MongoDB | Chunk | Range-based sharding when you choose a range shard key
CockroachDB | Range | Automatic splitting and merging; default maximum range size 512 MiB (64 MiB before v20.1)

The Timestamp Hotspot Trap

Key range partitioning has a dangerous failure mode. Suppose you are storing IoT sensor readings, and your key is a timestamp. All new data has a timestamp of "right now." All writes go to the partition holding the most recent time range. The other partitions — holding historical data — receive zero writes. You have a write hotspot on a single partition.

This is not hypothetical. The HBase reference guide's row-key design chapter warns about exactly this pattern (monotonically increasing row keys), and it comes up in almost every systems design interview where a candidate proposes a timestamp-based key.

The fix: prefix the timestamp with something that distributes writes. If you have 100 sensors, use `sensor_id + timestamp` as the key. Now writes spread across partitions because sensor IDs vary. The cost: to query all readings for a given time range across all sensors, you now need to query 100 separate ranges (one per sensor prefix) and merge the results. There is no free lunch.

Adaptive Boundaries

In production systems, partition boundaries are not static. They adapt to the data distribution. Here is a simplified view of how HBase does it:

// HBase region splitting algorithm

Initial state: 1 region covers entire key space [−∞, +∞)
Region size limit: 10 GB (configurable via hbase.hregion.max.filesize)

Step 1: Data arrives. Region grows.
Region [−∞, +∞) size = 10.1 GB → SPLIT!

Step 2: HBase finds the midpoint of the region's data (not the key space).
Midpoint key = "user_5000" (the key that splits data 50/50 by bytes)

Step 3: Two new regions:
Region A: [−∞, "user_5000")
Region B: ["user_5000", +∞)

Step 4: Each region continues growing independently.
If Region B grows faster (more recent users), it splits again at 10 GB.
Region A stays small (historical users rarely updated).

// Result: more partitions where data is hot, fewer where it's cold.
// This is automatic, requires no manual intervention.
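To make the algorithm concrete, here is a toy Python model of size-based splitting. It is a simplification under stated assumptions, not HBase's implementation: sizes are abstract units, keys are unique, and the split point is the first key at which the left half holds at least half of the region's bytes. `Region`, `split`, and `put` are hypothetical names.

```python
from bisect import insort


class Region:
    """Hypothetical model of a region: owns keys in [start, end), kept sorted."""

    def __init__(self, start=None, end=None, rows=None):
        self.start = start      # None means -infinity
        self.end = end          # None means +infinity
        self.rows = rows or []  # sorted list of (key, size)

    def owns(self, key) -> bool:
        return (self.start is None or key >= self.start) and (
            self.end is None or key < self.end)

    def size(self) -> int:
        return sum(size for _, size in self.rows)


def split(region):
    """Split at the key that divides the region's bytes roughly 50/50."""
    half, running = region.size() / 2, 0
    for j, (_, size) in enumerate(region.rows[:-1]):
        running += size
        if running >= half:
            break
    mid = j + 1  # both halves stay non-empty
    split_key = region.rows[mid][0]
    return (Region(region.start, split_key, region.rows[:mid]),
            Region(split_key, region.end, region.rows[mid:]))


def put(regions, key, size, max_size):
    """Insert a row, then split its region if it now exceeds max_size."""
    i = next(i for i, r in enumerate(regions) if r.owns(key))
    insort(regions[i].rows, (key, size))
    if regions[i].size() > max_size and len(regions[i].rows) > 1:
        regions[i:i + 1] = split(regions[i])


regions = [Region()]  # initially one region covers the whole key space
for n in range(1, 101):  # 100 rows of size 1, split limit 10
    put(regions, f"user_{n:04d}", 1, max_size=10)
print(len(regions))
```

Because the demo feeds sequential keys, it also reproduces the timestamp hotspot from earlier in this chapter: every write lands in the last region, so all the splitting (and all the write load) happens at the right edge of the key space.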

The beauty of adaptive splitting: the system automatically creates more partitions in "hot" regions of the key space. If 90% of writes land in keys starting with "2024-", that range will be split into many small partitions, while the "2020-" range remains a single large but rarely-accessed partition.

The downside: a brand-new table starts with ONE region on ONE server, so all writes hit that single server until the first split. The fix is pre-splitting: HBase lets you specify initial split points when creating a table, e.g. create 'events', 'd', SPLITS => ['2024-04', '2024-07', '2024-10'] (here 'd' is a placeholder column family name; HBase requires at least one).

The key range paradox. Key range partitioning gives you efficient range queries (its strength) but makes you vulnerable to hotspots when keys have temporal or sequential patterns (its weakness). The more "scannable" your key space, the more likely you are to have all writes landing on the same partition. This paradox drives many systems toward hash partitioning — which we cover in the next chapter.

Worked Example: Sensor Data Partitioning

You have 4 partitions and receive sensor readings keyed by timestamp:

// Bad key: timestamp only
Partition boundaries: [2024-04-01, 2024-07-01, 2024-10-01]
P1: Jan-Mar 2024 | P2: Apr-Jun 2024 | P3: Jul-Sep 2024 | P4: Oct-Dec 2024

Today = Nov 15, 2024. ALL writes → P4. Hotspot!

// Fixed key: sensor_id + timestamp
Key = "sensor_042_2024-11-15T10:30:00"
Partition boundaries based on sensor_id ranges:
P1: sensor_001-025 | P2: sensor_026-050 | P3: sensor_051-075 | P4: sensor_076-100

sensor_042 → P2. sensor_091 → P4. Writes spread evenly!
Cost: "all readings at 10:30 AM" requires querying all 4 partitions.
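The fixed key scheme can be sketched in Python to show both the routing and the query cost. Assumptions: sensor IDs are zero-padded to three digits so that string order matches numeric order, and each boundary is the first key of the next partition (hence `bisect_right`). `make_key` and `partition_for` are hypothetical helpers.

```python
from bisect import bisect_right

# Each boundary is the first key of the next partition:
# P1: sensor_001-025 | P2: sensor_026-050 | P3: sensor_051-075 | P4: sensor_076-100
BOUNDARIES = ["sensor_026", "sensor_051", "sensor_076"]


def make_key(sensor: int, ts: str) -> str:
    # Zero-padding keeps lexicographic order equal to numeric order.
    return f"sensor_{sensor:03d}_{ts}"


def partition_for(key: str) -> int:
    return bisect_right(BOUNDARIES, key) + 1  # 1-based partition number


print(partition_for(make_key(42, "2024-11-15T10:30:00")))  # 2
print(partition_for(make_key(91, "2024-11-15T10:30:00")))  # 4

# "All readings between 10:30 and 10:31" needs one key range per sensor prefix.
scans = [(make_key(s, "2024-11-15T10:30:00"), make_key(s, "2024-11-15T10:31:00"))
         for s in range(1, 101)]
touched = sorted({partition_for(start) for start, _ in scans})
print(len(scans), touched)  # 100 [1, 2, 3, 4]
```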

Real-World Key Ranges: MongoDB Shard Key Choice

MongoDB's documentation has an excellent framework for evaluating shard keys. A good shard key does well on three properties: cardinality, frequency, and monotonicity.

Property | Meaning | Bad example | Good example
Cardinality | Number of distinct values | continent (only 7 values = max 7 chunks) | user_id (millions of values)
Access frequency | Even distribution of read/write load | timestamp (all writes hit latest range) | hash(user_id) (writes spread evenly)
Monotonicity | Does the key always increase? | Auto-increment ID (always increasing = always last chunk) | UUID v4 (random = no monotonicity)

The ideal shard key scores well on all three: high cardinality, even access frequency, and non-monotonic. In practice, you often sacrifice one. The most common sacrifice is access frequency — you accept that some keys are queried more than others and handle hotspots at the application layer.

Compound Shard Keys in MongoDB

MongoDB supports compound shard keys — using multiple fields together. This can resolve the monotonicity problem without losing query efficiency:

// Bad: shard key = {timestamp: 1}
All new documents go to the chunk with the highest timestamp range.
Write hotspot on one shard. Other shards idle.

// Better: shard key = {region: 1, timestamp: 1}
Compound key. Chunks are ranges over (region, timestamp), ordered by region first and timestamp second.
If you have 50 regions, writes spread across ~50 chunks at any given time.
Range query "all events in region=US between 10am-11am" hits only US chunks.

// Even better (MongoDB 4.4+): shard key = {region: "hashed", timestamp: 1}
Hash the first field for even distribution. Sort by timestamp within each hash bucket.
Combines hash and range partitioning, much like Cassandra's compound key.

You're designing a chat application sharded by key range. Your key is the message_id, which is an auto-incrementing integer. What happens and how do you fix it?

Chapter 2: Partitioning by Hash

Key range partitioning's hotspot problem arises because keys with similar values cluster in the same partition. The fix is brutally simple: destroy the ordering. Run every key through a hash function that maps it to a seemingly random number, then partition by that hash value. Keys that were sequential ("user_001", "user_002", "user_003") now scatter across the hash space.

A good hash function for partitioning does not need to be cryptographically secure. It just needs to produce outputs that look uniformly distributed. MD5, MurmurHash, and xxHash are all common choices. The critical property: keys that are similar in value (like timestamps one second apart) must produce completely different hash values.

The Mechanics

// Hash partitioning: key → hash → partition
hash("user_001") = 0x7A3F → maps to value 31,295
hash("user_002") = 0xE812 → maps to value 59,410
hash("user_003") = 0x12C4 → maps to value 4,804

// With 4 partitions, simplest approach: hash mod N
31,295 mod 4 = 3 → partition 3
59,410 mod 4 = 2 → partition 2
4,804 mod 4 = 0 → partition 0

// Sequential keys scatter across partitions!
// But: range queries now require querying ALL partitions.

The trade-off is stark. Hash partitioning destroys key ordering. If you want "all users with IDs between 1000 and 2000", you cannot query a single partition — those IDs hash to scattered locations. You must send the query to all partitions and merge the results. This scatter-gather pattern is the price of even distribution.

Choosing a Hash Function

Not all hash functions are suitable for partitioning. The key requirement is uniformity — the hash should distribute keys as evenly as possible across the output range. It does NOT need to be cryptographic (resistant to adversarial attacks). Common choices:

Hash function | Speed | Output | Uniformity | Used by
MD5 | Slow (crypto-grade) | 128 bits | Excellent | Cassandra's legacy RandomPartitioner (Murmur3 is the default)
MurmurHash3 | Very fast | 32 or 128 bits | Excellent | Cassandra, Elasticsearch
xxHash | Fastest | 32 or 64 bits | Excellent | Some custom systems
Java hashCode() | Fast | 32 bits | Mediocre | DO NOT USE for partitioning (not uniform)
CRC32 | Fast | 32 bits | Good for error detection, mediocre for distribution | Kafka, for record checksums (keyed records are assigned to partitions with murmur2)
Language built-in hashes are dangerous. Java's hashCode() and Python's hash() are designed for hash tables, not for partitioning. They may not be consistent across processes (Python randomizes hash seeds by default since 3.3). Always use a deterministic hash function like MurmurHash3 or MD5 for partitioning. Every process must compute the same hash for the same key — otherwise keys land on different partitions depending on which process routes them.
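A minimal sketch of a deterministic partition function in Python, using the standard library's `hashlib.md5` (one of the choices in the table). Unlike the built-in `hash()`, its output does not depend on `PYTHONHASHSEED`, so every router process agrees. It uses plain hash mod N, whose rebalancing cost is the subject of the next section. `stable_hash` and `partition_for` are illustrative names.

```python
import hashlib


def stable_hash(key: str) -> int:
    """128-bit MD5 of the key: identical in every process, machine, and run."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


def partition_for(key: str, num_partitions: int) -> int:
    """Plain hash-mod-N routing on top of the stable hash."""
    return stable_hash(key) % num_partitions


for key in ("user_001", "user_002", "user_003"):
    # Unlike hash(key), this result does not change when PYTHONHASHSEED changes.
    print(key, partition_for(key, 4))
```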

Hash Mod N: The Naive (Bad) Approach

The simplest hash partitioning is partition = hash(key) mod N. This works, but it has a fatal flaw: when N changes (you add or remove a node), almost every key changes partition. If you go from 4 to 5 nodes, roughly 80% of keys move. That is an enormous amount of data migration.

// hash mod N: disaster when N changes
hash("user_alice") = 42

N=4: 42 mod 4 = 2 → partition 2
N=5: 42 mod 5 = 2 → partition 2   (lucky, stays)

hash("user_bob") = 37
N=4: 37 mod 4 = 1 → partition 1
N=5: 37 mod 5 = 2 → partition 2   (moved!)

hash("user_carol") = 53
N=4: 53 mod 4 = 1 → partition 1
N=5: 53 mod 5 = 3 → partition 3   (moved!)

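// Going from N to N+1 nodes, a key stays only if hash mod N equals hash mod (N+1),
// which holds for 1/(N+1) of keys. So N/(N+1) of keys move: 80% when going from 4 to 5 nodes.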
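The 80% figure for 4 → 5 nodes is easy to check empirically. This sketch treats consecutive integers as stand-ins for uniformly distributed hash values (an assumption that a good hash function approximates) and counts how many change partition. `fraction_moved_mod_n` is a hypothetical helper.

```python
def fraction_moved_mod_n(old_n: int, new_n: int, num_keys: int = 100_000) -> float:
    """Share of hash values h in [0, num_keys) for which h mod old_n != h mod new_n."""
    moved = sum(1 for h in range(num_keys) if h % old_n != h % new_n)
    return moved / num_keys


print(fraction_moved_mod_n(4, 5))    # 0.8
print(fraction_moved_mod_n(10, 11))  # 0.909
```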

Consistent Hashing: The Better Way

Consistent hashing solves the rebalancing problem. The idea: arrange the hash space as a circle (a "ring") from 0 to 2^32 - 1. Place each node at a position on the ring (determined by hashing the node's name). To find which node owns a key, hash the key, then walk clockwise around the ring until you hit a node.

When you add a node, it takes over responsibility for the keys between itself and the next node counterclockwise. Only those keys move. When you remove a node, its keys transfer to the next node clockwise. In both cases, only ~1/N of the keys move — much better than the ~(N-1)/N of hash mod N.

// Consistent hashing: the ring
Ring: 0 ───────────────────────── 2^32 - 1 (wraps around)

Node A hashes to position 1000
Node B hashes to position 4000
Node C hashes to position 7000

Key "alice" hashes to 2500 → walk clockwise → hits B at 4000 → owned by B
Key "bob" hashes to 6100 → walk clockwise → hits C at 7000 → owned by C
Key "carol" hashes to 8500 → walk clockwise → wraps → hits A at 1000 → owned by A

// Add Node D at position 5500:
Keys between 4001 and 5500 move from C to D. Everything else stays.
On average a new node takes over about 1/N of the keys (~25% with N=4 after the addition); the exact share depends on where D lands.

In practice, consistent hashing with just N points on the ring leads to uneven distribution: nodes can end up with very different ranges. The solution is virtual nodes (vnodes): each physical node gets many positions on the ring (e.g., 256 vnodes per node). This smooths out the distribution. Cassandra, Riak, and Amazon's original Dynamo design all use virtual nodes.
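Here is a compact Python sketch of a consistent hash ring with virtual nodes. Assumptions: ring positions are the first 4 bytes of an MD5 digest (a 0 to 2^32 - 1 ring), virtual node labels have the form `node#i`, and a key is owned by the first node position at or clockwise after it. `HashRing` is a hypothetical class, not Cassandra's or Riak's code.

```python
import hashlib
from bisect import bisect_left


def ring_position(label: str) -> int:
    """Place a label on a ring of size 2**32 (first 4 bytes of its MD5 digest)."""
    return int.from_bytes(hashlib.md5(label.encode("utf-8")).digest()[:4], "big")


class HashRing:
    """Consistent hashing with `vnodes` ring positions per physical node."""

    def __init__(self, nodes, vnodes: int = 1):
        self.vnodes = vnodes
        self.points = []     # sorted (position, node) pairs
        self.positions = []  # positions only, for bisect
        for node in nodes:
            self.add(node)

    def _reindex(self) -> None:
        self.points.sort()
        self.positions = [pos for pos, _ in self.points]

    def add(self, node: str) -> None:
        self.points += [(ring_position(f"{node}#{i}"), node) for i in range(self.vnodes)]
        self._reindex()

    def remove(self, node: str) -> None:
        self.points = [p for p in self.points if p[1] != node]
        self._reindex()

    def owner(self, key: str) -> str:
        """Walk clockwise: first node position >= the key's position, wrapping at the end."""
        i = bisect_left(self.positions, ring_position(key))
        return self.points[i % len(self.points)][1]


keys = [f"key{i}" for i in range(10_000)]
ring = HashRing(["A", "B", "C", "D"], vnodes=100)
before = {k: ring.owner(k) for k in keys}
ring.add("E")
after = {k: ring.owner(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved) / len(keys):.1%} of keys moved")  # expect roughly 1/5
print(all(after[k] == "E" for k in moved))            # True: every move goes to E
```

Every key that moves goes to the new node; keys owned by A-D never shuffle among themselves. That is exactly the property hash mod N lacks.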

Cassandra's Compound Key Trick

Cassandra offers an elegant compromise between hash and range partitioning. In Cassandra, a primary key has two parts: the partition key (hashed to determine the partition) and the clustering columns (used to sort data within the partition).

// Cassandra compound key example
PRIMARY KEY ((user_id), timestamp) // user_id is partition key, timestamp is clustering

// hash(user_id) determines the partition
// Within that partition, rows are sorted by timestamp

// Efficient: "all posts by user_42 in January"
// → hash(user_42) finds one partition, then range scan by timestamp

// Inefficient: "all posts by all users in January"
// → must query every partition (scatter-gather)

This gives you the best of both worlds for certain query patterns: hash distribution across partitions (no hotspots) plus sorted ordering within each partition (efficient range queries on the clustering column).
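A toy Python model of the compound-key idea: only the partition key picks the partition, and rows within it stay sorted by the clustering column, so a per-user time range becomes a binary search. For brevity it routes with MD5 mod N, which is an assumption of this sketch; Cassandra itself hashes the partition key to a token on its ring (Murmur3 by default). `store`, `insert`, `posts_between`, and `all_posts_between` are hypothetical names.

```python
import hashlib
from bisect import bisect_left, insort
from collections import defaultdict

NUM_PARTITIONS = 8  # hypothetical cluster size


def partition_of(user_id: str) -> int:
    """Only the partition key (user_id) is hashed; the timestamp plays no part."""
    digest = hashlib.md5(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


# partition -> user_id -> rows sorted by the clustering column: [(timestamp, post), ...]
store = defaultdict(lambda: defaultdict(list))


def insert(user_id: str, ts: str, post: str) -> None:
    insort(store[partition_of(user_id)][user_id], (ts, post))


def posts_between(user_id: str, start: str, end: str):
    """Efficient: one partition, then binary search over sorted rows for [start, end)."""
    rows = store[partition_of(user_id)][user_id]
    return rows[bisect_left(rows, (start,)):bisect_left(rows, (end,))]


def all_posts_between(start: str, end: str):
    """Inefficient: scatter-gather over every partition and every user."""
    return sorted(row for partition in store.values()
                  for rows in partition.values()
                  for row in rows if start <= row[0] < end)


insert("user_42", "2024-01-05", "hello")
insert("user_42", "2024-02-01", "feb")
insert("user_42", "2024-01-20", "jan again")
insert("user_7", "2024-01-10", "other user")
print(posts_between("user_42", "2024-01-01", "2024-02-01"))
# [('2024-01-05', 'hello'), ('2024-01-20', 'jan again')]
```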

How Many Keys Move? A Worked Example

Let's calculate exactly how consistent hashing compares to hash-mod-N when adding a node.

// Setup: 1000 keys, currently 4 nodes. Adding a 5th node.

// Hash Mod N:
Before: partition(key) = hash(key) mod 4
After: partition(key) = hash(key) mod 5

Key with hash=17: 17 mod 4 = 1, 17 mod 5 = 2 → MOVED
Key with hash=20: 20 mod 4 = 0, 20 mod 5 = 0 → stayed
Key with hash=33: 33 mod 4 = 1, 33 mod 5 = 3 → MOVED
Key with hash=40: 40 mod 4 = 0, 40 mod 5 = 0 → stayed
Key with hash=41: 41 mod 4 = 1, 41 mod 5 = 1 → stayed
Key with hash=42: 42 mod 4 = 2, 42 mod 5 = 2 → stayed
Key with hash=43: 43 mod 4 = 3, 43 mod 5 = 3 → stayed
Key with hash=44: 44 mod 4 = 0, 44 mod 5 = 4 → MOVED

// Expected keys moved: ~1000 * (1 - 1/5) = 800 keys (80%!)
// Formula: (N-1)/N fraction of keys move when going from N-1 to N nodes

// Consistent Hashing:
Ring has 4 nodes (A, B, C, D). New node E is inserted between B and C.
Only keys in the arc from B to E (where E now sits) move from C to E.
If E lands at a typical spot, that arc is ≈ 1/5 of the ring → ~200 keys move.

// With 256 virtual nodes per physical node:
// Distribution is even smoother. Each key that moves
// goes from one of the existing nodes to the new node.
// Total keys moved ≈ 1000/5 = 200, but spread across many arcs.

Virtual Nodes: Why They Matter

With just N physical nodes on the ring, the arcs between them can be very uneven. One node might "own" 40% of the ring while another owns 10%. Virtual nodes fix this by placing each physical node at many positions on the ring.

// Without vnodes: 3 physical positions on ring
Node A at 0.10 → owns arc [0.78, 0.10] = 32% of ring
Node B at 0.45 → owns arc [0.10, 0.45] = 35% of ring
Node C at 0.78 → owns arc [0.45, 0.78] = 33% of ring
// Reasonable here, but with more nodes it gets worse.

// With 4 vnodes per node (real systems use 128-256):
Node A: positions [0.05, 0.28, 0.52, 0.83]
Node B: positions [0.12, 0.37, 0.61, 0.91]
Node C: positions [0.20, 0.44, 0.73, 0.97]
// 12 positions total. Each node owns ~4 small arcs.
// The arcs are smaller and more numerous, so the variance is lower.
// Standard deviation of load decreases as O(1/sqrt(vnodes)).

// Cassandra's default before 4.0: 256 vnodes per node. By the O(1/sqrt(vnodes)) rule above,
// a node's load deviates from the mean by roughly 1/sqrt(256) ≈ 6%.
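The effect of vnode count on balance can be measured directly. This Python sketch places each node's vnodes at MD5-derived positions normalized to [0, 1) (an assumption chosen to match the fractions used above) and credits every position with the arc just counterclockwise of it, matching the clockwise-walk ownership rule. `ownership` is a hypothetical helper.

```python
import hashlib


def ring_position(label: str) -> float:
    """Map a label to [0, 1) using 48 bits of its MD5 digest (exact as a float)."""
    return int.from_bytes(hashlib.md5(label.encode("utf-8")).digest()[:6], "big") / 2**48


def ownership(nodes, vnodes: int) -> dict:
    """Fraction of the ring each node owns; each position owns the arc ending at it."""
    points = sorted((ring_position(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
    share = dict.fromkeys(nodes, 0.0)
    for i, (pos, node) in enumerate(points):
        prev = points[i - 1][0]            # for i == 0 this is the last point (wrap)
        share[node] += (pos - prev) % 1.0  # arc (prev, pos], measured around the ring
    return share


for v in (1, 4, 16, 256):
    s = ownership(["A", "B", "C"], v)
    spread = max(s.values()) - min(s.values())
    print(f"{v:>3} vnodes/node: max-min share = {spread:.3f}")
```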

The trade-off: more vnodes means a larger ring metadata structure, and a node that joins or leaves exchanges many small arcs with many other nodes, which makes rebalancing more expensive to coordinate. Cassandra reduced its default from 256 to 16 vnodes in version 4.0, pairing it with a token-allocation algorithm that places tokens deliberately rather than randomly, and found that 16 gives good enough balance with much less rebalancing overhead.

Range vs. Hash: the one-line summary. Key range partitioning gives you efficient range queries but risks hotspots on sequential keys. Hash partitioning gives you even distribution but destroys ordering (no efficient range queries). Cassandra's compound key is a hybrid: hash the partition key, sort within the partition by clustering columns. Choose based on your dominant query pattern.
You're running a consistent hash ring with 4 nodes and 1000 keys evenly distributed. You add a 5th node. Approximately how many keys move?

Chapter 3: Skewed Workloads and Hotspots

Hash partitioning distributes keys evenly. But it distributes load evenly only if all keys are accessed at similar rates. When a single key becomes extraordinarily popular — a celebrity's tweet goes viral, a product goes on sale, a breaking news article — the partition holding that key becomes a hotspot. And no amount of hashing can fix it, because all requests for the same key hash to the same partition.

This is fundamentally different from the key-range hotspot we saw in Chapter 1. That was a structural problem: sequential keys landing on the same partition because of their ordering. This is a workload problem: one key being so popular that its partition cannot handle the traffic, regardless of how cleverly you distributed keys.

Think of it this way: key-range hotspots are like a traffic jam caused by bad road design (all roads lead to one intersection). Hash-workload hotspots are like a traffic jam caused by a stadium concert — the roads are perfectly designed, but 50,000 people all want to go to the same place at the same time. No road redesign can fix that. You need parking lots (caches), shuttle buses (read replicas), or multiple entrances (key suffixing).

Quantifying the Hotspot Impact

Let's calculate exactly how much throughput you lose to a hotspot.

// System: 8 partitions, 10K requests/sec capacity each = 80K total capacity
// Normal workload: 80K req/sec, evenly distributed = 10K per partition. Perfect.

// Hotspot scenario: one key gets 40% of traffic
Total traffic: 80K req/sec
Hot partition load: 80K * 0.40 + 80K * 0.60/8 = 32K + 6K = 38K req/sec
Hot partition capacity: 10K req/sec
Excess: 28K req/sec → DROPPED or QUEUED

// Effective throughput:
Hot partition serves: 10K req/sec (maxed out)
Other 7 partitions serve: 7 * 6K = 42K req/sec (underloaded)
Total served: 52K of 80K = 65% throughput
35% of requests fail!

// Even worse: the hot partition's queue grows, increasing latency.
// At 38K arrival rate, 10K service rate: queue grows by 28K/sec.
// After 10 seconds: 280K requests queued. A newly arriving request waits 28 seconds.
// System is effectively down for the ~48% of requests routed to the hot partition (38K of 80K).
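The same arithmetic as a small Python function, so you can vary the inputs. It assumes, as the example does, that non-hot traffic spreads evenly over all partitions (including the hot one) and that anything beyond a partition's capacity is dropped or queued. `hotspot_throughput` is a hypothetical name; integer arithmetic keeps the numbers exact.

```python
def hotspot_throughput(partitions: int, capacity: int, total: int, hot_pct: int):
    """Return (load on the hot partition, requests/sec actually served).

    Assumes the non-hot traffic spreads evenly over all partitions and that
    load above a partition's capacity is not served.
    """
    background = total * (100 - hot_pct) // 100 // partitions  # per partition
    hot_load = total * hot_pct // 100 + background
    served = min(hot_load, capacity) + (partitions - 1) * min(background, capacity)
    return hot_load, served


hot_load, served = hotspot_throughput(partitions=8, capacity=10_000, total=80_000, hot_pct=40)
print(hot_load, served, f"{served / 80_000:.0%}")  # 38000 52000 65%

queue_growth = hot_load - 10_000  # requests/sec piling up on the hot partition
print(queue_growth * 10, queue_growth * 10 / 10_000)  # 280000 28.0 (backlog after 10 s, seconds to drain)
```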

The Celebrity Problem

Imagine a social media platform. User @taylor has 200 million followers. She posts a photo. Within seconds, 2 million users try to load the post. The post's key is post_id = 9847283. It hashes to partition 7. Partition 7 now receives 2 million reads per second while partitions 1-6 and 8-10 are nearly idle. Partition 7's server crashes under load. The post becomes unavailable — and it's the one post that millions of people are trying to see.

Solutions (All Have Costs)

Solution 1: Random suffix. Append a random number (say 0-99) to the hot key before hashing. Instead of one key post_9847283, you now have 100 keys: post_9847283_00 through post_9847283_99. These hash to different partitions, spreading the load across up to 100 partitions.

// Without suffix: all reads hit one partition
hash("post_9847283") → partition 7    2M reads/sec on ONE node

// With 100 random suffixes: reads scatter across partitions
hash("post_9847283_00") → partition 2
hash("post_9847283_37") → partition 5
hash("post_9847283_82") → partition 9
...
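// Each suffixed key gets ~2M / 100 = 20K reads/sec instead of 2M on one key;
// a partition's load depends on how many suffixes hash to it

// Cost: aggregate reads (e.g., the like count) must query all 100 suffixed keys and merge results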
// You need to KNOW which keys are hot (application-level decision)

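The cost is real. To serve the post itself from any suffix, the application must write a copy under every suffix; a reader then picks a random suffix and reads from one partition (fast). Counters work the other way: each like increments one randomly chosen suffixed record, so counting total likes means querying all 100 suffixed records and summing them (scatter-gather). And you need application-level logic to decide which keys get suffixed, because the database cannot know which keys are hot.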
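A sketch of the suffixed counter in Python, with a dict standing in for the partitioned database. Assumptions: 10 partitions (as in the celebrity example), MD5 mod N routing, and a seeded `random.Random` so runs are repeatable. `record_like` and `like_count` are hypothetical names. The write path touches one key; the read path touches all 100 suffixed keys and reports how many partitions that fans out to.

```python
import hashlib
import random

NUM_PARTITIONS = 10  # hypothetical cluster size
NUM_SUFFIXES = 100


def partition_of(key: str) -> int:
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big") % NUM_PARTITIONS


counters = {}  # stand-in for the database: suffixed key -> count


def record_like(post_id: str, rng: random.Random) -> None:
    """Write path: one increment on a randomly chosen suffixed key."""
    key = f"{post_id}_{rng.randrange(NUM_SUFFIXES):02d}"
    counters[key] = counters.get(key, 0) + 1


def like_count(post_id: str) -> tuple[int, int]:
    """Read path: read every suffix and sum them (scatter-gather)."""
    keys = [f"{post_id}_{s:02d}" for s in range(NUM_SUFFIXES)]
    total = sum(counters.get(k, 0) for k in keys)
    partitions_touched = len({partition_of(k) for k in keys})
    return total, partitions_touched


rng = random.Random(0)
for _ in range(5_000):
    record_like("post_9847283", rng)
print(like_count("post_9847283"))  # (5000, <number of partitions the read fanned out to>)
```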

Solution 2: Read replicas for hot partitions. Instead of splitting the key, replicate the partition. The partition holding the viral post gets additional read replicas. Writes still go to the leader, but reads can be served from any replica. This is a replication solution, not a partitioning solution — it uses the tools from Chapter 6.

Solution 3: Caching layer. Put a cache (Redis, Memcached) in front of the database. The first request for the viral post hits the database; subsequent requests hit the cache. The database partition never sees the spike. Most production systems use this approach — it is simple and effective.

Solution 4: DynamoDB Adaptive Capacity. DynamoDB's approach is interesting. Each partition has a throughput limit (initially evenly divided). If one partition gets hot, DynamoDB automatically borrows unused capacity from other partitions to give the hot one more throughput. If the entire table's capacity is exhausted, it splits the hot partition. But this only helps with partition-level hotspots, not single-key hotspots. If one key gets 90% of reads, DynamoDB cannot help — you need application-level splitting.

The Write-Amplification Cost of Suffixing

Random suffixes are not free. Let's quantify the cost for reads and writes:

// Without suffixes: one key, one partition
Write: 1 write operation
Read (get post): 1 read operation
Read (get like count): 1 read operation
Total for 1 write + 2 reads = 3 operations

// With 100 suffixes: the hot post is spread across up to 100 partitions
Write (new post): 100 writes, one copy per suffix, so any suffix can serve a read
Read (get post): 1 read from a random suffix
Read (get like count): 100 reads, one per suffix, sum results!
Total for 1 write + 2 reads = 100 + 1 + 100 = 201 operations

// The post is written once, but its like count is read constantly: the aggregation reads are the killer.
// Mitigation: maintain a separate counter (also suffixed + periodically aggregated)

In practice, most systems combine suffixing with a counter cache. The like count is maintained as a separate, periodically-aggregated counter rather than computed from scratch on every read. Twitter, for example, maintains per-tweet metrics in a separate system (a pre-computed count) rather than querying all suffix partitions.

No automatic solution exists. Today's databases cannot automatically detect and mitigate hotspots from a single extremely popular key. The application must handle it. Some databases (like DynamoDB's adaptive capacity) can detect hot partitions and give them more throughput, but they cannot split a single key across partitions. This is a fundamental limitation: if your workload concentrates on one key, you need application-level strategies.

Worked Example: Mitigating a Hot Key

// System: 5 partitions, 50K capacity each = 250K total
// Normal: each partition handles ~50K req/sec. Balanced.

// Viral event: key "trending_post" gets 300K req/sec
hash("trending_post") → P3
P3 load: 300K + 50K normal = 350K. Capacity: 50K. Overloaded 7x!

// Fix 1: 10 random suffixes
hash("trending_post_0") → P1
hash("trending_post_1") → P3
hash("trending_post_2") → P5 ...
300K / 10 = 30K per suffix. But 10 suffixes over 5 partitions means ~2 suffixes per partition.
Partition load ≈ 50K + 2*30K = 110K. Still over capacity!

// Fix 2: use 20 suffixes. 300K/20 = 15K per suffix, ~4 suffixes per partition.
Partition load ≈ 50K + 4*15K = 110K. No better!
// Reality: the hot key's 300K spreads over at most 5 partitions, so the busiest one gains at least
// 300K/5 = 60K however many suffixes you use. Demand is 550K vs 250K capacity: add partitions OR use caching.
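The suffix-count dead end is easy to see in a simulation. This Python sketch assigns each suffix to one of 5 partitions with MD5 mod N (an assumption; any uniform hash behaves similarly) and adds the hot traffic on top of the 50K baseline. `loads_with_suffixes` is a hypothetical helper. Whatever the suffix count, total demand stays at 550K, so the busiest partition always carries at least the 110K average.

```python
import hashlib


def partition_of(key: str, partitions: int) -> int:
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big") % partitions


def loads_with_suffixes(suffixes: int, partitions: int = 5,
                        normal: int = 50_000, hot: int = 300_000) -> list[int]:
    """Per-partition req/sec when the hot key's traffic is split over `suffixes` keys."""
    loads = [normal] * partitions
    for s in range(suffixes):
        loads[partition_of(f"trending_post_{s}", partitions)] += hot // suffixes
    return loads


for n in (1, 10, 20, 100):
    loads = loads_with_suffixes(n)
    print(f"{n:>3} suffixes: max partition load = {max(loads):,}, total = {sum(loads):,}")
```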

Real-World Hotspot Stories

Instagram's Hot Partition. When Instagram launched, they sharded their PostgreSQL database by user_id. This worked well until Justin Bieber posted a photo. The partition containing Bieber's user_id received millions of like-writes per second. The solution: they moved likes to a separate service with its own sharding (by post_id, not user_id), and added a write-back cache that batched like-count updates.

Twitter's Retweet Storm. A viral tweet getting retweeted creates a hotspot on the tweet's partition. Twitter's solution: a separate "timeline service" that pre-computes feeds. Retweet counts are maintained in a distributed counter system (not the main database), with eventual consistency — the count you see might be a few seconds behind reality.

DynamoDB Throttling. DynamoDB automatically partitions tables and allocates throughput (Read Capacity Units, Write Capacity Units) evenly across partitions. If your table has 1000 RCU and 4 partitions, each gets 250 RCU. If one partition receives 400 RCU of traffic, it gets throttled — even though the table-level limit (1000) is not reached. This is the "hot partition" problem, and it caught many early DynamoDB users off guard. DynamoDB's adaptive capacity (launched 2017) partially addresses this by borrowing unused capacity from cold partitions.

// DynamoDB hot partition throttling (simplified)
Table: 10,000 RCU provisioned
Partitions: 10 (DynamoDB decides this internally)
Per-partition limit: 1,000 RCU each

Key "viral_post" on partition 3: receives 3,000 RCU of traffic
Partition 3 limit: 1,000 RCU → THROTTLED! 2,000 RCU/sec of reads rejected.
Other 9 partitions: ~100 RCU each.
Table-wide demand: only 3,000 + 9*100 = 3,900 RCU of 10,000. Tons of headroom!

// With adaptive capacity:
DynamoDB detects partition 3 is hot.
Borrows 2,000 unused RCU from partitions 1, 2, 4-10.
Partition 3 now has 3,000 RCU. No throttling.
// But: if table-wide usage is already near 10,000, no capacity to borrow.
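The throttling arithmetic fits in a few lines. This is a deliberately simplified model: even per-partition shares, with spare table-level headroom handed to unmet demand. `served_rcu` is a hypothetical function. Real DynamoDB also enforces a hard per-partition ceiling, which this sketch ignores.

```python
def served_rcu(demand, per_partition_limit, table_limit, adaptive):
    """RCU actually served per partition under a simplified throttling model.

    Without adaptive capacity, each partition is capped at its even share.
    With it, hot partitions may use table-level headroom left by cold ones.
    This illustrates the idea only; it is not DynamoDB's real algorithm.
    """
    served = [min(d, per_partition_limit) for d in demand]
    if adaptive:
        spare = max(0, table_limit - sum(served))
        for i, d in enumerate(demand):
            extra = min(d - served[i], spare)   # unmet demand, up to what is spare
            served[i] += extra
            spare -= extra
    return served

demand = [100] * 10
demand[2] = 3_000   # "partition 3" from the example (index 2, 0-based)

rejected = sum(demand) - sum(served_rcu(demand, 1_000, 10_000, adaptive=False))
print(rejected)                                             # 2000
print(served_rcu(demand, 1_000, 10_000, adaptive=True)[2])  # 3000
```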
Your system uses hash partitioning with 8 partitions. A single key receives 80% of all read traffic. You apply a random suffix with 8 values (0-7). What is the WORST-CASE load on the hottest partition, compared to the average?

Chapter 4: Secondary Indexes

Everything so far assumes you are looking up records by their primary key. Partition the primary key, hash or range, and you know exactly which node to ask. One hop. Fast. But real applications rarely query by primary key alone. "Show me all cars listed for sale in San Francisco that are red." The primary key is car_id. You are querying by city and color. Answering that requires secondary indexes, and they are the hardest part of partitioning.

A secondary index is a data structure that maps a non-primary-key field to the set of records that have a given value for that field. Index on color=red returns [car_17, car_42, car_93, ...]. Without an index, you'd have to scan every record on every partition. With an index, you can jump directly to the matching records. But where does the index live?

There are two main strategies, and they have opposite trade-offs.

Strategy 1: Document-Partitioned (Local) Index

Each partition maintains its own secondary index covering only the documents stored on that partition. If partition 1 holds car_17 and car_93 (both red), partition 1's local index for color=red points to those two. If partition 3 holds car_42 (also red), partition 3's local index has that entry.

// Local (document-partitioned) index

Partition 0: car_5(blue), car_12(red) → local index: {red: [12], blue: [5]}
Partition 1: car_17(red), car_93(red) → local index: {red: [17, 93]}
Partition 2: car_28(blue), car_61(green) → local index: {blue: [28], green: [61]}
Partition 3: car_42(red), car_55(blue) → local index: {red: [42], blue: [55]}

// Query: "all red cars"
Must query ALL 4 partitions and merge results: [12, 17, 93, 42]
This is "scatter-gather" — expensive but writes are local!

// Write: add car_99(red) to partition 2
Only update partition 2's local index. One partition touched. Fast!

Writes are fast: you only update the local index on the partition that holds the document. Reads via secondary indexes are slow: you must send the query to every partition, merge the results, and handle failures from any of them. This scatter/gather pattern is the primary drawback of local indexes.
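Here is a toy, single-process version of the local-index layout above. `Partition` and `find_by_color` are hypothetical names. The scatter step runs sequentially here; a real system would send the per-partition requests in parallel over the network.

```python
from collections import defaultdict

class Partition:
    """One data partition with its own (local) secondary index on color."""

    def __init__(self):
        self.docs = {}                       # car_id -> color
        self.local_index = defaultdict(set)  # color -> car_ids stored HERE only

    def put(self, car_id, color):
        old = self.docs.get(car_id)
        if old is not None:                  # keep the index in sync on updates
            self.local_index[old].discard(car_id)
        self.docs[car_id] = color
        self.local_index[color].add(car_id)  # the write touches this partition only

def find_by_color(partitions, color):
    """Scatter the query to every partition, then gather and merge."""
    matches = set()
    for p in partitions:                     # a real system sends these in parallel
        matches |= p.local_index.get(color, set())
    return sorted(matches)

partitions = [Partition() for _ in range(4)]
cars_by_partition = {0: [(5, "blue"), (12, "red")], 1: [(17, "red"), (93, "red")],
                     2: [(28, "blue"), (61, "green")], 3: [(42, "red"), (55, "blue")]}
for pid, cars in cars_by_partition.items():
    for car_id, color in cars:
        partitions[pid].put(car_id, color)

print(find_by_color(partitions, "red"))      # [12, 17, 42, 93]
partitions[2].put(99, "red")                 # local write on partition 2
print(find_by_color(partitions, "red"))      # [12, 17, 42, 93, 99]
```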

MongoDB, Riak, Cassandra, Elasticsearch, SolrCloud, and VoltDB all use document-partitioned (local) indexes.

Strategy 2: Term-Partitioned (Global) Index

Instead of each partition indexing its own documents, you build a global secondary index that covers all documents across all partitions — and then you partition the index itself.

// Global (term-partitioned) index

Data partitions (by car_id):
P0: car_5(blue), car_12(red)
P1: car_17(red), car_93(red)
P2: car_28(blue), car_61(green)
P3: car_42(red), car_55(blue)

Index partitions (by color term):
Index P_A (colors a-g): {blue: [5, 28, 55], green: [61]}
Index P_B (colors h-z): {red: [12, 17, 42, 93]}

// Query: "all red cars"
Hash or range lookup on "red" → Index P_B → returns [12, 17, 42, 93]
ONE partition queried. Fast!

// Write: add car_99(red) to data partition 2
Must also update Index P_B (which might be on a DIFFERENT node).
Cross-partition write. Needs coordination. Slow!

Reads are fast — one lookup in the global index finds all matching documents. Writes are slow — adding or modifying a document may require updating index entries on different partitions, which means cross-partition writes (possibly requiring distributed transactions or at minimum network hops).

In practice, global indexes are often updated asynchronously. The write returns immediately, and the index is updated later. This means the index may be stale — a newly written document might not appear in index results for a short time. DynamoDB's global secondary indexes work this way.
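The next sketch shows a term-partitioned index maintained asynchronously. It assumes 2 index partitions chosen by hashing the term; the diagram above uses term ranges instead, and either works. `GlobalIndex` and its methods are hypothetical. For brevity it does not remove stale entries when a document's color changes.

```python
import hashlib
from collections import defaultdict, deque

NUM_INDEX_PARTITIONS = 2

def index_partition_for(term):
    # Hash-partition the index by term.
    return int(hashlib.sha256(term.encode()).hexdigest(), 16) % NUM_INDEX_PARTITIONS

class GlobalIndex:
    def __init__(self):
        self.shards = [defaultdict(set) for _ in range(NUM_INDEX_PARTITIONS)]
        self.pending = deque()   # index updates not yet applied (async)

    def on_write(self, car_id, color):
        # The data write returns immediately; the index update is only queued.
        self.pending.append((car_id, color))

    def apply_pending(self):
        # Background worker: send each update to the index partition owning the term.
        while self.pending:
            car_id, color = self.pending.popleft()
            self.shards[index_partition_for(color)][color].add(car_id)

    def lookup(self, color):
        # A single index partition answers the whole query.
        return sorted(self.shards[index_partition_for(color)].get(color, set()))

idx = GlobalIndex()
for car_id, color in [(5, "blue"), (12, "red"), (17, "red"), (93, "red"),
                      (28, "blue"), (61, "green"), (42, "red"), (55, "blue")]:
    idx.on_write(car_id, color)
idx.apply_pending()
print(idx.lookup("red"))   # [12, 17, 42, 93]
idx.on_write(99, "red")
print(idx.lookup("red"))   # still [12, 17, 42, 93]: index not updated yet
idx.apply_pending()
print(idx.lookup("red"))   # [12, 17, 42, 93, 99]
```

The second `lookup` shows the stale-read window: the data write has happened, but its queued index update has not been applied yet.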

The Scatter-Gather Cost: Quantified

Let's calculate how much scatter-gather actually costs. This matters because "query all partitions" sounds expensive, but the actual cost depends on your setup.

// Setup: 20 partitions across 20 nodes
// Query: "all products where category = 'electronics'"

// With local index (scatter-gather):
Network: 20 parallel requests (one per partition) = 1 round trip
Latency = the slowest of the 20 responses (requests run in parallel, so you wait for the last one)
Typical: if each node usually responds in 5 ms, p50 = 5 ms, p99 = 25 ms
Why p99 is bad: with 20 nodes, the probability that at least one is slow is high.
If each node responds in <10 ms 99% of the time:
P(all 20 respond in <10 ms) = 0.99^20 ≈ 0.82. 18% chance of >10 ms!

// With global index (single lookup):
Network: 1 request to the index partition + 1 request to fetch data
Latency = 2 sequential requests = 10 ms typical
p99 = 20 ms (only 1-2 nodes involved, much less variance)

// The tail latency problem:
// Scatter-gather amplifies tail latency because you wait for the SLOWEST.
// Dean and Barroso's "The Tail at Scale": if each of 100 servers has a 1% chance
// of a 1-second delay, 63% of fan-out requests (1 - 0.99^100) take >1 s.

This is why Elasticsearch (which uses scatter-gather for every search query) carefully optimizes its p99 latency. Every additional shard you add to an index increases the probability of one shard being slow. Elasticsearch's recommendation: keep shard count low (one shard per 10-50 GB of data), and do not over-shard.
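The tail-latency numbers above follow from one formula. It assumes each shard is independently slow with the same probability. Real slowdowns are often correlated, which this ignores.

```python
def p_any_slow(n_servers: int, p_fast: float) -> float:
    """Chance that a fan-out to n servers waits on at least one slow reply,
    assuming each server is independently fast with probability p_fast."""
    return 1 - p_fast ** n_servers

for n in (1, 5, 20, 100):
    print(f"{n:>3} shards: {p_any_slow(n, 0.99):.1%} of queries hit a slow shard")
```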

Real-World Example: Elasticsearch Indexing Strategy

Elasticsearch is a particularly instructive case because it uses local indexes exclusively. Every search query is a scatter-gather across all shards of an index. Here's how production systems manage this:

// Elasticsearch: 1 index for product catalog
Index: "products" with 5 primary shards + 1 replica each = 10 shards total

// Query: "electronics AND price < 100 AND brand = Nike"
Phase 1 (Query): scatter to one copy (primary or replica) of each of the 5 shards; each returns its top-10 doc IDs
Phase 2 (Fetch): gather top-10 overall, fetch full documents from owning shards

// This is fast for small shard counts (5 shards = 5 parallel queries).
// But imagine you time-shard: one index per day for logs.
// Query "last 30 days" = 30 indices * 5 shards = 150 shard requests per search!
// Mitigation: consolidate old daily indices into weekly/monthly ones (fewer shards to fan out to).
Local vs. Global: the decision rule. If your workload is write-heavy and you rarely query by secondary indexes, use local indexes (fast writes, accept slow scatter-gather reads). If your workload needs fast secondary-index lookups and you can tolerate slightly stale results, use a global index updated asynchronously. If you need both fast writes AND fast secondary-index reads with perfect consistency — there is no good solution. This is a fundamental trade-off.
Local vs. Global Index Lookup

Property | Local (Document-Partitioned) | Global (Term-Partitioned)
Write speed | Fast (update one partition) | Slow (may update remote index partition)
Read speed | Slow (scatter-gather all partitions) | Fast (single index lookup)
Consistency | Immediate (local update) | Often async (stale reads possible)
Used by | MongoDB, Cassandra, Elasticsearch | DynamoDB GSI, Riak search

Hybrid Approach: Sidecar Search Index

Many production systems avoid the local-vs-global dilemma entirely by using a sidecar search index. The primary database handles writes and primary key lookups (sharded by the main key). A separate search system (Elasticsearch, Solr) handles secondary index queries.

// Architecture: primary DB + search sidecar

Primary DB (PostgreSQL/Cassandra):
Sharded by user_id. Fast writes, fast primary key lookups.
No secondary indexes on the DB itself.

Search Index (Elasticsearch):
Receives change events from primary DB (CDC / change data capture).
Indexes all searchable fields: name, email, location, etc.
Eventually consistent (lag: typically 1-5 seconds).

// Query: "users in San Francisco with name starting with 'A'"
1. Client → Elasticsearch: search query
2. Elasticsearch returns matching user_ids: [42, 107, 393]
3. Client → Primary DB: GET user_ids [42, 107, 393]
4. Primary DB: each user_id hashes to a known partition. 3 parallel lookups.
5. Client receives full user records.

// Pros: primary DB stays simple (no scatter-gather), search is optimized
// Cons: eventual consistency, two systems to maintain, double storage
// Used by: many large-scale systems (e.g., Uber, LinkedIn, Airbnb)

This pattern is an application of a broader idea called CQRS (Command Query Responsibility Segregation). The "command" side (writes) is optimized for writes. The "query" side (reads) is optimized for reads. They use different data structures, different indexing strategies, and often different databases entirely.

Index Maintenance During Rebalancing

When a partition moves from node A to node B, what happens to its local index? The index must move with the partition. This means the rebalancing process must transfer not just the data, but also all local indexes. For a partition with 1 GB of data and 500 MB of indexes, you're actually transferring 1.5 GB.

With global indexes, the situation can be worse. If index entries point to physical locations (partitions or nodes) rather than to primary keys, the entries for every moved record must be updated. This is one more reason global indexes are typically updated asynchronously: doing it synchronously during rebalancing would make rebalancing unacceptably slow.

You're building an e-commerce search. Users search by category, price range, and brand. You have 20 data partitions. A user searches for "brand=Nike AND price<100". With document-partitioned (local) indexes, how many partitions does this query touch?

Chapter 5: Rebalancing Partitions

Nothing stays the same. You add machines to handle growth. A machine fails and must be replaced. The dataset grows, and partitions that were once balanced become lopsided. Data must move from one node to another. This is rebalancing — the process of reassigning partitions to nodes so that load and storage are distributed fairly.

Rebalancing must satisfy three requirements:

Requirement | Why
Fair distribution after | Each node should handle roughly equal load/storage
Availability during | The system must continue serving reads and writes while rebalancing
Minimal movement | Move only the data that needs to move, not everything

Strategy 1: Fixed Number of Partitions

Create a large number of partitions upfront — say 1000 partitions for 10 nodes (100 per node). Each partition is small. When you add an 11th node, it "steals" partitions from existing nodes. The partition count never changes; only the assignment of partitions to nodes changes.

// Fixed partitions: 12 partitions, initially 3 nodes (4 each)
Node A: [P0, P1, P2, P3]
Node B: [P4, P5, P6, P7]
Node C: [P8, P9, P10, P11]

// Add Node D: steal 1 partition from each existing node
Node A: [P0, P1, P2]     gave P3 to D
Node B: [P4, P5, P6]     gave P7 to D
Node C: [P8, P9, P10]    gave P11 to D
Node D: [P3, P7, P11]    new node

// Only 3 partitions moved. Old data within partitions unchanged.
// Each node now has 3 partitions. Balanced.
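The "steal partitions" step can be sketched as a greedy loop. `add_node` is a hypothetical helper that always takes from the currently most loaded node. Riak, Elasticsearch, and the others each use their own placement logic.

```python
def add_node(assignment, new_node):
    """Move whole partitions to new_node until it holds its fair share.

    assignment: dict node -> list of partition ids (modified in place).
    Partition contents never change; only their owner does.
    Returns the list of (partition, from_node, to_node) moves.
    """
    assignment[new_node] = []
    total = sum(len(ps) for ps in assignment.values())
    target = total // len(assignment)          # fair share, rounded down
    moves = []
    while len(assignment[new_node]) < target:
        donor = max(assignment, key=lambda n: len(assignment[n]))  # most loaded
        pid = assignment[donor].pop()
        assignment[new_node].append(pid)
        moves.append((pid, donor, new_node))
    return moves

cluster = {
    "A": ["P0", "P1", "P2", "P3"],
    "B": ["P4", "P5", "P6", "P7"],
    "C": ["P8", "P9", "P10", "P11"],
}
print(add_node(cluster, "D"))  # [('P3', 'A', 'D'), ('P7', 'B', 'D'), ('P11', 'C', 'D')]
```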

The key design decision: how many partitions to create initially. Too few, and partitions become too large (slow rebalancing, too much data per partition). Too many, and the overhead of tracking partition metadata becomes significant. Riak, Elasticsearch, Couchbase, and Voldemort all use this strategy. The partition count is typically set at cluster creation time and never changed.

Sizing rule of thumb. Choose a partition count such that each partition is between 100 MB and 10 GB. If your dataset is 10 TB and you want 1 GB partitions, create 10,000 partitions. If you start with 10 nodes, each gets 1,000 partitions. If you grow to 100 nodes, each gets 100 partitions. The partition size stays constant as you add nodes.

Strategy 2: Dynamic Partitioning

With dynamic partitioning, partitions split and merge automatically based on size. When a partition exceeds a configured threshold (say 10 GB), it splits into two. When adjacent partitions shrink below a threshold, they merge. This mirrors how B-tree pages split; in fact, HBase regions and Bigtable tablets work this way.

// Dynamic partitioning: HBase-style
Initial state: 1 partition covers the entire key range

P0: [a ... z]   size: 500 MB
... data grows ...
P0: [a ... z]   size: 10 GB → SPLIT!

P0: [a ... m]   size: 5 GB
P1: [n ... z]   size: 5 GB

... P0 grows, P1 shrinks ...
P0: [a ... m]   size: 10 GB → SPLIT again!
P0: [a ... g]   size: 5 GB
P2: [h ... m]   size: 5 GB
P1: [n ... z]   size: 3 GB
// 3 partitions now, each automatically kept under the 10 GB threshold

The advantage: partition count adapts to data volume. Start small, grow organically. The disadvantage: with a brand-new database, you start with one partition on one node. All writes go to that single node until it splits. HBase mitigates this with pre-splitting: you specify initial split points so the database starts with multiple partitions from day one.
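Here is a minimal sketch of size-triggered splitting. It assumes key count stands in for bytes and that keys are unique. `RangePartitions` is a hypothetical class that splits at the median key. Its optional `split_points` argument mimics pre-splitting.

```python
import bisect

class RangePartitions:
    """Dynamic range partitioning: split a partition at its median key once
    it holds more than max_keys keys (keys assumed unique)."""

    def __init__(self, max_keys, split_points=()):
        self.max_keys = max_keys
        self.bounds = sorted(split_points)             # pre-splitting, HBase-style
        self.parts = [[] for _ in range(len(self.bounds) + 1)]

    def _index(self, key):
        # Partition i holds keys in [bounds[i-1], bounds[i]).
        return bisect.bisect_right(self.bounds, key)

    def insert(self, key):
        i = self._index(key)
        bisect.insort(self.parts[i], key)
        if len(self.parts[i]) > self.max_keys:         # too big: split in two
            keys = self.parts[i]
            mid = len(keys) // 2
            self.parts[i:i + 1] = [keys[:mid], keys[mid:]]
            self.bounds.insert(i, keys[mid])           # new boundary = median key

rp = RangePartitions(max_keys=4)
for k in "abcdefghijklm":
    rp.insert(k)
print(rp.bounds)   # ['c', 'e', 'g', 'i', 'k']
print(rp.parts)

pre = RangePartitions(max_keys=4, split_points=["h", "p"])  # 3 partitions from day one
```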

Strategy 3: Partitions Proportional to Nodes

Cassandra takes a third approach: fix the number of partitions per node (say 256). When you add a node, it creates 256 new partitions by splitting 256 randomly chosen existing partitions. The total partition count grows with the cluster.

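Partition size therefore tracks data per node: as the dataset grows, partitions grow, and adding nodes splits them and makes them smaller again. Because clusters usually grow along with their data, partition size stays roughly stable. Each split moves half of an existing partition's data to the new node.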
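The split-and-take-half step can be modeled in a few lines. The model assumes partitions split exactly in half; real splits are generally not exact halves. `add_node_proportional` is a hypothetical helper.

```python
import random

def add_node_proportional(partitions, new_node, k, rng):
    """The new node picks k random existing partitions, splits each in half,
    and takes ownership of one half. Returns GB moved.

    partitions: list of {"owner": node, "size_gb": float} (modified in place).
    """
    moved = 0.0
    for victim in rng.sample(partitions, k):       # chosen before any appends
        half = victim["size_gb"] / 2
        victim["size_gb"] = half                   # this half stays put
        partitions.append({"owner": new_node, "size_gb": half})  # this half moves
        moved += half
    return moved

parts = [{"owner": node, "size_gb": 25.0} for node in "ABC" for _ in range(4)]
moved = add_node_proportional(parts, "D", k=4, rng=random.Random(42))
print(moved, len(parts))   # 50.0 16

by_node = {}
for p in parts:
    by_node[p["owner"]] = by_node.get(p["owner"], 0) + p["size_gb"]
print(by_node)
```

In this model D ends up with 50 GB, while a fair share of 300 GB across 4 nodes would be 75 GB.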

Comparing All Three: A Concrete Scenario

Let's trace the same scenario through all three strategies. Starting point: 3 nodes, 300 GB of data. Goal: add a 4th node.

// STRATEGY 1: Fixed (12 partitions, 25 GB each, 4 per node)
Before: Node A = [P0,P1,P2,P3], Node B = [P4,P5,P6,P7], Node C = [P8,P9,P10,P11]
Add Node D: steal 1 partition from each → D = [P3,P7,P11]
Data moved: 3 × 25 GB = 75 GB (25% of total)
Partition sizes: unchanged (25 GB each)
Partition count: unchanged (12)

// STRATEGY 2: Dynamic (initially ~6 partitions, 50 GB each)
Before: auto-split created 6 partitions at ~50 GB each
Node A = [P0,P1], Node B = [P2,P3], Node C = [P4,P5]
Add Node D: move P1 and P3 to D (each ~50 GB)
Data moved: 100 GB (33% of total) — more because partitions are larger
Partition count: still 6 (splits happen based on size, not node count)

// STRATEGY 3: Proportional (4 partitions per node = 12 initially)
Before: similar to fixed, 4 per node, 12 total
Add Node D: D gets 4 partitions by splitting 4 random existing ones
Each split: half the data stays, half moves to D
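Data moved: 4 × 25 GB / 2 = 50 GB (17% of total)
Partition count: grows to 16 (12 original + 4 new from splits)
Caveat: D now holds 50 GB vs ~83 GB on each old node (a fair share is 75 GB).

// Winner for minimal data movement: proportional (17%), at the cost of a less even split here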
// Winner for simplicity: fixed (no splits, just reassignment)
// Winner for adaptive sizing: dynamic (partitions match data distribution)

Automatic vs. Manual Rebalancing

Should rebalancing be fully automatic (database decides when and what to move) or manual (operator triggers rebalancing)?

Fully automatic sounds appealing but can be dangerous. If a node is slow due to a transient issue (GC pause, network blip), an automatic rebalancer might conclude it's dead and start moving its data to other nodes. This creates a massive amount of network traffic, which makes the problem worse: other nodes slow down under the rebalancing load, which triggers more rebalancing. A cascading failure.

Some production systems therefore use a semi-automatic approach: the database proposes a rebalancing plan, but a human (or an external orchestrator) must approve it. Couchbase, Riak, and Voldemort generate suggested partition assignments that an admin reviews before executing.

The Rebalancing Throttle: Balancing Speed vs. Impact

During rebalancing, the system moves data across the network. This consumes bandwidth that would otherwise serve user requests. Every production system has a throttle — a limit on how fast data can move during rebalancing.

// Elasticsearch rebalancing throttle
Default: 40 MB/s per node (indices.recovery.max_bytes_per_sec)
10-node cluster moving 100 GB: 100 GB / (40 MB/s * 10 nodes) = 256 seconds ~4 min

// But during those 4 minutes:
Each node loses 40 MB/s of network capacity for serving queries
If node has 1 Gbps = 125 MB/s network, losing 40 MB/s = 32% bandwidth reduction
Query latency may increase 30-50% during rebalancing

// CockroachDB approach:
Throttles rebalancing based on observed query latency.
If p99 latency exceeds threshold: pause rebalancing.
If latency is healthy: increase rebalancing speed.
Adaptive. Self-tuning. But may take much longer to finish.

// Best practice: schedule rebalancing during low-traffic periods.
// Add a node at 3 AM, rebalance at full speed while users sleep.
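A throttle like `indices.recovery.max_bytes_per_sec` boils down to pacing chunk sends. The sketch below is a generic rate pacer, not Elasticsearch's implementation. `TransferPacer` and `set_rate` are hypothetical names. The clock and sleep functions are injectable, so pacing can be tested without waiting.

```python
import time

class TransferPacer:
    """Caps the average transfer rate by spacing out chunk sends
    (a simple leaky-bucket-style pacer with no burst allowance)."""

    def __init__(self, bytes_per_sec, clock=time.monotonic, sleep=time.sleep):
        self.rate = bytes_per_sec
        self.clock, self.sleep = clock, sleep
        self.next_free = clock()     # earliest time the next chunk may start

    def set_rate(self, bytes_per_sec):
        # Hook for adaptive throttling, e.g. lower the rate when serving latency rises.
        self.rate = bytes_per_sec

    def acquire(self, nbytes):
        """Block until this chunk may be sent without exceeding the rate."""
        now = self.clock()
        start = max(now, self.next_free)
        self.next_free = start + nbytes / self.rate
        if start > now:
            self.sleep(start - now)

CHUNK = 1_000_000                    # 1 MB chunks
pacer = TransferPacer(40 * CHUNK)    # 40 MB/s, the default quoted above
for chunk_no in range(3):
    pacer.acquire(CHUNK)
    # send_chunk(chunk_no)  <- hypothetical network call
```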

Partition Splitting vs. Partition Movement

It's important to distinguish between two different operations that both fall under "rebalancing":

Operation | What happens | When | Data movement
Partition movement | Entire partition moves from node A to node B | Adding/removing nodes | Full partition data transferred
Partition splitting | One partition splits into two on different nodes | Partition exceeds size threshold | Half the data moves to new location
Partition merging | Two adjacent partitions combine into one | Partitions shrink below threshold | One partition's data moves to neighbor's node

Movement happens during scale-out events. Splitting happens during data growth. Merging happens during data shrinkage or TTL expiration. All three must happen without downtime. All three must be throttled to avoid impacting serving.

Rebalancing While Serving Traffic

The hardest part of rebalancing is not the algorithm — it's doing it without downtime. Data must move from one node to another while both nodes continue serving requests. Here's how it works in practice:

// Phase 1: Prepare
Source node: "I'm about to hand off partition P7 to target node."
Target node: starts accepting writes for P7 into a temporary buffer.
Source node: still serving reads AND writes for P7.

// Phase 2: Transfer
Source streams P7's data to target in the background.
During transfer: source continues serving P7.
New writes to P7 go to source AND get replicated to target (dual-write).
Duration: P7 is 1 GB, network is 1 Gbps = ~8 seconds to transfer.

// Phase 3: Catch-up
After bulk transfer: target applies buffered writes that arrived during transfer.
This "catch-up" phase is short because the buffer accumulated only during transfer time.

// Phase 4: Cutover
Briefly pause writes to P7 (milliseconds).
Update routing: "P7 is now on target node."
Resume. Target serves P7. Source deletes its copy of P7.

// Total downtime for P7: < 100 ms (the cutover pause).
// Other partitions: zero downtime. They weren't involved.

This is essentially the same technique as database replication — the target "catches up" like a new follower. The brief pause at cutover ensures no writes are lost. Elasticsearch, CockroachDB, and HBase all use variants of this approach.
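The prepare/transfer/catch-up/cutover sequence can be sketched with a snapshot plus a write log. This variant catches up by replaying the source's log from the snapshot offset, the way a new follower would, instead of buffering dual writes on the target. `SourcePartition`, `replay`, and `migrate` are hypothetical, and everything runs in one process.

```python
class SourcePartition:
    """Toy source replica: a dict plus an append-only write log."""

    def __init__(self, data):
        self.data = dict(data)
        self.log = []            # (key, value) in write order
        self.frozen = False      # set during the brief cutover pause

    def write(self, key, value):
        if self.frozen:
            raise RuntimeError("partition is cutting over; retry shortly")
        self.data[key] = value
        self.log.append((key, value))

def replay(source, target, offset):
    """Apply log entries from offset onward to target; return the new offset."""
    for key, value in source.log[offset:]:
        target[key] = value
    return len(source.log)

def migrate(source, target, routing, pid, new_owner, writes_during_copy=()):
    # Phase 2: snapshot, remembering which log offset it reflects.
    offset = len(source.log)
    snapshot = dict(source.data)
    for key, value in writes_during_copy:     # source keeps serving writes meanwhile
        source.write(key, value)
    target.update(snapshot)                   # stands in for the slow bulk transfer
    # Phase 3: catch up on writes logged since the snapshot.
    offset = replay(source, target, offset)
    # Phase 4: brief freeze, drain any remaining tail, then flip routing.
    source.frozen = True
    replay(source, target, offset)
    routing[pid] = new_owner
    # Only now would it be safe to delete the source copy (not done here).

src = SourcePartition({"a": 1, "b": 2})
tgt = {}
routing = {"P7": "nodeA"}
migrate(src, tgt, routing, "P7", "nodeB", [("b", 20), ("c", 3)])
print(tgt, routing)   # {'a': 1, 'b': 20, 'c': 3} {'P7': 'nodeB'}
```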

How Long Does Rebalancing Take?

// Calculation for a real scenario:
Cluster: 10 nodes, 1000 partitions (100 per node), 1 GB per partition
Action: add 2 nodes (go from 10 to 12)

Target distribution: 1000 / 12 = ~83 partitions per node
Each existing node gives up ~16-17 partitions (100 → 83 or 84)
Total moved = what the new nodes receive: 2 * 83 = 166 partitions

Data moved: 166 * 1 GB = 166 GB
Network throughput (throttled to 50% to avoid impacting serving): 500 Mbps = 62.5 MB/s per node
Each source node sends ~17 GB: 17 GB / 62.5 MB/s = 272 seconds = ~4.5 minutes

// But the transfers converge on only 2 target nodes:
Sources can send 10 * 62.5 = 625 MB/s combined; targets ingest only 2 * 62.5 = 125 MB/s.
Total time = 166 GB / 125 MB/s ≈ 1,328 seconds ≈ 22 minutes (target ingest is the bottleneck)
New nodes serve no traffic yet, so letting them ingest at the full 1 Gbps (125 MB/s each) halves this to ~11 minutes.

// In practice: 5-15 minutes for a 1 TB cluster adding 2 nodes.
// For a 100 TB cluster: could be hours. Plan accordingly.
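The estimate reduces to one rule: time = data moved / the slower of total source egress and total target ingest. The sketch uses the same assumptions as above (decimal units, an even spread, a fixed-partition scheme). `rebalance_estimate` is a hypothetical helper.

```python
def rebalance_estimate(partitions, old_nodes, new_nodes, gb_per_partition,
                       source_mb_s, target_mb_s):
    """Rough (GB moved, seconds) for adding nodes under fixed partitioning."""
    per_new_node = partitions // (old_nodes + new_nodes)   # fair share, rounded down
    moved_gb = per_new_node * new_nodes * gb_per_partition
    egress = old_nodes * source_mb_s     # all old nodes send in parallel
    ingest = new_nodes * target_mb_s     # only the new nodes receive
    seconds = moved_gb * 1000 / min(egress, ingest)
    return moved_gb, seconds

moved_gb, secs = rebalance_estimate(1000, old_nodes=10, new_nodes=2, gb_per_partition=1,
                                    source_mb_s=62.5, target_mb_s=62.5)
print(moved_gb, round(secs / 60, 1))   # 166 22.1
```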
Never use hash mod N for rebalancing. It is tempting to compute partition = hash(key) mod N and recalculate when N changes. But going from N to N+1 nodes moves about N/(N+1) of all keys. Adding one node to a 10-node cluster moves ~91% of data. Use fixed partitions, dynamic partitioning, or consistent hashing instead.
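The gap between hash mod N and fixed partitions is easy to measure empirically. This sketch uses SHA-256 as a stand-in hash and 100,000 synthetic keys. `moved_mod_n` and `moved_fixed` are hypothetical helpers.

```python
import hashlib

def h(key: str) -> int:
    # Any well-mixed hash works for this experiment; SHA-256 is a stand-in.
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")

def moved_mod_n(keys, n_before, n_after):
    """Fraction of keys whose node changes when node = hash mod N."""
    return sum(h(k) % n_before != h(k) % n_after for k in keys) / len(keys)

def moved_fixed(keys, num_partitions, reassigned):
    """Fraction of keys that move when key -> partition never changes and
    only the partitions in `reassigned` get a new node."""
    return sum(h(k) % num_partitions in reassigned for k in keys) / len(keys)

keys = [f"user_{i}" for i in range(100_000)]
print(f"hash mod N, 10 -> 11 nodes: {moved_mod_n(keys, 10, 11):.1%} of keys move")
# 1000 fixed partitions; the 11th node takes 1000 // 11 = 90 of them.
# Which 90 is arbitrary here, since the hash spreads keys evenly.
print(f"fixed partitions:          {moved_fixed(keys, 1000, set(range(90))):.1%} of keys move")
```

Expect roughly 91% for mod N (a key stays only if hash mod 10 equals hash mod 11) versus about 9% for fixed partitions.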

Strategy | Partition count | When to use | Used by
Fixed | Set at creation, never changes | Predictable dataset size | Riak, Elasticsearch, Couchbase
Dynamic | Grows/shrinks with data | Unpredictable growth | HBase, MongoDB, CockroachDB
Proportional | Fixed per node | Steady growth, many nodes | Cassandra

Rebalancing Failure Modes

What happens when rebalancing itself fails? This is the nightmare scenario that keeps SREs awake at night.

// Failure 1: Target node crashes mid-transfer
Source was sending P7 to target. Target dies at 60% complete.
Source still has the full copy of P7. No data lost.
Rebalancer detects failure, picks a new target, restarts transfer.
// Key: source never deletes its copy until target confirms receipt.

// Failure 2: Source node crashes mid-transfer
Source dies while sending P7 to target. Target has partial data.
Target discards incomplete transfer. P7 is still on source (but source is down).
If replicated: another replica of P7 becomes leader. Rebalancing retried later.
If not replicated: P7 is unavailable until source recovers.

// Failure 3: Network partition during rebalancing
Source sends data to target. Network splits. Source thinks it succeeded.
Target never got the last batch. Routing says P7 is on target.
Reads to P7 fail! Target has incomplete data.
Fix: checksums on transferred data. Target validates complete receipt
before sending "ready" to routing tier.

// Failure 4: Cascading rebalancing
Node A is slow (GC pause). Rebalancer thinks A is dead, moves its partitions away.
Massive data transfer starts. Nodes B, C, D slow down under transfer load.
Rebalancer thinks B is slow too. Tries to move B's partitions. More transfers.
Cascade. All nodes overloaded. Cluster goes down.
Fix: NEVER auto-rebalance on slow nodes. Require human approval.
The golden rule of rebalancing. Never delete the source copy until the target confirms receipt AND the routing table is updated. This ensures that at every moment, at least one node has the complete partition data. Violating this rule is how data gets lost.
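The checksum fix from Failure 3 and the golden rule fit together as one small gate. Routing flips only after the target's copy verifies, and only then may the caller delete the source copy. `digest` and `hand_off` are hypothetical helpers. In practice the target computes its checksum locally and reports it, rather than both copies living in one process.

```python
import hashlib

def digest(chunks):
    # SHA-256 over the concatenated bytes of all chunks, in order.
    h = hashlib.sha256()
    for chunk in chunks:
        h.update(chunk)
    return h.hexdigest()

def hand_off(source_chunks, received_chunks, routing, pid, target):
    """Flip routing only if the target's copy matches the source's."""
    if digest(received_chunks) != digest(source_chunks):
        return False           # keep serving from the source; retry the transfer
    routing[pid] = target      # routing now points at a verified copy
    return True                # the caller may now delete the source copy

chunks = [b"row1", b"row2", b"row3"]
routing = {"P7": "source"}
print(hand_off(chunks, chunks[:2], routing, "P7", "target"))   # False: last batch lost
print(hand_off(chunks, list(chunks), routing, "P7", "target")) # True
```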
You have a 100-node cluster with fixed partitioning (1000 partitions, 10 per node). You add 10 new nodes. What is the minimum number of partitions that must move to achieve a perfectly balanced distribution?

Chapter 6: Request Routing

A client has a key. It needs to read or write the record for that key. But the data is spread across 10 nodes. Which node has the partition for this key? This is the request routing problem — also called service discovery for partitioned systems.

There are three fundamental approaches, and every distributed database uses one of them (or a hybrid).

Approach 1: Contact Any Node (Gossip)

The client sends the request to any node in the cluster. If that node owns the partition for the requested key, it handles the request directly. If not, it forwards the request to the correct node, receives the response, and relays it back to the client.

Every node must know the full partition-to-node mapping. They learn this through a gossip protocol: nodes periodically exchange state information with each other. When partitions move (rebalancing), nodes gossip the updated mapping until all nodes converge. Cassandra and Riak use this approach.

// Gossip-based routing (Cassandra style)

Client → Node 3 (any node): "GET key=user_42"
Node 3 checks: hash(user_42) → partition 7 → owned by Node 5
Node 3 → forwards to Node 5
Node 5 → reads data → responds to Node 3
Node 3 → responds to Client

// Pros: no external dependency, no single point of failure
// Cons: extra hop if wrong node contacted, gossip convergence delay

Approach 2: Routing Tier (ZooKeeper)

A separate coordination service maintains the authoritative partition-to-node mapping. The client first asks the routing tier "which node owns partition for key X?", then contacts that node directly.

ZooKeeper is the most common coordination service for this. It stores the mapping and notifies all interested parties when it changes (via watches). When partitions rebalance, the database updates ZooKeeper, which pushes the change to the routing tier and any clients with active watches.

// ZooKeeper-based routing (HBase, Kafka style)

Client → Routing Tier: "Where is key=user_42?"
Routing Tier checks ZooKeeper mapping: hash(user_42) → P7 → Node 5
Routing Tier → Client: "Node 5"
Client → Node 5: "GET key=user_42" (direct connection)
Node 5 → Client: response

// When rebalancing moves P7 to Node 2:
Database updates ZooKeeper: P7 → Node 2
ZooKeeper notifies Routing Tier and all watchers
Next request for P7 goes directly to Node 2

// Pros: client talks to correct node on first try (after lookup)
// Cons: ZooKeeper is an external dependency and can be a bottleneck

LinkedIn's Espresso, HBase, and SolrCloud use ZooKeeper for partition discovery; Kafka did too until it replaced ZooKeeper with its built-in KRaft controller quorum.

Approach 3: Client-Side Routing

The client itself knows the partition-to-node mapping. It computes hash(key), determines the partition, looks up the node, and connects directly. No intermediary. The client library subscribes to ZooKeeper watches (or gossip) to keep its mapping up to date.

// Client-side routing

Client (with embedded mapping):
hash(user_42) → P7 → Node 5
Client → Node 5: "GET key=user_42" (direct, zero hops)

// Pros: lowest latency (no extra hops or lookups)
// Cons: client library is complex, must handle stale mappings

MongoDB uses the routing-tier approach: clients connect to a router process called mongos, which caches the partition mapping stored on dedicated config servers. Kafka is an example of client-side routing: its client library fetches partition metadata and connects straight to the right broker (see below).

How Kafka Routes to Partitions

Kafka is a fascinating case study because it uses partitioning for message ordering. Each Kafka topic is divided into partitions, and messages with the same key always go to the same partition (consistent assignment). This guarantees that messages for a given entity are processed in order.

// Kafka partition assignment
Topic "orders" with 8 partitions

Producer sends message with key = "customer_42"
Partition = murmur2(key) mod 8 = 3
Message goes to partition 3

// ALL messages with key "customer_42" go to partition 3
// Consumer reading partition 3 sees them IN ORDER

// Routing: Kafka client library knows the partition-to-broker mapping
// Client fetches metadata from any broker: "which broker leads partition 3?"
// Client sends directly to that broker (approach 3: client-side routing)

// If key is null: older clients spread messages round-robin (no ordering guarantee).
// Since Kafka 2.4, the default is the "sticky partitioner": fill a batch for one
// partition, then switch. Better throughput than true round-robin.

Kafka's partition count is set at topic creation. It can be increased later but never decreased. Increasing it changes which partition many keys hash to (the mapping is effectively hash mod N), so per-key ordering breaks across the change. This is why choosing the right partition count upfront is critical. Too few partitions limits consumer parallelism; too many increases coordination overhead and end-to-end latency.
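The next example demonstrates key-based partitioning and the cost of adding partitions. SHA-256 stands in for Kafka's murmur2 here, so the partition numbers will not match a real Kafka cluster. `partition_for` is a hypothetical helper.

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Kafka's default partitioner hashes key bytes with murmur2; SHA-256 is a
    # stand-in here, so actual partition numbers differ from Kafka's.
    return int.from_bytes(hashlib.sha256(key).digest()[:4], "big") % num_partitions

keys = [f"customer_{i}".encode() for i in range(10_000)]

# Same key, same partition count -> always the same partition (ordering holds).
assert all(partition_for(k, 8) == partition_for(k, 8) for k in keys)

# Growing the topic from 8 to 12 partitions remaps many keys.
remapped = sum(partition_for(k, 8) != partition_for(k, 12) for k in keys) / len(keys)
print(f"8 -> 12 partitions: {remapped:.0%} of keys change partition")
```

About two thirds of keys move, because a key keeps its partition only when hash mod 8 equals hash mod 12.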

Handling Partition Failures During Routing

What happens when the node that owns a partition is completely down?

// Scenario: Node 3 owns partition P7 and crashes

// Without replication: P7 is unavailable until Node 3 recovers.
// All reads/writes to P7 fail. Other partitions unaffected.

// With replication (leader-follower):
P7 has a follower on Node 5.
Failure detector (heartbeat timeout, typically 10-30 seconds) detects Node 3 is down.
ZooKeeper/consensus: Node 5's follower is promoted to leader for P7.
Routing tables updated: P7 → Node 5.

// Total unavailability for P7: 10-30 seconds (failure detection) + 1-2 seconds (failover)
// During this window: requests to P7 either fail or queue (depends on client retry policy)

// With Raft consensus (CockroachDB, TiKV):
// Failover is faster: Raft election timeout ~1-3 seconds.
// New leader elected from remaining replicas. No external coordination.
Request Routing Comparison

Approach | Latency | Complexity | Dependency | Used by
Gossip | 1-2 hops | Low (client is simple) | None (peer-to-peer) | Cassandra, Riak
Routing tier | 1 hop + lookup | Medium | ZooKeeper/etcd or config servers | HBase, Espresso, MongoDB (mongos)
Client-side | 0 extra hops | High (smart client) | Metadata refresh (watches or broker metadata) | Kafka clients

What Happens When the Mapping Is Stale?

All routing approaches face the same problem: between the time partitions rebalance and the time all clients/routers learn the new mapping, requests may go to the wrong node. How do systems handle this?

// Stale routing: request sent to wrong node

// Cassandra (gossip):
Client → Node A: "GET key=user_42"
Node A checks: "I don't own this partition anymore (moved during rebalancing)"
Option 1: Forward to correct node (adds 1 hop, but succeeds)
Option 2: Return error "NOT_MY_PARTITION", client retries with updated routing
Cassandra uses option 1 (transparent forwarding). Client never sees the stale routing.

// MongoDB (mongos):
mongos checks config server: "P7 is on Node 5"
mongos → Node 5: "GET key=user_42"
Node 5: "StaleConfigError — P7 moved to Node 3"
mongos refreshes config from config server, retries to Node 3
Client sees slightly higher latency but no error

// General pattern:
1. Try the node you think owns the partition
2. If wrong: the node tells you the correct owner (or forwards for you)
3. Update your local mapping cache
4. On next request, you go directly to the right node

// Worst case: during active rebalancing, a few requests see 1 extra hop.
// This is acceptable for most production systems.
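The general redirect pattern above (option 2, where the node tells the client who the new owner is) fits in a small sketch. `Node`, `Client`, and `StaleOwner` are hypothetical names. Each node is assumed to remember where the partitions it handed off went.

```python
class StaleOwner(Exception):
    """Raised by a node that no longer owns the requested partition."""
    def __init__(self, pid, owner):
        super().__init__(f"{pid} moved to {owner}")
        self.pid, self.owner = pid, owner

class Node:
    def __init__(self, owned, new_owners):
        self.owned = set(owned)       # partitions this node serves right now
        self.new_owners = new_owners  # where partitions it handed off now live
        self.store = {}

    def get(self, pid, key):
        if pid not in self.owned:
            raise StaleOwner(pid, self.new_owners[pid])   # "NOT_MY_PARTITION"
        return self.store.get(key)

class Client:
    def __init__(self, nodes, cached_map):
        self.nodes = nodes
        self.cache = dict(cached_map)  # pid -> node name; may be stale
        self.requests = 0

    def get(self, pid, key, max_tries=3):
        for _ in range(max_tries):
            self.requests += 1
            try:
                return self.nodes[self.cache[pid]].get(pid, key)
            except StaleOwner as e:
                self.cache[pid] = e.owner   # learn the new owner, then retry
        raise RuntimeError("routing did not converge")

# P7 just moved from node5 to node3; the client's cache still says node5.
nodes = {"node5": Node([], {"P7": "node3"}), "node3": Node(["P7"], {})}
nodes["node3"].store["user_42"] = "Alice"
client = Client(nodes, {"P7": "node5"})
print(client.get("P7", "user_42"), client.requests)  # Alice 2
print(client.get("P7", "user_42"), client.requests)  # Alice 3
```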

ZooKeeper's Role in Detail

ZooKeeper is used by HBase, SolrCloud, and others (and by Kafka before KRaft) as the "source of truth" for partition assignments. Here's the specific data it stores and how changes propagate:

// ZooKeeper znodes (paths) for a typical partition assignment:
/partitions/0 → {"node": "nodeA", "version": 7}
/partitions/1 → {"node": "nodeB", "version": 12}
/partitions/2 → {"node": "nodeA", "version": 5}
...

// Change propagation via watches:
1. Rebalancer moves P0 from nodeA to nodeC
2. Rebalancer writes: /partitions/0 → {"node": "nodeC", "version": 8}
3. ZooKeeper sends notification to all watchers of /partitions/0
4. All routing tiers and clients with watches re-read the znode, update their local cache, and re-register the watch (classic ZooKeeper watches fire only once)
5. Next request for P0 goes directly to nodeC

// Latency of change propagation: typically < 100 ms
// ZooKeeper guarantees: linearizable writes, sequential consistency for reads
// Limitation: ZooKeeper cluster of 3-5 nodes can handle ~10K writes/sec
// This is fine for partition assignment (changes rarely), but NOT for data storage.
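The versioned-write-plus-watch flow can be simulated without a ZooKeeper server. `ToyCoordinator` and `RouterCache` are hypothetical in-memory stand-ins, not a real client API. They mimic two ZooKeeper behaviors: writes conditioned on an expected version, and watches that fire once and must be re-registered.

```python
class ToyCoordinator:
    """In-memory stand-in for a ZooKeeper-like store: versioned values,
    version-checked writes, and one-shot watches."""

    def __init__(self):
        self.data = {}      # path -> (value, version)
        self.watches = {}   # path -> callbacks waiting for the next change

    def create(self, path, value):
        self.data[path] = (value, 0)

    def get(self, path, watch=None):
        if watch is not None:
            self.watches.setdefault(path, []).append(watch)
        return self.data[path]

    def set(self, path, value, expected_version):
        _, version = self.data[path]
        if version != expected_version:              # someone else changed it first
            raise ValueError(f"version conflict on {path}")
        self.data[path] = (value, version + 1)
        for callback in self.watches.pop(path, []):  # fire once, then forget
            callback(path)

class RouterCache:
    """Local partition -> node cache kept fresh through watches."""

    def __init__(self, zk, paths):
        self.zk, self.cache = zk, {}
        for path in paths:
            self.refresh(path)

    def refresh(self, path):
        value, _ = self.zk.get(path, watch=self.refresh)   # re-arm the watch
        self.cache[path] = value

zk = ToyCoordinator()
zk.create("/partitions/0", "nodeA")
router = RouterCache(zk, ["/partitions/0"])

_, version = zk.get("/partitions/0")
zk.set("/partitions/0", "nodeC", expected_version=version)  # rebalancer moves P0
print(router.cache["/partitions/0"])   # nodeC
```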
DNS-based routing. For simpler setups, some systems use DNS: each partition gets a DNS name that resolves to the node's IP. Rebalancing = update DNS records. The downside: DNS has TTLs (time-to-live), so stale mappings persist for seconds to minutes after rebalancing. Acceptable for some applications, dangerous for others.

Parallel Queries and MapReduce

Partitioning enables parallel query execution. A query that scans an entire table can be split: each partition processes its share in parallel, and results are merged. This is the essence of MapReduce and modern distributed SQL engines.

// Query: "SELECT country, COUNT(*) FROM users GROUP BY country"
// Table sharded into 10 partitions by hash(user_id)

// Step 1: Map (parallel, one per partition)
P0: scans its 100K rows → partial counts: {US: 45K, UK: 30K, JP: 25K}
P1: scans its 100K rows → partial counts: {US: 42K, UK: 32K, JP: 26K}
... (8 more partitions, in parallel)

// Step 2: Reduce (merge partial results)
Coordinator collects 10 partial count maps, sums them per country.
Final: {US: 450K, UK: 310K, JP: 240K}

// Speedup: ~10x for scan queries (limited by slowest partition)
// This is why data warehouses (BigQuery, Redshift, Snowflake)
// partition aggressively: more partitions = more parallelism.

// But: queries that need GLOBAL ordering (ORDER BY + LIMIT)
// cannot fully parallelize. Each partition sorts locally,
// coordinator does a final merge-sort of the top results.
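Below is a minimal sketch of both query shapes. In it, threads stand in for partitions on separate machines, and the function names are illustrative rather than taken from any engine. `scatter_gather_count` covers the GROUP BY case: each partition produces partial counts and the coordinator sums them. `top_k` covers ORDER BY ... LIMIT: each partition sorts locally and ships only its top k rows, and the coordinator merge-sorts those lists.

```python
import heapq
import itertools
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def partial_counts(rows):
    """Map step, run on each partition: a local GROUP BY country."""
    return Counter(row["country"] for row in rows)


def scatter_gather_count(partitions):
    # Scatter: every partition computes its partial counts in parallel.
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(partial_counts, partitions))
    # Gather/reduce: the coordinator sums the partial count maps.
    total = Counter()
    for partial in partials:
        total.update(partial)
    return dict(total)


def top_k(partitions, k, key):
    # ORDER BY ... LIMIT k: each partition sorts locally and ships only its top k.
    local_tops = [sorted(rows, key=key, reverse=True)[:k] for rows in partitions]
    # Coordinator: merge the already-sorted lists and keep the first k.
    merged = heapq.merge(*local_tops, key=key, reverse=True)
    return list(itertools.islice(merged, k))


p0 = [{"country": "US", "age": 30}, {"country": "UK", "age": 41}]
p1 = [{"country": "US", "age": 25}, {"country": "JP", "age": 52}]
print(scatter_gather_count([p0, p1]))  # {'US': 2, 'UK': 1, 'JP': 1}
print([r["age"] for r in top_k([p0, p1], 2, key=lambda r: r["age"])])  # [52, 41]
```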
Design challenge: You're building a distributed cache with 50 nodes. Latency is critical (sub-millisecond). Which routing approach do you choose and why?

Chapter 7: Partitioning and Replication

In a real distributed database, partitioning and replication work together. Each partition is replicated to multiple nodes for fault tolerance. A node might be the leader for partitions 1 and 3, and a follower for partitions 5 and 7. Every node wears multiple hats.

Consider a system with 6 partitions and 4 nodes, with a replication factor of 3 (each partition has 3 copies: 1 leader + 2 followers):

// 6 partitions, 4 nodes, replication factor 3
// Each partition: 1 leader (L) + 2 followers (F)

Node A: P0(L), P1(F), P3(F), P5(F) leads P0
Node B: P1(L), P2(F), P0(F), P4(F), P5(F) leads P1
Node C: P2(L), P3(L), P4(F), P0(F) leads P2, P3
Node D: P4(L), P5(L), P1(F), P2(F), P3(F) leads P4, P5
// 6 partitions × 3 copies = 18 copies: B and D hold 5, A and C hold 4

// Node C fails:
P2 leadership transfers to Node B or D (which hold P2 followers)
P3 leadership transfers to Node A or D (which hold P3 followers)
P4 and P0 lose one follower each — still have leader + 1 follower

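// No downtime beyond a brief failover, handled per partition. No data loss if replication
// to followers is synchronous; with asynchronous replication, writes that C acknowledged
// but had not yet shipped to a follower can be lost (see Chapter 6).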

This is the key insight: failure is per-partition, not per-node. When a node dies, only the partitions it was leading need a new leader. The partitions where it was a follower keep serving with one fewer replica. Each partition that lost its leader has followers on other nodes that can take over leadership independently. The rest of the system is unaffected.
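The following small simulation shows per-partition failover. It assumes a six-partition, four-node layout in which every partition has three replicas on distinct nodes, with the leader listed first. `fail_node` is a hypothetical helper. As a simplification, it promotes the first surviving follower; real systems choose the most up-to-date follower through an election or a controller.

```python
def fail_node(layout, dead):
    """layout: partition -> replica list with the leader first.
    Returns the surviving layout and what happened to each partition."""
    new_layout, report = {}, {}
    for partition, replicas in layout.items():
        survivors = [n for n in replicas if n != dead]
        if not survivors:
            report[partition] = "unavailable"
        elif replicas[0] == dead:
            report[partition] = f"new leader {survivors[0]}"  # simplified promotion
        elif len(survivors) < len(replicas):
            report[partition] = "lost a follower"
        else:
            report[partition] = "unaffected"
        new_layout[partition] = survivors
    return new_layout, report


layout = {  # leader first; every partition has 3 replicas on 3 distinct nodes
    "P0": ["A", "B", "C"], "P1": ["B", "A", "D"], "P2": ["C", "B", "D"],
    "P3": ["C", "A", "D"], "P4": ["D", "B", "C"], "P5": ["D", "A", "B"],
}
new_layout, report = fail_node(layout, "C")
for partition, outcome in report.items():
    print(partition, outcome)
# P0 lost a follower / P1 unaffected / P2 new leader B
# P3 new leader A / P4 lost a follower / P5 unaffected
```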

How Replication Factor Affects Partition Placement

With a replication factor of 3, each partition needs 3 different nodes. The placement algorithm must ensure that no two replicas of the same partition are on the same node. It should also spread the replicas across racks (and data centers) as far as the topology allows. This is replica placement, and it's a constraint satisfaction problem.

// Placement constraints
Rule 1: No two replicas of partition P on the same node
Rule 2: Spread replicas of P across as many racks as possible (rack awareness); with 2 racks and 3 replicas, one rack must hold 2
Rule 3: At least one replica in each data center (cross-DC replication)

// With 4 nodes across 2 racks:
Rack 1: Node A, Node B
Rack 2: Node C, Node D

P0: Leader=A(Rack1), Follower=C(Rack2), Follower=D(Rack2)   spans racks
P1: Leader=B(Rack1), Follower=D(Rack2), Follower=C(Rack2)   spans racks

// If Rack 1 loses power: P0 and P1 failover to Rack 2 nodes. Data safe.
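Here is a greedy placement sketch covering Rules 1 and 2. It is an illustrative heuristic, not the algorithm of any specific database, and it does not model Rule 3 (data centers). For each new replica, it prefers a node on a rack the partition doesn't use yet, then the least-loaded node, and finally breaks ties by node name.

```python
from collections import defaultdict


def place_replicas(num_partitions, node_racks, rf):
    """Greedy, rack-aware placement (illustrative heuristic).
    node_racks: node -> rack. Returns partition -> [leader, followers...]."""
    if rf > len(node_racks):
        raise ValueError("replication factor exceeds the number of nodes")
    load = defaultdict(int)  # node -> replicas assigned so far
    placement = {}
    for p in range(num_partitions):
        chosen, racks_used = [], set()
        while len(chosen) < rf:
            candidates = [n for n in node_racks if n not in chosen]  # Rule 1
            # Prefer a rack not used yet (Rule 2), then the least-loaded node,
            # then the node name as a deterministic tie-breaker.
            best = min(candidates, key=lambda n: (node_racks[n] in racks_used, load[n], n))
            chosen.append(best)
            racks_used.add(node_racks[best])
            load[best] += 1
        placement[p] = chosen
    return placement


racks = {"A": "rack1", "B": "rack1", "C": "rack2", "D": "rack2"}
for p, replicas in place_replicas(2, racks, rf=3).items():
    print(f"P{p}:", replicas, "racks:", sorted({racks[n] for n in replicas}))
# P0: ['A', 'C', 'B'] racks: ['rack1', 'rack2']
# P1: ['D', 'A', 'B'] racks: ['rack1', 'rack2']
```

With two racks, each partition gets one replica on one rack and two on the other, so losing either rack still leaves at least one copy.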

Read From Follower, Write to Leader

A common optimization: route read queries to the nearest follower instead of the leader. This reduces read latency (especially cross-datacenter) and reduces load on the leader. The trade-off: followers may lag behind the leader, so reads from followers might return stale data. Chapter 6 covered this in detail — read-your-writes consistency, monotonic reads, and consistent prefix reads are all relevant here.

Worked Example: Capacity Planning with Partitioning + Replication

// Requirements:
Data: 2 TB, growing 100 GB/month
Throughput: 100K reads/sec, 20K writes/sec
Durability: survive any 1 node failure (replication factor 3)
Latency: p99 < 10 ms for reads

// Step 1: Partition sizing
Target partition size: 2 GB (small enough for fast rebalancing)
Number of partitions: 2 TB / 2 GB = 1000 partitions

// Step 2: Replica count
Replication factor 3: 1000 × 3 = 3000 partition copies total
Total storage: 2 TB × 3 = 6 TB

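// Step 3: Node sizing
Node capacity: 500 GB storage, 10K reads/sec, 5K writes/sec
Storage constraint: 6 TB / 500 GB = 12 nodes minimum
Read constraint: 100K / 10K = 10 nodes (any replica can serve reads)
Write constraint: leaders accept writes, but every follower must apply them too
20K × 3 replicas = 60K replica writes/sec; 60K / 5K = 12 nodes

// Step 4: Final answer
12 nodes minimum (storage and replicated writes both bind at 12)
Each node holds 3000/12 = 250 partition copies
Each node is leader for ~1000/12 = 83 partitions
Each node is follower for ~167 partitions
Write load per node: 20K × (83/1000) ≈ 1,660 leader writes/sec
+ 20K × (167/1000) ≈ 3,340 follower applies/sec ≈ 5,000 writes/sec: exactly at capacity
Read load per node: 100K/12 ≈ 8,333 reads/sec (reads spread across all replicas)
// 12 nodes leaves zero headroom, including none to absorb the 1 node failure we must survive.
// At a 70% utilization target, for example: 12 / 0.7 ≈ 17.1, so 18 nodes.

// Growth plan: 100 GB/month × 3 replicas = 300 GB/month of new partition copies,
// i.e. another 500 GB node roughly every 1.7 months.
// Rebalancing onto a 13th node: it takes ~3000/13 ≈ 230 copies (~460 GB), ~25 min at 300 MB/s.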
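The sizing arithmetic fits in a small back-of-envelope function. This is a sketch: `size_cluster` and its `target_utilization` parameter are hypothetical. It assumes any replica can serve a read and that every replica, leader or follower, spends write capacity applying each write. The 2 TB is taken as 2000 GB, matching the decimal arithmetic in the worked example.

```python
import math


def size_cluster(data_gb, reads_per_s, writes_per_s, rf, partition_gb,
                 node_storage_gb, node_reads_per_s, node_writes_per_s,
                 target_utilization=1.0):
    """Back-of-envelope sizing: nodes needed = the tightest of three constraints."""
    partitions = math.ceil(data_gb / partition_gb)
    need = {
        # every replica stores a full copy of its partitions
        "storage": math.ceil(data_gb * rf / (node_storage_gb * target_utilization)),
        # reads can go to any replica
        "reads": math.ceil(reads_per_s / (node_reads_per_s * target_utilization)),
        # every replica applies every write to its partition
        "writes": math.ceil(writes_per_s * rf / (node_writes_per_s * target_utilization)),
    }
    nodes = max(need.values())
    return {"partitions": partitions, "need": need, "nodes": nodes,
            "copies_per_node": partitions * rf / nodes}


# Numbers from the worked example: 2 TB, 100K reads/s, 20K writes/s, RF 3.
plan = size_cluster(2000, 100_000, 20_000, rf=3, partition_gb=2,
                    node_storage_gb=500, node_reads_per_s=10_000, node_writes_per_s=5_000)
print(plan)
# {'partitions': 1000, 'need': {'storage': 12, 'reads': 10, 'writes': 12}, 'nodes': 12, 'copies_per_node': 250.0}
```

Passing `target_utilization=0.7` with the same inputs yields 18 nodes, because storage and writes then each need ⌈17.1⌉ = 18.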

Cross-Datacenter Partition Placement

For global applications, partitions and their replicas span multiple datacenters. The placement gets more complex:

// 3 datacenters: US-East, EU-West, AP-Tokyo
// Replication factor 3: one replica in each DC

P0: Leader=US-East-Node1, Follower=EU-West-Node3, Follower=AP-Tokyo-Node2
P1: Leader=EU-West-Node1, Follower=US-East-Node2, Follower=AP-Tokyo-Node1
P2: Leader=AP-Tokyo-Node1, Follower=US-East-Node3, Follower=EU-West-Node2

// Leadership distribution: each DC leads ~1/3 of partitions
// Read optimization: route to nearest replica (local DC follower)
// Write latency: at least one cross-DC round trip whenever a write must be acknowledged by a remote replica (tens to hundreds of ms)

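// CockroachDB supports "zone configs" for this (YugabyteDB offers similar
// controls through tablespaces with placement policies):
// ALTER PARTITION us_users OF TABLE users
// CONFIGURE ZONE USING lease_preferences = '[[+region=us-east]]';
// This asks CockroachDB to prefer leaseholders (the replica that serves reads
// and coordinates writes) for US user partitions in the US-East region.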
The multiplication factor. With P partitions, N nodes, and replication factor R, you store P × R partition copies across N nodes. Each node holds approximately (P × R) / N partition copies. For 1000 partitions, 10 nodes, replication factor 3: each node holds 300 partition copies (some as leader, most as follower). Storage overhead: 3x the raw data. This is the cost of fault tolerance.

Quorum Reads and Writes with Partitioning

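In leaderless, Dynamo-style replication systems (Cassandra, Riak, Voldemort), reads and writes use quorums. (The AWS DynamoDB service itself uses a leader per partition.) With replication factor N, a write is confirmed when W replicas acknowledge, and a read returns after R replicas respond. As long as W + R > N, every read set overlaps every write set, so at least one replica in the read set saw the latest acknowledged write. This is often called strong consistency, though edge cases such as sloppy quorums, concurrent writes, and partially failed writes can still surface stale values.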

// Quorum configuration per partition
Replication factor N = 3 (each partition has 3 copies)

// Strong consistency: W + R > N
W = 2, R = 2: write waits for 2 acks, read contacts 2 replicas. 2+2 > 3. Strong.
W = 3, R = 1: write waits for ALL replicas, read only needs 1. Strong, but slow writes.
W = 1, R = 3: write fast (1 ack), read contacts ALL replicas. Strong, but slow reads.

// Eventual consistency: W + R ≤ N
W = 1, R = 1: fast writes AND fast reads, but may read stale data.
Used for workloads that prioritize availability over consistency.

// The catch with partitioning:
Quorum is per-partition. If partition P7 has replicas on nodes [A, B, C]:
Write to P7: send to A, B, C. Wait for 2 acks.
If A is slow but B and C respond: write succeeds. A catches up later (read repair or anti-entropy).
Read from P7: contact any 2 of 3, e.g. A and B. Even if A is still stale, B saw the write. Consistent.

// Cross-partition operations (e.g., "transfer $100 from user_1 to user_2")
// where user_1 is on P3 and user_2 is on P7:
// Quorum alone is NOT enough. You need distributed transactions (Ch 8).
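The W + R > N rule is a pigeonhole argument, and a brute-force check over one partition's replica set confirms it. This sketch verifies only the set-overlap claim. It says nothing about sloppy quorums, concurrent writes, or cross-partition operations.

```python
from itertools import combinations


def quorums_always_overlap(n, w, r):
    """Brute force over one partition's n replicas: does EVERY possible write
    set of size w share at least one replica with EVERY read set of size r?"""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)  # non-empty intersection is truthy
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


for w, r in [(2, 2), (3, 1), (1, 3), (1, 1)]:
    print(f"N=3 W={w} R={r}: overlap guaranteed = {quorums_always_overlap(3, w, r)}")
# True, True, True, False: exactly the cases where W + R > N
```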
You have 12 partitions, 4 nodes, replication factor 3. Node B fails. How many partitions lose their leader, and can the system continue serving ALL partitions?

Chapter 8: Sharding in Practice

Theory is clean. Practice is messy. Every real database makes specific trade-offs in how it implements partitioning. Let's survey the major systems and the decisions they've made — then look at the anti-patterns that trip up engineers in production.

System Comparison

System | Partition strategy | Rebalancing | Routing | Secondary indexes
Cassandra | Hash (consistent hashing + vnodes) | Proportional to nodes | Gossip (any node) | Local (built-in secondary indexes); materialized views are partitioned by the view key
MongoDB | Hash OR range (configurable) | Dynamic (chunk splitting) | mongos routing tier | Local (scatter-gather)
DynamoDB | Hash (partition key) | Dynamic + adaptive | Service-side request routers (clients call a regional endpoint) | Global (GSI, async); LSI local
HBase | Range (sorted by row key) | Dynamic (region splitting) | ZooKeeper + META table | None built in (use Apache Phoenix or a separate index table)
Elasticsearch | Hash (document _id) | Fixed (primary shard count set at index creation) | Any node forwards | Local (all shards queried)
CockroachDB | Range (sorted keys) | Dynamic (range splitting) | Gossip + leaseholder | Global (distributed txn)
PostgreSQL Citus | Hash (distribution column) | Semi-manual (shard rebalancer) | Coordinator node | Local
Vitess (MySQL) | Range over keyspace IDs (computed by a vindex, often a hash) | Semi-manual (Reshard workflow; formerly SplitClone) | vtgate routing tier | Local (scatter-gather); lookup vindexes act as global

Design Challenge: URL Shortener at Scale

You're building a URL shortener (like bit.ly) that handles 100K writes/sec and 1M reads/sec. Each short URL maps to a long URL. How do you shard it?

// Data model
Key: short_code (e.g., "a3Xk9p") → Value: {long_url, created_at, click_count}

// Partition strategy: hash(short_code)
Short codes are already random-looking (base62 encoded). Hash them anyway
for uniform distribution. No need for range queries on short codes.

// Sizing
100K writes/sec → 100K new URLs per second
Average record: 500 bytes (short code + long URL + metadata)
Daily growth: 100K × 86400 × 500B = 4.32 TB/day
With replication factor 3: 12.96 TB/day

// Partition count
Target partition size: 1 GB
Day 1: 4.32 TB → ~4320 partitions
Grow to 100 TB in ~23 days → dynamic partitioning (auto-split)

// Node count
1M reads/sec. Each node handles ~50K reads/sec.
1M / 50K = 20 nodes minimum for reads.
100K writes/sec across 20 nodes = 5K writes/node. Comfortable.
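With replication factor 3: 60 nodes total.
// Storage check: 12.96 TB/day ÷ 60 nodes ≈ 216 GB per node per day, so storage,
// not throughput, quickly becomes the binding constraint.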
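Here is a sketch of the key path: generate a random base62 code, then route it with a stable hash. `new_short_code` and `partition_for` are hypothetical helpers, and collision handling for duplicate codes is left out. The hash has to be stable across processes. Python's built-in `hash()` on strings is salted per process, so different app servers would disagree on the partition for the same code.

```python
import hashlib
import secrets
import string
from collections import Counter

BASE62 = string.digits + string.ascii_letters  # 0-9, a-z, A-Z: 62 characters


def new_short_code(length=6):
    # Random base62 code; a real service must also detect collisions.
    return "".join(secrets.choice(BASE62) for _ in range(length))


def partition_for(short_code, num_partitions):
    # Stable hash, identical on every app server (unlike built-in hash()).
    digest = hashlib.sha256(short_code.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


codes = [new_short_code() for _ in range(100_000)]
per_partition = Counter(partition_for(code, 64) for code in codes)
print(len(per_partition), min(per_partition.values()), max(per_partition.values()))
# 64 partitions, each close to 100_000 / 64 ≈ 1,560 codes
```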

Design Challenge: Social Media Timeline at Scale

This is one of the most-asked system design questions. Let's partition a Twitter-like timeline.

// Data model: tweets stored by tweet_id
tweet_id (snowflake ID) → {author_id, text, timestamp, media_urls, like_count}

// Partition strategy for tweet storage:
Option A: hash(tweet_id). Even distribution. But "get all tweets by user X"
requires scatter-gather across all partitions.

Option B: hash(author_id). All tweets by one user on same partition.
"Get user's tweets" is a single-partition query. But celebrities cause hotspots.

Option C: Compound key: hash(author_id) + sort by timestamp.
"Get user's latest 20 tweets" = single partition + range scan. Best of both for per-user reads, though a celebrity's partition can still run hot.

// Partition strategy for timeline (home feed):
Pre-computed: each user's home feed is a materialized view, partitioned by user_id.
When someone tweets, fan-out-on-write pushes to followers' feeds.

// The celebrity problem resurfaces:
User with 50M followers → tweet triggers 50M writes to 50M user feeds.
Solution: hybrid model, keyed on the AUTHOR's follower count. Authors with <5000 followers: fan out on write into followers' pre-computed feeds.
Celebrity authors: no fan-out; followers merge their tweets in at read time (fan-out-on-read).
Twitter actually does this: celebrities' tweets are NOT pre-fanned-out.

// Storage estimation:
500M users, ~500M tweets/day in total, ≤280 chars + metadata ≈ 1 KB per tweet
Daily tweet storage: 500M × 1 KB = 500 GB/day (large, but small next to timelines)
Daily timeline writes: avg user follows 200 accounts; assume each followed account tweets ~2/day
→ 200 × 2 = 400 timeline entries/day/user
500M × 400 × 100 bytes per entry = 20 TB/day of timeline data!
This is why timelines are capped (Twitter keeps ~800 most recent entries per user).
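The hybrid model can be sketched in a single process, with dicts standing in for partitions keyed by user_id. `HybridTimeline` is a hypothetical class. The sketch ignores authors who cross the threshold (their older pushed tweets would then appear twice) and does not cap feeds at ~800 entries. The threshold is a constructor argument so the demo can use a tiny value.

```python
import heapq
import itertools
from collections import defaultdict


class HybridTimeline:
    """Fan-out-on-write for regular authors, fan-out-on-read for celebrities."""

    def __init__(self, celebrity_threshold=5000):
        self.celebrity_threshold = celebrity_threshold
        self.followers = defaultdict(set)  # author -> followers
        self.following = defaultdict(set)  # user -> authors they follow
        self.posts = defaultdict(list)     # author -> [(ts, tweet)]
        self.feeds = defaultdict(list)     # user -> pre-computed [(ts, tweet)]

    def follow(self, user, author):
        self.followers[author].add(user)
        self.following[user].add(author)

    def is_celebrity(self, author):
        return len(self.followers[author]) >= self.celebrity_threshold

    def post(self, author, ts, tweet):
        self.posts[author].append((ts, tweet))
        if not self.is_celebrity(author):
            for follower in self.followers[author]:  # fan-out on write
                self.feeds[follower].append((ts, tweet))

    def home_feed(self, user, limit=20):
        sources = [sorted(self.feeds[user], reverse=True)]
        for author in self.following[user]:
            if self.is_celebrity(author):            # fan-out on read
                sources.append(sorted(self.posts[author], reverse=True))
        merged = heapq.merge(*sources, reverse=True)  # newest first
        return [tweet for _, tweet in itertools.islice(merged, limit)]


t = HybridTimeline(celebrity_threshold=2)  # tiny threshold for the demo
t.follow("u1", "alice")
t.follow("u1", "star")
t.follow("u2", "star")
t.post("alice", 1, "a1")   # alice has 1 follower: pushed into u1's feed
t.post("star", 2, "s1")    # star has 2 followers: stored once, pulled on read
t.post("alice", 3, "a2")
print(t.home_feed("u1"))   # ['a2', 's1', 'a1']
```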

Common Anti-Patterns

Anti-pattern 1: Sharding too early. A startup with 10 GB of data and 500 queries/sec does not need sharding. A single PostgreSQL server handles this easily. Sharding adds complexity: cross-shard queries, distributed transactions, schema changes across shards, operational overhead. Shard when you must, not when you might.

Anti-pattern 2: Wrong shard key. Choosing user_id as the shard key when 90% of queries filter by date_range. Every query becomes scatter-gather. The shard key must match your dominant query pattern.

Anti-pattern 3: Cross-shard joins. "SELECT orders.*, users.name FROM orders JOIN users ON orders.user_id = users.id" where orders and users are on different shards. This requires fetching data from multiple shards and joining in the application layer. Either co-locate related data on the same shard (shard both tables by user_id) or denormalize (embed user info in the order record).

Anti-pattern 4: Ignoring shard key immutability. If you shard by user_id and a user changes their ID (or you shard by email and users change emails), the record must move to a different shard. Most systems don't support this well. Choose an immutable shard key.

Anti-pattern 5: Too many partitions. "More partitions = more parallelism" seems logical but has overhead. Each partition has metadata (routing info, replica locations, compaction state). With 100,000 partitions, the metadata alone can consume significant memory on the coordinator node. Elasticsearch's long-standing sizing advice was to keep no more than 20 shards per GB of JVM heap on each data node (newer releases have since revised this guidance).

Anti-pattern 6: Cross-shard transactions. "Just use two-phase commit across shards" is technically possible but practically devastating to performance. A cross-shard transaction requires locking rows on multiple nodes, coordinating a prepare phase, then a commit phase. If any node is slow or down, the transaction blocks. CockroachDB and Spanner support this but at significant latency cost. The better answer: design your shard key so that transactions stay within a single shard. In an e-commerce system, shard by customer_id — then "add item to cart + update cart total" is a single-shard transaction.
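Co-location, which is the fix for both anti-pattern 3 and anti-pattern 6, can be sketched with in-memory shards. All names here are hypothetical. The key point is that orders are sharded by `user_id` instead of by `order_id`, so a user's row and that user's orders always live on the same shard. The join then runs on one shard, and so would a transaction touching both.

```python
import hashlib

NUM_SHARDS = 4
shards = [{"users": {}, "orders": []} for _ in range(NUM_SHARDS)]


def shard_for(user_id):
    digest = hashlib.md5(str(user_id).encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_SHARDS


def insert_user(user_id, name):
    shards[shard_for(user_id)]["users"][user_id] = {"id": user_id, "name": name}


def insert_order(order_id, user_id, total):
    # Sharded by user_id, NOT order_id, so each order lands next to its user.
    shards[shard_for(user_id)]["orders"].append(
        {"id": order_id, "user_id": user_id, "total": total})


def orders_with_user_name(user_id):
    """orders JOIN users for one user, answered entirely by a single shard."""
    shard = shards[shard_for(user_id)]
    user = shard["users"][user_id]
    return [{**order, "name": user["name"]}
            for order in shard["orders"] if order["user_id"] == user_id]


insert_user(1, "Ada")
insert_user(2, "Alan")
insert_order(101, 1, 25.0)
insert_order(102, 2, 40.0)
insert_order(103, 1, 15.0)
print(orders_with_user_name(1))
# [{'id': 101, 'user_id': 1, 'total': 25.0, 'name': 'Ada'},
#  {'id': 103, 'user_id': 1, 'total': 15.0, 'name': 'Ada'}]
```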

Resharding: The Nuclear Option

What happens when you chose the wrong shard key and need to change it? This is resharding — one of the most painful operations in distributed systems.

// Resharding: changing from shard_key=user_id to shard_key=country+user_id

// Step 1: Create new cluster with new shard key
New cluster: 8 shards, key = hash(country+user_id)
Old cluster: 8 shards, key = hash(user_id)

// Step 2: Double-write: write to BOTH clusters, starting BEFORE the backfill
Application writes to old cluster AND new cluster simultaneously
(If double-writes started only after the copy, writes that landed during it would be missing.)

// Step 3: Backfill: copy all existing data to new cluster
For each record in old cluster:
Read record → compute new shard from new key → write to new cluster
(never overwrite a newer version that a double-write already put there)
Duration: 2 TB at 100 MB/sec ≈ 5.5 hours (if you can sustain the write rate)
Keep double-writing while you validate the new cluster (days to weeks)

// Step 4: Cutover — switch reads to new cluster
Verify data consistency between old and new clusters
Flip read traffic to new cluster (gradually, canary first)
Monitor for errors. If something's wrong: flip back to old cluster.

// Step 5: Decommission old cluster
Stop double-writes. Delete old cluster. Done.

// Total time: 2-4 weeks in practice.
// This is why getting the shard key right the FIRST time matters so much.
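Here is a minimal in-memory sketch of the double-write and backfill steps. `Cluster`, `write`, and the `version` field are hypothetical. The sketch assumes every record carries a version number and that `country` never changes; a mutable shard key would make records move between shards (anti-pattern 4). The backfill snapshot is taken before a concurrent update arrives, and the version check stops the stale copy from overwriting the newer one.

```python
import hashlib


def stable_hash(text):
    return int.from_bytes(hashlib.md5(text.encode()).digest()[:8], "big")


class Cluster:
    def __init__(self, num_shards, shard_key):
        self.shards = [{} for _ in range(num_shards)]
        self.shard_key = shard_key  # record -> string that decides the shard

    def put(self, record):
        shard = self.shards[stable_hash(self.shard_key(record)) % len(self.shards)]
        current = shard.get(record["user_id"])
        # Last-writer-wins by version: an older copy never overwrites a newer one.
        if current is None or record["version"] > current["version"]:
            shard[record["user_id"]] = record

    def all_records(self):
        return {uid: rec for shard in self.shards for uid, rec in shard.items()}


old = Cluster(8, shard_key=lambda r: r["user_id"])
new = Cluster(8, shard_key=lambda r: r["country"] + ":" + r["user_id"])

for i in range(100):  # existing data, written before the migration
    old.put({"user_id": f"u{i}", "country": "US" if i % 3 else "JP", "version": 1})


def write(record):  # Step 2: double-writes are ON from here on
    old.put(record)
    new.put(record)


snapshot = [dict(r) for r in old.all_records().values()]  # backfill starts reading
write({"user_id": "u7", "country": "US", "version": 2})    # update lands mid-copy

for record in snapshot:  # Step 3: backfill; the stale u7 (version 1) is skipped
    new.put(record)

print(old.all_records() == new.all_records())  # True: ready for Step 4 checks
print(new.all_records()["u7"]["version"])      # 2
```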
Debug scenario: one shard has 10x the data of others. Diagnosis: (1) Check the shard key distribution — is one value vastly more common than others? (e.g., sharding by country when 60% of users are in the US). (2) Check for a "default" or "null" value that many records share. (3) Check if the shard key is monotonically increasing (timestamp, auto-increment). Fix: choose a higher-cardinality shard key, add a prefix/suffix, or use hash partitioning.
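Checks (1) and (2) from this diagnosis come down to a frequency count over the shard-key column. `diagnose_shard_key` is a hypothetical helper, and the set of "default-like" placeholder values is an assumption that you would adapt to your own data. Check (3), monotonically increasing keys, needs the keys' insertion order and is not covered here.

```python
from collections import Counter

# Assumed placeholder values that often stand in for a missing shard key.
DEFAULT_LIKE = (None, "", "unknown")


def diagnose_shard_key(values, top_n=3):
    """Summarize a shard-key column: cardinality, heaviest values, and the share
    of rows whose key is null or a default placeholder."""
    counts = Counter(values)
    total = len(values)
    defaults = sum(c for v, c in counts.items() if v in DEFAULT_LIKE)
    return {
        "cardinality": len(counts),
        "top_values": [(v, round(c / total, 3)) for v, c in counts.most_common(top_n)],
        "null_or_default_share": round(defaults / total, 3),
    }


countries = ["US"] * 60 + ["UK"] * 15 + ["JP"] * 10 + [None] * 15
print(diagnose_shard_key(countries))
# {'cardinality': 4, 'top_values': [('US', 0.6), ('UK', 0.15), (None, 0.15)],
#  'null_or_default_share': 0.15}
```

In this example, a cardinality of 4 with 60% of rows on one value indicates skew no matter how many shards you add.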

Monitoring a Sharded Cluster

In production, you need to monitor these metrics per partition:

Metric | What it tells you | Alarm threshold
Partition size (bytes) | Is one partition growing faster than others? | >2x average = investigate shard key skew
Requests/sec per partition | Is one partition getting more traffic? | >3x average = possible hotspot
p99 latency per partition | Is one partition slower? | >2x average = disk full, compaction lag, or GC
Replication lag per partition | Are followers keeping up? | >1 second = follower overloaded or network issue
Scatter-gather query count | How many queries touch all partitions? | >50% of queries = wrong shard key for your workload
Cross-partition transaction rate | How many transactions span partitions? | >10% = consider changing shard key to co-locate

The most dangerous situation is a silent hotspot: one partition is overloaded but the system-wide metrics look fine because the other partitions are averaging it out. Always monitor per-partition, not just per-node or per-cluster.
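The ratio-to-average alarms from the monitoring table can be evaluated per partition with a few lines of code. `per_partition_alerts` is a hypothetical helper fed by whatever metrics pipeline you use. One design caveat: a hot partition raises the average it is compared against, so with only a few partitions a median may be a more robust baseline.

```python
from statistics import mean

# Ratio-to-average alarm thresholds from the monitoring table.
RATIO_THRESHOLDS = {"size_bytes": 2.0, "requests_per_s": 3.0, "p99_ms": 2.0}


def per_partition_alerts(metrics):
    """metrics: partition -> {metric name -> value}. Flags every partition whose
    value exceeds threshold x the average over all partitions."""
    alerts = []
    for name, ratio in RATIO_THRESHOLDS.items():
        avg = mean(m[name] for m in metrics.values())
        for partition, m in metrics.items():
            if m[name] > ratio * avg:
                alerts.append((partition, name, round(m[name] / avg, 1)))
    return alerts


metrics = {
    f"P{i}": {"size_bytes": 1e9, "requests_per_s": 600 if i == 3 else 100, "p99_ms": 5.0}
    for i in range(10)
}
# Cluster-wide, 1,500 req/s over 10 partitions looks unremarkable...
print(per_partition_alerts(metrics))  # [('P3', 'requests_per_s', 4.0)]
```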

Schema Changes in a Sharded Database

Adding a column, creating an index, or changing a data type must happen on every partition. In a non-sharded database, this is one ALTER TABLE command. In a sharded database with 1000 partitions, it's 1000 ALTER TABLE commands — executed without downtime.

// Approach 1: Rolling schema change (Vitess, CockroachDB)
Execute ALTER TABLE on one partition at a time.
During the change, both old and new schema are valid.
Application code must handle both schemas (backward compatible changes only).
Duration: 1000 partitions * 30 seconds each = ~8 hours

// Approach 2: Ghost table (gh-ost, pt-online-schema-change)
Create a new table with the new schema.
Copy data from old to new table, row by row, while serving traffic.
Capture ongoing writes and apply them to the new table too (pt-online-schema-change uses triggers; gh-ost tails the binary log).
Atomic rename: new table takes old table's name.
Duration: depends on data size. 1 TB table: hours to days.

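// Approach 3: Schema-on-read (MongoDB; Cassandra's ALTER TABLE ... ADD is a cheap, metadata-only change)
No data-rewriting ALTER TABLE needed: MongoDB documents carry their own fields, and Cassandra rows written before an ADD simply lack the new column.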
Application handles schema evolution in code.
Old documents have old fields, new documents have new fields.
Read code: if "new_field" not present, use default value.
// Much simpler operationally, but schema drift can become a mess.
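The read-side default can be sketched as one upgrade function applied to every document, whatever shard it came from. `upgrade_on_read` and the `schema_version` field are hypothetical. The schema history is an assumption too: version 2 added a `tier` field whose default is `"free"`. The `setdefault` line implements the text's "if new_field not present, use default value".

```python
CURRENT_SCHEMA = 2


def upgrade_on_read(doc):
    """Return `doc` in the current shape without mutating the stored copy.
    Assumed history: version 2 added "tier" (default "free")."""
    doc = dict(doc)
    if doc.get("schema_version", 1) < 2:
        doc.setdefault("tier", "free")  # new field missing: use the default
    doc["schema_version"] = CURRENT_SCHEMA
    return doc


old_doc = {"_id": 1, "name": "Ada"}  # written before the change
new_doc = {"_id": 2, "name": "Alan", "tier": "pro", "schema_version": 2}
print(upgrade_on_read(old_doc))   # {'_id': 1, 'name': 'Ada', 'tier': 'free', 'schema_version': 2}
print(upgrade_on_read(new_doc) == new_doc)  # True
```

Recording an explicit version in each document gives the upgrade chain something to branch on. That becomes important once several schema changes have accumulated across shards.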

You're designing a multi-tenant SaaS where each tenant has 1-100 GB of data. You shard by tenant_id. One tenant (a large enterprise) has 500 GB — 50x larger than average. What's the problem and how do you fix it?

Chapter 9: Interview Arsenal

Sharding comes up in most large-scale systems design interviews. The interviewer gives you a system with "billions of records" or "petabytes of data," and you must partition it. Here is everything you need, organized by interview dimension.

Quick Reference: Partitioning Strategies

Strategy | Pros | Cons | Use when
Key range | Efficient range queries, sorted scans | Hotspots on sequential keys | Time-series with non-timestamp prefix
Hash | Even distribution, no hotspots on sequential keys | No range queries, scatter-gather | Point lookups (user profiles, KV store)
Compound (Cassandra) | Hash distribution + sorted within partition | Range queries only on clustering column | User feeds, chat histories, IoT by device
Geographic | Data locality for regional queries | Uneven population distribution | Location-based services

System Design Problems

Problem 1: Design a distributed key-value store.

// Framework answer:
1. Partition by hash(key) — consistent hashing with vnodes
2. Replication factor 3 for fault tolerance
3. Client-side routing for lowest latency
4. Tunable consistency: W=2, R=2 for strong consistency (W+R > N=3)
5. Rebalancing: fixed partitions (pre-allocate based on expected capacity)
6. Anti-entropy: Merkle trees to detect and repair divergent replicas

Problem 2: Design a social media feed system (fan-out on read vs write).

// Partition by user_id (hash)
Each user's posts are on one partition

// Fan-out on write (push model):
When user posts, write to all followers' timelines (pre-computed feeds)
Problem: celebrity with 10M followers = 10M writes per post
Solution: hybrid — push for users with <5000 followers, pull for celebrities

// Fan-out on read (pull model):
When user opens feed, query all followed users' partitions and merge
Problem: following 500 users = 500 partition queries per feed load
Solution: cache + pagination + async background refresh

Problem 3: Shard a user database for 100M users.

// Shard key: user_id (UUID, immutable, high cardinality)
hash(user_id) → partition

// Sizing:
100M users × 2 KB/user = 200 GB total
Target partition size: 1 GB → 200 partitions
With replication factor 3: 600 copies across ~20 nodes (30 each)

// Secondary indexes:
Lookup by email: global index (email → user_id → partition)
Lookup by username: global index
Search by name/location: Elasticsearch sidecar (not primary DB)

// Growth plan:
At 1B users: 2 TB. Same partition count, larger partitions (10 GB).
Or dynamic partitioning: auto-split at 10 GB threshold.
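A global (term-partitioned) email index can be sketched with two sets of in-memory partitions. All names here are hypothetical, and the user id is a placeholder string. User records are partitioned by `user_id`, while index entries are partitioned by the email itself. A lookup by email then costs two single-partition hops instead of a scatter-gather across all 200 partitions.

```python
import hashlib

NUM_PARTITIONS = 200  # the partition count from the sizing above


def partition_of(key):
    digest = hashlib.sha1(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


users = [{} for _ in range(NUM_PARTITIONS)]        # partitioned by user_id
email_index = [{} for _ in range(NUM_PARTITIONS)]  # global index, partitioned by email


def create_user(user_id, email, name):
    users[partition_of(user_id)][user_id] = {"email": email, "name": name}
    # The index entry lives on the partition that owns the EMAIL, usually a
    # different one: keeping both in sync needs a distributed txn or async updates.
    email_index[partition_of(email)][email] = user_id


def find_by_email(email):
    user_id = email_index[partition_of(email)].get(email)  # hop 1: index partition
    if user_id is None:
        return None
    return users[partition_of(user_id)][user_id]          # hop 2: data partition


create_user("3f2a-uuid", "ada@example.com", "Ada")
print(find_by_email("ada@example.com"))     # {'email': 'ada@example.com', 'name': 'Ada'}
print(find_by_email("nobody@example.com"))  # None
```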

Coding Drill: Consistent Hashing

python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, nodes=None, vnodes=150):
        self.ring = {}            # hash_position -> node_name
        self.sorted_keys = []     # sorted positions for binary search
        self.vnodes = vnodes      # virtual nodes per physical node
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Hash a string to a position on the ring (0 to 2^32-1)."""
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest[:8], 16)  # use first 32 bits

    def add_node(self, node):
        """Add a physical node with `vnodes` virtual positions."""
        for i in range(self.vnodes):
            vnode_key = f"{node}:vnode{i}"
            pos = self._hash(vnode_key)
            self.ring[pos] = node  # on a rare 32-bit collision, the later node wins
        self.sorted_keys = sorted(self.ring.keys())

    def remove_node(self, node):
        """Remove all virtual nodes for a physical node."""
        for i in range(self.vnodes):
            vnode_key = f"{node}:vnode{i}"
            pos = self._hash(vnode_key)
            if self.ring.get(pos) == node:  # don't delete a position another node owns
                del self.ring[pos]
        self.sorted_keys = sorted(self.ring.keys())

    def get_node(self, key):
        """Find the node responsible for a given key (first vnode clockwise)."""
        if not self.ring:
            return None
        h = self._hash(key)
        # Binary search for the first position >= h: O(log V) for V vnodes
        idx = bisect.bisect_left(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]


# Usage:
ring = ConsistentHashRing(["nodeA", "nodeB", "nodeC"])
print(ring.get_node("user_42"))   # e.g., "nodeB"
print(ring.get_node("user_99"))   # e.g., "nodeA"

# Add a node: only ~1/N of keys (N = new node count, here ~1/4) change owner, all to nodeD
ring.add_node("nodeD")
print(ring.get_node("user_42"))   # either unchanged or "nodeD"
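The "~1/N keys move" claim can be checked empirically against naive `hash(key) mod N`. To keep the snippet self-contained, it re-implements the same 32-bit MD5 ring compactly rather than importing the class. The helper names are hypothetical. Going from 3 to 4 nodes, mod-N keeps a key in place only when h mod 3 equals h mod 4, which happens for 3 of every 12 residues, so about 75% of keys move.

```python
import bisect
import hashlib


def h32(text):
    return int(hashlib.md5(text.encode()).hexdigest()[:8], 16)  # same 32-bit hash


def mod_n_owner(key, nodes):
    return nodes[h32(key) % len(nodes)]  # naive: hash(key) mod N


def build_ring(nodes, vnodes=150):
    points = sorted((h32(f"{n}:vnode{i}"), n) for n in nodes for i in range(vnodes))
    return [pos for pos, _ in points], [node for _, node in points]


def ring_owner(key, ring):
    positions, owners = ring
    idx = bisect.bisect_left(positions, h32(key)) % len(positions)  # wrap around
    return owners[idx]


def moved_fraction(keys, before, after):
    return sum(before(k) != after(k) for k in keys) / len(keys)


keys = [f"user_{i}" for i in range(20_000)]
three, four = ["nodeA", "nodeB", "nodeC"], ["nodeA", "nodeB", "nodeC", "nodeD"]
ring3, ring4 = build_ring(three), build_ring(four)

mod_moved = moved_fraction(keys, lambda k: mod_n_owner(k, three), lambda k: mod_n_owner(k, four))
ring_moved = moved_fraction(keys, lambda k: ring_owner(k, ring3), lambda k: ring_owner(k, ring4))
print(f"hash mod N:         {mod_moved:.0%} of keys moved")   # about 75%
print(f"consistent hashing: {ring_moved:.0%} of keys moved")  # about 25%
```

Every key that moves on the ring moves to nodeD. Adding vnodes only takes over arcs from their clockwise successors, so no data is shuffled between the existing nodes.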

Coding Drill: Partition Rebalancer

python
class FixedPartitionRebalancer:
    """Implements fixed-partition rebalancing strategy.
    Create partitions once, move them between nodes on join/leave."""

    def __init__(self, num_partitions, initial_nodes):
        self.num_partitions = num_partitions
        self.nodes = list(initial_nodes)
        # Initial assignment: round-robin
        self.assignment = {}  # partition_id -> node_name
        for i in range(num_partitions):
            self.assignment[i] = self.nodes[i % len(self.nodes)]

    def get_load(self):
        """Returns dict of node -> partition count."""
        load = {n: 0 for n in self.nodes}
        for p, n in self.assignment.items():
            if n in load:
                load[n] += 1
        return load

    def add_node(self, node_name):
        """Add a node and rebalance: steal from overloaded nodes."""
        self.nodes.append(node_name)
        target = self.num_partitions // len(self.nodes)
        moved = []

        # Steal partitions from nodes that have too many
        load = self.get_load()
        for p_id in range(self.num_partitions):
            current_node = self.assignment[p_id]
            # Strictly above target (not target + 1): with 4 per node and target 3, nothing would move
            if load.get(current_node, 0) > target and load.get(node_name, 0) < target:
                self.assignment[p_id] = node_name
                load[current_node] -= 1
                load[node_name] = load.get(node_name, 0) + 1
                moved.append((p_id, current_node, node_name))
        return moved  # list of (partition, from_node, to_node)

    def remove_node(self, node_name):
        """Remove a node: distribute its partitions to remaining nodes."""
        self.nodes.remove(node_name)
        orphans = [p for p, n in self.assignment.items() if n == node_name]
        moved = []

        for p_id in orphans:
            # Assign to node with fewest partitions
            load = self.get_load()
            min_node = min(self.nodes, key=lambda n: load.get(n, 0))
            self.assignment[p_id] = min_node
            moved.append((p_id, node_name, min_node))
        return moved


# Usage:
rb = FixedPartitionRebalancer(12, ["A", "B", "C"])
print(rb.get_load())  # {'A': 4, 'B': 4, 'C': 4}

moved = rb.add_node("D")
print(f"Moved {len(moved)} partitions")  # 3
print(rb.get_load())  # {'A': 3, 'B': 3, 'C': 3, 'D': 3}

moved = rb.remove_node("B")
print(f"Moved {len(moved)} partitions")  # 3
print(rb.get_load())  # {'A': 4, 'C': 4, 'D': 4}

Coding Drill: Simple Partition Router

python
import zlib


class PartitionRouter:
    """Routes keys to partitions using hash or range strategy."""

    def __init__(self, strategy="hash", num_partitions=8, boundaries=None):
        self.strategy = strategy
        self.num_partitions = num_partitions
        self.boundaries = boundaries or []  # for range strategy (sorted upper bounds)

    def route(self, key):
        if self.strategy == "hash":
            # Stable hash: Python's built-in hash() of a str is salted per process,
            # so two clients could route the same key to different partitions.
            h = zlib.crc32(str(key).encode()) % self.num_partitions
            return f"P{h}"
        elif self.strategy == "range":
            for i, boundary in enumerate(self.boundaries):
                if key <= boundary:
                    return f"P{i}"
            return f"P{len(self.boundaries)}"  # last partition
        raise ValueError(f"unknown strategy: {self.strategy}")

    def route_range(self, start, end):
        """For range strategy: which partitions cover [start, end]?"""
        if self.strategy == "hash":
            return [f"P{i}" for i in range(self.num_partitions)]  # all!
        partitions = {self.route(start), self.route(end)}
        # Also include any partition whose boundary falls within range
        for b in self.boundaries:
            if start <= b <= end:
                partitions.add(self.route(b))
        return sorted(partitions, key=lambda p: int(p[1:]))  # numeric order: P2 before P10

# Hash partitioning: point lookup is 1 partition, range is ALL
hr = PartitionRouter("hash", num_partitions=4)
print(hr.route("user_42"))         # exactly one of P0..P3, the same in every process
print(hr.route_range("a", "z"))   # ['P0', 'P1', 'P2', 'P3'] (all!)

# Range partitioning: a narrow range query touches only the partitions it spans
rr = PartitionRouter("range", boundaries=[250, 500, 750])
print(rr.route(637))               # P2 (one partition)
print(rr.route_range(400, 600))   # ['P1', 'P2'] (two partitions)

Problem 4: Design a Distributed Rate Limiter

// Rate limit: 100 requests per minute per user
// Challenge: rate limiter itself must be distributed

// Approach: partition rate limit counters by user_id
hash(user_id) → partition (one node per partition)
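Each node maintains a fixed-window counter for its users (simple, but a user can burst up to 2× the limit across a window boundary; sliding windows smooth this out).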

// Data structure per user:
user_id → {window_start: timestamp, count: int}
On each request: if now - window_start > 60s, reset. Else increment.
If count > 100, reject.

// Why partitioning matters:
If ALL rate limit checks hit one node: bottleneck.
With hash(user_id): each node handles 1/N of users. No hotspot
(unless one user sends 1M requests, but that's handled by per-key logic).

// Consistency concern:
If a user's requests hit different rate limiter nodes (race condition)?
Not possible with hash(user_id) in steady state: all requests for one user go to the same node.
But if the rate limiter node fails, requests might route to a backup
without the current count. Accept brief under-counting after failover (a user may briefly exceed the limit).
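Below is a sketch of the partitioned fixed-window limiter. `FixedWindowLimiter` and `LimiterCluster` are hypothetical names, and each "node" is an in-process object. Because users are routed with a stable hash of `user_id`, one counter per user is enough. The sketch does not model failover, which is where the under-counting described above would appear.

```python
import hashlib


class FixedWindowLimiter:
    """State held by ONE limiter node: a fixed window per user."""

    def __init__(self, limit=100, window_s=60):
        self.limit = limit
        self.window_s = window_s
        self.state = {}  # user_id -> (window_start, count)

    def allow(self, user_id, now):
        start, count = self.state.get(user_id, (now, 0))
        if now - start >= self.window_s:  # window over: start a new one
            start, count = now, 0
        allowed = count < self.limit
        self.state[user_id] = (start, count + 1 if allowed else count)
        return allowed


class LimiterCluster:
    def __init__(self, num_nodes, **limiter_args):
        self.nodes = [FixedWindowLimiter(**limiter_args) for _ in range(num_nodes)]

    def node_for(self, user_id):
        digest = hashlib.md5(user_id.encode()).digest()
        return self.nodes[int.from_bytes(digest[:4], "big") % len(self.nodes)]

    def allow(self, user_id, now):
        # All of a user's requests reach the same node, so one counter suffices.
        return self.node_for(user_id).allow(user_id, now)


cluster = LimiterCluster(num_nodes=4, limit=100, window_s=60)
decisions = [cluster.allow("alice", now=i * 0.1) for i in range(150)]  # 150 requests in 15 s
print(decisions.count(True), decisions.count(False))  # 100 50
print(cluster.allow("alice", now=61.0))                # True: a new window has started
```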

Problem 5: Design a Distributed Leaderboard

// Requirements: real-time leaderboard for 10M players in an online game
// Operations: update_score(player_id, score), get_rank(player_id), get_top_100()

// Naive approach: shard by player_id
hash(player_id) → partition
update_score: fast (single partition)
get_rank: IMPOSSIBLE without scatter-gather. Need ALL partitions to count
how many players have a higher score.
get_top_100: scatter-gather all partitions, merge top-100 from each. Slow.

// Better: sorted set (Redis ZSET) approach
If data fits in one machine: single Redis ZSET. O(log N) updates and rank lookups.
10M players * 100 bytes each = 1 GB. Fits in one machine easily.

// If too large for one machine: partition by SCORE RANGE
P0: scores 0-999, P1: scores 1000-1999, P2: scores 2000+
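get_rank(player): find the player's partition; rank = (entries in all higher-score partitions) + (entries above the player within its own partition) + 1.
Requires querying only higher-score partitions plus the player's own (not all!). Beware skew: most players may cluster in the low-score ranges.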

// Hybrid approach:
Keep the full sorted leaderboard in Redis (fits in RAM).
Shard the player details (profile, history) by player_id in PostgreSQL.
Score updates: write to Redis ZSET + update PostgreSQL asynchronously.
// Lesson: not everything needs the same shard strategy.
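The score-range approach can be sketched as follows. `ScoreRangeLeaderboard` is hypothetical, and each partition is a sorted Python list standing in for something like one Redis sorted set per partition. The `scores` dict represents the per-player lookup that, in the hybrid design, would live in the player_id-sharded store. To compute a rank, the code reads only the sizes of higher-score partitions plus a binary search in the player's own partition.

```python
import bisect


class ScoreRangeLeaderboard:
    """Partitioned by SCORE range; each partition is a sorted list of scores."""

    def __init__(self, boundaries):
        self.boundaries = boundaries  # [1000, 2000] -> P0: <1000, P1: 1000-1999, P2: 2000+
        self.parts = [[] for _ in range(len(boundaries) + 1)]
        self.scores = {}  # player -> current score

    def _part(self, score):
        return bisect.bisect_right(self.boundaries, score)

    def update_score(self, player, score):
        if player in self.scores:  # remove old entry, possibly from another partition
            old = self.scores[player]
            part = self.parts[self._part(old)]
            part.pop(bisect.bisect_left(part, old))
        self.scores[player] = score
        bisect.insort(self.parts[self._part(score)], score)

    def get_rank(self, player):
        """1-based rank; tied players share the best rank."""
        score = self.scores[player]
        p = self._part(score)
        higher = sum(len(part) for part in self.parts[p + 1:])  # higher partitions only
        own = self.parts[p]
        higher += len(own) - bisect.bisect_right(own, score)    # higher in own partition
        return higher + 1


lb = ScoreRangeLeaderboard([1000, 2000])
for player, score in [("a", 500), ("b", 1500), ("c", 2500), ("d", 1800), ("e", 1500)]:
    lb.update_score(player, score)
print({p: lb.get_rank(p) for p in "abcde"})  # {'a': 5, 'b': 3, 'c': 1, 'd': 2, 'e': 3}
```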

Problem 6: Shard a Time-Series Database

// Scenario: IoT platform ingesting 1M events/sec from 100K devices
// Event: {device_id, timestamp, sensor_readings[50 floats]}
// Event size: ~400 bytes. Daily volume: 1M * 86400 * 400B = 34 TB/day

// Query patterns:
// 1. "All readings from device X in the last hour" (90% of queries)
// 2. "Average temperature across all devices at 3pm" (5%)
// 3. "Devices with anomalous readings in the last 5 min" (5%)

// Shard key: device_id (compound: hash(device_id) + sort by timestamp)
// This makes query 1 a single-partition range scan (fast!)
// Query 2 and 3 require scatter-gather (acceptable, they're rare)

// Partition count:
100K devices / 10 devices per partition = 10,000 partitions
(10 devices per partition ensures each partition gets enough data to be useful)
10,000 partitions / 50 nodes = 200 partitions per node

// TTL: auto-delete data older than 90 days
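// Without TTL: storage grows by 34 TB/day forever (≈ 12.4 PB after one year). Infeasible.
// With a 90-day TTL: data drops off the old end as it arrives on the new end;
// steady state ≈ 34 TB/day × 90 days = 3.06 PB (before replication)
// Partition size stabilizes at ~306 GB each (3.06 PB / 10K partitions)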
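Here is an in-memory sketch of the compound key. `DeviceTimeSeries` is hypothetical, and 16 partitions replaces the 10,000 above to keep the demo small. `hash(device_id)` selects the partition, and inside a partition rows are kept sorted by `(device_id, timestamp)`. Query 1 therefore becomes one partition plus one contiguous slice, even when several devices share a partition.

```python
import bisect
import hashlib


class DeviceTimeSeries:
    """Compound key: hash(device_id) picks the partition; inside it,
    rows are kept sorted by (device_id, timestamp)."""

    def __init__(self, num_partitions=16):  # illustrative; the text uses 10,000
        self.parts = [([], []) for _ in range(num_partitions)]  # (sorted keys, values)

    def _part(self, device_id):
        digest = hashlib.md5(device_id.encode()).digest()
        return self.parts[int.from_bytes(digest[:4], "big") % len(self.parts)]

    def write(self, device_id, ts, readings):
        keys, values = self._part(device_id)
        i = bisect.bisect_right(keys, (device_id, ts))
        keys.insert(i, (device_id, ts))
        values.insert(i, readings)

    def scan(self, device_id, start_ts, end_ts):
        """Query 1: one device, start_ts <= ts < end_ts. One partition, one range."""
        keys, values = self._part(device_id)
        lo = bisect.bisect_left(keys, (device_id, start_ts))
        hi = bisect.bisect_left(keys, (device_id, end_ts))
        return [(ts, v) for (_, ts), v in zip(keys[lo:hi], values[lo:hi])]


store = DeviceTimeSeries()
for ts in (30, 10, 20, 40):  # out-of-order arrival is fine
    store.write("dev-1", ts, [float(ts)])
store.write("dev-2", 15, [1.5])
print(store.scan("dev-1", 10, 40))  # [(10, [10.0]), (20, [20.0]), (30, [30.0])]
```

Queries 2 and 3 have no device_id to route on, so they would still need to visit every partition.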

The Five Dimensions Applied to Sharding

Dimension | Example Question | What a Staff Answer Adds
Concept | "Explain consistent hashing" | Discusses virtual nodes, load variance formula, why Cassandra reduced vnodes from 256 to 16
Design | "Shard a user database for 100M users" | Calculates partition count from storage/throughput constraints, addresses resharding strategy, secondary index choice
Code | "Implement a consistent hash ring" | Uses binary search for O(log N) lookup, handles vnodes, discusses hash function choice
Debug | "One shard has 10x the data of others" | Systematic diagnosis: check key cardinality, check for NULL/default values, check growth pattern. Proposes migration plan.
Frontier | "How does Spanner do global sharding?" | TrueTime, external consistency, Paxos per partition group, zone-aware placement

Debug Scenarios

Symptom | Diagnosis | Fix
One shard 10x larger than others | Shard key has low cardinality or skewed distribution | Re-shard with higher-cardinality key or hash
Range queries are slow | Using hash partitioning (scatter-gather) | Switch to range partitioning or Cassandra compound key
Writes spike one partition | Sequential/timestamp key | Prefix with random or high-cardinality field
Cross-shard queries dominate | Wrong shard key for query pattern | Re-shard by dominant query dimension or denormalize
Rebalancing takes hours | Too few partitions (each is huge) | Increase partition count (may require re-partitioning)
Stale secondary index results | Using async global index | Accept eventual consistency or switch to local index

Interview Red Flags (What NOT to Say)

Red flag | Why it's wrong | What to say instead
"Just shard everything by user_id" | Not all queries are by user_id. Secondary access patterns matter. | "What are the primary query patterns? Let me choose a shard key that optimizes the dominant access path."
"Use consistent hashing" (with no further detail) | Consistent hashing is a mechanism, not a strategy. Doesn't address shard key choice, secondary indexes, or rebalancing. | "I'd use consistent hashing for the partition-to-node mapping, with hash(user_id) as the partition key, virtual nodes for even distribution, and a ZooKeeper-based routing tier."
"We'll handle hotspots when they happen" | Reactive. Shows no foresight. | "Given the expected access patterns, celebrity accounts could hotspot partition N. I'd proactively add a random suffix for accounts with >1M followers."
"Shard by date for time-series" | All current writes hit one shard. | "Compound key: hash(device_id) for partition, sort by timestamp within. This distributes writes while preserving per-device temporal ordering."
"Just add more shards when we need them" | Ignores the rebalancing cost. Adding shards means moving data. | "I'd start with 4x the expected partition count to handle growth without resharding. Dynamic splitting as a fallback."

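The time-series answer in the table above relies on a compound key. Below is a toy in-memory sketch of the idea, assuming 16 fixed partitions and integer timestamps. The class and method names are hypothetical, not Cassandra's API. Only `device_id` is hashed to pick a partition, and rows inside a partition stay sorted by `(device_id, timestamp)`. A per-device time range is therefore one contiguous slice.

```python
import bisect
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 16  # hypothetical fixed partition count


def partition_of(device_id: str) -> int:
    """Only the first key component (device_id) decides the partition."""
    digest = hashlib.md5(device_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


class CompoundKeyStore:
    """Toy store for (device_id, timestamp) keys: hashed by device, sorted by time."""

    def __init__(self) -> None:
        # partition -> rows kept sorted as (device_id, timestamp, value)
        self._partitions: dict[int, list[tuple[str, int, str]]] = defaultdict(list)

    def put(self, device_id: str, ts: int, value: str) -> None:
        # Different devices spread across partitions by hash, so "now" is not one hot shard.
        bisect.insort(self._partitions[partition_of(device_id)], (device_id, ts, value))

    def range_scan(self, device_id: str, start_ts: int, end_ts: int) -> list[tuple[int, str]]:
        """Rows for one device with start_ts <= ts < end_ts, oldest first."""
        rows = self._partitions.get(partition_of(device_id), [])
        # A 2-tuple sorts before any 3-tuple sharing its prefix, so these probes
        # bracket exactly this device's rows inside the requested time window.
        lo = bisect.bisect_left(rows, (device_id, start_ts))
        hi = bisect.bisect_left(rows, (device_id, end_ts))
        return [(ts, value) for _, ts, value in rows[lo:hi]]


if __name__ == "__main__":
    store = CompoundKeyStore()
    for ts in (300, 100, 200):
        store.put("sensor-1", ts, f"reading@{ts}")
    store.put("sensor-2", 150, "other device")
    print(store.range_scan("sensor-1", 100, 300))  # [(100, 'reading@100'), (200, 'reading@200')]
```

The trade-off is visible in the code: a scan across *all* devices for a time window would have to visit every partition, because time ordering only holds within one device's rows.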
Whiteboard Template for System Design Interviews

When an interviewer says "this system needs to handle 10TB of data and 100K QPS," here is the framework to follow:

1. Identify the entity and access patterns
"What are we storing? How do users query it? What's the read/write ratio?"
2. Choose a shard key
"The dominant query is by X, so shard by X. Secondary patterns: Y (local index, scatter-gather acceptable for 5% of queries)."
3. Calculate partition count and node count
"10TB / 1GB target = 10K partitions. 100K QPS / 10K per node = 10 nodes. With replication factor 3: 30 nodes."
4. Choose partitioning strategy
"Hash for point lookups, range for scans, compound for both. Consistent hashing for the node mapping."
5. Address edge cases
"Hotspots: suffix hot keys. Rebalancing: fixed partitions with semi-automatic migration. Routing: client-side for low latency."
6. Growth plan
"At 10x scale: add nodes, partitions redistribute. At 100x: consider dynamic partitioning or resharding."
You're designing a chat application. Messages are stored with key (chat_room_id, message_id). Users typically view messages in one chat room at a time (range scan within a room). Which partitioning strategy is best?

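Step 3 of the whiteboard template is plain arithmetic, sketched here as a small Python helper. It reproduces the template's numbers and adds one assumption the template leaves implicit: a per-node storage check, using the 2 TB server from Chapter 0 as an illustrative value. The node count is whichever constraint (throughput or storage) is tighter, multiplied by the replication factor.

```python
import math
from dataclasses import dataclass

TB = 10**12  # decimal units, matching the template's round numbers
GB = 10**9


@dataclass
class ClusterPlan:
    partitions: int
    nodes_for_throughput: int
    nodes_for_storage: int
    total_nodes: int
    partition_replicas_per_node: float


def plan_cluster(data_bytes: int, target_partition_bytes: int, peak_qps: int,
                 qps_per_node: int, storage_per_node_bytes: int,
                 replication_factor: int) -> ClusterPlan:
    partitions = math.ceil(data_bytes / target_partition_bytes)
    nodes_for_throughput = math.ceil(peak_qps / qps_per_node)
    # Assumed extra check: one full copy of the data must fit across the primary nodes.
    nodes_for_storage = math.ceil(data_bytes / storage_per_node_bytes)
    primary_nodes = max(nodes_for_throughput, nodes_for_storage)
    total_nodes = primary_nodes * replication_factor
    return ClusterPlan(
        partitions=partitions,
        nodes_for_throughput=nodes_for_throughput,
        nodes_for_storage=nodes_for_storage,
        total_nodes=total_nodes,
        partition_replicas_per_node=partitions * replication_factor / total_nodes,
    )


if __name__ == "__main__":
    # 10 TB, 1 GB partitions, 100K QPS, 10K QPS/node, 2 TB/node (Chapter 0), RF 3
    print(plan_cluster(10 * TB, 1 * GB, 100_000, 10_000, 2 * TB, 3))
```

With these inputs throughput is the binding constraint: 10 nodes for throughput versus 5 for storage. That gives 30 nodes in total and about 1,000 partition replicas per node, which leaves plenty of partitions to move around when nodes are added.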
Chapter 10: Connections

Sharding does not exist in isolation. It connects to every other concept in distributed systems.

Where Sharding Fits

Topic | Connection to sharding
Replication (Ch 6) | Each partition is replicated for fault tolerance. Replication happens within partitions: leader-follower per partition, not per database.
Transactions (Ch 8) | Cross-partition transactions require two-phase commit or similar protocols. Single-partition transactions are fast and simple. Shard key choice determines how many transactions cross partitions.
Storage & Retrieval (Ch 4) | Each partition uses the same storage engine (LSM-tree or B-tree). Understanding storage engines helps you predict partition performance.
Consensus (Ch 9) | ZooKeeper (used for partition routing) is a consensus system. Partition leader election uses consensus algorithms like Raft or Paxos.

The Evolution of Partitioning

Era | Approach | Example
2000s | Application-level sharding (manual) | MySQL with app routing logic
2006-2012 | Range partitioning with auto-split | Bigtable, HBase
2007 | Dynamo paper: consistent hashing | Riak, Cassandra (Amazon DynamoDB later reused the name and some ideas)
2012+ | NewSQL: automatic sharding + transactions | Spanner, CockroachDB, TiDB
2020s | Serverless auto-partitioning | DynamoDB on-demand, Neon, PlanetScale

What We Did Not Cover

This lesson focused on single-datacenter partitioning. In practice, partitions are also distributed across datacenters for geographic locality and disaster recovery. Cross-datacenter partitioning introduces additional latency and consistency challenges. The Spanner paper (Google, 2012) addresses this with TrueTime, a clock API backed by GPS receivers and atomic clocks that reports the current time with an explicit uncertainty bound. Spanner waits out that uncertainty before committing, which makes transactions externally consistent even across partitions on different continents.

We also did not cover resharding: the painful process of changing your shard key after the system is in production. This typically requires creating a parallel cluster with the new scheme, double-writing new changes to both clusters while backfilling the existing data, verifying that the two agree, then cutting over. It can take weeks for large datasets and is one of the strongest arguments for getting the shard key right the first time.

We did not cover geo-partitioning: placing partitions in specific geographic regions to comply with data residency rules (for example, GDPR restricts transferring EU residents' personal data outside the EU/EEA unless adequate safeguards are in place). CockroachDB's zone configs and Spanner's placement policies support this. Nor did we cover partition-level backups: taking consistent snapshots of individual partitions without stopping the entire cluster.

The Partitioning Landscape in 2026

The trend is toward automatic, transparent sharding. Newer databases like PlanetScale (MySQL-based), Neon (Postgres-based), and TiDB handle partitioning internally — the application sees a single logical database. Serverless databases like DynamoDB on-demand mode partition automatically based on load. The engineer's job shifts from "implement sharding" to "choose the right shard key and understand the trade-offs."

But understanding the mechanics still matters. When your DynamoDB table has a throttled partition, you need to know why. When your Elasticsearch cluster has one hot shard, you need to know how to rebalance. When an interviewer asks "design a distributed key-value store," you need to derive consistent hashing from first principles. The abstractions get better, but the fundamentals don't change.

Further Reading

Resource | Why it matters
Amazon Dynamo paper (2007) | The foundational production use of consistent hashing with virtual nodes, plus quorum reads/writes and gossip-based membership. Cassandra and Riak descend from it.
Google Spanner paper (2012) | How to do globally distributed range partitioning with strong consistency using TrueTime (GPS + atomic clocks). The "holy grail" of distributed databases.
Kafka: a Distributed Messaging System for Log Processing (2011) | Clean example of hash partitioning for message ordering. The partition count is set at topic creation (it can be increased later, but that changes which partition a key maps to), and consumer groups divide partitions among their consumers.
Cassandra Architecture docs | Best practical description of consistent hashing with vnodes, gossip protocol, and compound primary keys.
Vitess Architecture docs | How MySQL sharding works at YouTube scale. Vindexes, VTGate routing, and resharding workflows.
CockroachDB blog: "How We Built a Range-Based Sharding System" | Modern take on range partitioning with automatic splitting, Raft consensus per range, and SQL compatibility.

The Decision Tree: When to Shard

Is your single-node database struggling?
If storage < 80% capacity, throughput < 70% capacity, and latency is acceptable: DO NOT SHARD. Sharding adds complexity. Only shard when you must.
↓ Yes, it's struggling
Is the problem read throughput only?
Add read replicas first. Much simpler than sharding. Can often 10x read throughput with 10 replicas.
↓ No, writes/storage are the bottleneck
Do you need range queries on the partition key?
If yes: key range partitioning (accept hotspot risk). If no: hash partitioning (simpler, even distribution).
↓ Chosen strategy
Is your workload dominated by one query pattern?
If yes: shard by that query's key. If multiple patterns are equally important: consider compound keys or denormalization.
↓ Implemented
Do you need cross-shard queries?
Minimize them. Co-locate related data. Use local indexes and accept scatter-gather. Or add a search index (Elasticsearch) as a sidecar for secondary access patterns.
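The last step's advice, local indexes plus scatter-gather, can be sketched as a toy model. This assumes a hypothetical car-listing table partitioned by a hash of the primary key, with a local secondary index on `color`. The classes are illustrative, not any database's API. A write touches exactly one partition. A lookup by color has to ask every partition and merge the answers.

```python
import hashlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class Partition:
    """One shard: its own rows plus a LOCAL secondary index over just those rows."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}
        self.by_color: dict[str, set[str]] = defaultdict(set)

    def put(self, pk: str, record: dict) -> None:
        old = self.rows.get(pk)
        if old is not None:  # keep the local index in sync when a row is updated
            self.by_color[old["color"]].discard(pk)
        self.rows[pk] = record
        self.by_color[record["color"]].add(pk)

    def find_by_color(self, color: str) -> list[tuple[str, dict]]:
        return [(pk, self.rows[pk]) for pk in self.by_color.get(color, set())]


class Cluster:
    """Routes writes by primary key; secondary lookups scatter to every partition."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions = [Partition() for _ in range(num_partitions)]

    def _route(self, pk: str) -> Partition:
        digest = hashlib.md5(pk.encode("utf-8")).digest()
        return self.partitions[int.from_bytes(digest[:8], "big") % len(self.partitions)]

    def put(self, pk: str, record: dict) -> None:
        # Fast write: the row and its index entry live on the same single partition.
        self._route(pk).put(pk, record)

    def get(self, pk: str) -> Optional[dict]:
        return self._route(pk).rows.get(pk)

    def find_by_color(self, color: str) -> list[dict]:
        # Scatter: any partition may hold matches, so query all of them in parallel.
        with ThreadPoolExecutor(max_workers=len(self.partitions)) as pool:
            per_partition = list(pool.map(lambda p: p.find_by_color(color), self.partitions))
        # Gather: merge the partial results, sorted by key for a deterministic order.
        merged = [hit for hits in per_partition for hit in hits]
        return [record for _, record in sorted(merged, key=lambda hit: hit[0])]
```

The cost model follows directly: primary-key reads and all writes hit one partition, while every secondary lookup costs one request per partition. The slowest partition sets the latency of the whole query.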

Key Takeaways for Interviews

# | Principle | One-liner
1 | Shard key = dominant query dimension | Shard by what you query by most
2 | Hash for point lookups, range for scans | Can't have both without a compound key
3 | Fixed partitions for predictable data | Choose the count upfront, never change it
4 | Dynamic partitions for unpredictable growth | Auto-split large partitions, merge small ones
5 | Consistent hashing minimizes rebalancing | Adding an (N+1)th node moves only about 1/(N+1) of keys
6 | Hot keys need application-level handling | Suffix, cache, or dedicated replicas
7 | Local indexes: fast writes, slow reads | Scatter-gather for secondary lookups
8 | Global indexes: fast reads, slow writes | Async updates mean stale results
9 | Partition + replicate for production | Split for capacity, copy for durability
10 | Never hash mod N | Going from N to N+1 nodes moves about N/(N+1) of keys (80% for 4 → 5)
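The "never hash mod N" rule can be checked empirically. This sketch, assuming MD5 as a stand-in hash and synthetic keys, measures how many keys change nodes under `hash mod N` when a cluster grows from N to N+1. It compares the result to the analytic value N/(N+1) and to the 1/(N+1) an ideal rebalance would move.

```python
import hashlib


def bucket(key: str, n: int) -> int:
    """Naive placement: stable hash of the key, modulo the number of nodes."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % n


def fraction_moved_mod_n(num_keys: int, n: int) -> float:
    """Share of keys whose node changes when the cluster grows from n to n + 1."""
    keys = [f"key-{i}" for i in range(num_keys)]
    moved = sum(1 for k in keys if bucket(k, n) != bucket(k, n + 1))
    return moved / num_keys


if __name__ == "__main__":
    for n in (4, 10, 100):
        measured = fraction_moved_mod_n(100_000, n)
        # Analytic: n/(n+1) of keys move under mod-N; an ideal rebalance moves only 1/(n+1).
        print(f"{n} -> {n + 1}: measured {measured:.3f}, "
              f"expected {n / (n + 1):.3f}, ideal {1 / (n + 1):.3f}")
```

For N = 4 the measured fraction should land close to 0.8, and it gets worse as the cluster grows: at 100 nodes almost every key moves. This is why the fixed-partition and consistent-hashing schemes above decouple keys from the raw node count.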

A Final Mental Model

Partitioning is fundamentally about divide and conquer. A problem too large for one machine gets divided into sub-problems, each assigned to a different machine. The challenges are all at the boundaries: routing (which machine has my sub-problem?), rebalancing (how to redivide when machines change?), and cross-partition operations (what if my query spans multiple sub-problems?).

The single most important decision you make when designing a partitioned system is the partition key. Everything else — routing, rebalancing, indexing — follows from this one choice. A good partition key co-locates data that is frequently accessed together. A bad partition key forces cross-partition operations for common queries. Spend more time choosing the key than on any other design decision.

And remember: the best partition is the one you don't need. If your data fits on one machine, do not partition. Sharding adds complexity at every level — schema changes, transactions, debugging, monitoring. Only shard when the math tells you a single machine cannot handle the load, the storage, or the latency requirements. Then shard minimally, with the simplest strategy that meets your needs.

"The key to performance is elegance, not battalions of special cases." — Jon Bentley