Skip to content
15 changes: 15 additions & 0 deletions src/commands/cmd_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ constexpr const char *errBadRetention = "Couldn't parse RETENTION";
constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE";
constexpr const char *errBadEncoding = "unknown ENCODING parameter";
constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY";
constexpr const char *errBadIgnore = "Couldn't parse IGNORE";
constexpr const char *errInvalidTimestamp = "invalid timestamp";
constexpr const char *errInvalidValue = "invalid value";
constexpr const char *errOldTimestamp = "Timestamp is older than retention";
Expand Down Expand Up @@ -252,6 +253,9 @@ class CommandTSCreateBase : public KeywordCommandBase {
registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) {
return handleDuplicatePolicy(parser, create_option_.duplicate_policy);
});
registerHandler("IGNORE", [this](TSOptionsParser &parser) {
return handleIgnore(parser, create_option_.ignore_max_time_diff, create_option_.ignore_max_val_diff);
});
registerHandler("LABELS", [this](TSOptionsParser &parser) { return handleLabels(parser, create_option_.labels); });
}

Expand Down Expand Up @@ -315,6 +319,17 @@ class CommandTSCreateBase : public KeywordCommandBase {
return Status::OK();
}

// Parses the two arguments that follow the IGNORE keyword: `<max_time_diff> <max_val_diff>`.
//
// The first argument is taken from `parser` as an unsigned 64-bit integer and the second as a double.
// On success both values are stored into the output references and Status::OK() is returned.
// On failure (either argument does not parse, or the value threshold is negative or NaN) a
// RedisParseErr carrying errBadIgnore is returned and neither output reference is modified.
static Status handleIgnore(TSOptionsParser &parser, uint64_t &ignore_max_time_diff, double &ignore_max_val_diff) {
  // Both arguments are taken up front so the parser consumes the same tokens whether or not the first one is valid.
  auto parse_time_diff = parser.TakeInt<uint64_t>();
  auto parse_val_diff = parser.TakeFloat<double>();
  if (!parse_time_diff.IsOK() || !parse_val_diff.IsOK()) {
    return {Status::RedisParseErr, errBadIgnore};
  }

  const double val_diff = parse_val_diff.GetValue();
  // Deliberately written as !(x >= 0) rather than (x < 0): every ordered comparison with NaN is false, so a
  // plain `< 0` test would accept NaN. A NaN threshold could never satisfy a `<=` comparison against it, which
  // would silently disable the option. NOTE(review): TakeFloat may already reject NaN; this guard does not
  // rely on that.
  if (!(val_diff >= 0)) {
    return {Status::RedisParseErr, errBadIgnore};
  }

  ignore_max_time_diff = parse_time_diff.GetValue();
  ignore_max_val_diff = val_diff;
  return Status::OK();
}

TSCreateOption create_option_;
};

Expand Down
10 changes: 9 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,13 +623,15 @@ void TimeSeriesMetadata::Encode(std::string *dst) const {
PutFixed8(dst, static_cast<uint8_t>(duplicate_policy));
PutSizedString(dst, source_key);
PutFixed64(dst, last_timestamp);
PutFixed64(dst, ignore_max_time_diff);
PutDouble(dst, ignore_max_val_diff);
}

rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}
if (input->size() < sizeof(uint64_t) * 2 + sizeof(uint8_t) * 2 + sizeof(uint32_t)) {
if (input->size() < sizeof(uint64_t) * 3 + sizeof(uint8_t) * 2 + sizeof(uint32_t)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

Expand All @@ -641,6 +643,12 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
GetSizedString(input, &source_key_slice);
source_key = source_key_slice.ToString();
GetFixed64(input, &last_timestamp);
ignore_max_time_diff = 0;
ignore_max_val_diff = 0.0;
if (input->size() >= sizeof(uint64_t) + sizeof(double)) {
GetFixed64(input, &ignore_max_time_diff);
GetDouble(input, &ignore_max_val_diff);
}

return rocksdb::Status::OK();
}
10 changes: 8 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ class TimeSeriesMetadata : public Metadata {
uint64_t chunk_size;
ChunkType chunk_type;
DuplicatePolicy duplicate_policy;
uint64_t ignore_max_time_diff;
double ignore_max_val_diff;
std::string source_key;
uint64_t last_timestamp = 0; // Approximate last timestamp, used for compaction filter

Expand All @@ -423,14 +425,18 @@ class TimeSeriesMetadata : public Metadata {
retention_time(0),
chunk_size(0),
chunk_type(ChunkType::UNCOMPRESSED),
duplicate_policy(DuplicatePolicy::BLOCK) {}
duplicate_policy(DuplicatePolicy::BLOCK),
ignore_max_time_diff(0),
ignore_max_val_diff(0.0) {}
TimeSeriesMetadata(uint64_t retention_time, uint64_t chunk_size, ChunkType chunk_type,
DuplicatePolicy duplicate_policy, bool generate_version = true)
: Metadata(kRedisTimeSeries, generate_version),
retention_time(retention_time),
chunk_size(chunk_size),
chunk_type(chunk_type),
duplicate_policy(duplicate_policy) {}
duplicate_policy(duplicate_policy),
ignore_max_time_diff(0),
ignore_max_val_diff(0.0) {}

void SetSourceKey(Slice key);

Expand Down
49 changes: 48 additions & 1 deletion src/types/redis_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "redis_timeseries.h"

#include <cmath>
#include <queue>

#include "commands/error_constants.h"
Expand Down Expand Up @@ -515,7 +516,9 @@ TSCreateOption::TSCreateOption()
: retention_time(kDefaultRetentionTime),
chunk_size(kDefaultChunkSize),
chunk_type(kDefaultChunkType),
duplicate_policy(kDefaultDuplicatePolicy) {}
duplicate_policy(kDefaultDuplicatePolicy),
ignore_max_time_diff(0),
ignore_max_val_diff(0.0) {}

Status TSMQueryFilterParser::Parse(std::string_view expr) {
if (expr.empty()) return Status::OK();
Expand Down Expand Up @@ -678,6 +681,8 @@ TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
metadata.chunk_size = option.chunk_size;
metadata.chunk_type = option.chunk_type;
metadata.duplicate_policy = option.duplicate_policy;
metadata.ignore_max_time_diff = option.ignore_max_time_diff;
metadata.ignore_max_val_diff = option.ignore_max_val_diff;
metadata.SetSourceKey(option.source_key);

return metadata;
Expand Down Expand Up @@ -851,6 +856,44 @@ rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
return createTimeSeries(ctx, ns_key, metadata_out, option);
}

// Applies the series' IGNORE thresholds to a batch that is about to be written.
//
// A pending sample is marked kSkip when all of the following hold against the current reference sample:
//   - its timestamp is not older than the reference timestamp,
//   - its timestamp is at most `metadata.ignore_max_time_diff` ahead of the reference timestamp,
//   - its value differs from the reference value by at most `metadata.ignore_max_val_diff`.
// The reference starts as the last sample returned by getCommon() and advances to every in-order sample
// that is not skipped. A skipped entry carries the reference timestamp in its add result.
//
// Nothing is filtered when the series has a source key, when its duplicate policy is not LAST, or when
// getCommon() yields no sample. A failing getCommon() status is returned to the caller unchanged.
rocksdb::Status TimeSeries::filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key,
                                                        const TimeSeriesMetadata &metadata, SampleBatch *sample_batch) {
  const bool has_source_key = !metadata.source_key.empty();
  const bool policy_is_last = metadata.duplicate_policy == DuplicatePolicy::LAST;
  if (has_source_key || !policy_is_last) {
    return rocksdb::Status::OK();
  }

  // NOTE(review): the `true` flag presumably asks getCommon() for the latest sample only - confirm.
  std::vector<TSSample> stored_samples;
  auto status = getCommon(ctx, ns_key, metadata, true, &stored_samples);
  if (!status.ok() || stored_samples.empty()) {
    return status;
  }

  auto reference = stored_samples.back();
  auto batch_view = sample_batch->AsSlice();
  auto pending = batch_view.GetSampleSpan();
  auto results = batch_view.GetAddResultSpan();

  for (size_t idx = 0; idx < pending.size(); ++idx) {
    // Entries that already carry a verdict are left untouched.
    if (results[idx].type != TSChunk::AddResultType::kNone) {
      continue;
    }

    const auto &candidate = pending[idx];
    // Samples older than the reference are never ignored and never become the new reference.
    if (candidate.ts < reference.ts) {
      continue;
    }

    const bool within_time = candidate.ts - reference.ts <= metadata.ignore_max_time_diff;
    const bool within_value = std::abs(candidate.v - reference.v) <= metadata.ignore_max_val_diff;
    if (within_time && within_value) {
      results[idx].type = TSChunk::AddResultType::kSkip;
      results[idx].sample.ts = reference.ts;
    } else {
      // Later entries of the same batch are compared against this sample.
      reference = candidate;
    }
  }

  return rocksdb::Status::OK();
}

rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch, DownstreamUpsertArgs *ds_args) {
auto batch = storage_->GetWriteBatchBase();
Expand Down Expand Up @@ -1930,6 +1973,8 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSS
rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
if (!s.ok()) return s;
auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : metadata.duplicate_policy);
s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch);
if (!s.ok()) return s;

DownstreamUpsertArgs ds_args;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
Expand All @@ -1950,6 +1995,8 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, st
return s;
}
auto sample_batch = SampleBatch(std::move(samples), metadata.duplicate_policy);
s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch);
if (!s.ok()) return s;
DownstreamUpsertArgs ds_args;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
if (!s.ok()) return s;
Expand Down
6 changes: 5 additions & 1 deletion src/types/redis_timeseries.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ struct TSCreateOption {
uint64_t chunk_size;
TimeSeriesMetadata::ChunkType chunk_type;
TimeSeriesMetadata::DuplicatePolicy duplicate_policy;
uint64_t ignore_max_time_diff;
double ignore_max_val_diff;
std::string source_key;
LabelKVList labels;

Expand Down Expand Up @@ -257,7 +259,7 @@ enum class TSAlterMode : uint8_t {
RETENTION = 1,
CHUNK_SIZE = 1 << 1,
DUPLICATE_POLICY = 1 << 2,
IGNORE = 1 << 3,
IGNORE = 1 << 3, // TSAlterMode::IGNORE is not used for now
LABELS = 1 << 4,
};

Expand Down Expand Up @@ -317,6 +319,8 @@ class TimeSeries : public SubKeyScanner {
const TSCreateOption *options);
rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out,
const TSCreateOption *option = nullptr);
rocksdb::Status filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata, SampleBatch *sample_batch);
rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
LabelKVList *labels);
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
Expand Down
4 changes: 3 additions & 1 deletion src/types/timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ std::vector<TSChunk::AddResult> TSChunk::SampleBatch::GetFinalResults() const {
res.resize(add_results_.size());
for (size_t idx = 0; idx < add_results_.size(); idx++) {
res[indexes_[idx]] = add_results_[idx];
res[indexes_[idx]].sample.ts = samples_[idx].ts;
if (res[indexes_[idx]].type != AddResultType::kSkip || res[indexes_[idx]].sample.ts == 0) {
res[indexes_[idx]].sample.ts = samples_[idx].ts;
}
}
return res;
}
Expand Down
25 changes: 25 additions & 0 deletions tests/cppunit/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,28 @@ TEST(Metadata, Metadata64bitSize) {
EXPECT_EQ(md_decoded.Type(), kRedisHash);
EXPECT_EQ(md_decoded.size, big_size);
}

// Checks that an encoded TimeSeriesMetadata record whose trailing sizeof(uint64_t) + sizeof(double) bytes
// are missing still decodes: the leading fields round-trip and both IGNORE thresholds come back as zero.
TEST(Metadata, TimeSeriesMetadataDecodingBackwardCompatible) {
  TimeSeriesMetadata original;
  original.retention_time = 3600;
  original.chunk_size = 1024;
  original.chunk_type = TimeSeriesMetadata::ChunkType::COMPRESSED;
  original.duplicate_policy = TimeSeriesMetadata::DuplicatePolicy::LAST;
  original.last_timestamp = 123456789;

  // Encode the record, then cut off the tail so the buffer ends right before the two IGNORE fields.
  std::string legacy_bytes;
  original.Encode(&legacy_bytes);
  legacy_bytes.resize(legacy_bytes.size() - sizeof(uint64_t) - sizeof(double));

  TimeSeriesMetadata decoded(false);
  Slice legacy_input(legacy_bytes);
  ASSERT_TRUE(decoded.Decode(&legacy_input).ok());

  // Fields that precede the truncated tail must survive unchanged.
  EXPECT_EQ(decoded.retention_time, original.retention_time);
  EXPECT_EQ(decoded.chunk_size, original.chunk_size);
  EXPECT_EQ(decoded.chunk_type, original.chunk_type);
  EXPECT_EQ(decoded.duplicate_policy, original.duplicate_policy);
  EXPECT_EQ(decoded.last_timestamp, original.last_timestamp);

  // The missing IGNORE fields must decode to zero.
  EXPECT_EQ(decoded.ignore_max_time_diff, 0);
  EXPECT_EQ(decoded.ignore_max_val_diff, 0.0);
  EXPECT_TRUE(decoded.source_key.empty());
}
81 changes: 81 additions & 0 deletions tests/gocase/unit/type/timeseries/timeseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,48 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", "13.4").Err(), "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
})

t.Run("TS.ADD Ignore Option", func(t *testing.T) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test cases are missing a significant number of negative scenarios where parameters fail to take effect. Could you please add them?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. I have added more test scenarios for the IGNORE option. @jihuayu

ignoreKey := "test_add_ignore_key"
require.NoError(t, rdb.Del(ctx, ignoreKey).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err())

// The first sample should be inserted normally.
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val())
// The sample falls inside the IGNORE window, so the existing timestamp stays unchanged.
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1003", "11").Val())

res := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 1, len(res))
assert.Equal(t, []interface{}{int64(1000), float64(10)}, res[0])

// A sample outside the IGNORE window should be appended.
require.Equal(t, int64(1008), rdb.Do(ctx, "ts.add", ignoreKey, "1008", "20").Val())
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(res))
assert.Equal(t, []interface{}{int64(1008), float64(20)}, res[1])

// IGNORE arguments on an existing series are ignored, so the sample is evaluated using the stored per-key settings.
require.Equal(t, int64(1012), rdb.Do(ctx, "ts.add", ignoreKey, "1012", "25", "ignore", "1000", "1000").Val())
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 3, len(res))
assert.Equal(t, []interface{}{int64(1012), float64(25)}, res[2])

// An invalid timestamp must be rejected and the series must remain unchanged.
require.ErrorContains(t, rdb.Do(ctx, "ts.add", ignoreKey, "abc", "5").Err(), "invalid timestamp")
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 3, len(res))

// An invalid value must be rejected and the series must remain unchanged.
require.ErrorContains(t, rdb.Do(ctx, "ts.add", ignoreKey, "1010", "notanumber").Err(), "invalid value")
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 3, len(res))

// Missing TS.ADD arguments must be rejected and the series must remain unchanged.
require.ErrorContains(t, rdb.Do(ctx, "ts.add", ignoreKey, "1010").Err(), "wrong number of arguments")
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 3, len(res))
})

t.Run("TS.ADD With Retention", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "1000").Err())
Expand Down Expand Up @@ -231,6 +273,45 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
assert.Contains(t, res[1], "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
})

t.Run("TS.MADD Ignore Option", func(t *testing.T) {
ignoreKey := "test_madd_ignore_key"
require.NoError(t, rdb.Del(ctx, ignoreKey).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err())

// The first sample should be inserted normally.
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val())
// TS.MADD should keep the first ignored timestamp and only append the sample outside the window.
res := rdb.Do(ctx, "ts.madd", ignoreKey, "1003", "11", ignoreKey, "1004", "13", ignoreKey, "1007", "14").Val().([]interface{})
assert.Equal(t, []interface{}{int64(1000), int64(1004), int64(1004)}, res)

samples := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(samples))
assert.Equal(t, []interface{}{int64(1000), float64(10)}, samples[0])
assert.Equal(t, []interface{}{int64(1004), float64(13)}, samples[1])

// One bad timestamp in TS.MADD should reject the batch and leave the series unchanged.
require.ErrorContains(t, rdb.Do(ctx, "ts.madd", ignoreKey, "1005", "11", ignoreKey, "badts", "12").Err(), "invalid timestamp")
samples = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(samples))

// One bad value in TS.MADD should reject the batch and leave the series unchanged.
require.ErrorContains(t, rdb.Do(ctx, "ts.madd", ignoreKey, "1006", "badvalue", ignoreKey, "1007", "14").Err(), "invalid value")
samples = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(samples))

// TS.MADD treats the extra key as a normal triplet, so the existing series keeps using its stored IGNORE policy.
res = rdb.Do(ctx, "ts.madd", ignoreKey, "1008", "15", "ignore", "5", "2").Val().([]interface{})
assert.Equal(t, int64(1004), res[0])
assert.Contains(t, res[1], "the key is not a TSDB key")
samples = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(samples))

// Missing TS.MADD arguments must be rejected and the series must remain unchanged.
require.ErrorContains(t, rdb.Do(ctx, "ts.madd", ignoreKey, "1008").Err(), "wrong number of arguments")
samples = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(samples))
})

t.Run("TS.MADD Nonexistent Key", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
require.NoError(t, rdb.Del(ctx, "existent").Err())
Expand Down