Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions be/src/exec/runtime_filter/runtime_filter_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <algorithm>

#include "exec/runtime_filter/runtime_filter.h"
#include "exec/runtime_filter/runtime_filter_definitions.h"
#include "exprs/vexpr.h"
Expand Down Expand Up @@ -46,6 +48,7 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

std::string debug_string() override {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return fmt::format(
"Merger: ({}, expected_producer_num: {}, received_producer_num: {}, "
"received_rf_size_num: {}, received_sum_size: {})",
Expand All @@ -54,12 +57,15 @@ class RuntimeFilterMerger : public RuntimeFilter {
}

// If input is a disabled predicate, the final result is a disabled predicate.
Status merge_from(const RuntimeFilter* other) {
// Sets *ready to true only for the call that makes the merger ready.
Status merge_from(const RuntimeFilter* other, bool* ready) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_producer_num++;
if (_expected_producer_num < _received_producer_num) {
return Status::InternalError(
"runtime filter merger input product more than expected, {}", debug_string());
}
*ready = _received_producer_num == _expected_producer_num;
if (_received_producer_num == _expected_producer_num) {
_rf_state = State::READY;
}
Expand All @@ -71,18 +77,26 @@ class RuntimeFilterMerger : public RuntimeFilter {
return st;
}

void set_expected_producer_num(int num) {
// Only raise the expected producer count. RuntimeFilterMgr may compute the
// count under its own lock and apply it after releasing that lock, so
// concurrent registrations can update the merger out of order.
void increase_expected_producer_num(int num) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_received_producer_num > 0 || _received_rf_size_num > 0) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter merger set expected producer after receive data, {}",
debug_string());
}
_expected_producer_num = num;
_expected_producer_num = std::max(_expected_producer_num, num);
}

int get_expected_producer_num() const { return _expected_producer_num; }
int get_expected_producer_num() {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return _expected_producer_num;
}

bool add_rf_size(uint64_t size) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_rf_size_num++;
if (_expected_producer_num < _received_rf_size_num) {
throw Exception(ErrorCode::INTERNAL_ERROR,
Expand All @@ -93,7 +107,10 @@ class RuntimeFilterMerger : public RuntimeFilter {
return (_received_rf_size_num == _expected_producer_num);
}

uint64_t get_received_sum_size() const { return _received_sum_size; }
uint64_t get_received_sum_size() {
std::unique_lock<std::recursive_mutex> l(_rmtx);
return _received_sum_size;
}

bool ready() const { return _rf_state == State::READY; }

Expand Down
142 changes: 84 additions & 58 deletions be/src/exec/runtime_filter/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global)

std::vector<std::shared_ptr<RuntimeFilterConsumer>> RuntimeFilterMgr::get_consume_filters(
int filter_id) {
std::lock_guard<std::mutex> l(_lock);
LockGuard l(_lock);
auto iter = _consumer_map.find(filter_id);
if (iter == _consumer_map.end()) {
return {};
Expand All @@ -69,13 +69,17 @@ Status RuntimeFilterMgr::register_consumer_filter(
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;

std::lock_guard<std::mutex> l(_lock);
RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, consumer));
_consumer_map[key].push_back(*consumer);
std::shared_ptr<RuntimeFilterConsumer> new_consumer;
RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, &new_consumer));
{
LockGuard l(_lock);
_consumer_map[key].push_back(new_consumer);
}
*consumer = new_consumer;
return Status::OK();
}

Status RuntimeFilterMgr::register_local_merger_producer_filter(
Status RuntimeFilterMgr::register_local_merge_producer_filter(
const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer) {
if (!_is_global) [[unlikely]] {
Expand All @@ -90,58 +94,57 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter(
}
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
uint32_t producer_stage = producer->stage();

LocalMergeContext* context;
std::shared_ptr<LocalMergeContext> context;
std::shared_ptr<RuntimeFilterMerger> merger;
int expected_producer_num = 0;
{
std::lock_guard<std::mutex> l(_lock);
context = &_local_merge_map[key]; // may inplace construct default object
}
LockGuard l(_lock);
auto iter = _local_merge_map.find(key);
if (iter == _local_merge_map.end() || !iter->second ||
producer_stage > iter->second->stage) {
auto new_context = std::make_shared<LocalMergeContext>();
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger));
new_context->stage = producer_stage;
_local_merge_map.insert_or_assign(key, new_context);
context = new_context;
} else {
context = iter->second;
}

RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer));
return Status::OK();
}
context->producers.emplace_back(producer);
merger = context->merger;
expected_producer_num = cast_set<int>(context->producers.size());
}

Status LocalMergeContext::register_producer(const QueryContext* query_ctx,
const TRuntimeFilterDesc* desc,
std::shared_ptr<RuntimeFilterProducer> producer) {
std::lock_guard<std::mutex> l(mtx);
if (producer->stage() > stage) {
// New recursive CTE round: discard stale merger and producers from
// the previous round and recreate the merger for the new round.
merger.reset();
producers.clear();
stage = producer->stage();
}
if (!merger) {
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
}
producers.emplace_back(producer);
merger->set_expected_producer_num(cast_set<int>(producers.size()));
merger->increase_expected_producer_num(expected_producer_num);
// Sync the local merger's stage from the producer so that outgoing merge RPCs
// (via _push_to_remote) carry the correct recursive CTE round number.
merger->set_stage(producer->stage());
merger->set_stage(producer_stage);
return Status::OK();
}

Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
LocalMergeContext** local_merge_filters) {
Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expected_stage,
std::shared_ptr<LocalMergeContext>* context) {
if (!_is_global) [[unlikely]] {
return Status::InternalError(
"A local merge filter can not be registered in Local RuntimeFilterMgr");
}
std::lock_guard<std::mutex> l(_lock);
context->reset();
LockGuard l(_lock);
auto iter = _local_merge_map.find(filter_id);
if (iter == _local_merge_map.end()) {
// Filter may have been removed during a recursive CTE stage reset.
// Return OK with nullptr to let the caller skip gracefully.
*local_merge_filters = nullptr;
return Status::OK();
}
*local_merge_filters = &iter->second;
if (!iter->second.merger) {
return Status::InternalError("local merge context merger is nullptr for filter_id: {}",
filter_id);
if (!iter->second) {
return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id);
}
if (expected_stage != iter->second->stage) {
return Status::OK();
}
*context = iter->second;
return Status::OK();
}

Expand All @@ -155,18 +158,28 @@ Status RuntimeFilterMgr::register_producer_filter(
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;

std::lock_guard<std::mutex> l(_lock);
if (_producer_id_set.contains(key)) {
return Status::InvalidArgument("filter {} has been registered", key);
{
LockGuard l(_lock);
if (_producer_id_set.contains(key)) {
return Status::InvalidArgument("filter {} has been registered", key);
}
}
std::shared_ptr<RuntimeFilterProducer> new_producer;
RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, &new_producer));
{
LockGuard l(_lock);
if (_producer_id_set.contains(key)) {
return Status::InvalidArgument("filter {} has been registered", key);
}
_producer_id_set.insert(key);
}
RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer));
_producer_id_set.insert(key);
*producer = new_producer;
return Status::OK();
}

bool RuntimeFilterMgr::set_runtime_filter_params(
const TRuntimeFilterParams& runtime_filter_params) {
std::lock_guard l(_lock);
LockGuard l(_lock);
if (!_has_merge_addr) {
_merge_addr = runtime_filter_params.runtime_filter_merge_addr;
_has_merge_addr = true;
Expand Down Expand Up @@ -199,7 +212,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->targetv2_info = targetv2_info;
RETURN_IF_ERROR(
RuntimeFilterMerger::create(query_ctx.get(), runtime_filter_desc, &cnt_val->merger));
cnt_val->merger->set_expected_producer_num(producer_size);
cnt_val->merger->increase_expected_producer_num(producer_size);

return Status::OK();
}
Expand Down Expand Up @@ -304,32 +317,46 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
}

Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
LocalMergeContext* local_merge_filters = nullptr;
RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters));
if (local_merge_filters == nullptr) {
std::shared_ptr<LocalMergeContext> context;
RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), request->stage(), &context));
if (!context) {
// Filter was removed during a recursive CTE stage reset; discard stale request.
return Status::OK();
}
for (auto producer : local_merge_filters->producers) {
for (const auto& producer : context->producers) {
producer->set_synced_size(request->filter_size());
}
return Status::OK();
}

std::string RuntimeFilterMgr::debug_string() {
std::string result = "Local Merger Info:\n";
std::lock_guard l(_lock);
for (const auto& [filter_id, ctx] : _local_merge_map) {
struct LocalMergeContextSnapshot {
std::shared_ptr<RuntimeFilterMerger> merger;
std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
};
std::vector<LocalMergeContextSnapshot> local_merge_contexts;
std::vector<std::shared_ptr<RuntimeFilterConsumer>> consumers;
{
LockGuard l(_lock);
for (const auto& [filter_id, ctx] : _local_merge_map) {
DORIS_CHECK(ctx);
DORIS_CHECK(ctx->merger);
local_merge_contexts.push_back({ctx->merger, ctx->producers});
}
for (const auto& [filter_id, filter_consumers] : _consumer_map) {
consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end());
}
}
for (const auto& ctx : local_merge_contexts) {
result += fmt::format("{}\n", ctx.merger->debug_string());
for (const auto& producer : ctx.producers) {
result += fmt::format("{}\n", producer->debug_string());
}
}
result += "Consumer Info:\n";
for (const auto& [filter_id, consumers] : _consumer_map) {
for (const auto& consumer : consumers) {
result += fmt::format("{}\n", consumer->debug_string());
}
for (const auto& consumer : consumers) {
result += fmt::format("{}\n", consumer->debug_string());
}
return result;
}
Expand Down Expand Up @@ -373,10 +400,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q

RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));

RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get()));
RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), &is_ready));

cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id()));
is_ready = cnt_val.merger->ready(); // update is_ready in locked scope
}

if (is_ready) {
Expand Down Expand Up @@ -491,7 +517,7 @@ Status GlobalMergeContext::reset(QueryContext* query_ctx) {
DORIS_CHECK(merger);
int producer_size = merger->get_expected_producer_num();
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &runtime_filter_desc, &merger));
merger->set_expected_producer_num(producer_size);
merger->increase_expected_producer_num(producer_size);
arrive_id.clear();
source_addrs.clear();
sync_size_callbacks.clear();
Expand Down
35 changes: 19 additions & 16 deletions be/src/exec/runtime_filter/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ class HandleErrorBrpcCallback;
class SyncSizeCallback;

struct LocalMergeContext {
std::mutex mtx;
std::shared_ptr<RuntimeFilterMerger> merger;
std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
// Tracks the recursive CTE round. When a producer from a newer round
// registers, the context is reset (merger recreated, old producers dropped).
// registers, RuntimeFilterMgr replaces the whole context and old in-flight
// users keep the previous context alive through shared_ptr.
uint32_t stage = 0;

Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc,
std::shared_ptr<RuntimeFilterProducer> producer);
};

struct GlobalMergeContext {
Expand Down Expand Up @@ -98,11 +95,12 @@ class RuntimeFilterMgr {
int node_id,
std::shared_ptr<RuntimeFilterConsumer>* consumer_filter);

Status register_local_merger_producer_filter(const QueryContext* query_ctx,
const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer);
Status register_local_merge_producer_filter(const QueryContext* query_ctx,
const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer);

Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters);
Status get_local_merge_context(int filter_id, uint32_t expected_stage,
std::shared_ptr<LocalMergeContext>* context);

// Create local producer. This producer is held by RuntimeFilterProducerHelper.
Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
Expand All @@ -116,10 +114,10 @@ class RuntimeFilterMgr {
std::string debug_string();

void remove_filter(int32_t filter_id) {
std::lock_guard<std::mutex> l(_lock);
LockGuard l(_lock);
_consumer_map.erase(filter_id);
// NOTE: _local_merge_map is NOT erased here. It is reset lazily in
// LocalMergeContext::register_producer when a producer from a newer
// NOTE: _local_merge_map is NOT erased here. It is replaced lazily in
// register_local_merge_producer_filter when a producer from a newer
// recursive CTE round registers. Erasing eagerly here would race with
// multi-fragment REBUILD: a consumer-only fragment's remove_filter could
// delete the entry that the producer fragment just re-registered.
Expand All @@ -142,16 +140,21 @@ class RuntimeFilterMgr {
// RuntimeFilterMgr is owned by RuntimeState, so we only
// use filter_id as key
// key: "filter-id"
std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> _consumer_map;
std::set<int32_t> _producer_id_set;
std::map<int32_t, LocalMergeContext> _local_merge_map;
// Protects fields marked GUARDED_BY(_lock). While holding this lock, only
// access RuntimeFilterMgr-owned state or copy shared_ptr snapshots; do not
// call methods on existing RuntimeFilter objects, because RF objects have
// their own locks and may call back into RuntimeFilterMgr.
AnnotatedMutex _lock;
std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> _consumer_map
GUARDED_BY(_lock);
std::set<int32_t> _producer_id_set GUARDED_BY(_lock);
std::map<int32_t, std::shared_ptr<LocalMergeContext>> _local_merge_map GUARDED_BY(_lock);

std::unique_ptr<MemTracker> _tracker;

TNetworkAddress _merge_addr;

bool _has_merge_addr = false;
std::mutex _lock;
};

// controller -> <query-id, entity>
Expand Down
Loading
Loading