diff --git a/documentation/src/app/architecture/page.mdx b/documentation/src/app/architecture/page.mdx index d434f620..09a30024 100644 --- a/documentation/src/app/architecture/page.mdx +++ b/documentation/src/app/architecture/page.mdx @@ -8,6 +8,7 @@ export const metadata = { export const sections = [ { title: 'System Overview', id: 'system-overview' }, + { title: 'Storing File Data', id: 'storing-file-data' }, { title: 'Choosing a Protocol', id: 'choosing-a-protocol' }, { title: 'Filesystem Limits', id: 'filesystem-limits' }, ] @@ -58,6 +59,14 @@ graph TB NBD --> NBDD `} /> +## Storing File Data {{ id: 'storing-file-data' }} + +File data is split into fixed **32 KiB chunks**, each keyed by `(inode, chunk index)` and stored as a value in SlateDB's LSM tree, so a read fetches only the chunks a request actually covers. + +Small in-place writes get special treatment. Rewriting a few bytes of a chunk used to mean reading the whole 32 KiB chunk, splicing in the change, and writing all 32 KiB back, so a file edited in place left a trail of superseded full-chunk versions that compaction later had to read just to discard. Instead, a partial write to an existing chunk now records only the changed byte range as a small **delta** (a merge operand) while a new chunk or a full-chunk overwrite still stores the complete chunk as a **base**. The merge operator transparently reassembles each chunk from its base plus any deltas, on reads, flushes, and compaction; nothing in the read path has to know whether a chunk is stored whole or as base-plus-deltas. + +This cuts both the data written per small change and the volume of superseded data compaction has to sift through, lowering write amplification and storage churn, most noticeably for random-write workloads such as databases. 
It needs no migration: chunks written before the change read back unchanged, and a chunk's deltas are folded back into a single value as it flushes and compacts, so read cost stays bounded no matter how many times a chunk is rewritten. + ## Choosing a Protocol {{ id: 'choosing-a-protocol' }} Because the servers share one filesystem layer, protocols mix on a single instance: NBD device files in `.nbd/`, for example, are created and resized with regular file operations through an NFS or 9P mount. Which protocol to mount with depends on the client and the workload: diff --git a/zerofs/proto/replication.proto b/zerofs/proto/replication.proto index a361b58c..a1802efc 100644 --- a/zerofs/proto/replication.proto +++ b/zerofs/proto/replication.proto @@ -22,6 +22,11 @@ message ReplOp { bytes key = 1; bytes value = 2; // ignored when delete is true bool delete = 3; + // A merge operand (resolved by the merge operator), e.g. a sub-chunk + // delta. Only ever set once the standby has advertised a protocol_version + // that understands merges (HelloResponse.protocol_version), so an older + // standby never receives one. When false, the op is a put/delete as before. + bool merge = 4; } message ReplicateRequest { @@ -59,4 +64,8 @@ message HelloResponse { // True if the peer is leading or holds an un-replayed tail: the caller must // NOT open as writer; it should defer and follow. bool peer_active = 1; + // The responder's replication protocol version. Absent (0) from a peer too + // old to set it. The leader ships merge ops only to a peer whose version is + // high enough to understand them; otherwise it falls back to full-chunk puts. 
+ uint32 protocol_version = 2; } diff --git a/zerofs/src/cli/compactor.rs b/zerofs/src/cli/compactor.rs index 3a6b0fe0..db844cd4 100644 --- a/zerofs/src/cli/compactor.rs +++ b/zerofs/src/cli/compactor.rs @@ -91,6 +91,7 @@ pub async fn run_compactor(config_path: PathBuf) -> Result<()> { .with_options(worker_options) .with_block_transformer(block_transformer) .with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await?, ); diff --git a/zerofs/src/cli/init.rs b/zerofs/src/cli/init.rs index a450c807..1caa4262 100644 --- a/zerofs/src/cli/init.rs +++ b/zerofs/src/cli/init.rs @@ -516,6 +516,9 @@ impl DbOpen { crate::replication::ReplOp::Put(k, v) => { batch.put_bytes(k.clone(), v.clone()) } + crate::replication::ReplOp::Merge(k, v) => { + batch.merge(k.clone(), v.clone()) + } crate::replication::ReplOp::Delete(k) => batch.delete(k.clone()), } } @@ -643,6 +646,21 @@ impl Replayed { _ => (None, None), }; + // Chunk deltas (sub-chunk merge operands) are safe to write only when + // every replication peer understands merge ops; otherwise fall back to + // full-chunk writes. A non-replicating leader (no peers) writes deltas + // freely. 
+ let deltas_enabled = if replicator.is_some() { + let peers = prepared + .replication_params + .as_ref() + .map(|p| p.peers.clone()) + .unwrap_or_default(); + crate::replication::transport::all_peers_support_merge(&peers).await + } else { + true + }; + let fs = ZeroFS::new_with_slatedb_and_lease( slatedb, settings.max_bytes(), @@ -652,6 +670,7 @@ impl Replayed { prepared.segments_enabled, lease, replicator, + deltas_enabled, prepared.dedup, is_live_takeover, prepared.object_tracer.clone(), diff --git a/zerofs/src/cli/server.rs b/zerofs/src/cli/server.rs index d179e08b..6bf1cc3b 100644 --- a/zerofs/src/cli/server.rs +++ b/zerofs/src/cli/server.rs @@ -622,6 +622,7 @@ pub async fn build_slatedb( .with_db_cache(cache) .with_block_transformer(block_transformer) .with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .with_metrics_recorder(metrics_recorder.clone()); if segments_enabled { @@ -660,6 +661,7 @@ pub async fn build_slatedb( .with_filter_policies(crate::fs::filter_policy::filter_policies( segments_enabled, )) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .with_options(slatedb::config::CompactorOptions { poll_interval: std::time::Duration::from_secs(5), commit_compacted_interval: std::time::Duration::from_secs(5), @@ -690,7 +692,8 @@ pub async fn build_slatedb( let mut reader_builder = DbReader::builder(db_path, object_store) .with_block_transformer(block_transformer) - .with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled)); + .with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()); if segments_enabled { reader_builder = reader_builder.with_segment_extractor(Arc::new( crate::segment_extractor::ZeroFsSegmentExtractor, @@ -718,7 +721,8 @@ pub async fn build_slatedb( let mut reader_builder = 
DbReader::builder(db_path, object_store) .with_checkpoint_id(checkpoint_id) .with_block_transformer(block_transformer) - .with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled)); + .with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()); if segments_enabled { reader_builder = reader_builder.with_segment_extractor(Arc::new( crate::segment_extractor::ZeroFsSegmentExtractor, @@ -1094,6 +1098,7 @@ mod tests { .with_sst_block_size(SstBlockSize::Block32Kib) .with_filter_policies(crate::fs::filter_policy::filter_policies(true)) .with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .expect("open slatedb") diff --git a/zerofs/src/db.rs b/zerofs/src/db.rs index b52357d5..38ee62f6 100644 --- a/zerofs/src/db.rs +++ b/zerofs/src/db.rs @@ -97,6 +97,9 @@ pub fn exit_on_write_error(err: impl std::fmt::Display) -> ! { enum TxOp { Put(Bytes, Bytes), + /// A merge operand (resolved by the configured merge operator). Used by the + /// chunk store to record sub-chunk deltas instead of rewriting a whole chunk. + Merge(Bytes, Bytes), Delete(Bytes), } @@ -143,6 +146,12 @@ impl Transaction { self.ops.push(TxOp::Put(key.clone(), value)); } + /// Record a merge operand for `key`, resolved by the configured merge + /// operator. The chunk store uses this to write sub-chunk deltas. 
+ pub fn merge_bytes(&mut self, key: &Bytes, value: Bytes) { + self.ops.push(TxOp::Merge(key.clone(), value)); + } + pub fn delete_bytes(&mut self, key: &Bytes) { self.ops.push(TxOp::Delete(key.clone())); } @@ -175,6 +184,7 @@ impl Transaction { for op in self.ops { match op { TxOp::Put(k, v) => target.put_bytes(k, v), + TxOp::Merge(k, v) => target.merge(k, v), TxOp::Delete(k) => target.delete(k), } } @@ -192,6 +202,19 @@ impl Transaction { target.put_bytes(k.clone(), v.clone()); ops.push(ReplOp::Put(k, v)); } + TxOp::Merge(k, v) => { + // Faithfully replicate the merge operand. A merge is only ever + // produced when the chunk store's `deltas_enabled` gate is open, + // which is decided once at leader startup from the peers' + // advertised protocol version (see `all_peers_support_merge`). + // This assumes a compatible, forward-only deployment: it does + // NOT re-check on reconnect, so downgrading a live standby below + // the merge protocol version while the leader stays up is out of + // scope (the old peer would mis-decode the op). Keep peer + // versions matched. + target.merge(k.clone(), v.clone()); + ops.push(ReplOp::Merge(k, v)); + } TxOp::Delete(k) => { target.delete(k.clone()); ops.push(ReplOp::Delete(k)); diff --git a/zerofs/src/fs/mod.rs b/zerofs/src/fs/mod.rs index c2682f8a..d76ab8d9 100644 --- a/zerofs/src/fs/mod.rs +++ b/zerofs/src/fs/mod.rs @@ -204,6 +204,7 @@ impl ZeroFS { use_segment_layout, None, None, + true, // single-node / test: no replication peers, so deltas are safe Arc::new(crate::dedup::DedupCache::new(65_536)), false, // single-node / test: never a live takeover, always regenerate ObjectTracer::new(), @@ -224,6 +225,10 @@ impl ZeroFS { use_segment_layout: bool, lease: Option>, replicator: Option>, + // Whether the chunk store may write sub-chunk merge deltas. False when a + // replication peer is too old to understand merge ops (see + // `all_peers_support_merge`); true for a non-replicating node. 
+ deltas_enabled: bool, dedup: Arc, // True only when this node is a standby promoting after receiving replication // from a live leader; false on a cold bootstrap / config leader / single node. @@ -310,7 +315,7 @@ impl ZeroFS { let flush_coordinator = FlushCoordinator::new(db.clone()); let stats = Arc::new(FileSystemStats::new()); - let chunk_store = ChunkStore::new(db.clone(), key_codec.clone()); + let chunk_store = ChunkStore::new(db.clone(), key_codec.clone(), deltas_enabled); let directory_store = DirectoryStore::new(db.clone(), key_codec.clone()); let inode_store = InodeStore::new(db.clone(), key_codec.clone(), next_inode_id); let tombstone_store = TombstoneStore::new(db.clone(), key_codec.clone()); @@ -474,6 +479,7 @@ impl ZeroFS { .with_block_transformer(block_transformer) .with_filter_policies(crate::fs::filter_policy::filter_policies(true)) .with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await?, ); @@ -509,6 +515,7 @@ impl ZeroFS { .with_block_transformer(block_transformer) .with_filter_policies(crate::fs::filter_policy::filter_policies(true)) .with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await?, ); @@ -3453,6 +3460,7 @@ mod tests { .with_block_transformer(block_transformer) .with_filter_policies(crate::fs::filter_policy::filter_policies(true)) .with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .unwrap(), diff --git a/zerofs/src/fs/stats.rs b/zerofs/src/fs/stats.rs index 8b0391d3..8fabf55b 100644 --- a/zerofs/src/fs/stats.rs +++ b/zerofs/src/fs/stats.rs @@ -942,6 +942,7 @@ mod tests { .with_block_transformer(block_transformer) 
.with_filter_policies(crate::fs::filter_policy::filter_policies(true)) .with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .unwrap(), diff --git a/zerofs/src/fs/store/chunk.rs b/zerofs/src/fs/store/chunk.rs index 87e5ea6a..2067b6ee 100644 --- a/zerofs/src/fs/store/chunk.rs +++ b/zerofs/src/fs/store/chunk.rs @@ -33,10 +33,15 @@ pub struct ChunkStore { /// sequential append splices into it rather than re-reading the chunk it just /// wrote. Sharded + LRU via foyer; eviction only ever costs a re-read. tail_cache: Cache, + /// When true, a partial write to a chunk that already has a base is recorded + /// as a sub-chunk merge delta instead of rewriting the whole chunk, cutting + /// the volume of superseded versions compaction must later read. Gated off + /// when a replication peer can't apply merge ops (see `all_peers_support_merge`). + deltas_enabled: bool, } impl ChunkStore { - pub fn new(db: Arc, key_codec: Arc) -> Self { + pub fn new(db: Arc, key_codec: Arc, deltas_enabled: bool) -> Self { let tail_cache = CacheBuilder::new(TAIL_CACHE_BYTES) .with_weighter(|_id: &InodeId, (_idx, data): &(u64, Bytes)| data.len()) .build(); @@ -44,6 +49,7 @@ impl ChunkStore { db, key_codec, tail_cache, + deltas_enabled, } } @@ -88,6 +94,21 @@ impl ChunkStore { txn.put_bytes(&key, data); } + /// Record a partial write as a sub-chunk merge delta (the merge operator + /// applies it onto the chunk's base on read/compaction). Only valid when the + /// chunk already has a base; a new chunk must be established with `save`. 
+ fn save_delta( + &self, + txn: &mut Transaction, + id: InodeId, + chunk_idx: u64, + offset: usize, + data: Bytes, + ) { + let key = self.key_codec.chunk_key(id, chunk_idx); + txn.merge_bytes(&key, super::chunk_merge::encode_delta(offset, data)); + } + pub fn delete(&self, txn: &mut Transaction, id: InodeId, chunk_idx: u64) { let key = self.key_codec.chunk_key(id, chunk_idx); txn.delete_bytes(&key); @@ -181,7 +202,11 @@ impl ChunkStore { // The tail chunk of the previous write, splice-able without a re-read. let cached = self.tail_get(id); - let existing_chunks: Result, FsError> = + // For each touched chunk: its current bytes for splicing, plus whether a + // real base exists in the store. `had_base` is what makes a partial write + // eligible to be recorded as a delta: a new/hole chunk has no base to + // merge onto, so it must be written whole. + let existing_chunks: Result, FsError> = stream::iter(start_chunk..=end_chunk) .map(|chunk_idx| { let chunk_start = chunk_idx * CHUNK_SIZE as u64; @@ -195,19 +220,22 @@ impl ChunkStore { let store = self.clone(); let cached = cached.clone(); async move { - let data = if will_overwrite_fully || beyond_eof { - Bytes::from_static(ZERO_CHUNK) + let (data, had_base) = if will_overwrite_fully || beyond_eof { + // A full overwrite replaces the chunk and a beyond-EOF + // chunk is new: neither has a base to delta against. + (Bytes::from_static(ZERO_CHUNK), false) } else if let Some((_, bytes)) = cached.filter(|(ci, _)| *ci == chunk_idx) { // We wrote this chunk last; splice into our copy of it // rather than re-reading it back from the store. - bytes + (bytes, true) } else { - store - .get(id, chunk_idx) - .await? - .unwrap_or_else(|| Bytes::from_static(ZERO_CHUNK)) + match store.get(id, chunk_idx).await? { + Some(bytes) => (bytes, true), + // A hole within the file: no base on disk. 
+ None => (Bytes::from_static(ZERO_CHUNK), false), + } }; - Ok::<(u64, Bytes), FsError>((chunk_idx, data)) + Ok::<(u64, (Bytes, bool)), FsError>((chunk_idx, (data, had_base))) } }) .buffer_unordered(PARALLEL_CHUNK_OPS) @@ -239,15 +267,36 @@ impl ChunkStore { }; let write_len = write_end - write_start; - let chunk: Bytes = if write_start == 0 && write_end == CHUNK_SIZE { - data.slice(data_offset..data_offset + write_len) + let (base, had_base) = &existing_chunks[&chunk_idx]; + let is_full = write_start == 0 && write_end == CHUNK_SIZE; + let new_bytes = data.slice(data_offset..data_offset + write_len); + data_offset += write_len; + + if !is_full && self.deltas_enabled && *had_base { + // Partial write to an existing chunk: record only the changed + // range as a merge delta. The base stays put and the merge + // operator reassembles the chunk on read/compaction, so we avoid + // rewriting (and later re-reading) a whole superseded chunk. + self.save_delta(txn, id, chunk_idx, write_start, new_bytes.clone()); + // Keep the tail cache materialized so a following append still + // splices without a re-read. + if chunk_idx == end_chunk && cache_tail { + let mut spliced = BytesMut::from(base.as_ref()); + spliced[write_start..write_end].copy_from_slice(&new_bytes); + tail = Some(spliced.freeze()); + } + continue; + } + + // Full overwrite, a new/hole chunk, or deltas disabled: write the + // whole chunk, establishing or replacing its base. 
+ let chunk: Bytes = if is_full { + new_bytes } else { - let mut chunk = BytesMut::from(existing_chunks[&chunk_idx].as_ref()); - chunk[write_start..write_end] - .copy_from_slice(&data[data_offset..data_offset + write_len]); + let mut chunk = BytesMut::from(base.as_ref()); + chunk[write_start..write_end].copy_from_slice(&new_bytes); chunk.freeze() }; - data_offset += write_len; if chunk.as_ref() == ZERO_CHUNK { self.delete(txn, id, chunk_idx); diff --git a/zerofs/src/fs/store/chunk_merge.rs b/zerofs/src/fs/store/chunk_merge.rs new file mode 100644 index 00000000..895ceabd --- /dev/null +++ b/zerofs/src/fs/store/chunk_merge.rs @@ -0,0 +1,366 @@ +//! Sub-chunk delta encoding for file chunks, resolved by a SlateDB merge operator. +//! +//! A partial write to a 32 KiB chunk used to read-modify-write the whole chunk +//! and `put` it back, so every small write created a full-chunk version. +//! Repeated writes to a hot chunk left many superseded 32 KiB versions that +//! compaction must later read just to drop. +//! +//! Instead, a partial write to an *existing* chunk records only the changed byte +//! range as a **merge operand** (a "delta"). A new chunk / full overwrite still +//! `put`s the whole chunk, which is the merge **base**. The merge operator +//! reconstructs the chunk by applying deltas (oldest->newest) onto the base, and +//! SlateDB resolves it on reads, flushes, and compaction. +//! +//! ## Encoding +//! A delta operand is a sparse patch, non-overlapping byte ranges: +//! `[ MAGIC u8 ][ count u16 BE ][ (offset u16 BE, len u16 BE, bytes[len]) * count ]` +//! Ranges are stored sorted by offset and non-overlapping. + +use bytes::{BufMut, Bytes, BytesMut}; +use slatedb::{MergeOperator, MergeOperatorError}; + +use crate::fs::CHUNK_SIZE; + +/// Operand format magic/version byte. Bump it on any format change so mismatched operands are rejected. 
+const DELTA_MAGIC: u8 = 0xD1; + +// In-chunk offsets and lengths are encoded as u16, so the encoding is only valid +// while a chunk fits in u16::MAX bytes; fail the build loudly if CHUNK_SIZE ever +// outgrows it rather than silently truncating offsets/lengths at runtime. The u16 +// segment count is safe on its own: canonical segments are non-overlapping and +// non-abutting, so each occupies >= 2 bytes of the u16 offset space and there are +// at most u16::MAX/2 of them (at most CHUNK_SIZE/2 for offsets the write path +// produces; a corrupt operand with larger offsets stays bounded by the u16 space). +const _: () = assert!(CHUNK_SIZE <= u16::MAX as usize); + +/// One contiguous written byte range within a chunk. +#[derive(Clone, Debug, PartialEq, Eq)] +struct Segment { + offset: usize, + data: Bytes, +} + +impl Segment { + #[inline] + fn end(&self) -> usize { + self.offset + self.data.len() + } +} + +fn encoding_error() -> MergeOperatorError { + MergeOperatorError::Callback { + message: "invalid chunk delta encoding".to_string(), + } +} + +/// Encode canonical (sorted, non-overlapping) segments as a delta operand. +fn encode_segments(segs: &[Segment]) -> Bytes { + let body: usize = segs.iter().map(|s| 4 + s.data.len()).sum(); + let mut buf = BytesMut::with_capacity(1 + 2 + body); + buf.put_u8(DELTA_MAGIC); + buf.put_u16(segs.len() as u16); + for s in segs { + debug_assert!(s.offset <= u16::MAX as usize && s.data.len() <= u16::MAX as usize); + buf.put_u16(s.offset as u16); + buf.put_u16(s.data.len() as u16); + buf.extend_from_slice(&s.data); + } + buf.freeze() +} + +/// Parse a delta operand back into segments. 
+fn decode_segments(bytes: &[u8]) -> Result, MergeOperatorError> { + let mut b = bytes; + if b.first() != Some(&DELTA_MAGIC) { + return Err(encoding_error()); + } + b = &b[1..]; + if b.len() < 2 { + return Err(encoding_error()); + } + let count = u16::from_be_bytes([b[0], b[1]]) as usize; + b = &b[2..]; + let mut segs = Vec::with_capacity(count); + for _ in 0..count { + if b.len() < 4 { + return Err(encoding_error()); + } + let offset = u16::from_be_bytes([b[0], b[1]]) as usize; + let len = u16::from_be_bytes([b[2], b[3]]) as usize; + b = &b[4..]; + if b.len() < len { + return Err(encoding_error()); + } + segs.push(Segment { + offset, + data: Bytes::copy_from_slice(&b[..len]), + }); + b = &b[len..]; + } + if !b.is_empty() { + return Err(encoding_error()); + } + Ok(segs) +} + +/// Overlay one newer segment onto the accumulator, overriding any overlap. +fn overlay_one(acc: &mut Vec, n: Segment) { + let (ns, ne) = (n.offset, n.end()); + let mut result = Vec::with_capacity(acc.len() + 1); + for s in acc.drain(..) { + let (ss, se) = (s.offset, s.end()); + if se <= ns || ss >= ne { + result.push(s); // disjoint, keep as-is + continue; + } + // Keep the portions of the older segment not covered by the newer one. + if ss < ns { + result.push(Segment { + offset: ss, + data: s.data.slice(0..ns - ss), + }); + } + if se > ne { + result.push(Segment { + offset: ne, + data: s.data.slice(ne - ss..), + }); + } + // The overlapping middle is dropped; `n` supersedes it. + } + result.push(n); + result.sort_by_key(|s| s.offset); + *acc = result; +} + +/// Merge abutting segments so the canonical form has no splittable adjacency. +fn canonicalize(segs: &mut Vec) { + if segs.len() < 2 { + return; + } + segs.sort_by_key(|s| s.offset); + let mut merged: Vec = Vec::with_capacity(segs.len()); + for s in segs.drain(..) 
{ + match merged.last_mut() { + Some(last) if last.end() == s.offset => { + let mut buf = BytesMut::with_capacity(last.data.len() + s.data.len()); + buf.extend_from_slice(&last.data); + buf.extend_from_slice(&s.data); + last.data = buf.freeze(); + } + _ => merged.push(s), + } + } + *segs = merged; +} + +/// Combine operands (oldest->newest) into canonical segments; newer wins overlaps. +fn combine(operands: &[Bytes]) -> Result, MergeOperatorError> { + let mut acc: Vec = Vec::new(); + for op in operands { + for seg in decode_segments(op)? { + overlay_one(&mut acc, seg); + } + } + canonicalize(&mut acc); + Ok(acc) +} + +/// Apply segments onto a base chunk, zero-filling any gap a write past the +/// base's end opens up. +fn materialize(base: &[u8], segs: &[Segment]) -> Bytes { + let max_end = segs.iter().map(Segment::end).max().unwrap_or(0); + let len = base.len().max(max_end); + let mut buf = BytesMut::zeroed(len); + buf[..base.len()].copy_from_slice(base); + for s in segs { + buf[s.offset..s.end()].copy_from_slice(&s.data); + } + buf.freeze() +} + +/// Encode a single-range delta for a partial write of `data` at in-chunk +/// `offset`. Used by the chunk-store write path for partial writes to a chunk +/// that already has a base. +pub fn encode_delta(offset: usize, data: Bytes) -> Bytes { + encode_segments(&[Segment { offset, data }]) +} + +/// The shared chunk merge operator handle to attach to every `Db`, `DbReader`, +/// and `Compactor` builder (`.with_merge_operator(chunk_merge_operator())`). +/// Configuring it is inert until a delta operand is actually written: a key with +/// only `Put`/`Delete` entries never invokes the operator. +pub fn chunk_merge_operator() -> std::sync::Arc { + std::sync::Arc::new(ChunkMergeOperator) +} + +/// Merge operator that reconstructs a chunk from a base value plus sub-chunk +/// delta operands. See the module docs for the encoding and invariants. 
+#[derive(Debug, Clone, Default)] +pub struct ChunkMergeOperator; + +impl MergeOperator for ChunkMergeOperator { + fn merge( + &self, + key: &Bytes, + existing_value: Option, + value: Bytes, + ) -> Result { + self.merge_batch(key, existing_value, std::slice::from_ref(&value)) + } + + fn merge_batch( + &self, + _key: &Bytes, + existing_value: Option, + operands: &[Bytes], + ) -> Result { + let segs = combine(operands)?; + Ok(match existing_value { + // A base is present: produce the materialized full chunk (a Value). + Some(base) => materialize(&base, &segs), + // No base yet: stay sparse so we never clobber the eventual base. + None => encode_segments(&segs), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use proptest::prelude::*; + + /// Reference model: apply writes (oldest->newest) onto an optional base, + /// extending with zeros as needed. + fn apply_writes(base: Option<&[u8]>, writes: &[(usize, Vec)]) -> Vec { + let mut v: Vec = base.map(<[u8]>::to_vec).unwrap_or_default(); + for (off, data) in writes { + let end = off + data.len(); + if v.len() < end { + v.resize(end, 0); + } + v[*off..end].copy_from_slice(data); + } + v + } + + #[test] + fn encode_decode_roundtrip() { + let segs = vec![ + Segment { + offset: 3, + data: Bytes::from_static(b"abc"), + }, + Segment { + offset: 10, + data: Bytes::from_static(b"XY"), + }, + ]; + let encoded = encode_segments(&segs); + assert_eq!(decode_segments(&encoded).unwrap(), segs); + } + + #[test] + fn decode_rejects_garbage() { + assert!(decode_segments(b"").is_err()); + assert!(decode_segments(b"\x00\x00\x00").is_err()); + assert!(decode_segments(&[DELTA_MAGIC, 0x00, 0x01]).is_err()); // count=1, truncated + } + + #[test] + fn newer_operand_wins_overlap() { + let op = ChunkMergeOperator; + let d1 = encode_delta(0, Bytes::from_static(b"aaaa")); + let d2 = encode_delta(2, Bytes::from_static(b"BB")); // overlaps bytes 2..4 + let base = Bytes::from_static(&[0u8; 8]); + let out = op + .merge_batch(&Bytes::new(), 
Some(base), &[d1, d2]) + .unwrap(); + assert_eq!(&out[..], b"aaBB\0\0\0\0"); + } + + #[test] + fn delta_extends_base_with_zero_fill() { + let op = ChunkMergeOperator; + let base = Bytes::from_static(b"hi"); // len 2 + let d = encode_delta(4, Bytes::from_static(b"Z")); // gap at [2,4) + let out = op.merge_batch(&Bytes::new(), Some(base), &[d]).unwrap(); + assert_eq!(&out[..], b"hi\0\0Z"); + } + + #[test] + fn no_base_stays_sparse_then_materializes() { + let op = ChunkMergeOperator; + let d1 = encode_delta(1, Bytes::from_static(b"xx")); + let d2 = encode_delta(5, Bytes::from_static(b"y")); + // No base --> a combined sparse operand (still decodable as a delta). + let sparse = op + .merge_batch(&Bytes::new(), None, &[d1.clone(), d2.clone()]) + .unwrap(); + assert!(decode_segments(&sparse).is_ok()); + // Applying that combined operand onto a base equals applying both deltas. + let base = Bytes::from_static(&[b'.'; 8]); + let out = op + .merge_batch(&Bytes::new(), Some(base.clone()), &[sparse]) + .unwrap(); + let direct = op + .merge_batch(&Bytes::new(), Some(base), &[d1, d2]) + .unwrap(); + assert_eq!(out, direct); + assert_eq!(&out[..], b".xx..y.."); + } + + proptest! { + // Resolving the operands matches the reference model, AND is invariant to + // how SlateDB batches them: split the operands into arbitrary groups, fold + // each group with no base into one sparse intermediate, then resolve the + // intermediates against the base. + #[test] + fn batched_resolution_matches_reference( + base in proptest::option::of(proptest::collection::vec(any::(), 0..200usize)), + writes in proptest::collection::vec( + (0..200usize, proptest::collection::vec(any::(), 1..40usize)), + 0..40usize, + ), + splits in proptest::collection::vec(1usize..6, 1..40usize), + ) { + let op = ChunkMergeOperator; + let base_bytes = base.as_ref().map(|b| Bytes::copy_from_slice(b)); + + // Encode each write (oldest→newest) as a delta operand. 
+ let operands: Vec = writes + .iter() + .map(|(off, data)| encode_delta(*off, Bytes::copy_from_slice(data))) + .collect(); + + // Fold arbitrary-sized groups with no base into sparse intermediates, + // mirroring MergeOperatorIterator::process_batch(None, ..). + let mut intermediates = Vec::new(); + let mut idx = 0usize; + let mut split_iter = splits.iter().cycle(); + while idx < operands.len() { + let take = (*split_iter.next().unwrap()).min(operands.len() - idx); + let group = &operands[idx..idx + take]; + intermediates.push(op.merge_batch(&Bytes::new(), None, group).unwrap()); + idx += take; + } + + let got = op + .merge_batch(&Bytes::new(), base_bytes.clone(), &intermediates) + .unwrap(); + + let expected = apply_writes(base.as_deref(), &writes); + + if base_bytes.is_some() { + // With a base, the result is the materialized chunk. + prop_assert_eq!(&got[..], &expected[..]); + } else { + // With no base, the result is sparse; materialize it against an + // explicit zero base to compare against the reference. 
+ let zero = Bytes::copy_from_slice(&vec![0u8; expected.len()]); + let mat = op.merge_batch(&Bytes::new(), Some(zero), &[got]).unwrap(); + prop_assert_eq!(&mat[..], &expected[..]); + } + } + } +} diff --git a/zerofs/src/fs/store/mod.rs b/zerofs/src/fs/store/mod.rs index cb9203b7..49db1e41 100644 --- a/zerofs/src/fs/store/mod.rs +++ b/zerofs/src/fs/store/mod.rs @@ -1,4 +1,5 @@ pub mod chunk; +pub mod chunk_merge; pub mod directory; pub mod inode; pub mod orphan; diff --git a/zerofs/src/posix_tests.rs b/zerofs/src/posix_tests.rs index ced65b70..53f09113 100644 --- a/zerofs/src/posix_tests.rs +++ b/zerofs/src/posix_tests.rs @@ -1369,6 +1369,7 @@ mod tests { .with_segment_extractor(Arc::new( crate::segment_extractor::ZeroFsSegmentExtractor, )) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .unwrap(), @@ -1434,6 +1435,7 @@ mod tests { .with_segment_extractor(Arc::new( crate::segment_extractor::ZeroFsSegmentExtractor, )) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .unwrap(), @@ -1496,6 +1498,7 @@ mod tests { .with_segment_extractor(Arc::new( crate::segment_extractor::ZeroFsSegmentExtractor, )) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .unwrap(), diff --git a/zerofs/src/replication/tail.rs b/zerofs/src/replication/tail.rs index 4e6f94ad..6da75156 100644 --- a/zerofs/src/replication/tail.rs +++ b/zerofs/src/replication/tail.rs @@ -14,6 +14,10 @@ use std::collections::BTreeMap; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ReplOp { Put(Bytes, Bytes), + /// A merge operand (resolved by the merge operator on the standby), e.g. a + /// sub-chunk delta. Replayed as `WriteBatch::merge`, not a put. Only shipped + /// to a standby that advertised it understands merges (protocol version). 
+ Merge(Bytes, Bytes), Delete(Bytes), } diff --git a/zerofs/src/replication/transport.rs b/zerofs/src/replication/transport.rs index 8076dfdd..828907e0 100644 --- a/zerofs/src/replication/transport.rs +++ b/zerofs/src/replication/transport.rs @@ -22,11 +22,22 @@ use proto::{ ReplicateRequest, ReplicateResponse, }; +/// This build's replication protocol version, advertised in `HelloResponse`. +/// Bump when the wire format gains a capability a peer must understand. +pub const REPLICATION_PROTOCOL_VERSION: u32 = 2; + +/// The minimum peer protocol version that understands `ReplOp::Merge`. The +/// leader ships merge ops (and so the chunk store writes deltas) only to a peer +/// at or above this; below it, writes fall back to full-chunk puts. +pub const MERGE_MIN_PROTOCOL_VERSION: u32 = 2; + fn to_repl_ops(ops: Vec) -> Vec { ops.into_iter() .map(|o| { if o.delete { ReplOp::Delete(Bytes::from(o.key)) + } else if o.merge { + ReplOp::Merge(Bytes::from(o.key), Bytes::from(o.value)) } else { ReplOp::Put(Bytes::from(o.key), Bytes::from(o.value)) } @@ -41,11 +52,19 @@ fn to_proto_ops(ops: &[ReplOp]) -> Vec { key: k.to_vec(), value: v.to_vec(), delete: false, + merge: false, + }, + ReplOp::Merge(k, v) => ProtoOp { + key: k.to_vec(), + value: v.to_vec(), + delete: false, + merge: true, }, ReplOp::Delete(k) => ProtoOp { key: k.to_vec(), value: Vec::new(), delete: true, + merge: false, }, }) .collect() @@ -175,6 +194,7 @@ impl ReplicationService for ReplicationReceiver { // over and preserve it). Either way the caller must defer, not open as writer. Ok(Response::new(HelloResponse { peer_active: leading || has_tail, + protocol_version: REPLICATION_PROTOCOL_VERSION, })) } } @@ -221,6 +241,46 @@ pub async fn hello_peer(endpoint: String) -> anyhow::Result { Ok(resp.into_inner().peer_active) } +/// Ask a peer (Hello) for its replication protocol version (0 if too old to +/// report one). Used to decide whether the leader may ship merge ops to it. 
+pub async fn hello_peer_protocol_version(endpoint: String) -> anyhow::Result<u32> {
+    let mut client = ReplicationServiceClient::connect(endpoint).await?;
+    let resp = client.hello(HelloRequest {}).await?;
+    Ok(resp.into_inner().protocol_version)
+}
+
+/// True if every peer is reachable and advertises a protocol version that
+/// understands merge ops. Conservative: any unreachable or too-old peer yields
+/// false, so the leader falls back to full-chunk puts (always safe) rather than
+/// risk shipping a merge a peer can't apply.
+pub async fn all_peers_support_merge(peers: &[String]) -> bool {
+    for peer in peers {
+        let endpoint = if peer.starts_with("http://") || peer.starts_with("https://") {
+            peer.clone()
+        } else {
+            format!("http://{peer}")
+        };
+        match hello_peer_protocol_version(endpoint).await {
+            Ok(v) if v >= MERGE_MIN_PROTOCOL_VERSION => {}
+            Ok(v) => {
+                tracing::info!(
+                    "HA: peer {peer} protocol version {v} < {MERGE_MIN_PROTOCOL_VERSION}; \
+                     chunk deltas disabled (using full-chunk writes)"
+                );
+                return false;
+            }
+            Err(e) => {
+                tracing::info!(
+                    "HA: peer {peer} unreachable for merge-capability check ({e:#}); \
+                     chunk deltas disabled (using full-chunk writes)"
+                );
+                return false;
+            }
+        }
+    }
+    true
+}
+
 /// Stream heartbeats at `interval` until the connection breaks. `epoch` is the
 /// leader's data-db writer epoch, so the standby ignores a deposed leader's beats.
 pub async fn run_heartbeat_sender(
@@ -405,6 +465,10 @@ mod tests {
                     ReplOp::Delete(k) => {
                         state.remove(k.as_ref());
                     }
+                    // This epoch-ordering test ships only puts/deletes.
+                    ReplOp::Merge(_, _) => {
+                        unreachable!("merge ops are not exercised in this replay-ordering test")
+                    }
                 }
             }
         }
@@ -415,4 +479,98 @@ mod tests {
              higher seqno replayed last and clobbered it (cross-term seqno collision)"
         );
     }
+
+    // A merge operand must survive `ship` -> proto -> receive as a `Merge`, not
+    // be silently mis-decoded as a `Put` of the raw delta bytes (which would
+    // corrupt the chunk on takeover).
+    #[tokio::test]
+    async fn merge_op_round_trips_the_wire() {
+        use crate::fs::store::chunk_merge::encode_delta;
+
+        let buffer = Arc::new(Mutex::new(TailBuffer::new()));
+        let endpoint = spawn_receiver(buffer.clone()).await;
+        let sender = loop {
+            match ReplicationSender::connect(endpoint.clone()).await {
+                Ok(s) => break s,
+                Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
+            }
+        };
+
+        let key = Bytes::from_static(b"chunk-key");
+        let delta = encode_delta(2, Bytes::from_static(b"XY"));
+        let op = ReplOp::Merge(key, delta);
+        assert!(sender.ship(1, &[op.clone()], &[], 0, 1).await.unwrap());
+
+        let buf = buffer.lock().await;
+        let got: Vec<ReplOp> = buf
+            .batches_in_order()
+            .flat_map(|(_, ops)| ops.iter().cloned())
+            .collect();
+        assert_eq!(
+            got,
+            vec![op],
+            "the merge op must round-trip the wire intact, not become a Put"
+        );
+    }
+
+    // End to end: ship a full-chunk base and a sub-chunk delta through the real
+    // wire, then replay the buffered tail into a real SlateDB carrying the chunk
+    // merge operator (exactly as a takeover does in cli/init.rs), and confirm the
+    // promoted node reconstructs the chunk.
+    #[tokio::test]
+    async fn replayed_base_plus_delta_reconstructs_the_chunk() {
+        use crate::fs::store::chunk_merge::{chunk_merge_operator, encode_delta};
+        use slatedb::{DbBuilder, WriteBatch};
+
+        let buffer = Arc::new(Mutex::new(TailBuffer::new()));
+        let endpoint = spawn_receiver(buffer.clone()).await;
+        let sender = loop {
+            match ReplicationSender::connect(endpoint.clone()).await {
+                Ok(s) => break s,
+                Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
+            }
+        };
+
+        let key = Bytes::from_static(b"chunk-key");
+        let base = ReplOp::Put(key.clone(), Bytes::from_static(b"abcdefgh"));
+        let delta = ReplOp::Merge(key.clone(), encode_delta(2, Bytes::from_static(b"XY")));
+        assert!(sender.ship(1, &[base], &[], 0, 1).await.unwrap());
+        assert!(sender.ship(2, &[delta], &[], 0, 1).await.unwrap());
+
+        let store: Arc<dyn slatedb::object_store::ObjectStore> =
+            Arc::new(slatedb::object_store::memory::InMemory::new());
+        let db = DbBuilder::new(slatedb::object_store::path::Path::from("replay"), store)
+            .with_merge_operator(chunk_merge_operator())
+            .build()
+            .await
+            .unwrap();
+        {
+            let buf = buffer.lock().await;
+            for (_seqno, ops) in buf.batches_in_order() {
+                let mut batch = WriteBatch::new();
+                for op in ops {
+                    match op {
+                        ReplOp::Put(k, v) => batch.put_bytes(k.clone(), v.clone()),
+                        ReplOp::Merge(k, v) => batch.merge(k.clone(), v.clone()),
+                        ReplOp::Delete(k) => batch.delete(k.clone()),
+                    }
+                }
+                db.write_with_options(batch, &slatedb::config::WriteOptions::default())
+                    .await
+                    .unwrap();
+            }
+        }
+
+        let got = db
+            .get(key.clone())
+            .await
+            .unwrap()
+            .expect("chunk present after replay");
+        assert_eq!(
+            got,
+            Bytes::from_static(b"abXYefgh"),
+            "the replayed delta must overlay the base via the merge operator"
+        );
+        db.close().await.unwrap();
+    }
 }
diff --git a/zerofs/src/rpc/server.rs b/zerofs/src/rpc/server.rs
index 5bd89329..91c0fea8 100644
--- a/zerofs/src/rpc/server.rs
+++ b/zerofs/src/rpc/server.rs
@@ -701,6 +701,7 @@ mod tests {
                 .with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(true)) .with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor)) + .with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator()) .build() .await .unwrap(),