Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions documentation/src/app/architecture/page.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export const metadata = {

export const sections = [
{ title: 'System Overview', id: 'system-overview' },
{ title: 'Storing File Data', id: 'storing-file-data' },
{ title: 'Choosing a Protocol', id: 'choosing-a-protocol' },
{ title: 'Filesystem Limits', id: 'filesystem-limits' },
]
Expand Down Expand Up @@ -58,6 +59,14 @@ graph TB
NBD --> NBDD
`} />

## Storing File Data {{ id: 'storing-file-data' }}

File data is split into fixed **32 KiB chunks**, each keyed by `(inode, chunk index)` and stored as a value in SlateDB's LSM tree, so a read fetches only the chunks a request actually covers.

Small in-place writes get special treatment. Rewriting a few bytes of a chunk used to mean reading the whole 32 KiB chunk, splicing in the change, and writing all 32 KiB back, so a file edited in place left a trail of superseded full-chunk versions that compaction later had to read just to discard. Instead, a partial write to an existing chunk now records only the changed byte range as a small **delta** (a merge operand) while a new chunk or a full-chunk overwrite still stores the complete chunk as a **base**. The merge operator transparently reassembles each chunk from its base plus any deltas, on reads, flushes, and compaction; nothing in the read path has to know whether a chunk is stored whole or as base-plus-deltas.

This cuts both the data written per small change and the volume of superseded data compaction has to sift through, lowering write amplification and storage churn, most noticeably for random-write workloads such as databases. It needs no migration: chunks written before the change read back unchanged, and a chunk's deltas are folded back into a single value as it flushes and compacts, so read cost stays bounded no matter how many times a chunk is rewritten.

## Choosing a Protocol {{ id: 'choosing-a-protocol' }}

Because the servers share one filesystem layer, protocols mix on a single instance: NBD device files in `.nbd/`, for example, are created and resized with regular file operations through an NFS or 9P mount. Which protocol to mount with depends on the client and the workload:
Expand Down
9 changes: 9 additions & 0 deletions zerofs/proto/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ message ReplOp {
bytes key = 1;
bytes value = 2; // ignored when delete is true
bool delete = 3;
// A merge operand (resolved by the merge operator), e.g. a sub-chunk chunk
// delta. Only ever set once the standby has advertised a protocol_version
// that understands merges (HelloResponse.protocol_version), so an older
// standby never receives one. When false, the op is a put/delete as before.
bool merge = 4;
}

message ReplicateRequest {
Expand Down Expand Up @@ -59,4 +64,8 @@ message HelloResponse {
// True if the peer is leading or holds an un-replayed tail: the caller must
// NOT open as writer; it should defer and follow.
bool peer_active = 1;
// The responder's replication protocol version. Absent (0) from a peer too
// old to set it. The leader ships merge ops only to a peer whose version is
// high enough to understand them; otherwise it falls back to full-chunk puts.
uint32 protocol_version = 2;
}
1 change: 1 addition & 0 deletions zerofs/src/cli/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub async fn run_compactor(config_path: PathBuf) -> Result<()> {
.with_options(worker_options)
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.build()
.await?,
);
Expand Down
19 changes: 19 additions & 0 deletions zerofs/src/cli/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ impl DbOpen {
crate::replication::ReplOp::Put(k, v) => {
batch.put_bytes(k.clone(), v.clone())
}
crate::replication::ReplOp::Merge(k, v) => {
batch.merge(k.clone(), v.clone())
}
crate::replication::ReplOp::Delete(k) => batch.delete(k.clone()),
}
}
Expand Down Expand Up @@ -643,6 +646,21 @@ impl Replayed {
_ => (None, None),
};

// Chunk deltas (sub-chunk merge operands) are safe to write only when
// every replication peer understands merge ops; otherwise fall back to
// full-chunk writes. A non-replicating leader (no peers) writes deltas
// freely.
let deltas_enabled = if replicator.is_some() {
let peers = prepared
.replication_params
.as_ref()
.map(|p| p.peers.clone())
.unwrap_or_default();
crate::replication::transport::all_peers_support_merge(&peers).await
} else {
true
};

let fs = ZeroFS::new_with_slatedb_and_lease(
slatedb,
settings.max_bytes(),
Expand All @@ -652,6 +670,7 @@ impl Replayed {
prepared.segments_enabled,
lease,
replicator,
deltas_enabled,
prepared.dedup,
is_live_takeover,
prepared.object_tracer.clone(),
Expand Down
9 changes: 7 additions & 2 deletions zerofs/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ pub async fn build_slatedb(
.with_db_cache(cache)
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.with_metrics_recorder(metrics_recorder.clone());

if segments_enabled {
Expand Down Expand Up @@ -660,6 +661,7 @@ pub async fn build_slatedb(
.with_filter_policies(crate::fs::filter_policy::filter_policies(
segments_enabled,
))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.with_options(slatedb::config::CompactorOptions {
poll_interval: std::time::Duration::from_secs(5),
commit_compacted_interval: std::time::Duration::from_secs(5),
Expand Down Expand Up @@ -690,7 +692,8 @@ pub async fn build_slatedb(

let mut reader_builder = DbReader::builder(db_path, object_store)
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled));
.with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator());
if segments_enabled {
reader_builder = reader_builder.with_segment_extractor(Arc::new(
crate::segment_extractor::ZeroFsSegmentExtractor,
Expand Down Expand Up @@ -718,7 +721,8 @@ pub async fn build_slatedb(
let mut reader_builder = DbReader::builder(db_path, object_store)
.with_checkpoint_id(checkpoint_id)
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled));
.with_filter_policies(crate::fs::filter_policy::filter_policies(segments_enabled))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator());
if segments_enabled {
reader_builder = reader_builder.with_segment_extractor(Arc::new(
crate::segment_extractor::ZeroFsSegmentExtractor,
Expand Down Expand Up @@ -1094,6 +1098,7 @@ mod tests {
.with_sst_block_size(SstBlockSize::Block32Kib)
.with_filter_policies(crate::fs::filter_policy::filter_policies(true))
.with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.build()
.await
.expect("open slatedb")
Expand Down
23 changes: 23 additions & 0 deletions zerofs/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub fn exit_on_write_error(err: impl std::fmt::Display) -> ! {

enum TxOp {
Put(Bytes, Bytes),
/// A merge operand (resolved by the configured merge operator). Used by the
/// chunk store to record sub-chunk deltas instead of rewriting a whole chunk.
Merge(Bytes, Bytes),
Delete(Bytes),
}

Expand Down Expand Up @@ -143,6 +146,12 @@ impl Transaction {
self.ops.push(TxOp::Put(key.clone(), value));
}

/// Record a merge operand for `key`, resolved by the configured merge
/// operator. The chunk store uses this to write sub-chunk deltas.
pub fn merge_bytes(&mut self, key: &Bytes, value: Bytes) {
self.ops.push(TxOp::Merge(key.clone(), value));
}

pub fn delete_bytes(&mut self, key: &Bytes) {
self.ops.push(TxOp::Delete(key.clone()));
}
Expand Down Expand Up @@ -175,6 +184,7 @@ impl Transaction {
for op in self.ops {
match op {
TxOp::Put(k, v) => target.put_bytes(k, v),
TxOp::Merge(k, v) => target.merge(k, v),
TxOp::Delete(k) => target.delete(k),
}
}
Expand All @@ -192,6 +202,19 @@ impl Transaction {
target.put_bytes(k.clone(), v.clone());
ops.push(ReplOp::Put(k, v));
}
TxOp::Merge(k, v) => {
// Faithfully replicate the merge operand. A merge is only ever
// produced when the chunk store's `deltas_enabled` gate is open,
// which is decided once at leader startup from the peers'
// advertised protocol version (see `all_peers_support_merge`).
// This assumes a compatible, forward-only deployment: it does
// NOT re-check on reconnect, so downgrading a live standby below
// the merge protocol version while the leader stays up is out of
// scope (the old peer would mis-decode the op). Keep peer
// versions matched.
target.merge(k.clone(), v.clone());
ops.push(ReplOp::Merge(k, v));
}
TxOp::Delete(k) => {
target.delete(k.clone());
ops.push(ReplOp::Delete(k));
Expand Down
10 changes: 9 additions & 1 deletion zerofs/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl ZeroFS {
use_segment_layout,
None,
None,
true, // single-node / test: no replication peers, so deltas are safe
Arc::new(crate::dedup::DedupCache::new(65_536)),
false, // single-node / test: never a live takeover, always regenerate
ObjectTracer::new(),
Expand All @@ -224,6 +225,10 @@ impl ZeroFS {
use_segment_layout: bool,
lease: Option<Arc<crate::replication::Lease>>,
replicator: Option<Arc<crate::replication::Replicator>>,
// Whether the chunk store may write sub-chunk merge deltas. False when a
// replication peer is too old to understand merge ops (see
// `all_peers_support_merge`); true for a non-replicating node.
deltas_enabled: bool,
dedup: Arc<crate::dedup::DedupCache>,
// True only when this node is a standby promoting after receiving replication
// from a live leader; false on a cold bootstrap / config leader / single node.
Expand Down Expand Up @@ -310,7 +315,7 @@ impl ZeroFS {

let flush_coordinator = FlushCoordinator::new(db.clone());
let stats = Arc::new(FileSystemStats::new());
let chunk_store = ChunkStore::new(db.clone(), key_codec.clone());
let chunk_store = ChunkStore::new(db.clone(), key_codec.clone(), deltas_enabled);
let directory_store = DirectoryStore::new(db.clone(), key_codec.clone());
let inode_store = InodeStore::new(db.clone(), key_codec.clone(), next_inode_id);
let tombstone_store = TombstoneStore::new(db.clone(), key_codec.clone());
Expand Down Expand Up @@ -474,6 +479,7 @@ impl ZeroFS {
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(true))
.with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.build()
.await?,
);
Expand Down Expand Up @@ -509,6 +515,7 @@ impl ZeroFS {
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(true))
.with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.build()
.await?,
);
Expand Down Expand Up @@ -3453,6 +3460,7 @@ mod tests {
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(true))
.with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.build()
.await
.unwrap(),
Expand Down
1 change: 1 addition & 0 deletions zerofs/src/fs/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ mod tests {
.with_block_transformer(block_transformer)
.with_filter_policies(crate::fs::filter_policy::filter_policies(true))
.with_segment_extractor(Arc::new(crate::segment_extractor::ZeroFsSegmentExtractor))
.with_merge_operator(crate::fs::store::chunk_merge::chunk_merge_operator())
.build()
.await
.unwrap(),
Expand Down
81 changes: 65 additions & 16 deletions zerofs/src/fs/store/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,23 @@ pub struct ChunkStore {
/// sequential append splices into it rather than re-reading the chunk it just
/// wrote. Sharded + LRU via foyer; eviction only ever costs a re-read.
tail_cache: Cache<InodeId, (u64, Bytes)>,
/// When true, a partial write to a chunk that already has a base is recorded
/// as a sub-chunk merge delta instead of rewriting the whole chunk, cutting
/// the volume of superseded versions compaction must later read. Gated off
/// when a replication peer can't apply merge ops (see `all_peers_support_merge`).
deltas_enabled: bool,
}

impl ChunkStore {
pub fn new(db: Arc<Db>, key_codec: Arc<KeyCodec>) -> Self {
pub fn new(db: Arc<Db>, key_codec: Arc<KeyCodec>, deltas_enabled: bool) -> Self {
let tail_cache = CacheBuilder::new(TAIL_CACHE_BYTES)
.with_weighter(|_id: &InodeId, (_idx, data): &(u64, Bytes)| data.len())
.build();
Self {
db,
key_codec,
tail_cache,
deltas_enabled,
}
}

Expand Down Expand Up @@ -88,6 +94,21 @@ impl ChunkStore {
txn.put_bytes(&key, data);
}

/// Record a partial write as a sub-chunk merge delta (the merge operator
/// applies it onto the chunk's base on read/compaction). Only valid when the
/// chunk already has a base; a new chunk must be established with `save`.
fn save_delta(
&self,
txn: &mut Transaction,
id: InodeId,
chunk_idx: u64,
offset: usize,
data: Bytes,
) {
let key = self.key_codec.chunk_key(id, chunk_idx);
txn.merge_bytes(&key, super::chunk_merge::encode_delta(offset, data));
}

pub fn delete(&self, txn: &mut Transaction, id: InodeId, chunk_idx: u64) {
let key = self.key_codec.chunk_key(id, chunk_idx);
txn.delete_bytes(&key);
Expand Down Expand Up @@ -181,7 +202,11 @@ impl ChunkStore {
// The tail chunk of the previous write, splice-able without a re-read.
let cached = self.tail_get(id);

let existing_chunks: Result<HashMap<u64, Bytes>, FsError> =
// For each touched chunk: its current bytes for splicing, plus whether a
// real base exists in the store. `had_base` is what makes a partial write
// eligible to be recorded as a delta: a new/hole chunk has no base to
// merge onto, so it must be written whole.
let existing_chunks: Result<HashMap<u64, (Bytes, bool)>, FsError> =
stream::iter(start_chunk..=end_chunk)
.map(|chunk_idx| {
let chunk_start = chunk_idx * CHUNK_SIZE as u64;
Expand All @@ -195,19 +220,22 @@ impl ChunkStore {
let store = self.clone();
let cached = cached.clone();
async move {
let data = if will_overwrite_fully || beyond_eof {
Bytes::from_static(ZERO_CHUNK)
let (data, had_base) = if will_overwrite_fully || beyond_eof {
// A full overwrite replaces the chunk and a beyond-EOF
// chunk is new: neither has a base to delta against.
(Bytes::from_static(ZERO_CHUNK), false)
} else if let Some((_, bytes)) = cached.filter(|(ci, _)| *ci == chunk_idx) {
// We wrote this chunk last; splice into our copy of it
// rather than re-reading it back from the store.
bytes
(bytes, true)
} else {
store
.get(id, chunk_idx)
.await?
.unwrap_or_else(|| Bytes::from_static(ZERO_CHUNK))
match store.get(id, chunk_idx).await? {
Some(bytes) => (bytes, true),
// A hole within the file: no base on disk.
None => (Bytes::from_static(ZERO_CHUNK), false),
}
};
Ok::<(u64, Bytes), FsError>((chunk_idx, data))
Ok::<(u64, (Bytes, bool)), FsError>((chunk_idx, (data, had_base)))
}
})
.buffer_unordered(PARALLEL_CHUNK_OPS)
Expand Down Expand Up @@ -239,15 +267,36 @@ impl ChunkStore {
};

let write_len = write_end - write_start;
let chunk: Bytes = if write_start == 0 && write_end == CHUNK_SIZE {
data.slice(data_offset..data_offset + write_len)
let (base, had_base) = &existing_chunks[&chunk_idx];
let is_full = write_start == 0 && write_end == CHUNK_SIZE;
let new_bytes = data.slice(data_offset..data_offset + write_len);
data_offset += write_len;

if !is_full && self.deltas_enabled && *had_base {
// Partial write to an existing chunk: record only the changed
// range as a merge delta. The base stays put and the merge
// operator reassembles the chunk on read/compaction, so we avoid
// rewriting (and later re-reading) a whole superseded chunk.
self.save_delta(txn, id, chunk_idx, write_start, new_bytes.clone());
// Keep the tail cache materialized so a following append still
// splices without a re-read.
if chunk_idx == end_chunk && cache_tail {
let mut spliced = BytesMut::from(base.as_ref());
spliced[write_start..write_end].copy_from_slice(&new_bytes);
tail = Some(spliced.freeze());
}
continue;
}

// Full overwrite, a new/hole chunk, or deltas disabled: write the
// whole chunk, establishing or replacing its base.
let chunk: Bytes = if is_full {
new_bytes
} else {
let mut chunk = BytesMut::from(existing_chunks[&chunk_idx].as_ref());
chunk[write_start..write_end]
.copy_from_slice(&data[data_offset..data_offset + write_len]);
let mut chunk = BytesMut::from(base.as_ref());
chunk[write_start..write_end].copy_from_slice(&new_bytes);
chunk.freeze()
};
data_offset += write_len;

if chunk.as_ref() == ZERO_CHUNK {
self.delete(txn, id, chunk_idx);
Expand Down
Loading
Loading