Sharding as a Design Pattern: What You Should Know
A technical treatment of sharding as a general purpose partitioning discipline (Using Rust).
There is a peculiar perception with how sharding gets discussed. You search the topic and in three clicks you’re reading about database scaling, consistent hashing in exactly one paragraph, and a diagram of a round robin load balancer with “shard” written on each box. I would like to note that none of this is meant as a slight to the authors; you can view it as a criticism on how the topic is commonly presented and discussed.
I will try my best to simplify this complex topic.
Sharding is a partitioning discipline that applies wherever you have a keyspace, a space that lives within that space, and a reason to avoid serialising access to the whole of it, that means databases, yes, but it also means in-process concurrent data structures, thread local routing tables, connection pools, actor mailboxes, and a dozen other contexts that never get mentioned alongside the word. The principles are the same, the Rust implementation is even more fun than the distributed version because the language forces you to think about ownership in ways that reveal assumptions most distributed systems miss.
Let me start.
Part I: The Geometry of Sharding
Before any code examples, you would need at least a passing idea of what sharding actually is, because the metaphors people often reach for (buckets, lanes, drawers) all obscure the real structure.
A shard is a cell in a partition of a keyspace. The keyspace is the set of all possible keys your system will ever address. The partition is a division of that space into non overlapping, exhaustive subsets. Every key belongs to one shard. This is the invariant, everything else is implementation, do well to remember that.
The function that maps a key to a shard is known as the routing function. It must be:
- Deterministic: the same key maps to the same shard, given a fixed configuration.
- Total: it handles every key in the keyspace without error.
- Surjective: under ideal distribution assumptions (every shard is reachable).
Notice what is not required: the routing function does not need to be injective, many keys may map to the same shard as it does not need to produce a uniform distribution though that is usually desirable, and it does not need to be stable under a configuration, though there’s a cost to instability.
This last point is where most sharding discussion go wrong. They present a hash-modulo routing function:
1
shard_id = hash(key) % num_shards
And then never mention that this function is incredibly unstable, when num_shards changes, nearly every key remaps. For a distributed cache, that means a thundering herd of cache misses. For a distributed database, it means a migration event that could require the reordering of majority of your data. This instability is more built into the function, as a property of the function itself, rather than a just a bug you can engineer around.
The solution to this is consistent hashing, which I will explain in Part III. But first let us look at the in-process case, which most discussions ignore entirely.
Part II: In-Process Sharding and the Contention Problem
Consider a concurrent map in a multi threaded program. A naive implementation would use a single RwLock<HashMap<K, V>>. Every read acquires a shared lock, and every write acquires an exclusive lock. Under low concurrency this is fine. Under high write concurrency every write serialises against every other and your throughput flatlines.
This is a solved problem, and the solution is sharding. You replace a single lock with N locks, each guarding a disjoint subset of the keyspace:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::{RwLock, RwLockReadGuard};
const NUM_SHARDS: usize = 16;
pub struct ShardedMap<K, V> {
shards: Vec<RwLock<HashMap<K, V>>>,
}
impl<K: Hash + Eq, V> ShardedMap<K, V> {
fn new() -> Self {
let shards = (0..NUM_SHARDS)
.map(|_| RwLock::new(HashMap::new()))
.collect();
Self { shards }
}
fn sharded_index(&self, key: &K) -> usize {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() as usize) % NUM_SHARDS
}
pub fn insert(&self, key: K, value: V) {
let index = self.sharded_index(&key);
self.shards[index].write().unwrap().insert(key, value); // obtain write lock
}
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
K: std::borrow::Borrow<Q>,
Q: Hash + Eq,
V: Clone,
{
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
let index = (hasher.finish() as usize) % NUM_SHARDS;
let guard = self.shards[index].read().unwrap();
guard.get(key).cloned()
}
}
This works but it creates a false sharing trap on modern hardware.
The Cache Line Problem
A modern CPU Cache line is 64 bytes. When a core accesses memory it loads the entire 64 byte line containing that address. If two threads are writing to different memory locations that happen to live in the same cache line, they are effectively writing to the same thing from the cache coherency protocol’s perspective. Every write forces the other cores to invalidate their cached copy of that line, even though the logical data is separate. This is called false sharing and it will absolutely destroy your sharding gains on heavy workloads.
The Vec<RwLock<HashMap<K, V>>> layout is problematic. The RwLock metadata object sit adjacent in memory. Under contention, cores thrash the cache lines containing those lock words.
The fix is to pad each shard to a full cache line boundary:
1
2
3
4
#[repr(align(64))]
struct Shard<K, V> {
inner: RwLock<HashMap<K, V>>
}
the #[repr(align(64))] structure guarantees each Shard structure starts on a 64-byte boundary, because the struct is at least 64 bytes (due to the alignment). Adjacent shards cannot share a cache line. This is not a micro-optimization; on a machine with 32 cores hammering a sharded structure, the difference between padded and unpadded can be an order of magnitude.
This is precisely what DashMap does internally. Its source is worth a read. It stores shards as CachePadded<RwLock<...>>, and its default shard count is available_parallelism() * 4, rounded up to the next power of two.
Choosing the number of shards
The instinct is to make this a power of two so you can use bitwise masking instead of modulo:
1
2
3
let index = hash % num_shards // slower: modulo requires a division
let index = (hash & (num_shards - 1)) as usize // faster: mask works when num_shards is a power of two
This is a legitimate optimisation. The division modulo is genuinely expensive. It is a multi cycle operation that cannot be pipelined on most architectures. The bitwise mask is a single instruction.
How many shards? The rule of thumb is at least 4x your expected thread count. With 8 threads and 32 shards, the probability that two randomly chosen operations contend on the same shard is 1/32, or about 3%. With 8 threads and 8 shards, it is 1/8, or 12.5%. The contention probability decreases roughly as 1/N for uniform key distribution.
But here is the thing the rule of thumb does not tell you: If your key distribution is not uniform, you can have 1024 shards and one shard receiving 80% of the traffic. The shard count is not the whole story, the hash function quality and the key entropy are equally as important.
Part III: The Hash Function is not neutral
The routing function shard_id = hash(key) % N conceals a critical decision: which hashing function?
Rust’s standard library uses SipHash-1-3 by default, it was chosen for its DoS resistance: An adversary who can predict your hash value can craft keys that all map to the same bucket, degrading your HashMap to O(n) behaviour. SipHash defeats this by using a secret seed.
For internal sharding, where the keyspace property is not attacker-controlled, this security property is unnecessary overhead, you can use a faster non-cryptographic hash, the options worth knowing:
- AHash (from the
ahashcrate): Uses AES hardware instructions when available. Extremely fast, good distribution, and a good default for trusted internal sharding work. - FoldHash (the current default in
hashbrown): very fast, but only minimally DoS-resistant. - FxHash (used internally by firefox and rustc): Extremely simple and fast, but produces poor distribution for small integer keys with low variance. If your keys are sequential, fxhash will cluster them.
- XxHash / xx3h: Excellent for bulk hashing, strong avalanche behaviour, widely used in distributed systems for partitioned routing.
- Rendezous Hashing (HRW): Not a routing function per se, more of a routing strategy. For each shard you compute hash(key + shard_id) and route to the shard with the highest value. This has excellent uniformity and is remarkably stable under topology changes: when a shard is removed, only keys assigned to that shard need to be redistributed, I will return to this.
The avalanche effect matters for sharding. A good hash functions ensures that a single bit change in the hash alters approximately half of its output bits. Without this property keys with common prefixes will cluster into the same shard, creating hotspots.
You can verify your hash function’s avalanche behaviour empirically:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fn avalanche_score<H: Hasher + Default>(key: u64) -> f64 {
let mut original_hasher = H::default();
key.hash(&mut original_hasher);
let original = original_hasher.finish();
let mut differing_bits_total = 0u32;
for bit in 0..64 {
let flipped = key ^ (1u64 << bit);
let mut hasher = H::default();
flipped.hash(&mut hasher);
let hashed = hasher.finish();
differing_bits_total += (original ^ hashed).count_ones();
}
differing_bits_total as f64 / (64.0 * 64.0)
}
A score on or above 0.5 is fine, anything below should concern you for sharding applications.
Part IV: Consistent Hashing and the Hash Ring
When the set of shards is fixed, hash modulo is fine. The interesting problem is when shards are added or removed dynamically: a node joins a cluster, or a node fails and must be evacuated. Under hash modulo, changing N means nearly rehashing everything. You need a routing function that is stable under topology change.
Consistent hashing solves this. The classic formulation uses a hash ring:
More precisely: you hash each node to a position in a circular integer space (0, 2^64). To route a key, you hash the key to the same space, then walk clockwise for when you find the first node. When a node joins, it takes ownership of a contiguous arc of the ring; all other nodes are unaffected. When a node leaves, its arc is absorbed by its clockwise successor.
The problem with this naive formulation is that N nodes does not distribute uniformly across a random ring. You can get severe imbalance with one node owning 40% of the ring and another node owning 3%.
The solution for this is virtual nodes or vnodes. Each physical node is represented by multiple positions on the ring, typically by hashing node_id + replica_index with 150 vnodes per physical node, the distribution converges on uniformity by the law of large numbers.
Here is a minimal but correct implementation of this in Rust:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::hash::{DefaultHasher, Hash, Hasher};
// sorted, because we need to walk clockwise
pub struct HashRing<N: Clone> {
ring: BTreeMap<u64, N>,
vnode_per_node: usize,
}
// N: Clone, because each vnode needs a copy of the node
// clone -> duplication -> distribution
impl<N: Clone + Hash + Debug> HashRing<N> {
pub fn new(vnodes_per_node: usize) -> Self {
Self {
ring: BTreeMap::new(),
vnode_per_node,
}
}
fn vnode_hash(node: &N, replica: usize) -> u64 {
let mut hasher = DefaultHasher::new();
node.hash(&mut hasher);
replica.hash(&mut hasher);
hasher.finish()
}
pub fn add_node(&mut self, node: N) {
for i in 0..self.vnode_per_node {
let h = Self::vnode_hash(&node, i);
self.ring.insert(h, node.clone());
}
}
pub fn remove_node(&mut self, node: &N) {
for i in 0..self.vnode_per_node {
let h = Self::vnode_hash(node, i);
self.ring.remove(&h);
}
}
pub fn get_node<K: Hash>(&self, key: &K) -> Option<&N> {
if self.ring.is_empty() {
return None;
}
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
let h = hasher.finish();
// find the next clockwise neighbour from h, wrapping around
self.ring
.range(h..)
.next()
.or_else(|| self.ring.iter().next()) // wrapping around
.map(|(_, node)| node)
}
}
The BTreeMap gives us range which is the efficient “find first key >= h” operation that the clockwise walk requires. This is O(log n) per look up where n is the total number of vnode entries. For 10 physical nodes with 150 vnodes each that is about 1500 entries and 11 comparisons per entry. Perfectly acceptable.
Rendezvous Hashing as an Alternative
Consistent hashing with a hash ring has a subtlety that bites people: the BTreeMap operation is not cache friendly for large rings, and the vnode count needs tuning. Rendezvous hashing, also known as random weight hashing (HRW) achieves the same stability guarantee with a simpler structure at the cost of O(n) lookup time, where N is the number of physical nodes.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
use std::hash::DefaultHasher;
pub fn rendezvous_route<K: Hash, N: Hash>(key: &K, nodes: &[N]) -> Option<usize> {
nodes
.iter()
.enumerate()
.map(|(i, node)| {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
node.hash(&mut hasher);
(hasher.finish(), i)
}) .max_by_key(|(score, _)| *score)
.map(|(_, i)| i)
}
For small node counts (under 20 or so), this is often faster in practice than the BTreeMap approach because it is branch free and cache friendly, you can scan a small slice of node identifiers and do arithmetic.
The stability property follows from the same argument as the hash ring: when a node is removed, only keys assigned to it (those for which it had the highest score) need to be reassigned, to which ever node now has the highest scores for those keys. No other keys are affected.
Part V: The Architecture of a Sharded System
Here is the routing architecture for a sharded key-value store, showing the full request path from client to shard:
Notice that the router is stateless with respect to the data. It only needs the shard topology: the mapping from hash ranges to shard identities. This is the key architectural insight that makes sharding horizontally scalable. The routing logic can be run in every client, eliminating the router as a single point of failure.
This is the client-side routing pattern described in the Dynamo paper. A partition-aware client library can route requests directly to the appropriate coordinator nodes in the common case.
Part VI: Cross-Shard Operations and the Hard Problem
Everything so far has been the easy part. The hard part is what happens when a single logical operation needs to touch more than one shard.
Let us consider a bank transfer: debit account a (on shard 2) and credit account b (on shard 7). These are two separate shard operations. If the debit succeeds and the system crashes before the credit, you have destroyed money. If you credit before debiting and the debit fails, you have created money, both options are unacceptable.
This is the cross shard transaction problem, and it does not have a cheap solution.
Two phase commit (2PC) is the classic solution. In phase one, the coordinator asks each participant to prepare: acquire the necessary locks and write the operation to a write ahead log, but does not apply it. In phase two, if all participants vote yes, the coordinator writes a commit record and asks all participants to apply.
This protocol is correct, but has two serious problems.
First, it is blocking, if the coordinator crashes after phase one but before phase two, the participants are left holding locks until the coordinator recovers. The locks cannot be released without knowing the coordinators decision. This is the infamous “uncertain transaction” problem, and it is why 2PC is considered a poor solution for high availability systems.
Second, it adds a full round trip to every cross shard write. For operations that touch many shards, the latency compounds.
The modern alternative is the Saga pattern. A saga decomposes a multi-step operation into a sequence of local transactions, each with a corresponding compensating transaction that undoes it. If any step fails, the saga executes the compensating transaction in reverse order.
For the bank transfer:
- Forward:
debit(A, 100), credit(B, 100) - Compensating:
credit(A, 100)(reverses the debit)debit(B, 100)(reverses the credit).
If credit(B, 100) fails, the saga executes credit(A, 100) to restore consistency. There is no distributed locks. There is a window of inconsistency between steps, but consistency is eventually restored.
The crucial trade-off: sagas provide eventual consistency, not serializability. Two concurrent sagas operating on overlapping accounts can interleave in ways that would be forbidden by 2PC. Whether or not this is acceptable depends entirely on your consistency requirements.
For most in-process sharding scenarios, you can avoid cross shard operations by designing your shard key carefully, this is the real engineering discipline.
Part VII: Shard Key Design
The shard key is the field or composite of fields used to route a record to its shard. This choice is the most consequential decision in your sharding design, and it is almost always often made too quickly.
The properties you want from a shard key:
- High cardinality: If your key can only take 10 distinct values and you have 16 shards, most shards are empty. This seems obvious but gets violated in subtle ways: using
user_id % 10as a shard key when you have 16 shards means only 10 shards receive traffic. - Uniform distribution: the values of the key should be approximately uniformly distributed across the keyspace. Sequential integer IDs fail this criterion when the range of live data is much smaller than the the theoretical maximum.
- Query Locality: operations that are frequently performed together should ideally live on the same shard. A user’s session data and profile data, if queried in a hot path, should shard by
user_idso they co-locate. - Stability: a key change means a record must migrate between shards. This is expensive . User IDs and UUIDs are good shard keys. User email addresses are poor ones (they change on account rename, or marriage).
The worst anti pattern is choosing a key that creates temporal hotspots. If you shard by timestamp or date, all writes go to the shard containing the current time window. That shard is a hotspot; the others are cold. This pattern appears constantly in time series systems designed by people who were thinking about range queries (where time-based sharding makes sense) without thinking about write distribution.
The solution for time-series data is typically a composite key: (entity_id, time_bucket), where entity_id provides the write distribution and time_bucket provides query locality within an entity’s history.
Part VIII: Phantom Types and Shard Affinity in Rust
Here is something genuinely unusual that Rust’s type system enables and that no other commonly uses systems language provides: you can encode shard affinity into the type system and get compile-time enforcement that shard-crossing operations are explicit.
The idea to use phantom type parameters to tag values with their shard identity. An operation that would cross shards becomes a type error, unless you explicitly acknowledge its crossing:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash};
use std::marker::PhantomData;
pub struct ShardId<const N: usize>; // zero sized type for representing a shard index
// key known to belong to shard N
pub struct ShardedKey<K, const N: usize> {
inner: K,
_shard: PhantomData<ShardId<N>>,
}
impl<K: Hash, const N: usize> ShardedKey<K, N> {
fn new(key: K, num_shards: usize) -> Option<Self> {
if num_shards == 0 {
return None;
}
let index = compute_shard_index(&key, num_shards);
if index == N {
Some(Self {
inner: key,
_shard: PhantomData,
})
} else {
None
}
}
pub fn key(&self) -> &K {
&self.inner
}
}
// A shard that only accepts operations on keys belonging to it
pub struct TypedShard<K, V, const N: usize> {
data: HashMap<K, V>,
}
impl<K: Hash + Eq, V, const N: usize> TypedShard<K, V, N> {
pub fn new() -> Self {
Self {
data: HashMap::new(),
}
}
pub fn insert(&mut self, key: ShardedKey<K, N>, value: V) {
self.data.insert(key.inner, value);
}
pub fn get(&self, key: &ShardedKey<K, N>) -> Option<&V> {
self.data.get(&key.inner)
}
}
impl<K: Hash + Eq, V, const N: usize> Default for TypedShard<K, V, N> {
fn default() -> Self {
Self::new()
}
}
fn compute_shard_index<K: Hash>(key: &K, num_shards: usize) -> usize {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() as usize) % num_shards
}
This pattern will not suit every context: the const generics make it unwieldy when the shard is a runtime value. But in systems where the shard count is fixed at compile time (a common case in in-process sharding). It provides a genuinely useful value. Functions that operate on data from shard N cannot accept data from shard M, which enables the crossing of shard boundaries to be visible and auditable event in your systems, rather than a silent runtime behaviour.
1
2
3
4
5
6
7
// example of a clearer signature:
impl<K: Hash, const TOTAL_SHARDS: usize, const SHARD_IDX: usize> ShardedKey<K, TOTAL_SHARDS, SHARD_IDX> {
pub fn new(key: K) -> Option<Self> {
let idx = compute_shard_index(&key, TOTAL_SHARDS);
if idx == SHARD_IDX { Some(...) } else { None }
}
}
The broader principle is worth generalising: wherever you have a partitioned state, you can use Rust’s type system to make the partition boundaries visible. The newtype pattern, phantom types, and const generics, form a partitioning vocabulary that has no equivalent in Java, C++, or Go.
Part IX: Sharing in an Async Context
Modern Rust services are built on async runtimes, typically tokio. Sharding composes naturally with the actor model that Tokio enables.
The pattern is: one actor per shard, each with its own tokio::sync::mpsc channel. The router hashes the key and sends the message to the appropriate actor’s channel. Each actor processes its own queue sequentially (or with bounded parallelism). so no locking is required within a shard at all.
The key insight: state owned exclusivity by one actor requires no sync primitives. The message channel is the sync boundary. This is one of the more elegant expressions of Rust’s ownership model.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
/// actor's language
enum ShardedMessage<K, V> {
Get { key: K, reply: oneshot::Sender<Option<V>> },
Insert { key: K, value: V },
}
async fn shard_actor<K: Hash + Eq, V: Clone>(mut rx: Receiver<ShardedMessage<K, V>>) {
let mut shard_store: HashMap<K, V> = HashMap::new();
while let Some(msg) = rx.recv().await {
match msg {
ShardedMessage::Get { key, reply } => {
reply.send(shard_store.get(&key).cloned());
}
ShardedMessage::Insert { key, value } => shard_store.insert(key, value),
}
}}
pub struct ShardedActorMap<K: Hash + Clone, V: Clone + Send + 'static> {
sender: Vec<Sender<ShardedMessage<K, V>>>,
}
impl<K, V> ShardedActorMap<K, V>
where
K: Hash + Eq + Clone + Send + 'static,
V: Clone + Send + 'static,
{
pub fn new(num_shards: usize) -> Self {
assert!(
num_shards > 0,
"ShardedActorMap must have at least one shard"
);
let mut senders = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
let (tx, rx) = channel(1024);
tokio::spawn(shard_actor(rx)); // it will live forever
senders.push(tx);
}
Self { sender: senders }
}
fn shard_index(&self, key: &K) -> usize {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish() as usize % self.sender.len()
}
pub async fn get(&self, key: &K) -> Option<V> {
let index = self.shard_index(key);
let (tx, rx) = oneshot::channel();
self.sender[index]
.send(ShardedMessage::Get {
key: key.clone(),
reply: tx,
})
.await
.ok()?;
rx.await.ok().flatten()
}
pub async fn insert(&self, key: K, value: V) {
let index = self.shard_index(&key);
let _ = self.sender[index]
.send(ShardedMessage::Insert { key, value })
.await;
}
}
This pattern scales remarkably well under async workloads because Tokio can schedule shard actors across its thread pool. The backpressure is built in via the bounded channel capacity. If a shard’s channel life fills up, the sender yields, naturally throttling traffic to hot shards.
The tradeoff vs lock based sharding: each operation has the overhead of channel allocation (the oneshot channel for reads) and a task context switch. For very low-latency-in-process operation, a cache padded RwLock will outperform the actor model. For workloads where each shard operation involves I/O or substantial computation, the actor model wins because it integrates naturally with async and does not hold during awaits.
Part X: Rebalancing and the Cost of Getting It Wrong
Every sharded system eventually needs to rebalance: add capacity, remove a failed node, repair skewed distribution. The cost of rebalancing is proportional to the fraction of keys that must move and this fraction depends entirely on your routing function.
Under hash-modulo with N shards, adding one shard (going from N, to N+1) requires moving approximately N/(N+1) of all keys, that is 90% of your data. While this is not a paper concern; it is a real operational nightmare that has caused production outages on systems that should have been designed better.
Under consistent hashing with vnodes, adding one physical node with K vnodes displaces approximately K/(total_vnodes) of the keyspace. For 10 nodes with 150 vnodes each (1500 total), adding an 11th node moves about 150/1650 = 9% of keys. This is 10x better, and the fraction decreases as you add nodes.
Under rendezvous hashing, adding a node moves exactly 1/(N+1) of keys, where N is the original node count, This is the optimal minimum: you cannot do better without knowing in advance which keys will be accessed. The 1/(N+1) fraction represents exactly the keys that should belong to a new node in a balanced assignment.
The practical implication for system design: if you anticipate topology changes (and you should, because hardware fails and capacity needs change), rendezvous hashing or consistent hashing should be prerequisites for your system.
Part XI: Monitoring What you Cannot See
Sharded systems have a failure mode that is invisible to standard latency metrics: shard skew. If one shard receives 10x more traffic than average, its latency increases while other shards look healthy. Aggregate latency appears fine; p50 and p95 are unaffected. Only p99 and p999 hint at the problem and only if the hot shard is a large enough fraction for traffic to register.
The operational discipline requires:
- Track per shard metrics, not just aggregate metrics. Shard request rate, shard queue depth (For actor model designs), shard error rate, and shard latency, all need independent visibility.
- Track the distribution of key hash values over time windows. If the entropy of your distribution is dropping (keys clustering in hash space), that is an early warning of impending hotspot formation.
- Implement shard drain: the ability to mark a shard as read only and redirect new writes to other shards, enabling migration without any downtime. This requires the router to consult a topology configuration that can be updated dynamically.
- The routing function should be instrumental to track which shard each key maps to. In rust, this means your
ShardedMapor actor router should expose per shard counters via something like a metrics crate or a simpleAtomicU64array that your observability layer can scrape.
Closing Thoughts
Sharding is a partitioning discipline that applies whenever you have a keyspace, state distributed across that keyspace, and a performance or scalability reason to avoid serialising the whole of it. The principles transfer from distributed key-value stores to in-process concurrent maps to async actor systems to connection pools.
Rust is an unusually good language for implementing these patterns for reasons that go beyond performance. The ownership model makes the isolation guarantees of sharding explicit and enforceable. The type system, through phantom types and const generics, allows you to make shard boundaries visible in your APIs rather than hiding in your runtime logic. The async runtime integrates naturally with actor model sharding. The unsafe-free path to cache-line-aware data structures exists through repr(align) and crossbeam-utils.
The things that could hurt you if you ignore them: hash function quality, false sharing, cross shard transactions, and routing instability under topology changes. None of which are exotic concerns, and are just normal operating conditions of any long lasting sharded system.
References
- Dynamo: Amazon’s Highly Available Key-value Store: DeCandia et al.
- Consistent Hashing and Random Trees: Karger et al.
- Rust’s std::collections::HashMap documentation.
- hashbrown crate documentation.
- DashMap source code.
- Sagas: Garcia-Molina & Salem.
- Computer Architecture: A Quantitative Approach: Hennessy & Patterson.
Thank you for reading!
The consistent hashing implementation in this post uses DefaultHasher for clarity. Production code should use a stable hash function such as xxh3 or ahash, since DefaultHasher is not guaranteed to be consistent across Rust versions or compilation targets.