Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
69e822f
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 21, 2026
4a4d246
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 22, 2026
6a04f4e
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 23, 2026
f5f9399
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 23, 2026
7fffcb6
feat: Implement CF.ADD command with kick-out insertion
LiuQhahah Feb 2, 2026
c247329
trigger CI checks
LiuQhahah Feb 4, 2026
8c8c7ee
style: Apply clang-format to cuckoo filter files
LiuQhahah Feb 4, 2026
5311d33
fix: Correct WriteBatchLogData constructor calls in CuckooChain
LiuQhahah Feb 6, 2026
652f87a
fix: ci
nagisa-kunhah May 3, 2026
56bf1cf
fix: ci
nagisa-kunhah May 4, 2026
66b6494
fix: loses atomicity
nagisa-kunhah May 6, 2026
e37ee1b
fix: create new filter in CuckooChain::Add when key is not found
nagisa-kunhah May 7, 2026
be1005d
optimize test
nagisa-kunhah May 7, 2026
8704910
fix: lint
nagisa-kunhah May 8, 2026
50dd08c
feat: CuckooPageSet
nagisa-kunhah May 10, 2026
81f655d
fix: lint
nagisa-kunhah May 11, 2026
8db4192
optimize code
nagisa-kunhah May 11, 2026
d376a1a
fix: ci
nagisa-kunhah May 12, 2026
c609c8f
fix: ci
nagisa-kunhah May 12, 2026
e4aedd2
fix: part1
nagisa-kunhah May 19, 2026
32caff1
rename: CuckooFilter->CuckooFilterHelper
nagisa-kunhah May 19, 2026
ef326ea
feat: CuckooSubFilter
nagisa-kunhah May 19, 2026
e1c9745
remove: comment
nagisa-kunhah May 19, 2026
8d5c1a7
fix: MBbloomCF
nagisa-kunhah May 19, 2026
3f394dd
fix: CuckooPageSet->CuckooPageCache
nagisa-kunhah May 19, 2026
b9f5ac8
fix: ci
nagisa-kunhah May 20, 2026
68a442d
optimize CuckooPageCache
nagisa-kunhah May 22, 2026
ef3b5d6
fix:
nagisa-kunhah May 25, 2026
c96858f
optimize code
nagisa-kunhah May 26, 2026
d651623
fix: split add into three phase
nagisa-kunhah May 26, 2026
81e9017
fix: sematic for the TryKickOutInsert
nagisa-kunhah May 27, 2026
70ac33a
rename: Discard->DiscardCachedPages
nagisa-kunhah May 29, 2026
a168d2d
restore: scan test
nagisa-kunhah May 29, 2026
a2a0d84
add: cf scan test
nagisa-kunhah May 29, 2026
81a677a
fix: invoke error when migrate, align behavior with redisbloom
nagisa-kunhah Jun 5, 2026
c245527
fix: ci
nagisa-kunhah Jun 5, 2026
cb3c95f
fix: migrate
nagisa-kunhah Jun 22, 2026
d070010
revert blank
nagisa-kunhah Jun 22, 2026
03c7094
remove unrelated test
nagisa-kunhah Jun 22, 2026
7e86758
Merge branch 'unstable' into feature/3351-cf-add-ci-fix
jihuayu Jun 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions src/commands/cmd_cuckoo_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_cuckoo_chain.h"

namespace redis {

class CommandCFReserve : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// CF.RESERVE key capacity [BUCKETSIZE bs] [MAXITERATIONS mi] [EXPANSION ex]
if (args.size() < 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

// Parse capacity (required)
auto parse_capacity = ParseInt<uint64_t>(args[2], 10);
if (!parse_capacity) {
return {Status::RedisParseErr, "invalid capacity"};
}
capacity_ = *parse_capacity;
if (capacity_ <= 0) {
return {Status::RedisParseErr, "capacity must be larger than 0"};
}

// Parse optional parameters
CommandParser parser(args, 3);
while (parser.Good()) {
if (parser.EatEqICase("BUCKETSIZE")) {
auto parse_bucket_size = parser.TakeInt<uint8_t>();
if (!parse_bucket_size.IsOK()) {
return {Status::RedisParseErr, "invalid bucket size"};
}
bucket_size_ = parse_bucket_size.GetValue();
if (bucket_size_ == 0 || bucket_size_ > 255) {
return {Status::RedisParseErr, "bucket size must be between 1 and 255"};
}
} else if (parser.EatEqICase("MAXITERATIONS")) {
auto parse_max_iterations = parser.TakeInt<uint16_t>();
if (!parse_max_iterations.IsOK()) {
return {Status::RedisParseErr, "invalid max iterations"};
}
max_iterations_ = parse_max_iterations.GetValue();
if (max_iterations_ == 0) {
return {Status::RedisParseErr, "max iterations must be larger than 0"};
}
} else if (parser.EatEqICase("EXPANSION")) {
auto parse_expansion = parser.TakeInt<uint16_t>();
if (!parse_expansion.IsOK()) {
return {Status::RedisParseErr, "invalid expansion factor"};
}
expansion_ = parse_expansion.GetValue();
if (expansion_ > kCFMaxExpansion) {
return {Status::RedisParseErr, "expansion must be between 0 and 32768"};
}
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}

return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_,
kCuckooFilterDefaultPageSize);

if (!s.ok()) {
if (s.IsInvalidArgument()) {
// Return error message to client
return {Status::RedisExecErr, s.ToString()};
}
return {Status::RedisExecErr, "failed to create cuckoo filter"};
}

*output = redis::SimpleString("OK");
return Status::OK();
}

private:
uint64_t capacity_ = kCFDefaultCapacity;
uint8_t bucket_size_ = kCFDefaultBucketSize;
uint16_t max_iterations_ = kCFDefaultMaxIterations;
uint16_t expansion_ = kCFDefaultExpansion;
};

class CommandCFAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// CF.ADD key item
if (args.size() != 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
bool added = false;
auto s = cuckoo_db.Add(ctx, args_[1], args_[2], &added);

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

// Duplicate items are allowed, so successful insertions return 1.
*output = redis::Integer(added ? 1 : 0);
return Status::OK();
}
};

// Register the CF.RESERVE and CF.ADD commands
REDIS_REGISTER_COMMANDS(CuckooFilter, MakeCmdAttr<CommandCFReserve>("cf.reserve", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandCFAdd>("cf.add", 3, "write", 1, 1, 1))

} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ enum class CommandCategory : uint8_t {
Bit,
BloomFilter,
Cluster,
CuckooFilter,
Function,
Geo,
Hash,
Expand Down
9 changes: 9 additions & 0 deletions src/stats/disk_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ rocksdb::Status Disk::GetKeySize(engine::Context &ctx, const Slice &user_key, Re
return GetZsetSize(ctx, ns_key, key_size);
case RedisType::kRedisStream:
return GetStreamSize(ctx, ns_key, key_size);
case RedisType::kRedisCuckooFilter:
return GetCuckooFilterSize(ctx, ns_key, key_size);
default:
return rocksdb::Status::NotFound("Not found ", user_key);
}
Expand Down Expand Up @@ -135,4 +137,11 @@ rocksdb::Status Disk::GetStreamSize(engine::Context &ctx, const Slice &ns_key, u
return GetApproximateSizes(metadata, ns_key, storage_->GetCFHandle(ColumnFamilyID::Stream), key_size);
}

rocksdb::Status Disk::GetCuckooFilterSize(engine::Context &ctx, const Slice &ns_key, uint64_t *key_size) {
CuckooChainMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(ctx, {kRedisCuckooFilter}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key, storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
}

} // namespace redis
1 change: 1 addition & 0 deletions src/stats/disk_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Disk : public Database {
rocksdb::Status GetBitmapSize(engine::Context &ctx, const Slice &ns_key, uint64_t *key_size);
rocksdb::Status GetSortedintSize(engine::Context &ctx, const Slice &ns_key, uint64_t *key_size);
rocksdb::Status GetStreamSize(engine::Context &ctx, const Slice &ns_key, uint64_t *key_size);
rocksdb::Status GetCuckooFilterSize(engine::Context &ctx, const Slice &ns_key, uint64_t *key_size);
rocksdb::Status GetKeySize(engine::Context &ctx, const Slice &user_key, RedisType type, uint64_t *key_size);

private:
Expand Down
49 changes: 48 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type()

bool Metadata::IsEmptyableType() const {
return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog ||
Type() == kRedisTDigest || Type() == kRedisTimeSeries;
Type() == kRedisTDigest || Type() == kRedisTimeSeries || Type() == kRedisCuckooFilter;
}

bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
Expand Down Expand Up @@ -644,3 +644,50 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}

void CuckooChainMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);

PutFixed16(dst, n_filters);
PutFixed16(dst, expansion);
PutFixed64(dst, base_capacity);
PutFixed8(dst, bucket_size);
PutFixed16(dst, max_iterations);
PutFixed64(dst, num_deleted_items);
PutFixed32(dst, page_size);
}

rocksdb::Status CuckooChainMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}

if (input->size() < sizeof(uint16_t) * 3 + sizeof(uint64_t) * 2 + sizeof(uint32_t) + sizeof(uint8_t)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

GetFixed16(input, &n_filters);
GetFixed16(input, &expansion);
GetFixed64(input, &base_capacity);
GetFixed8(input, &bucket_size);
GetFixed16(input, &max_iterations);
GetFixed64(input, &num_deleted_items);
GetFixed32(input, &page_size);

return rocksdb::Status::OK();
}

uint64_t CuckooChainMetadata::GetTotalCapacity() const {
if (expansion == 0 || n_filters == 1) {
return base_capacity;
}

// Calculate total capacity across all filters
uint64_t total = 0;
uint64_t filter_capacity = base_capacity;
for (uint16_t i = 0; i < n_filters; i++) {
total += filter_capacity;
filter_capacity *= expansion;
}
return total;
Comment thread
jihuayu marked this conversation as resolved.
}
49 changes: 47 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ enum RedisType : uint8_t {
kRedisHyperLogLog = 11,
kRedisTDigest = 12,
kRedisTimeSeries = 13,
kRedisCuckooFilter = 14,
kRedisTypeMax
};

inline constexpr const std::array<std::string_view, kRedisTypeMax> RedisTypeNames = {
"none", "string", "hash", "list", "set", "zset", "bitmap",
"sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"};
"none", "string", "hash", "list", "set", "zset", "bitmap", "sortedint",
"stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "MBbloomCF"};

struct RedisTypes {
RedisTypes(std::initializer_list<RedisType> list) {
Expand Down Expand Up @@ -334,6 +335,50 @@ class BloomChainMetadata : public Metadata {
bool IsScaling() const { return expansion != 0; };
};

constexpr uint32_t kCuckooFilterDefaultPageSize = 2048; // bytes

class CuckooChainMetadata : public Metadata {
public:
/// The number of sub-filters in the chain
uint16_t n_filters;

/// Expansion factor for new filters
/// When a filter is full, a new one is created with capacity = base_capacity * expansion^n
uint16_t expansion;

/// The capacity of the first filter.
uint64_t base_capacity;
Comment thread
jihuayu marked this conversation as resolved.

/// Number of fingerprints per bucket
uint8_t bucket_size;

/// Maximum number of cuckoo kicks before considering filter full
uint16_t max_iterations;

/// Track number of deleted items for maintenance
uint64_t num_deleted_items;

/// Target maximum payload size for each persisted Cuckoo Filter page, in bytes
uint32_t page_size;

explicit CuckooChainMetadata(bool generate_version = true)
: Metadata(kRedisCuckooFilter, generate_version),
n_filters(0),
expansion(0),
base_capacity(0),
bucket_size(0),
max_iterations(0),
num_deleted_items(0),
page_size(kCuckooFilterDefaultPageSize) {}

void Encode(std::string *dst) const override;
using Metadata::Decode;
rocksdb::Status Decode(Slice *input) override;

uint64_t GetTotalCapacity() const;
bool IsScaling() const { return expansion > 0; }
};

enum class JsonStorageFormat : uint8_t {
JSON = 0,
CBOR = 1,
Expand Down
Loading
Loading