Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/cluster/batch_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &k
return {Status::NotOK, fmt::format("failed to put key value to migration batch, {}", s.ToString())};
}

pending_logdata_only_ = false;
pending_entries_++;
entries_num_++;
return Status::OK();
Expand All @@ -48,6 +49,7 @@ Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice
if (!s.ok()) {
return {Status::NotOK, fmt::format("failed to delete key from migration batch, {}", s.ToString())};
}
pending_logdata_only_ = false;
pending_entries_++;
entries_num_++;
return Status::OK();
Expand All @@ -58,6 +60,7 @@ Status BatchSender::PutLogData(const rocksdb::Slice &blob) {
if (!s.ok()) {
return {Status::NotOK, fmt::format("failed to put log data to migration batch, {}", s.ToString())};
}
pending_logdata_only_ = true;
pending_entries_++;
entries_num_++;
return Status::OK();
Expand Down Expand Up @@ -89,6 +92,7 @@ Status BatchSender::Send() {
sent_bytes_ += write_batch_.GetDataSize();
sent_batches_num_++;
pending_entries_ = 0;
pending_logdata_only_ = false;
write_batch_.Clear();
return Status::OK();
}
Expand Down
5 changes: 4 additions & 1 deletion src/cluster/batch_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ class BatchSender {
void SetMaxBytes(size_t max_bytes) {
if (max_bytes_ != max_bytes) max_bytes_ = max_bytes;
}
bool IsFull() const { return write_batch_.GetDataSize() >= max_bytes_; }
// A log-data-only batch must wait for the first data record;
// otherwise the receiver would see metadata without matching writes.
bool IsFull() const { return !pending_logdata_only_ && write_batch_.GetDataSize() >= max_bytes_; }
uint64_t GetSentBytes() const { return sent_bytes_; }
uint32_t GetSentBatchesNum() const { return sent_batches_num_; }
uint32_t GetEntriesNum() const { return entries_num_; }
Expand All @@ -62,6 +64,7 @@ class BatchSender {
uint32_t sent_batches_num_ = 0;
uint32_t entries_num_ = 0;
uint32_t pending_entries_ = 0;
bool pending_logdata_only_ = false;

int dst_fd_;
size_t max_bytes_;
Expand Down
130 changes: 130 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <limits>
#include <memory>
#include <stdexcept>
#include <string_view>

#include "command_parser.h"
#include "commander.h"
Expand Down Expand Up @@ -49,6 +50,48 @@ CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint3
range.last_key = range.first_key + stream_size - 1;
return range;
}

// Returns true only for a canonical positive decimal: a first digit in 1-9
// followed solely by digits 0-9. Redis accepts only this form for numids, so
// inputs such as "+1" or "01" are rejected here, before integer parsing.
bool IsXAckDelNumIDs(std::string_view input) {
  if (input.empty()) return false;

  // The leading digit must be non-zero; this also rules out any sign character.
  const char lead = input.front();
  if (lead < '1' || lead > '9') return false;

  input.remove_prefix(1);
  for (const char ch : input) {
    if (ch < '0' || ch > '9') return false;
  }
  return true;
}

// Parses one numeric component of a stream entry ID (the part before or after
// the '-' separator) into a uint64_t.
//
// - An empty component is invalid.
// - A leading '+' is dropped and the remainder is parsed as a base-10 integer.
// - A leading '-' is accepted only when `allow_negative_zero` is true and the
//   remainder is one or more '0' characters; the result is then 0.
//
// Every failure is reported as Status::RedisParseErr carrying
// redis::kErrInvalidEntryIdSpecified.
StatusOr<uint64_t> ParseXAckDelStreamEntryIDComponent(std::string_view input, bool allow_negative_zero) {
  if (input.empty()) return {Status::RedisParseErr, redis::kErrInvalidEntryIdSpecified};

  const char sign = input.front();
  if (sign == '-') {
    if (!allow_negative_zero) return {Status::RedisParseErr, redis::kErrInvalidEntryIdSpecified};

    // Only "-0", "-00", ... are tolerated: at least one digit, and nothing but zeros.
    const std::string_view digits = input.substr(1);
    const bool only_zeros = !digits.empty() && digits.find_first_not_of('0') == std::string_view::npos;
    if (!only_zeros) return {Status::RedisParseErr, redis::kErrInvalidEntryIdSpecified};
    return 0;
  }

  if (sign == '+') input.remove_prefix(1);

  auto parsed = ParseInt<uint64_t>(input, 10);
  if (!parsed) return {Status::RedisParseErr, redis::kErrInvalidEntryIdSpecified};
  return *parsed;
}

// Parses an entry ID given either as "<ms>-<seq>" or as "<ms>" alone and
// stores the result in `*id`. The string is split at the first '-'; when no
// '-' is present the sequence number is set to 0. Only the seq component may
// use the negative-zero form. `*id` is written only after every component has
// parsed successfully; otherwise the component's error status is returned.
Status ParseXAckDelStreamEntryID(const std::string &input, redis::StreamEntryID *id) {
  const std::string_view text(input);
  const auto dash = text.find('-');

  if (dash == std::string_view::npos) {
    // Bare "<ms>" form.
    auto ms_only = GET_OR_RET(ParseXAckDelStreamEntryIDComponent(text, false));
    id->ms = ms_only;
    id->seq = 0;
    return Status::OK();
  }

  // "<ms>-<seq>" form: parse both halves before touching the output.
  auto ms_part = GET_OR_RET(ParseXAckDelStreamEntryIDComponent(text.substr(0, dash), false));
  auto seq_part = GET_OR_RET(ParseXAckDelStreamEntryIDComponent(text.substr(dash + 1), true));
  id->ms = ms_part;
  id->seq = seq_part;
  return Status::OK();
}
} // namespace

void AddStreamEntriesToResponse(std::string *output, const std::vector<StreamEntry> &entries) {
Expand Down Expand Up @@ -268,6 +311,92 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};

// Command handler for XACKDEL.
//
// After the stream and group names, the arguments may contain at most one of
// KEEPREF / DELREF / ACKED (KEEPREF is the default) and must contain an
// `IDS <numids> <id> [<id> ...]` clause. Execution forwards the parsed values
// to redis::Stream::DeleteEntriesAndAck and replies with an array holding one
// integer per result it reports.
class CommandXAckDel : public Commander {
 public:
  // Parses the command arguments into the member fields.
  // Returns a parse error on an unknown token, a repeated option keyword, a
  // non-canonical or too-large numids, a malformed entry ID, or a missing IDS clause.
  Status Parse(const std::vector<std::string> &args) override {
    CommandParser parser(args, 1);
    stream_name_ = GET_OR_RET(parser.TakeStr());
    group_name_ = GET_OR_RET(parser.TakeStr());

    option_ = redis::StreamDeleteOption::KeepRef;
    bool option_given = false;
    bool ids_given = false;

    while (parser.Good()) {
      if (parser.EatEqICase("IDS")) {
        ids_given = true;

        // numids must be present and be a canonical positive decimal.
        if (!parser.Good() || !IsXAckDelNumIDs(parser.RawPeek())) {
          return {Status::RedisParseErr, "Number of IDs must be a positive integer"};
        }
        auto numids_result = parser.TakeInt<int64_t>();
        if (!numids_result.IsOK()) {
          return {Status::RedisParseErr, "Number of IDs must be a positive integer"};
        }
        const int64_t numids = numids_result.GetValue();
        const auto expected_count = static_cast<size_t>(numids);

        // There must be at least `numids` arguments left to consume as IDs.
        if (parser.Remains() < expected_count) {
          return {Status::RedisParseErr, "The `numids` parameter must match the number of arguments"};
        }

        // Collect into a temporary so entry_ids_ is replaced only on full success.
        std::vector<redis::StreamEntryID> parsed_ids;
        parsed_ids.reserve(expected_count);
        while (parsed_ids.size() < expected_count) {
          auto raw_id = GET_OR_RET(parser.TakeStr());
          redis::StreamEntryID entry_id;
          if (auto s = ParseXAckDelStreamEntryID(raw_id, &entry_id); !s.IsOK()) return s;
          parsed_ids.push_back(entry_id);
        }
        entry_ids_ = std::move(parsed_ids);
        continue;
      }

      // Anything else must be exactly one of the reference-handling keywords.
      redis::StreamDeleteOption requested = redis::StreamDeleteOption::KeepRef;
      if (parser.EatEqICase("KEEPREF")) {
        requested = redis::StreamDeleteOption::KeepRef;
      } else if (parser.EatEqICase("DELREF")) {
        requested = redis::StreamDeleteOption::DelRef;
      } else if (parser.EatEqICase("ACKED")) {
        requested = redis::StreamDeleteOption::Acked;
      } else {
        return parser.InvalidSyntax();
      }

      // A second option keyword (even a repeat of the same one) is a syntax error.
      if (option_given) return parser.InvalidSyntax();
      option_ = requested;
      option_given = true;
    }

    if (!ids_given) {
      return {Status::RedisParseErr, "syntax error, expected IDS keyword"};
    }

    return Status::OK();
  }

  // Runs DeleteEntriesAndAck for the parsed arguments and writes the reply:
  // a multi-bulk header followed by one integer per element of the results.
  // A storage failure is returned as Status::RedisExecErr with the storage message.
  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    redis::Stream stream_db(srv->storage, conn->GetNamespace());

    std::vector<int> results;
    if (auto s = stream_db.DeleteEntriesAndAck(ctx, stream_name_, group_name_, entry_ids_, option_, &results);
        !s.ok()) {
      return {Status::RedisExecErr, s.ToString()};
    }

    output->append(redis::MultiLen(results.size()));
    for (const int code : results) {
      output->append(redis::Integer(code));
    }

    return Status::OK();
  }

 private:
  std::string stream_name_;  // first argument after the command name
  std::string group_name_;   // second argument after the command name
  redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef;  // KEEPREF unless overridden
  std::vector<redis::StreamEntryID> entry_ids_;  // IDs from the IDS clause
};

class CommandXClaim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1907,6 +2036,7 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAckDel>("xackdel", -6, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
Expand Down
87 changes: 86 additions & 1 deletion src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "server/redis_reply.h"
#include "server/server.h"
#include "types/redis_bitmap.h"
#include "types/redis_stream_base.h"

void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
// Currently, we only have two kinds of log data
Expand All @@ -36,6 +37,11 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
}
} else {
// Redis type log data
// Reset state first so malformed log data cannot reuse XACKDEL dedup keys.
log_data_ = redis::WriteBatchLogData();
first_seen_ = true;
seen_xackdel_xack_keys_.clear();
seen_xackdel_xdel_keys_.clear();
if (auto s = log_data_.Decode(blob); !s.IsOK()) {
WARN("Failed to decode Redis type log: {}", s.Msg());
}
Expand Down Expand Up @@ -266,6 +272,21 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
break;
}
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
InternalKey ikey(key, is_slot_id_encoded_);
Slice entry_id_check = ikey.GetSubKey();
uint64_t delimiter = 0;
GetFixed64(&entry_id_check, &delimiter);
if (delimiter == UINT64_MAX) {
return rocksdb::Status::OK();
}

user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args);
if (!s.IsOK()) {
ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
Expand Down Expand Up @@ -397,8 +418,72 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
Slice encoded_id = ikey.GetSubKey();
redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);

if (entry_id.ms == UINT64_MAX) {
// XACKDEL physical replay emits XACK for each PEL deletion,
// using the actual group encoded in the PEL key.
auto args = log_data_.GetArguments();
if (!args->empty() && (*args)[0] == "XACKDEL" && args->size() >= 3) {
// Skip malformed internal stream keys instead of emitting partial replay commands.
uint8_t type_delimiter = 0;
if (!GetFixed8(&encoded_id, &type_delimiter)) {
return rocksdb::Status::OK();
}
if (type_delimiter == static_cast<uint8_t>(redis::StreamSubkeyType::StreamPelEntry)) {
uint64_t group_name_len = 0;
if (!GetFixed64(&encoded_id, &group_name_len)) {
return rocksdb::Status::OK();
}
if (group_name_len > encoded_id.size() || encoded_id.size() - group_name_len < 16) {
return rocksdb::Status::OK();
}
std::string group_name = encoded_id.ToString().substr(0, group_name_len);
encoded_id.remove_prefix(group_name_len);

if (!GetFixed64(&encoded_id, &entry_id.ms) || !GetFixed64(&encoded_id, &entry_id.seq)) {
return rocksdb::Status::OK();
}
std::string entry_id_str = entry_id.ToString();

std::string user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();
std::string dedup_key = ns + '\0' + user_key + '\0' + group_name + '\0' + entry_id_str;
if (seen_xackdel_xack_keys_.insert(std::move(dedup_key)).second) {
command_args = {"XACK", user_key, group_name, entry_id_str};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
}
}
return rocksdb::Status::OK();
}

GetFixed64(&encoded_id, &entry_id.seq);
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
std::string entry_id_str = entry_id.ToString();
std::string user_key = ikey.GetKey().ToString();

auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

auto args = log_data_.GetArguments();
if (!args->empty()) {
if ((*args)[0] == "XACKDEL" && args->size() >= 3) {
std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str;
if (seen_xackdel_xdel_keys_.insert(std::move(dedup_key)).second) {
command_args = {"XDEL", user_key, entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
}
Comment thread
jihuayu marked this conversation as resolved.

if (!command_args.empty()) {
Expand Down
3 changes: 3 additions & 0 deletions src/storage/batch_extractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <map>
#include <string>
#include <unordered_set>
#include <vector>

#include "cluster/cluster_defs.h"
Expand Down Expand Up @@ -54,4 +55,6 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
bool is_slot_id_encoded_ = false;
SlotRange slot_range_;
bool to_redis_;
std::unordered_set<std::string> seen_xackdel_xack_keys_;
std::unordered_set<std::string> seen_xackdel_xdel_keys_;
};
Loading
Loading