
Sharding Strategies & Rebalancing

Scale writes by partitioning data across nodes

TL;DR

Sharding partitions data horizontally across independent databases/nodes. Range sharding divides data into contiguous key ranges (user IDs 1-100K on Shard A): simple, but prone to hotspots. Hash sharding spreads keys evenly, mitigating hotspots at the cost of inefficient range queries; consistent hashing keeps rebalancing cheap when shards are added or removed. Challenge: cross-shard queries require fan-out to multiple shards. Plan the sharding key upfront; changing it later is extremely costly.

Learning Objectives

By the end of this article, you will understand:

  • Difference between range and hash sharding strategies
  • How consistent hashing minimizes rebalancing pain
  • Hotspot detection and mitigation techniques
  • Cross-shard query challenges and solutions
  • Rebalancing data when adding/removing nodes
  • Directory services and lookup strategies
  • Monitoring and operational considerations

Motivating Scenario

Your social network serves 500 million users. A single database maxes out around 10 million users before CPU and memory saturate. The obvious solution: split users across multiple databases, with users 1-10M on Shard 1, 10M-20M on Shard 2, and so on. But celebrity users (10 million followers) create a hotspot: Shard 5 becomes a bottleneck. And adding Shard 51 can require rebalancing on the order of 100M rows. Consistent hashing and proper monitoring solve these problems.

Core Concepts

Range Sharding

Partition by key ranges. E.g., User ID:

Shard 1: user_id 1-100,000
Shard 2: user_id 100,001-200,000
Shard 3: user_id 200,001-300,000

Pros:

  • Simple to implement and understand
  • Range queries are efficient (e.g., users 50,000-100,000 live on the same shard)
  • Easy to add shards (extend upper range)

Cons:

  • Prone to hotspots (active users may concentrate on one shard)
  • Uneven growth (some ranges fill faster than others)
  • Rebalancing means moving whole contiguous ranges, which can be large
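
As a minimal sketch of range-based routing (the boundary values mirror the example ranges above and are illustrative; bisect keeps the lookup logarithmic in the number of shards):

from bisect import bisect_left

# Upper bound of each shard's user_id range, paired with the owning shard
RANGE_UPPER_BOUNDS = [100000, 200000, 300000]
SHARDS = ['shard-1', 'shard-2', 'shard-3']

def route_by_range(user_id):
    # Find the first range whose upper bound is >= user_id
    idx = bisect_left(RANGE_UPPER_BOUNDS, user_id)
    if idx == len(SHARDS):
        raise ValueError(f"user_id {user_id} is outside all known ranges")
    return SHARDS[idx]

print(route_by_range(150000))  # shard-2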

Figure: Range sharding vs. hash sharding

Hash Sharding with Consistent Hashing

Use a hash function to distribute keys evenly. Consistent hashing allows adding shards without rehashing entire dataset.

# Basic hash sharding (not consistent)
shard_id = hash(user_id) % num_shards  # Problem: adding a shard remaps almost every key!

# Consistent hashing (simplified ring lookup)
def find_shard(key, ring):
    # ring: list of (position, shard) pairs sorted by position on the hash ring
    hash_val = hash(key)
    for position, shard in ring:
        if hash_val <= position:
            return shard  # First shard clockwise from the key's position
    return ring[0][1]  # Wrap around to the first shard

Benefit: Adding a shard only rehashes ~1/n of the data instead of all data.
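
To make the rehashing problem concrete, this short sketch (using md5 as a deterministic stand-in for the hash function) counts how many of 100,000 sample keys land on a different shard when the shard count grows from 3 to 4; with naive modulo placement, roughly three quarters move:

import hashlib

def modulo_shard(key, num_shards):
    # Naive placement: hash the key and take it modulo the shard count
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

keys = [str(i) for i in range(100000)]
moved = sum(1 for k in keys if modulo_shard(k, 3) != modulo_shard(k, 4))
print(f"Keys remapped going from 3 to 4 shards: {moved / len(keys):.1%}")  # roughly 75%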

Hotspot Detection and Mitigation

Hotspots occur when a shard handles disproportionate load:

Shard 1: 5 million users, 10K QPS (good)
Shard 2: 3 million users, 50K QPS (hotspot!)

Detection:

  • Monitor query rate per shard
  • Monitor storage per shard
  • Alert if any shard > 2x average

Mitigation:

  1. Sub-sharding: Further partition hot shard (Shard 2 → 2a, 2b)
  2. Rebalancing: Move users from hot shard to under-utilized shard
  3. Caching: Cache hot data locally (Redis)
  4. Shard key redesign: if the skew is inherent to the key's distribution, choose a different shard key
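
As a minimal sketch of the detection rule above (the per-shard QPS counters here are illustrative), flag any shard whose query rate exceeds twice the fleet average:

# Sketch: flag shards whose query rate exceeds 2x the average (illustrative counters)
shard_qps = {'shard-1': 10000, 'shard-2': 50000, 'shard-3': 12000}

avg_qps = sum(shard_qps.values()) / len(shard_qps)
hotspots = {shard: qps for shard, qps in shard_qps.items() if qps > 2 * avg_qps}

for shard, qps in hotspots.items():
    print(f"Hotspot: {shard} at {qps} QPS (fleet average {avg_qps:.0f})")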

Practical Example

Implementing Sharded Database with Consistent Hashing

import hashlib
import sqlite3
from bisect import bisect_right

class ConsistentHashRing:
    """Map keys to shards using consistent hashing."""

    def __init__(self, shards, virtual_nodes=160):
        self.shards = shards
        self.virtual_nodes = virtual_nodes
        self.ring = {}
        self.sorted_keys = []
        self._build_ring()

    def _build_ring(self):
        self.ring = {}  # Rebuild from scratch so removed shards drop out
        for shard in self.shards:
            for i in range(self.virtual_nodes):
                virtual_key = f"{shard}-{i}"
                hash_val = int(hashlib.md5(virtual_key.encode()).hexdigest(), 16)
                self.ring[hash_val] = shard

        self.sorted_keys = sorted(self.ring.keys())

    def get_shard(self, key):
        """Find the shard responsible for the given key."""
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        idx = bisect_right(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # Wrap around to the start of the ring
        return self.ring[self.sorted_keys[idx]]

    def add_shard(self, new_shard):
        """Add a new shard to the ring."""
        self.shards.append(new_shard)
        self._build_ring()

    def remove_shard(self, shard):
        """Remove a shard from the ring."""
        self.shards.remove(shard)
        self._build_ring()

class ShardedDatabase:
    def __init__(self, shard_configs):
        # shard_configs: {'shard-0': {...}, 'shard-1': {...}}
        self.connections = {
            name: sqlite3.connect(config['path'])
            for name, config in shard_configs.items()
        }
        # Create the example table on every shard so the demo below runs
        for conn in self.connections.values():
            conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, data TEXT)")
            conn.commit()
        self.hash_ring = ConsistentHashRing(list(shard_configs.keys()))

    def write(self, user_id, data):
        """Write to the appropriate shard."""
        shard = self.hash_ring.get_shard(str(user_id))
        conn = self.connections[shard]
        conn.execute(
            "INSERT INTO users (id, data) VALUES (?, ?)",
            (user_id, str(data))
        )
        conn.commit()

    def read(self, user_id):
        """Read from the appropriate shard."""
        shard = self.hash_ring.get_shard(str(user_id))
        conn = self.connections[shard]
        cursor = conn.execute(
            "SELECT data FROM users WHERE id = ?",
            (user_id,)
        )
        return cursor.fetchone()

    def scan_all_shards(self, query):
        """Cross-shard query: fan out to every shard and merge results."""
        results = []
        for shard, conn in self.connections.items():
            cursor = conn.execute(query)
            results.extend(cursor.fetchall())
        return results

    def get_hotspots(self):
        """Monitor shard load by row count."""
        sizes = {}
        for shard, conn in self.connections.items():
            cursor = conn.execute("SELECT COUNT(*) FROM users")
            count = cursor.fetchone()[0]
            sizes[shard] = count

        avg = sum(sizes.values()) / len(sizes)
        hotspots = {s: c for s, c in sizes.items() if c > avg * 2}
        return hotspots

    def rebalance(self, source_shard, target_shard, count):
        """Move 'count' users from source to target shard (placeholder)."""
        # Complex operation in practice: copy data, verify, update routing
        print(f"Rebalancing {count} users from {source_shard} to {target_shard}")

# Usage (example paths; point them at writable locations when running)
config = {
    'shard-0': {'path': '/data/shard-0.db'},
    'shard-1': {'path': '/data/shard-1.db'},
    'shard-2': {'path': '/data/shard-2.db'},
}

db = ShardedDatabase(config)

# Write: routes to the appropriate shard
db.write(12345, {'name': 'Alice', 'email': 'alice@example.com'})

# Read: routes to the same shard
result = db.read(12345)
print(f"User: {result}")

# Monitor hotspots
hotspots = db.get_hotspots()
if hotspots:
    print(f"Hotspots detected: {hotspots}")
    # Trigger rebalancing
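
To see the rebalancing benefit concretely, this sketch reuses the ConsistentHashRing above: it records where 100,000 sample keys live, adds a hypothetical fourth shard, and counts how many keys move (expect roughly 1/4 with four shards):

# Sketch: measure how many keys move when a shard is added
ring = ConsistentHashRing(['shard-0', 'shard-1', 'shard-2'])
before = {str(k): ring.get_shard(str(k)) for k in range(100000)}

ring.add_shard('shard-3')  # hypothetical new shard
after = {k: ring.get_shard(k) for k in before}

moved = sum(1 for k in before if before[k] != after[k])
print(f"Keys remapped after adding shard-3: {moved / len(before):.1%}")  # roughly 25%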

When to Use / When Not to Use

Use When:
  1. Single database can't handle throughput (write or storage volume)
  2. Data naturally partitionable (by user ID, tenant ID, region)
  3. Acceptable to operate multiple independent databases
  4. Can tolerate operational complexity of distributed system
  5. Planned upfront (changing shard key later is very costly)
Avoid When:
  1. Database still has headroom (single machine sufficient)
  2. Frequent cross-shard queries that must combine data
  3. Complex joins across shards (normalized schemas)
  4. Transactions span multiple shards
  5. Shard key not obvious or changes frequently

Patterns and Pitfalls

  • Range sharding: partition by key ranges (users 1-100K on Shard A). Simple and good for range queries, but active users can cluster and create hotspots, and rebalancing moves whole ranges. Good for initial sharding; graduate to hash sharding at scale.
  • Hash sharding with consistent hashing: distributes keys evenly and mitigates hotspots; adding a shard rehashes only ~1/n of the data. Downside: range queries become inefficient because keys scatter across shards.
  • Hotspot monitoring: track data size and QPS per shard and alert if any shard exceeds 2x the average. Investigate whether the key is naturally skewed, whether the hot shard can be sub-sharded, and whether hot data can be cached locally.
  • Cross-shard queries: fan out to all shards, collect results, and merge. Performance degrades as shard count grows; avoid them on user-facing, latency-sensitive paths.
  • Shard key choice: pick a key that won't change (user ID is good, email is bad); the key determines data locality, and a poor choice forces rebalancing and causes hotspots. Think deeply upfront.
  • Shard directory: maintain a mapping from shard key range to shard location and update it on rebalancing. Use etcd, Zookeeper, or a custom service when the directory needs strong consistency.

Operational Considerations

1. Directory Service

# Shard directory: maps key range to shard
shard_directory = {
    'user': {
        (0, 100000000): 'shard-0',
        (100000001, 200000000): 'shard-1',
        (200000001, 300000000): 'shard-2',
    }
}

def find_shard(key_type, key_id):
    for (min_id, max_id), shard in shard_directory[key_type].items():
        if min_id <= key_id <= max_id:
            return shard
    raise ValueError(f"No shard for {key_type}:{key_id}")
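
For example, with the ranges above, find_shard('user', 150000000) resolves to 'shard-1'.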

2. Rebalancing Workflow

When adding Shard N:

  1. Update directory to include new shard ranges
  2. Copy affected data from old shards to new shard
  3. Verify data consistency
  4. Update routing logic
  5. Monitor for correctness
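
A minimal sketch of that workflow, assuming SQLite shards with the users table from the practical example and an in-memory range directory (both assumptions, not a production pattern):

import sqlite3

def rebalance_range(source_path, target_path, id_range, directory):
    """Move users whose id falls within id_range from the source shard to the target shard."""
    lo, hi = id_range
    src = sqlite3.connect(source_path)
    dst = sqlite3.connect(target_path)
    dst.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, data TEXT)")

    # 1. Copy the affected rows to the new shard
    rows = src.execute(
        "SELECT id, data FROM users WHERE id BETWEEN ? AND ?", (lo, hi)
    ).fetchall()
    dst.executemany("INSERT OR REPLACE INTO users (id, data) VALUES (?, ?)", rows)
    dst.commit()

    # 2. Verify consistency before touching the source
    copied = dst.execute(
        "SELECT COUNT(*) FROM users WHERE id BETWEEN ? AND ?", (lo, hi)
    ).fetchone()[0]
    if copied != len(rows):
        raise RuntimeError("Row count mismatch; aborting rebalance")

    # 3. Update the directory so new traffic routes to the target shard
    directory[(lo, hi)] = target_path

    # 4. Remove the moved rows from the source shard and keep monitoring
    src.execute("DELETE FROM users WHERE id BETWEEN ? AND ?", (lo, hi))
    src.commit()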

3. Monitoring Cross-Shard Queries

import time
import logging

logger = logging.getLogger(__name__)

# Log fan-out queries (all_shards and execute_on_shard are provided elsewhere)
def scan_all_shards(query):
    start = time.time()
    results = []
    for shard in all_shards:
        result = execute_on_shard(shard, query)
        results.extend(result)
    duration = time.time() - start

    # Alert if slow
    if duration > 1.0:
        logger.warning(f"Slow cross-shard query: {duration:.2f}s")

    return results

Design Review Checklist

  • Identified bottleneck: storage, throughput, or latency?
  • Confirmed single database insufficient (measured actual load)
  • Designed shard key: immutable, well-distributed, naturally partitionable
  • Chose strategy: range (simple) vs hash (even distribution)
  • Planned directory service: etcd, Zookeeper, or custom lookup
  • Implemented hotspot detection and monitoring
  • Designed cross-shard query strategy (fan-out, merge, timeouts)
  • Tested rebalancing procedure in staging
  • Documented shard topology and key mapping
  • Set up alerting for hotspots, replication failures
  • Prepared runbook for adding/removing shards
  • Verified transactions don't span multiple shards (or use distributed TXN)

Self-Check Questions

  1. What's the difference between range and hash sharding?

    • Range: contiguous key ranges per shard (simple, but prone to hotspots). Hash: keys spread evenly by a hash function (hotspots mitigated; rebalancing stays cheap only with consistent hashing).
  2. How does consistent hashing prevent full rebalancing?

    • Adding shard only rehashes ~1/n of keys, not all. Other shards mostly unaffected.
  3. What are hotspots and how do you detect them?

    • Shards with disproportionate load/data. Monitor QPS and storage per shard. Alert if > 2x average.
  4. Why are cross-shard queries problematic?

    • Must fan-out to multiple shards, merge results. Latency increases with shard count. Avoid for user-facing queries.

Next Steps

  1. Measure current bottleneck: Is it storage, throughput, or latency?
  2. Start with single database: Shard only when necessary
  3. Design shard key carefully: Think about future growth and partitioning
  4. Pilot with one shard pair: Test routing, monitoring, rebalancing
  5. Plan directory service: etcd, Zookeeper, or custom
  6. Monitor hotspots continuously: Act early on rebalancing

References