Hash partitioning, range partitioning, rebalancing — splitting data across machines.
Your e-commerce database has grown to 10 TB. Your single PostgreSQL server has 2 TB of SSD storage and can handle 10,000 queries per second at peak. Today is Black Friday. Traffic spikes to 50,000 queries per second. The database falls over. Users see spinning wheels. Revenue flatlines.
You already know about replication from Chapter 6 — making copies of the same data on multiple machines. Replication helps with read throughput (spread reads across replicas) and fault tolerance (if one copy dies, others survive). But replication does not help with the storage problem. Each replica holds the full 10 TB. And replication does not help with write throughput — in a leader-based setup, all writes still go to one machine.
You need a fundamentally different strategy: instead of copying all the data everywhere, split the data so each machine holds only a fraction. Five machines, each holding 2 TB. Ten machines, each handling 5,000 queries per second. This is partitioning — also called sharding in MongoDB and MySQL, region in HBase, tablet in Bigtable, vBucket in Couchbase, and vnode in Cassandra. Different names, same idea: break a large dataset into smaller pieces and spread them across nodes.
The concept sounds simple. The devil is in the details. If you split badly, one machine gets all the traffic while the others idle. If you split by a key that changes over time, data migrates constantly. If a machine fails, you need to redistribute its data without taking the whole system offline. If a client wants to query across partition boundaries, you need to scatter the request to all machines and gather the results — slow and expensive.
Let's be precise about the bottlenecks a single machine hits:
| Bottleneck | Typical limit | What happens when exceeded |
|---|---|---|
| Storage capacity | 2-16 TB per machine | Disk full. Writes fail. No more data. |
| Write throughput | 5K-50K writes/sec (SSD) | Write latency climbs from ms to seconds. Timeouts. |
| Read throughput | 10K-100K reads/sec | CPU saturated. All queries slow down. |
| Memory (working set) | 64-512 GB RAM | Hot data spills to disk. Latency jumps 100x. |
Replication alone can solve read throughput (add more read replicas) but cannot solve storage capacity or write throughput. Partitioning solves all four: each partition is small enough to fit on one machine, and writes are distributed across partitions.
The ideal partition scheme distributes data and query load evenly across nodes. If you have 10 nodes and each handles 10% of the data and 10% of the queries, you have achieved fair partitioning. If one node handles 50% of the queries while the other nine handle about 5.6% each, that node is a hotspot. The system's throughput is limited by the hotspot: it saturates once total traffic reaches twice a single node's capacity, so scaling up from 1 to 10 machines bought you only a 2x improvement instead of 10x.
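The arithmetic behind that limit can be sketched in a few lines. This is a simplified model, assuming identical nodes and a fixed traffic split; the function name `effective_speedup` is illustrative, not from any library.

```python
def effective_speedup(load_shares):
    """Max cluster throughput relative to one node's capacity.

    Assumes identical nodes and a fixed traffic split. The busiest node
    saturates first, so total throughput is capped at capacity / max_share.
    """
    if abs(sum(load_shares) - 1.0) > 1e-9:
        raise ValueError("load shares must sum to 1")
    return 1.0 / max(load_shares)


even = [0.10] * 10                  # fair partitioning
skewed = [0.50] + [0.50 / 9] * 9    # one hotspot taking half the traffic

print(f"even:   {effective_speedup(even):.1f}x")
print(f"skewed: {effective_speedup(skewed):.1f}x")
```

Under this model, the only way to raise the cap is to shrink the largest share, which is why the choice of partition key matters more than the number of machines.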
Picture five servers processing incoming requests. With a good partition key, they distribute evenly and each handles about 20% of traffic. With a bad partition key, a hotspot forms: one server gets crushed while the others idle.
The overloaded server's latency climbs, its queue fills, and eventually it starts dropping requests. Meanwhile, the other four servers are barely working. This is the worst-case outcome of bad partitioning — you paid for 5 machines but got the performance of 1.5.
Let's make this tangible. You run an e-commerce platform. Here's your current situation:
The math is clear. Partitioning extends your runway from 10 weeks to 4 years, and when you eventually need more capacity, you add partitions rather than replacing the entire server.
Every database calls partitions something different. This is pure historical accident, but you will encounter all of these terms in interviews and documentation:
| Database | Name for "partition" | Name for "partition key" |
|---|---|---|
| MongoDB | Shard / Chunk | Shard key |
| Cassandra | Partition (virtual node) | Partition key |
| DynamoDB | Partition | Partition key (hash key) |
| HBase | Region | Row key |
| Bigtable | Tablet | Row key |
| Elasticsearch | Shard | Document _id (or routing field) |
| Kafka | Partition | Message key |
| CockroachDB | Range | Primary key prefix |
| MySQL (Vitess) | Shard | Vindex / sharding key |
Do not let the names confuse you. They all mean the same thing: a subset of the data that lives on one machine (or one group of replica machines).
The simplest partitioning scheme is the one you already know from encyclopedias. Volume A-D gets one shelf, E-K gets another, L-R another, S-Z another. Each partition owns a continuous range of keys, and the ranges are chosen so that data distributes roughly evenly.
This is key range partitioning. Assign each partition a contiguous range of the key space. Given a key, you determine its partition by binary-searching through the range boundaries. Keys within each partition are kept in sorted order, which makes range queries extremely efficient — if you want all records with keys between "2024-01-01" and "2024-01-31", you find the partition(s) covering that range and do a single sequential scan within each.
Say you have 1000 user IDs from 1 to 1000 and 4 partitions. The naive approach: partition 1 gets keys 1-250, partition 2 gets 251-500, partition 3 gets 501-750, partition 4 gets 751-1000. Boundaries are [250, 500, 750]. To find which partition owns key 637, binary search the boundaries: 637 > 500 and 637 ≤ 750, so it goes to partition 3.
The boundaries do not have to be evenly spaced. If keys 1-100 are accessed 10x more than keys 901-1000, you might set boundaries at [100, 300, 600] to balance load. Real systems like HBase and Bigtable adapt boundaries dynamically: when a partition grows too large, it splits.
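The boundary lookup described above maps directly onto Python's standard `bisect` module. The sketch below assumes each boundary is the largest key owned by its partition and that partitions are numbered from 1, as in the example; `find_partition` is an illustrative name.

```python
from bisect import bisect_left


def find_partition(boundaries, key):
    """Return the 1-based partition number that owns `key`.

    boundaries[i] is the largest key owned by partition i + 1; the last
    partition owns everything above the final boundary.
    """
    return bisect_left(boundaries, key) + 1


even_boundaries = [250, 500, 750]
print(find_partition(even_boundaries, 637))   # 3 (500 < 637 <= 750)
print(find_partition(even_boundaries, 250))   # 1 (boundary keys stay in the lower partition)

skewed_boundaries = [100, 300, 600]
print(find_partition(skewed_boundaries, 637))  # 4
```

Rebalancing load then means editing the boundary list, not the lookup logic.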
| System | Name for partition | Key range behavior |
|---|---|---|
| HBase | Region | Auto-splits regions when they exceed a configurable size (default 10 GB) |
| Bigtable | Tablet | Auto-splits tablets, merges small adjacent ones |
| MongoDB | Chunk | Range-based sharding when you choose a range shard key |
| CockroachDB | Range | 512 MiB default range size in current versions (64 MiB in older releases), automatic splitting and merging |
Key range partitioning has a dangerous failure mode. Suppose you are storing IoT sensor readings, and your key is a timestamp. All new data has a timestamp of "right now." All writes go to the partition holding the most recent time range. The other partitions — holding historical data — receive zero writes. You have a write hotspot on a single partition.
This is not hypothetical. It is the #1 mistake people make when sharding by timestamp in HBase, and it shows up in nearly every systems design interview where candidates propose timestamp-based keys.
The fix: prefix the timestamp with something that distributes writes. If you have 100 sensors, use `sensor_id + timestamp` as the key. Now writes spread across partitions because sensor IDs vary. The cost: to query all readings for a given time range across all sensors, you now need to query 100 separate ranges (one per sensor prefix) and merge the results. There is no free lunch.
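To make the read-side cost concrete, here is a minimal sketch of the compound key and the resulting query fan-out. The key format (zero-padded sensor ID, `#` separator) and the function names are assumptions for illustration only.

```python
def make_key(sensor_id: int, timestamp: str) -> str:
    # Zero-pad the sensor ID so lexicographic order matches numeric order.
    return f"{sensor_id:03d}#{timestamp}"


def ranges_for_time_window(num_sensors: int, start: str, end: str):
    """One (start_key, end_key) range scan per sensor prefix."""
    return [(make_key(s, start), make_key(s, end)) for s in range(num_sensors)]


ranges = ranges_for_time_window(100, "2024-01-01T00:00", "2024-01-31T23:59")
print(len(ranges))   # 100 separate range scans for one time window
print(ranges[7])     # ('007#2024-01-01T00:00', '007#2024-01-31T23:59')
```

Writes for different sensors now start with different prefixes and land in different ranges, while a query for a single sensor over a time window is still one contiguous scan.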
In production systems, partition boundaries are not static. They adapt to the data distribution. Here's how HBase does it:
The beauty of adaptive splitting: the system automatically creates more partitions in "hot" regions of the key space. If 90% of writes land in keys starting with "2024-", that range will be split into many small partitions, while the "2020-" range remains a single large but rarely-accessed partition.
The downside: with a brand new table, you start with ONE region on ONE server. All writes hit that single server until the first split. This is the pre-splitting problem, and HBase's solution is to let you specify initial split points when creating a table, for example `create 'events', 'cf', SPLITS => ['2024-04', '2024-07', '2024-10']` (the column family name `cf` is a placeholder; the HBase shell requires at least one column family).
You have 4 partitions and receive sensor readings keyed by timestamp:
MongoDB's documentation offers a useful framework for evaluating shard keys. A good shard key has three properties, abbreviated here as the "CAM" criteria:
| Property | Meaning | Bad example | Good example |
|---|---|---|---|
| Cardinality | Number of distinct values | continent (only 7 values = max 7 chunks) | user_id (millions of values) |
| Access frequency | Even distribution of read/write load | timestamp (all writes hit latest range) | hash(user_id) (writes spread evenly) |
| Monotonicity | Does the key always increase? | Auto-increment ID (always increasing = always last chunk) | UUID v4 (random = no monotonicity) |
The ideal shard key scores well on all three: high cardinality, even access frequency, and non-monotonic. In practice, you often sacrifice one. The most common sacrifice is access frequency — you accept that some keys are queried more than others and handle hotspots at the application layer.
MongoDB supports compound shard keys — using multiple fields together. This can resolve the monotonicity problem without losing query efficiency:
Key range partitioning's hotspot problem arises because keys with similar values cluster in the same partition. The fix is brutally simple: destroy the ordering. Run every key through a hash function that maps it to a seemingly random number, then partition by that hash value. Keys that were sequential ("user_001", "user_002", "user_003") now scatter across the hash space.
A good hash function for partitioning does not need to be cryptographically secure. It just needs to produce outputs that look uniformly distributed. MD5, MurmurHash, and xxHash are all common choices. The critical property: keys that are similar in value (like timestamps one second apart) must produce completely different hash values.
The trade-off is stark. Hash partitioning destroys key ordering. If you want "all users with IDs between 1000 and 2000", you cannot query a single partition — those IDs hash to scattered locations. You must send the query to all partitions and merge the results. This scatter-gather pattern is the price of even distribution.
Not all hash functions are suitable for partitioning. The key requirement is uniformity — the hash should distribute keys as evenly as possible across the output range. It does NOT need to be cryptographic (resistant to adversarial attacks). Common choices:
| Hash function | Speed | Output | Uniformity | Used by |
|---|---|---|---|---|
| MD5 | Slow (crypto-grade) | 128 bits | Excellent | Cassandra (Murmur3 default, MD5 legacy) |
| MurmurHash3 | Very fast | 32 or 128 bits | Excellent | Cassandra, Elasticsearch |
| xxHash | Fastest | 32 or 64 bits | Excellent | Some custom systems |
| Java hashCode() | Fast | 32 bits | Mediocre | DO NOT USE (not uniform) |
| CRC32 | Fast | 32 bits | Good for error detection, mediocre for distribution | Checksums (Kafka uses CRC for record integrity; its default partitioner hashes keys with murmur2) |
Java's `hashCode()` and Python's `hash()` are designed for hash tables, not for partitioning. They may not be consistent across processes (Python randomizes hash seeds by default since 3.3). Always use a deterministic hash function like MurmurHash3 or MD5 for partitioning. Every process must compute the same hash for the same key; otherwise keys land on different partitions depending on which process routes them.

The simplest hash partitioning is `partition = hash(key) mod N`. This works, but it has a fatal flaw: when N changes (you add or remove a node), almost every key changes partition. If you go from 4 to 5 nodes, roughly 80% of keys move. That is an enormous amount of data migration.
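The 80% figure is easy to check empirically. The sketch below uses MD5 from Python's standard `hashlib` purely as a deterministic stand-in (a production system would more likely use MurmurHash3 or xxHash); `stable_hash` and `fraction_moved` are illustrative names.

```python
import hashlib


def stable_hash(key: str) -> int:
    """Deterministic across processes, unlike Python's built-in hash()."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def fraction_moved(keys, old_n: int, new_n: int) -> float:
    """Share of keys whose partition changes when N goes from old_n to new_n."""
    moved = sum(
        1 for k in keys if stable_hash(k) % old_n != stable_hash(k) % new_n
    )
    return moved / len(keys)


keys = [f"user_{i}" for i in range(100_000)]
print(f"4 -> 5 nodes: {fraction_moved(keys, 4, 5):.1%} of keys move")
```

A key stays put only when its hash gives the same remainder for both moduli, which for 4 and 5 happens for about one key in five.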
Consistent hashing solves the rebalancing problem. The idea: arrange the hash space as a circle (a "ring") from 0 to 2^32 - 1. Place each node at a position on the ring (determined by hashing the node's name). To find which node owns a key, hash the key, then walk clockwise around the ring until you hit a node.
When you add a node, it takes over responsibility for the keys between itself and the next node counterclockwise. Only those keys move. When you remove a node, its keys transfer to the next node clockwise. In both cases, only ~1/N of the keys move — much better than the ~(N-1)/N of hash mod N.
In practice, consistent hashing with just N points on the ring leads to uneven distribution: nodes can end up with very different ranges. The solution is virtual nodes (vnodes): each physical node gets many positions on the ring (e.g., 256 vnodes per node). This smooths out the distribution. Cassandra and Riak use virtual nodes, as did the original Dynamo design.
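A ring with virtual nodes fits in a short class. The following is a minimal sketch rather than any particular database's implementation: it assumes MD5 truncated to 32 bits as the ring hash, and the class name `HashRing` and the `node#vnodeN` naming scheme are invented for illustration.

```python
import bisect
import hashlib


def ring_hash(value: str) -> int:
    """Map a string to a position on a ring of size 2**32."""
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


class HashRing:
    def __init__(self, nodes=(), vnodes=256):
        self.vnodes = vnodes
        self._positions = []   # sorted ring positions
        self._owner = {}       # ring position -> physical node
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        for i in range(self.vnodes):
            pos = ring_hash(f"{node}#vnode{i}")
            if pos in self._owner:
                continue  # rare position collision: keep the existing owner
            bisect.insort(self._positions, pos)
            self._owner[pos] = node

    def remove_node(self, node):
        self._positions = [p for p in self._positions if self._owner[p] != node]
        self._owner = {p: n for p, n in self._owner.items() if n != node}

    def get_node(self, key):
        # Walk clockwise: first ring position at or after the key's hash.
        idx = bisect.bisect_left(self._positions, ring_hash(key))
        if idx == len(self._positions):
            idx = 0  # wrap around the ring
        return self._owner[self._positions[idx]]


keys = [f"user_{i}" for i in range(20_000)]
ring = HashRing(["node-a", "node-b", "node-c", "node-d"])
before = {k: ring.get_node(k) for k in keys}
ring.add_node("node-e")
after = {k: ring.get_node(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(f"moved: {len(moved) / len(keys):.1%}")
```

Every key that moves ends up on the new node; no key is shuffled between the existing nodes. With 5 nodes the moved share should come out near one fifth, compared with roughly 80% for `hash mod N`.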
Cassandra offers an elegant compromise between hash and range partitioning. In Cassandra, a primary key has two parts: the partition key (hashed to determine the partition) and the clustering columns (used to sort data within the partition).
This gives you the best of both worlds for certain query patterns: hash distribution across partitions (no hotspots) plus sorted ordering within each partition (efficient range queries on the clustering column).
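The two-part key can be modelled in memory to show why range queries stay cheap. This is a toy sketch, not Cassandra's storage engine: the partition count, the MD5-based placement, and the function names are all assumptions made for the example.

```python
import bisect
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 8  # hypothetical cluster size


def partition_for(partition_key: str) -> int:
    """Hash the partition key to pick a partition (ordering is destroyed here)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


# partitions[p][partition_key] -> rows sorted by clustering value
partitions = [defaultdict(list) for _ in range(NUM_PARTITIONS)]


def insert(partition_key, clustering_value, row):
    rows = partitions[partition_for(partition_key)][partition_key]
    bisect.insort(rows, (clustering_value, row))  # keep sorted within the partition


def range_query(partition_key, low, high):
    """Rows with low <= clustering value <= high, read from ONE partition."""
    rows = partitions[partition_for(partition_key)].get(partition_key, [])
    start = bisect.bisect_left(rows, (low,))
    result = []
    for clustering_value, row in rows[start:]:   # sequential scan from `start`
        if clustering_value > high:
            break
        result.append((clustering_value, row))
    return result


insert("sensor-42", "2024-01-03", "21.5")
insert("sensor-42", "2024-01-01", "19.0")
insert("sensor-42", "2024-01-02", "20.1")
insert("sensor-42", "2024-02-01", "18.4")
print(range_query("sensor-42", "2024-01-02", "2024-01-31"))
```

The range query only works when the partition key is fully specified. A range over the partition key itself would still require contacting every partition.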
Let's calculate exactly how consistent hashing compares to hash-mod-N when adding a node.
With just N physical nodes on the ring, the arcs between them can be very uneven. One node might "own" 40% of the ring while another owns 10%. Virtual nodes fix this by placing each physical node at many positions on the ring.
The trade-off: more vnodes means a larger ring metadata structure and more data movement when a node joins or leaves (because it takes over many small arcs from many nodes). Cassandra reduced its default from 256 to 16 vnodes in version 4.0, finding that 16 gives good enough balance with much less rebalancing overhead.
Hash partitioning distributes keys evenly. But it distributes load evenly only if all keys are accessed at similar rates. When a single key becomes extraordinarily popular — a celebrity's tweet goes viral, a product goes on sale, a breaking news article — the partition holding that key becomes a hotspot. And no amount of hashing can fix it, because all requests for the same key hash to the same partition.
This is fundamentally different from the key-range hotspot we saw in Chapter 1. That was a structural problem: sequential keys landing on the same partition because of their ordering. This is a workload problem: one key being so popular that its partition cannot handle the traffic, regardless of how cleverly you distributed keys.
Think of it this way: key-range hotspots are like a traffic jam caused by bad road design (all roads lead to one intersection). Hash-workload hotspots are like a traffic jam caused by a stadium concert — the roads are perfectly designed, but 50,000 people all want to go to the same place at the same time. No road redesign can fix that. You need parking lots (caches), shuttle buses (read replicas), or multiple entrances (key suffixing).
Let's calculate exactly how much throughput you lose to a hotspot.
Imagine a social media platform. User @taylor has 200 million followers. She posts a photo. Within seconds, 2 million users try to load the post. The post's key is post_id = 9847283. It hashes to partition 7. Partition 7 now receives 2 million reads per second while partitions 1-6 and 8-10 are nearly idle. Partition 7's server crashes under load. The post becomes unavailable — and it's the one post that millions of people are trying to see.
Solution 1: Random suffix. Append a random number (say 0-99) to the hot key before hashing. Instead of one key post_9847283, you now have 100 keys: post_9847283_00 through post_9847283_99. These hash to different partitions, spreading the load across up to 100 partitions.
The cost is real. Writes (such as likes) pick a random suffix and touch one partition, and a read of the post itself can pick any one suffix, provided the post was copied to every suffixed key. But any aggregate read becomes a scatter-gather: to count total likes, you query all 100 suffixed records and sum them. And you need application-level logic to decide which keys get suffixed, because the database cannot know which keys are hot.
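The suffixing scheme and its read-side cost can be sketched with an in-memory stand-in for the partitions. Everything here is illustrative: the `HOT_KEYS` set represents whatever mechanism the application uses to flag hot keys, and MD5 is only a convenient deterministic hash.

```python
import hashlib
import random

NUM_PARTITIONS = 10
NUM_SUFFIXES = 100
HOT_KEYS = {"post_9847283"}  # hypothetical: maintained by the application

# In-memory stand-in for 10 partitions, each holding key -> like count.
counters = [dict() for _ in range(NUM_PARTITIONS)]


def partition_for(key: str) -> int:
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


def record_like(post_key: str) -> None:
    key = post_key
    if post_key in HOT_KEYS:
        # Spread writes for a hot key across up to NUM_SUFFIXES sub-keys.
        key = f"{post_key}_{random.randrange(NUM_SUFFIXES):02d}"
    store = counters[partition_for(key)]
    store[key] = store.get(key, 0) + 1


def total_likes(post_key: str) -> int:
    if post_key not in HOT_KEYS:
        return counters[partition_for(post_key)].get(post_key, 0)
    total = 0
    for i in range(NUM_SUFFIXES):  # scatter-gather over every suffixed key
        key = f"{post_key}_{i:02d}"
        total += counters[partition_for(key)].get(key, 0)
    return total


for _ in range(10_000):
    record_like("post_9847283")

print(total_likes("post_9847283"))                       # 10000
print(sum(1 for c in counters if c), "partitions touched")
```

Each write touches one partition, while the aggregate read performs 100 lookups. That asymmetry is the reason to suffix only the keys that are known to be hot.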
Solution 2: Read replicas for hot partitions. Instead of splitting the key, replicate the partition. The partition holding the viral post gets additional read replicas. Writes still go to the leader, but reads can be served from any replica. This is a replication solution, not a partitioning solution — it uses the tools from Chapter 6.
Solution 3: Caching layer. Put a cache (Redis, Memcached) in front of the database. The first request for the viral post hits the database; subsequent requests hit the cache. The database partition never sees the spike. Most production systems use this approach — it is simple and effective.
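The effect of a cache on a hot key can be shown with a small in-process sketch. It stands in for Redis or Memcached and is not their API; the class name, the TTL value, and the `load_post` function are assumptions for the example.

```python
import time


class ReadThroughCache:
    """Tiny in-process cache with a time-to-live per entry."""

    def __init__(self, load_from_db, ttl_seconds=5.0):
        self._load = load_from_db
        self._ttl = ttl_seconds
        self._entries = {}  # key -> (expires_at, value)

    def get(self, key):
        now = time.monotonic()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]               # cache hit: database not touched
        value = self._load(key)           # cache miss: one database read
        self._entries[key] = (now + self._ttl, value)
        return value


db_reads = {"count": 0}


def load_post(post_id):
    db_reads["count"] += 1                # stand-in for a query to the partition
    return {"post_id": post_id, "body": "photo"}


cache = ReadThroughCache(load_post)
for _ in range(1000):
    cache.get(9847283)

print(db_reads["count"])  # 1
```

The TTL bounds how stale a cached post can be. A real deployment also has to deal with many concurrent misses for the same key arriving at once, which this single-threaded sketch ignores.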
Solution 4: DynamoDB Adaptive Capacity. DynamoDB's approach is interesting. Each partition has a throughput limit (initially evenly divided). If one partition gets hot, DynamoDB automatically borrows unused capacity from other partitions to give the hot one more throughput. If the entire table's capacity is exhausted, it splits the hot partition. But this only helps with partition-level hotspots, not single-key hotspots. If one key gets 90% of reads, DynamoDB cannot help — you need application-level splitting.
Random suffixes are not free. Let's quantify the cost for reads and writes:
In practice, most systems combine suffixing with a counter cache. The like count is maintained as a separate, periodically-aggregated counter rather than computed from scratch on every read. Twitter, for example, maintains per-tweet metrics in a separate system (a pre-computed count) rather than querying all suffix partitions.
Instagram's Hot Partition. When Instagram launched, they sharded their PostgreSQL database by user_id. This worked well until Justin Bieber posted a photo. The partition containing Bieber's user_id received millions of like-writes per second. The solution: they moved likes to a separate service with its own sharding (by post_id, not user_id), and added a write-back cache that batched like-count updates.
Twitter's Retweet Storm. A viral tweet getting retweeted creates a hotspot on the tweet's partition. Twitter's solution: a separate "timeline service" that pre-computes feeds. Retweet counts are maintained in a distributed counter system (not the main database), with eventual consistency — the count you see might be a few seconds behind reality.
DynamoDB Throttling. DynamoDB automatically partitions tables and allocates throughput (Read Capacity Units, Write Capacity Units) evenly across partitions. If your table has 1000 RCU and 4 partitions, each gets 250 RCU. If one partition receives 400 RCU of traffic, it gets throttled — even though the table-level limit (1000) is not reached. This is the "hot partition" problem, and it caught many early DynamoDB users off guard. DynamoDB's adaptive capacity (launched 2017) partially addresses this by borrowing unused capacity from cold partitions.
Everything so far assumes you are looking up records by their primary key. Partition the primary key, hash or range, and you know exactly which node to ask. One hop. Fast. But real applications rarely query by primary key alone. "Show me all cars listed for sale in San Francisco that are red." The primary key is car_id. You are querying by city and color. These are secondary indexes — and they are the hardest part of partitioning.
A secondary index is a data structure that maps a non-primary-key field to the set of records that have a given value for that field. Index on color=red returns [car_17, car_42, car_93, ...]. Without an index, you'd have to scan every record on every partition. With an index, you can jump directly to the matching records. But where does the index live?
There are exactly two strategies, and they have opposite trade-offs.
Each partition maintains its own secondary index covering only the documents stored on that partition. If partition 1 holds car_17 and car_93 (both red), partition 1's local index for color=red points to those two. If partition 3 holds car_42 (also red), partition 3's local index has that entry.
Writes are fast — you only update the local index on the partition that holds the document. Reads with secondary indexes are slow — you must query all partitions (scatter-gather), merge results, and handle failures from any partition. This is called a scatter/gather query, and it is the primary drawback of local indexes.
MongoDB, Riak, Cassandra, Elasticsearch, SolrCloud, and VoltDB all use document-partitioned (local) indexes.
Instead of each partition indexing its own documents, you build a global secondary index that covers all documents across all partitions — and then you partition the index itself.
Reads are fast — one lookup in the global index finds all matching documents. Writes are slow — adding or modifying a document may require updating index entries on different partitions, which means cross-partition writes (possibly requiring distributed transactions or at minimum network hops).
In practice, global indexes are often updated asynchronously. The write returns immediately, and the index is updated later. This means the index may be stale — a newly written document might not appear in index results for a short time. DynamoDB's global secondary indexes work this way.
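The two strategies can be placed side by side in a toy model. The placement of cars on partitions follows the example above (car_17 and car_93 on partition 1, car_42 on partition 3); the remaining cars, the `term_partition` function, and the synchronous global-index update are assumptions made for illustration.

```python
from collections import defaultdict

NUM_PARTITIONS = 4
PLACEMENT = {17: 1, 93: 1, 42: 3, 8: 0, 55: 2}   # car_id -> data partition
COLORS = {17: "red", 93: "red", 42: "red", 8: "blue", 55: "silver"}


def term_partition(term: str) -> int:
    """Deterministic stand-in for partitioning the global index by term."""
    return sum(term.encode("utf-8")) % NUM_PARTITIONS


local_index = [defaultdict(set) for _ in range(NUM_PARTITIONS)]
global_index = [defaultdict(set) for _ in range(NUM_PARTITIONS)]


def write(car_id, color):
    """Index one car; return partitions touched under (local, global) indexing."""
    data_p = PLACEMENT[car_id]
    local_index[data_p][color].add(car_id)        # same partition as the data
    index_p = term_partition(color)
    global_index[index_p][color].add(car_id)      # possibly a remote partition
    return {data_p}, {data_p, index_p}


def query_local(color):
    hits = set()
    for index in local_index:                     # scatter to every partition
        hits |= index.get(color, set())
    return hits, NUM_PARTITIONS                   # (results, partitions contacted)


def query_global(color):
    index_p = term_partition(color)               # exactly one index partition
    return set(global_index[index_p].get(color, set())), 1


for car_id, color in COLORS.items():
    write(car_id, color)

print(query_local("red"))
print(query_global("red"))
```

Both queries return the same cars; only the number of partitions contacted differs. In this sketch the global index is updated inside `write`, whereas an asynchronous design would queue that update and accept briefly stale results.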
Let's calculate how much scatter-gather actually costs. This matters because "query all partitions" sounds expensive, but the actual cost depends on your setup.
This is why Elasticsearch (which uses scatter-gather for every search query) carefully optimizes its p99 latency. Every additional shard you add to an index increases the probability of one shard being slow. Elasticsearch's recommendation: keep shard count low (one shard per 10-50 GB of data), and do not over-shard.
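The tail-latency effect follows from basic probability. As a rough model, assume each shard independently has probability p of responding slowly on a given request (independence is an assumption; real shards often slow down together). A query that waits for all N shards is slow with probability 1 - (1 - p)^N.

```python
def p_any_slow(num_shards: int, p_slow: float = 0.01) -> float:
    """Probability that at least one of `num_shards` responses is slow.

    Assumes each shard is slow independently with probability `p_slow`
    (0.01 corresponds to each shard exceeding its own p99 latency).
    """
    return 1 - (1 - p_slow) ** num_shards


for n in (1, 5, 20, 100):
    print(f"{n:>3} shards: {p_any_slow(n):.1%} of queries hit a slow shard")
```

With 100 shards, well over half of all queries wait on at least one shard that is having a p99-slow moment, so the p99 of a single shard becomes close to the typical latency of the whole query.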
Elasticsearch is a particularly instructive case because it uses local indexes exclusively. Every search query is a scatter-gather across all shards of an index. Here's how production systems manage this:
| Property | Local (Document-Partitioned) | Global (Term-Partitioned) |
|---|---|---|
| Write speed | Fast (update one partition) | Slow (may update remote index partition) |
| Read speed | Slow (scatter-gather all partitions) | Fast (single index lookup) |
| Consistency | Immediate (local update) | Often async (stale reads possible) |
| Used by | MongoDB, Cassandra, Elasticsearch | DynamoDB GSI, Riak search |
Many production systems avoid the local-vs-global dilemma entirely by using a sidecar search index. The primary database handles writes and primary key lookups (sharded by the main key). A separate search system (Elasticsearch, Solr) handles secondary index queries.
This pattern is so common that it has a name: CQRS (Command Query Responsibility Segregation). The "command" side (writes) is optimized for writes. The "query" side (reads) is optimized for reads. They use different data structures, different indexing strategies, and often different databases entirely.
When a partition moves from node A to node B, what happens to its local index? The index must move with the partition. This means the rebalancing process must transfer not just the data, but also all local indexes. For a partition with 1 GB of data and 500 MB of indexes, you're actually transferring 1.5 GB.
With global indexes, the situation is worse. If partitions move, the global index entries that point to those partitions must be updated. Every moved record triggers an index update. This is why global indexes are typically updated asynchronously — doing it synchronously during rebalancing would make rebalancing unacceptably slow.
Nothing stays the same. You add machines to handle growth. A machine fails and must be replaced. The dataset grows, and partitions that were once balanced become lopsided. Data must move from one node to another. This is rebalancing — the process of reassigning partitions to nodes so that load and storage are distributed fairly.
Rebalancing must satisfy three requirements:
| Requirement | Why |
|---|---|
| Fair distribution after | Each node should handle roughly equal load/storage |
| Availability during | The system must continue serving reads and writes while rebalancing |
| Minimal movement | Move only the data that needs to move — not everything |
Create a large number of partitions upfront — say 1000 partitions for 10 nodes (100 per node). Each partition is small. When you add an 11th node, it "steals" partitions from existing nodes. The partition count never changes; only the assignment of partitions to nodes changes.
The key design decision: how many partitions to create initially. Too few, and partitions become too large (slow rebalancing, too much data per partition). Too many, and the overhead of tracking partition metadata becomes significant. Riak, Elasticsearch, Couchbase, and Voldemort all use this strategy. The partition count is typically set at cluster creation time and never changed.
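The "stealing" step can be sketched as a small function over a partition-to-node map. This is one simple greedy policy, assumed for illustration (take one partition at a time from whichever node currently holds the most); real systems also weigh partition size and load.

```python
from collections import Counter


def add_node(assignment, new_node):
    """Reassign partitions to `new_node` until it holds its fair share.

    `assignment` maps partition id -> node and is modified in place.
    Returns the list of partition ids that moved.
    """
    nodes = set(assignment.values()) | {new_node}
    target = len(assignment) // len(nodes)
    moved = []
    while True:
        counts = Counter(assignment.values())
        if counts[new_node] >= target:
            break
        donor = max(counts, key=counts.get)       # node holding the most partitions
        pid = next(p for p, n in assignment.items() if n == donor)
        assignment[pid] = new_node                # only this partition's data moves
        moved.append(pid)
    return moved


# 12 partitions spread over 3 nodes, then a 4th node joins.
assignment = {p: f"node-{p % 3}" for p in range(12)}
moved = add_node(assignment, "node-3")

print("moved partitions:", moved)
print("partitions per node:", dict(Counter(assignment.values())))
```

Three of the twelve partitions move, which is exactly the new node's fair share, and the partition count itself never changes. The key-to-partition mapping is untouched, so no key has to be rehashed.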
With dynamic partitioning, partitions split and merge automatically based on size. When a partition exceeds a configured threshold (say 10 GB), it splits into two. When adjacent partitions shrink below a threshold, they merge. This is much like how B-tree pages split; HBase regions and Bigtable tablets work this way.
The advantage: partition count adapts to data volume. Start small, grow organically. The disadvantage: with a brand-new database, you start with one partition on one node. All writes go to that single node until it splits. HBase mitigates this with pre-splitting: you specify initial split points so the database starts with multiple partitions from day one.
Cassandra takes a third approach: fix the number of partitions per node (say 256). When you add a node, it creates 256 new partitions by splitting 256 randomly chosen existing partitions. The total partition count grows with the cluster.
This keeps partition size roughly constant as the cluster grows. More nodes = more partitions = smaller partitions. Each split moves half the data from an existing partition to the new node.
Let's trace the same scenario through all three strategies. Starting point: 3 nodes, 300 GB of data. Goal: add a 4th node.
Should rebalancing be fully automatic (database decides when and what to move) or manual (operator triggers rebalancing)?
Fully automatic sounds appealing but is dangerous. If a node is slow due to a transient issue (GC pause, network blip), an automatic rebalancer might conclude it's dead and start moving its data to other nodes. This creates a massive amount of network traffic, which makes the problem worse — other nodes slow down under the rebalancing load, which triggers more rebalancing. A cascading failure.
Most production systems use a semi-automatic approach: the database proposes a rebalancing plan, but a human (or an external orchestrator) must approve it. Couchbase, Riak, and Voldemort generate suggested partition assignments that an admin reviews before executing.
During rebalancing, the system moves data across the network. This consumes bandwidth that would otherwise serve user requests. Every production system has a throttle — a limit on how fast data can move during rebalancing.
It's important to distinguish between three different operations that all fall under "rebalancing":
| Operation | What happens | When | Data movement |
|---|---|---|---|
| Partition movement | Entire partition moves from node A to node B | Adding/removing nodes | Full partition data transferred |
| Partition splitting | One partition splits into two on different nodes | Partition exceeds size threshold | Half the data moves to new location |
| Partition merging | Two adjacent partitions combine into one | Partitions shrink below threshold | One partition's data moves to neighbor's node |
Movement happens during scale-out events. Splitting happens during data growth. Merging happens during data shrinkage or TTL expiration. All three must happen without downtime. All three must be throttled to avoid impacting serving.
The hardest part of rebalancing is not the algorithm — it's doing it without downtime. Data must move from one node to another while both nodes continue serving requests. Here's how it works in practice:
This is essentially the same technique as database replication — the target "catches up" like a new follower. The brief pause at cutover ensures no writes are lost. Elasticsearch, CockroachDB, and HBase all use variants of this approach.
The approach to avoid is `partition = hash(key) mod N`, recalculated whenever N changes. This moves roughly N/(N+1) of all keys when you go from N to N+1 nodes: adding one node to a 10-node cluster moves about 90% of the data. Use fixed partitions, dynamic partitioning, or consistent hashing instead.

| Strategy | Partition count | When to use | Used by |
|---|---|---|---|
| Fixed | Set at creation, never changes | Predictable dataset size | Riak, Elasticsearch, Couchbase |
| Dynamic | Grows/shrinks with data | Unpredictable growth | HBase, MongoDB, CockroachDB |
| Proportional | Fixed per node | Steady growth, many nodes | Cassandra |
What happens when rebalancing itself fails? This is the nightmare scenario that keeps SREs awake at night.
A client has a key. It needs to read or write the record for that key. But the data is spread across 10 nodes. Which node has the partition for this key? This is the request routing problem — also called service discovery for partitioned systems.
There are three fundamental approaches, and every distributed database uses one of them (or a hybrid).
Approach 1: Contact any node. The client sends the request to any node in the cluster. If that node owns the partition for the requested key, it handles the request directly. If not, it forwards the request to the correct node, receives the response, and relays it back to the client.
Every node must know the full partition-to-node mapping. They learn this through a gossip protocol: nodes periodically exchange state information with each other. When partitions move (rebalancing), nodes gossip the updated mapping until all nodes converge. Cassandra and Riak use this approach.
Approach 2: Routing tier. A separate coordination service maintains the authoritative partition-to-node mapping. The client first asks the routing tier "which node owns the partition for key X?", then contacts that node directly.
ZooKeeper is the most common coordination service for this. It stores the mapping and notifies all interested parties when it changes (via watches). When partitions rebalance, the database updates ZooKeeper, which pushes the change to the routing tier and any clients with active watches.
LinkedIn's Espresso, HBase, SolrCloud, and Kafka (in deployments that predate KRaft mode) all use ZooKeeper for partition discovery.
Approach 3: Partition-aware client. The client itself knows the partition-to-node mapping. It computes hash(key), determines the partition, looks up the node, and connects directly. No intermediary. The client library subscribes to ZooKeeper watches (or gossip) to keep its mapping up to date.
MongoDB uses a hybrid: a dedicated routing process called mongos (approach 2) that clients talk to, plus config servers that store the mapping (approach 2 again). Some MongoDB drivers can also cache the mapping for direct routing (approach 3).
Kafka is a fascinating case study because it uses partitioning for message ordering. Each Kafka topic is divided into partitions, and messages with the same key always go to the same partition (consistent assignment). This guarantees that messages for a given entity are processed in order.
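The ordering guarantee follows from the key-to-partition mapping being deterministic. The sketch below is a model of the idea, not Kafka's client code: it uses MD5 as the hash (Kafka's Java client uses murmur2 for keyed records), and the topic with 6 partitions and the order events are invented for the example.

```python
import hashlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Same key and same partition count always give the same partition."""
    h = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return h % num_partitions


NUM_PARTITIONS = 6  # hypothetical topic configuration

events = [
    (b"order-17", "created"),
    (b"order-42", "created"),
    (b"order-17", "paid"),
    (b"order-17", "shipped"),
]

# Each partition is an append-only list, so arrival order is preserved per partition.
log = {}
for key, event in events:
    log.setdefault(partition_for(key, NUM_PARTITIONS), []).append((key, event))

order_17_partition = log[partition_for(b"order-17", NUM_PARTITIONS)]
order_17 = [event for key, event in order_17_partition if key == b"order-17"]
print(order_17)  # ['created', 'paid', 'shipped']
```

Ordering holds only within a partition. Events for `order-17` and `order-42` have no guaranteed relative order unless they happen to share a partition.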
Kafka's partition count is set at topic creation time. It can be increased later but never decreased, and increasing it changes which partition a given key maps to, which breaks per-key ordering for existing keys. In practice the count is therefore treated as fixed (similar to the fixed partitioning strategy), which is why choosing the right partition count upfront is critical. Too few partitions limits consumer parallelism; too many increases coordination overhead and end-to-end latency.
What happens when the node that owns a partition is completely down?
| Approach | Latency | Complexity | Dependency | Used by |
|---|---|---|---|---|
| Gossip | 1-2 hops | Low (client is simple) | None (peer-to-peer) | Cassandra, Riak |
| Routing tier | 1 hop + lookup | Medium | ZooKeeper/etcd | HBase, Kafka, Espresso |
| Client-side | 0 extra hops | High (smart client) | ZooKeeper watches | MongoDB drivers |
All routing approaches face the same problem: between the time partitions rebalance and the time all clients/routers learn the new mapping, requests may go to the wrong node. How do systems handle this?
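One common pattern is for a node that no longer owns a partition to reject the request, after which the caller refreshes its mapping and retries (Redis Cluster's MOVED redirect is a well-known instance). The sketch below is a simplified model of that idea; `Node`, `get_with_retry`, and the `WRONG_NODE` status are hypothetical names, and the `authoritative` dict stands in for whatever coordination service holds the true mapping.

```python
class Node:
    def __init__(self, name, owned):
        self.name = name
        self.owned = set(owned)  # partitions this node currently owns
        self.data = {}

    def get(self, partition, key):
        if partition not in self.owned:
            return ("WRONG_NODE", None)  # caller's mapping is stale
        return ("OK", self.data.get(key))


def get_with_retry(key, partition, cache, nodes, authoritative, max_attempts=3):
    """Try the cached owner; on rejection, refresh the cache and retry."""
    for _ in range(max_attempts):
        node = nodes[cache[partition]]
        status, value = node.get(partition, key)
        if status == "OK":
            return value
        cache[partition] = authoritative[partition]  # refresh stale entry
    raise RuntimeError("partition mapping did not converge")


nodes = {"a": Node("a", owned={0}), "b": Node("b", owned={1})}
nodes["b"].data["k"] = "v"
authoritative = {0: "a", 1: "b"}  # partition 1 has moved from a to b
cache = {0: "a", 1: "a"}          # client still believes a owns it

value = get_with_retry("k", 1, cache, nodes, authoritative)
print(value, cache[1])  # v b
```

The retry cap matters: during a long rebalance the authoritative mapping may itself be changing, and an unbounded loop would hide that from the caller.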
ZooKeeper is used by HBase, Kafka, SolrCloud, and others as the "source of truth" for partition assignments. Here's the specific data it stores and how changes propagate:
Partitioning enables parallel query execution. A query that scans an entire table can be split: each partition processes its share in parallel, and results are merged. This is the essence of MapReduce and modern distributed SQL engines.
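As a small illustration of the split-then-merge pattern, the sketch below computes an average over three in-memory "partitions". The data and the `scan_partition` helper are invented for the example, and a thread pool stands in for what would be separate machines in a real system.

```python
from concurrent.futures import ThreadPoolExecutor

# Each inner list plays the role of one partition's rows
partitions = [
    [{"user": "a", "amount": 30}, {"user": "b", "amount": 70}],
    [{"user": "c", "amount": 50}],
    [{"user": "d", "amount": 10}, {"user": "e", "amount": 40}],
]


def scan_partition(rows):
    """Per-partition work: return a partial (sum, count)."""
    return sum(r["amount"] for r in rows), len(rows)


# Scatter: every partition computes its partial result in parallel
with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    partials = list(pool.map(scan_partition, partitions))

# Gather: merge the partial results into the final answer
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
print(total / count)  # 40.0
```

Note that each partition returns a sum and a count rather than its own average; averages of averages would be wrong when partitions hold different numbers of rows.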
In a real distributed database, partitioning and replication work together. Each partition is replicated to multiple nodes for fault tolerance. A node might be the leader for partitions 1 and 3, and a follower for partitions 5 and 7. Every node wears multiple hats.
Consider a system with 6 partitions and 4 nodes, with a replication factor of 3 (each partition has 3 copies: 1 leader + 2 followers):
This is the key insight: failover is per-partition, not per-node. When a node dies, only the partitions it was leading need a new leader; partitions where it was a follower lose one replica but keep serving. Each affected partition has followers on other nodes that can take over leadership independently, and the rest of the system is unaffected.
With a replication factor of 3, each partition needs 3 different nodes. The placement algorithm must ensure that no two replicas of the same partition are on the same node (or, better, the same rack, or the same data center). This is replica placement — and it's a constraint satisfaction problem.
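The 6-partition, 4-node, replication-factor-3 layout can be worked through with a short sketch. It assumes the simplest possible placement (round-robin over nodes, first replica is the leader) and models failover as "promote the first surviving replica"; real systems run an election and usually also account for racks and data centers. `place_replicas` and `fail_node` are illustrative names.

```python
def place_replicas(num_partitions, nodes, rf):
    """Round-robin placement: replicas of a partition land on distinct nodes."""
    if rf > len(nodes):
        raise ValueError("replication factor exceeds node count")
    return {
        p: [nodes[(p + i) % len(nodes)] for i in range(rf)]
        for p in range(num_partitions)
    }


def fail_node(placement, dead):
    """Drop the dead node; the first remaining replica acts as leader."""
    return {p: [n for n in reps if n != dead] for p, reps in placement.items()}


nodes = ["N1", "N2", "N3", "N4"]
placement = place_replicas(6, nodes, 3)
for p, reps in placement.items():
    print(f"P{p}: leader={reps[0]} followers={reps[1:]}")

after = fail_node(placement, "N1")
new_leaders = [p for p in placement if placement[p][0] != after[p][0]]
print("partitions that changed leader:", new_leaders)  # [0, 4]
```

With this layout N1 leads partitions 0 and 4, so only those two need a new leader when N1 fails; every partition still has at least two live replicas.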
A common optimization: route read queries to the nearest follower instead of the leader. This reduces read latency (especially cross-datacenter) and reduces load on the leader. The trade-off: followers may lag behind the leader, so reads from followers might return stale data. Chapter 6 covered this in detail — read-your-writes consistency, monotonic reads, and consistent prefix reads are all relevant here.
For global applications, partitions and their replicas span multiple datacenters. The placement gets more complex:
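In leaderless replication systems (Cassandra, Riak, and Amazon's original Dynamo), reads and writes use quorums. With replication factor N, a write is confirmed when W replicas acknowledge, and a read returns after R replicas respond. As long as W + R > N, every read set overlaps every write set, so at least one replica in the read set saw the latest acknowledged write. That overlap is necessary for consistent reads, but edge cases such as sloppy quorums and concurrent writes mean it does not by itself guarantee linearizability.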
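The W + R > N condition can be checked by brute force for small clusters. The sketch below enumerates every possible write set and read set and tests whether they always share a replica; `quorums_overlap` is an illustrative helper, not part of any database API.

```python
from itertools import combinations


def quorums_overlap(n, w, r):
    """True if every W-sized write set intersects every R-sized read set."""
    replicas = range(n)
    return all(
        set(ws) & set(rs)
        for ws in combinations(replicas, w)
        for rs in combinations(replicas, r)
    )


for n, w, r in [(3, 2, 2), (3, 1, 1), (5, 3, 3), (5, 2, 3)]:
    print(f"N={n} W={w} R={r}  W+R>N: {w + r > n}  overlap: {quorums_overlap(n, w, r)}")
```

For N=5, W=2, R=3 the sum equals N rather than exceeding it, and a write to replicas {0, 1} can be missed entirely by a read from {2, 3, 4}.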
Theory is clean. Practice is messy. Every real database makes specific trade-offs in how it implements partitioning. Let's survey the major systems and the decisions they've made — then look at the anti-patterns that trip up engineers in production.
| System | Partition strategy | Rebalancing | Routing | Secondary indexes |
|---|---|---|---|---|
| Cassandra | Hash (consistent hashing + vnodes) | Proportional to nodes | Gossip (any node) | Local (materialized views) |
| MongoDB | Hash OR range (configurable) | Dynamic (chunk splitting) | mongos routing tier | Local (scatter-gather) |
| DynamoDB | Hash (partition key) | Dynamic + adaptive | Client-side (SDK) | Global (GSI, async) |
| HBase | Range (sorted by row key) | Dynamic (region splitting) | ZooKeeper + META table | None (use Phoenix or secondary) |
| Elasticsearch | Hash (document _id) | Fixed (set at index creation) | Any node forwards | Local (all shards queried) |
| CockroachDB | Range (sorted keys) | Dynamic (range splitting) | Gossip + leaseholder | Global (distributed txn) |
| PostgreSQL Citus | Hash (distribution column) | Semi-manual (shard rebalancer) | Coordinator node | Local |
| Vitess (MySQL) | Range (vindex-based) | Semi-manual (SplitClone) | vtgate routing tier | Local (scatter-gather) |
You're building a URL shortener (like bit.ly) that handles 100K writes/sec and 1M reads/sec. Each short URL maps to a long URL. How do you shard it?
This is one of the most-asked system design questions. Let's partition a Twitter-like timeline.
Anti-pattern 1: Sharding too early. A startup with 10 GB of data and 500 queries/sec does not need sharding. A single PostgreSQL server handles this easily. Sharding adds complexity: cross-shard queries, distributed transactions, schema changes across shards, operational overhead. Shard when you must, not when you might.
Anti-pattern 2: Wrong shard key. Choosing user_id as the shard key when 90% of queries filter by date_range. Every query becomes scatter-gather. The shard key must match your dominant query pattern.
Anti-pattern 3: Cross-shard joins. "SELECT orders.* FROM orders JOIN users ON orders.user_id = users.id" where orders and users are on different shards. This requires fetching data from multiple shards and joining in the application layer. Either co-locate related data on the same shard (shard both tables by user_id) or denormalize (embed user info in the order record).
Anti-pattern 4: Ignoring shard key immutability. If you shard by user_id and a user changes their ID (or you shard by email and users change emails), the record must move to a different shard. Most systems don't support this well. Choose an immutable shard key.
Anti-pattern 5: Too many partitions. "More partitions = more parallelism" seems logical but has overhead. Each partition has metadata (routing info, replica locations, compaction state). With 100,000 partitions, the metadata alone can consume significant memory on the coordinator node. Older Elasticsearch guidance, for example, suggested staying at or below 20 shards per GB of JVM heap on each node.
Anti-pattern 6: Cross-shard transactions. "Just use two-phase commit across shards" is technically possible but practically devastating to performance. A cross-shard transaction requires locking rows on multiple nodes, coordinating a prepare phase, then a commit phase. If any node is slow or down, the transaction blocks. CockroachDB and Spanner support this but at significant latency cost. The better answer: design your shard key so that transactions stay within a single shard. In an e-commerce system, shard by customer_id — then "add item to cart + update cart total" is a single-shard transaction.
What happens when you chose the wrong shard key and need to change it? This is resharding — one of the most painful operations in distributed systems.
In production, you need to monitor these metrics per partition:
| Metric | What it tells you | Alarm threshold |
|---|---|---|
| Partition size (bytes) | Is one partition growing faster than others? | >2x average = investigate shard key skew |
| Requests/sec per partition | Is one partition getting more traffic? | >3x average = possible hotspot |
| p99 latency per partition | Is one partition slower? | >2x average = disk full, compaction lag, or GC |
| Replication lag per partition | Are followers keeping up? | >1 second = follower overloaded or network issue |
| Scatter-gather query count | How many queries touch all partitions? | >50% of queries = wrong shard key for your workload |
| Cross-partition transaction rate | How many transactions span partitions? | >10% = consider changing shard key to co-locate |
The most dangerous situation is a silent hotspot: one partition is overloaded but the system-wide metrics look fine because the other partitions are averaging it out. Always monitor per-partition, not just per-node or per-cluster.
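A silent hotspot is easy to demonstrate with numbers. The sketch below applies the ">3x average" rule from the table to made-up per-partition request rates; `find_hotspots` and the sample figures are illustrative only.

```python
from statistics import mean


def find_hotspots(requests_per_sec, factor=3.0):
    """Return partitions whose load exceeds `factor` times the average."""
    avg = mean(requests_per_sec.values())
    return {
        p: round(rps / avg, 2)
        for p, rps in requests_per_sec.items()
        if rps > factor * avg
    }


rps = {
    "P0": 900, "P1": 1100, "P2": 1000, "P3": 9000,
    "P4": 950, "P5": 1050, "P6": 1000, "P7": 1000,
}

print(mean(rps.values()))  # 2000, which looks unremarkable cluster-wide
print(find_hotspots(rps))  # {'P3': 4.5}
```

Because the hot partition inflates the average it is compared against, a median-based baseline may be a more robust choice when the partition count is small.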
Adding a column, creating an index, or changing a data type must happen on every partition. In a non-sharded database, this is one ALTER TABLE command. In a sharded database with 1000 partitions, it's 1000 ALTER TABLE commands, ideally rolled out without downtime.
Sharding appears in nearly every FAANG systems design interview. The interviewer gives you a system with "billions of records" or "petabytes of data," and you must partition it. Here is everything you need, organized by interview dimension.
| Strategy | Pros | Cons | Use when |
|---|---|---|---|
| Key range | Efficient range queries, sorted scans | Hotspots on sequential keys | Time-series with non-timestamp prefix |
| Hash | Even distribution, no hotspots on sequential keys | No range queries, scatter-gather | Point lookups (user profiles, KV store) |
| Compound (Cassandra) | Hash distribution + sorted within partition | Range queries only on clustering column | User feeds, chat histories, IoT by device |
| Geographic | Data locality for regional queries | Uneven population distribution | Location-based services |
Problem 1: Design a distributed key-value store.
Problem 2: Design a social media feed system (fan-out on read vs write).
Problem 3: Shard a user database for 100M users.
```python
import bisect
import hashlib


class ConsistentHashRing:
    def __init__(self, nodes=None, vnodes=150):
        self.ring = {}         # hash_position -> node_name
        self.sorted_keys = []  # sorted positions for binary search
        self.vnodes = vnodes   # virtual nodes per physical node
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Hash a string to a position on the ring (0 to 2^32-1)."""
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest[:8], 16)  # use first 32 bits

    def add_node(self, node):
        """Add a physical node with `vnodes` virtual positions."""
        for i in range(self.vnodes):
            pos = self._hash(f"{node}:vnode{i}")
            # On a (rare) 32-bit collision, keep the existing owner
            self.ring.setdefault(pos, node)
        self.sorted_keys = sorted(self.ring)

    def remove_node(self, node):
        """Remove all virtual nodes for a physical node."""
        for i in range(self.vnodes):
            pos = self._hash(f"{node}:vnode{i}")
            # Only delete positions this node actually owns
            if self.ring.get(pos) == node:
                del self.ring[pos]
        self.sorted_keys = sorted(self.ring)

    def get_node(self, key):
        """Find the node responsible for a given key."""
        if not self.ring:
            return None
        h = self._hash(key)
        # Binary search for the first position >= h
        idx = bisect.bisect_left(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]


# Usage:
ring = ConsistentHashRing(["nodeA", "nodeB", "nodeC"])
print(ring.get_node("user_42"))  # e.g., "nodeB"
print(ring.get_node("user_99"))  # e.g., "nodeA"

# Add a node: only ~1/N keys change ownership
ring.add_node("nodeD")
print(ring.get_node("user_42"))  # might change, might not
```
```python
class FixedPartitionRebalancer:
    """Implements fixed-partition rebalancing strategy.
    Create partitions once, move them between nodes on join/leave."""

    def __init__(self, num_partitions, initial_nodes):
        self.num_partitions = num_partitions
        self.nodes = list(initial_nodes)
        # Initial assignment: round-robin
        self.assignment = {}  # partition_id -> node_name
        for i in range(num_partitions):
            self.assignment[i] = self.nodes[i % len(self.nodes)]

    def get_load(self):
        """Returns dict of node -> partition count."""
        load = {n: 0 for n in self.nodes}
        for n in self.assignment.values():
            if n in load:
                load[n] += 1
        return load

    def add_node(self, node_name):
        """Add a node and rebalance: steal from overloaded nodes."""
        self.nodes.append(node_name)
        target = self.num_partitions // len(self.nodes)
        moved = []
        load = self.get_load()
        for p_id in range(self.num_partitions):
            current_node = self.assignment[p_id]
            # Steal only from nodes above the target, until the new
            # node reaches the target itself
            if load[current_node] > target and load[node_name] < target:
                self.assignment[p_id] = node_name
                load[current_node] -= 1
                load[node_name] += 1
                moved.append((p_id, current_node, node_name))
        return moved  # list of (partition, from_node, to_node)

    def remove_node(self, node_name):
        """Remove a node: distribute its partitions to remaining nodes."""
        self.nodes.remove(node_name)
        orphans = [p for p, n in self.assignment.items() if n == node_name]
        moved = []
        for p_id in orphans:
            # Assign to the node with the fewest partitions
            load = self.get_load()
            min_node = min(self.nodes, key=lambda n: load[n])
            self.assignment[p_id] = min_node
            moved.append((p_id, node_name, min_node))
        return moved


# Usage:
rb = FixedPartitionRebalancer(12, ["A", "B", "C"])
print(rb.get_load())  # {'A': 4, 'B': 4, 'C': 4}

moved = rb.add_node("D")
print(f"Moved {len(moved)} partitions")  # 3
print(rb.get_load())  # {'A': 3, 'B': 3, 'C': 3, 'D': 3}

moved = rb.remove_node("B")
print(f"Moved {len(moved)} partitions")  # 3
print(rb.get_load())  # {'A': 4, 'C': 4, 'D': 4}
```
```python
import bisect
import hashlib


class PartitionRouter:
    """Routes keys to partitions using hash or range strategy."""

    def __init__(self, strategy="hash", num_partitions=8, boundaries=None):
        if strategy not in ("hash", "range"):
            raise ValueError(f"unknown strategy: {strategy}")
        self.strategy = strategy
        self.num_partitions = num_partitions
        self.boundaries = boundaries or []  # sorted upper bounds (range strategy)

    def _range_index(self, key):
        # First partition whose upper boundary is >= key; keys above every
        # boundary fall into the last partition
        return bisect.bisect_left(self.boundaries, key)

    def route(self, key):
        if self.strategy == "hash":
            # Built-in hash() is randomized per process for strings,
            # so use a stable digest instead
            digest = hashlib.md5(str(key).encode()).hexdigest()
            return f"P{int(digest[:8], 16) % self.num_partitions}"
        return f"P{self._range_index(key)}"

    def route_range(self, start, end):
        """Which partitions cover [start, end]?"""
        if self.strategy == "hash":
            return [f"P{i}" for i in range(self.num_partitions)]  # all!
        first, last = self._range_index(start), self._range_index(end)
        return [f"P{i}" for i in range(first, last + 1)]


# Hash partitioning: point lookup is 1 partition, range is ALL
hr = PartitionRouter("hash", num_partitions=4)
print(hr.route("user_42"))       # exactly one partition
print(hr.route_range("a", "z"))  # ["P0","P1","P2","P3"] (all!)

# Range partitioning: range query touches 1-2 partitions
rr = PartitionRouter("range", boundaries=[250, 500, 750])
print(rr.route(637))             # "P2" (one partition)
print(rr.route_range(400, 600))  # ["P1","P2"] (two partitions)
```
| Dimension | Example Question | What a Staff Answer Adds |
|---|---|---|
| Concept | "Explain consistent hashing" | Discusses virtual nodes, load variance formula, why Cassandra reduced vnodes from 256 to 16 |
| Design | "Shard a user database for 100M users" | Calculates partition count from storage/throughput constraints, addresses resharding strategy, secondary index choice |
| Code | "Implement a consistent hash ring" | Uses binary search for O(log N) lookup, handles vnodes, discusses hash function choice |
| Debug | "One shard has 10x the data of others" | Systematic diagnosis: check key cardinality, check for NULL/default values, check growth pattern. Proposes migration plan. |
| Frontier | "How does Spanner do global sharding?" | TrueTime, external consistency, Paxos per partition group, zone-aware placement |
| Symptom | Diagnosis | Fix |
|---|---|---|
| One shard 10x larger than others | Shard key has low cardinality or skewed distribution | Re-shard with higher-cardinality key or hash |
| Range queries are slow | Using hash partitioning (scatter-gather) | Switch to range partitioning or Cassandra compound key |
| Writes spike one partition | Sequential/timestamp key | Prefix with random or high-cardinality field |
| Cross-shard queries dominate | Wrong shard key for query pattern | Re-shard by dominant query dimension or denormalize |
| Rebalancing takes hours | Too few partitions (each is huge) | Increase partition count (may require re-partitioning) |
| Stale secondary index results | Using async global index | Accept eventual consistency or switch to local index |
| Red flag | Why it's wrong | What to say instead |
|---|---|---|
| "Just shard everything by user_id" | Not all queries are by user_id. Secondary access patterns matter. | "What are the primary query patterns? Let me choose a shard key that optimizes the dominant access path." |
| "Use consistent hashing" (with no further detail) | Consistent hashing is a mechanism, not a strategy. Doesn't address shard key choice, secondary indexes, or rebalancing. | "I'd use consistent hashing for the partition-to-node mapping, with hash(user_id) as the partition key, virtual nodes for even distribution, and a ZooKeeper-based routing tier." |
| "We'll handle hotspots when they happen" | Reactive. Shows no foresight. | "Given the expected access patterns, celebrity accounts could hotspot partition N. I'd proactively add a random suffix for accounts with >1M followers." |
| "Shard by date for time-series" | All current writes hit one shard. | "Compound key: hash(device_id) for partition, sort by timestamp within. This distributes writes while preserving per-device temporal ordering." |
| "Just add more shards when we need them" | Ignores the rebalancing cost. Adding shards means moving data. | "I'd start with 4x the expected partition count to handle growth without resharding. Dynamic splitting as a fallback." |
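The "random suffix for hot accounts" idea from the table can be sketched as key salting: writes for a hot key are spread over several sub-keys, and reads must fan out to all of them and merge. Everything here is an assumption for illustration, including `SALT_BUCKETS`, the `#` separator, and how an account gets flagged as hot.

```python
import hashlib
import random

NUM_PARTITIONS = 16
SALT_BUCKETS = 8  # hypothetical number of sub-keys per hot key


def partition_for(key):
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest[:8], 16) % NUM_PARTITIONS


def write_key(key, is_hot, rng=random):
    """Hot keys get a random suffix so writes spread across partitions."""
    if is_hot:
        return f"{key}#{rng.randrange(SALT_BUCKETS)}"
    return key


def read_keys(key, is_hot):
    """Reads of a hot key must query every salted sub-key and merge."""
    if is_hot:
        return [f"{key}#{i}" for i in range(SALT_BUCKETS)]
    return [key]


touched = {partition_for(k) for k in read_keys("celebrity_1", True)}
print(write_key("celebrity_1", True))
print(f"hot key now spans {len(touched)} partitions")
print(read_keys("user_42", False))  # ['user_42']
```

The trade-off is explicit in `read_keys`: write load is spread out, but every read of a hot key becomes a small scatter-gather, so salting is worth applying only to the few keys that need it.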
When an interviewer says "this system needs to handle 10TB of data and 100K QPS," here is the framework to follow:
Sharding does not exist in isolation. It connects to every other concept in distributed systems.
| Topic | Connection to sharding |
|---|---|
| Replication (Ch 6) | Each partition is replicated for fault tolerance. Replication happens within partitions — leader-follower per partition, not per database. |
| Transactions (Ch 8) | Cross-partition transactions require two-phase commit or similar protocols. Single-partition transactions are fast and simple. Shard key choice determines how many transactions cross partitions. |
| Storage & Retrieval (Ch 4) | Each partition uses the same storage engine (LSM-tree or B-tree). Understanding storage engines helps you predict partition performance. |
| Consensus (Ch 9) | ZooKeeper (used for partition routing) is a consensus system. Partition leader election uses consensus algorithms like Raft or Paxos. |
| Era | Approach | Example |
|---|---|---|
| 2000s | Application-level sharding (manual) | MySQL with app routing logic |
| 2007 | Dynamo paper: consistent hashing | Amazon Dynamo, Riak, Cassandra |
| 2006-2012 | Range partitioning with auto-split | BigTable, HBase |
| 2012+ | NewSQL: automatic sharding + transactions | Spanner, CockroachDB, TiDB |
| 2020s | Serverless auto-partitioning | DynamoDB on-demand, Neon, PlanetScale |
This lesson focused on single-datacenter partitioning. In practice, partitions are also distributed across datacenters for geographic locality and disaster recovery. Cross-datacenter partitioning introduces additional latency and consistency challenges. The Spanner paper (Google, 2012) addresses this with TrueTime, a clock API backed by GPS receivers and atomic clocks that enables globally consistent transactions across partitions on different continents.
We also did not cover resharding — the painful process of changing your shard key after the system is in production. This typically requires creating a parallel cluster with the new scheme, double-writing during migration, then cutting over. It can take weeks for large datasets and is one of the strongest arguments for getting the shard key right the first time.
We did not cover geo-partitioning: placing partitions in specific geographic regions to comply with data residency rules (GDPR, for example, restricts transfers of EU personal data outside the EU). CockroachDB's zone configs and Spanner's placement policies support this. Nor did we cover partition-level backups: taking consistent snapshots of individual partitions without stopping the entire cluster.
The trend is toward automatic, transparent sharding. Newer databases like PlanetScale (MySQL-based), Neon (Postgres-based), and TiDB handle partitioning internally — the application sees a single logical database. Serverless databases like DynamoDB on-demand mode partition automatically based on load. The engineer's job shifts from "implement sharding" to "choose the right shard key and understand the trade-offs."
But understanding the mechanics still matters. When your DynamoDB table has a throttled partition, you need to know why. When your Elasticsearch cluster has one hot shard, you need to know how to rebalance. When an interviewer asks "design a distributed key-value store," you need to derive consistent hashing from first principles. The abstractions get better, but the fundamentals don't change.
| Resource | Why it matters |
|---|---|
| Amazon Dynamo paper (2007) | The foundational work on consistent hashing with virtual nodes, quorum reads/writes, and gossip-based routing. Cassandra, Riak, and Voldemort all draw directly on it. |
| Google Spanner paper (2012) | How to do globally distributed range partitioning with strong consistency using TrueTime (GPS + atomic clocks). The "holy grail" of distributed databases. |
| Kafka: A Distributed Messaging System (2011) | Clean example of hash partitioning for message ordering. Partition count is chosen at topic creation, and consumer groups assign partitions to consumers. |
| Cassandra Architecture docs | Best practical description of consistent hashing with vnodes, gossip protocol, and compound primary keys. |
| Vitess Architecture docs | How MySQL sharding works at YouTube scale. Vindexes, VTGate routing, and resharding workflows. |
| CockroachDB blog: "How We Built a Range-Based Sharding System" | Modern take on range partitioning with automatic splitting, Raft consensus per range, and SQL compatibility. |
| # | Principle | One-liner |
|---|---|---|
| 1 | Shard key = dominant query dimension | Shard by what you query by most |
| 2 | Hash for point lookups, range for scans | Can't have both unless compound key |
| 3 | Fixed partitions for predictable data | Choose count upfront, never change it |
| 4 | Dynamic partitions for unpredictable growth | Auto-split large, merge small |
| 5 | Consistent hashing minimizes rebalancing | About 1/N of keys move when adding a node |
| 6 | Hot keys need application-level handling | Suffix, cache, or dedicated replicas |
| 7 | Local indexes: fast writes, slow reads | Scatter-gather for secondary lookups |
| 8 | Global indexes: fast reads, slow writes | Async update means stale results |
| 9 | Partition + replicate for production | Split for capacity, copy for durability |
| 10 | Never hash mod N | Most keys move on every node change (about 80% going from 4 to 5 nodes) |
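Principles 5 and 10 can be checked side by side with a small experiment. The sketch below counts how many of 10,000 synthetic keys change owner when a fifth node joins, first under hash mod N and then on a consistent hash ring with 150 virtual nodes per node. The helper names are illustrative and MD5 is used only as a convenient stable hash.

```python
import bisect
import hashlib


def h32(s):
    """Stable 32-bit hash of a string."""
    return int(hashlib.md5(s.encode("utf-8")).hexdigest()[:8], 16)


def mod_owner(key, nodes):
    return nodes[h32(key) % len(nodes)]


def build_ring(nodes, vnodes=150):
    """Sorted list of (position, node) pairs, one per virtual node."""
    return sorted((h32(f"{n}:vnode{i}"), n) for n in nodes for i in range(vnodes))


def ring_owner(key, ring):
    # First virtual node at or after the key's position, wrapping around
    idx = bisect.bisect_left(ring, (h32(key), ""))
    return ring[idx % len(ring)][1]


keys = [f"user_{i}" for i in range(10_000)]
old, new = ["A", "B", "C", "D"], ["A", "B", "C", "D", "E"]

mod_moved = sum(mod_owner(k, old) != mod_owner(k, new) for k in keys)

ring_old, ring_new = build_ring(old), build_ring(new)
ring_moved = sum(ring_owner(k, ring_old) != ring_owner(k, ring_new) for k in keys)

print(f"hash mod N:         {mod_moved / len(keys):.0%} of keys moved")
print(f"consistent hashing: {ring_moved / len(keys):.0%} of keys moved")
```

Under mod N a key stays put only if its hash has the same remainder modulo 4 and modulo 5, which is true for one hash value in five, so roughly 80% move. On the ring, the only keys that move are those the new node takes over, which is roughly one fifth.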
Partitioning is fundamentally about divide and conquer. A problem too large for one machine gets divided into sub-problems, each assigned to a different machine. The challenges are all at the boundaries: routing (which machine has my sub-problem?), rebalancing (how to redivide when machines change?), and cross-partition operations (what if my query spans multiple sub-problems?).
The single most important decision you make when designing a partitioned system is the partition key. Everything else — routing, rebalancing, indexing — follows from this one choice. A good partition key co-locates data that is frequently accessed together. A bad partition key forces cross-partition operations for common queries. Spend more time choosing the key than on any other design decision.
And remember: the best partition is the one you don't need. If your data fits on one machine, do not partition. Sharding adds complexity at every level — schema changes, transactions, debugging, monitoring. Only shard when the math tells you a single machine cannot handle the load, the storage, or the latency requirements. Then shard minimally, with the simplest strategy that meets your needs.
"The key to performance is elegance, not battalions of special cases." — Jon Bentley