Data Replication in Distributed Systems: From CAP Theorem to Multi-Region Architecture

Phoenix · November 29, 2025 · 45 min read


Building distributed systems that are both highly available and consistent is one of the most challenging problems in modern software engineering. At the heart of this challenge lies data replication - the practice of storing copies of data across multiple nodes, regions, or data centers.

In this comprehensive guide, we'll explore everything from the theoretical foundations (CAP theorem, consistency models) to practical implementation patterns (multi-region replication, conflict resolution) used by companies like Amazon, Google, and Netflix to serve billions of users globally.

Table of Contents

    • Strong Consistency
    • Eventual Consistency
    • Causal Consistency
    • Read-Your-Writes Consistency
    • Monotonic Reads
    • Linearizability (Strict Consistency)
    • Snapshot Isolation
    • Serializability
    • Primary-Replica (Master-Slave)
    • Multi-Primary (Multi-Master)
    • Quorum-Based Replication
    • Chain Replication
    • Consensus-Based Replication (Paxos & Raft)
    • Advanced Quorum Tuning
    • WAL Implementation
    • WAL Shipping for Replication
    • WAL Archiving
    • WAL Optimization Techniques
    • Distributed WAL Systems
    • Active-Passive (DR Setup)
    • Active-Active (Multi-Region Writes)
    • Read Replicas
    • Sharded Multi-Region
    • Last-Write-Wins (LWW)
    • Vector Clocks
    • Hybrid Logical Clocks (HLC)
    • CRDTs (Conflict-Free Replicated Data Types)
    • Application-Level Merge
    • Read Repair
    • Anti-Entropy with Merkle Trees
    • Netflix (Cassandra Multi-Region)
    • Uber (Schemaless / Docstore)
    • Figma (Operational Transform with CRDTs)
    • Monitoring Replication Lag
    • Handling Replication Failures
    • Data Consistency Validation
    • Split-Brain Prevention
    • Testing with Chaos Engineering
    • Capacity Planning for Replication

Why Data Replication?

Before diving into the how, let's understand the why. Data replication solves several critical problems:

1. High Availability

Single points of failure are unacceptable in production systems. If your database crashes and you have no replicas, your application goes down completely.

```
┌─────────────┐
│   Client    │
└──────┬──────┘
       │
       ▼
┌─────────────┐        ❌ Crashes
│   Primary   │────────────────────→ SYSTEM DOWN
│   Database  │
└─────────────┘
```

With replication:

```
┌─────────────┐
│   Client    │
└──────┬──────┘
       │
   ┌───┴────┐
   │        │
   ▼        ▼
┌─────┐  ┌─────┐        ❌ Primary fails
│ P   │  │ R1  │────────────────────→ Failover to R1
└─────┘  └─────┘
           │
         ┌─┴──┐
         │    │
         ▼    ▼
      ┌────┐ ┌────┐
      │ R2 │ │ R3 │
      └────┘ └────┘

System continues running ✅
```

2. Reduced Latency

Users in Tokyo shouldn't have to wait for data from a server in Virginia. By replicating data to multiple geographic locations, you serve users from the nearest region.

Without Geographic Replication:

```
User in Tokyo → 150ms RTT → Server in Virginia
Total latency: 150ms + processing time
```

With Geographic Replication:

```
User in Tokyo → 5ms RTT → Server in Tokyo
Total latency: 5ms + processing time (30x faster!)
```

3. Disaster Recovery

Data center fires, natural disasters, network partitions - these aren't hypothetical scenarios. They happen. Cross-region replication ensures your data survives catastrophic failures.

4. Read Scalability

Distribute read traffic across multiple replicas to handle massive scale:

```
         ┌─────────────┐
         │   Primary   │  (Handles all writes)
         │   Database  │
         └──────┬──────┘
                │
        ┌───────┼───────┐
        │       │       │
        ▼       ▼       ▼
     ┌────┐  ┌────┐  ┌────┐
     │ R1 │  │ R2 │  │ R3 │  (Distribute reads)
     └─┬──┘  └─┬──┘  └─┬──┘
       │       │       │
     10K/s   10K/s   10K/s  = 30K reads/second
```

The CAP Theorem

Before choosing a replication strategy, you must understand the CAP theorem, formulated by Eric Brewer in 2000. It states that a distributed system can provide at most two of the following three guarantees:

C - Consistency

Every read receives the most recent write or an error. All nodes see the same data at the same time.

```
Client A writes X=5
  ↓
┌────────────┐
│   Node 1   │  X=5 ✅
│   Node 2   │  X=5 ✅
│   Node 3   │  X=5 ✅
└────────────┘
  ↓
Client B reads X → Gets 5 (guaranteed)
```

A - Availability

Every request receives a response, without guarantee that it contains the most recent write. The system continues operating even during failures.

```
Node 1 fails
  ↓
Clients can still:
- Read from Node 2 ✅
- Read from Node 3 ✅
- Write to Node 2 ✅
(System keeps running)
```

P - Partition Tolerance

The system continues to operate despite network partitions (messages between nodes are dropped or delayed).

```
┌─────┐          ┌─────┐
│ N1  │  ─ ❌ ─  │ N2  │  (Network partition)
└─────┘          └─────┘
  │                │
  │                │
Both continue operating independently
```

The Tradeoff

In a distributed system, network partitions will happen. You cannot avoid them. Therefore, you must choose between Consistency and Availability during a partition.

CP Systems (Consistency + Partition Tolerance)

Sacrifice availability to maintain consistency. If nodes can't communicate, refuse requests to prevent stale data.

Examples:

  • MongoDB (with majority write concern)
  • HBase
  • Redis Cluster (when configured for consistency)
  • ZooKeeper
  • etcd

Use Cases:

  • Banking systems (account balances must be accurate)
  • Inventory management (prevent overselling)
  • Configuration management (all nodes must have same config)
```python
from pymongo.write_concern import WriteConcern
from pymongo import errors

# MongoDB CP behavior: refuse the write rather than accept it
# without reaching a majority of replicas
accounts = db.accounts.with_options(
    write_concern=WriteConcern(w="majority", wtimeout=5000)  # Wait for majority
)
try:
    accounts.update_one(
        {"user_id": "alice"},
        {"$inc": {"balance": -100}},
    )
except errors.PyMongoError:
    # Cannot reach majority → Reject write
    return "Service temporarily unavailable"
```

AP Systems (Availability + Partition Tolerance)

Sacrifice consistency to maintain availability. Accept potentially stale data to keep the system running.

Examples:

  • Cassandra
  • DynamoDB
  • CouchDB
  • Riak
  • Cosmos DB (at lower consistency levels)

Use Cases:

  • Social media feeds (eventual consistency is acceptable)
  • Shopping carts (temporary inconsistency okay)
  • Session stores (availability more important)
  • Analytics systems (approximate counts acceptable)
```python
from datetime import datetime
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

# Cassandra AP behavior: respond as soon as a single replica acknowledges
stmt = SimpleStatement(
    "UPDATE users SET last_login = %s WHERE user_id = %s",
    consistency_level=ConsistencyLevel.ONE,  # Respond immediately
)
session.execute(stmt, [datetime.now(), "alice"])
# Returns success even if some replicas are unreachable
```

Beyond CAP: PACELC

The CAP theorem only describes behavior during partitions. PACELC extends this:

  • If Partition (P): Choose Availability (A) or Consistency (C)
  • Else (E): Choose Latency (L) or Consistency (C)

Even without partitions, you trade consistency for latency:

| System | Partition Behavior | Normal Behavior |
|---|---|---|
| Cassandra | PA | EL (low latency) |
| MongoDB | PC | EC (consistent) |
| DynamoDB | PA/PC | EL/EC (configurable) |
| PostgreSQL (sync replication) | PC | EC |

Consistency Models

Beyond the CAP extremes, there are many consistency models offering different guarantees:

1. Strong Consistency

Guarantee: Reads always return the most recent write.

```
T1: Write X=5
T2: Read X → Returns 5 (guaranteed)
```

Implementation: Synchronous replication, quorum reads/writes

Cost: High latency, lower availability

Use: Financial transactions, inventory

2. Eventual Consistency

Guarantee: If no new updates, eventually all replicas converge to the same value.

```
T1: Write X=5 to Node 1
T2: Read X from Node 2 → Might return 4 (old value)
T3: (after replication) Read X from Node 2 → Returns 5
```

Implementation: Asynchronous replication

Cost: Temporary inconsistency

Use: Social media, caching, analytics
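A toy model makes the T1–T3 timeline above concrete. The classes and method names below are illustrative, not any real database's API: writes land on one replica and sit in a replication queue (the "lag") until drained.

```python
# Toy model of eventual consistency: writes land on one replica
# and propagate to the others asynchronously.
class Replica:
    def __init__(self, name):
        self.name = name
        self.data = {}

class AsyncCluster:
    def __init__(self, names):
        self.replicas = {n: Replica(n) for n in names}
        self.pending = []  # replication queue (the "lag")

    def write(self, node, key, value):
        self.replicas[node].data[key] = value
        # Queue the update for every other replica
        for other in self.replicas:
            if other != node:
                self.pending.append((other, key, value))

    def read(self, node, key):
        return self.replicas[node].data.get(key)

    def replicate(self):
        # Drain the queue: after this, all replicas have converged
        for target, key, value in self.pending:
            self.replicas[target].data[key] = value
        self.pending = []

cluster = AsyncCluster(["node1", "node2"])
cluster.write("node1", "x", 5)
stale = cluster.read("node2", "x")   # None: update not yet applied
cluster.replicate()
fresh = cluster.read("node2", "x")   # 5: replicas have converged
```

Real systems drain the queue continuously in the background; the window between `write` and `replicate` is exactly where T2 can observe a stale value.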

3. Causal Consistency

Guarantee: Writes that are causally related are seen in order.

```
User A: Post message → Comment on message
All users see: Post before Comment ✅

User A: Post X
User B: Post Y (unrelated)
Some users might see: Y before X ✅ (okay, not causally related)
```

Use: Social networks, collaborative editing
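A minimal sketch of causal delivery, assuming each update carries the ids of the updates it depends on (the class and the dependency format are invented for illustration): a replica buffers an update until everything it causally depends on has been applied.

```python
# Toy causal delivery: a replica holds back an update until
# all of its causal dependencies have been applied.
class CausalReplica:
    def __init__(self):
        self.applied = set()   # ids of applied updates
        self.store = []        # visible messages, in arrival-to-user order
        self.buffer = []       # updates waiting on dependencies

    def receive(self, update_id, text, deps):
        self.buffer.append((update_id, text, set(deps)))
        self._drain()

    def _drain(self):
        progress = True
        while progress:
            progress = False
            for item in list(self.buffer):
                update_id, text, deps = item
                if deps <= self.applied:   # all dependencies applied?
                    self.store.append(text)
                    self.applied.add(update_id)
                    self.buffer.remove(item)
                    progress = True

replica = CausalReplica()
# The comment arrives over the network BEFORE the post it depends on:
replica.receive("c1", "Comment on message", deps=["p1"])
assert replica.store == []            # held back, not visible yet
replica.receive("p1", "Post message", deps=[])
# Now both apply, in causal order: Post first, then Comment
```

Unrelated updates carry no shared dependencies, so replicas remain free to apply them in either order, which is precisely the freedom causal consistency grants.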

4. Read-Your-Writes Consistency

Guarantee: Users see their own writes immediately.

```
User A: Update profile picture
User A: Refresh page → Sees new picture ✅
User B: Views User A's profile → Might see old picture temporarily
```

Implementation: Read from same replica you wrote to, or use session tokens

Use: User-facing applications
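The session-token approach can be sketched in a few lines. The `Primary`/`ReadReplica` classes and the integer version counter are hypothetical, standing in for a real database's LSN: each write returns a token, and a read falls back to the primary whenever the chosen replica has not caught up to that token.

```python
# Toy read-your-writes via a session token: each write returns a
# version; reads fall back to the primary until the replica catches up.
class Primary:
    def __init__(self):
        self.version = 0
        self.data = {}

    def write(self, key, value):
        self.version += 1
        self.data[key] = (value, self.version)
        return self.version           # session token for the client

class ReadReplica:
    def __init__(self):
        self.version = 0
        self.data = {}

    def sync(self, primary):
        self.data = dict(primary.data)
        self.version = primary.version

def session_read(replica, primary, key, min_version):
    # Replica too stale for this session? Fall back to the primary.
    if replica.version < min_version:
        return primary.data[key][0]
    return replica.data[key][0]

primary, replica = Primary(), ReadReplica()
token = primary.write("avatar", "new.png")
# The replica hasn't replicated yet, but the user still sees their write:
value = session_read(replica, primary, "avatar", token)  # "new.png"
```

Other sessions carry no token, so they may still read the stale replica — exactly the User B behavior shown above.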

5. Monotonic Reads

Guarantee: If you've read version N, you'll never read version < N.

```
T1: Read X → Returns X=5
T2: Read X → Returns X=5 or higher (never X=4)
```

Use: Timeline consistency in feeds
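A minimal sketch, assuming the client tracks the highest version it has seen and skips any replica that is further behind (the class names and version field are illustrative):

```python
# Toy monotonic reads: the client remembers the highest version it
# has observed and refuses answers from replicas that are behind it.
class VersionedReplica:
    def __init__(self, version, value):
        self.version = version
        self.value = value

class MonotonicClient:
    def __init__(self):
        self.last_seen = 0

    def read(self, replicas):
        # Accept any replica at least as fresh as what we already saw
        for r in replicas:
            if r.version >= self.last_seen:
                self.last_seen = r.version
                return r.value
        raise RuntimeError("no sufficiently fresh replica")

fresh = VersionedReplica(version=5, value="X=5")
stale = VersionedReplica(version=4, value="X=4")

client = MonotonicClient()
first = client.read([fresh, stale])    # sees version 5
second = client.read([stale, fresh])   # skips the stale replica: still X=5
```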

6. Linearizability (Strict Consistency)

Guarantee: The strongest consistency model. All operations appear to execute atomically in some sequential order, and that order is consistent with real-time ordering.

```
If operation A completes before operation B begins:
  → All processes must see A before B

Example: Bank transfer
T1: Write (Account_A -= $100) [completes at 10:00:00.001]
T2: Write (Account_B += $100) [starts at 10:00:00.002]
T3: Read (Account_A, Account_B) → Must see both updates or neither
```

Implementation: Consensus protocols (Raft, Paxos), synchronized clocks

Cost: Very high latency, requires coordination

Use: Financial transactions, atomic counters, distributed locks

Linearizability vs Sequential Consistency:

```
Linearizability: Respects real-time ordering
Sequential: Only requires some total order (can reorder concurrent ops)

Linearizable execution:
T1: ─────[Write X=1]─────────> (completes at t=5)
T2: ──────────────────[Read X]──> Must return 1 (starts at t=6)

Sequential but NOT Linearizable:
T1: ─────[Write X=1]─────────> (completes at t=5)
T2: ──────────────────[Read X=0]──> (starts at t=6, but sees old value)
    This violates real-time ordering!
```

7. Snapshot Isolation

Guarantee: Each transaction sees a consistent snapshot of the database as of the transaction start time.

```sql
-- PostgreSQL Snapshot Isolation (REPEATABLE READ)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- T1: Read account balance at snapshot time t=100
SELECT balance FROM accounts WHERE id = 'alice';  -- Returns $500

-- T2: (Another transaction updates the balance to $600 at t=101)

-- T3: Read again in the same transaction
SELECT balance FROM accounts WHERE id = 'alice';  -- Still returns $500
-- The transaction sees a consistent snapshot!

COMMIT;
```

Implementation: Multi-Version Concurrency Control (MVCC)

```
MVCC keeps multiple versions of each row:

accounts table:
┌────────┬─────────┬─────────────┬─────────────┐
│ id     │ balance │ xmin (txid) │ xmax (txid) │
├────────┼─────────┼─────────────┼─────────────┤
│ alice  │ $500    │ 100         │ 101         │ (old version)
│ alice  │ $600    │ 101         │ ∞           │ (new version)
└────────┴─────────┴─────────────┴─────────────┘

Transaction at t=100: Sees version with xmin ≤ 100, xmax > 100 → $500
Transaction at t=102: Sees version with xmin ≤ 102, xmax > 102 → $600
```

Anomalies:

  • Write Skew: Two transactions read overlapping data and make decisions based on what they read, but write to different objects
```sql
-- Doctor on-call scheduling (write skew example)
-- Constraint: At least 1 doctor must be on-call
-- Currently: Alice and Bob are both on-call

-- Transaction 1 (Alice):
BEGIN;
SELECT COUNT(*) FROM on_call WHERE on_call = true;  -- Returns 2
-- Alice sees Bob is on-call, so she can leave
UPDATE on_call SET on_call = false WHERE doctor = 'Alice';
COMMIT;

-- Transaction 2 (Bob) - concurrent:
BEGIN;
SELECT COUNT(*) FROM on_call WHERE on_call = true;  -- Returns 2
-- Bob sees Alice is on-call, so he can leave
UPDATE on_call SET on_call = false WHERE doctor = 'Bob';
COMMIT;

-- RESULT: No doctors on-call! Constraint violated!
-- Snapshot isolation doesn't prevent this
```

Solution: Serializable Snapshot Isolation (SSI) with predicate locks

Use: Widely supported in mainstream RDBMSs (e.g. PostgreSQL's REPEATABLE READ), read-heavy workloads, reporting

8. Serializability

Guarantee: The strongest isolation level. Transactions execute as if they ran serially (one at a time), even though they run concurrently.

```python
from psycopg2.errors import SerializationFailure

# PostgreSQL Serializable Isolation
cur.execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE")

# The transaction automatically aborts if a serialization anomaly is detected
try:
    cur.execute("SELECT COUNT(*) FROM on_call WHERE on_call = true")
    count = cur.fetchone()[0]
    if count > 1:
        cur.execute("UPDATE on_call SET on_call = false WHERE doctor = 'Alice'")
    conn.commit()
except SerializationFailure:
    # Database detected write skew, retry the transaction
    conn.rollback()
    retry_transaction()
```

Implementation Techniques:

  1. Two-Phase Locking (2PL):

```
Growing phase:   Acquire all locks
Shrinking phase: Release all locks (after commit/abort)

Problems:
- Deadlocks possible
- Lower concurrency
- Requires deadlock detection
```

  2. Serializable Snapshot Isolation (SSI):

```
Track read-write conflicts:
- Detect if T1 reads X, then T2 writes X, then T1 writes Y
- This creates a dependency cycle → abort one transaction

Advantages:
- No locks needed
- Better performance than 2PL
- Used by PostgreSQL, CockroachDB
```

Consistency Model Hierarchy:

```
Strongest (Most Guarantees)
         ↓
┌───────────────────────────┐
│  Linearizability          │ ← Strictest: Real-time ordering
├───────────────────────────┤
│  Serializability          │ ← Transactions behave serially
├───────────────────────────┤
│  Snapshot Isolation       │ ← Consistent snapshots (MVCC)
├───────────────────────────┤
│  Causal Consistency       │ ← Causally related ops ordered
├───────────────────────────┤
│  Read Your Writes         │ ← Session consistency
├───────────────────────────┤
│  Monotonic Reads          │ ← No backwards time travel
├───────────────────────────┤
│  Eventual Consistency     │ ← Eventually converges
└───────────────────────────┘
         ↓
Weakest (Fewest Guarantees, Lowest Latency)
```

Comparison Table:

| Model | Latency | Availability | Use Case |
|---|---|---|---|
| Linearizability | Very High | Low | Critical operations, distributed locks |
| Serializability | High | Medium | Banking, inventory with complex constraints |
| Snapshot Isolation | Medium | Medium | Multi-row reads, reporting |
| Causal | Low-Medium | High | Social networks, collaborative apps |
| Eventual | Very Low | Very High | Caches, analytics, feeds |

Replication Topologies

How you connect your replicas determines the replication behavior:

1. Primary-Replica (Master-Slave)

One primary accepts writes, replicates to read-only replicas.

```
           ┌──────────┐
    Writes │ PRIMARY  │
      ───→ └────┬─────┘
                │ (replicate)
         ┌──────┼──────┐
         ▼      ▼      ▼
      ┌────┐ ┌────┐ ┌────┐
Reads │ R1 │ │ R2 │ │ R3 │
 ←──  └────┘ └────┘ └────┘
```

Advantages:

  • Simple to understand and implement
  • No write conflicts (single writer)
  • Strong consistency possible

Disadvantages:

  • Primary is a bottleneck for writes
  • Primary is single point of failure
  • Replicas lag behind primary

Synchronous Replication:

```python
# PostgreSQL synchronous replication
# postgresql.conf:
#   synchronous_commit = on
#   synchronous_standby_names = 'replica1,replica2'

# Write waits for replicas
conn.execute("INSERT INTO users VALUES (...)")
# Returns only after the replicas confirm the write
```

Asynchronous Replication:

```python
# MySQL async replication
# Primary returns immediately
conn.execute("INSERT INTO users VALUES (...)")
# Replicas receive the update later (lag: milliseconds to seconds)
```

2. Multi-Primary (Multi-Master)

Multiple nodes accept writes simultaneously.

```
┌──────────┐      ┌──────────┐      ┌──────────┐
│ PRIMARY  │ ←──→ │ PRIMARY  │ ←──→ │ PRIMARY  │
│  (US)    │      │  (EU)    │      │  (ASIA)  │
└──────────┘      └──────────┘      └──────────┘
     ↕                 ↕                 ↕
 Bidirectional replication between all primaries
```

Advantages:

  • High availability (no single primary)
  • Low latency writes (write to nearest primary)
  • Scales writes across regions

Disadvantages:

  • Write conflicts (two primaries modify same data)
  • Complex conflict resolution
  • Harder to maintain consistency

Write Conflict Example:

```
T1: User in US updates profile.name = "Alice Smith"
T2: User in EU updates profile.name = "Alice Johnson" (same user!)

Both writes succeed locally, then replicate:
US Primary:  "Alice Smith"   → replicates to EU
EU Primary:  "Alice Johnson" → replicates to US

CONFLICT! Which value wins?
```

Solutions:

  • Last-Write-Wins (LWW) with timestamps
  • Vector clocks
  • Application-level merge logic
  • Conflict-free Replicated Data Types (CRDTs)
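Of these, Last-Write-Wins is the simplest to sketch. The timestamps and node ids below are invented for illustration; note how one of the two concurrent updates is silently dropped, which is the classic LWW caveat.

```python
# Minimal Last-Write-Wins merge: each value carries a timestamp
# plus a node-id tiebreaker, and the merge keeps the newest.
def lww_merge(a, b):
    """Each value is (payload, timestamp, node_id)."""
    # Compare by timestamp, then node_id to break exact ties
    return a if (a[1], a[2]) >= (b[1], b[2]) else b

us_write = ("Alice Smith",   1700000000.120, "us-1")
eu_write = ("Alice Johnson", 1700000000.350, "eu-1")

# Both primaries converge on the same winner, whichever
# order the replicated updates arrive in:
winner_us = lww_merge(us_write, eu_write)
winner_eu = lww_merge(eu_write, us_write)
# Both pick "Alice Johnson" (later timestamp); the US user's
# update is silently lost.
```

The tiebreaker matters: without it, two writes with identical timestamps could resolve differently on different primaries, and the replicas would never converge.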

3. Quorum-Based Replication

Require consensus from majority of nodes for reads/writes.

```
5 Nodes total
Write: Must succeed on 3+ nodes (quorum)
Read:  Must check 3+ nodes (quorum)

Guarantee: Read quorum + Write quorum > Total nodes
           → Reads always see latest write
```

Example: Cassandra

```python
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

# Write to 3 out of 5 replicas
insert = SimpleStatement(
    "INSERT INTO users (id, name) VALUES (%s, %s)",
    consistency_level=ConsistencyLevel.QUORUM,  # 3/5 nodes
)
session.execute(insert, [user_id, "Alice"])

# Read from 3 out of 5 replicas
select = SimpleStatement(
    "SELECT name FROM users WHERE id = %s",
    consistency_level=ConsistencyLevel.QUORUM,  # 3/5 nodes
)
result = session.execute(select, [user_id])
# Returns the most recent value (by timestamp)
```

Tuning W + R > N:

| R | W | N | Consistency | Latency | Availability |
|---|---|---|---|---|---|
| 1 | 1 | 3 | Eventual | Low | High |
| 2 | 2 | 3 | Strong | Medium | Medium |
| 3 | 1 | 3 | Strong | High read, low write | High for writes |
| 1 | 3 | 3 | Strong | Low read, high write | High for reads |
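The W + R > N rule behind this tuning table can be captured in a few lines (the helper name and return shape are my own, for illustration):

```python
# Sketch of the W + R > N rule: classify a quorum configuration.
def quorum_properties(r, w, n):
    """Return (consistency, write_failures_tolerated) for N replicas."""
    strong = (r + w) > n          # read and write quorums must overlap
    # Writes stall once fewer than W replicas are reachable:
    write_failures_tolerated = n - w
    return ("strong" if strong else "eventual", write_failures_tolerated)

assert quorum_properties(1, 1, 3) == ("eventual", 2)  # fast, may read stale
assert quorum_properties(2, 2, 3) == ("strong", 1)    # classic quorum
assert quorum_properties(1, 3, 3) == ("strong", 0)    # any node down blocks writes
```

The overlap argument is why this works: with W + R > N, any set of R nodes read must intersect any set of W nodes written, so at least one replied node holds the latest value.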

4. Chain Replication

Writes propagate through a chain of replicas:

```
Write  →  [Head] → [R1] → [R2] → [Tail]
Read   ←                         [Tail]

Writes go to the Head
Reads come from the Tail (always consistent!)
```

Advantages:

  • Strong consistency with good throughput
  • Tail always has latest data

Disadvantages:

  • Chain failure cascades
  • High latency for writes (must traverse chain)

Use: Microsoft Azure Storage, CORFU, Chain Replication systems
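The head-to-tail flow above can be sketched as a toy in-memory chain (illustrative only, not how Azure Storage or CORFU implement it): a write is only visible at the tail after every node in the chain has applied it.

```python
# Toy chain replication: writes enter at the head and are passed
# down the chain; reads are served by the tail only.
class ChainNode:
    def __init__(self, name):
        self.name = name
        self.data = {}
        self.next = None

    def write(self, key, value):
        self.data[key] = value
        if self.next is not None:
            self.next.write(key, value)   # propagate toward the tail

def build_chain(names):
    nodes = [ChainNode(n) for n in names]
    for a, b in zip(nodes, nodes[1:]):
        a.next = b
    return nodes[0], nodes[-1]            # (head, tail)

head, tail = build_chain(["head", "r1", "r2", "tail"])
head.write("x", 5)
# The tail only holds a write after every node has applied it,
# so tail reads never return an uncommitted value:
value = tail.data["x"]   # 5
```

The synchronous hop per node is also where the write-latency cost comes from: a chain of k nodes adds k sequential network round trips to each write.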

5. Consensus-Based Replication (Paxos & Raft)

Achieve strong consistency through distributed consensus protocols.

Raft Consensus Algorithm

Raft is designed to be understandable and provides the same guarantees as Paxos.

```
Raft Cluster (5 nodes):

┌──────────┐
│  Leader  │ ← Handles all client requests
└────┬─────┘
     │ (Replicate log entries)
     ├────────┬────────┬────────┐
     ▼        ▼        ▼        ▼
  ┌────┐  ┌────┐  ┌────┐  ┌────┐
  │ F1 │  │ F2 │  │ F3 │  │ F4 │ ← Followers
  └────┘  └────┘  └────┘  └────┘
```

Raft Phases:

  1. Leader Election

```
- Nodes start as followers
- If no heartbeat from the leader (timeout): become a candidate
- Request votes from other nodes
- Win the election if a majority of votes is received
- The new leader sends heartbeats to prevent new elections

Election timeout: 150-300ms (randomized to prevent split votes)
```
  2. Log Replication
```python
# Leader receives a client request
def handle_write(data):
    # 1. Append to local log
    log_entry = LogEntry(
        term=current_term,
        index=last_log_index + 1,
        data=data
    )
    append_to_log(log_entry)

    # 2. Replicate to followers
    for follower in followers:
        send_append_entries(follower, [log_entry])

    # 3. Wait for majority acknowledgment
    acks = wait_for_acks(majority_count)

    # 4. Commit entry (apply to state machine)
    if acks >= majority_count:
        commit_index = log_entry.index
        apply_to_state_machine(log_entry)
        return SUCCESS
    return FAILURE  # Couldn't reach majority
```

Raft Log Structure:

```
Leader Log:
┌─────┬─────┬─────┬─────┬─────┬─────┐
│ 1,1 │ 1,2 │ 2,3 │ 2,4 │ 3,5 │ 3,6 │  term,index for each log entry
└─────┴─────┴─────┴─────┴─────┴─────┘
                                  ↑
                           commit_index=6

Follower Logs (eventually consistent):
Follower 1: [1,1] [1,2] [2,3] [2,4] [3,5] [3,6] ✅ Up-to-date
Follower 2: [1,1] [1,2] [2,3] [2,4] [3,5]       ⏳ Lagging (entry 6 replicating)
Follower 3: [1,1] [1,2] [2,3]                   ⏳ Further behind
```
  3. Safety Guarantee

```
Raft ensures:
- Election Safety: At most one leader per term
- Leader Append-Only: The leader never overwrites log entries
- Log Matching: If two logs contain an entry with the same index/term,
                all previous entries are identical
- Leader Completeness: If an entry is committed in term T,
                       it will be present in the leaders of all terms > T
- State Machine Safety: If a server applies a log entry at index i,
                        no other server applies a different entry at i
```

Raft Implementation Example (etcd):

```go
// etcd Raft configuration
import "go.etcd.io/etcd/raft"

// Create Raft node
storage := raft.NewMemoryStorage()
config := &raft.Config{
    ID:              nodeID,
    ElectionTick:    10, // Elections after 10 ticks
    HeartbeatTick:   1,  // Heartbeat every tick
    Storage:         storage,
    MaxSizePerMsg:   1024 * 1024,
    MaxInflightMsgs: 256,
}
node := raft.StartNode(config, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}})

// Write data
func writeKey(key, value string) {
    data := []byte(fmt.Sprintf("%s=%s", key, value))

    // Propose to Raft
    node.Propose(context.TODO(), data)

    // Wait for commit
    select {
    case rd := <-node.Ready():
        // Entry committed, apply to state machine
        for _, entry := range rd.CommittedEntries {
            applyToStateMachine(entry.Data)
        }
        node.Advance()
    }
}
```

Paxos Algorithm

Original consensus algorithm, more complex but proven correct.

Paxos Roles:

```
┌──────────────┐
│  Proposers   │ ← Propose values
└──────┬───────┘
       │
       ▼
┌──────────────┐
│  Acceptors   │ ← Vote on proposals (form quorum)
└──────┬───────┘
       │
       ▼
┌──────────────┐
│  Learners    │ ← Learn chosen value
└──────────────┘
```

Paxos Phases:

  1. Phase 1a (Prepare): Proposer sends prepare(n) to acceptors
  2. Phase 1b (Promise): Acceptors promise not to accept proposals < n
  3. Phase 2a (Accept): Proposer sends accept(n, value) to acceptors
  4. Phase 2b (Accepted): Acceptors accept if no higher proposal seen
```python
class PaxosAcceptor:
    def __init__(self):
        self.promised_id = None     # Highest proposal ID promised
        self.accepted_id = None     # ID of accepted proposal
        self.accepted_value = None  # Accepted value

    def prepare(self, proposal_id):
        """Phase 1b: Promise"""
        if self.promised_id is None or proposal_id > self.promised_id:
            self.promised_id = proposal_id
            return {
                'promised': True,
                'accepted_id': self.accepted_id,
                'accepted_value': self.accepted_value
            }
        return {'promised': False}

    def accept(self, proposal_id, value):
        """Phase 2b: Accept"""
        if self.promised_id is None or proposal_id >= self.promised_id:
            self.promised_id = proposal_id
            self.accepted_id = proposal_id
            self.accepted_value = value
            return {'accepted': True}
        return {'accepted': False}


class PaxosProposer:
    def propose(self, value, acceptors):
        proposal_id = generate_unique_id()

        # Phase 1: Prepare
        promises = []
        for acceptor in acceptors:
            response = acceptor.prepare(proposal_id)
            if response['promised']:
                promises.append(response)

        # Need a majority to proceed
        if len(promises) < len(acceptors) // 2 + 1:
            return None  # Failed to get quorum

        # If any acceptor already accepted a value,
        # we must propose that value (the Paxos invariant)
        max_accepted_id = -1
        proposed_value = value
        for promise in promises:
            if promise['accepted_id'] is not None and promise['accepted_id'] > max_accepted_id:
                max_accepted_id = promise['accepted_id']
                proposed_value = promise['accepted_value']

        # Phase 2: Accept
        acceptances = []
        for acceptor in acceptors:
            response = acceptor.accept(proposal_id, proposed_value)
            if response['accepted']:
                acceptances.append(response)

        # Value chosen if a majority accepted
        if len(acceptances) >= len(acceptors) // 2 + 1:
            return proposed_value
        return None  # Failed to get quorum
```

Multi-Paxos (Practical Variant):

```
Instead of running full Paxos for each value:
1. Elect a stable leader (using Paxos)
2. The leader proposes a sequence of values without the prepare phase
3. Much more efficient in practice

This is essentially what Raft does!
```

Comparison: Raft vs Paxos

| Feature | Raft | Paxos |
|---|---|---|
| Understandability | High (designed for clarity) | Low (complex, many variants) |
| Leader election | Built-in, stable leader | Separate, can have dueling proposers |
| Log structure | Strong leader, append-only | Weaker, gaps allowed |
| Performance | Comparable | Comparable |
| Implementations | etcd, Consul, CockroachDB | Chubby, Spanner (Multi-Paxos) |
| Reconfiguration | Easier (joint consensus) | More complex |

When to Use Consensus Protocols:

```
✅ Use Raft/Paxos when:
- Need linearizable consistency
- Coordinating distributed locks
- Leader election required
- Building a distributed configuration store
- Implementing replicated state machines

❌ Don't use when:
- Eventual consistency acceptable
- Low latency critical (consensus has overhead)
- Simple primary-replica sufficient
- No need for automatic failover
```

Production Example: Consul using Raft

```python
import consul

# Consul uses Raft for its consistent K/V store
c = consul.Consul()

# Writes go through Raft consensus
success = c.kv.put('config/database/url', 'postgresql://...')

# Guaranteed consistent reads
index, data = c.kv.get('config/database/url')

# CAS (Compare-and-Set) operations are atomic via Raft
c.kv.put('counter', '0')
index, current = c.kv.get('counter')

# Only succeeds if ModifyIndex matches (no concurrent updates)
success = c.kv.put(
    'counter',
    str(int(current['Value'].decode()) + 1),
    cas=current['ModifyIndex']  # Compare-and-swap
)
```

6. Quorum-Based Replication (Advanced)

Let's dive deeper into quorum mathematics and tuning.

Quorum Formula:

```
For strong consistency:  W + R > N

Where:
- N = Total replicas
- W = Write quorum (must ack write)
- R = Read quorum (must ack read)

Why W + R > N ensures consistency:
→ Read and write quorums must overlap
→ At least one node in the read quorum has the latest write
```

Sloppy Quorums (Dynamo-style):

```python
# Cassandra-style sloppy quorum
class SloppyQuorum:
    def __init__(self, replication_factor=3):
        self.N = replication_factor
        self.W = 2  # QUORUM
        self.R = 2  # QUORUM

    def write(self, key, value):
        # Determine the preference list (N nodes)
        nodes = self.get_preference_list(key, self.N)

        # Try to write to the N preferred nodes
        written = []
        for node in nodes:
            try:
                node.put(key, value)
                written.append(node)
                # Stop once W nodes acknowledged
                if len(written) >= self.W:
                    return SUCCESS
            except NodeDown:
                continue

        # If < W preferred nodes are available, use "hinted handoff":
        # write to extra nodes temporarily
        if len(written) < self.W:
            extra_nodes = self.get_extra_nodes(key)
            for node in extra_nodes:
                # Store a hint: "This write belongs to node X"
                node.put_with_hint(key, value, intended_node=nodes[0])
                written.append(node)
                if len(written) >= self.W:
                    return SUCCESS
        return FAILURE

    def read(self, key):
        nodes = self.get_preference_list(key, self.N)

        # Read from R nodes
        responses = []
        for node in nodes:
            try:
                value = node.get(key)
                responses.append(value)
                if len(responses) >= self.R:
                    break
            except NodeDown:
                continue

        # Return the most recent value (by timestamp)
        return max(responses, key=lambda v: v.timestamp)
```

Hinted Handoff:

```
Normal case (all nodes up):
Write goes to nodes: [A, B, C]

Node B is down:
Write goes to nodes: [A, C, D (hint for B)]

When B comes back online:
Node D transfers the hint to B
B now has the write it missed
```

Write-Ahead Log (WAL) Deep Dive

The Write-Ahead Log is the backbone of database replication and durability. Understanding WAL is critical for building robust distributed systems.

What is WAL?

```
WAL Principle: Log changes BEFORE applying them to data files

┌─────────────┐
│   Client    │
└──────┬──────┘
       │ UPDATE users SET name='Alice'
       ▼
┌─────────────────────────────────────┐
│  Database Process                   │
│                                     │
│  1. Write to WAL      [Log entry]   │ ← MUST happen first
│     ↓                               │
│  2. Return success to client        │
│     ↓                               │
│  3. Apply to data files (async)     │ ← Can happen later
│                                     │
└─────────────────────────────────────┘

Why?
→ Crash between step 2 and 3? Recovery replays the WAL
→ Replication: Send the WAL to replicas
→ Point-in-time recovery: Restore from the WAL
```

PostgreSQL WAL Implementation

```python
# WAL record structure
class WALRecord:
    def __init__(self):
        self.lsn = None   # Log Sequence Number (position in WAL)
        self.xid = None   # Transaction ID
        self.rmid = None  # Resource Manager ID
        self.info = None  # Record type info
        self.data = None  # Actual change data

    def __repr__(self):
        return f"WAL@{self.lsn}: XID={self.xid} {self.data}"

# Example WAL records for a transaction:
# WAL@0/1000: XID=100 BEGIN
# WAL@0/1020: XID=100 INSERT INTO users (id, name) VALUES (1, 'Alice')
# WAL@0/1040: XID=100 COMMIT
```

WAL Write Process

```
Client writes data:

1. Build WAL record
   ┌────────────────────────────┐
   │ LSN: 0/AB12CD34            │
   │ XID: 12345                 │
   │ Type: INSERT               │
   │ Table: users               │
   │ Data: (id=1, name='Alice') │
   └────────────────────────────┘

2. Append to WAL buffer (in memory)
   ┌─────────────────┐
   │   WAL Buffer    │
   │ [record][record]│ ← In-memory
   └─────────────────┘

3. fsync() to disk (durability!)
   ┌─────────────────┐
   │   WAL File      │
   │ wal/00000001... │ ← On disk
   └─────────────────┘

4. Return success to client

5. Later: Apply to heap files (background writer)
   ┌─────────────────┐
   │  Data Files     │
   │  base/16384/... │
   └─────────────────┘
```

fsync and Durability:

```python
import os

# Without fsync (DANGEROUS!)
with open('wal_file', 'ab') as f:
    f.write(wal_record)
    # Data might still be in the OS page cache
    # Power loss = data loss!

# With fsync (SAFE)
with open('wal_file', 'ab') as f:
    f.write(wal_record)
    f.flush()             # Flush the Python buffer to the OS
    os.fsync(f.fileno())  # Force the OS to write to physical disk
    # Now guaranteed on disk!
```

WAL Configuration Trade-offs:

```sql
-- PostgreSQL synchronous_commit settings

-- 1. off (FASTEST, UNSAFE)
SET synchronous_commit = off;
-- Returns before the WAL is written to disk
-- Risk: the last few transactions are lost on a crash
-- Use: high-throughput logging, analytics

-- 2. local
SET synchronous_commit = local;
-- Returns after the WAL is written to the local disk
-- Guaranteed durability on the primary only
-- Use: single-instance deployments

-- 3. remote_write (REPLICATION-AWARE)
SET synchronous_commit = remote_write;
-- Returns after the replica received the WAL (but has not fsynced it)
-- Risk: a replica crash could lose data
-- Use: balance between performance and replication

-- 4. on (DEFAULT) / remote_apply (STRICTEST)
SET synchronous_commit = on;
-- on: returns after the replica has fsynced the WAL
-- remote_apply: returns after the replica has also applied it
-- Maximum durability
-- Use: financial systems, critical data
```

WAL Shipping for Replication

```
Primary → Replica WAL shipping:

┌──────────────────┐                    ┌──────────────────┐
│     Primary      │                    │     Replica      │
│                  │                    │                  │
│ 1. Write to WAL  │                    │                  │
│    ↓             │                    │                  │
│ 2. fsync         │                    │                  │
│    ↓             │                    │                  │
│ 3. Stream WAL ───┼───────────────────→│ 4. Receive WAL   │
│    (TCP socket)  │                    │    ↓             │
│                  │                    │ 5. Write to WAL  │
│                  │                    │    ↓             │
│                  │                    │ 6. fsync         │
│                  │                    │    ↓             │
│ 7. Get ACK   ←───┼────────────────────│ 7. Send ACK      │
│    ↓             │                    │    ↓             │
│ 8. Commit        │                    │ 8. Replay WAL    │
│                  │                    │   (apply changes)│
└──────────────────┘                    └──────────────────┘
```

PostgreSQL Streaming Replication:

```ini
# Primary configuration (postgresql.conf)
wal_level = replica              # Generate enough WAL for replication
max_wal_senders = 10             # Max concurrent replica connections
wal_keep_size = '1GB'            # Keep at least 1GB of WAL
wal_sender_timeout = 60s         # Disconnect dead replicas

# Replica configuration (postgresql.conf)
hot_standby = on                 # Allow read queries on replica
max_standby_streaming_delay = 30s

# recovery.conf (or postgresql.auto.conf)
primary_conninfo = 'host=primary.example.com port=5432 user=replicator password=...'
primary_slot_name = 'replica_1'  # Replication slot (prevents WAL deletion)
```

```sql
-- Monitoring replication lag
SELECT
    client_addr,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    sync_state,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lag_bytes
FROM pg_stat_replication;

-- Example output:
-- client_addr | state     | lag_bytes
-- 10.0.1.5    | streaming | 2048      (2KB lag - excellent!)
-- 10.0.1.6    | streaming | 5242880   (5MB lag - investigate)
```

WAL Archiving (Point-in-Time Recovery)

```ini
# Archive WAL files to S3 for disaster recovery (postgresql.conf)
archive_mode = on
archive_command = 'aws s3 cp %p s3://mydb-wal-archive/%f'
archive_timeout = 300  # Force archive every 5 minutes

# WAL archiving workflow:
# 1. Primary fills WAL segment (default: 16MB)
# 2. Segment complete → run archive_command
# 3. Copy to remote storage (S3, NFS, etc.)
# 4. Can now reuse/delete local WAL segment

# Restore from archive (disaster recovery):
# restore_command = 'aws s3 cp s3://mydb-wal-archive/%f %p'
# recovery_target_time = '2025-11-29 10:30:00'
# Restores the database to its exact state at 10:30 AM!
```

WAL Optimization Techniques

1. Group Commit:

```python
import asyncio

# Problem: fsync is expensive (5-10ms per call)
# Solution: batch multiple transactions into one fsync

class GroupCommit:
    def __init__(self):
        self.pending_txns = []
        self.commit_delay = 0.001  # 1ms batching window

    async def commit_transaction(self, txn):
        # Add to the batch
        self.pending_txns.append(txn)

        # Wait for the batch to fill or the window to expire
        await asyncio.sleep(self.commit_delay)

        # The first waiter to wake up flushes the whole batch;
        # later waiters find the list already empty
        if self.pending_txns:
            batch = self.pending_txns
            self.pending_txns = []

            # Single fsync for the entire batch!
            write_and_fsync_wal(batch)

            # All transactions durable
            for t in batch:
                t.notify_committed()

# Result: 1000 commits/sec → 100,000 commits/sec!
# Latency cost: +1ms
```

2. WAL Compression:

```sql
-- PostgreSQL WAL compression
ALTER SYSTEM SET wal_compression = on;
-- Compresses full-page images written to WAL
-- Reduces WAL size significantly for UPDATE-heavy workloads
-- Trade-off: slight CPU overhead
```

3. Checkpoints:

```
Checkpoint process:
1. Flush all dirty pages to disk
2. Write checkpoint record to WAL
3. Allows discarding old WAL

┌─────────────────────────────────────┐
│         WAL Timeline                │
├─────────────────────────────────────┤
│ [records] [CHECKPOINT] [records]... │
│     ↑                    ↑          │
│  Can delete      Keep for recovery  │
└─────────────────────────────────────┘

Recovery starts from the last checkpoint!
```

```ini
# PostgreSQL checkpoint tuning
checkpoint_timeout = 15min         # Max time between checkpoints
checkpoint_completion_target = 0.9 # Spread checkpoint I/O
max_wal_size = 4GB                 # Trigger checkpoint if exceeded

# Trade-offs:
# Frequent checkpoints:   less WAL kept, faster recovery, more I/O
# Infrequent checkpoints: more WAL kept, slower recovery, less I/O
```

WAL in Distributed Databases

Google Spanner's Distributed WAL:

Spanner uses a Paxos-replicated WAL:

```
┌─────────────────────────────────────┐
│         Paxos Group                 │
│                                     │
│  Leader: [WAL: entry 1,2,3,4]       │
│     │                               │
│     ├──→ Follower 1: [1,2,3,4]      │
│     ├──→ Follower 2: [1,2,3,4]      │
│     ├──→ Follower 3: [1,2,3]  ⏳    │
│     └──→ Follower 4: [1,2,3]  ⏳    │
│                                     │
│  Commit when a majority have entry  │
│  (3/5 = quorum)                     │
└─────────────────────────────────────┘

TrueTime API: globally synchronized timestamps
→ Ensures external consistency (linearizability)
```
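The majority-commit rule above can be sketched independently of Paxos details. A minimal illustration (hypothetical `QuorumLog` class, not Spanner's API): an entry counts as committed once a quorum of replicas has acknowledged appending it to their log.

```python
# Minimal sketch of majority-ack commit (hypothetical QuorumLog,
# not Spanner's actual API): an entry is committed once a quorum
# of replicas has acknowledged appending it.
class QuorumLog:
    def __init__(self, replica_count):
        self.replica_count = replica_count
        self.quorum = replica_count // 2 + 1   # e.g. 3 of 5
        self.acks = {}  # entry index -> set of replica ids that appended it

    def ack(self, index, replica_id):
        """Record a replica's append; return True once the entry is committed."""
        self.acks.setdefault(index, set()).add(replica_id)
        return len(self.acks[index]) >= self.quorum

log = QuorumLog(replica_count=5)
log.ack(4, "leader")                  # 1/5 - not yet committed
log.ack(4, "follower-1")              # 2/5 - not yet
committed = log.ack(4, "follower-2")  # 3/5 - quorum reached
print(committed)  # True
```

Because the set deduplicates acknowledgements, a replica retransmitting its ack never counts twice, which mirrors why quorum protocols tolerate duplicate messages.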

CockroachDB's Raft-based WAL:

```go
// CockroachDB replicates ranges using Raft
// Each range has its own Raft log (WAL)
type Range struct {
    rangeID  int64
    raftLog  RaftLog // Replicated WAL
    replicas []Replica
}

// Write to a range
func (r *Range) Write(key, value []byte) error {
    // Create a Raft log entry
    entry := raftpb.Entry{
        Type: raftpb.EntryNormal,
        Data: encodeKV(key, value),
    }

    // Propose to Raft (replicates via the log)
    r.raftLog.Propose(context.TODO(), entry.Data)

    // Wait for commit (majority replicated),
    // then apply to the storage engine
    return r.waitForCommit(entry.Index)
}
```

Cross-Region Replication Strategies

Let's explore how to replicate data across geographic regions for global applications.

Strategy 1: Active-Passive (DR Setup)

Primary region handles all traffic. Secondary region is on standby for disaster recovery.

```
┌──────────────────┐                 ┌──────────────────┐
│   US-EAST-1      │                 │   US-WEST-2      │
│   (ACTIVE)       │  Async Repl.    │   (PASSIVE)      │
│                  │ ──────────────→ │                  │
│  ┌────────────┐  │                 │  ┌────────────┐  │
│  │  Primary   │  │                 │  │  Standby   │  │
│  │  Database  │  │                 │  │  Database  │  │
│  └────────────┘  │                 │  └────────────┘  │
│        ↕         │                 │                  │
│   All Traffic    │                 │   No Traffic     │
└──────────────────┘                 └──────────────────┘

Primary fails? → Failover to US-WEST-2
```

Implementation: PostgreSQL with Streaming Replication

```ini
# Primary (US-EAST-1) — postgresql.conf
wal_level = replica
max_wal_senders = 5
wal_keep_size = '1GB'

# Standby (US-WEST-2) — recovery settings
primary_conninfo = 'host=primary.us-east-1.rds.amazonaws.com port=5432'
restore_command = 'aws s3 cp s3://wal-archive/%f %p'
```

```yaml
# Automatic failover with Patroni
patroni:
  scope: postgres-cluster
  name: us-west-2
  restapi:
    listen: 0.0.0.0:8008
  etcd:
    host: etcd.cluster.local:2379
```

Pros:

  • Simple setup
  • Low cost (standby uses minimal resources)
  • Strong consistency (single writer)

Cons:

  • Standby region unused during normal operation
  • Failover takes time (30s - 5min)
  • Users far from primary have high latency

Use Cases: Compliance requirements, disaster recovery, cost-sensitive applications
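The failover decision itself can be sketched. A hedged illustration (hypothetical `FailoverMonitor`, not Patroni's actual logic): promote the standby only after several consecutive failed health checks, so a single network blip doesn't trigger flapping failovers.

```python
# Sketch of a failover decision loop (hypothetical class, not Patroni's
# implementation): promote the standby after N consecutive failed
# health checks of the primary.
class FailoverMonitor:
    def __init__(self, failure_threshold=3):
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0
        self.active = "us-east-1"
        self.standby = "us-west-2"

    def record_health_check(self, primary_healthy):
        """Feed in one health-check result; return the current active region."""
        if primary_healthy:
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                # Promote standby (real systems must also fence the
                # old primary before it can rejoin, to avoid split brain)
                self.active, self.standby = self.standby, self.active
                self.consecutive_failures = 0
        return self.active

monitor = FailoverMonitor(failure_threshold=3)
monitor.record_health_check(True)            # healthy: stays on us-east-1
monitor.record_health_check(False)           # 1st failure
monitor.record_health_check(False)           # 2nd failure
active = monitor.record_health_check(False)  # 3rd failure: promote
print(active)  # us-west-2
```

The threshold is what turns the 30s-5min failover window in the cons above into a tunable trade-off between availability and false positives.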

Strategy 2: Active-Active (Multi-Region Writes)

Multiple regions accept writes simultaneously.

```
┌──────────────────┐          ┌──────────────────┐
│   US-EAST-1      │ ←──────→ │   EU-WEST-1      │
│                  │   Bi-    │                  │
│  ┌────────────┐  │  direc-  │  ┌────────────┐  │
│  │  Primary   │  │  tional  │  │  Primary   │  │
│  └────────────┘  │   sync   │  └────────────┘  │
│        ↕         │          │        ↕         │
│    US Users      │          │    EU Users      │
└──────────────────┘          └──────────────────┘
         ↕                             ↕
    Low latency                   Low latency
```

Implementation: DynamoDB Global Tables

```python
import boto3

# Create a Global Table
dynamodb = boto3.client('dynamodb')

# Define the replication group
response = dynamodb.create_global_table(
    GlobalTableName='users',
    ReplicationGroup=[
        {'RegionName': 'us-east-1'},
        {'RegionName': 'eu-west-1'},
        {'RegionName': 'ap-southeast-1'}
    ]
)

# Writes to any region automatically replicate
# Typical replication lag: < 1 second
dynamodb_us = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb_us.Table('users')

# User in US writes
table.put_item(Item={'user_id': 'alice', 'name': 'Alice'})

# Available in EU within ~1 second
dynamodb_eu = boto3.resource('dynamodb', region_name='eu-west-1')
table_eu = dynamodb_eu.Table('users')
response = table_eu.get_item(Key={'user_id': 'alice'})
# Returns {'user_id': 'alice', 'name': 'Alice'}
```

Conflict Resolution: Last-Write-Wins

```python
# T1: User updates name in US
table_us.update_item(
    Key={'user_id': 'alice'},
    UpdateExpression='SET #name = :name',
    ExpressionAttributeNames={'#name': 'name'},
    ExpressionAttributeValues={':name': 'Alice Smith'}
)

# T2: Same user updates name in EU (concurrent!)
table_eu.update_item(
    Key={'user_id': 'alice'},
    UpdateExpression='SET #name = :name',
    ExpressionAttributeNames={'#name': 'name'},
    ExpressionAttributeValues={':name': 'Alice Johnson'}
)

# DynamoDB resolves the conflict using internal timestamps:
# the later write wins
# Final value: "Alice Johnson" (if the EU write was newer)
```

Pros:

  • Low latency worldwide (writes to nearest region)
  • High availability (no single point of failure)
  • Scales globally

Cons:

  • Eventual consistency
  • Conflict resolution complexity
  • Higher infrastructure cost

Use Cases: Global applications, social platforms, e-commerce

Strategy 3: Read Replicas (Read Scaling)

Primary in one region, read replicas globally.

```
                  ┌──────────────────┐
                  │   US-EAST-1      │
           Writes │  ┌────────────┐  │
            ────→ │  │  PRIMARY   │  │
                  │  └─────┬──────┘  │
                  └────────┼─────────┘
                           │
         ┌─────────────────┼─────────────────┐
         ▼                 ▼                 ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│  EU-WEST-1     │ │  AP-SOUTH-1    │ │  SA-EAST-1     │
│  ┌──────────┐  │ │  ┌──────────┐  │ │  ┌──────────┐  │
│  │ REPLICA  │  │ │  │ REPLICA  │  │ │  │ REPLICA  │  │
│  └──────────┘  │ │  └──────────┘  │ │  └──────────┘  │
│       ↕        │ │       ↕        │ │       ↕        │
│   EU Reads     │ │   Asia Reads   │ │   SA Reads     │
└────────────────┘ └────────────────┘ └────────────────┘
```

Implementation: MySQL Read Replicas on AWS RDS

```python
import boto3

rds = boto3.client('rds', region_name='us-east-1')

# Create a read replica in EU
rds.create_db_instance_read_replica(
    DBInstanceIdentifier='mydb-eu-replica',
    SourceDBInstanceIdentifier='mydb-primary',
    DBInstanceClass='db.r5.large',
    AvailabilityZone='eu-west-1a',
    PubliclyAccessible=False
)

# Application routing logic
class DatabaseRouter:
    def __init__(self):
        self.primary = "mydb-primary.us-east-1.rds.amazonaws.com"
        self.replicas = {
            "us": "mydb-primary.us-east-1.rds.amazonaws.com",
            "eu": "mydb-eu-replica.eu-west-1.rds.amazonaws.com",
            "ap": "mydb-ap-replica.ap-south-1.rds.amazonaws.com"
        }

    def get_write_connection(self):
        # All writes go to the primary
        return connect(self.primary)

    def get_read_connection(self, user_region):
        # Reads go to the nearest replica
        replica = self.replicas.get(user_region, self.primary)
        return connect(replica)

# Usage
router = DatabaseRouter()

# Write (always primary)
write_conn = router.get_write_connection()
write_conn.execute("INSERT INTO users (name) VALUES ('Alice')")

# Read (region-specific)
read_conn = router.get_read_connection(user_region="eu")
users = read_conn.execute("SELECT * FROM users WHERE active = true")
```

Pros:

  • Fast reads globally
  • Scales read traffic
  • Cost-effective

Cons:

  • Replication lag (stale reads possible)
  • Writes still go to primary (single bottleneck)
  • Primary region failure affects writes

Use Cases: Read-heavy applications, content delivery, analytics
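The stale-read risk above can be mitigated at the routing layer. A sketch under assumptions (hypothetical `LagAwareRouter`; in practice the lag numbers would come from `pg_stat_replication` or CloudWatch replica-lag metrics): serve reads from the nearest replica only while its observed lag stays under a threshold, otherwise fall back to the primary.

```python
# Sketch of lag-aware read routing (hypothetical helper, not an AWS API):
# prefer the nearby replica, but read from the primary when the replica's
# last reported lag exceeds the freshness budget.
class LagAwareRouter:
    def __init__(self, max_lag_seconds=5.0):
        self.max_lag_seconds = max_lag_seconds
        self.primary = "mydb-primary.us-east-1"
        self.replica_lag = {}  # replica endpoint -> last observed lag (seconds)

    def report_lag(self, replica, lag_seconds):
        """Record the latest lag measurement for a replica."""
        self.replica_lag[replica] = lag_seconds

    def endpoint_for_read(self, preferred_replica):
        """Return the replica if it's fresh enough, else the primary."""
        lag = self.replica_lag.get(preferred_replica)
        if lag is not None and lag <= self.max_lag_seconds:
            return preferred_replica
        return self.primary  # lagging or unknown replica: read from primary

router = LagAwareRouter(max_lag_seconds=5.0)
router.report_lag("mydb-eu-replica.eu-west-1", 1.2)
print(router.endpoint_for_read("mydb-eu-replica.eu-west-1"))  # replica (fresh)
router.report_lag("mydb-eu-replica.eu-west-1", 42.0)
print(router.endpoint_for_read("mydb-eu-replica.eu-west-1"))  # primary (lagging)
```

Treating an unknown lag as "too stale" is the conservative default: a replica that has stopped reporting may be partitioned or far behind.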

Strategy 4: Sharded Multi-Region

Partition data by geographic region or user location.

User data partitioned by region:

```
┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐
│   US Region      │   │   EU Region      │   │   Asia Region    │
│                  │   │                  │   │                  │
│  US Users Data   │   │  EU Users Data   │   │  Asia Users Data │
│  ┌────────────┐  │   │  ┌────────────┐  │   │  ┌────────────┐  │
│  │  Shard 1   │  │   │  │  Shard 2   │  │   │  │  Shard 3   │  │
│  └────────────┘  │   │  └────────────┘  │   │  └────────────┘  │
│       ↕          │   │       ↕          │   │       ↕          │
│   US Traffic     │   │   EU Traffic     │   │   Asia Traffic   │
└──────────────────┘   └──────────────────┘   └──────────────────┘

Cross-region queries require routing to multiple shards
```

Implementation: MongoDB with Zone Sharding

```javascript
// Define shard zones by geographic region
sh.addShardTag("shard-us", "US")
sh.addShardTag("shard-eu", "EU")
sh.addShardTag("shard-asia", "ASIA")

// Create tag ranges based on user location
sh.addTagRange(
  "myapp.users",
  { user_region: "US", user_id: MinKey },
  { user_region: "US", user_id: MaxKey },
  "US"
)
sh.addTagRange(
  "myapp.users",
  { user_region: "EU", user_id: MinKey },
  { user_region: "EU", user_id: MaxKey },
  "EU"
)

// Enable sharding on the collection
sh.enableSharding("myapp")
sh.shardCollection("myapp.users", { user_region: 1, user_id: 1 })

// Application code
async function createUser(userData) {
    // Data automatically routed to the correct shard
    await db.users.insertOne({
        user_id: userData.id,
        user_region: userData.region,  // "US", "EU", or "ASIA"
        name: userData.name,
        email: userData.email
    });
}

// Queries scoped to the local shard are fast
const usUsers = await db.users.find({ user_region: "US" });

// Cross-region queries are slower (scatter-gather)
const allUsers = await db.users.find({});  // Queries all shards
```

Pros:

  • Excellent performance (data local to users)
  • Scales horizontally
  • Data sovereignty compliance (EU data stays in EU)

Cons:

  • Complex sharding logic
  • Cross-shard queries expensive
  • Rebalancing complexity

Use Cases: Global SaaS, data residency requirements, multi-tenant applications

Conflict Resolution

In multi-master replication, conflicts are inevitable. Here's how to handle them:

1. Last-Write-Wins (LWW)

Simplest approach: latest timestamp wins.

```python
class LWWRegister:
    def __init__(self):
        self.value = None
        self.timestamp = 0

    def write(self, value, timestamp):
        if timestamp > self.timestamp:
            self.value = value
            self.timestamp = timestamp

    def merge(self, remote_value, remote_timestamp):
        # Keep the value with the higher timestamp
        if remote_timestamp > self.timestamp:
            self.value = remote_value
            self.timestamp = remote_timestamp

# Usage
register_us = LWWRegister()
register_eu = LWWRegister()

# Concurrent writes
register_us.write("Alice Smith", timestamp=1000)
register_eu.write("Alice Johnson", timestamp=1001)

# Replication and merge
register_us.merge("Alice Johnson", 1001)  # EU value wins
register_eu.merge("Alice Smith", 1000)    # US value discarded

# Both converge to "Alice Johnson"
```

Pros: Simple, deterministic

Cons: Lost updates; not suitable for all data types

2. Vector Clocks

Track causality to detect true conflicts.

```python
class VectorClock:
    def __init__(self, node_id):
        self.node_id = node_id
        self.clock = {}

    def increment(self):
        self.clock[self.node_id] = self.clock.get(self.node_id, 0) + 1
        return self.clock.copy()

    def update(self, other_clock):
        for node, count in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), count)

    def compare(self, other_clock):
        # Returns: "before", "after", "concurrent", or "equal"
        less = False
        greater = False
        all_nodes = set(self.clock.keys()) | set(other_clock.keys())
        for node in all_nodes:
            self_count = self.clock.get(node, 0)
            other_count = other_clock.get(node, 0)
            if self_count < other_count:
                less = True
            elif self_count > other_count:
                greater = True
        if less and greater:
            return "concurrent"  # Conflict!
        elif less:
            return "before"
        elif greater:
            return "after"
        else:
            return "equal"

# Example: detecting conflicts
vc_us = VectorClock("US")
vc_eu = VectorClock("EU")

# Sequential writes (no conflict)
clock1 = vc_us.increment()  # {US: 1}
vc_eu.update(clock1)        # EU knows about US:1
clock2 = vc_eu.increment()  # {US: 1, EU: 1}
print(vc_us.compare(clock2))  # "before" (causal order)

# Concurrent writes (conflict!)
vc_us2 = VectorClock("US")
vc_eu2 = VectorClock("EU")
clock_us = vc_us2.increment()  # {US: 1}
clock_eu = vc_eu2.increment()  # {EU: 1}
print(vc_us2.compare(clock_eu))  # "concurrent" (CONFLICT!)
```

3. Hybrid Logical Clocks (HLC)

Combine physical time with logical counters for better ordering in distributed systems.

Problem with Physical Clocks:

```
Node 1 (clock: 10:00:00.500): Write X=1
Node 2 (clock: 10:00:00.499): Write X=2  (clock skew!)

Which write is actually newer? Can't trust physical time alone!
```

Problem with Logical Clocks (Lamport/Vector):

```
Logical clocks provide ordering but lose wall-clock time
→ Can't do time-based queries: "Get data from last 5 minutes"
→ Can't expire cached entries based on time
```

HLC Solution: Best of Both Worlds

```python
import time

class HybridLogicalClock:
    """
    HLC combines:
    - Physical time (wall clock)
    - Logical counter (for events within the same physical time)
    """
    def __init__(self):
        self.latest_time = 0  # Physical time
        self.logical = 0      # Logical counter

    def send_event(self):
        """Generate a timestamp for a local event"""
        physical_time = int(time.time() * 1_000_000)  # Microseconds
        if physical_time > self.latest_time:
            # Physical time advanced
            self.latest_time = physical_time
            self.logical = 0
        else:
            # Multiple events in the same physical time
            self.logical += 1
        return (self.latest_time, self.logical)

    def receive_event(self, remote_time, remote_logical):
        """Update the clock on receiving a remote event"""
        physical_time = int(time.time() * 1_000_000)

        # Take the maximum of local and remote physical time
        self.latest_time = max(physical_time, remote_time, self.latest_time)

        if self.latest_time == physical_time and self.latest_time == remote_time:
            # All three equal: advance past both counters
            self.logical = max(self.logical, remote_logical) + 1
        elif self.latest_time == physical_time:
            # Local physical time wins
            self.logical = max(self.logical, remote_logical) + 1
        elif self.latest_time == remote_time:
            # Remote time wins
            self.logical = remote_logical + 1
        else:
            # latest_time carried over from an old local event
            self.logical += 1
        return (self.latest_time, self.logical)

# Usage example
node1 = HybridLogicalClock()
node2 = HybridLogicalClock()

# Node 1 performs a local write
ts1 = node1.send_event()                   # e.g. (1638360000000000, 0)
print(f"Node 1 writes at HLC: {ts1}")

# Node 2 receives a message carrying ts1
ts2 = node2.receive_event(ts1[0], ts1[1])  # e.g. (1638360000000000, 1)
print(f"Node 2 receives, HLC: {ts2}")

# Node 2 writes locally
ts3 = node2.send_event()                   # e.g. (1638360000000000, 2)
print(f"Node 2 writes at HLC: {ts3}")

# Timestamps maintain causality AND approximate wall-clock time!
```

HLC Properties:

```
1. Bounded by physical time:
   HLC timestamp ≈ actual wall-clock time
   (logical component small: typically 0-10)

2. Preserves causality:
   If event A → event B (happens-before)
   Then HLC(A) < HLC(B)

3. Enables time-based queries:
   "Get all writes from the last hour"
   → Query where HLC.physical > now - 3600

4. Works with clock skew:
   Even if clocks differ by 100ms, the logical component ensures ordering
```

CockroachDB HLC Implementation:

```sql
-- CockroachDB uses HLC for MVCC timestamps
CREATE TABLE users (
    id UUID PRIMARY KEY,
    name STRING,
    email STRING
);

-- Every row version carries an HLC timestamp
INSERT INTO users VALUES (gen_random_uuid(), 'Alice', 'alice@example.com');
-- Internal: stored with an HLC like (1638360000000000, 5)

-- Time-travel queries
SELECT * FROM users AS OF SYSTEM TIME '-1h';
-- Returns data as it was 1 hour ago (using HLC.physical)

-- Transaction snapshot using HLC
BEGIN;
SET TRANSACTION AS OF SYSTEM TIME '2025-11-29 10:00:00';
-- All reads see the snapshot at the HLC corresponding to 10:00:00
SELECT * FROM users;
COMMIT;
```

Comparison: Timestamps in Distributed Systems

| Timestamp Type | Ordering | Wall-Clock Time | Clock Skew Tolerance | Use Case |
|---|---|---|---|---|
| Physical (NTP) | ❌ Unreliable | ✅ Yes | ❌ No | Local systems only |
| Lamport Clocks | ✅ Causal | ❌ No | ✅ Perfect | Academic |
| Vector Clocks | ✅ Causal | ❌ No | ✅ Perfect | Version control, Riak |
| HLC | ✅ Causal | ✅ Approximate | ✅ Good | CockroachDB, YugabyteDB |
| TrueTime (Spanner) | ✅ Strong | ✅ Exact | ✅ Best | Google Spanner only |
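Lamport clocks appear in the table but have no example above. For contrast with the vector clock, a minimal sketch: a single counter that ticks on local events and jumps past the sender's timestamp on receive, preserving causality but carrying no wall-clock meaning and no way to detect concurrency.

```python
# Minimal Lamport clock sketch (for contrast with the vector clock above):
# one scalar counter per node, no concurrency detection.
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self):
        """Local event: advance the counter."""
        self.time += 1
        return self.time

    def receive(self, remote_time):
        """Message receipt: jump past the sender's timestamp."""
        self.time = max(self.time, remote_time) + 1
        return self.time

a, b = LamportClock(), LamportClock()
t1 = a.tick()       # 1
t2 = b.receive(t1)  # 2  (b's event is causally after a's)
t3 = a.tick()       # 2  (concurrent with t2 - equal timestamps,
                    #     the scalar clock can't tell them apart)
```

That last pair is exactly the case where the vector clock above would return "concurrent"; the Lamport clock collapses it into a tie.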

4. CRDTs (Conflict-Free Replicated Data Types)

Mathematical data structures that automatically resolve conflicts.

```python
# G-Counter (Grow-only Counter) - CRDT
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self, amount=1):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Merge is commutative and idempotent
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)

# Usage: distributed page-view counter
counter_us = GCounter("US")
counter_eu = GCounter("EU")
counter_asia = GCounter("ASIA")

# Concurrent increments
counter_us.increment(100)   # 100 views in US
counter_eu.increment(50)    # 50 views in EU
counter_asia.increment(75)  # 75 views in Asia

# Merge all counters
counter_us.merge(counter_eu)
counter_us.merge(counter_asia)

print(counter_us.value())  # 225 (no conflicts!)
```

Popular CRDTs:

  • G-Counter: Grow-only counter
  • PN-Counter: Positive-negative counter (increment/decrement)
  • G-Set: Grow-only set
  • OR-Set: Observed-remove set (add/remove)
  • LWW-Register: Last-write-wins register
  • RGA: Replicated Growable Array (collaborative editing)
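
The G-Counter above only grows; the PN-Counter from this list handles decrements by pairing two G-Counters. A minimal sketch: the value is total increments minus total decrements, and merge takes the element-wise maximum of both halves, which keeps it commutative and idempotent.

```python
# PN-Counter sketch: two grow-only maps, one for increments
# and one for decrements; value() is their difference.
class PNCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.incs = {}  # node -> total increments
        self.decs = {}  # node -> total decrements

    def increment(self, amount=1):
        self.incs[self.node_id] = self.incs.get(self.node_id, 0) + amount

    def decrement(self, amount=1):
        self.decs[self.node_id] = self.decs.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.incs.values()) - sum(self.decs.values())

    def merge(self, other):
        # Element-wise max on both halves keeps merge commutative and idempotent
        for node, count in other.incs.items():
            self.incs[node] = max(self.incs.get(node, 0), count)
        for node, count in other.decs.items():
            self.decs[node] = max(self.decs.get(node, 0), count)

us, eu = PNCounter("US"), PNCounter("EU")
us.increment(5)
eu.increment(3)
eu.decrement(1)
us.merge(eu)
print(us.value())  # 7  (5 + 3 - 1, regardless of merge order)
```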

5. Application-Level Merge

Sometimes domain logic determines merge strategy:

```python
class ShoppingCart:
    def __init__(self):
        self.items = {}  # product_id -> quantity

    def add_item(self, product_id, quantity):
        self.items[product_id] = self.items.get(product_id, 0) + quantity

    def remove_item(self, product_id):
        if product_id in self.items:
            del self.items[product_id]

    def merge(self, other_cart):
        # Domain-specific merge logic
        for product_id, quantity in other_cart.items.items():
            if product_id in self.items:
                # Take the maximum quantity (favors the user)
                self.items[product_id] = max(
                    self.items[product_id],
                    quantity
                )
            else:
                # Add new item
                self.items[product_id] = quantity

# User adds items from multiple devices concurrently
cart_mobile = ShoppingCart()
cart_mobile.add_item("shoe-123", 1)

cart_desktop = ShoppingCart()
cart_desktop.add_item("shirt-456", 2)
cart_desktop.add_item("shoe-123", 2)  # Same product, different quantity!

# Merge carts
cart_mobile.merge(cart_desktop)
# Result: shoe-123: 2, shirt-456: 2
# Took the maximum quantity for the conflicting item
```

Real-World Implementations

Example 1: Netflix (Cassandra Multi-Region)

Netflix uses Cassandra for multi-region replication across AWS regions:

```sql
-- Cassandra keyspace with multi-region replication (CQL)
CREATE KEYSPACE netflix
WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'us-east': 3,      -- 3 replicas in US East
    'us-west': 3,      -- 3 replicas in US West
    'eu-west': 3,      -- 3 replicas in EU West
    'ap-southeast': 3  -- 3 replicas in Asia Pacific
};

-- Table for user viewing history
CREATE TABLE user_viewing_history (
    user_id UUID,
    content_id UUID,
    watched_at TIMESTAMP,
    position INT,
    device_id UUID,
    PRIMARY KEY (user_id, watched_at, content_id)
) WITH CLUSTERING ORDER BY (watched_at DESC);
```
```python
# Application code
from datetime import datetime

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel

cluster = Cluster(['cassandra.us-east.netflix.com'])
session = cluster.connect('netflix')

# Write with LOCAL_QUORUM (fast, regional consistency);
# the consistency level is set on the statement
insert = SimpleStatement(
    """
    INSERT INTO user_viewing_history
    (user_id, content_id, watched_at, position, device_id)
    VALUES (%s, %s, %s, %s, %s)
    """,
    consistency_level=ConsistencyLevel.LOCAL_QUORUM  # 2/3 local nodes
)
session.execute(insert, (user_id, content_id, datetime.now(), position, device_id))

# Async replication to other regions happens automatically
# Viewing history is available globally within seconds
```

Example 2: Uber (Schemaless / Docstore)

Uber built a custom multi-region datastore on top of MySQL:

```python
import json
import time

# Schemaless architecture:
# each row has DocID, Timestamp, Payload (JSON)

# Region-aware routing
class UberDocstore:
    def __init__(self):
        self.regions = {
            'us': MySQLConnection('us-mysql-cluster'),
            'eu': MySQLConnection('eu-mysql-cluster'),
            'apac': MySQLConnection('apac-mysql-cluster')
        }

    def write(self, doc_id, payload, region='us'):
        # Write to the regional cluster
        conn = self.regions[region]
        conn.execute(
            "INSERT INTO documents (doc_id, timestamp, payload) VALUES (?, ?, ?)",
            [doc_id, time.time(), json.dumps(payload)]
        )
        # Async replication to the other regions
        self._async_replicate(doc_id, payload, source_region=region)

    def read(self, doc_id, region='us'):
        # Read from the regional cluster
        conn = self.regions[region]
        result = conn.execute(
            "SELECT payload FROM documents WHERE doc_id = ? ORDER BY timestamp DESC LIMIT 1",
            [doc_id]
        )
        return json.loads(result[0]['payload'])

# Usage: driver location updates
docstore = UberDocstore()

# Driver in San Francisco
docstore.write(
    doc_id="driver:12345",
    payload={
        "location": {"lat": 37.7749, "lon": -122.4194},
        "status": "available",
        "updated_at": "2025-11-29T10:30:00Z"
    },
    region="us"
)
# Available in EU within ~100ms for cross-region matching
```

Example 3: Figma (Operational Transform with CRDTs)

Figma uses CRDTs for real-time collaborative editing:

```typescript
// Simplified Figma-like collaborative editing
class CollaborativeDocument {
  private content: RGA<string>; // Replicated Growable Array (CRDT)
  private nodeId: string;

  constructor(nodeId: string) {
    this.nodeId = nodeId;
    this.content = new RGA(nodeId);
  }

  // Insert character at position
  insert(position: number, char: string) {
    this.content.insertAt(position, char, this.nodeId);
    this.broadcast({
      type: 'insert',
      position,
      char,
      nodeId: this.nodeId
    });
  }

  // Delete character at position
  delete(position: number) {
    this.content.removeAt(position);
    this.broadcast({
      type: 'delete',
      position,
      nodeId: this.nodeId
    });
  }

  // Merge remote operations
  applyRemoteOp(op: Operation) {
    if (op.type === 'insert') {
      this.content.insertAt(op.position, op.char, op.nodeId);
    } else if (op.type === 'delete') {
      this.content.removeAt(op.position);
    }
    // CRDT ensures convergence without conflicts!
  }
}

// Concurrent edits converge automatically
const doc_user1 = new CollaborativeDocument("user1");
const doc_user2 = new CollaborativeDocument("user2");

// Both start with "Hello"
doc_user1.insert(5, "!");  // "Hello!"
doc_user2.insert(0, ">");  // ">Hello"

// After sync, both converge to: ">Hello!"
```

Read Repair and Anti-Entropy

Eventual consistency systems need background processes to ensure replicas converge. Two key techniques: read repair and anti-entropy.

Read Repair

Detect and fix inconsistencies during read operations.

```python
# Cassandra-style read repair
class ReadRepair:
    def __init__(self, consistency_level='QUORUM'):
        self.consistency_level = consistency_level

    def read(self, key, replicas):
        # Read from multiple replicas
        responses = []
        for replica in replicas[:3]:  # Read from 3 replicas
            response = replica.get(key)
            responses.append((replica, response))

        # Find the most recent value (by timestamp)
        latest_value = max(responses, key=lambda r: r[1].timestamp)

        # Check for inconsistencies
        stale_replicas = []
        for replica, response in responses:
            if response.timestamp < latest_value[1].timestamp:
                stale_replicas.append(replica)

        # Repair stale replicas (in production this is scheduled
        # asynchronously so the client response isn't delayed)
        if stale_replicas:
            self.repair(key, latest_value[1], stale_replicas)

        # Return the latest value to the client
        return latest_value[1].data

    def repair(self, key, latest_value, stale_replicas):
        """Update stale replicas with the latest value"""
        for replica in stale_replicas:
            try:
                replica.put(key, latest_value)
                print(f"Read repair: updated {replica} with latest value")
            except Exception as e:
                print(f"Read repair failed for {replica}: {e}")

# Usage
repair = ReadRepair()

# Client reads data
value = repair.read(key='user:123', replicas=cluster.get_replicas('user:123'))
# → Returns the latest value
# → Triggers repair of stale replicas
```

Read Repair Workflow:

```
Client requests key="user:123"

1. Read from replicas A, B, C:
   ┌─────────────────────────────┐
   │ A: {name: "Alice", ts: 100} │
   │ B: {name: "Alice", ts: 100} │
   │ C: {name: "Bob",   ts: 90}  │ ← Stale!
   └─────────────────────────────┘

2. Return latest value to client:
   → {name: "Alice", ts: 100}

3. Background repair:
   Send latest value to replica C:
   C: {name: "Bob", ts: 90} → {name: "Alice", ts: 100}

Eventually all replicas are consistent!
```

Read Repair Trade-offs:

```
✅ Pros:
- Repairs happen on demand (for actively read data)
- No extra network traffic for unread data
- Fixes inconsistencies before they cause issues

❌ Cons:
- Adds latency to reads (must query multiple replicas)
- Doesn't repair data that's never read
- Read hot spots can cause repair storms
```

Anti-Entropy (Merkle Trees)

Periodically compare and sync entire datasets between replicas.

Naive Anti-Entropy (Too Expensive):

```python
# DON'T DO THIS - transfers the entire dataset
def naive_sync(local_db, remote_db):
    # Get all data from both databases
    local_data = local_db.get_all()    # 1TB of data!
    remote_data = remote_db.get_all()  # 1TB of data!

    # Compare and sync
    for key, value in local_data.items():
        if key not in remote_data or remote_data[key] != value:
            remote_db.put(key, value)

# Problem: transfers gigabytes even if only 1 key differs!
```

Merkle Trees (Efficient):

```python
import hashlib

class MerkleTree:
    """
    Hierarchical hash tree for efficient comparison
    """
    def __init__(self, key_ranges):
        self.tree = {}
        self.key_ranges = key_ranges

    def build(self, data):
        """Build the Merkle tree from data"""
        # Leaf nodes: hash each key range
        for range_id, (start, end) in enumerate(self.key_ranges):
            range_data = {k: v for k, v in data.items() if start <= k < end}
            self.tree[f'leaf_{range_id}'] = self.hash_dict(range_data)

        # Root node: hash of the leaf hashes
        self.tree['root'] = self.hash_children(
            [self.tree[f'leaf_{i}'] for i in range(len(self.key_ranges))]
        )

    def hash_dict(self, data):
        """Hash a dictionary of key-value pairs"""
        items = sorted(data.items())
        return hashlib.md5(str(items).encode()).hexdigest()

    def hash_children(self, child_hashes):
        """Hash of child node hashes"""
        combined = ''.join(child_hashes)
        return hashlib.md5(combined.encode()).hexdigest()

# Anti-entropy with Merkle trees
def merkle_sync(local_db, remote_db):
    # Define key ranges (e.g., 256 ranges)
    key_ranges = [(i * 1000, (i + 1) * 1000) for i in range(256)]

    # Build Merkle trees
    local_tree = MerkleTree(key_ranges)
    remote_tree = MerkleTree(key_ranges)
    local_tree.build(local_db.get_all())
    remote_tree.build(remote_db.get_all())

    # Compare roots
    if local_tree.tree['root'] == remote_tree.tree['root']:
        print("Databases in sync! No transfer needed.")
        return

    # Find the differing ranges
    differing_ranges = []
    for range_id in range(len(key_ranges)):
        if local_tree.tree[f'leaf_{range_id}'] != remote_tree.tree[f'leaf_{range_id}']:
            differing_ranges.append(range_id)

    # Sync only the differing ranges
    local_data = local_db.get_all()
    for range_id in differing_ranges:
        start, end = key_ranges[range_id]
        # Transfer only this range
        for key, value in local_data.items():
            if start <= key < end:
                remote_db.put(key, value)

    print(f"Synced {len(differing_ranges)}/{len(key_ranges)} ranges")

# Example: 1TB database, only 10MB different
# Without Merkle: transfer 1TB
# With Merkle: transfer ~10MB (100x savings!)
```

Merkle Tree Structure:

```
                     Root
                 Hash(ABCD)
                /          \
         Hash(AB)        Hash(CD)
         /      \        /      \
   Hash(A)  Hash(B)  Hash(C)  Hash(D)
      │        │        │        │
   Range1   Range2   Range3   Range4
  [0-250K] [250K-   [500K-   [750K-
            500K]    750K]     1M]

Compare trees:
1. If Root hashes match → databases identical, done!
2. If Root differs → compare children Hash(AB) vs Hash(CD)
3. If Hash(AB) differs → compare Hash(A) vs Hash(B)
4. If Hash(A) differs → sync Range1 only

Only sync differing ranges!
```
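The recursive descent works on trees of any depth, pruning every subtree whose hashes already match. A minimal self-contained sketch, assuming a complete binary tree over a power-of-two number of key ranges (`build_tree` and `diff_ranges` are illustrative helpers, not a library API):

```python
import hashlib

def h(x: str) -> str:
    return hashlib.md5(x.encode()).hexdigest()

def build_tree(leaf_hashes):
    """Build a binary Merkle tree bottom-up; returns levels, leaves first."""
    levels = [leaf_hashes]
    while len(levels[-1]) > 1:
        prev = levels[-1]
        levels.append([h(prev[i] + prev[i + 1]) for i in range(0, len(prev), 2)])
    return levels

def diff_ranges(local_levels, remote_levels, level=None, index=0):
    """Recursively descend from the root; return leaf indices whose hashes differ."""
    if level is None:
        level = len(local_levels) - 1  # start at the root
    if local_levels[level][index] == remote_levels[level][index]:
        return []  # subtree identical - prune, transfer nothing
    if level == 0:
        return [index]  # differing leaf (key range) found
    return (diff_ranges(local_levels, remote_levels, level - 1, 2 * index) +
            diff_ranges(local_levels, remote_levels, level - 1, 2 * index + 1))

# 4 key ranges; the remote copy is stale in range 2 only
local_leaves  = [h("a"), h("b"), h("c"), h("d")]
remote_leaves = [h("a"), h("b"), h("x"), h("d")]

stale = diff_ranges(build_tree(local_leaves), build_tree(remote_leaves))
print(stale)  # → [2]  (only range 2 needs syncing)
```

With N ranges, identical databases are confirmed with a single root comparison, and k differing ranges cost O(k log N) hash comparisons instead of a full scan.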

Cassandra Anti-Entropy Implementation:

```bash
# Cassandra nodetool repair
# Runs Merkle tree sync across replicas

# Repair a specific keyspace
nodetool repair myapp

# Repair workflow:
# 1. Build Merkle trees on all replicas (in parallel)
# 2. Exchange root hashes
# 3. If roots differ, recurse into differing ranges
# 4. Stream only differing SSTables

# Monitoring repair
nodetool compactionstats
# Output:
# pending tasks: 0
# Active compaction:
# - compaction_type: Validation (Merkle tree build)
# - keyspace: myapp
# - progress: 45%

# Schedule periodic repairs (cron)
# Recommended: every 7-10 days
0 2 * * 0 nodetool repair myapp  # Sunday 2AM
```

DynamoDB Anti-Entropy:

```python
# DynamoDB uses continuous background anti-entropy:
# - Merkle tree per partition
# - Trees rebuilt every 10 minutes
# - Automatic sync of differing ranges
# No user action needed, but you can monitor:
from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client('cloudwatch')

# Check replication lag (indicator of anti-entropy load)
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/DynamoDB',
    MetricName='ReplicationLatency',
    Dimensions=[
        {'Name': 'TableName', 'Value': 'users'},
        {'Name': 'ReceivingRegion', 'Value': 'us-west-2'}
    ],
    StartTime=datetime.now() - timedelta(hours=1),
    EndTime=datetime.now(),
    Period=300,
    Statistics=['Average']
)
# High latency might indicate anti-entropy catching up
```

When to Use Each Technique:

```
┌─────────────────┬──────────────┬─────────────────┐
│ Technique       │ When         │ Frequency       │
├─────────────────┼──────────────┼─────────────────┤
│ Read Repair     │ On every read│ Real-time       │
│                 │ (if enabled) │                 │
├─────────────────┼──────────────┼─────────────────┤
│ Anti-Entropy    │ Background   │ Hours to days   │
│ (Merkle Trees)  │ process      │ (e.g., weekly)  │
└─────────────────┴──────────────┴─────────────────┘

Combined approach:
1. Read repair: fixes hot data quickly
2. Anti-entropy: ensures cold data eventually syncs

Best practice:
- Enable read repair for critical tables
- Run anti-entropy weekly during low-traffic hours
- Monitor repair progress and replication lag
```
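Read repair itself is only a few lines of coordinator logic: read from R replicas, pick the newest version, and write it back to any stale replica. A toy sketch, assuming each replica stores `(value, timestamp)` pairs with last-write-wins (the `Replica` class is illustrative, not any database's API):

```python
class Replica:
    """Toy replica storing (value, timestamp) per key."""
    def __init__(self, data=None):
        self.data = dict(data or {})

    def get(self, key):
        return self.data.get(key)          # (value, ts) or None

    def put(self, key, value, ts):
        current = self.data.get(key)
        if current is None or ts > current[1]:
            self.data[key] = (value, ts)   # only newer versions apply

def read_with_repair(key, replicas, R=2):
    """Read from R replicas, return the newest value, repair stale copies."""
    responses = [(r, r.get(key)) for r in replicas[:R]]
    newest = max((v for _, v in responses if v), key=lambda v: v[1])
    # Read repair: push the winning version back to stale replicas
    for replica, v in responses:
        if v != newest:
            replica.put(key, *newest)
    return newest[0]

r1 = Replica({"x": ("new", 2)})
r2 = Replica({"x": ("old", 1)})   # stale replica

print(read_with_repair("x", [r1, r2]))  # → new
print(r2.get("x"))                      # → ('new', 2)  repaired as a side effect
```

This is why read repair only fixes *hot* data: a key nobody reads is never compared, which is exactly the gap anti-entropy closes.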

Hinted Handoff vs Anti-Entropy:

```
Hinted Handoff:
- Temporary write forwarding when a node is down
- Immediate: hints replayed when the node returns
- Use: handle transient failures

Anti-Entropy:
- Comprehensive background sync
- Periodic: runs every N hours/days
- Use: fix any inconsistency (missed writes, corruption, etc.)

Both work together for reliability!
```
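A toy sketch of hinted handoff, assuming a coordinator that tracks node liveness (`HintedHandoff` and its node map are illustrative, not any particular database's API): writes destined for a down node are buffered as hints and replayed when the node recovers.

```python
class HintedHandoff:
    """Toy coordinator: buffer writes for down nodes, replay on recovery."""
    def __init__(self, nodes):
        self.nodes = nodes    # node_id -> key/value store, or None if down
        self.hints = []       # (intended_node, key, value)

    def write(self, node_id, key, value):
        if self.nodes[node_id] is not None:
            self.nodes[node_id][key] = value
        else:
            # Node down: stash a hint instead of failing the write
            self.hints.append((node_id, key, value))

    def node_recovered(self, node_id, store):
        """Replay buffered hints when the node comes back."""
        self.nodes[node_id] = store
        remaining = []
        for intended, key, value in self.hints:
            if intended == node_id:
                store[key] = value   # deliver the missed write
            else:
                remaining.append((intended, key, value))
        self.hints = remaining

ring = HintedHandoff({"A": {}, "B": None})   # node B is down
ring.write("B", "user:1", "alice")           # buffered as a hint
ring.node_recovered("B", {})                 # hint replayed on recovery
print(ring.nodes["B"])                       # → {'user:1': 'alice'}
```

Note the limitation: if the hint holder itself dies before replay, the write is lost - which is why hinted handoff handles transient failures only, and anti-entropy remains the backstop.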

Production Considerations

1. Monitoring Replication Lag

```python
# PostgreSQL replication lag monitoring
import time

import psycopg2

def check_replication_lag(primary_conn, replica_conn):
    # Get WAL position from primary
    primary_cur = primary_conn.cursor()
    primary_cur.execute("SELECT pg_current_wal_lsn()")
    primary_lsn = primary_cur.fetchone()[0]

    # Get replay position from replica
    replica_cur = replica_conn.cursor()
    replica_cur.execute("SELECT pg_last_wal_replay_lsn()")
    replica_lsn = replica_cur.fetchone()[0]

    # Calculate lag in bytes
    replica_cur.execute(
        "SELECT pg_wal_lsn_diff(%s, %s)",
        [primary_lsn, replica_lsn]
    )
    lag_bytes = replica_cur.fetchone()[0]

    # Alert if lag > 100MB (alert() is your paging hook)
    if lag_bytes > 100 * 1024 * 1024:
        alert(f"Replication lag: {lag_bytes / 1024 / 1024:.2f} MB")

    return lag_bytes

# Monitor continuously
while True:
    lag = check_replication_lag(primary_conn, replica_conn)
    print(f"Replication lag: {lag} bytes")
    time.sleep(10)
```

2. Handling Replication Failures

```python
import time

# Automatic failover with retry logic
# (PrimaryUnavailable / ReplicationTimeout are application-defined exceptions)
class ReplicationManager:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.max_lag_seconds = 10

    def write_with_retry(self, query, params, max_retries=3):
        for attempt in range(max_retries):
            try:
                # Attempt write to primary
                self.primary.execute(query, params)
                # Wait for replication (optional)
                self.wait_for_replication()
                return True
            except PrimaryUnavailable:
                # Promote a replica to primary
                self.primary = self.promote_replica()
            except ReplicationTimeout:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff
        return False

    def promote_replica(self):
        # Find the replica with the least lag
        best_replica = min(
            self.replicas,
            key=lambda r: r.get_replication_lag()
        )
        # Promote to primary
        best_replica.promote()
        # Reconfigure other replicas
        for replica in self.replicas:
            if replica != best_replica:
                replica.follow(best_replica)
        return best_replica
```

3. Data Consistency Validation

```python
# Periodic consistency checks
def verify_cross_region_consistency(regions):
    """Check if all regions have the same data."""
    checksums = {}
    for region_name, conn in regions.items():
        # Compute a checksum of all data
        cursor = conn.execute(
            """
            SELECT MD5(STRING_AGG(CAST(id AS TEXT) || data, '' ORDER BY id))
            FROM critical_table
            """
        )
        checksums[region_name] = cursor.fetchone()[0]

    # Compare checksums
    reference = list(checksums.values())[0]
    inconsistent_regions = [
        region for region, checksum in checksums.items()
        if checksum != reference
    ]

    if inconsistent_regions:
        # alert() and reconcile_regions() are application hooks
        alert(f"Data inconsistency detected in: {inconsistent_regions}")
        reconcile_regions(inconsistent_regions)

    return len(inconsistent_regions) == 0
```

4. Split-Brain Prevention

```python
import threading
import time

import etcd3

# Distributed consensus with etcd
class PrimaryElection:
    def __init__(self, etcd_client, node_id):
        self.etcd = etcd_client
        self.node_id = node_id
        self.lease = None

    def try_become_primary(self, ttl=10):
        # Create a lease
        self.lease = self.etcd.lease(ttl)
        # Try to acquire the primary lock
        success = self.etcd.put_if_not_exists(
            '/cluster/primary',
            self.node_id,
            lease=self.lease
        )
        if success:
            # Successfully became primary; keep the lease alive in the background
            threading.Thread(target=self.heartbeat, daemon=True).start()
            return True
        # Another node is primary
        return False

    def heartbeat(self):
        # Refresh the lease to stay primary
        while True:
            self.lease.refresh()
            time.sleep(5)  # Refresh every 5 seconds

# Usage
etcd = etcd3.client()
election = PrimaryElection(etcd, node_id="us-east-1")

if election.try_become_primary():
    # This node is primary - accept writes
    start_primary_mode()
else:
    # This node is replica - reject writes
    start_replica_mode()
```
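Leases alone can still misfire: a deposed primary that was paused (long GC, network stall) may wake up and write after its lease has expired. Fencing tokens close that gap - each elected primary receives a monotonically increasing token, and the storage layer rejects writes carrying an older token. A minimal illustrative sketch (both classes are toy stand-ins, not a real lock service):

```python
class FencedStorage:
    """Storage that rejects writes carrying a stale fencing token."""
    def __init__(self):
        self.max_token_seen = -1
        self.data = {}

    def write(self, key, value, token):
        if token < self.max_token_seen:
            raise RuntimeError("stale fencing token - writer was deposed")
        self.max_token_seen = token
        self.data[key] = value

class LockService:
    """Toy lock service issuing monotonically increasing tokens."""
    def __init__(self):
        self.token = 0

    def acquire(self):
        self.token += 1
        return self.token

storage = FencedStorage()
locks = LockService()

t_old = locks.acquire()   # primary 1 gets token 1, then pauses (GC, netsplit)
t_new = locks.acquire()   # primary 2 elected with token 2
storage.write("x", "from-new-primary", t_new)   # accepted

try:
    storage.write("x", "from-old-primary", t_old)  # stale token rejected
except RuntimeError as e:
    print(e)  # → stale fencing token - writer was deposed
```

In practice the token comes for free: etcd's revision numbers and ZooKeeper's zxid are both monotonic and can serve as fencing tokens.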

5. Testing Distributed Replication (Chaos Engineering)

Testing distributed systems requires simulating real-world failures.

Jepsen: The Gold Standard

```clojure
;; Jepsen test for a distributed database
(deftest bank-test
  "Test that bank account transfers maintain consistency"
  (let [n 5]  ; 5 nodes
    (c/run!
      (c/nemesis  ; Fault injection
        (nemesis/partition-random-halves))  ; Split network
      (c/checker
        (checker/total-queue))  ; Verify no money lost
      ;; Operations: concurrent transfers
      (gen/mix [
        (gen/once {:f :transfer :value {:from 0 :to 1 :amount 10}})
        (gen/once {:f :transfer :value {:from 1 :to 2 :amount 5}})]))))
```

What Jepsen Tests:

```
Network Partitions:
┌──────┐  ┌──────┐       ┌──────┐
│  N1  │──│  N2  │   ❌  │  N3  │
└──────┘  └──────┘       └──────┘

Split-brain scenario:
- Does the system maintain consistency?
- Are writes lost?
- Does it recover correctly?

Clock Skew:
Node 1: 10:00:00.000
Node 2: 10:00:00.500  (+500ms)
Node 3: 09:59:59.800  (-200ms)
→ Does timestamp ordering break?

Disk Failures:
- Corrupt WAL files
- Partial writes (power loss)
- Bit rot

Process Crashes:
- Kill database mid-transaction
- Kill during leader election
- Kill during replication
```

Writing Your Own Chaos Tests:

```python
import os
import random
import threading
import time

class ChaosTest:
    def __init__(self, cluster):
        self.cluster = cluster
        self.failures = []

    def partition_network(self, duration=30):
        """Simulate a network partition"""
        # Split the cluster into two groups
        half = len(self.cluster.nodes) // 2
        group_a = self.cluster.nodes[:half]
        group_b = self.cluster.nodes[half:]

        print(f"CHAOS: Network partition for {duration}s")
        print(f"  Group A: {[n.id for n in group_a]}")
        print(f"  Group B: {[n.id for n in group_b]}")

        # Block traffic between groups
        for node_a in group_a:
            for node_b in group_b:
                node_a.block_traffic_to(node_b)
                node_b.block_traffic_to(node_a)

        time.sleep(duration)

        # Heal the partition
        for node_a in group_a:
            for node_b in group_b:
                node_a.allow_traffic_to(node_b)
                node_b.allow_traffic_to(node_a)
        print("CHAOS: Network healed")

    def kill_random_node(self):
        """Kill a random node"""
        node = random.choice(self.cluster.nodes)
        print(f"CHAOS: Killing node {node.id}")
        node.kill()
        time.sleep(5)  # Wait for the cluster to detect the failure
        return node

    def induce_clock_skew(self, max_skew_ms=1000):
        """Introduce clock skew"""
        for node in self.cluster.nodes:
            skew = random.randint(-max_skew_ms, max_skew_ms)
            node.adjust_clock(skew)
            print(f"CHAOS: Node {node.id} clock skew: {skew}ms")

    def corrupt_wal_file(self, node):
        """Simulate disk corruption"""
        print(f"CHAOS: Corrupting WAL on node {node.id}")
        wal_file = node.get_latest_wal_file()
        # Corrupt a random byte
        with open(wal_file, 'r+b') as f:
            f.seek(random.randint(0, os.path.getsize(wal_file) - 1))
            f.write(bytes([random.randint(0, 255)]))

    def run_chaos_scenario(self):
        """Run a comprehensive chaos test"""
        print("=== Starting Chaos Engineering Test ===")

        # Baseline: verify the system works
        assert self.verify_consistency(), "System inconsistent before chaos!"

        # Chaos 1: Network partition during writes
        write_thread = threading.Thread(target=self.continuous_writes, args=(30,))
        write_thread.start()
        time.sleep(5)  # Let some writes succeed
        self.partition_network(duration=15)
        write_thread.join()
        assert self.verify_consistency(), "Consistency violated after partition!"

        # Chaos 2: Kill a node during heavy load
        write_thread = threading.Thread(target=self.continuous_writes, args=(20,))
        write_thread.start()
        time.sleep(5)
        killed_node = self.kill_random_node()
        write_thread.join()
        assert self.verify_consistency(), "Consistency violated after node kill!"

        # Chaos 3: Restore the killed node, verify catch-up
        killed_node.start()
        time.sleep(30)  # Wait for replication to catch up
        assert self.verify_consistency(), "Node didn't catch up correctly!"

        print("=== Chaos Test PASSED ===")

    def continuous_writes(self, duration):
        """Write data continuously"""
        start = time.time()
        count = 0
        while time.time() - start < duration:
            try:
                self.cluster.write(f"key_{count}", f"value_{count}")
                count += 1
            except Exception as e:
                print(f"Write failed: {e}")
            time.sleep(0.1)
        print(f"Completed {count} writes in {duration}s")

    def verify_consistency(self):
        """Check all replicas have the same data"""
        print("Verifying consistency across replicas...")
        node_data = {
            node.id: node.get_all_data()
            for node in self.cluster.nodes if node.is_running()
        }

        # Compare checksums
        checksums = {
            node_id: hash(frozenset(data.items()))
            for node_id, data in node_data.items()
        }

        if len(set(checksums.values())) == 1:
            print("✅ All replicas consistent")
            return True
        print("❌ Replicas diverged!")
        for node_id, checksum in checksums.items():
            print(f"  Node {node_id}: {checksum}")
        return False

# Run the chaos test
cluster = create_test_cluster(nodes=5, replication_factor=3)
chaos = ChaosTest(cluster)
chaos.run_chaos_scenario()
```

Chaos Engineering Best Practices:

```
1. Start small:
   - Single node failure
   - Short partitions (seconds)
   - Low traffic

2. Gradually increase severity:
   - Multiple node failures
   - Long partitions (minutes)
   - High traffic + failures

3. Test failure modes:
   ✓ Network partitions
   ✓ Process crashes
   ✓ Disk full
   ✓ Clock skew
   ✓ Slow network (high latency)
   ✓ Packet loss
   ✓ Byzantine failures (rare)

4. Verify invariants:
   ✓ No data loss
   ✓ No dirty reads
   ✓ Bounded staleness
   ✓ Monotonic reads
   ✓ Recovery completes

5. Automate tests:
   - Run in CI/CD
   - Randomized scenarios
   - Continuous chaos in staging
```
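Invariant checks like these are straightforward to automate against a log of observed operations. An illustrative checker for the monotonic-reads invariant (each entry is a hypothetical `(client_id, key, version)` observation):

```python
def check_monotonic_reads(read_log):
    """Verify that no client's reads ever go backwards in version.

    read_log: list of (client_id, key, version) in observation order.
    Returns a list of (client, key, seen_before, seen_now) violations.
    """
    last_seen = {}   # (client, key) -> highest version observed so far
    violations = []
    for client, key, version in read_log:
        prev = last_seen.get((client, key), -1)
        if version < prev:
            violations.append((client, key, prev, version))
        last_seen[(client, key)] = max(prev, version)
    return violations

ok_log  = [("c1", "x", 1), ("c1", "x", 2), ("c1", "x", 2)]
bad_log = [("c1", "x", 3), ("c1", "x", 1)]   # time travels backwards

print(check_monotonic_reads(ok_log))   # → []
print(check_monotonic_reads(bad_log))  # → [('c1', 'x', 3, 1)]
```

Run checks like this over the operation history collected during each chaos scenario; a non-empty result fails the test.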

Netflix Chaos Monkey:

```python
import random
import time

# Chaos Monkey randomly kills instances in production
# to ensure the system is resilient to failures
class ChaosMonkey:
    def __init__(self, cluster, probability=0.01):
        self.cluster = cluster
        self.kill_probability = probability

    def run(self):
        """Periodically kill random instances"""
        while True:
            time.sleep(3600)  # Every hour
            if random.random() < self.kill_probability:
                # Kill a random instance
                instance = random.choice(self.cluster.instances)
                print(f"🐵 Chaos Monkey: Terminating {instance.id}")
                instance.terminate()
                # Monitor recovery
                self.monitor_recovery(instance)

    def monitor_recovery(self, killed_instance):
        """Verify the system recovered from the failure"""
        # Check that:
        # 1. Traffic rerouted to healthy instances
        # 2. New instance launched automatically
        # 3. No user-facing errors
        pass

# Run in production (yes, really!)
monkey = ChaosMonkey(production_cluster, probability=0.01)
monkey.run()  # 1% chance per hour of killing an instance
```

Fault Injection with Toxiproxy:

```python
from toxiproxy import Toxiproxy

# Toxiproxy: proxy for simulating network problems
toxiproxy = Toxiproxy()

# Create a proxy for the database connection
db_proxy = toxiproxy.create(
    name="db",
    listen="127.0.0.1:5433",
    upstream="postgres:5432"
)

# Test 1: High latency
db_proxy.add_toxic(
    name="latency",
    type="latency",
    attributes={"latency": 1000}  # Add 1s latency
)
# Application now experiences slow queries
# Verify timeout handling works

# Test 2: Packet loss
db_proxy.add_toxic(
    name="packet_loss",
    type="timeout",
    attributes={"timeout": 0}  # 0 = drop packets
)
# Verify the application retries correctly

# Test 3: Connection limits
db_proxy.add_toxic(
    name="limit_data",
    type="limit_data",
    attributes={
        "bytes": 1000  # Close connection after 1KB
    }
)
# Verify the application handles partial responses

# Remove toxics to restore normal operation
db_proxy.remove_toxic("latency")
```

6. Capacity Planning for Replication

Estimating Replication Bandwidth:

```python
# Calculate bandwidth needed for replication

# Parameters
write_rate = 10_000      # writes per second
avg_write_size = 1024    # bytes per write
replication_factor = 3   # 3 replicas

# Bandwidth generated by the primary
bytes_per_second = write_rate * avg_write_size
print(f"Primary generates: {bytes_per_second / 1024 / 1024:.2f} MB/s")

# Replication traffic
# Each write goes to (RF - 1) replicas
replication_bandwidth = bytes_per_second * (replication_factor - 1)
print(f"Replication bandwidth: {replication_bandwidth / 1024 / 1024:.2f} MB/s")

# Cross-region bandwidth (more expensive)
# Assume 2 regions, each with 3 replicas
cross_region_bandwidth = bytes_per_second * 3  # All writes to the other region
print(f"Cross-region bandwidth: {cross_region_bandwidth / 1024 / 1024:.2f} MB/s")

# Monthly cost estimate (AWS)
# Inter-region: $0.02 per GB
monthly_seconds = 30 * 24 * 60 * 60
monthly_gb = (cross_region_bandwidth * monthly_seconds) / 1024 / 1024 / 1024
monthly_cost = monthly_gb * 0.02
print(f"Monthly cross-region cost: ${monthly_cost:.2f}")
```

Replication Lag SLOs:

```python
# Define SLOs for replication lag
SLO_TARGETS = {
    "p50": 10,     # ms - median lag
    "p95": 100,    # ms - 95th percentile
    "p99": 500,    # ms - 99th percentile
    "p99.9": 2000  # ms - 99.9th percentile
}

# Monitor and alert (alert() is your paging hook)
def check_replication_lag_slo(metrics):
    for percentile, target_ms in SLO_TARGETS.items():
        actual = metrics.get_percentile(percentile)
        if actual > target_ms:
            alert(
                f"Replication lag SLO violated: "
                f"{percentile} = {actual}ms (target: {target_ms}ms)"
            )

# Dashboard queries (Prometheus):
# rate(replication_lag_seconds_sum[5m]) / rate(replication_lag_seconds_count[5m])
# histogram_quantile(0.95, rate(replication_lag_seconds_bucket[5m]))
```
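The `metrics.get_percentile` call above is left abstract; computing percentiles from raw lag samples is simple enough to sketch by hand. A minimal illustration using the nearest-rank method (the sample values are made up):

```python
def percentile(samples, p):
    """Nearest-rank percentile of a list of lag samples (milliseconds)."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    # nearest-rank method: rank = ceil(p/100 * N), 1-indexed
    rank = max(1, -(-len(ordered) * p // 100))  # ceiling division
    return ordered[rank - 1]

# One hour of sampled lag values (ms) - illustrative numbers
lags_ms = [4, 7, 9, 12, 15, 22, 40, 95, 120, 600]

SLO = {"p50": 10, "p95": 100}
measured = {name: percentile(lags_ms, int(name[1:])) for name in SLO}
breaches = [name for name, target in SLO.items() if measured[name] > target]

print(measured)  # → {'p50': 15, 'p95': 600}
print(breaches)  # → ['p50', 'p95']
```

Note how a single 600ms outlier dominates the p95: tail percentiles are exactly where replication problems surface first, which is why SLOs target them rather than the average.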

Conclusion

Data replication is fundamental to building distributed systems that are highly available, performant, and resilient. Key takeaways:

Choosing a Replication Strategy

| Requirement | Strategy |
| --- | --- |
| Strong consistency | Primary-Replica (sync) + Quorum reads |
| Low latency globally | Active-Active Multi-Region |
| Read scaling | Read Replicas |
| Write scaling | Sharding + Replication |
| Disaster recovery | Active-Passive Cross-Region |
| Compliance (data residency) | Region-specific sharding |

CAP Theorem in Practice

  • CP Systems: Choose when correctness is critical (finance, inventory)
  • AP Systems: Choose when availability matters more (social media, caching)
  • Tunable Consistency: Use systems like Cassandra/DynamoDB for flexibility

Conflict Resolution

  • LWW: Simple but lossy
  • Vector Clocks: Detects conflicts, needs app-level resolution
  • CRDTs: Automatic conflict-free merging
  • Application Logic: Domain-specific merge rules
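As a reminder of why CRDTs merge without coordination, here is the simplest one - a grow-only counter whose merge (element-wise max of per-node counts) is commutative, associative, and idempotent (illustrative sketch):

```python
class GCounter:
    """Grow-only counter CRDT: per-node counts, merge = element-wise max."""
    def __init__(self, node_id, counts=None):
        self.node_id = node_id
        self.counts = dict(counts or {})

    def increment(self, n=1):
        # Each node only ever increments its own slot
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + n

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        """Order of merges never matters - replicas always converge."""
        merged = dict(self.counts)
        for node, count in other.counts.items():
            merged[node] = max(merged.get(node, 0), count)
        return GCounter(self.node_id, merged)

us, eu = GCounter("us"), GCounter("eu")
us.increment(3)    # 3 likes recorded in us-east
eu.increment(2)    # 2 likes recorded in eu-west

# Replicas exchange state in any order; both converge to the same value
print(us.merge(eu).value())  # → 5
print(eu.merge(us).value())  # → 5
```

The same max-merge idea scales up to sets, maps, and sequences, which is what systems like Redis Enterprise CRDBs and collaborative editors build on.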

Production Essentials

  1. Monitor replication lag continuously
  2. Test failover scenarios regularly
  3. Validate consistency across regions
  4. Plan for split-brain prevention
  5. Design for eventual consistency when using AP systems

Building distributed systems is hard, but with the right replication strategy and understanding of trade-offs, you can build systems that serve billions of users with high availability and low latency worldwide.


Further Reading:

  • Designing Data-Intensive Applications by Martin Kleppmann

Building the distributed future, one replica at a time.

┌─────────────┐│   Client    │└──────┬──────┘       │       ▼┌─────────────┐        ❌ Crashes│   Primary   │────────────────────→ SYSTEM DOWN│   Database  │└─────────────┘
┌─────────────┐│   Client    │└──────┬──────┘       │   ┌───┴────┐   │        │   ▼        ▼┌─────┐  ┌─────┐        ❌ Primary fails│ P   │  │ R1  │────────────────────→ Failover to R1└─────┘  └─────┘           │         ┌─┴──┐         │    │         ▼    ▼      ┌────┐ ┌────┐      │ R2 │ │ R3 │      └────┘ └────┘System continues running ✅
User in Tokyo → 150ms RTT → Server in VirginiaTotal latency: 150ms + processing time
User in Tokyo → 5ms RTT → Server in TokyoTotal latency: 5ms + processing time (30x faster!)
         ┌─────────────┐         │   Primary   │  (Handles all writes)         │   Database  │         └──────┬──────┘                │        ┌───────┼───────┐        │       │       │        ▼       ▼       ▼     ┌────┐  ┌────┐  ┌────┐     │ R1 │  │ R2 │  │ R3 │  (Distribute reads)     └─┬──┘  └─┬──┘  └─┬──┘       │       │       │     10K/s   10K/s   10K/s  = 30K reads/second
Client A writes X=5  ↓┌────────────┐│   Node 1   │  X=5 ✅│   Node 2   │  X=5 ✅│   Node 3   │  X=5 ✅└────────────┘  ↓Client B reads X → Gets 5 (guaranteed)
Node 1 fails  ↓Clients can still:- Read from Node 2 ✅- Read from Node 3 ✅- Write to Node 2 ✅(System keeps running)
┌─────┐          ┌─────┐│ N1  │  ─ ❌ ─  │ N2  │  (Network partition)└─────┘          └─────┘  │                │  │                │Both continue operating independently
# MongoDB CP behaviortry:    db.accounts.update_one(        {"user_id": "alice"},        {"$inc": {"balance": -100}},        write_concern=WriteConcern(w="majority")  # Wait for majority    )except NetworkTimeout:    # Cannot reach majority → Reject write    return "Service temporarily unavailable"
# Cassandra AP behaviorsession.execute(    "UPDATE users SET last_login = ? WHERE user_id = ?",    [datetime.now(), "alice"],    consistency_level=ConsistencyLevel.ONE  # Respond immediately)# Returns success even if some replicas are unreachable
T1: Write X=5T2: Read X → Returns 5 (guaranteed)
T1: Write X=5 to Node 1T2: Read X from Node 2 → Might return 4 (old value)T3: (after replication) Read X from Node 2 → Returns 5
User A: Post message → Comment on messageAll users see: Post before Comment ✅User A: Post XUser B: Post Y (unrelated)Some users might see: Y before X ✅ (okay, not causally related)
User A: Update profile pictureUser A: Refresh page → Sees new picture ✅User B: Views User A's profile → Might see old picture temporarily
T1: Read X → Returns X=5T2: Read X → Returns X=5 or higher (never X=4)
If operation A completes before operation B begins:  → All processes must see A before BExample: Bank transferT1: Write (Account_A -= $100) [completes at 10:00:00.001]T2: Write (Account_B += $100) [starts at 10:00:00.002]T3: Read (Account_A, Account_B) → Must see both updates or neither
Linearizability: Respects real-time orderingSequential: Only requires some total order (can reorder concurrent ops)Linearizable execution:T1: ─────[Write X=1]─────────> (completes at t=5)T2: ──────────────────[Read X]──> Must return 1 (starts at t=6)Sequential but NOT Linearizable:T1: ─────[Write X=1]─────────> (completes at t=5)T2: ──────────────────[Read X=0]──> (starts at t=6, but sees old value)    This violates real-time ordering!
# PostgreSQL Snapshot Isolation (REPEATABLE READ)BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;# T1: Read account balance at snapshot time t=100SELECT balance FROM accounts WHERE id = 'alice';  # Returns $500# T2: (Another transaction updates balance to $600 at t=101)# T3: Read again in same transactionSELECT balance FROM accounts WHERE id = 'alice';  # Still returns $500# Transaction sees consistent snapshot!COMMIT;
MVCC keeps multiple versions of each row:accounts table:┌────────┬─────────┬─────────────┬─────────────┐│ id     │ balance │ xmin (txid) │ xmax (txid) │├────────┼─────────┼─────────────┼─────────────┤│ alice  │ $500    │ 100         │ 101         │ (old version)│ alice  │ $600    │ 101         │ ∞           │ (new version)└────────┴─────────┴─────────────┴─────────────┘Transaction at t=100: Sees version with xmin ≤ 100, xmax > 100 → $500Transaction at t=102: Sees version with xmin ≤ 102, xmax > 102 → $600
# Doctor on-call scheduling (write skew example)# Constraint: At least 1 doctor must be on-call# Currently: Alice and Bob both on-call# Transaction 1 (Alice):BEGIN;SELECT COUNT(*) FROM on_call WHERE on_call = true;  # Returns 2# Alice sees Bob is on-call, so she can leaveUPDATE on_call SET on_call = false WHERE doctor = 'Alice';COMMIT;# Transaction 2 (Bob) - concurrent:BEGIN;SELECT COUNT(*) FROM on_call WHERE on_call = true;  # Returns 2# Bob sees Alice is on-call, so he can leaveUPDATE on_call SET on_call = false WHERE doctor = 'Bob';COMMIT;# RESULT: No doctors on-call! Constraint violated!# Snapshot isolation doesn't prevent this
# PostgreSQL Serializable IsolationBEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;# Transaction automatically aborts if serialization anomaly detectedtry:    # Perform operations    cur.execute("SELECT COUNT(*) FROM on_call WHERE on_call = true")    count = cur.fetchone()[0]    if count > 1:        cur.execute("UPDATE on_call SET on_call = false WHERE doctor = 'Alice'")    conn.commit()except SerializationFailure:    # Database detected write skew, retry transaction    conn.rollback()    retry_transaction()
Growing phase:  Acquire all locksShrinking phase: Release all locks (after commit/abort)Problems:- Deadlocks possible- Lower concurrency- Requires deadlock detection
Track read-write conflicts:- Detect if T1 reads X, then T2 writes X, then T1 writes Y- This creates a dependency cycle → abort one transactionAdvantages:- No locks needed- Better performance than 2PL- Used by PostgreSQL, CockroachDB
Strongest (Most Guarantees)         ↓┌───────────────────────────┐│  Linearizability          │ ← Strictest: Real-time ordering├───────────────────────────┤│  Serializability          │ ← Transactions behave serially├───────────────────────────┤│  Snapshot Isolation       │ ← Consistent snapshots (MVCC)├───────────────────────────┤│  Causal Consistency       │ ← Causally related ops ordered├───────────────────────────┤│  Read Your Writes         │ ← Session consistency├───────────────────────────┤│  Monotonic Reads          │ ← No backwards time travel├───────────────────────────┤│  Eventual Consistency     │ ← Eventually converges└───────────────────────────┘         ↓Weakest (Fewest Guarantees, Lowest Latency)
           ┌──────────┐    Writes │ PRIMARY  │      ───→ └────┬─────┘                │ (replicate)         ┌──────┼──────┐         ▼      ▼      ▼      ┌────┐ ┌────┐ ┌────┐Reads │ R1 │ │ R2 │ │ R3 │  ←── └────┘ └────┘ └────┘
# PostgreSQL synchronous replication# postgresql.confsynchronous_commit = onsynchronous_standby_names = 'replica1,replica2'# Write waits for replicasconn.execute("INSERT INTO users VALUES (...)")# Returns only after replicas confirm write
# MySQL async replication# Primary returns immediatelyconn.execute("INSERT INTO users VALUES (...)")# Replica receives update later (lag: milliseconds to seconds)
    ┌──────────┐      ┌──────────┐      ┌──────────┐    │ PRIMARY  │ ←──→ │ PRIMARY  │ ←──→ │ PRIMARY  │    │  (US)    │      │  (EU)    │      │  (ASIA)  │    └──────────┘      └──────────┘      └──────────┘        ↕                  ↕                  ↕    Bidirectional replication between all primaries
T1: User in US updates profile.name = "Alice Smith"T2: User in EU updates profile.name = "Alice Johnson" (same user!)Both writes succeed locally, then replicate:US Primary:  "Alice Smith"   → replicates to EUEU Primary:  "Alice Johnson" → replicates to USCONFLICT! Which value wins?
5 Nodes totalWrite: Must succeed on 3+ nodes (quorum)Read:  Must check 3+ nodes (quorum)Guarantee: Read quorum + Write quorum > Total nodes           → Reads always see latest write
# Write to 3 out of 5 replicassession.execute(    "INSERT INTO users (id, name) VALUES (?, ?)",    [user_id, "Alice"],    consistency_level=ConsistencyLevel.QUORUM  # 3/5 nodes)# Read from 3 out of 5 replicasresult = session.execute(    "SELECT name FROM users WHERE id = ?",    [user_id],    consistency_level=ConsistencyLevel.QUORUM  # 3/5 nodes)# Returns most recent value (by timestamp)
Write  →  [Head] → [R1] → [R2] → [Tail]Read   ←                           [Tail]Writes go to HeadReads come from Tail (always consistent!)
Raft Cluster (5 nodes):┌──────────┐│  Leader  │ ← Handles all client requests└────┬─────┘     │ (Replicate log entries)     ├────────┬────────┬────────┐     ▼        ▼        ▼        ▼  ┌────┐  ┌────┐  ┌────┐  ┌────┐  │ F1 │  │ F2 │  │ F3 │  │ F4 │ ← Followers  └────┘  └────┘  └────┘  └────┘
- Nodes start as followers- If no heartbeat from leader (timeout): become candidate- Request votes from other nodes- Win election if majority votes received- New leader sends heartbeats to prevent new electionsElection timeout: 150-300ms (randomized to prevent splits)
# Leader receives client requestdef handle_write(data):    # 1. Append to local log    log_entry = LogEntry(        term=current_term,        index=last_log_index + 1,        data=data    )    append_to_log(log_entry)    # 2. Replicate to followers    for follower in followers:        send_append_entries(follower, [log_entry])    # 3. Wait for majority acknowledgment    acks = wait_for_acks(majority_count)    # 4. Commit entry (apply to state machine)    if acks >= majority_count:        commit_index = log_entry.index        apply_to_state_machine(log_entry)        return SUCCESS    return FAILURE  # Couldn't reach majority
Leader Log:┌─────┬─────┬─────┬─────┬─────┬─────┐│ 1,1 │ 1,2 │ 2,3 │ 2,4 │ 3,5 │ 3,6 │└─────┴─────┴─────┴─────┴─────┴─────┘  term,index for each log entry                            ↑                     commit_index=6Follower Logs (eventually consistent):Follower 1: [1,1] [1,2] [2,3] [2,4] [3,5] [3,6] ✅ Up-to-dateFollower 2: [1,1] [1,2] [2,3] [2,4] [3,5]       ⏳ Lagging (entry 6 replicating)Follower 3: [1,1] [1,2] [2,3]                   ⏳ Further behind
Raft ensures:- Election Safety: At most one leader per term- Leader Append-Only: Leader never overwrites log entries- Log Matching: If two logs contain entry with same index/term,                 all previous entries are identical- Leader Completeness: If entry committed in term T,                       it will be present in leaders of all terms > T- State Machine Safety: If server applies log entry at index i,                        no other server applies different entry at i
// etcd Raft configurationimport "go.etcd.io/etcd/raft"// Create Raft nodestorage := raft.NewMemoryStorage()config := &raft.Config{    ID:              nodeID,    ElectionTick:    10,   // Elections after 10 ticks    HeartbeatTick:   1,    // Heartbeat every tick    Storage:         storage,    MaxSizePerMsg:   1024 * 1024,    MaxInflightMsgs: 256,}node := raft.StartNode(config, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}})// Write datafunc writeKey(key, value string) {    data := []byte(fmt.Sprintf("%s=%s", key, value))    // Propose to Raft    node.Propose(context.TODO(), data)    // Wait for commit    select {    case rd := <-node.Ready():        // Entry committed, apply to state machine        for _, entry := range rd.CommittedEntries {            applyToStateMachine(entry.Data)        }        node.Advance()    }}
┌──────────────┐│  Proposers   │ ← Propose values└──────┬───────┘       │       ▼┌──────────────┐│  Acceptors   │ ← Vote on proposals (form quorum)└──────┬───────┘       │       ▼┌──────────────┐│  Learners    │ ← Learn chosen value└──────────────┘
class PaxosAcceptor:
    def __init__(self):
        self.promised_id = None     # Highest proposal ID promised
        self.accepted_id = None     # ID of accepted proposal
        self.accepted_value = None  # Accepted value

    def prepare(self, proposal_id):
        """Phase 1b: Promise"""
        if self.promised_id is None or proposal_id > self.promised_id:
            self.promised_id = proposal_id
            return {
                'promised': True,
                'accepted_id': self.accepted_id,
                'accepted_value': self.accepted_value
            }
        return {'promised': False}

    def accept(self, proposal_id, value):
        """Phase 2b: Accept"""
        if self.promised_id is None or proposal_id >= self.promised_id:
            self.promised_id = proposal_id
            self.accepted_id = proposal_id
            self.accepted_value = value
            return {'accepted': True}
        return {'accepted': False}


class PaxosProposer:
    def propose(self, value, acceptors):
        proposal_id = generate_unique_id()

        # Phase 1: Prepare
        promises = []
        for acceptor in acceptors:
            response = acceptor.prepare(proposal_id)
            if response['promised']:
                promises.append(response)

        # Need a majority to proceed
        if len(promises) < len(acceptors) // 2 + 1:
            return None  # Failed to get quorum

        # Check if any acceptor already accepted a value
        # If so, must propose that value (Paxos invariant)
        max_accepted_id = -1
        proposed_value = value
        for promise in promises:
            if promise['accepted_id'] and promise['accepted_id'] > max_accepted_id:
                max_accepted_id = promise['accepted_id']
                proposed_value = promise['accepted_value']

        # Phase 2: Accept
        acceptances = []
        for acceptor in acceptors:
            response = acceptor.accept(proposal_id, proposed_value)
            if response['accepted']:
                acceptances.append(response)

        # Value chosen if a majority accepted
        if len(acceptances) >= len(acceptors) // 2 + 1:
            return proposed_value
        return None  # Failed to get quorum
Instead of running Paxos for each value:

1. Elect a stable leader (using Paxos)
2. Leader proposes a sequence of values without the prepare phase
3. Much more efficient in practice

This is essentially what Raft does!
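The leader-based shortcut can be sketched in a few lines. Everything below is illustrative — `MultiPaxosLeader`, `SlotAcceptor`, and their methods are invented for this sketch, not a library API: once a leader has completed a single prepare round for its term, it replicates each subsequent value with one accept round per log slot.

```python
class SlotAcceptor:
    """Toy acceptor: accepts proposals at or above its promised term."""
    def __init__(self):
        self.promised_term = 0
        self.log = {}  # slot -> accepted value

    def accept(self, term, slot, value):
        if term >= self.promised_term:
            self.promised_term = term
            self.log[slot] = value
            return {'accepted': True}
        return {'accepted': False}


class MultiPaxosLeader:
    """Leader elected once (via a Phase-1 round, omitted here);
    each value then needs only Phase 2 (accept)."""
    def __init__(self, acceptors, term):
        self.acceptors = acceptors
        self.term = term
        self.next_slot = 0

    def append(self, value):
        slot = self.next_slot
        self.next_slot += 1
        acks = sum(
            1 for a in self.acceptors
            if a.accept(self.term, slot, value)['accepted']
        )
        # Value is chosen once a majority of acceptors store it
        return acks >= len(self.acceptors) // 2 + 1


acceptors = [SlotAcceptor() for _ in range(3)]
leader = MultiPaxosLeader(acceptors, term=1)
print(leader.append("x=1"))  # True (one round trip, no prepare phase)
print(leader.append("x=2"))  # True
```

Note how the per-value cost drops from two round trips (prepare + accept) to one — the same optimization Raft bakes in by making leadership a first-class concept.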
✅ Use Raft/Paxos when:
- You need linearizable consistency
- Coordinating distributed locks
- Leader election is required
- Building a distributed configuration store
- Implementing replicated state machines

❌ Don't use when:
- Eventual consistency is acceptable
- Low latency is critical (consensus has overhead)
- Simple primary-replica replication is sufficient
- There is no need for automatic failover
import consul

# Consul uses Raft for a consistent K/V store
c = consul.Consul()

# Writes go through Raft consensus
success = c.kv.put('config/database/url', 'postgresql://...')

# Guaranteed consistent reads
index, data = c.kv.get('config/database/url')

# CAS (Compare-and-Set) operations are atomic via Raft
c.kv.put('counter', '0')
index, current = c.kv.get('counter')

# Only succeeds if modify_index matches (no concurrent updates)
success = c.kv.put(
    'counter',
    str(int(current['Value']) + 1),
    cas=current['ModifyIndex']  # Compare-and-swap
)
For strong consistency:

W + R > N

Where:
- N = Total replicas
- W = Write quorum (nodes that must ack a write)
- R = Read quorum (nodes that must ack a read)

Why W + R > N ensures consistency:
→ Read and write quorums must overlap
→ At least one node in the read quorum has the latest write
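The overlap condition is easy to sanity-check in code. A minimal sketch (the `quorums_overlap` helper is invented for illustration):

```python
def quorums_overlap(n, w, r):
    """Strong consistency holds when every read quorum must intersect
    every write quorum, i.e. W + R > N."""
    return w + r > n

# N=3 with QUORUM writes and QUORUM reads: 2 + 2 > 3, quorums overlap
print(quorums_overlap(3, 2, 2))  # True

# N=3 with W=1, R=1: 1 + 1 <= 3, a read can miss the latest write
print(quorums_overlap(3, 1, 1))  # False
```

Tuning W and R within this constraint trades write latency against read latency: W=N, R=1 gives cheap reads and expensive writes; W=1, R=N the reverse.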
# Cassandra with sloppy quorum
class SloppyQuorum:
    def __init__(self, replication_factor=3):
        self.N = replication_factor
        self.W = 2  # QUORUM
        self.R = 2  # QUORUM

    def write(self, key, value):
        # Determine the preference list (N nodes)
        nodes = self.get_preference_list(key, self.N)

        # Try to write to N nodes
        written = []
        for node in nodes:
            try:
                node.put(key, value)
                written.append(node)
                # Stop once W nodes acknowledged
                if len(written) >= self.W:
                    return SUCCESS
            except NodeDown:
                continue

        # If < W primary nodes available, use "hinted handoff":
        # write to extra nodes temporarily
        if len(written) < self.W:
            extra_nodes = self.get_extra_nodes(key)
            for node in extra_nodes:
                # Store hint: "This write belongs to node X"
                node.put_with_hint(key, value, intended_node=nodes[0])
                written.append(node)
                if len(written) >= self.W:
                    return SUCCESS

        return FAILURE

    def read(self, key):
        nodes = self.get_preference_list(key, self.N)

        # Read from R nodes
        responses = []
        for node in nodes:
            try:
                value = node.get(key)
                responses.append(value)
                if len(responses) >= self.R:
                    break
            except NodeDown:
                continue

        # Return the most recent value (by timestamp)
        return max(responses, key=lambda v: v.timestamp)
Normal case (all nodes up):
  Write goes to nodes: [A, B, C]

Node B is down:
  Write goes to nodes: [A, C, D (hint for B)]

When B comes back online:
  Node D transfers the hint to B
  B now has the write it missed
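The handoff step itself can be sketched as follows — `HintStore`, `Node`, and `deliver_hints` are illustrative names for this sketch, not Cassandra's actual API. Node D buffers writes on behalf of the down node B and replays them once B reappears:

```python
class Node(dict):
    """Toy storage node: just a dict with a put method."""
    def put(self, key, value):
        self[key] = value


class HintStore:
    """Hints held by a stand-in node for nodes that were down."""
    def __init__(self):
        self.hints = {}  # intended_node -> list of (key, value)

    def put_with_hint(self, key, value, intended_node):
        self.hints.setdefault(intended_node, []).append((key, value))

    def deliver_hints(self, node_id, node):
        # Replay buffered writes to the recovered node, then drop the hints
        for key, value in self.hints.pop(node_id, []):
            node.put(key, value)


# Node B is down: D accepts the write, tagged with a hint for B
d_hints = HintStore()
d_hints.put_with_hint('user:1', 'alice', intended_node='B')

# B comes back online: D transfers the hint
node_b = Node()
d_hints.deliver_hints('B', node_b)
print(node_b)  # {'user:1': 'alice'}
```

The key property: availability is preserved during the outage, and the intended replica converges shortly after recovery.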
WAL Principle: Log changes BEFORE applying them to data files

┌─────────────┐
│   Client    │
└──────┬──────┘
       │ UPDATE users SET name='Alice'
       ▼
┌─────────────────────────────────────┐
│  Database Process                   │
│                                     │
│  1. Write to WAL      [Log entry]   │ ← MUST happen first
│     ↓                               │
│  2. Return success to client        │
│     ↓                               │
│  3. Apply to data files (async)     │ ← Can happen later
│                                     │
└─────────────────────────────────────┘

Why?
→ Crash between step 2 and 3? Recovery replays the WAL
→ Replication: Send the WAL to replicas
→ Point-in-time recovery: Restore from the WAL
# WAL record structure
class WALRecord:
    def __init__(self):
        self.lsn = None   # Log Sequence Number (position in WAL)
        self.xid = None   # Transaction ID
        self.rmid = None  # Resource Manager ID
        self.info = None  # Record type info
        self.data = None  # Actual change data

    def __repr__(self):
        return f"WAL@{self.lsn}: XID={self.xid} {self.data}"

# Example WAL records for a transaction:
# WAL@0/1000: XID=100 BEGIN
# WAL@0/1020: XID=100 INSERT INTO users (id, name) VALUES (1, 'Alice')
# WAL@0/1040: XID=100 COMMIT
Client writes data:

1. Build WAL record
   ┌────────────────────────────┐
   │ LSN: 0/AB12CD34            │
   │ XID: 12345                 │
   │ Type: INSERT               │
   │ Table: users               │
   │ Data: (id=1, name='Alice') │
   └────────────────────────────┘

2. Append to WAL buffer (in memory)
   ┌─────────────────┐
   │   WAL Buffer    │
   │ [record][record]│ ← In-memory
   └─────────────────┘

3. fsync() to disk (durability!)
   ┌─────────────────┐
   │   WAL File      │
   │ wal/00000001... │ ← On disk
   └─────────────────┘

4. Return success to client

5. Later: Apply to heap files (background writer)
   ┌─────────────────┐
   │  Data Files     │
   │  base/16384/... │
   └─────────────────┘
# Without fsync (DANGEROUS!)
with open('wal_file', 'ab') as f:
    f.write(wal_record)
    # Data might still be in the OS page cache
    # Power loss = data loss!

# With fsync (SAFE)
import os

with open('wal_file', 'ab') as f:
    f.write(wal_record)
    f.flush()             # Flush Python buffer to the OS
    os.fsync(f.fileno())  # Force OS to write to physical disk
    # Now guaranteed on disk!
-- PostgreSQL synchronous_commit settings

-- 1. off (FASTEST, UNSAFE)
SET synchronous_commit = off;
-- Returns before the WAL is written to disk
-- Risk: Last few transactions lost on crash
-- Use: High-throughput logging, analytics

-- 2. local (SAFE)
SET synchronous_commit = local;
-- Returns after the WAL is written to local disk
-- Guaranteed durability on the primary
-- Use: Single-instance deployments

-- 3. remote_write (REPLICATION-AWARE)
SET synchronous_commit = remote_write;
-- Returns after the replica received the WAL (but not fsynced it)
-- Risk: Replica crash could lose data
-- Use: Balance between performance and replication

-- 4. on (DEFAULT; synchronous replication when standbys configured)
SET synchronous_commit = on;
-- Returns after the replica has fsynced the WAL

-- 5. remote_apply (STRICTEST)
SET synchronous_commit = remote_apply;
-- Returns after the replica has fsynced AND applied the WAL
-- Maximum durability; reads on the replica see the commit immediately
-- Use: Financial systems, critical data
Primary → Replica WAL shipping:

┌──────────────────┐                    ┌──────────────────┐
│     Primary      │                    │     Replica      │
│                  │                    │                  │
│ 1. Write to WAL  │                    │                  │
│    ↓             │                    │                  │
│ 2. fsync         │                    │                  │
│    ↓             │                    │                  │
│ 3. Stream WAL ───┼───────────────────→│ 4. Receive WAL   │
│    (TCP socket)  │                    │    ↓             │
│                  │                    │ 5. Write to WAL  │
│                  │                    │    ↓             │
│                  │                    │ 6. fsync         │
│                  │                    │    ↓             │
│ 7. Get ACK   ←───┼────────────────────│ 7. Send ACK      │
│    ↓             │                    │    ↓             │
│ 8. Commit        │                    │ 8. Replay WAL    │
│                  │                    │   (apply changes)│
└──────────────────┘                    └──────────────────┘
# Primary configuration (postgresql.conf)
wal_level = replica              # Generate enough WAL for replication
max_wal_senders = 10             # Max concurrent replica connections
wal_keep_size = '1GB'            # Keep at least 1GB of WAL
wal_sender_timeout = 60s         # Disconnect dead replicas

# Replica configuration (postgresql.conf)
hot_standby = on                 # Allow read queries on replica
max_standby_streaming_delay = 30s

# recovery.conf (or postgresql.auto.conf)
primary_conninfo = 'host=primary.example.com port=5432 user=replicator password=...'
primary_slot_name = 'replica_1'  # Replication slot (prevents WAL deletion)

-- Monitoring replication lag
SELECT
    client_addr,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    sync_state,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lag_bytes
FROM pg_stat_replication;

-- Example output:
-- client_addr | state     | lag_bytes
-- 10.0.1.5    | streaming | 2048      (2KB lag - excellent!)
-- 10.0.1.6    | streaming | 5242880   (5MB lag - investigate)
# Archive WAL files to S3 for disaster recovery
# postgresql.conf
archive_mode = on
archive_command = 'aws s3 cp %p s3://mydb-wal-archive/%f'
archive_timeout = 300  # Force archive every 5 minutes

# WAL archiving workflow:
# 1. Primary fills a WAL segment (default: 16MB)
# 2. Segment complete → run archive_command
# 3. Copy to remote storage (S3, NFS, etc.)
# 4. Can now reuse/delete the local WAL segment

# Restore from archive (disaster recovery):
# restore_command = 'aws s3 cp s3://mydb-wal-archive/%f %p'
# recovery_target_time = '2025-11-29 10:30:00'
# Restores the database to its exact state at 10:30 AM!
# Problem: fsync is expensive (5-10ms per call)
# Solution: Batch multiple transactions into one fsync
import asyncio

class GroupCommit:
    def __init__(self):
        self.pending_txns = []
        self.commit_delay = 0.001  # 1ms

    async def commit_transaction(self, txn):
        # Add to batch
        self.pending_txns.append(txn)

        # Wait for the batch to fill or timeout
        await asyncio.sleep(self.commit_delay)

        # Flush all pending
        if self.pending_txns:
            batch = self.pending_txns
            self.pending_txns = []

            # Single fsync for the entire batch!
            write_and_fsync_wal(batch)

            # All transactions durable
            for t in batch:
                t.notify_committed()

# Result: 1,000 commits/sec → 100,000 commits/sec!
# Latency: +1ms
-- PostgreSQL 14+ WAL compression
ALTER SYSTEM SET wal_compression = on;

-- Reduces WAL size substantially for UPDATE-heavy workloads
-- Trade-off: Slight CPU overhead

-- Without compression: full-page images are written to WAL uncompressed
-- With compression: full-page images are compressed before writing
-- (PostgreSQL 15+ also supports lz4 and zstd as compression methods)
Checkpoint process:
1. Flush all dirty pages to disk
2. Write a checkpoint record to the WAL
3. Allows discarding old WAL

┌─────────────────────────────────────┐
│         WAL Timeline                │
├─────────────────────────────────────┤
│ [records] [CHECKPOINT] [records]... │
│     ↑                    ↑          │
│  Can delete      Keep for recovery  │
└─────────────────────────────────────┘

Recovery starts from the last checkpoint!

# PostgreSQL checkpoint tuning
checkpoint_timeout = 15min         # Max time between checkpoints
checkpoint_completion_target = 0.9 # Spread checkpoint I/O
max_wal_size = 4GB                 # Trigger checkpoint if exceeded

# Trade-offs:
# Frequent checkpoints: Less WAL, faster recovery, more I/O
# Infrequent checkpoints: More WAL, slower recovery, less I/O
Spanner uses a Paxos-replicated WAL:

┌─────────────────────────────────────┐
│         Paxos Group                 │
│                                     │
│  Leader: [WAL: entry 1,2,3,4]       │
│     │                               │
│     ├──→ Follower 1: [1,2,3,4]      │
│     ├──→ Follower 2: [1,2,3,4]      │
│     ├──→ Follower 3: [1,2,3]  ⏳    │
│     └──→ Follower 4: [1,2,3]  ⏳    │
│                                     │
│  Commit when a majority have entry  │
│  (3/5 = quorum)                     │
└─────────────────────────────────────┘

TrueTime API: Globally synchronized timestamps
→ Ensures external consistency (linearizability)
// CockroachDB replicates ranges using Raft
// Each range has its own Raft log (WAL)
type Range struct {
    rangeID  int64
    raftLog  RaftLog  // Replicated WAL
    replicas []Replica
}

// Write to a range
func (r *Range) Write(key, value []byte) error {
    // Create a Raft log entry
    entry := raftpb.Entry{
        Type: raftpb.EntryNormal,
        Data: encodeKV(key, value),
    }

    // Propose to Raft (replicates via the log)
    r.raftLog.Propose(context.TODO(), entry.Data)

    // Wait for commit (majority replicated),
    // then apply to the RocksDB storage engine
    return r.waitForCommit(entry.Index)
}
┌──────────────────┐                 ┌──────────────────┐
│   US-EAST-1      │                 │   US-WEST-2      │
│   (ACTIVE)       │    Async Sync   │   (PASSIVE)      │
│                  │  ─────────────→ │                  │
│  ┌────────────┐  │                 │  ┌────────────┐  │
│  │  Primary   │  │                 │  │  Standby   │  │
│  │  Database  │  │                 │  │  Database  │  │
│  └────────────┘  │                 │  └────────────┘  │
│        ↕         │                 │                  │
│   All Traffic    │                 │   No Traffic     │
└──────────────────┘                 └──────────────────┘

Primary fails? → Failover to US-WEST-2
# Primary (US-EAST-1)
# postgresql.conf
wal_level = replica
max_wal_senders = 5
wal_keep_size = '1GB'

# Standby (US-WEST-2)
# recovery.conf
primary_conninfo = 'host=primary.us-east-1.rds.amazonaws.com port=5432'
restore_command = 'aws s3 cp s3://wal-archive/%f %p'

# Automatic failover with Patroni
patroni:
  scope: postgres-cluster
  name: us-west-2
  restapi:
    listen: 0.0.0.0:8008
  etcd:
    host: etcd.cluster.local:2379
┌──────────────────┐         ┌──────────────────┐
│   US-EAST-1      │ ←─────→ │   EU-WEST-1      │
│                  │   Bi-   │                  │
│  ┌────────────┐  │   dir   │  ┌────────────┐  │
│  │  Primary   │  │  Sync   │  │  Primary   │  │
│  └────────────┘  │         │  └────────────┘  │
│        ↕         │         │        ↕         │
│  US Users        │         │  EU Users        │
└──────────────────┘         └──────────────────┘
       ↕                            ↕
  Low latency                  Low latency
import boto3

# Create a Global Table
dynamodb = boto3.client('dynamodb')

# Define the replication group
response = dynamodb.create_global_table(
    GlobalTableName='users',
    ReplicationGroup=[
        {'RegionName': 'us-east-1'},
        {'RegionName': 'eu-west-1'},
        {'RegionName': 'ap-southeast-1'}
    ]
)

# Writes to any region automatically replicate
# Typical replication lag: < 1 second
dynamodb_us = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb_us.Table('users')

# User in the US writes
table.put_item(Item={'user_id': 'alice', 'name': 'Alice'})

# Available in the EU within ~1 second
dynamodb_eu = boto3.resource('dynamodb', region_name='eu-west-1')
table_eu = dynamodb_eu.Table('users')
response = table_eu.get_item(Key={'user_id': 'alice'})
# Returns {'user_id': 'alice', 'name': 'Alice'}
# T1: User updates name in the US
table_us.update_item(
    Key={'user_id': 'alice'},
    UpdateExpression='SET #name = :name',
    ExpressionAttributeNames={'#name': 'name'},
    ExpressionAttributeValues={':name': 'Alice Smith'}
)

# T2: Same user updates name in the EU (concurrent!)
table_eu.update_item(
    Key={'user_id': 'alice'},
    UpdateExpression='SET #name = :name',
    ExpressionAttributeNames={'#name': 'name'},
    ExpressionAttributeValues={':name': 'Alice Johnson'}
)

# DynamoDB resolves the conflict using timestamps
# Later write wins (based on internal timestamp)
# Final value: "Alice Johnson" (if the EU write was newer)
                  ┌──────────────────┐
                  │   US-EAST-1      │
                  │                  │
           Writes │  ┌────────────┐  │
             ───→ │  │  PRIMARY   │  │
                  │  └─────┬──────┘  │
                  └────────┼─────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
         ▼                 ▼                 ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│  EU-WEST-1     │ │  AP-SOUTH-1    │ │  SA-EAST-1     │
│  ┌──────────┐  │ │  ┌──────────┐  │ │  ┌──────────┐  │
│  │ REPLICA  │  │ │  │ REPLICA  │  │ │  │ REPLICA  │  │
│  └────┬─────┘  │ │  └────┬─────┘  │ │  └────┬─────┘  │
│       ↕        │ │       ↕        │ │       ↕        │
│  EU Reads      │ │  Asia Reads    │ │  SA Reads      │
└────────────────┘ └────────────────┘ └────────────────┘
import boto3

rds = boto3.client('rds', region_name='us-east-1')

# Create a read replica in the EU
rds.create_db_instance_read_replica(
    DBInstanceIdentifier='mydb-eu-replica',
    SourceDBInstanceIdentifier='mydb-primary',
    DBInstanceClass='db.r5.large',
    AvailabilityZone='eu-west-1a',
    PubliclyAccessible=False
)

# Application routing logic
class DatabaseRouter:
    def __init__(self):
        self.primary = "mydb-primary.us-east-1.rds.amazonaws.com"
        self.replicas = {
            "us": "mydb-primary.us-east-1.rds.amazonaws.com",
            "eu": "mydb-eu-replica.eu-west-1.rds.amazonaws.com",
            "ap": "mydb-ap-replica.ap-south-1.rds.amazonaws.com"
        }

    def get_write_connection(self):
        # All writes go to the primary
        return connect(self.primary)

    def get_read_connection(self, user_region):
        # Reads go to the nearest replica
        replica = self.replicas.get(user_region, self.primary)
        return connect(replica)

# Usage
router = DatabaseRouter()

# Write (always primary)
write_conn = router.get_write_connection()
write_conn.execute("INSERT INTO users (name) VALUES ('Alice')")

# Read (region-specific)
read_conn = router.get_read_connection(user_region="eu")
users = read_conn.execute("SELECT * FROM users WHERE active = true")
User Data Partitioning by Region:

┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│   US Region      │     │   EU Region      │     │   Asia Region    │
│                  │     │                  │     │                  │
│  US Users Data   │     │  EU Users Data   │     │  Asia Users Data │
│  ┌────────────┐  │     │  ┌────────────┐  │     │  ┌────────────┐  │
│  │  Shard 1   │  │     │  │  Shard 2   │  │     │  │  Shard 3   │  │
│  └────────────┘  │     │  └────────────┘  │     │  └────────────┘  │
│       ↕          │     │       ↕          │     │       ↕          │
│  US Traffic      │     │  EU Traffic      │     │  Asia Traffic    │
└──────────────────┘     └──────────────────┘     └──────────────────┘

Cross-region queries require routing to multiple shards
// Define shard zones by geographic region
sh.addShardTag("shard-us", "US")
sh.addShardTag("shard-eu", "EU")
sh.addShardTag("shard-asia", "ASIA")

// Create tag ranges based on user location
sh.addTagRange(
  "myapp.users",
  { user_region: "US", user_id: MinKey },
  { user_region: "US", user_id: MaxKey },
  "US"
)
sh.addTagRange(
  "myapp.users",
  { user_region: "EU", user_id: MinKey },
  { user_region: "EU", user_id: MaxKey },
  "EU"
)

// Enable sharding on the collection
sh.enableSharding("myapp")
sh.shardCollection("myapp.users", { user_region: 1, user_id: 1 })

// Application code
async function createUser(userData) {
    // Data automatically routed to the correct shard
    await db.users.insertOne({
        user_id: userData.id,
        user_region: userData.region,  // "US", "EU", or "ASIA"
        name: userData.name,
        email: userData.email
    });
}

// Queries to the local shard are fast
const usUsers = await db.users.find({ user_region: "US" });

// Cross-region queries are slower (scatter-gather)
const allUsers = await db.users.find({});  // Queries all shards
class LWWRegister:
    def __init__(self):
        self.value = None
        self.timestamp = 0

    def write(self, value, timestamp):
        if timestamp > self.timestamp:
            self.value = value
            self.timestamp = timestamp

    def merge(self, remote_value, remote_timestamp):
        # Keep the value with the higher timestamp
        if remote_timestamp > self.timestamp:
            self.value = remote_value
            self.timestamp = remote_timestamp

# Usage
register_us = LWWRegister()
register_eu = LWWRegister()

# Concurrent writes
register_us.write("Alice Smith", timestamp=1000)
register_eu.write("Alice Johnson", timestamp=1001)

# Replication and merge
register_us.merge("Alice Johnson", 1001)  # EU value wins
register_eu.merge("Alice Smith", 1000)    # US value discarded

# Both converge to "Alice Johnson"
class VectorClock:
    def __init__(self, node_id):
        self.node_id = node_id
        self.clock = {}

    def increment(self):
        self.clock[self.node_id] = self.clock.get(self.node_id, 0) + 1
        return self.clock.copy()

    def update(self, other_clock):
        for node, count in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), count)

    def compare(self, other_clock):
        # Returns: "before", "after", "concurrent", or "equal"
        less = False
        greater = False

        all_nodes = set(self.clock.keys()) | set(other_clock.keys())
        for node in all_nodes:
            self_count = self.clock.get(node, 0)
            other_count = other_clock.get(node, 0)
            if self_count < other_count:
                less = True
            elif self_count > other_count:
                greater = True

        if less and greater:
            return "concurrent"  # Conflict!
        elif less:
            return "before"
        elif greater:
            return "after"
        else:
            return "equal"

# Example: Detecting conflicts
vc_us = VectorClock("US")
vc_eu = VectorClock("EU")

# Sequential writes (no conflict)
clock1 = vc_us.increment()  # {US: 1}
vc_eu.update(clock1)        # EU knows about US:1
clock2 = vc_eu.increment()  # {US: 1, EU: 1}
print(vc_us.compare(clock2))  # "before" (causal order)

# Concurrent writes (conflict!)
vc_us2 = VectorClock("US")
vc_eu2 = VectorClock("EU")
clock_us = vc_us2.increment()  # {US: 1}
clock_eu = vc_eu2.increment()  # {EU: 1}
print(vc_us2.compare(clock_eu))  # "concurrent" (CONFLICT!)
Node 1 (clock: 10:00:00.500): Write X=1
Node 2 (clock: 10:00:00.499): Write X=2 (clock skew!)

Which write is actually newer? Can't trust physical time alone!
Logical clocks provide ordering but lose wall-clock time:
→ Can't do time-based queries: "Get data from the last 5 minutes"
→ Can't expire cached entries based on time
import time

class HybridLogicalClock:
    """
    HLC combines:
    - Physical time (wall clock)
    - Logical counter (for events within the same physical time)
    """
    def __init__(self):
        self.latest_time = 0  # Physical time
        self.logical = 0      # Logical counter

    def send_event(self):
        """Generate a timestamp for a local event"""
        physical_time = int(time.time() * 1000000)  # Microseconds

        if physical_time > self.latest_time:
            # Physical time advanced
            self.latest_time = physical_time
            self.logical = 0
        else:
            # Multiple events within the same physical time
            self.logical += 1

        return (self.latest_time, self.logical)

    def receive_event(self, remote_time, remote_logical):
        """Update the clock on receiving a remote event"""
        physical_time = int(time.time() * 1000000)
        prev_time = self.latest_time

        # Take the maximum of local, remote, and wall-clock time
        self.latest_time = max(physical_time, remote_time, prev_time)

        if self.latest_time == prev_time and self.latest_time == remote_time:
            # Local and remote HLC tied: advance past both counters
            self.logical = max(self.logical, remote_logical) + 1
        elif self.latest_time == prev_time:
            # Local HLC is ahead: just bump the counter
            self.logical += 1
        elif self.latest_time == remote_time:
            # Remote HLC is ahead: advance past its counter
            self.logical = remote_logical + 1
        else:
            # Wall clock is ahead of both: counter resets
            self.logical = 0

        return (self.latest_time, self.logical)

# Usage example
node1 = HybridLogicalClock()
node2 = HybridLogicalClock()

# Node 1 performs a local write
ts1 = node1.send_event()  # e.g. (1638360000000000, 0)
print(f"Node 1 writes at HLC: {ts1}")

# Node 2 receives a message carrying ts1
ts2 = node2.receive_event(ts1[0], ts1[1])  # e.g. (1638360000000000, 1)
print(f"Node 2 receives, HLC: {ts2}")

# Node 2 writes locally
ts3 = node2.send_event()  # e.g. (1638360000000000, 2)
print(f"Node 2 writes at HLC: {ts3}")

# Timestamps maintain causality AND approximate wall-clock time!
1. Bounded by physical time:
   HLC timestamp ≈ actual wall-clock time
   (logical component small: typically 0-10)

2. Preserves causality:
   If event A → event B (happens-before),
   then HLC(A) < HLC(B)

3. Enables time-based queries:
   "Get all writes from the last hour"
   → Query where HLC.physical > now - 3600

4. Works with clock skew:
   Even if clocks differ by 100ms, the logical component ensures ordering
-- CockroachDB uses HLC for MVCC timestamps
CREATE TABLE users (
    id UUID PRIMARY KEY,
    name STRING,
    email STRING
);

-- Every row version has an HLC timestamp
INSERT INTO users VALUES (uuid_generate_v4(), 'Alice', 'alice@example.com');
-- Internal: Stored with HLC (1638360000000000, 5)

-- Time-travel queries
SELECT * FROM users AS OF SYSTEM TIME '-1h';
-- Returns data as it was 1 hour ago (using HLC.physical)

-- Transaction isolation using HLC
BEGIN;
SET TRANSACTION AS OF SYSTEM TIME '2025-11-29 10:00:00';
-- All reads see the snapshot at the HLC corresponding to 10:00:00
SELECT * FROM users;
COMMIT;
# G-Counter (Grow-only Counter) - CRDT
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self, amount=1):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Merge is commutative and idempotent
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)

# Usage: Distributed page view counter
counter_us = GCounter("US")
counter_eu = GCounter("EU")
counter_asia = GCounter("ASIA")

# Concurrent increments
counter_us.increment(100)    # 100 views in US
counter_eu.increment(50)     # 50 views in EU
counter_asia.increment(75)   # 75 views in Asia

# Merge all counters
counter_us.merge(counter_eu)
counter_us.merge(counter_asia)

print(counter_us.value())  # 225 (no conflicts!)
class ShoppingCart:
    def __init__(self):
        self.items = {}  # product_id -> quantity

    def add_item(self, product_id, quantity):
        self.items[product_id] = self.items.get(product_id, 0) + quantity

    def remove_item(self, product_id):
        if product_id in self.items:
            del self.items[product_id]

    def merge(self, other_cart):
        # Domain-specific merge logic
        for product_id, quantity in other_cart.items.items():
            if product_id in self.items:
                # Take the maximum quantity (user benefit)
                self.items[product_id] = max(
                    self.items[product_id],
                    quantity
                )
            else:
                # Add new item
                self.items[product_id] = quantity

# User adds items from multiple devices concurrently
cart_mobile = ShoppingCart()
cart_mobile.add_item("shoe-123", 1)

cart_desktop = ShoppingCart()
cart_desktop.add_item("shirt-456", 2)
cart_desktop.add_item("shoe-123", 2)  # Same product, different quantity!

# Merge carts
cart_mobile.merge(cart_desktop)
# Result: shoe-123: 2, shirt-456: 2
# Took the maximum quantity for the conflicting item
-- Cassandra keyspace with multi-region replication
CREATE KEYSPACE netflix
WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'us-east': 3,      -- 3 replicas in US East
    'us-west': 3,      -- 3 replicas in US West
    'eu-west': 3,      -- 3 replicas in EU West
    'ap-southeast': 3  -- 3 replicas in Asia Pacific
};

-- Table for user viewing history
CREATE TABLE user_viewing_history (
    user_id UUID,
    content_id UUID,
    watched_at TIMESTAMP,
    position INT,
    device_id UUID,
    PRIMARY KEY (user_id, watched_at, content_id)
) WITH CLUSTERING ORDER BY (watched_at DESC);
# Application code
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel
from datetime import datetime

cluster = Cluster(['cassandra.us-east.netflix.com'])
session = cluster.connect('netflix')

# Write with LOCAL_QUORUM (fast, regional consistency)
insert = SimpleStatement(
    """
    INSERT INTO user_viewing_history
    (user_id, content_id, watched_at, position, device_id)
    VALUES (%s, %s, %s, %s, %s)
    """,
    consistency_level=ConsistencyLevel.LOCAL_QUORUM  # 2/3 local nodes
)
session.execute(insert, [user_id, content_id, datetime.now(), position, device_id])

# Async replication to other regions happens automatically
# Viewing history available globally within seconds
import json
import time

# Schemaless architecture
# Each row has: DocID, Timestamp, Payload (JSON)

# Region-aware routing
class UberDocstore:
    def __init__(self):
        self.regions = {
            'us': MySQLConnection('us-mysql-cluster'),
            'eu': MySQLConnection('eu-mysql-cluster'),
            'apac': MySQLConnection('apac-mysql-cluster')
        }

    def write(self, doc_id, payload, region='us'):
        # Write to the regional cluster
        conn = self.regions[region]
        conn.execute(
            "INSERT INTO documents (doc_id, timestamp, payload) VALUES (?, ?, ?)",
            [doc_id, time.time(), json.dumps(payload)]
        )
        # Async replication to other regions
        self._async_replicate(doc_id, payload, source_region=region)

    def read(self, doc_id, region='us'):
        # Read from the regional cluster
        conn = self.regions[region]
        result = conn.execute(
            "SELECT payload FROM documents WHERE doc_id = ? ORDER BY timestamp DESC LIMIT 1",
            [doc_id]
        )
        return json.loads(result[0]['payload'])

# Usage: Driver location updates
docstore = UberDocstore()

# Driver in San Francisco
docstore.write(
    doc_id="driver:12345",
    payload={
        "location": {"lat": 37.7749, "lon": -122.4194},
        "status": "available",
        "updated_at": "2025-11-29T10:30:00Z"
    },
    region="us"
)
# Available in the EU within ~100ms for cross-region matching
// Simplified Figma-like collaborative editing
class CollaborativeDocument {
  private content: RGA<string>; // Replicated Growable Array (CRDT)
  private nodeId: string;

  constructor(nodeId: string) {
    this.nodeId = nodeId;
    this.content = new RGA(nodeId);
  }

  // Insert a character at a position
  insert(position: number, char: string) {
    this.content.insertAt(position, char, this.nodeId);
    this.broadcast({
      type: 'insert',
      position,
      char,
      nodeId: this.nodeId
    });
  }

  // Delete a character at a position
  delete(position: number) {
    this.content.removeAt(position);
    this.broadcast({
      type: 'delete',
      position,
      nodeId: this.nodeId
    });
  }

  // Merge remote operations
  applyRemoteOp(op: Operation) {
    if (op.type === 'insert') {
      this.content.insertAt(op.position, op.char, op.nodeId);
    } else if (op.type === 'delete') {
      this.content.removeAt(op.position);
    }
    // CRDT ensures convergence without conflicts!
  }
}

// Concurrent edits converge automatically
const doc_user1 = new CollaborativeDocument("user1");
const doc_user2 = new CollaborativeDocument("user2");

// Both start with "Hello"
doc_user1.insert(5, "!");  // "Hello!"
doc_user2.insert(0, ">");  // ">Hello"

// After sync, both converge to: ">Hello!"
import asyncio

# Cassandra-style read repair
class ReadRepair:
    def __init__(self, consistency_level='QUORUM'):
        self.consistency_level = consistency_level

    def read(self, key, replicas):
        # Read from multiple replicas
        responses = []
        for replica in replicas[:3]:  # Read from 3 replicas
            response = replica.get(key)
            responses.append((replica, response))

        # Find the most recent value (by timestamp)
        latest_value = max(responses, key=lambda r: r[1].timestamp)

        # Check for inconsistencies
        stale_replicas = []
        for replica, response in responses:
            if response.timestamp < latest_value[1].timestamp:
                stale_replicas.append(replica)

        # Repair stale replicas in the background
        if stale_replicas:
            asyncio.create_task(
                self.async_repair(key, latest_value[1], stale_replicas)
            )

        # Return the latest value to the client
        return latest_value[1].data

    async def async_repair(self, key, latest_value, stale_replicas):
        """Asynchronously update stale replicas"""
        for replica in stale_replicas:
            try:
                replica.put(key, latest_value)
                print(f"Read repair: Updated {replica} with latest value")
            except Exception as e:
                print(f"Read repair failed for {replica}: {e}")

# Usage
repair = ReadRepair()

# Client reads data
value = repair.read(key='user:123', replicas=cluster.get_replicas('user:123'))
# → Returns the latest value
# → Triggers background repair of stale replicas
Client requests key="user:123"

1. Read from replicas A, B, C:
   ┌─────────────────────────────┐
   │ A: {name: "Alice", ts: 100} │
   │ B: {name: "Alice", ts: 100} │
   │ C: {name: "Bob",   ts: 90}  │ ← Stale!
   └─────────────────────────────┘

2. Return latest value to client:
   → {name: "Alice", ts: 100}

3. Background repair:
   Send latest value to replica C:
   C: {name: "Bob", ts: 90} → {name: "Alice", ts: 100}

Eventually all replicas consistent!
✅ Pros:
- Repairs happen on-demand (for actively read data)
- No extra network traffic for unread data
- Fixes inconsistencies before they cause issues

❌ Cons:
- Adds latency to reads (must query multiple replicas)
- Doesn't repair data that's never read
- Read hot-spots can cause repair storms
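Cassandra-style systems mitigate the read-latency cost with digest reads: only one replica returns the full value, while the others return just a hash of theirs, falling back to full reads only when digests disagree. A minimal sketch of the idea — the `Replica` objects with `get`/`get_digest`/`put` methods are assumed, not a real driver API:

```python
import hashlib

def digest_read(key, replicas):
    """Read full data from one replica, digests (hashes) from the rest."""
    primary, *others = replicas
    record = primary.get(key)  # full value + timestamp
    digest = hashlib.md5(repr(record.value).encode()).hexdigest()

    # Digest-only requests are much cheaper than full reads
    mismatched = [r for r in others if r.get_digest(key) != digest]

    if not mismatched:
        return record.value  # fast path: one full read, cheap hash checks

    # Slow path: fetch full values, pick newest, repair stale replicas
    candidates = [record] + [r.get(key) for r in mismatched]
    latest = max(candidates, key=lambda rec: rec.timestamp)
    for r in mismatched:
        r.put(key, latest)
    return latest.value
```

The fast path dominates in practice, so most reads pay for one full response plus small digest messages rather than N full responses.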
# DON'T DO THIS - transfers entire dataset
def naive_sync(local_db, remote_db):
    # Get all data from both databases
    local_data = local_db.get_all()   # 1TB of data!
    remote_data = remote_db.get_all() # 1TB of data!

    # Compare and sync
    for key, value in local_data.items():
        if key not in remote_data or remote_data[key] != value:
            remote_db.put(key, value)

# Problem: Transfers gigabytes even if only 1 key differs!
import hashlib

class MerkleTree:
    """
    Hierarchical hash tree for efficient comparison
    """
    def __init__(self, key_ranges):
        self.tree = {}
        self.key_ranges = key_ranges

    def build(self, data):
        """Build Merkle tree from data"""
        # Leaf nodes: hash each key range
        for range_id, (start, end) in enumerate(self.key_ranges):
            range_data = {k: v for k, v in data.items() if start <= k < end}
            range_hash = self.hash_dict(range_data)
            self.tree[f'leaf_{range_id}'] = range_hash

        # Internal nodes: hash of children
        self.tree['root'] = self.hash_children(
            [self.tree[f'leaf_{i}'] for i in range(len(self.key_ranges))]
        )

    def hash_dict(self, data):
        """Hash dictionary of key-value pairs"""
        items = sorted(data.items())
        return hashlib.md5(str(items).encode()).hexdigest()

    def hash_children(self, child_hashes):
        """Hash of child node hashes"""
        combined = ''.join(child_hashes)
        return hashlib.md5(combined.encode()).hexdigest()

# Anti-entropy with Merkle trees
def merkle_sync(local_db, remote_db):
    # Define key ranges (e.g., 256 ranges)
    key_ranges = [(i * 1000, (i + 1) * 1000) for i in range(256)]

    # Build Merkle trees
    local_tree = MerkleTree(key_ranges)
    remote_tree = MerkleTree(key_ranges)
    local_tree.build(local_db.get_all())
    remote_tree.build(remote_db.get_all())

    # Compare roots
    if local_tree.tree['root'] == remote_tree.tree['root']:
        print("Databases in sync! No transfer needed.")
        return

    # Find differing ranges
    differing_ranges = []
    for range_id in range(len(key_ranges)):
        local_hash = local_tree.tree[f'leaf_{range_id}']
        remote_hash = remote_tree.tree[f'leaf_{range_id}']
        if local_hash != remote_hash:
            differing_ranges.append(range_id)

    # Sync only differing ranges
    for range_id in differing_ranges:
        start, end = key_ranges[range_id]
        range_data = {k: v for k, v in local_db.get_all().items() if start <= k < end}
        # Transfer only this range
        for key, value in range_data.items():
            remote_db.put(key, value)

    print(f"Synced {len(differing_ranges)}/{len(key_ranges)} ranges")

# Example: 1TB database, only 10MB different
# Without Merkle: Transfer 1TB
# With Merkle: Transfer ~10MB (100x savings!)
                     Root
                  Hash(ABCD)
                 /          \
          Hash(AB)            Hash(CD)
          /      \            /      \
    Hash(A)   Hash(B)   Hash(C)   Hash(D)
       │        │         │         │
    Range1   Range2   Range3   Range4
    [0-250K] [250K-  [500K-  [750K-
             500K]   750K]   1M]

Compare trees:
1. If Root hashes match → databases identical, done!
2. If Root differs → compare children Hash(AB) vs Hash(CD)
3. If Hash(AB) differs → compare Hash(A) vs Hash(B)
4. If Hash(A) differs → sync Range1 only

Only sync differing ranges!
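The top-down descent in the diagram is a recursive comparison: prune any subtree whose hashes match, and only walk into subtrees that differ. A sketch under an assumed node shape (a dict with `hash`, `children`, and a `range` field on leaves — illustrative, not any particular database's on-wire format):

```python
def diff_ranges(local, remote):
    """Recursively compare two mirror-shaped Merkle trees.

    Each node is {'hash': str, 'children': [...]} with an extra
    'range': (start, end) field on leaves. Returns the leaf ranges
    that need to be synced.
    """
    if local['hash'] == remote['hash']:
        return []                 # identical subtree: prune the descent
    if not local['children']:
        return [local['range']]   # differing leaf: sync this range only
    diffs = []
    for lc, rc in zip(local['children'], remote['children']):
        diffs.extend(diff_ranges(lc, rc))
    return diffs
```

With R ranges and one differing leaf, this exchanges O(log R) hashes instead of R leaf hashes, which is why real systems use a deep tree rather than the flat leaf list in the earlier sketch.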
# Cassandra nodetool repair
# Runs Merkle tree sync across replicas

# Repair specific keyspace
nodetool repair myapp

# Repair workflow:
# 1. Build Merkle trees on all replicas (in parallel)
# 2. Exchange root hashes
# 3. If roots differ, recurse into differing ranges
# 4. Stream only differing SSTables

# Monitoring repair
nodetool compactionstats

# Output:
# pending tasks: 0
# Active compaction:
# - compaction_type: Validation (Merkle tree build)
# - keyspace: myapp
# - progress: 45%

# Schedule periodic repairs (cron)
# Recommended: Every 7-10 days
0 2 * * 0 nodetool repair myapp  # Sunday 2AM
# DynamoDB uses continuous background anti-entropy
# Merkle tree per partition
# Trees rebuilt every 10 minutes
# Automatic sync of differing ranges

# No user action needed, but can monitor:
from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client('cloudwatch')

# Check replication lag (indicator of anti-entropy load)
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/DynamoDB',
    MetricName='ReplicationLatency',
    Dimensions=[
        {'Name': 'TableName', 'Value': 'users'},
        {'Name': 'ReceivingRegion', 'Value': 'us-west-2'}
    ],
    StartTime=datetime.now() - timedelta(hours=1),
    EndTime=datetime.now(),
    Period=300,
    Statistics=['Average']
)

# High latency might indicate anti-entropy catching up
┌─────────────────┬──────────────┬─────────────────┐
│ Technique       │ When         │ Frequency       │
├─────────────────┼──────────────┼─────────────────┤
│ Read Repair     │ On every read│ Real-time       │
│                 │ (if enabled) │                 │
├─────────────────┼──────────────┼─────────────────┤
│ Anti-Entropy    │ Background   │ Hours to days   │
│ (Merkle Trees)  │ process      │ (e.g., weekly)  │
└─────────────────┴──────────────┴─────────────────┘

Combined approach:
1. Read repair: Fixes hot data quickly
2. Anti-entropy: Ensures cold data eventually syncs

Best practice:
- Enable read repair for critical tables
- Run anti-entropy weekly during low-traffic hours
- Monitor repair progress and replication lag
Hinted Handoff:
- Temporary write forwarding when node down
- Immediate: hints replayed when node returns
- Use: Handle transient failures

Anti-Entropy:
- Comprehensive background sync
- Periodic: runs every N hours/days
- Use: Fix any inconsistency (missed writes, corruption, etc.)

Both work together for reliability!
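Hinted handoff itself is simple to sketch: when a replica is unreachable, the write coordinator buffers a "hint" (the write plus its intended target) and replays it once the node comes back. A minimal sketch, assuming hypothetical node objects with `id`, `is_up()`, and `put()`:

```python
from collections import defaultdict

class Coordinator:
    """Write coordinator that buffers hints for unreachable replicas."""

    def __init__(self, replicas):
        self.replicas = replicas
        self.hints = defaultdict(list)  # target node id -> buffered writes

    def write(self, key, value):
        for node in self.replicas:
            if node.is_up():
                node.put(key, value)
            else:
                # Node down: store a hint instead of failing the write
                self.hints[node.id].append((key, value))

    def replay_hints(self, node):
        """Called when a node is detected as back up."""
        for key, value in self.hints.pop(node.id, []):
            node.put(key, value)
```

Real systems (e.g., Cassandra) also expire hints after a window; beyond that window the node is considered to have missed too much, and anti-entropy repair takes over — which is exactly why the two mechanisms complement each other.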
# PostgreSQL replication lag monitoring
import psycopg2
import time

def check_replication_lag(primary_conn, replica_conn):
    # Get WAL position from primary
    primary_cur = primary_conn.cursor()
    primary_cur.execute("SELECT pg_current_wal_lsn()")
    primary_lsn = primary_cur.fetchone()[0]

    # Get replay position from replica
    replica_cur = replica_conn.cursor()
    replica_cur.execute("SELECT pg_last_wal_replay_lsn()")
    replica_lsn = replica_cur.fetchone()[0]

    # Calculate lag in bytes
    replica_cur.execute(
        "SELECT pg_wal_lsn_diff(%s, %s)",
        [primary_lsn, replica_lsn]
    )
    lag_bytes = replica_cur.fetchone()[0]

    # Alert if lag > 100MB
    if lag_bytes > 100 * 1024 * 1024:
        alert(f"Replication lag: {lag_bytes / 1024 / 1024:.2f} MB")

    return lag_bytes

# Monitor continuously
while True:
    lag = check_replication_lag(primary_conn, replica_conn)
    print(f"Replication lag: {lag} bytes")
    time.sleep(10)
# Automatic failover with retry logic
import time

class ReplicationManager:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.max_lag_seconds = 10

    def write_with_retry(self, query, params, max_retries=3):
        for attempt in range(max_retries):
            try:
                # Attempt write to primary
                self.primary.execute(query, params)

                # Wait for replication (optional)
                self.wait_for_replication()
                return True
            except PrimaryUnavailable:
                # Promote replica to primary
                new_primary = self.promote_replica()
                self.primary = new_primary
            except ReplicationTimeout:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff
        return False

    def promote_replica(self):
        # Find replica with least lag
        best_replica = min(
            self.replicas,
            key=lambda r: r.get_replication_lag()
        )

        # Promote to primary
        best_replica.promote()

        # Reconfigure other replicas
        for replica in self.replicas:
            if replica != best_replica:
                replica.follow(best_replica)

        return best_replica
# Periodic consistency checks
def verify_cross_region_consistency(regions):
    """
    Check if all regions have same data
    """
    checksums = {}

    for region_name, conn in regions.items():
        # Compute checksum of all data
        cursor = conn.execute(
            """
            SELECT MD5(STRING_AGG(CAST(id AS TEXT) || data, '' ORDER BY id))
            FROM critical_table
            """
        )
        checksums[region_name] = cursor.fetchone()[0]

    # Compare checksums
    reference = list(checksums.values())[0]
    inconsistent_regions = [
        region for region, checksum in checksums.items()
        if checksum != reference
    ]

    if inconsistent_regions:
        alert(f"Data inconsistency detected in: {inconsistent_regions}")
        # Trigger reconciliation
        reconcile_regions(inconsistent_regions)

    return len(inconsistent_regions) == 0
# Distributed consensus with etcd
import threading
import time

import etcd3

class PrimaryElection:
    def __init__(self, etcd_client, node_id):
        self.etcd = etcd_client
        self.node_id = node_id
        self.lease = None

    def try_become_primary(self, ttl=10):
        # Create lease
        self.lease = self.etcd.lease(ttl)

        # Try to acquire primary lock
        success = self.etcd.put_if_not_exists(
            '/cluster/primary',
            self.node_id,
            lease=self.lease
        )

        if success:
            # Successfully became primary; keep lease alive in background
            threading.Thread(target=self.heartbeat, daemon=True).start()
            return True
        else:
            # Another node is primary
            return False

    def heartbeat(self):
        # Refresh lease to stay primary
        while True:
            self.lease.refresh()
            time.sleep(5)  # Refresh every 5 seconds

# Usage
etcd = etcd3.client()
election = PrimaryElection(etcd, node_id="us-east-1")

if election.try_become_primary():
    # This node is primary - accept writes
    start_primary_mode()
else:
    # This node is replica - reject writes
    start_replica_mode()
; Jepsen test for distributed database
(deftest bank-test
  "Test that bank account transfers maintain consistency"
  (let [n 5]  ; 5 nodes
    (c/run!
      (c/nemesis  ; Fault injection
        (nemesis/partition-random-halves))  ; Split network
      (c/checker
        (checker/total-queue))  ; Verify no money lost
      ; Operations: concurrent transfers
      (gen/mix [
        (gen/once {:f :transfer :value {:from 0 :to 1 :amount 10}})
        (gen/once {:f :transfer :value {:from 1 :to 2 :amount 5}})
      ]))))
Network Partitions:
┌──────┐  ┌──────┐       ┌──────┐
│  N1  │──│  N2  │   ❌  │  N3  │
└──────┘  └──────┘       └──────┘
   │          │            │

Split-brain scenario:
- Does system maintain consistency?
- Are writes lost?
- Does it recover correctly?

Clock Skew:
Node 1: 10:00:00.000
Node 2: 10:00:00.500  (+500ms)
Node 3: 09:59:59.800  (-200ms)
→ Does timestamp ordering break?

Disk Failures:
- Corrupt WAL files
- Partial writes (power loss)
- Bit rot

Process Crashes:
- Kill database mid-transaction
- Kill during leader election
- Kill during replication
import os
import random
import threading
import time

class ChaosTest:
    def __init__(self, cluster):
        self.cluster = cluster
        self.failures = []

    def partition_network(self, duration=30):
        """Simulate network partition"""
        # Split cluster into two groups
        group_a = self.cluster.nodes[:len(self.cluster.nodes)//2]
        group_b = self.cluster.nodes[len(self.cluster.nodes)//2:]

        print(f"CHAOS: Network partition for {duration}s")
        print(f"  Group A: {[n.id for n in group_a]}")
        print(f"  Group B: {[n.id for n in group_b]}")

        # Block traffic between groups
        for node_a in group_a:
            for node_b in group_b:
                node_a.block_traffic_to(node_b)
                node_b.block_traffic_to(node_a)

        # Wait
        time.sleep(duration)

        # Heal partition
        for node_a in group_a:
            for node_b in group_b:
                node_a.allow_traffic_to(node_b)
                node_b.allow_traffic_to(node_a)

        print("CHAOS: Network healed")

    def kill_random_node(self):
        """Kill random node"""
        node = random.choice(self.cluster.nodes)
        print(f"CHAOS: Killing node {node.id}")
        node.kill()
        time.sleep(5)  # Wait for cluster to detect failure
        return node

    def induce_clock_skew(self, max_skew_ms=1000):
        """Introduce clock skew"""
        for node in self.cluster.nodes:
            skew = random.randint(-max_skew_ms, max_skew_ms)
            node.adjust_clock(skew)
            print(f"CHAOS: Node {node.id} clock skew: {skew}ms")

    def corrupt_wal_file(self, node):
        """Simulate disk corruption"""
        print(f"CHAOS: Corrupting WAL on node {node.id}")
        wal_file = node.get_latest_wal_file()

        # Corrupt random bytes
        with open(wal_file, 'r+b') as f:
            f.seek(random.randint(0, os.path.getsize(wal_file)))
            f.write(bytes([random.randint(0, 255)]))

    def run_chaos_scenario(self):
        """Run comprehensive chaos test"""
        print("=== Starting Chaos Engineering Test ===")

        # Baseline: verify system works
        assert self.verify_consistency(), "System inconsistent before chaos!"

        # Chaos 1: Network partition during writes
        write_thread = threading.Thread(target=self.continuous_writes, args=(30,))
        write_thread.start()
        time.sleep(5)  # Let some writes succeed
        self.partition_network(duration=15)
        write_thread.join()

        # Verify consistency after partition
        assert self.verify_consistency(), "Consistency violated after partition!"

        # Chaos 2: Kill node during heavy load
        write_thread = threading.Thread(target=self.continuous_writes, args=(20,))
        write_thread.start()
        time.sleep(5)
        killed_node = self.kill_random_node()
        write_thread.join()

        # Verify system still works
        assert self.verify_consistency(), "Consistency violated after node kill!"

        # Chaos 3: Restore killed node, verify catch-up
        killed_node.start()
        time.sleep(30)  # Wait for replication to catch up
        assert self.verify_consistency(), "Node didn't catch up correctly!"

        print("=== Chaos Test PASSED ===")

    def continuous_writes(self, duration):
        """Write data continuously"""
        start = time.time()
        count = 0
        while time.time() - start < duration:
            try:
                self.cluster.write(f"key_{count}", f"value_{count}")
                count += 1
            except Exception as e:
                print(f"Write failed: {e}")
            time.sleep(0.1)
        print(f"Completed {count} writes in {duration}s")

    def verify_consistency(self):
        """Check all replicas have same data"""
        print("Verifying consistency across replicas...")

        # Get data from all nodes
        node_data = {}
        for node in self.cluster.nodes:
            if node.is_running():
                node_data[node.id] = node.get_all_data()

        # Compare checksums
        checksums = {
            node_id: hash(frozenset(data.items()))
            for node_id, data in node_data.items()
        }

        if len(set(checksums.values())) == 1:
            print("✅ All replicas consistent")
            return True
        else:
            print("❌ Replicas diverged!")
            for node_id, checksum in checksums.items():
                print(f"  Node {node_id}: {checksum}")
            return False

# Run chaos test
cluster = create_test_cluster(nodes=5, replication_factor=3)
chaos = ChaosTest(cluster)
chaos.run_chaos_scenario()
1. Start small:
   - Single node failure
   - Short partitions (seconds)
   - Low traffic

2. Gradually increase severity:
   - Multiple node failures
   - Long partitions (minutes)
   - High traffic + failures

3. Test failure modes:
   ✓ Network partitions
   ✓ Process crashes
   ✓ Disk full
   ✓ Clock skew
   ✓ Slow network (high latency)
   ✓ Packet loss
   ✓ Byzantine failures (rare)

4. Verify invariants:
   ✓ No data loss
   ✓ No dirty reads
   ✓ Bounded staleness
   ✓ Monotonic reads
   ✓ Recovery completes

5. Automate tests:
   - Run in CI/CD
   - Randomized scenarios
   - Continuous chaos in staging
# Chaos Monkey randomly kills instances in production
# Ensures system resilient to failures
import random
import time

class ChaosMonkey:
    def __init__(self, cluster, probability=0.01):
        self.cluster = cluster
        self.kill_probability = probability

    def run(self):
        """Periodically kill random instances"""
        while True:
            time.sleep(3600)  # Every hour

            if random.random() < self.kill_probability:
                # Kill random instance
                instance = random.choice(self.cluster.instances)
                print(f"🐵 Chaos Monkey: Terminating {instance.id}")
                instance.terminate()

                # Monitor recovery
                self.monitor_recovery(instance)

    def monitor_recovery(self, killed_instance):
        """Verify system recovered from failure"""
        # Check that:
        # 1. Traffic rerouted to healthy instances
        # 2. New instance launched automatically
        # 3. No user-facing errors
        pass

# Run in production (yes, really!)
monkey = ChaosMonkey(production_cluster, probability=0.01)
monkey.run()  # 1% chance per hour of killing instance
from toxiproxy import Toxiproxy

# Toxiproxy: proxy for simulating network problems
toxiproxy = Toxiproxy()

# Create proxy for database connection
db_proxy = toxiproxy.create(
    name="db",
    listen="127.0.0.1:5433",
    upstream="postgres:5432"
)

# Test 1: High latency
db_proxy.add_toxic(
    name="latency",
    type="latency",
    attributes={"latency": 1000}  # Add 1s latency
)
# Application now experiences slow queries
# Verify timeout handling works

# Test 2: Packet loss
db_proxy.add_toxic(
    name="packet_loss",
    type="timeout",
    attributes={"timeout": 0}  # 0 = drop packets
)
# Verify application retries correctly

# Test 3: Connection limits
db_proxy.add_toxic(
    name="limit_data",
    type="limit_data",
    attributes={
        "bytes": 1000  # Close after 1KB
    }
)
# Verify application handles partial responses

# Remove toxics to restore normal operation
db_proxy.remove_toxic("latency")
# Calculate bandwidth needed for replication

# Parameters
write_rate = 10_000  # writes per second
avg_write_size = 1024  # bytes per write
replication_factor = 3  # 3 replicas

# Bandwidth per second
bytes_per_second = write_rate * avg_write_size
print(f"Primary generates: {bytes_per_second / 1024 / 1024:.2f} MB/s")

# Replication traffic
# Each write goes to (RF - 1) replicas
replication_bandwidth = bytes_per_second * (replication_factor - 1)
print(f"Replication bandwidth: {replication_bandwidth / 1024 / 1024:.2f} MB/s")

# Cross-region bandwidth (more expensive)
# Assume 2 regions, each with 3 replicas
cross_region_bandwidth = bytes_per_second * 3  # All writes to other region
print(f"Cross-region bandwidth: {cross_region_bandwidth / 1024 / 1024:.2f} MB/s")

# Monthly cost estimate (AWS)
# Inter-region: $0.02 per GB
monthly_seconds = 30 * 24 * 60 * 60
monthly_gb = (cross_region_bandwidth * monthly_seconds) / 1024 / 1024 / 1024
monthly_cost = monthly_gb * 0.02
print(f"Monthly cross-region cost: ${monthly_cost:.2f}")
# Define SLOs for replication lag
SLO_TARGETS = {
    "p50": 10,    # ms - median lag
    "p95": 100,   # ms - 95th percentile
    "p99": 500,   # ms - 99th percentile
    "p99.9": 2000 # ms - 99.9th percentile
}

# Monitor and alert
def check_replication_lag_slo(metrics):
    for percentile, target_ms in SLO_TARGETS.items():
        actual = metrics.get_percentile(percentile)
        if actual > target_ms:
            alert(
                f"Replication lag SLO violated: "
                f"{percentile} = {actual}ms (target: {target_ms}ms)"
            )

# Dashboard queries (Prometheus)
# rate(replication_lag_seconds_sum[5m]) / rate(replication_lag_seconds_count[5m])
# histogram_quantile(0.95, rate(replication_lag_seconds_bucket[5m]))
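The `metrics` object above is assumed to expose `get_percentile`; in practice that comes from Prometheus or a metrics library, but a minimal in-process version can be sketched over a sliding window of lag samples (the class and method names here are illustrative, not from any particular library):

```python
from collections import deque

class LagMetrics:
    """Sliding window of recent replication-lag samples with percentile lookup."""

    def __init__(self, window=10_000):
        self.samples = deque(maxlen=window)  # most recent lag values (ms)

    def record(self, lag_ms):
        self.samples.append(lag_ms)

    def get_percentile(self, name):
        """name like 'p95' or 'p99.9' -> lag value at that percentile."""
        q = float(name.lstrip('p')) / 100
        ordered = sorted(self.samples)
        idx = min(int(q * len(ordered)), len(ordered) - 1)
        return ordered[idx]
```

A bounded window keeps memory constant and makes the percentiles reflect recent behavior, which is what an SLO check cares about; for production use, a histogram (as in the Prometheus queries above) scales better than sorting raw samples.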