Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1e98a87
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 21, 2026
577ee71
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 22, 2026
77b4c4a
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 23, 2026
025ee86
feat: Implement CF.RESERVE command with bucket-based storage
LiuQhahah Jan 23, 2026
6a018df
Merge branch 'unstable' into feature/3122-cf-reserve-bucket-storage
LiuQhahah Jan 26, 2026
1fe2c6f
feat: Implement CF.ADD command with kick-out insertion
LiuQhahah Feb 2, 2026
f50125c
trigger CI checks
LiuQhahah Feb 4, 2026
65664fe
style: Apply clang-format to cuckoo filter files
LiuQhahah Feb 4, 2026
73ba1d4
fix: Correct WriteBatchLogData constructor calls in CuckooChain
LiuQhahah Feb 6, 2026
7401065
Merge branch 'unstable' into feature/3351-cf-add
LiuQhahah Feb 26, 2026
9ddfc83
fix: ci
nagisa-kunhah May 3, 2026
a361dd9
Merge branch 'unstable' into feature/3351-cf-add-ci-fix
nagisa-kunhah May 4, 2026
543e459
Merge branch 'unstable' into feature/3351-cf-add-ci-fix
jihuayu May 6, 2026
4ef241f
Merge branch 'unstable' into feature/3351-cf-add-ci-fix
nagisa-kunhah May 6, 2026
c89f2de
fix: ci
nagisa-kunhah May 4, 2026
00abd42
fix: loses atomicity
nagisa-kunhah May 6, 2026
67f4c0d
fix: create new filter in CuckooChain::Add when key is not found
nagisa-kunhah May 7, 2026
23beb86
optimize test
nagisa-kunhah May 7, 2026
1ade399
fix: lint
nagisa-kunhah May 8, 2026
da03915
feat: CuckooPageSet
nagisa-kunhah May 10, 2026
adf0ff0
fix: lint
nagisa-kunhah May 11, 2026
cb91f43
optimize code
nagisa-kunhah May 11, 2026
3043383
feat: Implement CF.EXISTS command
LiuQhahah Feb 2, 2026
e0116d2
feat: MExists
nagisa-kunhah Apr 14, 2026
33d36bb
fate: CommandCFMExists
nagisa-kunhah Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions src/commands/cmd_cuckoo_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_cuckoo_chain.h"

namespace redis {

class CommandCFReserve : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// CF.RESERVE key capacity [BUCKETSIZE bs] [MAXITERATIONS mi] [EXPANSION ex]
if (args.size() < 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

// Parse capacity (required)
auto parse_capacity = ParseInt<uint64_t>(args[2], 10);
if (!parse_capacity) {
return {Status::RedisParseErr, "invalid capacity"};
}
capacity_ = *parse_capacity;
if (capacity_ <= 0) {
return {Status::RedisParseErr, "capacity must be larger than 0"};
}

// Parse optional parameters
CommandParser parser(args, 3);
while (parser.Good()) {
if (parser.EatEqICase("BUCKETSIZE")) {
auto parse_bucket_size = parser.TakeInt<uint8_t>();
if (!parse_bucket_size.IsOK()) {
return {Status::RedisParseErr, "invalid bucket size"};
}
bucket_size_ = parse_bucket_size.GetValue();
if (bucket_size_ == 0 || bucket_size_ > 255) {
return {Status::RedisParseErr, "bucket size must be between 1 and 255"};
}
} else if (parser.EatEqICase("MAXITERATIONS")) {
auto parse_max_iterations = parser.TakeInt<uint16_t>();
if (!parse_max_iterations.IsOK()) {
return {Status::RedisParseErr, "invalid max iterations"};
}
max_iterations_ = parse_max_iterations.GetValue();
if (max_iterations_ == 0) {
return {Status::RedisParseErr, "max iterations must be larger than 0"};
}
} else if (parser.EatEqICase("EXPANSION")) {
auto parse_expansion = parser.TakeInt<uint16_t>();
if (!parse_expansion.IsOK()) {
return {Status::RedisParseErr, "invalid expansion factor"};
}
expansion_ = parse_expansion.GetValue();
if (expansion_ > kCFMaxExpansion) {
return {Status::RedisParseErr, "expansion must be between 0 and 32768"};
}
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}

return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_,
kCuckooFilterDefaultPageSize);

if (!s.ok()) {
if (s.IsInvalidArgument()) {
// Return error message to client
return {Status::RedisExecErr, s.ToString()};
}
return {Status::RedisExecErr, "failed to create cuckoo filter"};
}

*output = redis::SimpleString("OK");
return Status::OK();
}

private:
uint64_t capacity_ = kCFDefaultCapacity;
uint8_t bucket_size_ = kCFDefaultBucketSize;
uint16_t max_iterations_ = kCFDefaultMaxIterations;
uint16_t expansion_ = kCFDefaultExpansion;
};

class CommandCFAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// CF.ADD key item
if (args.size() != 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
bool added = false;
auto s = cuckoo_db.Add(ctx, args_[1], args_[2], &added);

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

// Duplicate items are allowed, so successful insertions return 1.
*output = redis::Integer(added ? 1 : 0);
return Status::OK();
}
};

class CommandCFExists : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// CF.EXISTS key item
if (args.size() != 3) {
return {Status::RedisParseErr, "wrong number of arguments"};
}
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
bool exists = false;
auto s = cuckoo_db.Exists(ctx, args_[1], args_[2], &exists);

if (!s.ok()) {
if (s.IsNotFound()) {
// Return 0 if key doesn't exist, not an error
*output = redis::Integer(0);
return Status::OK();
}
return {Status::RedisExecErr, "failed to check item existence in cuckoo filter"};
}

// Return 1 if exists (might exist), 0 if doesn't exist (definitely not)
*output = redis::Integer(exists ? 1 : 0);
return Status::OK();
}
};

class CommandCFMExists : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// CF.MEXISTS key item [item ...]
if (args.size() < 3) {
return {Status::RedisParseErr, "wrong number of arguments"};
}
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
std::vector<std::string> items(args_.begin() + 2, args_.end());
std::vector<bool> exists;
auto s = cuckoo_db.MExists(ctx, args_[1], items, &exists);

if (!s.ok()) {
if (s.IsNotFound()) {
exists.assign(items.size(), false);
} else {
return {Status::RedisExecErr, "failed to check items existence in cuckoo filter"};
}
}

*output = redis::MultiLen(exists.size());
for (bool exist : exists) {
output->append(redis::Integer(exist ? 1 : 0));
}
return Status::OK();
}
};

// Register the CF.RESERVE, CF.ADD, CF.EXISTS, and CF.MEXISTS commands
REDIS_REGISTER_COMMANDS(CuckooFilter, MakeCmdAttr<CommandCFReserve>("cf.reserve", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandCFAdd>("cf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandCFExists>("cf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandCFMExists>("cf.mexists", -3, "read-only", 1, 1, 1))

} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ enum class CommandCategory : uint8_t {
Bit,
BloomFilter,
Cluster,
CuckooFilter,
Function,
Geo,
Hash,
Expand Down
49 changes: 48 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type()

bool Metadata::IsEmptyableType() const {
return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog ||
Type() == kRedisTDigest || Type() == kRedisTimeSeries;
Type() == kRedisTDigest || Type() == kRedisTimeSeries || Type() == kRedisCuckooFilter;
}

bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
Expand Down Expand Up @@ -644,3 +644,50 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}

void CuckooChainMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);

PutFixed16(dst, n_filters);
PutFixed16(dst, expansion);
PutFixed64(dst, base_capacity);
PutFixed8(dst, bucket_size);
PutFixed16(dst, max_iterations);
PutFixed64(dst, num_deleted_items);
PutFixed32(dst, page_size);
}

rocksdb::Status CuckooChainMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}

if (input->size() < sizeof(uint16_t) * 3 + sizeof(uint64_t) * 2 + sizeof(uint32_t) + sizeof(uint8_t)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

GetFixed16(input, &n_filters);
GetFixed16(input, &expansion);
GetFixed64(input, &base_capacity);
GetFixed8(input, &bucket_size);
GetFixed16(input, &max_iterations);
GetFixed64(input, &num_deleted_items);
GetFixed32(input, &page_size);

return rocksdb::Status::OK();
}

uint64_t CuckooChainMetadata::GetTotalCapacity() const {
if (expansion == 0 || n_filters == 1) {
return base_capacity;
}

// Calculate total capacity across all filters
uint64_t total = 0;
uint64_t filter_capacity = base_capacity;
for (uint16_t i = 0; i < n_filters; i++) {
total += filter_capacity;
filter_capacity *= expansion;
}
return total;
}
49 changes: 47 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ enum RedisType : uint8_t {
kRedisHyperLogLog = 11,
kRedisTDigest = 12,
kRedisTimeSeries = 13,
kRedisCuckooFilter = 14,
kRedisTypeMax
};

inline constexpr const std::array<std::string_view, kRedisTypeMax> RedisTypeNames = {
"none", "string", "hash", "list", "set", "zset", "bitmap",
"sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"};
"none", "string", "hash", "list", "set", "zset", "bitmap", "sortedint",
"stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "cuckoofilter"};

struct RedisTypes {
RedisTypes(std::initializer_list<RedisType> list) {
Expand Down Expand Up @@ -334,6 +335,50 @@ class BloomChainMetadata : public Metadata {
bool IsScaling() const { return expansion != 0; };
};

constexpr uint32_t kCuckooFilterDefaultPageSize = 2048;

class CuckooChainMetadata : public Metadata {
public:
/// The number of sub-filters in the chain
uint16_t n_filters;

/// Expansion factor for new filters
/// When a filter is full, a new one is created with capacity = base_capacity * expansion^n
uint16_t expansion;

/// The capacity of the first filter
uint64_t base_capacity;

/// Number of fingerprints per bucket
uint8_t bucket_size;

/// Maximum number of cuckoo kicks before considering filter full
uint16_t max_iterations;

/// Track number of deleted items for maintenance
uint64_t num_deleted_items;

/// Target maximum payload size for each persisted Cuckoo Filter page
uint32_t page_size;

explicit CuckooChainMetadata(bool generate_version = true)
: Metadata(kRedisCuckooFilter, generate_version),
n_filters(0),
expansion(0),
base_capacity(0),
bucket_size(0),
max_iterations(0),
num_deleted_items(0),
page_size(kCuckooFilterDefaultPageSize) {}

void Encode(std::string *dst) const override;
using Metadata::Decode;
rocksdb::Status Decode(Slice *input) override;

uint64_t GetTotalCapacity() const;
bool IsScaling() const { return expansion > 0; }
};

enum class JsonStorageFormat : uint8_t {
JSON = 0,
CBOR = 1,
Expand Down
Loading