Replication
Status: Implemented · some Planned

marekvs replicates by pushing, not by quorum. A write commits locally, acks the client, and is then fanned out to the nodes that own its partition and to the nodes that have subscribed to it. There is no coordinator on the data path and no synchronous cross-node round-trip. Divergence (from a dropped push, a ring overrun, or a transient membership disagreement) is healed by anti-entropy within the staleness bound.
This page covers placement, the write and read paths, interest subscriptions, the wire format, and the transport. Every tunable named here lives in the single-source-of-truth defaults table.
#Topology
Keys hash into P = 4096 fixed partitions (the unit of placement — see the
data model). Each partition has three replica
roles:
| Role | Meaning |
|---|---|
| Home replicas H(p) | The N nodes durably responsible for partition p. Permanent until placement changes. |
| Primary home H1(p) | The highest-scoring Active home. Coordinates interest fan-out and serves fetches. Not a consistency primary: any node accepts writes. |
| Interest replicas | Nodes caching keys they read on demand. Lease-based, evictable, never a source for anti-entropy. |
Home replicas are chosen by HRW (highest-random-weight, a.k.a. rendezvous) hashing, not a token ring:
```text
score(node, pid) = xxh3_64(node_id_bytes ‖ pid_le_bytes)
owners(pid)      = top-N alive nodes by score, states ∈ {Active, Leaving}
H1(pid)          = highest-score owner with state == Active
```
N is set by MAREKVS_REPLICAS_N (default 3) and must match cluster-wide.
Placement is a pure function of the gossip membership view: every node
computes the same [pid] → [NodeId; N] table from the same view, with no ring
metadata or token management to keep in sync.
Why HRW, not a virtual-node ring. At the target scale of 3–50 nodes,
recomputing 4096 × nodes scores on a membership change is trivial (≈200k xxh3
calls, cached as a flat table). HRW gives minimal, evenly-scattered disruption:
a joining node takes over ~N·P/n home slots (~P/n as H1), drawn evenly from all nodes, and a dead
node's partitions scatter evenly to all survivors — built-in thundering-herd
spreading. Rings only win at hundreds of nodes.
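A minimal sketch of HRW placement, assuming a `u16` node id and a `u16` pid, both encoded little-endian. It substitutes std's `DefaultHasher` for `xxh3_64` so the block builds without crates; the `State` enum and the `owners`/`h1`/`placement` names are illustrative, not marekvs's API. The usage checks the two properties the paragraph relies on: every node derives the same table, and removing a node moves only the partitions it owned.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Number of fixed partitions (P in the text).
pub const P: u16 = 4096;

pub type NodeId = u16;

/// Hypothetical membership states; only Active and Leaving count as owners.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    Active,
    Leaving,
    Dead,
}

/// Stand-in for xxh3_64(node_id_bytes ‖ pid_le_bytes): DefaultHasher over the
/// same concatenated bytes. Deterministic within one build only.
fn score(node: NodeId, pid: u16) -> u64 {
    let mut bytes = Vec::with_capacity(4);
    bytes.extend_from_slice(&node.to_le_bytes());
    bytes.extend_from_slice(&pid.to_le_bytes());
    let mut h = DefaultHasher::new();
    h.write(&bytes);
    h.finish()
}

/// owners(pid): top-N nodes by score among those in Active or Leaving state.
pub fn owners(view: &[(NodeId, State)], pid: u16, n: usize) -> Vec<NodeId> {
    let mut scored: Vec<(u64, NodeId)> = view
        .iter()
        .filter(|(_, s)| matches!(s, State::Active | State::Leaving))
        .map(|&(id, _)| (score(id, pid), id))
        .collect();
    // Highest score first; equal scores fall back to the higher NodeId.
    scored.sort_unstable_by(|a, b| b.cmp(a));
    scored.into_iter().take(n).map(|(_, id)| id).collect()
}

/// H1(pid): the highest-scoring owner whose state is Active.
pub fn h1(view: &[(NodeId, State)], pid: u16, n: usize) -> Option<NodeId> {
    owners(view, pid, n)
        .into_iter()
        .find(|id| view.iter().any(|&(v, s)| v == *id && s == State::Active))
}

/// The flat [pid] -> owners table every node derives from its own view.
pub fn placement(view: &[(NodeId, State)], n: usize) -> Vec<Vec<NodeId>> {
    (0..P).map(|pid| owners(view, pid, n)).collect()
}
```

Because `DefaultHasher` output may change between Rust releases, a real cluster needs a hash whose output is fixed across builds and versions, which xxh3 provides.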
Views may disagree transiently — that is the AP contract. The consequences are duplicate replicas or a briefly missed home, both healed by anti-entropy. See membership-view divergence for the fine print.
#Write path
SET k v arriving at node X (which may or may not be a home for pid(k)):
```text
client ──► X: stamp HLC → envelope → ondaDB commit (one Txn per command) ──► ack client
                                          │
                                   commit hook (seq, ops)
                                          ▼
                                replication ring (bounded)
                                          ▼
                      per-peer sender cursors, fan-out rule:
                        • → every node in owners(pid) except self and except op.origin
                        • → interest subscribers of the key/partition, iff self == H1(pid)
```
- Local commit first, always. The client is acked after the local ondaDB commit. This is the fire-and-forget contract, and it gives per-connection read-your-writes.
- The commit hook pushes `(seq, ops)` into the ring and returns; there is no I/O on the commit path.
- The fan-out rule creates a two-hop DAG: origin → homes → interest subscribers. Homes never forward to homes; only H1 forwards to subscribers.
- If `X` is not a home for the key, applying its own write makes `X` an interest replica: the push to H1 carries an `IMPLICIT_SUB` flag registering `X`'s lease (write-implies-subscribe, mirroring fetch-implies-subscribe).
- Duplicates are harmless: every merge is idempotent, so a value arriving twice (push plus anti-entropy) converges to the same result.
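The fan-out rule can be sketched as a pure function over one ring entry. Reading the diagram together with the echo-suppression section (which refers to an `origin == self` home push), this sketch assumes the home push fires only for locally originated ops, while H1 additionally forwards to live subscribers. `fan_out` and its parameters are hypothetical.

```rust
use std::collections::HashSet;

pub type NodeId = u16;

/// Peers that should receive one ring entry on node `me`.
/// Assumption: the home push applies only to locally originated ops
/// (origin == me); the subscriber push applies only when `me` is H1.
pub fn fan_out(
    me: NodeId,
    origin: NodeId,
    owners: &[NodeId],
    h1: Option<NodeId>,
    subscribers: &[NodeId],
) -> Vec<NodeId> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    if origin == me {
        // Hop 1: origin -> every other home.
        for &o in owners {
            if o != me && seen.insert(o) {
                out.push(o);
            }
        }
    }
    if h1 == Some(me) {
        // Hop 2: H1 -> live subscribers, never back to the op's origin.
        for &s in subscribers {
            if s != me && s != origin && seen.insert(s) {
                out.push(s);
            }
        }
    }
    out
}
```

With homes `[1, 2, 3]`, H1 = 1, and subscribers `[9, 7]`, a write originating at non-home node 9 goes to all three homes; H1 forwards it only to 7 (node 9 is skipped as the origin, even though its `IMPLICIT_SUB` lease made it a subscriber), and homes 2 and 3 forward it nowhere.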
#Echo suppression
Applied remote ops re-enter the local commit hook, so without a stop rule they
would loop. The envelope carries an origin field, and each sender skips
the op's origin; since a non-H1 home forwards to nobody, no cycles form.
Echo suppression attributes each ring entry to the origin of the batch being
applied on the shard thread — a thread-local set around the apply job — not to
the record envelope's origin. A merged CRDT record keeps the version winner's
origin, so envelope-based attribution once made a node holding a clock-skewed
peer's future-stamped counter attribute its own later increments to that peer;
the origin == self home push then dropped them for the duration of the skew.
Commit-context attribution closes that hole (chaos suite, design/10 finding 5).
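A sketch of commit-context attribution using a `thread_local!` cell set around the apply job. `APPLY_ORIGIN`, `with_apply_origin`, and `ring_entry_origin` are hypothetical names; the drop guard restores the previous value even if the apply job panics.

```rust
use std::cell::Cell;

pub type NodeId = u16;

thread_local! {
    /// Origin of the batch the current shard thread is applying, if any.
    static APPLY_ORIGIN: Cell<Option<NodeId>> = Cell::new(None);
}

/// Restores the previous attribution when the apply job ends, even on panic.
struct Restore(Option<NodeId>);

impl Drop for Restore {
    fn drop(&mut self) {
        APPLY_ORIGIN.with(|c| c.set(self.0));
    }
}

/// Runs one apply job with the batch origin installed on this thread.
pub fn with_apply_origin<R>(origin: NodeId, apply: impl FnOnce() -> R) -> R {
    let _restore = Restore(APPLY_ORIGIN.with(|c| c.replace(Some(origin))));
    apply()
}

/// Commit-hook attribution for a ring entry: the applied batch's origin, or
/// this node for a local command. The merged record's envelope origin is
/// deliberately ignored.
pub fn ring_entry_origin(me: NodeId, _envelope_origin: NodeId) -> NodeId {
    APPLY_ORIGIN.with(|c| c.get()).unwrap_or(me)
}
```

A local increment on node 1 is attributed to node 1 even when the merged counter still carries peer 2's future-stamped envelope, so the `origin == self` home push keeps firing.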
#Crash-safety of the push path
The ring is in-memory and its seq numbers are meaningful to consumers — peers
persist "applied up to S per origin" and reconnect with ResumeFrom{S}. Four
mechanisms, each closing a hole the chaos suite actually caught, keep acked
writes from stranding on their origin:
- Seq space survives restarts. The ring high-water mark is persisted (~1 s cadence); a restart resumes at `hw + 1_000_000`. Without this, a restarted origin re-numbers from 1, every stale consumer cursor looks "caught up" (`cursor >= last_seq`), and the pump silently ships nothing until seqs pass the stale cursor again.
- Boot re-offer. After the view settles (and on every view-epoch change), a node pushes every record it holds for partitions it does not own to a current owner. This heals strands from SIGKILL (unshipped ring entries die with the process) and from ownership moves, which owners-only Merkle AE can never repair because the owners agree with each other and the gauge reads 0.
- Backlog-aware drain. SIGTERM waits (bounded) for all peer cursors to reach the ring head before exiting, instead of a fixed grace sleep — the last-moment ack window otherwise leaves with the process.
- Commit-context attribution. See the caution above — echo attribution uses the applied batch's origin, not the merged record's envelope origin.
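The seq-gap arithmetic is small enough to show directly. This sketch assumes cursors store the last applied seq and that the `+1_000_000` gap is meant to exceed the number of seqs an origin can issue between two ~1 s high-water persists; the helper names are hypothetical.

```rust
/// Gap added to the persisted high-water mark when the ring restarts.
pub const RESTART_SEQ_GAP: u64 = 1_000_000;

/// First seq a restarted origin hands out.
pub fn resume_seq(persisted_hw: u64) -> u64 {
    persisted_hw + RESTART_SEQ_GAP
}

/// A consumer treats an origin as drained when its cursor has reached the
/// origin's last seq, so it asks for nothing newer.
pub fn looks_caught_up(cursor: u64, last_seq: u64) -> bool {
    cursor >= last_seq
}
```

If an origin persisted `hw = 500` but had already issued seq 900 (which a peer applied) before SIGKILL, re-numbering from 1 leaves that peer "caught up" until seqs pass 900, while resuming at 1,000,500 is immediately newer. The guarantee presumably holds only while fewer than a million seqs are issued per persist interval.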
#Content-aware anti-entropy digests
The Merkle bucket digest and diff key on (ikey, hlc, value_hash), not just
(ikey, hlc). Merged CRDT records (PN counters, HLL registers) can carry the
same envelope version (version = symmetric max) with different payloads
on two replicas; a version-only digest calls them equal and AE never repairs the
divergence. The value hash makes equal-version / different-content records
repair in both directions — the backstop that guarantees convergence even when
the push path mis-fires (design/10 finding 6). The digest layout lives in
Consistency.
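To see why the value hash matters, here is a sketch of two bucket digests over the same records. It is not the digest layout from Consistency: it XOR-folds std `DefaultHasher` hashes (order-independent, so bucket iteration order does not matter) purely to contrast keying on `(ikey, hlc)` with keying on `(ikey, hlc, value_hash)`. `Record` is an illustrative shape.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// One replica's view of a record in a Merkle bucket (illustrative shape).
pub struct Record<'a> {
    pub ikey: &'a [u8],
    pub hlc: u64,
    pub value: &'a [u8],
}

fn h<T: Hash + ?Sized>(t: &T) -> u64 {
    let mut s = DefaultHasher::new();
    t.hash(&mut s);
    s.finish()
}

/// Version-only digest: keys on (ikey, hlc).
pub fn digest_version_only(bucket: &[Record<'_>]) -> u64 {
    bucket.iter().fold(0, |acc, r| acc ^ h(&(r.ikey, r.hlc)))
}

/// Content-aware digest: keys on (ikey, hlc, value_hash).
pub fn digest_content(bucket: &[Record<'_>]) -> u64 {
    bucket.iter().fold(0, |acc, r| acc ^ h(&(r.ikey, r.hlc, h(r.value))))
}
```

Two replicas holding a merged counter at the same version but with different payloads produce equal version-only digests (AE sees nothing to do) and different content-aware digests (AE repairs the bucket).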
#Wire format
Replication ops are postcard-encoded after the frame header (see transport):
```rust
use bytes::Bytes;
use serde::{Deserialize, Serialize};

type NodeId = u16;

#[derive(Serialize, Deserialize)]
struct ReplBatch {
    origin: NodeId,   // u16
    first_seq: u64,   // origin's ondaDB seq of first op (cursor resume)
    ops: Vec<ReplOp>, // batched; see batch policy below
}

#[derive(Serialize, Deserialize)]
struct ReplOp {
    ikey: Bytes,            // full internal key (pid + tag + userkey [+ elem])
    env_and_payload: Bytes, // 19-byte envelope + payload, verbatim ondaDB value
}
```
No separate delta encoding exists or is needed: per-element keys make every
hash/set/zset mutation a single-element op by construction; a string ships its
full value (a string is the delta); a collection DEL is one head-key
tombstone.
Message registry (u8 opcodes):
```text
01 ReplBatch        02 AckSeq          03 Fetch           04 FetchResp
05 FetchCollection  06 Check           07 CheckResp       08 InterestRenew
09 MerkleRoot       0A MerkleBuckets   0B BucketKeys      0C BootstrapReq
0D BootstrapChunk   0E BootstrapDone   0F HandoffAck      10 Publish
11 Ping/Pong        12 ResumeFrom
```
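A receiver maps the opcode byte back to a message kind before decoding the postcard body. Here is a sketch of that mapping as a `#[repr(u8)]` enum with `TryFrom<u8>`, using the opcodes from the registry. `MsgType` is a hypothetical name, and `PingPong` stands for the single `11` opcode shared by ping and pong.

```rust
use std::convert::TryFrom;

/// Wire opcodes from the message registry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum MsgType {
    ReplBatch = 0x01,
    AckSeq = 0x02,
    Fetch = 0x03,
    FetchResp = 0x04,
    FetchCollection = 0x05,
    Check = 0x06,
    CheckResp = 0x07,
    InterestRenew = 0x08,
    MerkleRoot = 0x09,
    MerkleBuckets = 0x0A,
    BucketKeys = 0x0B,
    BootstrapReq = 0x0C,
    BootstrapChunk = 0x0D,
    BootstrapDone = 0x0E,
    HandoffAck = 0x0F,
    Publish = 0x10,
    PingPong = 0x11,
    ResumeFrom = 0x12,
}

impl TryFrom<u8> for MsgType {
    /// Unknown opcodes come back as the error so the caller can drop the frame.
    type Error = u8;

    fn try_from(b: u8) -> Result<Self, Self::Error> {
        use MsgType::*;
        Ok(match b {
            0x01 => ReplBatch,
            0x02 => AckSeq,
            0x03 => Fetch,
            0x04 => FetchResp,
            0x05 => FetchCollection,
            0x06 => Check,
            0x07 => CheckResp,
            0x08 => InterestRenew,
            0x09 => MerkleRoot,
            0x0A => MerkleBuckets,
            0x0B => BucketKeys,
            0x0C => BootstrapReq,
            0x0D => BootstrapChunk,
            0x0E => BootstrapDone,
            0x0F => HandoffAck,
            0x10 => Publish,
            0x11 => PingPong,
            0x12 => ResumeFrom,
            other => return Err(other),
        })
    }
}
```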
#Replication ring & backpressure
One bounded ring per process: 128 MiB or 262,144 ops, whichever comes first. Shard threads write (one producer segment per shard, sequenced by ondaDB seq); per-peer sender tasks hold read cursors.
Today the ring batches 256 ops per ReplBatch, pumping on notify or a 50 ms
tick. When a peer's cursor falls off the ring tail (overrun), the sender drops
the cursor, marks the shared partitions dirty, and stops streaming to that peer;
recovery is ResumeFrom replay if the seq is still in the ring, else Merkle
anti-entropy on the dirty pairs. There are no unbounded disk hint queues —
the ring plus a tight anti-entropy period is the whole lag story.
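A sketch of the ring's two caps and overrun detection, assuming cursors hold the last seq a peer has received. It tracks the highest evicted seq rather than assuming seqs are contiguous, so gaps do not cause false overruns. `Ring`, `Read`, and the method names are hypothetical, and payloads are reduced to a byte size.

```rust
use std::collections::VecDeque;

/// A ring entry reduced to its seq and encoded size.
struct Entry {
    seq: u64,
    bytes: usize,
}

/// Bounded in-memory ring: evicts the oldest entries once either the op cap
/// or the byte cap is exceeded, whichever comes first.
pub struct Ring {
    entries: VecDeque<Entry>,
    max_ops: usize,
    max_bytes: usize,
    bytes: usize,
    evicted_hw: u64, // highest seq ever evicted from the tail
}

#[derive(Debug, PartialEq)]
pub enum Read {
    /// Seqs to ship next, in order.
    Batch(Vec<u64>),
    /// The peer needs evicted entries: mark partitions dirty, fall back to AE.
    Overrun,
}

impl Ring {
    pub fn new(max_ops: usize, max_bytes: usize) -> Self {
        Ring { entries: VecDeque::new(), max_ops, max_bytes, bytes: 0, evicted_hw: 0 }
    }

    pub fn push(&mut self, seq: u64, bytes: usize) {
        self.entries.push_back(Entry { seq, bytes });
        self.bytes += bytes;
        while self.entries.len() > self.max_ops || self.bytes > self.max_bytes {
            // Never empty here: an empty ring holds 0 ops and 0 bytes.
            let old = self.entries.pop_front().expect("non-empty while over cap");
            self.bytes -= old.bytes;
            self.evicted_hw = old.seq;
        }
    }

    /// Up to `batch` entries after `cursor` (the last seq the peer has).
    pub fn read_after(&self, cursor: u64, batch: usize) -> Read {
        if cursor < self.evicted_hw {
            return Read::Overrun;
        }
        let seqs = self.entries.iter().filter(|e| e.seq > cursor).take(batch);
        Read::Batch(seqs.map(|e| e.seq).collect())
    }
}
```

With the defaults from the text this would be `Ring::new(262_144, 128 << 20)`, read in batches of 256.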
Byte-cap and flow control are designed but not yet implemented.
- The 256 KiB batch byte cap + 2 ms linger are design targets; the code caps by op count (256) and time (50 ms tick) only.
- Per-peer unacked-window / `AckSeq` flow control (designed 4 MiB send window): `AckSeq` frames are received and ignored today, so there is no send-window backpressure. Acks are meant to advance the sender's persisted cursor floor without ever gating client acks.
#Apply path (receiver)
For each op: route to the shard thread for pid, read the current envelope for
ikey, run the merge rule, and write only if the incoming version wins — one
ondaDB Txn per ReplBatch per shard. Applied ops re-enter the local commit
hook; the DAG rule above prevents echo.
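A sketch of the per-shard apply loop for last-writer-wins records, with a `HashMap` standing in for the ondaDB shard. It assumes the version compares as `(hlc, origin)` with the origin as tie-breaker, which the text does not specify; CRDT types (PN counters, HLL registers) merge their payloads instead and are not modeled here. `Versioned` and `apply_batch` are hypothetical.

```rust
use std::collections::HashMap;

pub type NodeId = u16;

#[derive(Clone, Debug, PartialEq)]
pub struct Versioned {
    pub hlc: u64,
    pub origin: NodeId,
    pub value: Vec<u8>,
}

/// Applies one batch to one shard; returns how many ops were written.
/// In the real path, every winning write of the batch goes into one ondaDB Txn.
pub fn apply_batch(store: &mut HashMap<Vec<u8>, Versioned>, ops: &[(Vec<u8>, Versioned)]) -> usize {
    let mut written = 0;
    for (ikey, incoming) in ops {
        // Write only if the incoming version strictly wins (assumed tie-break on origin).
        let wins = match store.get(ikey) {
            Some(cur) => (incoming.hlc, incoming.origin) > (cur.hlc, cur.origin),
            None => true,
        };
        if wins {
            store.insert(ikey.clone(), incoming.clone());
            written += 1;
        }
    }
    written
}
```

Because a losing or equal version is a no-op, re-applying the same batch (push plus anti-entropy) writes nothing, which is the idempotence the write path relies on.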
#Interest subscriptions
A node that reads a remote key caches it and subscribes to its updates, so hot data spreads to where it is used. Home nodes hold the subscription state in memory:
```rust
interest:      HashMap<Pid, HashMap<Bytes /*userkey*/, SmallVec<[(NodeId, Instant); 4]>>>
part_interest: HashMap<Pid, HashMap<NodeId, Instant>>  // escalated whole-partition subs
```
Exact keys are tracked, not bloom filters: false positives would fan writes to uninterested nodes, and blooms can't expire entries.
Lifecycle (implemented):
| Event | Behavior |
|---|---|
| Create | First remote GET: FetchResp carries the value and a lease (interest_lease = 60 s). Fetch implies subscribe — one RTT. Collections subscribe at collection granularity (head + all elements, one lease). |
| Lease-expired read | The cached value stays on disk; an in-memory lease table gates freshness. A read of an expired-lease key sends Check{ikey, hlc} to H1 → Fresh (re-arm) or Newer{env, payload} (merge, re-arm). Cheaper than a refetch for large values. |
| Subscriber restart | The lease table is memory-only, so every non-home local key is lease-expired → lazy revalidation on first read. No resubscription storm. |
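The home-side lease bookkeeping can be sketched with std collections only. This assumes a `Vec` in place of `SmallVec` and `Vec<u8>` in place of `Bytes`; `Interest`, `subscribe`, `subscribers`, and `gc` are hypothetical names. `subscribe` covers both the first `FetchResp` and a re-arm after `Check`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

pub type Pid = u16;
pub type NodeId = u16;

/// Home-side interest table: pid -> userkey -> [(subscriber, lease expiry)].
#[derive(Default)]
pub struct Interest {
    map: HashMap<Pid, HashMap<Vec<u8>, Vec<(NodeId, Instant)>>>,
}

impl Interest {
    /// Registers or re-arms `node`'s lease on `key` until `now + lease`.
    pub fn subscribe(&mut self, pid: Pid, key: &[u8], node: NodeId, now: Instant, lease: Duration) {
        let subs = self.map.entry(pid).or_default().entry(key.to_vec()).or_default();
        let expiry = now + lease;
        if let Some(i) = subs.iter().position(|&(n, _)| n == node) {
            subs[i].1 = expiry;
        } else {
            subs.push((node, expiry));
        }
    }

    /// Subscribers whose lease is still valid at `now` (the fan-out targets).
    pub fn subscribers(&self, pid: Pid, key: &[u8], now: Instant) -> Vec<NodeId> {
        self.map
            .get(&pid)
            .and_then(|keys| keys.get(key))
            .map(|subs| subs.iter().filter(|(_, exp)| *exp > now).map(|(n, _)| *n).collect())
            .unwrap_or_default()
    }

    /// Drops expired leases and empty maps (the text runs this each AE round).
    pub fn gc(&mut self, now: Instant) {
        for keys in self.map.values_mut() {
            for subs in keys.values_mut() {
                subs.retain(|(_, exp)| *exp > now);
            }
            keys.retain(|_, subs| !subs.is_empty());
        }
        self.map.retain(|_, keys| !keys.is_empty());
    }
}
```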
Renew, escalation, and the entry cap are designed but not wired up.
- Interest renew interval (design 15 s): the `InterestRenew` message exists and is handled, but it is never sent; leases are re-armed instead when the first read after expiry revalidates with H1 (`Check`, or `Fetch` if the key is gone).
- Whole-partition escalation (`interest_escalate`, design 4096 key-leases per partition): converting a heavy subscriber into a partition-level shadow replica is unimplemented.
- `interest_max_entries` (design 1,000,000, LRU-evicted): there is no cap or LRU on the interest map today; expired entries are GC'd each AE round.
#Read path
GET k at node X, with p = pid(k):
- `X ∈ owners(p)` → serve locally (envelope decode, TTL check). The value may be behind an in-flight push; that is AP, bounded by anti-entropy.
- `X` caches `k`, lease valid → serve locally.
- `X` caches `k`, lease expired/invalid → `Check` to H1(p), merge if newer, serve, re-arm.
- `X` lacks `k` → `Fetch{ikey}` to H1(p), with a ~300 ms fetch timeout (`FETCH_TIMEOUT`); on timeout or miss, serve the local value (or empty) and let AE reconcile. The response is committed locally via merge (so a concurrent local write can't be regressed), the lease is registered, and the value is served. A collection fetch is a `FetchCollection` streaming all element keys of that user key.
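The read-path cascade and the fetch timeout can be sketched as follows. This is a simplification: `read_action` and `await_fetch` are hypothetical, a std `mpsc` channel stands in for the pending `FetchResp`, and the merge of a fetched value with any concurrent local write is omitted.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

pub const FETCH_TIMEOUT: Duration = Duration::from_millis(300);

#[derive(Debug, PartialEq)]
pub enum ReadAction {
    ServeLocal,
    CheckH1,
    FetchH1,
}

/// Chooses the read path for key k on node X, in the order listed above.
pub fn read_action(is_owner: bool, cached: bool, lease_valid: bool) -> ReadAction {
    if is_owner || (cached && lease_valid) {
        ReadAction::ServeLocal
    } else if cached {
        ReadAction::CheckH1
    } else {
        ReadAction::FetchH1
    }
}

/// Waits up to FETCH_TIMEOUT for the FetchResp; otherwise serves what is local.
pub fn await_fetch(rx: &Receiver<Vec<u8>>, local: Option<Vec<u8>>) -> Option<Vec<u8>> {
    match rx.recv_timeout(FETCH_TIMEOUT) {
        Ok(value) => Some(value),
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => local,
    }
}
```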
Freshness honesty: per-connection read-your-writes and monotonic reads hold (local-commit-first plus HLC max-wins merges). Nothing is promised across connections.
#Pub/Sub
At cluster sizes of 3–50 nodes, pub/sub is a filtered full-mesh fan-out — no broadcast tree (a tree buys nothing below hundreds of nodes, and ctl connections already exist).
- Each node gossips its subscription summary via a chitchat KV: the exact channel list while ≤ 1024 channels (postcard, versioned), else a 64 KiB blocked bloom filter. A separate `has_patterns` flag marks nodes with `PSUBSCRIBE` clients; they receive every publish (pattern matching is local, with Redis glob semantics).
- `PUBLISH ch msg` at `X`: deliver to local subscribers, then send `Publish{channel, payload}` on the ctl connection only to peers whose summary matches or that set `has_patterns`. At-most-once, fire-and-forget: exactly Redis pub/sub semantics.
- Local delivery uses a sharded `ChannelStore` (16 shards, RCU-swapped subscriber maps) over tokio broadcast senders.
- Keyspace notifications ride the same mesh, generated at the origin node only.
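A sketch of the publish-side peer filter. The 1024-channel threshold and the 64 KiB size come from the text; the Bloom filter here is a plain one with four `DefaultHasher` probes, a stand-in for the blocked Bloom filter marekvs gossips. `Summary`, `Peer`, and `publish_targets` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

pub const EXACT_LIMIT: usize = 1024;
const BLOOM_BITS: usize = 64 * 1024 * 8; // 64 KiB of bits
const K: u64 = 4; // probes per channel (assumed)

/// A node's gossiped subscription summary.
pub enum Summary {
    Exact(HashSet<String>),
    /// Plain Bloom filter; may report false positives, never false negatives.
    Bloom(Vec<u64>),
}

fn probe(channel: &str, i: u64) -> usize {
    let mut h = DefaultHasher::new();
    (i, channel).hash(&mut h);
    (h.finish() % BLOOM_BITS as u64) as usize
}

impl Summary {
    pub fn build(channels: &[&str]) -> Summary {
        if channels.len() <= EXACT_LIMIT {
            return Summary::Exact(channels.iter().map(|c| c.to_string()).collect());
        }
        let mut bits = vec![0u64; BLOOM_BITS / 64];
        for c in channels {
            for i in 0..K {
                let b = probe(c, i);
                bits[b / 64] |= 1u64 << (b % 64);
            }
        }
        Summary::Bloom(bits)
    }

    pub fn may_contain(&self, channel: &str) -> bool {
        match self {
            Summary::Exact(set) => set.contains(channel),
            Summary::Bloom(bits) => (0..K).all(|i| {
                let b = probe(channel, i);
                bits[b / 64] & (1u64 << (b % 64)) != 0
            }),
        }
    }
}

pub struct Peer {
    pub id: u16,
    pub summary: Summary,
    pub has_patterns: bool,
}

/// Peers that receive `Publish{channel, payload}` on the ctl lane.
pub fn publish_targets(peers: &[Peer], channel: &str) -> Vec<u16> {
    peers
        .iter()
        .filter(|p| p.has_patterns || p.summary.may_contain(channel))
        .map(|p| p.id)
        .collect()
}
```

A Bloom false positive only costs one wasted `Publish` to a peer that then finds no local subscriber; it never drops a message, which fits the at-most-once contract.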
#Transport
The mesh is TCP + tokio, with TCP_NODELAY and 4 MiB socket buffers. There
are two connections per peer pair, both dialed by the lower NodeId:
| Lane | Carries |
|---|---|
| ctl | Latency-sensitive traffic: ReplBatch, AckSeq, Fetch/Check, interest, Publish, heartbeats. |
| bulk | Bootstrap chunks and Merkle exchanges (lz4-compressed frames). |
Splitting bulk off removes head-of-line blocking without multiplexing
machinery. Framing is [len: u32 LE][msg_type: u8][flags: u8][body…], max
frame 8 MiB. Bodies are postcard (serde) — compact varints, evolvable via
#[serde(default)] — except ReplOp.env_and_payload and fetch payloads, which
are raw bytes copied verbatim from/to ondaDB values (zero re-encode on the hot
path).
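A sketch of encoding and decoding one frame using only std. The text does not say whether `len` covers the `msg_type` and `flags` bytes or whether the 8 MiB limit includes the header; this sketch assumes `len` counts everything after the length field and applies the limit to that count. The function names are hypothetical.

```rust
pub const MAX_FRAME: usize = 8 * 1024 * 1024;

/// Encodes [len: u32 LE][msg_type: u8][flags: u8][body…].
/// Assumption: len = 2 + body.len() (bytes after the length field).
pub fn encode_frame(msg_type: u8, flags: u8, body: &[u8]) -> Result<Vec<u8>, String> {
    let len = 2 + body.len();
    if len > MAX_FRAME {
        return Err(format!("frame of {} bytes exceeds 8 MiB", len));
    }
    let mut out = Vec::with_capacity(4 + len);
    out.extend_from_slice(&(len as u32).to_le_bytes());
    out.push(msg_type);
    out.push(flags);
    out.extend_from_slice(body); // raw bytes, e.g. a postcard body
    Ok(out)
}

/// Decodes one frame from the front of `buf`.
/// Returns Ok(None) when more bytes are needed, else (type, flags, body, bytes consumed).
pub fn decode_frame(buf: &[u8]) -> Result<Option<(u8, u8, &[u8], usize)>, String> {
    if buf.len() < 4 {
        return Ok(None);
    }
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if len < 2 || len > MAX_FRAME {
        return Err(format!("bad frame length {}", len));
    }
    if buf.len() < 4 + len {
        return Ok(None);
    }
    Ok(Some((buf[4], buf[5], &buf[6..4 + len], 4 + len)))
}
```

Rejecting an oversized length before buffering means a corrupt or hostile length prefix cannot make the reader allocate past the frame limit.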
QUIC was rejected. quinn mandates TLS (against the no-encryption constraint), burns CPU in a userspace stack, and its head-of-line-blocking win is already captured by the two-connection ctl/bulk split.
#Where to go next
- The math behind convergence: Consistency & anti-entropy.
- How nodes join, leave, and are declared dead: Cluster membership.
- Every tunable on this page: defaults table.