Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions be/src/exec/runtime_filter/runtime_filter_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

std::string debug_string() override {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return fmt::format(
"Merger: ({}, expected_producer_num: {}, received_producer_num: {}, "
"received_rf_size_num: {}, received_sum_size: {})",
Expand All @@ -54,12 +55,15 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

// If input is a disabled predicate, the final result is a disabled predicate.
Status merge_from(const RuntimeFilter* other) {
// Returns true only for the call that makes the merger ready.
Status merge_from(const RuntimeFilter* other, bool* ready) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_producer_num++;
if (_expected_producer_num < _received_producer_num) {
return Status::InternalError(
"runtime filter merger input product more than expected, {}", debug_string());
}
*ready = _received_producer_num == _expected_producer_num;
if (_received_producer_num == _expected_producer_num) {
_rf_state = State::READY;
}
Expand All @@ -72,6 +76,7 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

void set_expected_producer_num(int num) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_received_producer_num > 0 || _received_rf_size_num > 0) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter merger set expected producer after receive data, {}",
Expand All @@ -80,9 +85,13 @@ class RuntimeFilterMerger : public RuntimeFilter {
_expected_producer_num = num;
}

int get_expected_producer_num() const { return _expected_producer_num; }
int get_expected_producer_num() {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return _expected_producer_num;
}

bool add_rf_size(uint64_t size) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_rf_size_num++;
if (_expected_producer_num < _received_rf_size_num) {
throw Exception(ErrorCode::INTERNAL_ERROR,
Expand All @@ -93,7 +102,10 @@ class RuntimeFilterMerger : public RuntimeFilter {
return (_received_rf_size_num == _expected_producer_num);
}

uint64_t get_received_sum_size() const { return _received_sum_size; }
uint64_t get_received_sum_size() {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return _received_sum_size;
}

bool ready() const { return _rf_state == State::READY; }

Expand Down
97 changes: 49 additions & 48 deletions be/src/exec/runtime_filter/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,57 +91,53 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter(
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;

LocalMergeContext* context;
{
std::lock_guard<std::mutex> l(_lock);
context = &_local_merge_map[key]; // may inplace construct default object
std::lock_guard<std::mutex> l(_lock);
auto& context = _local_merge_map[key];
if (!context || producer->stage() > context->stage) {
auto new_context = std::make_shared<LocalMergeContext>();
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger));
new_context->stage = producer->stage();
context = new_context;
}

RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer));
return Status::OK();
}

Status LocalMergeContext::register_producer(const QueryContext* query_ctx,
const TRuntimeFilterDesc* desc,
std::shared_ptr<RuntimeFilterProducer> producer) {
std::lock_guard<std::mutex> l(mtx);
if (producer->stage() > stage) {
// New recursive CTE round: discard stale merger and producers from
// the previous round and recreate the merger for the new round.
merger.reset();
producers.clear();
stage = producer->stage();
}
if (!merger) {
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
}
producers.emplace_back(producer);
merger->set_expected_producer_num(cast_set<int>(producers.size()));
context->producers.emplace_back(producer);
context->merger->set_expected_producer_num(cast_set<int>(context->producers.size()));
// Sync the local merger's stage from the producer so that outgoing merge RPCs
// (via _push_to_remote) carry the correct recursive CTE round number.
merger->set_stage(producer->stage());
context->merger->set_stage(producer->stage());
return Status::OK();
}

Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
LocalMergeContext** local_merge_filters) {
std::string LocalMergeContext::debug_string() {
std::string result =
Comment thread
BiteTheDDDDt marked this conversation as resolved.
fmt::format("stage: {}, {}\n", stage,
merger ? merger->debug_string() : "local merge context merger is nullptr");
for (const auto& producer : producers) {
result += fmt::format("{}\n", producer->debug_string());
}
return result;
}

Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expected_stage,
std::shared_ptr<LocalMergeContext>* context) {
if (!_is_global) [[unlikely]] {
return Status::InternalError(
"A local merge filter can not be registered in Local RuntimeFilterMgr");
}
context->reset();
std::lock_guard<std::mutex> l(_lock);
auto iter = _local_merge_map.find(filter_id);
if (iter == _local_merge_map.end()) {
// Filter may have been removed during a recursive CTE stage reset.
// Return OK with nullptr to let the caller skip gracefully.
*local_merge_filters = nullptr;
return Status::OK();
}
*local_merge_filters = &iter->second;
if (!iter->second.merger) {
return Status::InternalError("local merge context merger is nullptr for filter_id: {}",
filter_id);
if (!iter->second) {
return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id);
}
if (expected_stage != iter->second->stage) {
return Status::OK();
}
*context = iter->second;
return Status::OK();
}

Expand Down Expand Up @@ -304,32 +300,38 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
}

Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
LocalMergeContext* local_merge_filters = nullptr;
RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters));
if (local_merge_filters == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), request->stage(), &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; discard stale request.
return Status::OK();
}
for (auto producer : local_merge_filters->producers) {
for (const auto& producer : context->producers) {
producer->set_synced_size(request->filter_size());
}
return Status::OK();
}

std::string RuntimeFilterMgr::debug_string() {
std::string result = "Local Merger Info:\n";
std::lock_guard l(_lock);
for (const auto& [filter_id, ctx] : _local_merge_map) {
result += fmt::format("{}\n", ctx.merger->debug_string());
for (const auto& producer : ctx.producers) {
result += fmt::format("{}\n", producer->debug_string());
std::vector<std::pair<int32_t, std::shared_ptr<LocalMergeContext>>> local_merge_contexts;
std::vector<std::shared_ptr<RuntimeFilterConsumer>> consumers;
{
std::lock_guard l(_lock);
for (auto& [filter_id, ctx] : _local_merge_map) {
local_merge_contexts.emplace_back(filter_id, ctx);
}
for (const auto& [filter_id, filter_consumers] : _consumer_map) {
consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end());
}
}
for (const auto& [filter_id, ctx] : local_merge_contexts) {
result += fmt::format("filter_id: {}, {}", filter_id,
ctx ? ctx->debug_string() : "local merge context is nullptr\n");
}
result += "Consumer Info:\n";
for (const auto& [filter_id, consumers] : _consumer_map) {
for (const auto& consumer : consumers) {
result += fmt::format("{}\n", consumer->debug_string());
}
for (const auto& consumer : consumers) {
result += fmt::format("{}\n", consumer->debug_string());
}
return result;
}
Expand Down Expand Up @@ -373,10 +375,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q

RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));

RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get()));
RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), &is_ready));

cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id()));
is_ready = cnt_val.merger->ready(); // update is_ready in locked scope
}

if (is_ready) {
Expand Down
18 changes: 9 additions & 9 deletions be/src/exec/runtime_filter/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,14 @@ class HandleErrorBrpcCallback;
class SyncSizeCallback;

struct LocalMergeContext {
std::mutex mtx;
std::string debug_string();

std::shared_ptr<RuntimeFilterMerger> merger;
std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
// Tracks the recursive CTE round. When a producer from a newer round
// registers, the context is reset (merger recreated, old producers dropped).
// registers, RuntimeFilterMgr replaces the whole context and old in-flight
// users keep the previous context alive through shared_ptr.
uint32_t stage = 0;

Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc,
std::shared_ptr<RuntimeFilterProducer> producer);
};

struct GlobalMergeContext {
Expand Down Expand Up @@ -102,7 +101,8 @@ class RuntimeFilterMgr {
const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer);

Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters);
Status get_local_merge_context(int filter_id, uint32_t expected_stage,
std::shared_ptr<LocalMergeContext>* context);

// Create local producer. This producer is hold by RuntimeFilterProducerHelper.
Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
Expand All @@ -118,8 +118,8 @@ class RuntimeFilterMgr {
void remove_filter(int32_t filter_id) {
std::lock_guard<std::mutex> l(_lock);
_consumer_map.erase(filter_id);
// NOTE: _local_merge_map is NOT erased here. It is reset lazily in
// LocalMergeContext::register_producer when a producer from a newer
// NOTE: _local_merge_map is NOT erased here. It is replaced lazily in
// register_local_merger_producer_filter when a producer from a newer
// recursive CTE round registers. Erasing eagerly here would race with
// multi-fragment REBUILD: a consumer-only fragment's remove_filter could
// delete the entry that the producer fragment just re-registered.
Expand All @@ -144,7 +144,7 @@ class RuntimeFilterMgr {
// key: "filter-id"
std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> _consumer_map;
std::set<int32_t> _producer_id_set;
std::map<int32_t, LocalMergeContext> _local_merge_map;
std::map<int32_t, std::shared_ptr<LocalMergeContext>> _local_merge_map;

std::unique_ptr<MemTracker> _tracker;

Expand Down
42 changes: 21 additions & 21 deletions be/src/exec/runtime_filter/runtime_filter_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
// when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless
return Status::OK();
}
LocalMergeContext* context = nullptr;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
_wrapper->filter_id(), &context));
if (context == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
_wrapper->filter_id(), _stage, &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; this producer is stale.
return Status::OK();
}
std::lock_guard l(context->mtx);
RETURN_IF_ERROR(context->merger->merge_from(this));
if (context->merger->ready()) {
bool ready = false;
RETURN_IF_ERROR(context->merger->merge_from(this, &ready));
if (ready) {
if (_has_remote_target) {
RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger.get()));
} else {
Expand Down Expand Up @@ -123,26 +123,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt
set_state(State::WAITING_FOR_SYNCED_SIZE);

if (_need_do_merge(state)) {
LocalMergeContext* merger_context = nullptr;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
_wrapper->filter_id(), &merger_context));
if (merger_context == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
_wrapper->filter_id(), _stage, &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; this producer is stale.
return Status::OK();
}
std::lock_guard merger_lock(merger_context->mtx);
if (merger_context->merger->add_rf_size(local_filter_size)) {
if (!_has_remote_target) {
for (auto filter : merger_context->producers) {
filter->set_synced_size(merger_context->merger->get_received_sum_size());
}
return Status::OK();
} else {
local_filter_size = merger_context->merger->get_received_sum_size();
uint64_t received_sum_size = 0;
bool ready_to_sync = context->merger->add_rf_size(local_filter_size);
if (!ready_to_sync) {
return Status::OK();
}
received_sum_size = context->merger->get_received_sum_size();
if (!_has_remote_target) {
for (const auto& filter : context->producers) {
filter->set_synced_size(received_sum_size);
}
} else {
return Status::OK();
}
local_filter_size = received_sum_size;

} else if (!_has_remote_target) {
set_synced_size(local_filter_size);
Expand Down
18 changes: 13 additions & 5 deletions be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest {
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED);

bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(first_product_state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready));
ASSERT_FALSE(ready);
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, first_expected_state);

std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[1]->register_producer_runtime_filter(desc, &producer2));
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), &ready));
ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, second_expected_state);
}
Expand All @@ -66,12 +69,14 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest {
merger->set_expected_producer_num(1);
ASSERT_FALSE(merger->ready());

bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc, &producer));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
producer->set_wrapper_state_and_ready_to_publish(state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready));
ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());

PMergeFilterRequest request;
Expand Down Expand Up @@ -122,18 +127,21 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) {
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED);

bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready));
ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY);

std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[1]->register_producer_runtime_filter(desc, &producer2));
producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
auto st = merger->merge_from(producer2.get());
auto st = merger->merge_from(producer2.get(), &ready);
ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR);
}

Expand Down
Loading
Loading