Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions be/src/exec/runtime_filter/runtime_filter_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

std::string debug_string() override {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return fmt::format(
"Merger: ({}, expected_producer_num: {}, received_producer_num: {}, "
"received_rf_size_num: {}, received_sum_size: {})",
Expand All @@ -54,12 +55,15 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

// If input is a disabled predicate, the final result is a disabled predicate.
Status merge_from(const RuntimeFilter* other) {
// *ready is set to true only by the call that makes the merger ready.
Status merge_from(const RuntimeFilter* other, bool* ready) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_producer_num++;
if (_expected_producer_num < _received_producer_num) {
return Status::InternalError(
"runtime filter merger input product more than expected, {}", debug_string());
}
*ready = _received_producer_num == _expected_producer_num;
if (_received_producer_num == _expected_producer_num) {
_rf_state = State::READY;
}
Expand All @@ -72,6 +76,7 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

void set_expected_producer_num(int num) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_received_producer_num > 0 || _received_rf_size_num > 0) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter merger set expected producer after receive data, {}",
Expand All @@ -80,9 +85,13 @@ class RuntimeFilterMerger : public RuntimeFilter {
_expected_producer_num = num;
}

int get_expected_producer_num() const { return _expected_producer_num; }
int get_expected_producer_num() {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return _expected_producer_num;
}

bool add_rf_size(uint64_t size) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_rf_size_num++;
if (_expected_producer_num < _received_rf_size_num) {
throw Exception(ErrorCode::INTERNAL_ERROR,
Expand All @@ -93,7 +102,10 @@ class RuntimeFilterMerger : public RuntimeFilter {
return (_received_rf_size_num == _expected_producer_num);
}

uint64_t get_received_sum_size() const { return _received_sum_size; }
uint64_t get_received_sum_size() {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return _received_sum_size;
}

bool ready() const { return _rf_state == State::READY; }

Expand Down
75 changes: 33 additions & 42 deletions be/src/exec/runtime_filter/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Status RuntimeFilterMgr::register_consumer_filter(
return Status::OK();
}

Status RuntimeFilterMgr::register_local_merger_producer_filter(
Status RuntimeFilterMgr::register_local_merge_producer_filter(
const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer) {
if (!_is_global) [[unlikely]] {
Expand All @@ -91,57 +91,48 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter(
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;

LocalMergeContext* context;
{
std::lock_guard<std::mutex> l(_lock);
context = &_local_merge_map[key]; // may inplace construct default object
std::lock_guard<std::mutex> l(_lock);
auto iter = _local_merge_map.find(key);
std::shared_ptr<LocalMergeContext> context;
if (iter == _local_merge_map.end() || !iter->second ||
producer->stage() > iter->second->stage) {
auto new_context = std::make_shared<LocalMergeContext>();
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger));
new_context->stage = producer->stage();
_local_merge_map.insert_or_assign(key, new_context);
context = new_context;
Comment thread
BiteTheDDDDt marked this conversation as resolved.
Outdated
} else {
context = iter->second;
}

RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer));
return Status::OK();
}

Status LocalMergeContext::register_producer(const QueryContext* query_ctx,
const TRuntimeFilterDesc* desc,
std::shared_ptr<RuntimeFilterProducer> producer) {
std::lock_guard<std::mutex> l(mtx);
if (producer->stage() > stage) {
// New recursive CTE round: discard stale merger and producers from
// the previous round and recreate the merger for the new round.
merger.reset();
producers.clear();
stage = producer->stage();
}
if (!merger) {
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
}
producers.emplace_back(producer);
merger->set_expected_producer_num(cast_set<int>(producers.size()));
context->producers.emplace_back(producer);
context->merger->set_expected_producer_num(cast_set<int>(context->producers.size()));
// Sync the local merger's stage from the producer so that outgoing merge RPCs
// (via _push_to_remote) carry the correct recursive CTE round number.
merger->set_stage(producer->stage());
context->merger->set_stage(producer->stage());
return Status::OK();
}

Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
LocalMergeContext** local_merge_filters) {
Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expected_stage,
std::shared_ptr<LocalMergeContext>* context) {
if (!_is_global) [[unlikely]] {
return Status::InternalError(
"A local merge filter can not be registered in Local RuntimeFilterMgr");
}
context->reset();
std::lock_guard<std::mutex> l(_lock);
auto iter = _local_merge_map.find(filter_id);
if (iter == _local_merge_map.end()) {
// Filter may have been removed during a recursive CTE stage reset.
// Return OK with nullptr to let the caller skip gracefully.
*local_merge_filters = nullptr;
return Status::OK();
}
*local_merge_filters = &iter->second;
if (!iter->second.merger) {
return Status::InternalError("local merge context merger is nullptr for filter_id: {}",
filter_id);
if (!iter->second) {
return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id);
}
if (expected_stage != iter->second->stage) {
return Status::OK();
}
*context = iter->second;
return Status::OK();
}

Expand Down Expand Up @@ -304,13 +295,13 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
}

Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
LocalMergeContext* local_merge_filters = nullptr;
RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters));
if (local_merge_filters == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), request->stage(), &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; discard stale request.
return Status::OK();
}
for (auto producer : local_merge_filters->producers) {
for (const auto& producer : context->producers) {
producer->set_synced_size(request->filter_size());
}
return Status::OK();
Expand All @@ -320,8 +311,9 @@ std::string RuntimeFilterMgr::debug_string() {
std::string result = "Local Merger Info:\n";
std::lock_guard l(_lock);
for (const auto& [filter_id, ctx] : _local_merge_map) {
result += fmt::format("{}\n", ctx.merger->debug_string());
for (const auto& producer : ctx.producers) {
DORIS_CHECK(ctx);
result += fmt::format("{}\n", ctx->merger->debug_string());
for (const auto& producer : ctx->producers) {
Comment thread
BiteTheDDDDt marked this conversation as resolved.
Outdated
result += fmt::format("{}\n", producer->debug_string());
}
}
Expand Down Expand Up @@ -373,10 +365,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q

RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));

RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get()));
RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), &is_ready));

cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id()));
is_ready = cnt_val.merger->ready(); // update is_ready in locked scope
}

if (is_ready) {
Expand Down
22 changes: 10 additions & 12 deletions be/src/exec/runtime_filter/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ class HandleErrorBrpcCallback;
class SyncSizeCallback;

struct LocalMergeContext {
std::mutex mtx;
std::shared_ptr<RuntimeFilterMerger> merger;
std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
// Tracks the recursive CTE round. When a producer from a newer round
// registers, the context is reset (merger recreated, old producers dropped).
// registers, RuntimeFilterMgr replaces the whole context and old in-flight
// users keep the previous context alive through shared_ptr.
uint32_t stage = 0;

Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc,
std::shared_ptr<RuntimeFilterProducer> producer);
};

struct GlobalMergeContext {
Expand Down Expand Up @@ -98,11 +95,12 @@ class RuntimeFilterMgr {
int node_id,
std::shared_ptr<RuntimeFilterConsumer>* consumer_filter);

Status register_local_merger_producer_filter(const QueryContext* query_ctx,
const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer);
Status register_local_merge_producer_filter(const QueryContext* query_ctx,
const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer);

Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters);
Status get_local_merge_context(int filter_id, uint32_t expected_stage,
std::shared_ptr<LocalMergeContext>* context);

// Create local producer. This producer is held by RuntimeFilterProducerHelper.
Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
Expand All @@ -118,8 +116,8 @@ class RuntimeFilterMgr {
void remove_filter(int32_t filter_id) {
std::lock_guard<std::mutex> l(_lock);
_consumer_map.erase(filter_id);
// NOTE: _local_merge_map is NOT erased here. It is reset lazily in
// LocalMergeContext::register_producer when a producer from a newer
// NOTE: _local_merge_map is NOT erased here. It is replaced lazily in
// register_local_merge_producer_filter when a producer from a newer
// recursive CTE round registers. Erasing eagerly here would race with
// multi-fragment REBUILD: a consumer-only fragment's remove_filter could
// delete the entry that the producer fragment just re-registered.
Expand All @@ -144,7 +142,7 @@ class RuntimeFilterMgr {
// key: "filter-id"
std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> _consumer_map;
std::set<int32_t> _producer_id_set;
std::map<int32_t, LocalMergeContext> _local_merge_map;
std::map<int32_t, std::shared_ptr<LocalMergeContext>> _local_merge_map;

std::unique_ptr<MemTracker> _tracker;

Expand Down
42 changes: 21 additions & 21 deletions be/src/exec/runtime_filter/runtime_filter_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
// When no global consumer exists, send_to_local_targets does nothing, so merging the runtime filter is useless.
return Status::OK();
}
LocalMergeContext* context = nullptr;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
_wrapper->filter_id(), &context));
if (context == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
_wrapper->filter_id(), _stage, &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; this producer is stale.
return Status::OK();
}
std::lock_guard l(context->mtx);
RETURN_IF_ERROR(context->merger->merge_from(this));
if (context->merger->ready()) {
bool ready = false;
RETURN_IF_ERROR(context->merger->merge_from(this, &ready));
if (ready) {
if (_has_remote_target) {
RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger.get()));
} else {
Expand Down Expand Up @@ -123,26 +123,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt
set_state(State::WAITING_FOR_SYNCED_SIZE);

if (_need_do_merge(state)) {
LocalMergeContext* merger_context = nullptr;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
_wrapper->filter_id(), &merger_context));
if (merger_context == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
_wrapper->filter_id(), _stage, &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; this producer is stale.
return Status::OK();
}
std::lock_guard merger_lock(merger_context->mtx);
if (merger_context->merger->add_rf_size(local_filter_size)) {
if (!_has_remote_target) {
for (auto filter : merger_context->producers) {
filter->set_synced_size(merger_context->merger->get_received_sum_size());
}
return Status::OK();
} else {
local_filter_size = merger_context->merger->get_received_sum_size();
uint64_t received_sum_size = 0;
bool ready_to_sync = context->merger->add_rf_size(local_filter_size);
if (!ready_to_sync) {
return Status::OK();
}
received_sum_size = context->merger->get_received_sum_size();
if (!_has_remote_target) {
for (const auto& filter : context->producers) {
filter->set_synced_size(received_sum_size);
}
} else {
return Status::OK();
}
local_filter_size = received_sum_size;

} else if (!_has_remote_target) {
set_synced_size(local_filter_size);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ Status RuntimeState::register_producer_runtime_filter(
DORIS_CHECK(pfc);
(*producer_filter)->set_stage(pfc->rec_cte_stage());
}
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
_query_ctx, desc, *producer_filter));
return Status::OK();
}
Expand Down
18 changes: 13 additions & 5 deletions be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest {
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED);

bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(first_product_state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready));
ASSERT_FALSE(ready);
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, first_expected_state);

std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[1]->register_producer_runtime_filter(desc, &producer2));
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), &ready));
ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, second_expected_state);
}
Expand All @@ -66,12 +69,14 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest {
merger->set_expected_producer_num(1);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test file still calls the old RuntimeFilterMerger::set_expected_producer_num(...) helper, but the PR removed that member and replaced the production users with increase_expected_producer_num(...). The only remaining set_expected_producer_num references are the four calls in this file, so RuntimeFilterMergerTest will fail to compile once the BE UT target is built. Please update these calls to the new helper name, or keep a compatibility wrapper if the old API is still intended.

ASSERT_FALSE(merger->ready());

bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc, &producer));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
producer->set_wrapper_state_and_ready_to_publish(state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready));
ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());

PMergeFilterRequest request;
Expand Down Expand Up @@ -122,18 +127,21 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) {
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED);

bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready));
ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY);

std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[1]->register_producer_runtime_filter(desc, &producer2));
producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
auto st = merger->merge_from(producer2.get());
auto st = merger->merge_from(producer2.get(), &ready);
ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR);
}

Expand Down
Loading
Loading