From c4e16f8ff331cb23da2d578c5015a20ee2356ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Dec 2025 15:09:29 +0100 Subject: [PATCH 01/58] feat(enrichment tables): add cuckoo filter to memory table This adds support for cuckoo filters in memory enrichment tables, to support use cases where only presence of a key needs to be checked and false positives are acceptable, greatly improving memory usage compared to regular memory tables. --- Cargo.lock | 10 + Cargo.toml | 5 +- src/enrichment_tables/memory/config.rs | 51 +- src/enrichment_tables/memory/cuckoo_table.rs | 504 ++++++++++++++++++ .../memory/internal_events.rs | 31 ++ src/enrichment_tables/memory/mod.rs | 1 + .../cue/reference/generated/configuration.cue | 96 ++++ 7 files changed, 695 insertions(+), 3 deletions(-) create mode 100644 src/enrichment_tables/memory/cuckoo_table.rs diff --git a/Cargo.lock b/Cargo.lock index 76e4c912d09ee..9aeb4373eacf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,6 +3014,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "cuckoo-clock" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" +dependencies = [ + "rand 0.9.2", +] + [[package]] name = "curl-sys" version = "0.4.84+curl-8.17.0" @@ -12285,6 +12294,7 @@ dependencies = [ "console-subscriber", "criterion", "csv", + "cuckoo-clock", "databend-client", "deadpool 0.13.0", "derivative", diff --git a/Cargo.toml b/Cargo.toml index 21c40c793a9bf..feae567685d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,6 +151,7 @@ colored = { version = "3.1.1", default-features = false } const-str = { version = "1.1.0", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } +cuckoo-clock = { version = "0.1.0" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } @@ -362,6 +363,7 @@ chrono.workspace = true chrono-tz.workspace = true colored.workspace = true csv = { version = "1.3", default-features = false } +cuckoo-clock = { workspace = true, optional = true } databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true } derivative.workspace = true dirs-next = { version = "2.0.0", default-features = false, optional = true } @@ -426,6 +428,7 @@ sqlx = { version = "0.8.6", default-features = false, features = ["derive", "pos stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.1", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } +tempfile = { workspace = true, optional = true } tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true } tokio-tungstenite = { workspace = true, features = ["connect"], optional = true } toml.workspace = true @@ -603,7 +606,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] enrichment-tables-geoip = ["dep:maxminddb"] enrichment-tables-mmdb = ["dep:maxminddb"] -enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] +enrichment-tables-memory = ["dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"] # Codecs codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 9b17ce29750c3..83d4cb2e666bb 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -19,6 +19,7 @@ use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput, }, + enrichment_tables::memory::cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable}, sinks::Healthcheck, sources::Source, }; @@ -69,8 +70,16 @@ pub struct MemoryConfig { #[serde(default)] pub ttl_field: OptionalValuePath, + /// Set to make the table act as a probabilistic filter instead of storing original values. This + /// will prevent reading values from the table - found keys will have empty value. + #[configurable(derived)] + #[serde(default)] + pub filter: Option, + #[serde(skip)] memory: Arc>>>, + #[serde(skip)] + cuckoo: Arc>>>, } /// Configuration for memory enrichment table source functionality. @@ -102,6 +111,17 @@ pub struct MemorySourceConfig { pub source_key: String, } +/// Configuration for memory enrichment table filter functionality. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub enum TableFilter { + /// Cuckoo filter + /// + /// Supports removal too, as well as TTL and LRU + Cuckoo(CuckooMemoryConfig), +} + impl PartialEq for MemoryConfig { fn eq(&self, other: &Self) -> bool { self.ttl == other.ttl @@ -118,11 +138,13 @@ impl Default for MemoryConfig { scan_interval: default_scan_interval(), flush_interval: None, memory: Arc::new(Mutex::new(None)), + cuckoo: Arc::new(Mutex::new(None)), max_byte_size: None, log_namespace: None, source_config: None, internal_metrics: InternalMetricsConfig::default(), ttl_field: OptionalValuePath::none(), + filter: None, } } } @@ -142,6 +164,23 @@ impl MemoryConfig { .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) .clone() } + + pub(super) async fn get_or_build_cuckoo(&self) -> crate::Result { + let mut boxed_cuckoo = self.cuckoo.lock().await; + let Some(TableFilter::Cuckoo(cuckoo)) = &self.filter else { + panic!("No cuckoo"); + }; + if let Some(boxed_cuckoo) = boxed_cuckoo.as_ref() { + Ok(*boxed_cuckoo.clone()) + } else { + Ok(*boxed_cuckoo + .insert(Box::new(CuckooMemoryTable::new( + self.clone(), + cuckoo.clone(), + )?)) + .clone()) + } + } } impl EnrichmentTableConfig for MemoryConfig { @@ -149,7 +188,10 @@ impl EnrichmentTableConfig for MemoryConfig { &self, _globals: &crate::config::GlobalOptions, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory().await)) + match &self.filter { + Some(TableFilter::Cuckoo(_)) => Ok(Box::new(self.get_or_build_cuckoo().await?)), + None => Ok(Box::new(self.get_or_build_memory().await)), + } } fn sink_config( @@ -177,7 +219,12 @@ impl EnrichmentTableConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + let sink = match &self.filter { + Some(TableFilter::Cuckoo(_)) => { + VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?) + } + None => VectorSink::from_event_streamsink(self.get_or_build_memory().await), + }; Ok((sink, future::ok(()).boxed())) } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs new file mode 100644 index 0000000000000..31abedf038108 --- /dev/null +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -0,0 +1,504 @@ +use std::{ + fs::File, + io::{BufReader, BufWriter, Write}, + num::NonZeroUsize, + path::PathBuf, + time::Duration, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use cuckoo_clock::{ + CuckooFilter, ExportableRandomState, InsertValues, LookupValues, + config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, +}; +use futures::{StreamExt, stream::BoxStream}; +use tempfile::NamedTempFile; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use vector_config::configurable_component; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + enrichment::{Case, Condition, Error, IndexHandle, Table}, + event::{Event, EventStatus, Finalizable}, + internal_event::{ + ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, + }, + lookup::lookup_v2::OptionalValuePath, + sink::StreamSink, +}; +use vrl::value::{KeyString, ObjectMap, Value}; + +use crate::enrichment_tables::memory::{ + MemoryConfig, + internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, + MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableRemoved, + MemoryEnrichmentTableTtlExpiredCount, + }, +}; + +/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a cuckoo table. +#[derive(Clone)] +pub(super) struct CuckooMemoryTable { + filter: CuckooFilter, + pub(super) config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, +} + +/// Configuration of cuckoo filter for memory table. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub struct CuckooMemoryConfig { + /// Number of bits used for fingerprint. + #[serde(default = "default_cuckoo_fingerprint_bits")] + pub fingerprint_bits: NonZeroUsize, + /// Number of slots in each bucket + #[serde(default = "default_cuckoo_bucket_size")] + pub bucket_size: NonZeroUsize, + /// Maximum number of entries that can be stored in the filter (actual capacity will usually be + /// larger) + pub max_entries: usize, + /// Max number of kicks when experiencing hash collisions. + #[serde(default = "default_cuckoo_max_kicks")] + pub max_kicks: usize, + /// Can be set to true to use LRU strategy for kicking. + #[serde(default = "crate::serde::default_false")] + pub lru_enabled: bool, + /// Can be set to true to also track TTL for entries. + #[serde(default = "crate::serde::default_true")] + pub ttl_enabled: bool, + /// Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + /// worse resolution to keep working. + #[serde(default = "default_cuckoo_ttl_bits")] + pub ttl_bits: NonZeroUsize, + /// Can be set to true to track a count alongside hashes. + #[serde(default = "crate::serde::default_false")] + pub counter_enabled: bool, + /// Number of bits to use to track counter. This will limit the max value. + #[serde(default = "default_cuckoo_counter_bits")] + pub counter_bits: NonZeroUsize, + /// Field in the incoming value used as the counter override. + #[configurable(derived)] + #[serde(default)] + pub counter_field: OptionalValuePath, + /// Path to the file to export data to periodically and on exit. + /// Data will be imported from this file on startup. + #[configurable(derived)] + #[serde(default)] + pub persistence_path: Option, + /// The interval used for exporting data. + /// + /// By default, export is only done on exit. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, +} + +const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_bucket_size() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(4) } +} + +const fn default_cuckoo_ttl_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_counter_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_max_kicks() -> usize { + 500 +} + +impl CuckooMemoryTable { + /// Creates a new [CuckooMemoryTable] based on the provided config. + pub(super) fn new( + config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, + ) -> crate::Result { + let ttl_val = config.ttl / config.scan_interval.get(); + let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) + .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) + .bucket_size(cuckoo_config.bucket_size) + .max_kicks(cuckoo_config.max_kicks); + + if cuckoo_config.lru_enabled { + builder = builder.with_lru(LruConfig::default()); + } + + if cuckoo_config.ttl_enabled { + builder = builder.with_ttl(TtlConfig { + ttl: u32::try_from(ttl_val)?.try_into()?, + ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, + }); + } + + if cuckoo_config.counter_enabled { + builder = builder.with_counter(CounterConfig { + counter_bits: cuckoo_config.counter_bits.get().try_into()?, + ..Default::default() + }); + } + + let built_config = builder.build()?; + + let filter = 'import: { + if let Some(path) = &cuckoo_config.persistence_path { + let Ok(file) = File::open(path) else { + warn!( + "Couldn't open \"{}\" for cuckoo filter state import.", + path.to_str().unwrap_or("") + ); + break 'import CuckooFilter::new_random_exportable(built_config); + }; + let mut reader = BufReader::new(file); + let filter = match CuckooFilter::import_random_exportable(&mut reader) { + Ok(filter) => filter, + Err(error) => { + warn!("Cuckoo filter state import failed: {}", error); + break 'import CuckooFilter::new_random_exportable(built_config); + } + }; + + if filter.get_configuration() != built_config { + // TODO: Should this stop the build from succeeding? The import will be lost, + // because it will be overwritter very soon. + warn!( + "Stored cuckoo filter configuration doesn't match with new configuration. Ignoring the import.", + ); + break 'import CuckooFilter::new_random_exportable(built_config); + } + + filter + } else { + CuckooFilter::new_random_exportable(built_config) + } + }; + + Ok(Self { + config, + filter, + cuckoo_config, + }) + } + + fn export(&self) { + if let Some(path) = &self.cuckoo_config.persistence_path { + let mut parent = path.clone(); + if parent.pop() + && let Ok(temp) = NamedTempFile::new_in(parent) + { + { + let mut writer = BufWriter::new(temp.as_file()); + if self.export_to(&mut writer).is_err() { + return; + } + } + if let Err(error) = temp.persist(path) { + warn!("Cuckoo filter export failed: {}", error); + } + } else { + warn!( + "Couldn't open temporary file for export. Trying to write directly to \"{}\"", + path.to_str().unwrap_or("") + ); + let Ok(file) = File::create(path) else { + warn!( + "Couldn't open \"{}\" for cuckoo filter state export.", + path.to_str().unwrap_or("") + ); + return; + }; + let mut writer = BufWriter::new(file); + let _ = self.export_to(&mut writer); + }; + } + } + + fn export_to(&self, mut writer: impl Write) -> Result<(), ()> { + match self.filter.exporter().write_to(&mut writer) { + Ok(()) => { + if let Err(error) = writer.flush() { + warn!("Cuckoo filter export failed: {}", error); + return Err(()); + }; + Ok(()) + } + Err(error) => { + warn!("Cuckoo filter export failed: {}", error); + Err(()) + } + } + } + + fn handle_value(&self, value: ObjectMap) { + for (k, value) in value.iter() { + if matches!(value, Value::Null) { + if self.filter.remove(k) { + emit!(MemoryEnrichmentTableRemoved { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + + continue; + }; + + if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { + let ttl = self + .config + .ttl_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| u64::try_from(v).ok()) + .map(|v| v / self.config.scan_interval.get()) + .and_then(|v| u32::try_from(v).ok()); + let counter = self + .cuckoo_config + .counter_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| i32::try_from(v).ok()); + let _ = self.filter.insert_if_not_present_with_update( + k, + InsertValues { ttl, counter }, + LookupValues { + ttl, + counter_diff: counter, + }, + ); + } else { + let _ = self.filter.insert_if_not_present(k); + } + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + } +} + +impl Table for CuckooMemoryTable { + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&'a [String]>, + wildcard: Option<&Value>, + index: Option, + ) -> Result { + let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err(Error::MoreThanOneRowFound), + None => Err(Error::NoRowsFound), + } + } + + fn find_table_rows<'a>( + &self, + _case: Case, + condition: &'a [Condition<'a>], + _select: Option<&'a [String]>, + _wildcard: Option<&Value>, + _index: Option, + ) -> Result, Error> { + match condition.first() { + Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed), + Some(Condition::Equals { value, .. }) => { + let key = value.to_string_lossy(); + if let Some(associated_data) = self.filter.get_associated_data(&key) { + emit!(MemoryEnrichmentTableRead { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + let mut result = ObjectMap::from([ + ( + KeyString::from("key"), + Value::Bytes(Bytes::copy_from_slice(key.as_bytes())), + ), + ( + KeyString::from("fingerprint"), + Value::Bytes(Bytes::from(format!( + "{:X}", + associated_data.get_fingerprint() + ))), + ), + ]); + if let Ok(ttl) = associated_data.get_stored_ttl_value() + && let Ok(ttl) = (ttl as u64 * self.config.scan_interval.get()).try_into() + { + result.insert(KeyString::from("ttl"), Value::Integer(ttl)); + } + if let Ok(counter) = associated_data.get_counter() { + result.insert(KeyString::from("counter"), Value::Integer(counter.into())); + } + Ok(vec![result]) + } else { + emit!(MemoryEnrichmentTableReadFailed { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + Ok(Default::default()) + } + } + Some(_) => Err(Error::OnlyEqualityConditionAllowed), + None => Err(Error::MissingCondition { kind: "Key" }), + } + } + + fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err(Error::MissingRequiredField { field: "Key" }), + 1 => Ok(IndexHandle(0)), + _ => Err(Error::OnlyOneFieldAllowed), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Doesn't need reload, data is written directly + fn needs_reload(&self) -> bool { + false + } +} + +impl std::fmt::Debug for CuckooMemoryTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CuckooMemoryTable {:?}", self.config) + } +} + +#[async_trait] +impl StreamSink for CuckooMemoryTable { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let events_sent = register!(EventsSent::from(Output(None))); + let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); + let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( + self.config.scan_interval.into(), + ))); + let mut flush_interval = IntervalStream::new(interval( + self.config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + let mut export_interval = IntervalStream::new(interval( + self.cuckoo_config + .export_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + + loop { + tokio::select! { + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + let event_byte_size = event.estimated_json_encoded_size_of(); + + let finalizers = event.take_finalizers(); + + // Panic: This sink only accepts Logs, so this should never panic + let log = event.into_log(); + + if let (Value::Object(map), _) = log.into_parts() { + self.handle_value(map) + }; + + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(1, event_byte_size)); + bytes_sent.emit(ByteSize(event_byte_size.get())); + }, + + Some(_) = flush_interval.next() => { + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: self.filter.get_item_count(), + new_byte_size: self.filter.get_memory_usage() + }); + } + + Some(_) = export_interval.next() => { + self.export(); + } + + Some(_) = scan_interval.next() => { + let expired = self.filter.scan_and_update_full(); + emit!(MemoryEnrichmentTableTtlExpiredCount { + count: expired as u64 + }); + } + } + } + + // Final export before exiting + self.export(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { + // let mut config = MemoryConfig::default(); + // modfn(&mut config); + // config + // } + + fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { + let mut config = CuckooMemoryConfig { + fingerprint_bits: default_cuckoo_fingerprint_bits(), + bucket_size: default_cuckoo_bucket_size(), + max_entries: 1000, + max_kicks: default_cuckoo_max_kicks(), + lru_enabled: false, + ttl_enabled: false, + ttl_bits: default_cuckoo_ttl_bits(), + counter_enabled: false, + counter_bits: default_cuckoo_counter_bits(), + counter_field: OptionalValuePath::none(), + persistence_path: None, + export_interval: None, + }; + modfn(&mut config); + config + } + + #[test] + fn finds_row() { + let memory = CuckooMemoryTable::new(Default::default(), build_cuckoo_config(|_| {})) + .expect("default cuckoo memory table should build correctly"); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let result = memory.find_table_row(Case::Sensitive, &[condition], None, None, None); + assert!(result.is_ok()); + let result = result.unwrap(); + assert_eq!(result.get("key").unwrap(), &Value::from("test_key")); + // Cuckoo fingerprint is provided too + assert!(result.contains_key("fingerprint")); + } +} diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs index 0799c6662a6af..ff805ad8ef744 100644 --- a/src/enrichment_tables/memory/internal_events.rs +++ b/src/enrichment_tables/memory/internal_events.rs @@ -56,6 +56,26 @@ impl InternalEvent for MemoryEnrichmentTableInserted<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableRemoved<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableRemoved<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_removed_total", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_removed_total",).increment(1); + } + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) struct MemoryEnrichmentTableFlushed { pub new_objects_count: usize, @@ -90,6 +110,17 @@ impl InternalEvent for MemoryEnrichmentTableTtlExpired<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableTtlExpiredCount { + pub count: u64, +} + +impl InternalEvent for MemoryEnrichmentTableTtlExpiredCount { + fn emit(self) { + counter!("memory_enrichment_table_ttl_expirations",).increment(self.count); + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) struct MemoryEnrichmentTableReadFailed<'a> { pub key: &'a str, diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs index 72b0986f9b9e9..61308ef48cfbf 100644 --- a/src/enrichment_tables/memory/mod.rs +++ b/src/enrichment_tables/memory/mod.rs @@ -1,6 +1,7 @@ //! Handles enrichment tables for `type = memory`. mod config; +mod cuckoo_table; mod internal_events; mod source; mod table; diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index fdb5d0cb15bd0..101da50fcebdb 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -216,6 +216,102 @@ generated: configuration: { required: false relevant_when: "type = \"file\"" } + filter: { + type: object: options: { + bucket_size: { + type: uint: default: 4 + description: "Number of slots in each bucket" + required: false + } + counter_bits: { + type: uint: default: 8 + description: "Number of bits to use to track counter. This will limit the max value." + required: false + } + counter_enabled: { + type: bool: default: false + description: "Can be set to true to track a count alongside hashes." + required: false + } + counter_field: { + type: string: default: "" + description: "Field in the incoming value used as the counter override." + required: false + } + export_interval: { + type: uint: {} + description: """ + The interval used for exporting data. + + By default, export is only done on exit. + """ + required: false + } + fingerprint_bits: { + type: uint: default: 8 + description: "Number of bits used for fingerprint." + required: false + } + lru_enabled: { + type: bool: default: false + description: "Can be set to true to use LRU strategy for kicking." + required: false + } + max_entries: { + type: uint: {} + description: """ + Maximum number of entries that can be stored in the filter (actual capacity will usually be + larger) + """ + required: true + } + max_kicks: { + type: uint: default: 500 + description: "Max number of kicks when experiencing hash collisions." + required: false + } + persistence_path: { + type: string: {} + description: """ + Path to the file to export data to periodically and on exit. + Data will be imported from this file on startup. + """ + required: false + } + ttl_bits: { + type: uint: default: 8 + description: """ + Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + worse resolution to keep working. + """ + required: false + } + ttl_enabled: { + type: bool: default: true + description: "Can be set to true to also track TTL for entries." + required: false + } + type: { + type: string: enum: cuckoo: """ + Cuckoo filter + + Supports removal too, as well as TTL and LRU + """ + description: """ + Cuckoo filter + + Supports removal too, as well as TTL and LRU + """ + required: true + } + } + description: """ + Set to make the table act as a probabilistic filter instead of storing original values. This + will prevent reading values from the table - found keys will have empty value. + """ + required: false + relevant_when: "type = \"memory\"" + } flush_interval: { type: uint: {} description: """ From 8993198f694efa547421853774e6c84aa7899d98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 17:41:13 +0200 Subject: [PATCH 02/58] Add changelog entry --- .../25143_enrichment_table_memory_cuckoo_filter.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md diff --git a/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md new file mode 100644 index 0000000000000..36eaeb1a2c64d --- /dev/null +++ b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md @@ -0,0 +1,3 @@ +Added cuckoo filter support for `memory` enrichment table, to provide an efficient way to store and check presence of keys with a low memory footprint at the cost of false positives. + +authors: esensar Quad9DNS From 2d295b5e143b844798148cb1045d091c1ecfa161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 17:52:58 +0200 Subject: [PATCH 03/58] Prevent ttl_val from being zero --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 31abedf038108..2c04844b50bc8 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -121,7 +121,7 @@ impl CuckooMemoryTable { config: MemoryConfig, cuckoo_config: CuckooMemoryConfig, ) -> crate::Result { - let ttl_val = config.ttl / config.scan_interval.get(); + let ttl_val = (config.ttl / config.scan_interval.get()).max(1); let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) .bucket_size(cuckoo_config.bucket_size) From 4fe95ea7430b6cda474c68098caa90c8cb90c5d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 18:18:48 +0200 Subject: [PATCH 04/58] Remove commented out test code --- src/enrichment_tables/memory/cuckoo_table.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 2c04844b50bc8..52a085befa2cb 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -458,12 +458,6 @@ impl StreamSink for CuckooMemoryTable { mod tests { use super::*; - // fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { - // let mut config = MemoryConfig::default(); - // modfn(&mut config); - // config - // } - fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { let mut config = CuckooMemoryConfig { fingerprint_bits: default_cuckoo_fingerprint_bits(), From c33d602045d66259eed1d5d7ecbd80d51a2d2813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 9 Apr 2026 13:02:20 +0200 Subject: [PATCH 05/58] Set lower limit on ttl override as well --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 52a085befa2cb..0422f56aa985b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -258,7 +258,7 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) - .map(|v| v / self.config.scan_interval.get()) + .map(|v| (v / self.config.scan_interval.get()).max(1)) .and_then(|v| u32::try_from(v).ok()); let counter = self .cuckoo_config From 610b97f94310e66eb2e3b9beee7d621148d26bd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 26 May 2026 14:39:08 +0200 Subject: [PATCH 06/58] Deny unknown fields in enrichment table memory config --- src/enrichment_tables/memory/config.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 83d4cb2e666bb..0562eb4880764 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -27,6 +27,7 @@ use crate::{ /// Configuration for the `memory` enrichment table. #[configurable_component(enrichment_table("memory"))] #[derive(Clone)] +#[serde(deny_unknown_fields)] pub struct MemoryConfig { /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache. /// When TTL expires, data behind a specific key in the cache is removed. From 1f43d0b75dfb92b9a973954bb3540b919c4f3eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 26 May 2026 15:37:00 +0200 Subject: [PATCH 07/58] Fix CounterName issues --- lib/vector-common/src/internal_event/metric_name.rs | 2 ++ src/enrichment_tables/memory/internal_events.rs | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/vector-common/src/internal_event/metric_name.rs b/lib/vector-common/src/internal_event/metric_name.rs index f520a3ecee316..62ba00237a70b 100644 --- a/lib/vector-common/src/internal_event/metric_name.rs +++ b/lib/vector-common/src/internal_event/metric_name.rs @@ -98,6 +98,7 @@ pub enum CounterName { MemoryEnrichmentTableFlushesTotal, MemoryEnrichmentTableInsertionsTotal, MemoryEnrichmentTableReadsTotal, + MemoryEnrichmentTableRemovedTotal, MemoryEnrichmentTableTtlExpirations, } @@ -363,6 +364,7 @@ impl CounterName { "memory_enrichment_table_insertions_total" } Self::MemoryEnrichmentTableReadsTotal => "memory_enrichment_table_reads_total", + Self::MemoryEnrichmentTableRemovedTotal => "memory_enrichment_table_removed_total", Self::MemoryEnrichmentTableTtlExpirations => "memory_enrichment_table_ttl_expirations", } } diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs index 8ab8f13f24c0c..f781416a2fdb5 100644 --- a/src/enrichment_tables/memory/internal_events.rs +++ b/src/enrichment_tables/memory/internal_events.rs @@ -68,12 +68,12 @@ impl InternalEvent for MemoryEnrichmentTableRemoved<'_> { fn emit(self) { if self.include_key_metric_tag { counter!( - "memory_enrichment_table_removed_total", + CounterName::MemoryEnrichmentTableRemovedTotal, "key" => self.key.to_owned() ) .increment(1); } else { - counter!("memory_enrichment_table_removed_total",).increment(1); + counter!(CounterName::MemoryEnrichmentTableRemovedTotal).increment(1); } } } @@ -119,7 +119,7 @@ pub(crate) struct MemoryEnrichmentTableTtlExpiredCount { impl InternalEvent for MemoryEnrichmentTableTtlExpiredCount { fn emit(self) { - counter!("memory_enrichment_table_ttl_expirations",).increment(self.count); + counter!(CounterName::MemoryEnrichmentTableTtlExpirations,).increment(self.count); } } From 4f6a752cad2dace2e2c4169cd6ea6b5aa77508a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:24:24 +0200 Subject: [PATCH 08/58] Apply default tll if ttl_field is not defined --- src/enrichment_tables/memory/cuckoo_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 0422f56aa985b..0ad96d9c271a1 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -259,7 +259,8 @@ impl CuckooMemoryTable { .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) .map(|v| (v / self.config.scan_interval.get()).max(1)) - .and_then(|v| u32::try_from(v).ok()); + .and_then(|v| u32::try_from(v).ok()) + .or(self.config.ttl.try_into().ok()); let counter = self .cuckoo_config .counter_field From 92bbe4752dfd760250e6eed3660908d0a46c2c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:24:34 +0200 Subject: [PATCH 09/58] Cargo.lock update --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 19ffcf06b5dd4..b1ab04254bc7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3333,7 +3333,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" dependencies = [ - "rand 0.9.2", + "rand 0.9.4", ] [[package]] From 99297103b369c3a3d010f24175d4a4de20504ad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:33:27 +0200 Subject: [PATCH 10/58] Prevent intervals from firing off at boot --- src/enrichment_tables/memory/cuckoo_table.rs | 37 ++++++++++++-------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 0ad96d9c271a1..902fce5b4b10b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -14,7 +14,7 @@ use cuckoo_clock::{ }; use futures::{StreamExt, stream::BoxStream}; use tempfile::NamedTempFile; -use tokio::time::interval; +use tokio::time::{Instant, interval_at}; use tokio_stream::wrappers::IntervalStream; use vector_config::configurable_component; use vector_lib::{ @@ -388,20 +388,29 @@ impl StreamSink for CuckooMemoryTable { async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); - let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( - self.config.scan_interval.into(), - ))); - let mut flush_interval = IntervalStream::new(interval( - self.config - .flush_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), + let now = Instant::now(); + let cuckoo_scan_interval = Duration::from_secs(self.config.scan_interval.into()); + let mut scan_interval = IntervalStream::new(interval_at( + now + cuckoo_scan_interval, + cuckoo_scan_interval, )); - let mut export_interval = IntervalStream::new(interval( - self.cuckoo_config - .export_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), + let cuckoo_flush_interval = self + .config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX); + let mut flush_interval = IntervalStream::new(interval_at( + now + cuckoo_flush_interval, + cuckoo_flush_interval, + )); + let cuckoo_export_interval = self + .cuckoo_config + .export_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX); + let mut export_interval = IntervalStream::new(interval_at( + now + cuckoo_export_interval, + cuckoo_export_interval, )); loop { From be6e123fda7187bf5c07481002c0630de36e973a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:51:41 +0200 Subject: [PATCH 11/58] Return null value to make value field present --- src/enrichment_tables/memory/cuckoo_table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 902fce5b4b10b..b909fe7e47bc9 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -335,6 +335,7 @@ impl Table for CuckooMemoryTable { associated_data.get_fingerprint() ))), ), + (KeyString::from("value"), Value::Null), ]); if let Ok(ttl) = associated_data.get_stored_ttl_value() && let Ok(ttl) = (ttl as u64 * self.config.scan_interval.get()).try_into() From 8df4f876fd290add25a67b666f269e101038a0a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:54:39 +0200 Subject: [PATCH 12/58] Properly calculate default ttl when ttl_field is not present --- src/enrichment_tables/memory/cuckoo_table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index b909fe7e47bc9..c2df8043bb882 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -258,9 +258,9 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) + .or(Some(self.config.ttl)) .map(|v| (v / self.config.scan_interval.get()).max(1)) - .and_then(|v| u32::try_from(v).ok()) - .or(self.config.ttl.try_into().ok()); + .and_then(|v| u32::try_from(v).ok()); let counter = self .cuckoo_config .counter_field From a0b64fa1104e7de6358947d59ac97b408a06641f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 15:32:19 +0200 Subject: [PATCH 13/58] Use empty streams instead of Duration::MAX for uncofigured intervals --- src/enrichment_tables/memory/cuckoo_table.rs | 36 +++++++++++--------- src/enrichment_tables/memory/table.rs | 20 +++++++---- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index c2df8043bb882..55622f2fd2c7b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -3,6 +3,7 @@ use std::{ io::{BufReader, BufWriter, Write}, num::NonZeroUsize, path::PathBuf, + pin::Pin, time::Duration, }; @@ -12,9 +13,12 @@ use cuckoo_clock::{ CuckooFilter, ExportableRandomState, InsertValues, LookupValues, config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, }; -use futures::{StreamExt, stream::BoxStream}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; use tempfile::NamedTempFile; -use tokio::time::{Instant, interval_at}; +use tokio::time::{Instant, interval, interval_at}; use tokio_stream::wrappers::IntervalStream; use vector_config::configurable_component; use vector_lib::{ @@ -390,29 +394,27 @@ impl StreamSink for CuckooMemoryTable { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); let now = Instant::now(); - let cuckoo_scan_interval = Duration::from_secs(self.config.scan_interval.into()); + let scan_interval_duration = Duration::from_secs(self.config.scan_interval.into()); let mut scan_interval = IntervalStream::new(interval_at( - now + cuckoo_scan_interval, - cuckoo_scan_interval, + now.checked_add(scan_interval_duration).unwrap_or(now), + scan_interval_duration, )); - let cuckoo_flush_interval = self + let mut flush_interval: Pin + Send>> = self .config .flush_interval .map(Duration::from_secs) - .unwrap_or(Duration::MAX); - let mut flush_interval = IntervalStream::new(interval_at( - now + cuckoo_flush_interval, - cuckoo_flush_interval, - )); - let cuckoo_export_interval = self + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); + let mut export_interval: Pin + Send>> = self .cuckoo_config .export_interval .map(Duration::from_secs) - .unwrap_or(Duration::MAX); - let mut export_interval = IntervalStream::new(interval_at( - now + cuckoo_export_interval, - cuckoo_export_interval, - )); + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); loop { tokio::select! { diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index a819b3be77912..d198ce1281d17 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -1,6 +1,7 @@ #![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly. use std::{ + pin::Pin, sync::{Arc, Mutex, MutexGuard}, time::{Duration, Instant}, }; @@ -12,7 +13,10 @@ use evmap::{ {self}, }; use evmap_derive::ShallowCopy; -use futures::{StreamExt, stream::BoxStream}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; use thread_local::ThreadLocal; use tokio::{ sync::broadcast::{Receiver, Sender}, @@ -399,12 +403,14 @@ impl StreamSink for Memory { async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); - let mut flush_interval = IntervalStream::new(interval( - self.config - .flush_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), - )); + let mut flush_interval: Pin + Send>> = self + .config + .flush_interval + .map(Duration::from_secs) + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( self.config.scan_interval.into(), ))); From 3ebe905afadddb9cb733525c277a1f6f544c39ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Jun 2026 10:00:39 +0200 Subject: [PATCH 14/58] Prevent using source functionality when filter is used --- src/enrichment_tables/memory/config.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 0562eb4880764..931c5c3b34b41 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -190,7 +190,12 @@ impl EnrichmentTableConfig for MemoryConfig { _globals: &crate::config::GlobalOptions, ) -> crate::Result> { match &self.filter { - Some(TableFilter::Cuckoo(_)) => Ok(Box::new(self.get_or_build_cuckoo().await?)), + Some(TableFilter::Cuckoo(_)) => { + if self.source_config.is_some() { + return Err("Source functionality is not supported for cuckoo filter".into()); + } + Ok(Box::new(self.get_or_build_cuckoo().await?)) + } None => Ok(Box::new(self.get_or_build_memory().await)), } } @@ -209,6 +214,10 @@ impl EnrichmentTableConfig for MemoryConfig { let Some(source_config) = &self.source_config else { return None; }; + // Filters can't be used as a source + if self.filter.is_some() { + return None; + } Some(( source_config.source_key.clone().into(), Box::new(self.clone()), From 3189194da3c6c2b2d552801d6cb8c5ad79911ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Jun 2026 10:26:19 +0200 Subject: [PATCH 15/58] Discard configuration if cuckoo filter can't be restored --- src/enrichment_tables/memory/cuckoo_table.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 55622f2fd2c7b..c37e4f83609c7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -170,12 +170,9 @@ impl CuckooMemoryTable { }; if filter.get_configuration() != built_config { - // TODO: Should this stop the build from succeeding? The import will be lost, - // because it will be overwritter very soon. - warn!( - "Stored cuckoo filter configuration doesn't match with new configuration. Ignoring the import.", + return Err( + format!("Stored cuckoo filter configuration doesn't match with new configuration. If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), ); - break 'import CuckooFilter::new_random_exportable(built_config); } filter From 9a18e49722fc1aa7c5e06540af5f17453dda7353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Jun 2026 12:18:15 +0200 Subject: [PATCH 16/58] Validate ttl_bits before building the cuckoo filter --- src/enrichment_tables/memory/cuckoo_table.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index c37e4f83609c7..737b046a4aad2 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -136,8 +136,20 @@ impl CuckooMemoryTable { } if cuckoo_config.ttl_enabled { + let ttl_val: u32 = u32::try_from(ttl_val)?; + let needed_bits = ttl_val.ilog2() + 1; + if needed_bits as usize > cuckoo_config.ttl_bits.get() { + return Err( + format!( + "`ttl_bits` ({}) must be set to at least {} to support the default `ttl` value ({}) at the configured scan interval ({}).", + cuckoo_config.ttl_bits.get(), + needed_bits, + config.ttl, + config.scan_interval.get()).into(), + ); + } builder = builder.with_ttl(TtlConfig { - ttl: u32::try_from(ttl_val)?.try_into()?, + ttl: ttl_val.try_into()?, ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, }); } From ffe915fe303eae2adfe83366f6ba70b5e9ea20d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 14:06:27 +0200 Subject: [PATCH 17/58] Use div_ceil when calculating TTL ticks --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 737b046a4aad2..9793b58ac4488 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -125,7 +125,7 @@ impl CuckooMemoryTable { config: MemoryConfig, cuckoo_config: CuckooMemoryConfig, ) -> crate::Result { - let ttl_val = (config.ttl / config.scan_interval.get()).max(1); + let ttl_val = (config.ttl.div_ceil(config.scan_interval.get())).max(1); let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) .bucket_size(cuckoo_config.bucket_size) From beb9e0af2671a570b4614e6dc77894627ab3b5ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 14:14:55 +0200 Subject: [PATCH 18/58] Warn when provided ttl is larger than defined ttl_bits --- src/enrichment_tables/memory/cuckoo_table.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 9793b58ac4488..04f0b0e0447d3 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -272,8 +272,20 @@ impl CuckooMemoryTable { .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) .or(Some(self.config.ttl)) - .map(|v| (v / self.config.scan_interval.get()).max(1)) + .map(|v| (v.div_ceil(self.config.scan_interval.get())).max(1)) .and_then(|v| u32::try_from(v).ok()); + if let Some(ttl) = ttl { + let needed_bits = ttl.ilog2() + 1; + if needed_bits as usize > self.cuckoo_config.ttl_bits.get() { + warn!( + "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", + self.cuckoo_config.ttl_bits.get(), + needed_bits, + self.config.ttl, + self.config.scan_interval.get() + ); + } + } let counter = self .cuckoo_config .counter_field From 170f5ea98bdcb9339f021f90e5236e5bf8397164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 14:34:50 +0200 Subject: [PATCH 19/58] Update table size stats on scan too --- src/enrichment_tables/memory/cuckoo_table.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 04f0b0e0447d3..afac14c6ef0a1 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -477,6 +477,10 @@ impl StreamSink for CuckooMemoryTable { emit!(MemoryEnrichmentTableTtlExpiredCount { count: expired as u64 }); + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: self.filter.get_item_count(), + new_byte_size: self.filter.get_memory_usage() + }); } } } From a938de7d194569dca54a48ab195dfeb3b7bd4b35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 15:39:19 +0200 Subject: [PATCH 20/58] Validate counter_bits on insert --- src/enrichment_tables/memory/cuckoo_table.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index afac14c6ef0a1..600341a0e35ac 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -281,7 +281,7 @@ impl CuckooMemoryTable { "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", self.cuckoo_config.ttl_bits.get(), needed_bits, - self.config.ttl, + ttl, self.config.scan_interval.get() ); } @@ -294,6 +294,17 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| i32::try_from(v).ok()); + if let Some(counter) = counter { + let needed_bits = counter.ilog2() + 2; + if needed_bits as usize > self.cuckoo_config.counter_bits.get() { + warn!( + "`counter_bits` ({}) must be set to at least {} to support the provided `counter` value ({}).", + self.cuckoo_config.counter_bits.get(), + needed_bits, + counter + ); + } + } let _ = self.filter.insert_if_not_present_with_update( k, InsertValues { ttl, counter }, From 5be7277c97cf88c302fa1c33c7cd9a97542da804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 15:56:36 +0200 Subject: [PATCH 21/58] Revert "Validate counter_bits on insert" This reverts commit a938de7d194569dca54a48ab195dfeb3b7bd4b35. --- src/enrichment_tables/memory/cuckoo_table.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 600341a0e35ac..afac14c6ef0a1 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -281,7 +281,7 @@ impl CuckooMemoryTable { "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", self.cuckoo_config.ttl_bits.get(), needed_bits, - ttl, + self.config.ttl, self.config.scan_interval.get() ); } @@ -294,17 +294,6 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| i32::try_from(v).ok()); - if let Some(counter) = counter { - let needed_bits = counter.ilog2() + 2; - if needed_bits as usize > self.cuckoo_config.counter_bits.get() { - warn!( - "`counter_bits` ({}) must be set to at least {} to support the provided `counter` value ({}).", - self.cuckoo_config.counter_bits.get(), - needed_bits, - counter - ); - } - } let _ = self.filter.insert_if_not_present_with_update( k, InsertValues { ttl, counter }, From e0f47a5c8e2659d9a46f366698066941b0b4323e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 16:07:05 +0200 Subject: [PATCH 22/58] Reject configuration on failed import too --- src/enrichment_tables/memory/cuckoo_table.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index afac14c6ef0a1..bcc86e71036a5 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -176,8 +176,9 @@ impl CuckooMemoryTable { let filter = match CuckooFilter::import_random_exportable(&mut reader) { Ok(filter) => filter, Err(error) => { - warn!("Cuckoo filter state import failed: {}", error); - break 'import CuckooFilter::new_random_exportable(built_config); + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persited state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + ); } }; From 4f7086624d1807c82d0441762b3fbc042baf79b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 16:55:38 +0200 Subject: [PATCH 23/58] Document removal for cuckoo --- src/enrichment_tables/memory/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 931c5c3b34b41..3819d40f25eab 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -119,7 +119,7 @@ pub struct MemorySourceConfig { pub enum TableFilter { /// Cuckoo filter /// - /// Supports removal too, as well as TTL and LRU + /// Supports removal by accepting null values for keys, as well as TTL and LRU. Cuckoo(CuckooMemoryConfig), } From 47077f8eb5ee5754bbb1d979831c55eeaa39cfb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:08:08 +0200 Subject: [PATCH 24/58] Add note about flush interval in cuckoo --- src/enrichment_tables/memory/config.rs | 3 +++ website/cue/reference/generated/configuration.cue | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 3819d40f25eab..56574a7d7a9a9 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -45,6 +45,9 @@ pub struct MemoryConfig { /// Since every TTL scan makes its changes visible, only use this value /// if it is shorter than the `scan_interval`. /// + /// NOTE: For cuckoo filter, all writes are visible immediately. Flush interval still defines + /// when metrics for cuckoo filter are made visible. + /// /// By default, all writes are made visible immediately. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub flush_interval: Option, diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 4c179886e5a83..a817948697994 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -295,12 +295,12 @@ generated: configuration: { type: string: enum: cuckoo: """ Cuckoo filter - Supports removal too, as well as TTL and LRU + Supports removal by accepting null values for keys, as well as TTL and LRU. """ description: """ Cuckoo filter - Supports removal too, as well as TTL and LRU + Supports removal by accepting null values for keys, as well as TTL and LRU. """ required: true } @@ -321,6 +321,9 @@ generated: configuration: { Since every TTL scan makes its changes visible, only use this value if it is shorter than the `scan_interval`. + NOTE: For cuckoo filter, all writes are visible immediately. Flush interval still defines + when metrics for cuckoo filter are made visible. + By default, all writes are made visible immediately. """ required: false From df7ff0cb3987d30449475fea0205b5900b5d2ba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:12:00 +0200 Subject: [PATCH 25/58] Only start with a fresh filter on NotFound error --- src/enrichment_tables/memory/cuckoo_table.rs | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index bcc86e71036a5..3c053f3d62cd7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -165,12 +165,21 @@ impl CuckooMemoryTable { let filter = 'import: { if let Some(path) = &cuckoo_config.persistence_path { - let Ok(file) = File::open(path) else { - warn!( - "Couldn't open \"{}\" for cuckoo filter state import.", - path.to_str().unwrap_or("") - ); - break 'import CuckooFilter::new_random_exportable(built_config); + let file = match File::open(path) { + Ok(file) => file, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => { + break 'import CuckooFilter::new_random_exportable(built_config); + } + _ => { + return Err(format!( + "Couldn't open \"{}\" for cuckoo filter state import. {}", + path.to_str().unwrap_or(""), + err + ) + .into()); + } + }, }; let mut reader = BufReader::new(file); let filter = match CuckooFilter::import_random_exportable(&mut reader) { From a8253826b21b557a0522a9be9499b520ca4823f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:12:29 +0200 Subject: [PATCH 26/58] Run `make build-licenses` --- LICENSE-3rdparty.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c04a948b51fd7..996e02c3c6067 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -204,6 +204,7 @@ crypto_secretbox,https://github.com/RustCrypto/nacl-compat/tree/master/crypto_se csv,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant csv-core,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant ctr,https://github.com/RustCrypto/block-modes,MIT OR Apache-2.0,RustCrypto Developers +cuckoo-clock,https://github.com/Quad9DNS/cuckoo-clock,MIT,"John Todd , Ensar Sarajčić " curl-sys,https://github.com/alexcrichton/curl-rust,MIT,Alex Crichton curve25519-dalek,https://github.com/dalek-cryptography/curve25519-dalek/tree/main/curve25519-dalek,BSD-3-Clause,"Isis Lovecruft , Henry de Valence " curve25519-dalek-derive,https://github.com/dalek-cryptography/curve25519-dalek,MIT OR Apache-2.0,The curve25519-dalek-derive Authors From 42a758212a19b6c439678730ec6199bc49ff1423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:14:19 +0200 Subject: [PATCH 27/58] Export only when temp file export works --- src/enrichment_tables/memory/cuckoo_table.rs | 42 ++++++++------------ 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 3c053f3d62cd7..77c7f27dc8da7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -213,33 +213,25 @@ impl CuckooMemoryTable { fn export(&self) { if let Some(path) = &self.cuckoo_config.persistence_path { let mut parent = path.clone(); - if parent.pop() - && let Ok(temp) = NamedTempFile::new_in(parent) - { - { - let mut writer = BufWriter::new(temp.as_file()); - if self.export_to(&mut writer).is_err() { - return; + if parent.pop() { + match NamedTempFile::new_in(parent) { + Ok(temp) => { + { + let mut writer = BufWriter::new(temp.as_file()); + if self.export_to(&mut writer).is_err() { + return; + } + } + if let Err(error) = temp.persist(path) { + warn!("Cuckoo filter export failed: {}", error); + } } + Err(err) => warn!( + "Couldn't open temporary file for export. Aborting export. Error: {}", + err + ), } - if let Err(error) = temp.persist(path) { - warn!("Cuckoo filter export failed: {}", error); - } - } else { - warn!( - "Couldn't open temporary file for export. Trying to write directly to \"{}\"", - path.to_str().unwrap_or("") - ); - let Ok(file) = File::create(path) else { - warn!( - "Couldn't open \"{}\" for cuckoo filter state export.", - path.to_str().unwrap_or("") - ); - return; - }; - let mut writer = BufWriter::new(file); - let _ = self.export_to(&mut writer); - }; + } } } From 51702fd7579989e3bb2fb3602676f60551a3eb73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:46:39 +0200 Subject: [PATCH 28/58] Validate config before importing persisted state --- Cargo.lock | 4 +-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 26 +++++++++++++------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b62e4afebb707..780ab1826c161 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3093,9 +3093,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" +checksum = "747de28146841c571d1ae290df8a28ddcd6630247d9dc75d89113a17e515011c" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 56b8aa52b0065..df60db3794055 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.0" , default-features = false } +cuckoo-clock = { version = "0.1.1" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 77c7f27dc8da7..df816d7c33162 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -182,22 +182,30 @@ impl CuckooMemoryTable { }, }; let mut reader = BufReader::new(file); - let filter = match CuckooFilter::import_random_exportable(&mut reader) { - Ok(filter) => filter, - Err(error) => { - return Err( - format!("Cuckoo filter state import failed: {}. Delete the persited state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + let (hasher, persisted_config) = + match CuckooFilter::::import_config(&mut reader) { + Ok(imported) => imported, + Err(error) => { + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persisted state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), ); - } - }; + } + }; - if filter.get_configuration() != built_config { + if persisted_config != built_config { return Err( format!("Stored cuckoo filter configuration doesn't match with new configuration. If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), ); } - filter + match CuckooFilter::import_state(hasher, persisted_config, &mut reader) { + Ok(filter) => filter, + Err(error) => { + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persisted state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + ); + } + } } else { CuckooFilter::new_random_exportable(built_config) } From f9fedfc53651604ea4f48697380908a48114bed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 16:48:33 +0200 Subject: [PATCH 29/58] Add a way to parallelize scan and update for cuckoo table --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 45 +++++++++++++++---- .../cue/reference/generated/configuration.cue | 19 ++++++++ 4 files changed, 58 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b442d02b6ef36..1f747b311a53a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3132,9 +3132,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747de28146841c571d1ae290df8a28ddcd6630247d9dc75d89113a17e515011c" +checksum = "b590a6b01bf6bb079a8e64ec6e7697421085aed63392db55f938cadfe38f5935" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 53fd5b6cee6be..1ea3b7311c0c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.1" , default-features = false } +cuckoo-clock = { version = "0.1.2" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index df816d7c33162..8c65ef241a5a3 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -18,7 +18,10 @@ use futures::{ stream::{self, BoxStream}, }; use tempfile::NamedTempFile; -use tokio::time::{Instant, interval, interval_at}; +use tokio::{ + task::JoinSet, + time::{Instant, interval, interval_at}, +}; use tokio_stream::wrappers::IntervalStream; use vector_config::configurable_component; use vector_lib::{ @@ -97,6 +100,17 @@ pub struct CuckooMemoryConfig { /// By default, export is only done on exit. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub export_interval: Option, + /// Number of threads to use for scanning and updating LRU/TTL. + /// + /// By default, scanning is single threaded. + #[serde(default)] + pub scanning_threads: Option, + /// If set to true scanning will not block insertions. + /// This may affect behavior since blocking scans would free up space before insertions. + /// + /// By default, scanning is blocking. + #[serde(default = "crate::serde::default_false")] + pub concurrent_scanning: bool, } const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { @@ -483,14 +497,25 @@ impl StreamSink for CuckooMemoryTable { } Some(_) = scan_interval.next() => { - let expired = self.filter.scan_and_update_full(); - emit!(MemoryEnrichmentTableTtlExpiredCount { - count: expired as u64 - }); - emit!(MemoryEnrichmentTableFlushed { - new_objects_count: self.filter.get_item_count(), - new_byte_size: self.filter.get_memory_usage() - }); + let mut handles = JoinSet::new(); + let filter = self.filter.clone(); + let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); + for i in 0..count.get() { + let filter = filter.clone(); + handles.spawn(async move { + let expired = filter.scan_and_update_full_partition(count, i); + emit!(MemoryEnrichmentTableTtlExpiredCount { + count: expired as u64 + }); + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: filter.get_item_count(), + new_byte_size: filter.get_memory_usage() + }); + }); + } + if !self.cuckoo_config.concurrent_scanning { + let _ = handles.join_all().await; + } } } } @@ -520,6 +545,8 @@ mod tests { counter_field: OptionalValuePath::none(), persistence_path: None, export_interval: None, + scanning_threads: None, + concurrent_scanning: false, }; modfn(&mut config); config diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 83db287fa1683..d11bcf1b5f169 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -223,6 +223,16 @@ generated: configuration: { description: "Number of slots in each bucket" required: false } + concurrent_scanning: { + type: bool: default: false + description: """ + If set to true scanning will not block insertions. + This may affect behavior since blocking scans would free up space before insertions. + + By default, scanning is blocking. + """ + required: false + } counter_bits: { type: uint: default: 8 description: "Number of bits to use to track counter. This will limit the max value." @@ -278,6 +288,15 @@ generated: configuration: { """ required: false } + scanning_threads: { + type: uint: {} + description: """ + Number of threads to use for scanning and updating LRU/TTL. + + By default, scanning is single threaded. + """ + required: false + } ttl_bits: { type: uint: default: 8 description: """ From a3ab4a75d4ef61454a5ea214d2ccfc44217830e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 17:30:37 +0200 Subject: [PATCH 30/58] Bump cuckoo-clock to fix partitioned scanning --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f747b311a53a..92fef7200538d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3132,9 +3132,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b590a6b01bf6bb079a8e64ec6e7697421085aed63392db55f938cadfe38f5935" +checksum = "3955ec500ca96631dfc03dfaab4c4fbc57332ff7d8953fe4397392cb48f9c77c" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 1ea3b7311c0c4..11120ef568618 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.2" , default-features = false } +cuckoo-clock = { version = "0.1.3" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } From d15eca925874a36e1b0f4a223c15ac03b7ce7051 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 17:36:00 +0200 Subject: [PATCH 31/58] Prevent dropping scanning tasks when using concurrent_scanning --- src/enrichment_tables/memory/cuckoo_table.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 8c65ef241a5a3..e333d414b8622 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -502,7 +502,7 @@ impl StreamSink for CuckooMemoryTable { let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); for i in 0..count.get() { let filter = filter.clone(); - handles.spawn(async move { + let task = async move { let expired = filter.scan_and_update_full_partition(count, i); emit!(MemoryEnrichmentTableTtlExpiredCount { count: expired as u64 @@ -511,7 +511,12 @@ impl StreamSink for CuckooMemoryTable { new_objects_count: filter.get_item_count(), new_byte_size: filter.get_memory_usage() }); - }); + }; + if !self.cuckoo_config.concurrent_scanning { + handles.spawn(task); + } else { + tokio::spawn(task); + } } if !self.cuckoo_config.concurrent_scanning { let _ = handles.join_all().await; From 9f6c1e764894052a2d8bc4914f695789fa19588f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 18:10:30 +0200 Subject: [PATCH 32/58] Bump cuckoo-clock to fix partition indices --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d47588c417e3a..740a973099f4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3955ec500ca96631dfc03dfaab4c4fbc57332ff7d8953fe4397392cb48f9c77c" +checksum = "2ec0d006c9c09cbae8d3b5bcd617229d1e104a5177e5ebed760f21f9f804030d" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 595443a7489c2..62e308f646490 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,7 +156,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.3" , default-features = false } +cuckoo-clock = { version = "0.1.4" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } From 110c3200e643e2a9ec0621dbc5830dd7cdbb15c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 18:12:04 +0200 Subject: [PATCH 33/58] Remove nested serde typetag --- src/enrichment_tables/memory/cuckoo_table.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index e333d414b8622..e1d68c63da3ff 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -56,7 +56,6 @@ pub(super) struct CuckooMemoryTable { /// Configuration of cuckoo filter for memory table. #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] -#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] pub struct CuckooMemoryConfig { /// Number of bits used for fingerprint. #[serde(default = "default_cuckoo_fingerprint_bits")] From 120e3343b8169d8a6c22b13f3558edac12693c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 18:13:34 +0200 Subject: [PATCH 34/58] Track failed insertions --- src/enrichment_tables/memory/cuckoo_table.rs | 30 +++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index e1d68c63da3ff..cd998bf5e4edf 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -39,9 +39,9 @@ use vrl::value::{KeyString, ObjectMap, Value}; use crate::enrichment_tables::memory::{ MemoryConfig, internal_events::{ - MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, - MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableRemoved, - MemoryEnrichmentTableTtlExpiredCount, + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed, + MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, MemoryEnrichmentTableReadFailed, + MemoryEnrichmentTableRemoved, MemoryEnrichmentTableTtlExpiredCount, }, }; @@ -285,7 +285,7 @@ impl CuckooMemoryTable { continue; }; - if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { + let res = if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { let ttl = self .config .ttl_field @@ -317,21 +317,29 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| i32::try_from(v).ok()); - let _ = self.filter.insert_if_not_present_with_update( + self.filter.insert_if_not_present_with_update( k, InsertValues { ttl, counter }, LookupValues { ttl, counter_diff: counter, }, - ); + ) } else { - let _ = self.filter.insert_if_not_present(k); + self.filter.insert_if_not_present(k) + }; + + if res.is_some_and(|r| r.matches_key(k, &self.filter)) { + emit!(MemoryEnrichmentTableInsertFailed { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } else { + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); } - emit!(MemoryEnrichmentTableInserted { - key: k, - include_key_metric_tag: self.config.internal_metrics.include_key_tag - }); } } } From a4dfda81daa38139246413226757c2bfbc9a2580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:05:05 +0200 Subject: [PATCH 35/58] Add deny_unknown_fields for cuckoo_table --- src/enrichment_tables/memory/cuckoo_table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index cd998bf5e4edf..aa1a6629bbfd2 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -56,6 +56,7 @@ pub(super) struct CuckooMemoryTable { /// Configuration of cuckoo filter for memory table. #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields)] pub struct CuckooMemoryConfig { /// Number of bits used for fingerprint. #[serde(default = "default_cuckoo_fingerprint_bits")] From 90df75f2359945e4cb803ab535aeda625e9f910a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:19:05 +0200 Subject: [PATCH 36/58] Track number of scans in progress to prevent piling up of scan tasks --- src/enrichment_tables/memory/cuckoo_table.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index aa1a6629bbfd2..4c3927ac4de27 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -4,6 +4,10 @@ use std::{ num::NonZeroUsize, path::PathBuf, pin::Pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, time::Duration, }; @@ -469,6 +473,8 @@ impl StreamSink for CuckooMemoryTable { }) .unwrap_or(Box::pin(stream::empty())); + let scans_in_progress = Arc::new(AtomicUsize::new(0)); + loop { tokio::select! { event = input.next() => { @@ -505,11 +511,17 @@ impl StreamSink for CuckooMemoryTable { } Some(_) = scan_interval.next() => { + if scans_in_progress.load(Ordering::Acquire) > 0 { + warn!("Previous scan still in progress for cuckoo enrichment table. New scan will be skipped until previous one is complete. Consider increasing scan interval."); + continue; + } let mut handles = JoinSet::new(); let filter = self.filter.clone(); let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); + scans_in_progress.fetch_add(count.get(), Ordering::AcqRel); for i in 0..count.get() { let filter = filter.clone(); + let scans_in_progress = Arc::clone(&scans_in_progress); let task = async move { let expired = filter.scan_and_update_full_partition(count, i); emit!(MemoryEnrichmentTableTtlExpiredCount { @@ -519,6 +531,7 @@ impl StreamSink for CuckooMemoryTable { new_objects_count: filter.get_item_count(), new_byte_size: filter.get_memory_usage() }); + scans_in_progress.fetch_sub(1, Ordering::AcqRel); }; if !self.cuckoo_config.concurrent_scanning { handles.spawn(task); From 8daa72c3f9ee7938591e25f250a70519fb04dd16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:21:36 +0200 Subject: [PATCH 37/58] Prevent 0 export interval for cuckoo_table --- src/enrichment_tables/memory/cuckoo_table.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 4c3927ac4de27..bb20e9163fd92 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -1,7 +1,7 @@ use std::{ fs::File, io::{BufReader, BufWriter, Write}, - num::NonZeroUsize, + num::{NonZeroU64, NonZeroUsize}, path::PathBuf, pin::Pin, sync::{ @@ -103,7 +103,7 @@ pub struct CuckooMemoryConfig { /// /// By default, export is only done on exit. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] - pub export_interval: Option, + pub export_interval: Option, /// Number of threads to use for scanning and updating LRU/TTL. /// /// By default, scanning is single threaded. @@ -467,6 +467,7 @@ impl StreamSink for CuckooMemoryTable { let mut export_interval: Pin + Send>> = self .cuckoo_config .export_interval + .map(NonZeroU64::get) .map(Duration::from_secs) .map:: + Send>>, _>(|d| { Box::pin(IntervalStream::new(interval(d))) From ff66dad61f6fbaddee2cc4b87676fae164e4b440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:32:13 +0200 Subject: [PATCH 38/58] Reject cuckoo configurations that produce filters higher than defined `max_byte_size` --- src/enrichment_tables/memory/cuckoo_table.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index bb20e9163fd92..646aa3084de85 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -229,6 +229,13 @@ impl CuckooMemoryTable { } }; + let filter_size = filter.get_memory_usage(); + if let Some(max_byte_size) = config.max_byte_size + && filter.get_memory_usage() as u64 > max_byte_size + { + return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); + } + Ok(Self { config, filter, From 463ac9ab96285fa72a3974c7372895fb6849ca36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 15:35:16 +0200 Subject: [PATCH 39/58] Calculate cuckoo filter size before building it to prevent needless allocation --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 14 +++++++------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7235912b85fb1..23966d53a49b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ec0d006c9c09cbae8d3b5bcd617229d1e104a5177e5ebed760f21f9f804030d" +checksum = "3420f89fadc21611faa8f16dda185fcb2c35818963000466756f0698df1b5a3a" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index ad93d4bfe8a63..2e7dbe1f4fe68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.4" , default-features = false } +cuckoo-clock = { version = "0.1.5" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 646aa3084de85..a7a6175d16bcf 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -181,6 +181,13 @@ impl CuckooMemoryTable { let built_config = builder.build()?; + let filter_size = built_config.get_configured_memory_usage(); + if let Some(max_byte_size) = config.max_byte_size + && filter_size as u64 > max_byte_size + { + return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); + } + let filter = 'import: { if let Some(path) = &cuckoo_config.persistence_path { let file = match File::open(path) { @@ -229,13 +236,6 @@ impl CuckooMemoryTable { } }; - let filter_size = filter.get_memory_usage(); - if let Some(max_byte_size) = config.max_byte_size - && filter.get_memory_usage() as u64 > max_byte_size - { - return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); - } - Ok(Self { config, filter, From 0ac29fb0641bde9648e19cb039b0d3709b518468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 16:10:46 +0200 Subject: [PATCH 40/58] Use `in_current_span` for scanning tasks --- src/enrichment_tables/memory/cuckoo_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index a7a6175d16bcf..e1d2daba4d11e 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -27,6 +27,7 @@ use tokio::{ time::{Instant, interval, interval_at}, }; use tokio_stream::wrappers::IntervalStream; +use tracing::Instrument; use vector_config::configurable_component; use vector_lib::{ EstimatedJsonEncodedSizeOf, @@ -540,7 +541,7 @@ impl StreamSink for CuckooMemoryTable { new_byte_size: filter.get_memory_usage() }); scans_in_progress.fetch_sub(1, Ordering::AcqRel); - }; + }.in_current_span(); if !self.cuckoo_config.concurrent_scanning { handles.spawn(task); } else { From c56b767f7f67a118f1be7143e9b47575374d2014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 30 Jun 2026 13:13:13 +0200 Subject: [PATCH 41/58] Add support for LRU deletion on scan --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 9 ++++++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23966d53a49b1..30d2e8d45a53b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.5" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3420f89fadc21611faa8f16dda185fcb2c35818963000466756f0698df1b5a3a" +checksum = "95ae26df9c532a892d1051adf27040245af2caa0c0e6db0e861fcca58a4f3ef3" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 2e7dbe1f4fe68..88cc1ce9561f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.5" , default-features = false } +cuckoo-clock = { version = "0.2.0" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index e1d2daba4d11e..de8e8c4a96115 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -78,6 +78,9 @@ pub struct CuckooMemoryConfig { /// Can be set to true to use LRU strategy for kicking. #[serde(default = "crate::serde::default_false")] pub lru_enabled: bool, + /// Can be set to true to delete unused items on scan when LRU is used. + #[serde(default = "crate::serde::default_false")] + pub lru_deletion_enabled: bool, /// Can be set to true to also track TTL for entries. #[serde(default = "crate::serde::default_true")] pub ttl_enabled: bool, @@ -151,7 +154,10 @@ impl CuckooMemoryTable { .max_kicks(cuckoo_config.max_kicks); if cuckoo_config.lru_enabled { - builder = builder.with_lru(LruConfig::default()); + builder = builder.with_lru(LruConfig { + remove_on_zero: cuckoo_config.lru_deletion_enabled, + ..Default::default() + }); } if cuckoo_config.ttl_enabled { @@ -573,6 +579,7 @@ mod tests { max_entries: 1000, max_kicks: default_cuckoo_max_kicks(), lru_enabled: false, + lru_deletion_enabled: false, ttl_enabled: false, ttl_bits: default_cuckoo_ttl_bits(), counter_enabled: false, From d1682a8b2ea0e085495154976855277ac3dc9555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 30 Jun 2026 15:24:51 +0200 Subject: [PATCH 42/58] Allow some configuration changes when using cuckoo filter export --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30d2e8d45a53b..7fe6a8f03dc0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ae26df9c532a892d1051adf27040245af2caa0c0e6db0e861fcca58a4f3ef3" +checksum = "d252a41884ec128cc98c9e703ed872ad1d9e4e5170be79adc0a6b166a140eaed" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 88cc1ce9561f4..69f5bc98d7946 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.2.0" , default-features = false } +cuckoo-clock = { version = "0.2.1" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index de8e8c4a96115..e4918002344df 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -224,13 +224,13 @@ impl CuckooMemoryTable { } }; - if persisted_config != built_config { + if !built_config.compatible_layout(persisted_config) { return Err( - format!("Stored cuckoo filter configuration doesn't match with new configuration. If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), + format!("Stored cuckoo filter configuration is not compatible with new configuration. Only changes to values that don't affect layout or size are allowed. If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), ); } - match CuckooFilter::import_state(hasher, persisted_config, &mut reader) { + match CuckooFilter::import_state(hasher, built_config, &mut reader) { Ok(filter) => filter, Err(error) => { return Err( From aaef6d4c4d11a958efb5f4079a0ea048ad3f22d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 30 Jun 2026 19:58:34 +0200 Subject: [PATCH 43/58] Implement state preservation for cuckoo filter --- src/enrichment_tables/memory/config.rs | 26 ++-- src/enrichment_tables/memory/cuckoo_table.rs | 130 ++++++++++++------- 2 files changed, 100 insertions(+), 56 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index a3a4a456b60a2..47ae99fc2d86d 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -195,19 +195,27 @@ impl MemoryConfig { .clone() } - pub(super) async fn get_or_build_cuckoo(&self) -> crate::Result { - let mut boxed_cuckoo = self.cuckoo.lock().await; + pub(super) async fn get_or_build_cuckoo( + &self, + prev_state: Option>, + ) -> crate::Result { let Some(TableFilter::Cuckoo(cuckoo)) = &self.filter else { panic!("No cuckoo"); }; + let mut boxed_cuckoo = self.cuckoo.lock().await; if let Some(boxed_cuckoo) = boxed_cuckoo.as_ref() { Ok(*boxed_cuckoo.clone()) } else { Ok(*boxed_cuckoo - .insert(Box::new(CuckooMemoryTable::new( - self.clone(), - cuckoo.clone(), - )?)) + .insert(if let Some(prev) = prev_state { + Box::new(CuckooMemoryTable::from_previous_state( + self.clone(), + cuckoo.clone(), + prev, + )?) + } else { + Box::new(CuckooMemoryTable::new(self.clone(), cuckoo.clone())?) + }) .clone()) } } @@ -224,9 +232,9 @@ impl EnrichmentTableConfig for MemoryConfig { if self.source_config.is_some() { return Err("Source functionality is not supported for cuckoo filter".into()); } - Ok(Box::new(self.get_or_build_cuckoo().await?)) + Ok(Box::new(self.get_or_build_cuckoo(prev_state).await?)) } - None => Ok(Box::new(self.get_or_build_memory().await)), + None => Ok(Box::new(self.get_or_build_memory(prev_state).await)), } } @@ -265,7 +273,7 @@ impl SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let sink = match &self.filter { Some(TableFilter::Cuckoo(_)) => { - VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?) + VectorSink::from_event_streamsink(self.get_or_build_cuckoo(None).await?) } None => VectorSink::from_event_streamsink(self.get_or_build_memory(None).await), }; diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index e4918002344df..9a3886074e62b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, fs::File, io::{BufReader, BufWriter, Write}, num::{NonZeroU64, NonZeroUsize}, @@ -147,53 +148,7 @@ impl CuckooMemoryTable { config: MemoryConfig, cuckoo_config: CuckooMemoryConfig, ) -> crate::Result { - let ttl_val = (config.ttl.div_ceil(config.scan_interval.get())).max(1); - let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) - .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) - .bucket_size(cuckoo_config.bucket_size) - .max_kicks(cuckoo_config.max_kicks); - - if cuckoo_config.lru_enabled { - builder = builder.with_lru(LruConfig { - remove_on_zero: cuckoo_config.lru_deletion_enabled, - ..Default::default() - }); - } - - if cuckoo_config.ttl_enabled { - let ttl_val: u32 = u32::try_from(ttl_val)?; - let needed_bits = ttl_val.ilog2() + 1; - if needed_bits as usize > cuckoo_config.ttl_bits.get() { - return Err( - format!( - "`ttl_bits` ({}) must be set to at least {} to support the default `ttl` value ({}) at the configured scan interval ({}).", - cuckoo_config.ttl_bits.get(), - needed_bits, - config.ttl, - config.scan_interval.get()).into(), - ); - } - builder = builder.with_ttl(TtlConfig { - ttl: ttl_val.try_into()?, - ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, - }); - } - - if cuckoo_config.counter_enabled { - builder = builder.with_counter(CounterConfig { - counter_bits: cuckoo_config.counter_bits.get().try_into()?, - ..Default::default() - }); - } - - let built_config = builder.build()?; - - let filter_size = built_config.get_configured_memory_usage(); - if let Some(max_byte_size) = config.max_byte_size - && filter_size as u64 > max_byte_size - { - return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); - } + let built_config = Self::build_config(&config, &cuckoo_config)?; let filter = 'import: { if let Some(path) = &cuckoo_config.persistence_path { @@ -250,6 +205,87 @@ impl CuckooMemoryTable { }) } + /// Creates a new [CuckooMemoryTable] based on the provided config and previous state. + pub(super) fn from_previous_state( + config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, + prev_state: Box, + ) -> crate::Result { + if let Ok(prev_memory) = prev_state.downcast::() { + let built_config = Self::build_config(&config, &cuckoo_config)?; + if built_config.compatible_layout(prev_memory.filter.get_configuration()) + && let Ok(mut old_filter) = + prev_memory.filter.exporter().snapshot().map(VecDeque::from) + && let Ok((hasher, _old_conf)) = CuckooFilter::import_config(&mut old_filter) + && let Ok(filter) = + CuckooFilter::import_state(hasher, built_config, &mut old_filter) + { + return Ok(Self { + filter, + config, + cuckoo_config, + }); + } + } + + Self::new(config, cuckoo_config) + } + + fn build_config( + config: &MemoryConfig, + cuckoo_config: &CuckooMemoryConfig, + ) -> crate::Result { + let ttl_val = (config.ttl.div_ceil(config.scan_interval.get())).max(1); + let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) + .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) + .bucket_size(cuckoo_config.bucket_size) + .max_kicks(cuckoo_config.max_kicks); + + if cuckoo_config.lru_enabled { + builder = builder.with_lru(LruConfig { + remove_on_zero: cuckoo_config.lru_deletion_enabled, + ..Default::default() + }); + } + + if cuckoo_config.ttl_enabled { + let ttl_val: u32 = u32::try_from(ttl_val)?; + let needed_bits = ttl_val.ilog2() + 1; + if needed_bits as usize > cuckoo_config.ttl_bits.get() { + return Err( + format!( + "`ttl_bits` ({}) must be set to at least {} to support the default `ttl` value ({}) at the configured scan interval ({}).", + cuckoo_config.ttl_bits.get(), + needed_bits, + config.ttl, + config.scan_interval.get()).into(), + ); + } + builder = builder.with_ttl(TtlConfig { + ttl: ttl_val.try_into()?, + ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, + }); + } + + if cuckoo_config.counter_enabled { + builder = builder.with_counter(CounterConfig { + counter_bits: cuckoo_config.counter_bits.get().try_into()?, + ..Default::default() + }); + } + + let built_config = builder.build()?; + + let filter_size = built_config.get_configured_memory_usage(); + if let Some(max_byte_size) = config.max_byte_size + && filter_size as u64 > max_byte_size + { + return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); + } + + Ok(built_config) + } + fn export(&self) { if let Some(path) = &self.cuckoo_config.persistence_path { let mut parent = path.clone(); From d1eb3b400566c28a5b89c9345a0b95f3dc080063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 1 Jul 2026 17:21:38 +0200 Subject: [PATCH 44/58] Add more LRU options for cuckoo table --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 87 +++++++++++++++++-- .../cue/reference/generated/configuration.cue | 50 +++++++++++ 4 files changed, 135 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e7d8b94ec58a..29816800c2f1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.2.1" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d252a41884ec128cc98c9e703ed872ad1d9e4e5170be79adc0a6b166a140eaed" +checksum = "d53c54372b29d8ba8f4878f0d6b5e8cd7d2e1ec0b73c11d7fb0624800b92cb79" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index daadc251b39fe..ce073154a81f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,7 +166,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.2.1" , default-features = false } +cuckoo-clock = { version = "0.2.4" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 9a3886074e62b..b18d4eff6cb60 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use bytes::Bytes; use cuckoo_clock::{ CuckooFilter, ExportableRandomState, InsertValues, LookupValues, - config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, + config::{CounterConfig, CuckooConfiguration, LruAgingStrategy, LruConfig, TtlConfig}, }; use futures::{ Stream, StreamExt, @@ -82,6 +82,21 @@ pub struct CuckooMemoryConfig { /// Can be set to true to delete unused items on scan when LRU is used. #[serde(default = "crate::serde::default_false")] pub lru_deletion_enabled: bool, + /// Number of bits to use to track LRU counter. + /// Low bit count will reduce the maximum LRU counter value, making the items expire sooner if + /// unused. + #[serde(default = "default_cuckoo_lru_bits")] + pub lru_bits: NonZeroUsize, + /// Starting value for LRU counter on item insertion. + /// Higher value will give newer items a higher probability to stay in the filter. + #[serde(default = "default_cuckoo_lru_starting_value")] + pub lru_starting_value: u32, + /// Value to increase LRU counter by on each item access. + #[serde(default = "default_cuckoo_lru_increment")] + pub lru_increment: u32, + /// Strategy to use when aging LRU counters at each scan. + #[serde(default)] + pub lru_aging_strategy: CuckooLruAgingStrategy, /// Can be set to true to also track TTL for entries. #[serde(default = "crate::serde::default_true")] pub ttl_enabled: bool, @@ -122,6 +137,31 @@ pub struct CuckooMemoryConfig { pub concurrent_scanning: bool, } +/// Aging strategy for LRU counters in cuckoo filters. +#[configurable_component] +#[derive(Clone, Default, Debug, PartialEq, Eq)] +#[serde(tag = "strategy", rename_all = "snake_case")] +#[configurable(metadata(docs::enum_tag_description = "The LRU aging strategy to use."))] +pub enum CuckooLruAgingStrategy { + /// Aging LRU counters by halving their value on each scan. + #[default] + Halving, + /// Aging LRU counters by decrementing by a fixed value on each scan. + Decrement { + /// Value to decrement by + value: u32, + }, +} + +impl From<&CuckooLruAgingStrategy> for LruAgingStrategy { + fn from(value: &CuckooLruAgingStrategy) -> Self { + match value { + CuckooLruAgingStrategy::Halving => LruAgingStrategy::Halving, + CuckooLruAgingStrategy::Decrement { value } => LruAgingStrategy::Decrement(*value), + } + } +} + const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { unsafe { NonZeroUsize::new_unchecked(8) } } @@ -134,6 +174,18 @@ const fn default_cuckoo_ttl_bits() -> NonZeroUsize { unsafe { NonZeroUsize::new_unchecked(8) } } +const fn default_cuckoo_lru_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_lru_starting_value() -> u32 { + 1 +} + +const fn default_cuckoo_lru_increment() -> u32 { + 1 +} + const fn default_cuckoo_counter_bits() -> NonZeroUsize { unsafe { NonZeroUsize::new_unchecked(8) } } @@ -179,9 +231,9 @@ impl CuckooMemoryTable { } }; - if !built_config.compatible_layout(persisted_config) { + if !built_config.compatible_layout(&persisted_config) { return Err( - format!("Stored cuckoo filter configuration is not compatible with new configuration. Only changes to values that don't affect layout or size are allowed. If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), + format!("Stored cuckoo filter configuration is not compatible with new configuration. Only changes to values that don't affect layout or size are allowed. If this is intended, remove the persisted state file ({}). Built: {:?}. Persisted: {:?}", path.to_str().unwrap_or(""), built_config, persisted_config).into(), ); } @@ -213,7 +265,7 @@ impl CuckooMemoryTable { ) -> crate::Result { if let Ok(prev_memory) = prev_state.downcast::() { let built_config = Self::build_config(&config, &cuckoo_config)?; - if built_config.compatible_layout(prev_memory.filter.get_configuration()) + if built_config.compatible_layout(&prev_memory.filter.get_configuration()) && let Ok(mut old_filter) = prev_memory.filter.exporter().snapshot().map(VecDeque::from) && let Ok((hasher, _old_conf)) = CuckooFilter::import_config(&mut old_filter) @@ -242,9 +294,30 @@ impl CuckooMemoryTable { .max_kicks(cuckoo_config.max_kicks); if cuckoo_config.lru_enabled { + let starting_value_needed_bits = cuckoo_config.lru_starting_value.ilog2() + 1; + if starting_value_needed_bits as usize > cuckoo_config.lru_bits.get() { + return Err(format!( + "`lru_bits` ({}) must be set to at least {} to support the `lru_starting_value` value ({}).", + cuckoo_config.lru_bits.get(), + starting_value_needed_bits, + cuckoo_config.lru_starting_value, + ).into()); + } + let increment_needed_bits = cuckoo_config.lru_increment.ilog2() + 1; + if increment_needed_bits as usize > cuckoo_config.lru_bits.get() { + return Err(format!( + "`lru_bits` ({}) must be set to at least {} to support the `lru_increment` value ({}).", + cuckoo_config.lru_bits.get(), + increment_needed_bits, + cuckoo_config.lru_increment, + ).into()); + } builder = builder.with_lru(LruConfig { + counter_bits: cuckoo_config.lru_bits.get().try_into()?, remove_on_zero: cuckoo_config.lru_deletion_enabled, - ..Default::default() + starting_value: cuckoo_config.lru_starting_value, + increment: cuckoo_config.lru_increment, + aging_strategy: (&cuckoo_config.lru_aging_strategy).into(), }); } @@ -625,6 +698,10 @@ mod tests { export_interval: None, scanning_threads: None, concurrent_scanning: false, + lru_bits: default_cuckoo_ttl_bits(), + lru_starting_value: default_cuckoo_lru_starting_value(), + lru_increment: default_cuckoo_lru_increment(), + lru_aging_strategy: CuckooLruAgingStrategy::default(), }; modfn(&mut config); config diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 8a4555f781345..f07298282568c 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -262,11 +262,61 @@ generated: configuration: { description: "Number of bits used for fingerprint." required: false } + lru_aging_strategy: { + type: object: options: { + value: { + type: uint: {} + description: "Value to decrement by" + required: true + relevant_when: "strategy = \"decrement\"" + } + strategy: { + required: false + type: string: { + enum: { + halving: "Aging LRU counters by halving their value on each scan." + decrement: "Aging LRU counters by decrementing by a fixed value on each scan." + } + default: "halving" + } + description: "The LRU aging strategy to use." + } + } + description: "Strategy to use when aging LRU counters at each scan." + required: false + } + lru_bits: { + type: uint: default: 8 + description: """ + Number of bits to use to track LRU counter. + Low bit count will reduce the maximum LRU counter value, making the items expire sooner if + unused. + """ + required: false + } + lru_deletion_enabled: { + type: bool: default: false + description: "Can be set to true to delete unused items on scan when LRU is used." + required: false + } lru_enabled: { type: bool: default: false description: "Can be set to true to use LRU strategy for kicking." required: false } + lru_increment: { + type: uint: default: 1 + description: "Value to increase LRU counter by on each item access." + required: false + } + lru_starting_value: { + type: uint: default: 1 + description: """ + Starting value for LRU counter on item insertion. + Higher value will give newer items a higher probability to stay in the filter. + """ + required: false + } max_entries: { type: uint: {} description: """ From 66edf80b90fb03ecdb38a614afacce595655e437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 17:46:44 +0200 Subject: [PATCH 45/58] Make flush_interval NonZero --- src/enrichment_tables/memory/config.rs | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 1 + src/enrichment_tables/memory/table.rs | 6 ++++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 47ae99fc2d86d..e2951c405037f 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -50,7 +50,7 @@ pub struct MemoryConfig { /// /// By default, all writes are made visible immediately. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] - pub flush_interval: Option, + pub flush_interval: Option, /// Maximum size of the table in bytes. All insertions that make /// this table bigger than the maximum size are rejected. /// diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index b18d4eff6cb60..01fbc4dc1d8e7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -582,6 +582,7 @@ impl StreamSink for CuckooMemoryTable { let mut flush_interval: Pin + Send>> = self .config .flush_interval + .map(NonZeroU64::get) .map(Duration::from_secs) .map:: + Send>>, _>(|d| { Box::pin(IntervalStream::new(interval(d))) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 3943c521649a3..81899230bae12 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -1,6 +1,7 @@ #![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly. use std::{ + num::NonZeroU64, pin::Pin, sync::{Arc, Mutex, MutexGuard}, time::{Duration, Instant}, @@ -431,6 +432,7 @@ impl StreamSink for Memory { let mut flush_interval: Pin + Send>> = self .config .flush_interval + .map(NonZeroU64::get) .map(Duration::from_secs) .map:: + Send>>, _>(|d| { Box::pin(IntervalStream::new(interval(d))) @@ -713,7 +715,7 @@ mod tests { let ttl = 100; let memory = Memory::new(build_memory_config(|c| { c.ttl = ttl; - c.flush_interval = Some(10); + c.flush_interval = NonZeroU64::new(10); })); memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); @@ -952,7 +954,7 @@ mod tests { )]))); let memory = Memory::new(build_memory_config(|c| { - c.flush_interval = Some(1); + c.flush_interval = NonZeroU64::new(1); })); run_and_assert_sink_compliance( From 17c84f9523c7983f708f18775160926520a56d18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 19:16:14 +0200 Subject: [PATCH 46/58] Prevent counting LRU deletions towards TTL expirations --- src/enrichment_tables/memory/cuckoo_table.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 01fbc4dc1d8e7..1284b55394c90 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -647,8 +647,16 @@ impl StreamSink for CuckooMemoryTable { for i in 0..count.get() { let filter = filter.clone(); let scans_in_progress = Arc::clone(&scans_in_progress); + let lru_deletion_enabled = self.cuckoo_config.lru_deletion_enabled; let task = async move { - let expired = filter.scan_and_update_full_partition(count, i); + let expired = if lru_deletion_enabled { + filter.scan_and_update_lru_partition(count, i); + // Run TTL scan separately when LRU deletion is enabled, to ensure + // correct TTL expired count + filter.scan_and_update_ttl_partition(count, i) + } else { + filter.scan_and_update_full_partition(count, i) + }; emit!(MemoryEnrichmentTableTtlExpiredCount { count: expired as u64 }); From 78e73aeb8a21116bedc5f71d1030d3a0b7b1a96f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 19:44:40 +0200 Subject: [PATCH 47/58] Reject persistence_path with missing parent to prevent failed exports --- src/enrichment_tables/memory/cuckoo_table.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 1284b55394c90..5eb389c98c72c 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -208,6 +208,15 @@ impl CuckooMemoryTable { Ok(file) => file, Err(err) => match err.kind() { std::io::ErrorKind::NotFound => { + if let Some(parent) = path.parent() + && File::open(parent).is_err() + { + return Err(format!( + "Cuckoo filter persistence path ({}) doesn't exist. This will prevent exporting the cuckoo filter state. Fix the `persistence_path` to ensure export works.", + parent.to_str().unwrap_or(""), + ) + .into()); + } break 'import CuckooFilter::new_random_exportable(built_config); } _ => { From 9b01dba346e5cc53d7bae771ffda18f3fc732ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 19:50:03 +0200 Subject: [PATCH 48/58] Handle zero values when calculating needed bits --- src/enrichment_tables/memory/cuckoo_table.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 5eb389c98c72c..4c6f68664d7f3 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -303,7 +303,11 @@ impl CuckooMemoryTable { .max_kicks(cuckoo_config.max_kicks); if cuckoo_config.lru_enabled { - let starting_value_needed_bits = cuckoo_config.lru_starting_value.ilog2() + 1; + let starting_value_needed_bits = cuckoo_config + .lru_starting_value + .checked_ilog2() + .unwrap_or(0) + + 1; if starting_value_needed_bits as usize > cuckoo_config.lru_bits.get() { return Err(format!( "`lru_bits` ({}) must be set to at least {} to support the `lru_starting_value` value ({}).", @@ -312,7 +316,8 @@ impl CuckooMemoryTable { cuckoo_config.lru_starting_value, ).into()); } - let increment_needed_bits = cuckoo_config.lru_increment.ilog2() + 1; + let increment_needed_bits = + cuckoo_config.lru_increment.checked_ilog2().unwrap_or(0) + 1; if increment_needed_bits as usize > cuckoo_config.lru_bits.get() { return Err(format!( "`lru_bits` ({}) must be set to at least {} to support the `lru_increment` value ({}).", @@ -332,7 +337,7 @@ impl CuckooMemoryTable { if cuckoo_config.ttl_enabled { let ttl_val: u32 = u32::try_from(ttl_val)?; - let needed_bits = ttl_val.ilog2() + 1; + let needed_bits = ttl_val.checked_ilog2().unwrap_or(0) + 1; if needed_bits as usize > cuckoo_config.ttl_bits.get() { return Err( format!( @@ -435,7 +440,7 @@ impl CuckooMemoryTable { .map(|v| (v.div_ceil(self.config.scan_interval.get())).max(1)) .and_then(|v| u32::try_from(v).ok()); if let Some(ttl) = ttl { - let needed_bits = ttl.ilog2() + 1; + let needed_bits = ttl.checked_ilog2().unwrap_or(0) + 1; if needed_bits as usize > self.cuckoo_config.ttl_bits.get() { warn!( "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", From e3da90213b82f2803ea372b0df9a55175765abcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 20:34:08 +0200 Subject: [PATCH 49/58] Prevent increasing flushed count too many times when using multiple scanning threads --- src/enrichment_tables/memory/cuckoo_table.rs | 22 ++++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 4c6f68664d7f3..ee404b8f0f2f8 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -674,20 +674,24 @@ impl StreamSink for CuckooMemoryTable { emit!(MemoryEnrichmentTableTtlExpiredCount { count: expired as u64 }); - emit!(MemoryEnrichmentTableFlushed { - new_objects_count: filter.get_item_count(), - new_byte_size: filter.get_memory_usage() - }); scans_in_progress.fetch_sub(1, Ordering::AcqRel); }.in_current_span(); - if !self.cuckoo_config.concurrent_scanning { - handles.spawn(task); - } else { - tokio::spawn(task); - } + handles.spawn(task); } if !self.cuckoo_config.concurrent_scanning { let _ = handles.join_all().await; + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: filter.get_item_count(), + new_byte_size: filter.get_memory_usage() + }); + } else { + tokio::spawn(async move { + let _ = handles.join_all().await; + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: filter.get_item_count(), + new_byte_size: filter.get_memory_usage() + }); + }); } } } From 0f08a534b1a9d764e17bf5edfde3cae6cf6fb0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 21:06:12 +0200 Subject: [PATCH 50/58] Handle relative paths for persistence_path --- src/enrichment_tables/memory/cuckoo_table.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index ee404b8f0f2f8..f77f436405c80 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -209,6 +209,7 @@ impl CuckooMemoryTable { Err(err) => match err.kind() { std::io::ErrorKind::NotFound => { if let Some(parent) = path.parent() + && parent != "" && File::open(parent).is_err() { return Err(format!( @@ -377,6 +378,9 @@ impl CuckooMemoryTable { if let Some(path) = &self.cuckoo_config.persistence_path { let mut parent = path.clone(); if parent.pop() { + if parent == *"" { + parent = ".".into(); + } match NamedTempFile::new_in(parent) { Ok(temp) => { { From c210c9da0e43400790896761f0bbea8a73188909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 21:09:24 +0200 Subject: [PATCH 51/58] Use metadata check instead of `File::open` to check the parent --- src/enrichment_tables/memory/cuckoo_table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index f77f436405c80..d969134c210c7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -1,6 +1,6 @@ use std::{ collections::VecDeque, - fs::File, + fs::{self, File}, io::{BufReader, BufWriter, Write}, num::{NonZeroU64, NonZeroUsize}, path::PathBuf, @@ -210,7 +210,7 @@ impl CuckooMemoryTable { std::io::ErrorKind::NotFound => { if let Some(parent) = path.parent() && parent != "" - && File::open(parent).is_err() + && fs::metadata(parent).is_err() { return Err(format!( "Cuckoo filter persistence path ({}) doesn't exist. This will prevent exporting the cuckoo filter state. Fix the `persistence_path` to ensure export works.", From f18cfb97f2db02c93f4ff51df787d520d817026f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 3 Jul 2026 21:15:39 +0200 Subject: [PATCH 52/58] Expose options to control counter increments --- src/enrichment_tables/memory/cuckoo_table.rs | 21 +++++++++++++++++-- .../cue/reference/generated/configuration.cue | 12 ++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index d969134c210c7..519d2204fa0f9 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -110,10 +110,16 @@ pub struct CuckooMemoryConfig { /// Number of bits to use to track counter. This will limit the max value. #[serde(default = "default_cuckoo_counter_bits")] pub counter_bits: NonZeroUsize, - /// Field in the incoming value used as the counter override. + /// Field in the incoming value used as the counter increment override. #[configurable(derived)] #[serde(default)] pub counter_field: OptionalValuePath, + /// The amount to increment the counter by on every insertion. Negative values are allowed. + #[serde(default = "default_cuckoo_counter_insertion_increment")] + pub counter_insertion_increment: i32, + /// The amount to increment the counter by on every lookup. Negative values are allowed. + #[serde(default = "default_cuckoo_counter_lookup_increment")] + pub counter_lookup_increment: i32, /// Path to the file to export data to periodically and on exit. /// Data will be imported from this file on startup. #[configurable(derived)] @@ -190,6 +196,14 @@ const fn default_cuckoo_counter_bits() -> NonZeroUsize { unsafe { NonZeroUsize::new_unchecked(8) } } +const fn default_cuckoo_counter_insertion_increment() -> i32 { + 1 +} + +const fn default_cuckoo_counter_lookup_increment() -> i32 { + 1 +} + const fn default_cuckoo_max_kicks() -> usize { 500 } @@ -358,7 +372,8 @@ impl CuckooMemoryTable { if cuckoo_config.counter_enabled { builder = builder.with_counter(CounterConfig { counter_bits: cuckoo_config.counter_bits.get().try_into()?, - ..Default::default() + change_on_insert: cuckoo_config.counter_insertion_increment, + change_on_lookup: cuckoo_config.counter_lookup_increment, }); } @@ -725,6 +740,8 @@ mod tests { counter_enabled: false, counter_bits: default_cuckoo_counter_bits(), counter_field: OptionalValuePath::none(), + counter_insertion_increment: default_cuckoo_counter_insertion_increment(), + counter_lookup_increment: default_cuckoo_counter_lookup_increment(), persistence_path: None, export_interval: None, scanning_threads: None, diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index f07298282568c..288a7653b5cde 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -245,7 +245,17 @@ generated: configuration: { } counter_field: { type: string: default: "" - description: "Field in the incoming value used as the counter override." + description: "Field in the incoming value used as the counter increment override." + required: false + } + counter_insertion_increment: { + type: int: default: 1 + description: "The amount to increment the counter by on every insertion. Negative values are allowed." + required: false + } + counter_lookup_increment: { + type: int: default: 1 + description: "The amount to increment the counter by on every lookup. Negative values are allowed." required: false } export_interval: { From 3196ad5fdea8de87764cf8ca06c1ccbce48a8522 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 4 Jul 2026 10:46:29 +0200 Subject: [PATCH 53/58] Update cuckoo-clock to fix item count after LRU deletion --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29816800c2f1e..7bd06211abeeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53c54372b29d8ba8f4878f0d6b5e8cd7d2e1ec0b73c11d7fb0624800b92cb79" +checksum = "2f78d5002849ff0144af3fd772bd4b7686137c73c1ba6d2bfed40a6edba912ca" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index ce073154a81f0..1f241cc8e2d12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,7 +166,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.2.4" , default-features = false } +cuckoo-clock = { version = "0.2.5" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } From d0d120b93be806b2f1f84ef2e8b63e38e3d739dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 4 Jul 2026 11:32:59 +0200 Subject: [PATCH 54/58] Document persistence_path + reload_behavior behavior --- src/enrichment_tables/memory/cuckoo_table.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 519d2204fa0f9..50efaa10c5ece 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -121,7 +121,10 @@ pub struct CuckooMemoryConfig { #[serde(default = "default_cuckoo_counter_lookup_increment")] pub counter_lookup_increment: i32, /// Path to the file to export data to periodically and on exit. - /// Data will be imported from this file on startup. + /// Data will be imported from this file on startup and reload. + /// + /// If table `reload_behavior` is set to `clear-state` and this is set, the persisted state will + /// still be read after reload. #[configurable(derived)] #[serde(default)] pub persistence_path: Option, From 9ebfbfb7a7746fb90e4159d7d7cff6e4a1b45b35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 4 Jul 2026 17:49:26 +0200 Subject: [PATCH 55/58] Add a warning about different stored TTL --- Cargo.lock | 4 +-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 28 +++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7bd06211abeeb..472e863373ef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f78d5002849ff0144af3fd772bd4b7686137c73c1ba6d2bfed40a6edba912ca" +checksum = "f52be29a2643acdad9588c64ed9f160643a994160a41a4a4e42e65cd097f812e" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 1f241cc8e2d12..8d997de351f61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,7 +166,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.2.5" , default-features = false } +cuckoo-clock = { version = "0.2.6" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 50efaa10c5ece..ceadcc0f9d020 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -264,6 +264,19 @@ impl CuckooMemoryTable { ); } + if let Some(ttl) = built_config.ttl_config() + && let Some(persisted_ttl) = persisted_config.ttl_config() + && ttl.ttl != persisted_ttl.ttl + { + warn!( + "Persisted configuration had a different default TTL value ({}), comapared to the new value ({}). Previous default TTL value is effectively {} seconds, while the new one is {} seconds.", + persisted_ttl.ttl, + ttl.ttl, + (persisted_ttl.ttl.get() as u64) * config.scan_interval.get(), + config.ttl + ); + } + match CuckooFilter::import_state(hasher, built_config, &mut reader) { Ok(filter) => filter, Err(error) => { @@ -292,13 +305,26 @@ impl CuckooMemoryTable { ) -> crate::Result { if let Ok(prev_memory) = prev_state.downcast::() { let built_config = Self::build_config(&config, &cuckoo_config)?; + let built_ttl = built_config.ttl_config().clone(); if built_config.compatible_layout(&prev_memory.filter.get_configuration()) && let Ok(mut old_filter) = prev_memory.filter.exporter().snapshot().map(VecDeque::from) - && let Ok((hasher, _old_conf)) = CuckooFilter::import_config(&mut old_filter) + && let Ok((hasher, old_conf)) = CuckooFilter::import_config(&mut old_filter) && let Ok(filter) = CuckooFilter::import_state(hasher, built_config, &mut old_filter) { + if let Some(ttl) = built_ttl + && let Some(old_ttl) = old_conf.ttl_config() + && ttl.ttl != old_ttl.ttl + { + warn!( + "Restored configuration had a different default TTL value ({}), comapared to the new value ({}). Previous default TTL value is effectively {} seconds, while the new one is {} seconds.", + old_ttl.ttl, + ttl.ttl, + (old_ttl.ttl.get() as u64) * config.scan_interval.get(), + config.ttl + ); + } return Ok(Self { filter, config, From 352a1e6de99cc3b30fec0a373fac8fe8f998bd0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 4 Jul 2026 18:03:30 +0200 Subject: [PATCH 56/58] Ignore clippy warning for enrichment_tables large enum --- src/enrichment_tables/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/mod.rs b/src/enrichment_tables/mod.rs index fb693b180308b..dc95372da87f3 100644 --- a/src/enrichment_tables/mod.rs +++ b/src/enrichment_tables/mod.rs @@ -35,6 +35,7 @@ pub mod mmdb; /// condition. We don't recommend using a condition that uses only date range searches. /// /// +#[allow(clippy::large_enum_variant)] #[configurable_component] #[derive(Clone, Debug)] #[serde(tag = "type", rename_all = "snake_case")] From ba2a4f101afdb59aa8c4bc7d55f1787dfecb9e4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 4 Jul 2026 18:38:07 +0200 Subject: [PATCH 57/58] Check the parent is dir also --- src/enrichment_tables/memory/cuckoo_table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index ceadcc0f9d020..230ddcbbfed19 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -227,10 +227,10 @@ impl CuckooMemoryTable { std::io::ErrorKind::NotFound => { if let Some(parent) = path.parent() && parent != "" - && fs::metadata(parent).is_err() + && !fs::metadata(parent).is_ok_and(|m| m.is_dir()) { return Err(format!( - "Cuckoo filter persistence path ({}) doesn't exist. This will prevent exporting the cuckoo filter state. Fix the `persistence_path` to ensure export works.", + "Cuckoo filter persistence path directory ({}) doesn't exist. This will prevent exporting the cuckoo filter state. Fix the `persistence_path` to ensure export works.", parent.to_str().unwrap_or(""), ) .into()); From 2b8b20d976feb2d86510a9006432a33f75d4007a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 4 Jul 2026 18:38:17 +0200 Subject: [PATCH 58/58] Bump cuckoo-clock to ensure next power of two calculation is checked --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 472e863373ef1..ffca90d10429e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52be29a2643acdad9588c64ed9f160643a994160a41a4a4e42e65cd097f812e" +checksum = "2f33e79e22c02e0ec434949d37e3a42a3f909790f3b66bb5b4710c5f1b29d182" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 8d997de351f61..8f48abc2d9e38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,7 +166,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.2.6" , default-features = false } +cuckoo-clock = { version = "0.2.7" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false }