-
Notifications
You must be signed in to change notification settings - Fork 634
feat(ts): add the support of TWA aggregator to Range and MRange #3262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
Changes from 9 commits
fc7b622
e83a348
279c9bd
9d8e076
9174002
d4b814b
ae2268f
021bae7
7e2e5e9
2e6dfb6
45ff6fa
20fea44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,16 +72,20 @@ | |
| [](const TSSample &a, const TSSample &b) { return a.v < b.v; }); | ||
| return max->v - min->v; | ||
| } | ||
| static inline double Area(nonstd::span<const TSSample> samples) { | ||
| // Intra bucket area is 0 for single element. | ||
| double result = 0; | ||
| for (size_t i = 1; i < samples.size(); i++) { | ||
| auto t_diff = static_cast<double>(samples[i].ts - samples[i - 1].ts); | ||
| // Area of bottom rectangle + Area of above triangle | ||
| result += (t_diff * samples[i - 1].v) + (t_diff * (samples[i].v - samples[i - 1].v) * 0.5); | ||
| } | ||
| return result; | ||
| } | ||
| }; | ||
|
|
||
| std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option) { | ||
|
Check failure on line 87 in src/types/redis_timeseries.cc
|
||
| const auto &aggregator = option.aggregator; | ||
| std::vector<TSSample> res; | ||
| if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { | ||
| res = std::move(samples); | ||
| return res; | ||
| } | ||
| auto spans = aggregator.SplitSamplesToBuckets(samples); | ||
|
|
||
| auto get_bucket_ts = [&](uint64_t left) -> uint64_t { | ||
| using BucketTimestampType = TSRangeOption::BucketTimestampType; | ||
|
|
@@ -97,7 +101,96 @@ | |
| } | ||
| return 0; | ||
| }; | ||
| // Linear interpolation. | ||
| auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const TSSample &right_nb) { | ||
| auto y_diff = right_nb.v - left_nb.v; | ||
| auto x_diff = static_cast<double>(right_nb.ts - left_nb.ts); | ||
| auto x_diff_prime = static_cast<double>(ts - left_nb.ts); | ||
| auto y_diff_prime = (x_diff_prime * y_diff) / x_diff; | ||
| TSSample sample; | ||
| sample.ts = ts; | ||
| sample.v = y_diff_prime + left_nb.v; | ||
| return sample; | ||
| }; | ||
| // Computes the TWA of empty bucket from its neighbor samples. | ||
| auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right, | ||
| const TSSample &right_nb) { | ||
| auto left = interpolate_sample(left_nb, bucket_left, right_nb); | ||
| auto right = interpolate_sample(left_nb, bucket_right, right_nb); | ||
| return Reducer::Area(std::array<TSSample, 2>{left, right}) / static_cast<double>(bucket_right - bucket_left); | ||
| }; | ||
|
|
||
| // Retrieve prev_sample and next_sample from samples when TWA aggregation. | ||
| TSSample prev_sample, next_sample; | ||
|
Check warning on line 124 in src/types/redis_timeseries.cc
|
||
| bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false; | ||
|
Check warning on line 125 in src/types/redis_timeseries.cc
|
||
| if (is_twa_aggregator) { | ||
| const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value(); | ||
| next_sample = samples.back(); | ||
| samples.pop_back(); | ||
| prev_sample = samples.back(); | ||
| samples.pop_back(); | ||
| // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples. | ||
| prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts); | ||
| next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts); | ||
| } | ||
| std::vector<TSSample> res; | ||
| if (is_twa_aggregator && option.is_return_empty && samples.empty()) { | ||
| const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || next_sample.ts == TSSample::MAX_TIMESTAMP || | ||
| prev_sample.ts == next_sample.ts; // When filter entire range lies left or right to data. | ||
| if (early_return) { | ||
|
Check warning on line 140 in src/types/redis_timeseries.cc
|
||
| res = std::move(samples); | ||
| return res; | ||
| } | ||
|
|
||
| uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration; | ||
| res.reserve(n_buckets_estimate + 1); | ||
| uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(option.start_ts); | ||
| uint64_t bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); | ||
| for (size_t i = 0; i < n_buckets_estimate; i++) { | ||
| bucket_left = std::max(bucket_left, option.start_ts); | ||
| bucket_right = std::min(bucket_right, option.end_ts); | ||
| TSSample sample; | ||
| sample.ts = bucket_left; | ||
| sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); | ||
| res.push_back(sample); | ||
| bucket_left = bucket_right; | ||
| bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); | ||
| } | ||
| // Process last bucket. | ||
| TSSample sample; | ||
| sample.ts = bucket_left; | ||
| if (bucket_left == option.end_ts) { // Calculate last sample. | ||
| sample.v = interpolate_sample(prev_sample, option.end_ts, next_sample).v; | ||
| } else { | ||
| sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); | ||
| } | ||
| res.push_back(sample); | ||
| return res; | ||
| } else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { | ||
| res = std::move(samples); | ||
| return res; | ||
| } | ||
|
|
||
| auto spans = aggregator.SplitSamplesToBuckets(samples); | ||
| res.reserve(spans.size()); | ||
|
|
||
| auto non_empty_left_bucket_idx = [&spans](size_t curr) { | ||
| while (--curr && spans[curr].empty()); | ||
| return curr; | ||
| }; | ||
| auto non_empty_right_bucket_idx = [&spans](size_t curr) { | ||
| while (++curr < spans.size() && spans[curr].empty()); | ||
| return curr; | ||
| }; | ||
|
|
||
| std::vector<std::pair<TSSample, TSSample>> neighbors; | ||
| neighbors.reserve(spans.size()); | ||
| for (size_t i = 0; i < spans.size(); i++) { | ||
| TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample; | ||
| TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample; | ||
| neighbors.emplace_back(prev, next); | ||
| } | ||
|
|
||
|
Comment on lines
+175
to
+197
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The nested while loops inside the for loop result in O(N^2) time complexity, which can be optimized to O(N). We could:
|
||
| uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts); | ||
| for (size_t i = 0; i < spans.size(); i++) { | ||
| if (option.count_limit && res.size() >= option.count_limit) { | ||
|
|
@@ -114,6 +207,14 @@ | |
| case TSAggregatorType::COUNT: | ||
| sample.v = 0; | ||
| break; | ||
| case TSAggregatorType::TWA: | ||
| if ((i == 0 && !prev_available) || (i == spans.size() - 1 && !next_available)) { | ||
|
Check failure on line 211 in src/types/redis_timeseries.cc
|
||
| sample.v = TSSample::NAN_VALUE; | ||
| } else { | ||
| auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); | ||
| sample.v = empty_bucket_twa(neighbors[i].first, bucket_left, bucket_right, neighbors[i].second); | ||
| } | ||
| break; | ||
| case TSAggregatorType::LAST: | ||
| if (i == 0 || spans[i - 1].empty()) { | ||
| sample.v = TSSample::NAN_VALUE; | ||
|
|
@@ -126,6 +227,32 @@ | |
| } | ||
| } else if (!spans[i].empty()) { | ||
| sample.v = aggregator.AggregateSamplesValue(spans[i]); | ||
|
|
||
| if (is_twa_aggregator) { | ||
| auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); | ||
| // Cut left and right empty regions. In case of first and last bucket. | ||
| bucket_left = std::max(bucket_left, option.start_ts); | ||
| bucket_right = std::min(bucket_right, option.end_ts); | ||
| // Front area available iff prev_sample < bucket_left < span[i].front(). Similarly for end_area. | ||
| bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left); | ||
| bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts); | ||
| double area = 0; | ||
| uint64_t l = spans[i].front().ts; | ||
| uint64_t r = spans[i].back().ts; | ||
| if (front_available) { | ||
|
Check failure on line 242 in src/types/redis_timeseries.cc
|
||
| TSSample left_sample = interpolate_sample(neighbors[i].first, bucket_left, spans[i].front()); | ||
| area += Reducer::Area(std::array<TSSample, 2>{left_sample, spans[i].front()}); | ||
| l = bucket_left; | ||
| } | ||
| if (back_available) { | ||
|
Check failure on line 247 in src/types/redis_timeseries.cc
|
||
| TSSample right_sample = interpolate_sample(spans[i].back(), bucket_right, neighbors[i].second); | ||
| area += Reducer::Area(std::array<TSSample, 2>{spans[i].back(), right_sample}); | ||
| r = bucket_right; | ||
| } | ||
| // Edge case: If single bucket and it contains only one element. | ||
| area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0; | ||
| sample.v = (sample.v + area) / std::max(static_cast<double>(r - l), 1.0); | ||
| } | ||
| } else { | ||
| continue; | ||
| } | ||
|
|
@@ -810,6 +937,9 @@ | |
| case TSAggregatorType::VAR_S: | ||
| res = Reducer::VarS(samples); | ||
| break; | ||
| case TSAggregatorType::TWA: | ||
| res = Reducer::Area(samples); | ||
| break; | ||
| default: | ||
| unreachable(); | ||
| } | ||
|
|
@@ -1055,18 +1185,24 @@ | |
| bool has_aggregator = aggregator.type != TSAggregatorType::NONE; | ||
| if (iter->Valid()) { | ||
| if (option.count_limit != 0 && !has_aggregator) { | ||
| temp_results.reserve(option.count_limit); | ||
| temp_results.reserve(option.count_limit + 2); | ||
| } else { | ||
| chunk = CreateTSChunkFromData(iter->value()); | ||
| auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1; | ||
| auto estimate_chunks = std::min((end_timestamp - start_timestamp) / range, uint64_t(32)); | ||
| temp_results.reserve(estimate_chunks * metadata.chunk_size); | ||
| temp_results.reserve(estimate_chunks * metadata.chunk_size + 2); | ||
| } | ||
| } | ||
| // Get samples from chunks | ||
| uint64_t bucket_count = 0; | ||
| uint64_t last_bucket = 0; | ||
| bool is_not_enough = true; | ||
| // Add these two samples at end when aggregator is TWA. | ||
| TSSample prev_sample, next_sample; | ||
|
Check warning on line 1201 in src/types/redis_timeseries.cc
|
||
| prev_sample.ts = TSSample::MAX_TIMESTAMP; | ||
| next_sample.ts = TSSample::MAX_TIMESTAMP; | ||
| const bool is_twa_aggregator = option.aggregator.type == TSAggregatorType::TWA; | ||
|
|
||
| for (; iter->Valid() && is_not_enough; iter->Next()) { | ||
| chunk = CreateTSChunkFromData(iter->value()); | ||
| auto it = chunk->CreateIterator(); | ||
|
|
@@ -1081,7 +1217,12 @@ | |
| const bool not_time_filtered = option.filter_by_ts.empty() || option.filter_by_ts.count(sample->ts); | ||
| const bool value_in_range = !option.filter_by_value || (sample->v >= option.filter_by_value->first && | ||
| sample->v <= option.filter_by_value->second); | ||
|
|
||
| // Record prev and next samples around the filtered range when aggregator is TWA | ||
| if (is_twa_aggregator) { | ||
| prev_sample = (sample->ts <= start_timestamp) ? *sample : prev_sample; | ||
| next_sample = | ||
| (sample->ts >= end_timestamp && next_sample.ts == TSSample::MAX_TIMESTAMP) ? *sample : next_sample; | ||
| } | ||
| if (!in_time_range || !not_time_filtered || !value_in_range) { | ||
| continue; | ||
| } | ||
|
|
@@ -1103,6 +1244,18 @@ | |
| } | ||
| } | ||
|
|
||
| if (is_twa_aggregator) { | ||
| // If the first element of the series is in first bucket, prev_sample might not get initialized. Similarly if the | ||
| // last element in the series is in last bucket, next_sample might not get initialized. If the series is empty, | ||
| // prev_sample and next_sample points to infinity (MAX_TIMESTAMP) | ||
| prev_sample = | ||
| prev_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.front() : prev_sample; | ||
| next_sample = | ||
| next_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.back() : next_sample; | ||
| temp_results.push_back(prev_sample); | ||
| temp_results.push_back(next_sample); | ||
| } | ||
|
|
||
| // Process compaction logic | ||
| *res = AggregateSamplesByRangeOption(std::move(temp_results), option); | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.