From 526b2ee113ae08c5091a08d97ccb13fe007a3e8d Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Wed, 24 Jun 2026 21:25:25 +0800 Subject: [PATCH 01/10] add multi-stage predicate lm --- be/src/exec/scan/olap_scanner.cpp | 68 +++ be/src/storage/iterators.h | 16 + be/src/storage/rowset/beta_rowset_reader.cpp | 11 + be/src/storage/rowset/rowset_reader_context.h | 5 + be/src/storage/segment/segment_iterator.cpp | 520 +++++++++++++++--- be/src/storage/segment/segment_iterator.h | 18 +- be/src/storage/tablet/tablet_reader.cpp | 7 + be/src/storage/tablet/tablet_reader.h | 5 + .../org/apache/doris/qe/SessionVariable.java | 25 + gensrc/thrift/PaloInternalService.thrift | 4 + 10 files changed, 610 insertions(+), 69 deletions(-) diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 8da4483ea3afea..ab0e4a57befa70 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -66,6 +66,7 @@ #include "util/debug_points.h" #endif #include "util/json/path_in_data.h" +#include "util/string_util.h" namespace doris { #include "common/compile_check_avoid_begin.h" @@ -155,6 +156,61 @@ static bool has_file_cache_statistics(const io::FileCacheStatistics& stats) { stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0; } +static Status parse_predicate_lm_stage1_cols_to_column_ids(const std::string& cols, + const TabletSchemaSPtr& tablet_schema, + std::vector* column_ids) { + column_ids->clear(); + if (cols.empty()) { + return Status::OK(); + } + + std::vector parts = doris::split(cols, ","); + std::vector missing; + missing.reserve(parts.size()); + + for (const auto& part : parts) { + std::string_view name_sv = doris::trim(std::string_view(part)); + if (name_sv.empty()) { + continue; + } + + // allow backticks: `col` + if (name_sv.size() >= 2 && name_sv.front() == '`' && name_sv.back() == '`') { + name_sv = name_sv.substr(1, name_sv.size() - 2); + name_sv = doris::trim(name_sv); + } + if (name_sv.empty()) { + continue; + } + + std::string name(name_sv); + + // TabletSchema stores names as-is (not guaranteed lower-case). + // Try exact match first, then fall back to lower-case. + int32_t cid = tablet_schema->field_index(name); + if (cid < 0) { + cid = tablet_schema->field_index(doris::to_lower(name)); + } + + if (cid < 0) { + missing.emplace_back(std::move(name)); + continue; + } + + column_ids->push_back(static_cast(cid)); + } + + if (!missing.empty()) { + return Status::Error( + "predicate_lm_stage1_cols contains non-existing columns: {}", + doris::join(missing, ",")); + } + + std::sort(column_ids->begin(), column_ids->end()); + column_ids->erase(std::unique(column_ids->begin(), column_ids->end()), column_ids->end()); + return Status::OK(); +} + Status OlapScanner::_prepare_impl() { auto* local_state = static_cast(_local_state); auto& tablet = _tablet_reader_params.tablet; @@ -218,6 +274,18 @@ Status OlapScanner::_prepare_impl() { tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc); } + // Map FE session variable predicate_lm_stage1_cols -> ColumnId list + // NOTE: only meaningful when multi-stage predicate LM is enabled. + if (_tablet_reader_params.enable_multi_stage_predicate_lazy_materialization) { + const auto& qopts = _state->query_options(); + if (qopts.__isset.predicate_lm_stage1_cols && !qopts.predicate_lm_stage1_cols.empty()) { + std::vector stage1_column_ids; + RETURN_IF_ERROR(parse_predicate_lm_stage1_cols_to_column_ids( + qopts.predicate_lm_stage1_cols, tablet_schema, &stage1_column_ids)); + _tablet_reader_params.predicate_lm_stage1_column_ids = std::move(stage1_column_ids); + } + } + if (_tablet_reader_params.rs_splits.empty()) { // Non-pipeline mode, Tablet : Scanner = 1 : 1 // acquire tablet rowset readers at the beginning of the scan node diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h index 59071909da7d40..bfe6ad3c1e6a89 100644 --- a/be/src/storage/iterators.h +++ b/be/src/storage/iterators.h @@ -157,6 +157,22 @@ class StorageReadOptions { std::shared_ptr score_runtime; CollectionStatisticsPtr collection_statistics; + // Multi-stage predicate lazy materialization (experimental): + // Stage1 reads a subset of predicate columns by index, evaluates predicates to produce + // an intermediate surviving row set; stage2 reads remaining predicate columns only for + // surviving rows (by rowids) and continues filtering. + // + // When disabled (default), SegmentIterator keeps the existing behavior: all predicate + // columns are read in the first pass. + bool enable_multi_stage_predicate_lazy_materialization = false; + // ColumnIds (tablet schema ordinal ids) to read/evaluate in stage1. + // If empty and enable_multi_stage_predicate_lazy_materialization=true, SegmentIterator will + // fallback to a conservative heuristic (currently: runtime-filter predicate columns). + std::vector predicate_lm_stage1_column_ids; + // If stage1 survival ratio is greater than this threshold, stage2 rowid reads may be + // less beneficial. SegmentIterator may choose a conservative evaluation path. + double predicate_lm_stage1_survival_ratio_threshold = 0.8; + // Cache for sparse column data to avoid redundant reads // col_unique_id -> cached column_ptr std::unordered_map sparse_column_cache; diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp b/be/src/storage/rowset/beta_rowset_reader.cpp index 56b580e152d20d..21f2116eca19d8 100644 --- a/be/src/storage/rowset/beta_rowset_reader.cpp +++ b/be/src/storage/rowset/beta_rowset_reader.cpp @@ -63,6 +63,10 @@ void BetaRowsetReader::reset_read_options() { _read_options.col_id_to_predicates.clear(); _read_options.del_predicates_for_zone_map.clear(); _read_options.key_ranges.clear(); + + _read_options.enable_multi_stage_predicate_lazy_materialization = false; + _read_options.predicate_lm_stage1_column_ids.clear(); + _read_options.predicate_lm_stage1_survival_ratio_threshold = 0.8; } RowsetReaderSharedPtr BetaRowsetReader::clone() { @@ -116,6 +120,13 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.commit_tso = _rowset->rowset_meta()->commit_tso(); _read_options.tablet_id = _rowset->rowset_meta()->tablet_id(); _read_options.read_limit = _topn_limit; + + _read_options.enable_multi_stage_predicate_lazy_materialization = + _read_context->enable_multi_stage_predicate_lazy_materialization; + _read_options.predicate_lm_stage1_column_ids = _read_context->predicate_lm_stage1_column_ids; + _read_options.predicate_lm_stage1_survival_ratio_threshold = + _read_context->predicate_lm_stage1_survival_ratio_threshold; + if (_read_context->lower_bound_keys != nullptr) { for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) { _read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i), diff --git a/be/src/storage/rowset/rowset_reader_context.h b/be/src/storage/rowset/rowset_reader_context.h index cb61a6b0ad9cba..ea2f60fa0c43a1 100644 --- a/be/src/storage/rowset/rowset_reader_context.h +++ b/be/src/storage/rowset/rowset_reader_context.h @@ -113,6 +113,11 @@ struct RowsetReaderContext { // General LIMIT budget forwarded to SegmentIterator. int64_t general_read_limit = -1; + + // Multi-stage predicate lazy materialization (experimental) + bool enable_multi_stage_predicate_lazy_materialization = false; + std::vector predicate_lm_stage1_column_ids; + double predicate_lm_stage1_survival_ratio_threshold = 0.8; }; } // namespace doris diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index e865828c52bf68..2b24ccb39ecdff 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -2019,43 +2019,133 @@ Status SegmentIterator::_vec_init_lazy_materialization() { _delete_bloom_filter_column_ids.push_back(predicate->column_id()); } } + + // Multi-stage predicate lazy materialization config + _enable_multi_stage_predicate_lazy_materialization = + _opts.enable_multi_stage_predicate_lazy_materialization; + _predicate_lm_stage1_survival_ratio_threshold = + _opts.predicate_lm_stage1_survival_ratio_threshold; + + _pre_eval_block_predicate.clear(); + _short_cir_eval_predicate.clear(); + _late_pre_eval_block_predicate.clear(); + _late_short_cir_eval_predicate.clear(); + _vec_pred_column_ids.clear(); + _short_cir_pred_column_ids.clear(); + _late_predicate_column_ids.clear(); + _filter_info_id.clear(); + + _is_need_vec_eval = false; + _is_need_short_eval = false; + _is_need_vec_eval_late = false; + _is_need_short_eval_late = false; // Step1: extract columns that can be lazy materialization if (!_col_predicates.empty() || !del_cond_id_set.empty()) { - std::set short_cir_pred_col_id_set; // using set for distinct cid - std::set vec_pred_col_id_set; + std::set stage1_pred_col_id_set; + std::set stage2_pred_col_id_set; + std::set stage1_short_cir_pred_col_id_set; + std::set stage1_vec_pred_col_id_set; + std::set stage2_short_cir_pred_col_id_set; + std::set stage2_vec_pred_col_id_set; + + std::set runtime_filter_cids; for (auto predicate : _col_predicates) { auto cid = predicate->column_id(); _is_pred_column[cid] = true; pred_column_ids.insert(cid); - - // check pred using short eval or vec eval - if (_can_evaluated_by_vectorized(predicate)) { - vec_pred_col_id_set.insert(cid); - _pre_eval_block_predicate.push_back(predicate); - } else { - short_cir_pred_col_id_set.insert(cid); - _short_cir_eval_predicate.push_back(predicate); - } if (predicate->is_runtime_filter()) { - _filter_info_id.push_back(predicate); + runtime_filter_cids.insert(cid); } } - // handle delete_condition + // handle delete_condition (always stage1) if (!del_cond_id_set.empty()) { - short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end()); pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end()); - + stage1_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end()); for (auto cid : del_cond_id_set) { _is_pred_column[cid] = true; } } + + if (_enable_multi_stage_predicate_lazy_materialization) { + if (!_opts.predicate_lm_stage1_column_ids.empty()) { + stage1_pred_col_id_set.insert(_opts.predicate_lm_stage1_column_ids.begin(), + _opts.predicate_lm_stage1_column_ids.end()); + } else { + stage1_pred_col_id_set.insert(runtime_filter_cids.begin(), runtime_filter_cids.end()); + } + + for (auto it = stage1_pred_col_id_set.begin(); it != stage1_pred_col_id_set.end();) { + if (pred_column_ids.find(*it) == pred_column_ids.end()) { + it = stage1_pred_col_id_set.erase(it); + } else { + ++it; + } + } + + if (stage1_pred_col_id_set.empty() && !pred_column_ids.empty()) { + stage1_pred_col_id_set.insert(*pred_column_ids.begin()); + } + + for (auto cid : pred_column_ids) { + if (stage1_pred_col_id_set.find(cid) == stage1_pred_col_id_set.end()) { + stage2_pred_col_id_set.insert(cid); + } + } + } else { + stage1_pred_col_id_set = pred_column_ids; + } - _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend()); - _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(), - short_cir_pred_col_id_set.cend()); + for (auto predicate : _col_predicates) { + auto cid = predicate->column_id(); + bool in_stage1 = (stage2_pred_col_id_set.empty() || + stage1_pred_col_id_set.find(cid) != stage1_pred_col_id_set.end()); + if (_can_evaluated_by_vectorized(predicate)) { + if (in_stage1) { + stage1_vec_pred_col_id_set.insert(cid); + _pre_eval_block_predicate.push_back(predicate); + } else { + stage2_vec_pred_col_id_set.insert(cid); + _late_pre_eval_block_predicate.push_back(predicate); + } + } else { + if (in_stage1) { + stage1_short_cir_pred_col_id_set.insert(cid); + _short_cir_eval_predicate.push_back(predicate); + } else { + stage2_short_cir_pred_col_id_set.insert(cid); + _late_short_cir_eval_predicate.push_back(predicate); + } + } + if (predicate->is_runtime_filter()) { + _filter_info_id.push_back(predicate); + } + } + + stage1_short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end()); + + + _vec_pred_column_ids.assign(stage1_vec_pred_col_id_set.cbegin(), + stage1_vec_pred_col_id_set.cend()); + _short_cir_pred_column_ids.assign(stage1_short_cir_pred_col_id_set.cbegin(), + stage1_short_cir_pred_col_id_set.cend()); + + _is_need_vec_eval = !_vec_pred_column_ids.empty(); + _is_need_short_eval = !_short_cir_pred_column_ids.empty(); + + _is_need_vec_eval_late = !stage2_vec_pred_col_id_set.empty(); + _is_need_short_eval_late = !stage2_short_cir_pred_col_id_set.empty(); + + if (_enable_multi_stage_predicate_lazy_materialization) { + std::set stage2_read_columns; + stage2_read_columns.insert(stage2_vec_pred_col_id_set.begin(), + stage2_vec_pred_col_id_set.end()); + stage2_read_columns.insert(stage2_short_cir_pred_col_id_set.begin(), + stage2_short_cir_pred_col_id_set.end()); + _late_predicate_column_ids.assign(stage2_read_columns.begin(), stage2_read_columns.end()); + } } if (!_vec_pred_column_ids.empty()) { @@ -2125,9 +2215,16 @@ Status SegmentIterator::_vec_init_lazy_materialization() { } // Step 4: fill first read columns + _predicate_column_ids.clear(); if (_lazy_materialization_read) { - // insert pred cid to first_read_columns - for (auto cid : pred_column_ids) { + std::set first_read_pred_column_ids = pred_column_ids; + if (_enable_multi_stage_predicate_lazy_materialization && !_late_predicate_column_ids.empty()) { + for (auto cid : _late_predicate_column_ids) { + first_read_pred_column_ids.erase(cid); + } + } + // insert pred cid to first_read_pred_column_ids + for (auto cid : first_read_pred_column_ids) { _predicate_column_ids.push_back(cid); } } else if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { @@ -2364,23 +2461,29 @@ Status SegmentIterator::_output_non_pred_columns(Block* block) { * This approach optimizes reading performance by leveraging batch processing for continuous * rowid sequences and handling discontinuities gracefully in smaller chunks. */ -Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read) { +Status SegmentIterator::_read_columns_by_index(const std::vector& column_ids, + uint32_t nrows_read_limit, uint16_t& nrows_read, + bool read_rowids) { SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns); - nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit); + if (read_rowids) { + nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), + nrows_read_limit); + } bool is_continuous = (nrows_read > 1) && (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1); VLOG_DEBUG << fmt::format( "nrows_read from range iterator: {}, is_continus {}, _cols_read_by_column_predicate " "[{}]", - nrows_read, is_continuous, fmt::join(_predicate_column_ids, ",")); + nrows_read, is_continuous, fmt::join(column_ids, ",")); LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, continuous: {}, " "rowids: [{}...{}]", nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0, nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0); - for (auto cid : _predicate_column_ids) { + + for (auto cid : column_ids) { auto& column = _current_return_columns[cid]; VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid, _schema->column(cid)->name()); @@ -2672,27 +2775,119 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ return new_size; } -uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx, +uint16_t SegmentIterator::_evaluate_vectorization_predicate( + const std::vector>& predicates, uint16_t* sel_rowid_idx, + uint16_t selected_size) { + SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns); + const uint16_t original_size = selected_size; + if (predicates.empty()) { + for (uint16_t i = 0; i < original_size; ++i) { + sel_rowid_idx[i] = i; + } + _opts.stats->vec_cond_input_rows += original_size; + return original_size; + } + + bool all_pred_always_true = true; + for (const auto& pred : predicates) { + if (!pred->always_true()) { + all_pred_always_true = false; + } else { + pred->update_filter_info(0, 0, selected_size); + } + } + + if (all_pred_always_true) { + for (uint16_t i = 0; i < original_size; ++i) { + sel_rowid_idx[i] = i; + } + _opts.stats->vec_cond_input_rows += original_size; + return original_size; + } + + _ret_flags.resize(original_size); + bool is_first = true; + for (auto& pred : predicates) { + if (pred->always_true()) { + continue; + } + auto column_id = pred->column_id(); + auto& column = _current_return_columns[column_id]; + if (is_first) { + pred->evaluate_vec(*column, original_size, (bool*)_ret_flags.data()); + is_first = false; + } else { + pred->evaluate_and_vec(*column, original_size, (bool*)_ret_flags.data()); + } + } + + uint16_t new_size = 0; + uint16_t sel_pos = 0; + const uint16_t sel_end = sel_pos + selected_size; + static constexpr size_t SIMD_BYTES = simd::bits_mask_length(); + const uint16_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES; + + while (sel_pos < sel_end_simd) { + auto mask = simd::bytes_mask_to_bits_mask(_ret_flags.data() + sel_pos); + if (0 == mask) { + // pass + } else if (simd::bits_mask_all() == mask) { + for (uint16_t i = 0; i < SIMD_BYTES; i++) { + sel_rowid_idx[new_size++] = sel_pos + i; + } + } else { + simd::iterate_through_bits_mask( + [&](const int bit_pos) { + sel_rowid_idx[new_size++] = sel_pos + (uint16_t)bit_pos; + }, + mask); + } + sel_pos += SIMD_BYTES; + } + + for (; sel_pos < sel_end; sel_pos++) { + if (_ret_flags[sel_pos]) { + sel_rowid_idx[new_size++] = sel_pos; + } + } + + _opts.stats->vec_cond_input_rows += original_size; + _opts.stats->rows_vec_cond_filtered += original_size - new_size; + return new_size; +} + +uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size) { - SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns); if (!_is_need_short_eval) { return selected_size; } + return _evaluate_short_circuit_predicate(_short_cir_eval_predicate, sel_rowid_idx, selected_size, + /*evaluate_delete_condition=*/true); +} + +uint16_t SegmentIterator::_evaluate_short_circuit_predicate( + const std::vector>& predicates, uint16_t* sel_rowid_idx, + uint16_t selected_size, bool evaluate_delete_condition) { + SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns); uint16_t original_size = selected_size; - for (auto predicate : _short_cir_eval_predicate) { + + for (auto predicate : predicates) { auto column_id = predicate->column_id(); auto& short_cir_column = _current_return_columns[column_id]; - selected_size = predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size); + selected_size = predicate->evaluate(*short_cir_column, sel_rowid_idx, selected_size); } _opts.stats->short_circuit_cond_input_rows += original_size; _opts.stats->rows_short_circuit_cond_filtered += original_size - selected_size; + if (!evaluate_delete_condition) { + return selected_size; + } // evaluate delete condition original_size = selected_size; selected_size = _opts.delete_condition_predicates->evaluate(_current_return_columns, - vec_sel_rowid_idx, selected_size); + sel_rowid_idx, selected_size); _opts.stats->rows_vec_del_cond_filtered += original_size - selected_size; return selected_size; } @@ -2968,7 +3163,8 @@ Status SegmentIterator::_next_batch_internal(Block* block) { _converted_column_ids.assign(_schema->columns().size(), false); _selected_size = 0; - RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size)); + RETURN_IF_ERROR(_read_columns_by_index(_predicate_column_ids, nrows_read_limit, _selected_size, + /*read_rowids=*/true)); _replace_version_col_if_needed(_predicate_column_ids, _selected_size); _update_lsn_col_if_needed(_predicate_column_ids, _selected_size); _update_tso_col_if_needed(_predicate_column_ids, _selected_size); @@ -2984,45 +3180,221 @@ Status SegmentIterator::_next_batch_internal(Block* block) { _sel_rowid_idx.resize(_selected_size); if (_is_need_vec_eval || _is_need_short_eval) { - _convert_dict_code_for_predicate_if_necessary(); - - // step 1: evaluate vectorization predicate - _selected_size = - _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size); - - // step 2: evaluate short circuit predicate - // todo(wb) research whether need to read short predicate after vectorization evaluation - // to reduce cost of read short circuit columns. - // In SSB test, it make no difference; So need more scenarios to test - _selected_size = - _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size); - VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ", - _selected_size); - if (_selected_size > 0) { - // step 3.1: output short circuit and predicate column - // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids) - // see _vec_init_lazy_materialization - // todo(wb) need to tell input columnids from output columnids - RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids, - _sel_rowid_idx.data(), _selected_size)); - - // step 3.2: read remaining expr column and evaluate it. - if (_is_need_expr_eval) { - // The predicate column contains the remaining expr column, no need second read. - if (_common_expr_column_ids.size() > 0) { - SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns); - RETURN_IF_ERROR(_read_columns_by_rowids( - _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(), - _selected_size, &_current_return_columns)); - _replace_version_col_if_needed(_common_expr_column_ids, _selected_size); - _update_lsn_col_if_needed(_common_expr_column_ids, _selected_size); - _update_tso_col_if_needed(_common_expr_column_ids, _selected_size); + if (!_enable_multi_stage_predicate_lazy_materialization) { + _convert_dict_code_for_predicate_if_necessary(); + + // step 1: evaluate vectorization predicate + _selected_size = + _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size); + + // step 2: evaluate short circuit predicate + // todo(wb) research whether need to read short predicate after vectorization evaluation + // to reduce cost of read short circuit columns. + // In SSB test, it make no difference; So need more scenarios to test + _selected_size = + _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size); + VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ", + _selected_size); + if (_selected_size > 0) { + // step 3.1: output short circuit and predicate column + // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids) + // see _vec_init_lazy_materialization + // todo(wb) need to tell input columnids from output columnids + RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids, + _sel_rowid_idx.data(), _selected_size)); + + // step 3.2: read remaining expr column and evaluate it. + if (_is_need_expr_eval) { + // The predicate column contains the remaining expr column, no need second read. + if (_common_expr_column_ids.size() > 0) { + SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns); + RETURN_IF_ERROR(_read_columns_by_rowids( + _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(), + _selected_size, &_current_return_columns)); + _replace_version_col_if_needed(_common_expr_column_ids, _selected_size); + _update_lsn_col_if_needed(_common_expr_column_ids, _selected_size); + _update_tso_col_if_needed(_common_expr_column_ids, _selected_size); + RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); + } + + DCHECK(block->columns() > + _schema_block_id_map[*_common_expr_columns.begin()]); + RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, + block)); + } + } else { + _fill_column_nothing(); + if (_is_need_expr_eval) { RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); } + } + } else { + const uint16_t rows_read = _selected_size; + + _sel_rowid_idx_stage1.resize(rows_read); + _convert_dict_code_for_predicate_if_necessary( + _short_cir_eval_predicate, _pre_eval_block_predicate, + /*include_delete_condition_columns=*/true); + + // Stage1: evaluate predicates on the first-read predicate columns (by index) + uint16_t stage1_size = + _evaluate_vectorization_predicate(_sel_rowid_idx_stage1.data(), rows_read); + stage1_size = _evaluate_short_circuit_predicate(_sel_rowid_idx_stage1.data(), + stage1_size); + + if (stage1_size > 0) { + bool do_stage2 = _enable_multi_stage_predicate_lazy_materialization && + !_late_predicate_column_ids.empty() && + (_is_need_vec_eval_late || _is_need_short_eval_late); + + bool stage2_columns_dense_on_all_rows = false; + + if (do_stage2) { + const double survival_ratio = static_cast(stage1_size) / + static_cast(rows_read); + const bool stage2_by_rowids = + survival_ratio <= _predicate_lm_stage1_survival_ratio_threshold; + + if (stage2_by_rowids) { + RETURN_IF_ERROR(_read_columns_by_rowids( + _late_predicate_column_ids, _block_rowids, + _sel_rowid_idx_stage1.data(), stage1_size, + &_current_return_columns)); + _replace_version_col_if_needed(_late_predicate_column_ids, stage1_size); + _update_lsn_col_if_needed(_late_predicate_column_ids, stage1_size); + _update_tso_col_if_needed(_late_predicate_column_ids, stage1_size); + + _convert_dict_code_for_predicate_if_necessary( + _late_short_cir_eval_predicate, _late_pre_eval_block_predicate, + /*include_delete_condition_columns=*/false); + + _sel_rowid_idx_stage2.resize(stage1_size); + uint16_t stage2_size = stage1_size; + if (_is_need_vec_eval_late) { + stage2_size = _evaluate_vectorization_predicate( + _late_pre_eval_block_predicate, _sel_rowid_idx_stage2.data(), + stage2_size); + } else { + for (uint16_t i = 0; i < stage2_size; ++i) { + _sel_rowid_idx_stage2[i] = i; + } + } + if (_is_need_short_eval_late) { + stage2_size = _evaluate_short_circuit_predicate( + _late_short_cir_eval_predicate, _sel_rowid_idx_stage2.data(), + stage2_size, /*evaluate_delete_condition=*/false); + } + + _sel_rowid_idx.resize(stage2_size); + for (uint16_t i = 0; i < stage2_size; ++i) { + _sel_rowid_idx[i] = + _sel_rowid_idx_stage1[_sel_rowid_idx_stage2[i]]; + } + _selected_size = stage2_size; + } else { + uint16_t stage2_rows_read = rows_read; + RETURN_IF_ERROR(_read_columns_by_index(_late_predicate_column_ids, + rows_read, stage2_rows_read, + /*read_rowids=*/false)); + _replace_version_col_if_needed(_late_predicate_column_ids, stage2_rows_read); + _update_lsn_col_if_needed(_late_predicate_column_ids, stage2_rows_read); + _update_tso_col_if_needed(_late_predicate_column_ids, stage2_rows_read); + + _convert_dict_code_for_predicate_if_necessary( + _late_short_cir_eval_predicate, _late_pre_eval_block_predicate, + /*include_delete_condition_columns=*/false); + + _sel_rowid_idx_stage2.resize(stage2_rows_read); + uint16_t stage2_size_all = stage2_rows_read; + if (_is_need_vec_eval_late) { + stage2_size_all = _evaluate_vectorization_predicate( + _late_pre_eval_block_predicate, _sel_rowid_idx_stage2.data(), + stage2_size_all); + } else { + for (uint16_t i = 0; i < stage2_size_all; ++i) { + _sel_rowid_idx_stage2[i] = i; + } + } + if (_is_need_short_eval_late) { + stage2_size_all = _evaluate_short_circuit_predicate( + _late_short_cir_eval_predicate, _sel_rowid_idx_stage2.data(), + stage2_size_all, /*evaluate_delete_condition=*/false); + } - DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]); - RETURN_IF_ERROR( - _process_common_expr(_sel_rowid_idx.data(), _selected_size, block)); + _sel_rowid_idx.clear(); + _sel_rowid_idx.reserve(std::min(stage1_size, stage2_size_all)); + uint16_t i = 0; + uint16_t j = 0; + while (i < stage1_size && j < stage2_size_all) { + const uint16_t a = _sel_rowid_idx_stage1[i]; + const uint16_t b = _sel_rowid_idx_stage2[j]; + if (a == b) { + _sel_rowid_idx.push_back(a); + ++i; + ++j; + } else if (a < b) { + ++i; + } else { + ++j; + } + } + _selected_size = cast_set(_sel_rowid_idx.size()); + stage2_columns_dense_on_all_rows = true; + } + } else { + _sel_rowid_idx.resize(stage1_size); + for (uint16_t i = 0; i < stage1_size; ++i) { + _sel_rowid_idx[i] = _sel_rowid_idx_stage1[i]; + } + _selected_size = stage1_size; + } + + if (_selected_size > 0) { + // Output stage1 predicate columns + RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids, + _sel_rowid_idx.data(), + _selected_size)); + + // Output stage2 predicate columns (if any) + if (do_stage2) { + uint16_t* stage2_output_sel = _sel_rowid_idx.data(); + if (!stage2_columns_dense_on_all_rows) { + stage2_output_sel = _sel_rowid_idx_stage2.data(); + } + RETURN_IF_ERROR(_output_column_by_sel_idx(block, _late_predicate_column_ids, + stage2_output_sel, + _selected_size)); + } + + // common expr (unchanged, uses final selector) + if (_is_need_expr_eval) { + if (_common_expr_column_ids.size() > 0) { + SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns); + RETURN_IF_ERROR(_read_columns_by_rowids( + _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(), + _selected_size, &_current_return_columns)); + _replace_version_col_if_needed(_common_expr_column_ids, _selected_size); + _update_lsn_col_if_needed(_common_expr_column_ids, _selected_size); + _update_tso_col_if_needed(_common_expr_column_ids, _selected_size); + RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); + } + + DCHECK(block->rows() > + _schema_block_id_map[*_common_expr_columns.begin()]); + RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, + block)); + } + } else { + _fill_column_nothing(); + if (_is_need_expr_eval) { + RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); + } + } + } else { + _fill_column_nothing(); + if (_is_need_expr_eval) { + RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); + } } } else { _fill_column_nothing(); @@ -3321,14 +3693,26 @@ void SegmentIterator::_output_index_result_column(const std::vector>& short_circuit_predicates, + const std::vector>& vectorized_predicates, + bool include_delete_condition_columns) { + for (auto predicate : short_circuit_predicates) { _convert_dict_code_for_predicate_if_necessary_impl(predicate); } - for (auto predicate : _pre_eval_block_predicate) { + for (auto predicate : vectorized_predicates) { _convert_dict_code_for_predicate_if_necessary_impl(predicate); } + if (!include_delete_condition_columns) { + return; + } + for (auto column_id : _delete_range_column_ids) { _current_return_columns[column_id].get()->convert_dict_codes_if_necessary(); } diff --git a/be/src/storage/segment/segment_iterator.h b/be/src/storage/segment/segment_iterator.h index 8fd143867ed97b..82d7206a6fb0b0 100644 --- a/be/src/storage/segment/segment_iterator.h +++ b/be/src/storage/segment/segment_iterator.h @@ -213,14 +213,30 @@ class SegmentIterator : public RowwiseIterator { // for vectorization implementation [[nodiscard]] Status _read_columns(const std::vector& column_ids, MutableColumns& column_block, size_t nrows); - [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read); + [[nodiscard]] Status _read_columns_by_index(const std::vector& column_ids, + uint32_t nrows_read_limit, uint16_t& nrows_read, + bool read_rowids); void _replace_version_col_if_needed(const std::vector& column_ids, size_t num_rows); void _update_lsn_col_if_needed(const std::vector& column_ids, size_t num_rows); void _update_tso_col_if_needed(const std::vector& column_ids, size_t num_rows); Status _init_current_block(Block* block, std::vector& non_pred_vector, uint32_t nrows_read_limit); uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size); + uint16_t _evaluate_vectorization_predicate( + const std::vector>& predicates, + uint16_t* sel_rowid_idx, uint16_t selected_size); + uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size); + uint16_t _evaluate_short_circuit_predicate( + const std::vector>& predicates, + uint16_t* sel_rowid_idx, uint16_t selected_size, bool evaluate_delete_condition); + + // Dictionary column should do something to initial. + void _convert_dict_code_for_predicate_if_necessary(); + void _convert_dict_code_for_predicate_if_necessary( + const std::vector>& short_circuit_predicates, + const std::vector>& vectorized_predicates, + bool include_delete_condition_columns); Status _apply_read_limit_to_selected_rows(Block* block, uint16_t& selected_size); void _collect_runtime_filter_predicate(); Status _output_non_pred_columns(Block* block); diff --git a/be/src/storage/tablet/tablet_reader.cpp b/be/src/storage/tablet/tablet_reader.cpp index 2e7e73a3676d28..a8c73b91698db5 100644 --- a/be/src/storage/tablet/tablet_reader.cpp +++ b/be/src/storage/tablet/tablet_reader.cpp @@ -229,6 +229,13 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW _reader_context.general_read_limit = read_params.general_read_limit; + // Multi-stage predicate lazy materialization (experimental) + _reader_context.enable_multi_stage_predicate_lazy_materialization = + read_params.enable_multi_stage_predicate_lazy_materialization; + _reader_context.predicate_lm_stage1_column_ids = read_params.predicate_lm_stage1_column_ids; + _reader_context.predicate_lm_stage1_survival_ratio_threshold = + read_params.predicate_lm_stage1_survival_ratio_threshold; + // Preserve the original requested output layout so BlockReader can map expanded storage // columns (for non-direct AGG/UNIQUE paths) back to the final output block. _reader_context.origin_return_columns = read_params.origin_return_columns; diff --git a/be/src/storage/tablet/tablet_reader.h b/be/src/storage/tablet/tablet_reader.h index 3d03caf6d74f5e..5beebbc1e53f00 100644 --- a/be/src/storage/tablet/tablet_reader.h +++ b/be/src/storage/tablet/tablet_reader.h @@ -218,6 +218,11 @@ class TabletReader { uint64_t condition_cache_digest = 0; + // Multi-stage predicate lazy materialization (experimental) + bool enable_multi_stage_predicate_lazy_materialization = false; + std::vector predicate_lm_stage1_column_ids; + double predicate_lm_stage1_survival_ratio_threshold = 0.8; + // General LIMIT budget forwarded to SegmentIterator. -1 means no limit. int64_t general_read_limit = -1; TBinlogScanType::type binlog_scan_type = TBinlogScanType::NONE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d017b921efd79e..241ae64b186020 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -888,6 +888,9 @@ public String toString() { = "decompose_repeat_shuffle_index_in_max_group"; public static final String ENABLE_SHUFFLE_KEY_PRUNE = "enable_shuffle_key_prune"; + public static final String ENABLE_MULTI_STAGE_PREDICATE_LM = "enable_multi_stage_predicate_lm"; + public static final String PREDICATE_LM_STAGE1_COLS = "predicate_lm_stage1_cols"; + public static final String HOT_VALUE_COLLECT_COUNT = "hot_value_collect_count"; @VarAttrDef.VarAttr(name = HOT_VALUE_COLLECT_COUNT, needForward = true, description = {"列统计信息收集时,收集占比排名前 HOT_VALUE_COLLECT_COUNT 的值作为 hot value", @@ -2976,6 +2979,24 @@ public static boolean isEagerAggregationOnJoin() { @VarAttrDef.VarAttr(name = ENABLE_SHUFFLE_KEY_PRUNE) public boolean enableShuffleKeyPrune = true; + @VarAttrDef.VarAttr( + name = ENABLE_MULTI_STAGE_PREDICATE_LM, + fuzzy = true, + description = {"控制 SegmentIterator 是否启用多阶段谓词延迟物化(实验特性)。默认为 false。", + "Controls whether to enable multi-stage predicate lazy materialization in SegmentIterator " + + "(experimental). The default value is false."}, + needForward = true) + public boolean enableMultiStagePredicateLm = false; + + @VarAttrDef.VarAttr( + name = PREDICATE_LM_STAGE1_COLS, + fuzzy = true, + description = {"人工指定的多阶段谓词延迟物化中 stage1 参与过滤的列名列表,逗号分隔,例如 'a,b,c'。默认为空。", + "Stage1 predicate columns for multi-stage predicate LM, comma-separated, e.g. 'a,b,c'. " + + "Default is empty."}, + needForward = true) + public String predicateLmStage1Cols = ""; + @VarAttrDef.VarAttr(name = ENABLE_PREFER_CACHED_ROWSET, needForward = false, description = {"是否启用 prefer cached rowset 功能", "Whether to enable prefer cached rowset feature"}) @@ -5664,6 +5685,10 @@ public TQueryOptions toThrift() { tResult.setAnnIndexCandidateRowsThreshold(annIndexCandidateRowsThreshold); tResult.setAnnIndexCandidateRowsPercentThreshold(annIndexCandidateRowsPercentThreshold); tResult.setMergeReadSliceSize(mergeReadSliceSizeBytes); + + tResult.setEnableMultiStagePredicateLm(enableMultiStagePredicateLm); + tResult.setPredicateLmStage1Cols(predicateLmStage1Cols); + tResult.setEnableExtendedRegex(enableExtendedRegex); if (fileCacheQueryLimitPercent > 0) { tResult.setFileCacheQueryLimitPercent(Math.min(fileCacheQueryLimitPercent, diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 0d8618dbc78a0f..504a18a7327307 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -500,6 +500,10 @@ struct TQueryOptions { // enable plan local exchange node in fe 223: optional bool enable_local_shuffle_planner; + 224: optional bool enable_multi_stage_predicate_lm = false + + 225: optional string predicate_lm_stage1_cols = "" + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. From 7f01b5bf9dad70f156ed38054828d9147e4c7beb Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Thu, 25 Jun 2026 12:35:59 +0800 Subject: [PATCH 02/10] add profile for multi stage lm --- be/src/exec/operator/olap_scan_operator.cpp | 12 ++ be/src/exec/operator/olap_scan_operator.h | 8 + be/src/exec/scan/olap_scanner.cpp | 68 ++---- be/src/exec/scan/predicate_lm_utils.h | 83 +++++++ be/src/storage/olap_common.h | 8 + be/src/storage/segment/segment_iterator.cpp | 11 + be/test/exec/scan/predicate_lm_utils_test.cpp | 96 +++++++++ .../apache/doris/qe/SessionVariablesTest.java | 24 +++ .../test_multi_stage_predicate_lm.groovy | 202 ++++++++++++++++++ 9 files changed, 457 insertions(+), 55 deletions(-) create mode 100644 be/src/exec/scan/predicate_lm_utils.h create mode 100644 be/test/exec/scan/predicate_lm_utils_test.cpp create mode 100644 regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy diff --git a/be/src/exec/operator/olap_scan_operator.cpp b/be/src/exec/operator/olap_scan_operator.cpp index 4d401a78100b5f..9c010b59092648 100644 --- a/be/src/exec/operator/olap_scan_operator.cpp +++ b/be/src/exec/operator/olap_scan_operator.cpp @@ -211,6 +211,18 @@ Status OlapScanLocalState::_init_profile() { _rows_short_circuit_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT); _rows_expr_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsExprPredInput", TUnit::UNIT); + + _predicate_lm_stage1_input_rows_counter = + ADD_COUNTER(_segment_profile, "PredicateLMStage1InputRows", TUnit::UNIT); + _predicate_lm_stage1_output_rows_counter = + ADD_COUNTER(_segment_profile, "PredicateLMStage1OutputRows", TUnit::UNIT); + _predicate_lm_stage2_by_rowids_batches_counter = + ADD_COUNTER(_segment_profile, "PredicateLMStage2ByRowIdsBatches", TUnit::UNIT); + _predicate_lm_stage2_by_all_rows_batches_counter = + ADD_COUNTER(_segment_profile, "PredicateLMStage2ByAllRowsBatches", TUnit::UNIT); + _predicate_lm_stage2_rows_read_counter = + ADD_COUNTER(_segment_profile, "PredicateLMStage2RowsRead", TUnit::UNIT); + _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime"); diff --git a/be/src/exec/operator/olap_scan_operator.h b/be/src/exec/operator/olap_scan_operator.h index 344dd604db1965..4a31b8d416b825 100644 --- a/be/src/exec/operator/olap_scan_operator.h +++ b/be/src/exec/operator/olap_scan_operator.h @@ -157,6 +157,14 @@ class OlapScanLocalState final : public ScanLocalState { RuntimeProfile::Counter* _rows_vec_cond_input_counter = nullptr; RuntimeProfile::Counter* _rows_short_circuit_cond_input_counter = nullptr; RuntimeProfile::Counter* _rows_expr_cond_input_counter = nullptr; + + // Multi-stage predicate lazy materialization (SegmentIterator) + RuntimeProfile::Counter* _predicate_lm_stage1_input_rows_counter = nullptr; + RuntimeProfile::Counter* _predicate_lm_stage1_output_rows_counter = nullptr; + RuntimeProfile::Counter* _predicate_lm_stage2_by_rowids_batches_counter = nullptr; + RuntimeProfile::Counter* _predicate_lm_stage2_by_all_rows_batches_counter = nullptr; + RuntimeProfile::Counter* _predicate_lm_stage2_rows_read_counter = nullptr; + RuntimeProfile::Counter* _vec_cond_timer = nullptr; RuntimeProfile::Counter* _short_cond_timer = nullptr; RuntimeProfile::Counter* _expr_filter_timer = nullptr; diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index ab0e4a57befa70..523cd0d37d5c45 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -41,6 +41,7 @@ #include "core/data_type/data_type_number.h" #include "exec/common/variant_util.h" #include "exec/operator/olap_scan_operator.h" +#include "exec/scan/predicate_lm_utils.h" #include "exec/scan/scan_node.h" #include "exprs/function_filter.h" #include "exprs/vexpr.h" @@ -156,61 +157,6 @@ static bool has_file_cache_statistics(const io::FileCacheStatistics& stats) { stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0; } -static Status parse_predicate_lm_stage1_cols_to_column_ids(const std::string& cols, - const TabletSchemaSPtr& tablet_schema, - std::vector* column_ids) { - column_ids->clear(); - if (cols.empty()) { - return Status::OK(); - } - - std::vector parts = doris::split(cols, ","); - std::vector missing; - missing.reserve(parts.size()); - - for (const auto& part : parts) { - std::string_view name_sv = doris::trim(std::string_view(part)); - if (name_sv.empty()) { - continue; - } - - // allow backticks: `col` - if (name_sv.size() >= 2 && name_sv.front() == '`' && name_sv.back() == '`') { - name_sv = name_sv.substr(1, name_sv.size() - 2); - name_sv = doris::trim(name_sv); - } - if (name_sv.empty()) { - continue; - } - - std::string name(name_sv); - - // TabletSchema stores names as-is (not guaranteed lower-case). - // Try exact match first, then fall back to lower-case. - int32_t cid = tablet_schema->field_index(name); - if (cid < 0) { - cid = tablet_schema->field_index(doris::to_lower(name)); - } - - if (cid < 0) { - missing.emplace_back(std::move(name)); - continue; - } - - column_ids->push_back(static_cast(cid)); - } - - if (!missing.empty()) { - return Status::Error( - "predicate_lm_stage1_cols contains non-existing columns: {}", - doris::join(missing, ",")); - } - - std::sort(column_ids->begin(), column_ids->end()); - column_ids->erase(std::unique(column_ids->begin(), column_ids->end()), column_ids->end()); - return Status::OK(); -} - Status OlapScanner::_prepare_impl() { auto* local_state = static_cast(_local_state); auto& tablet = _tablet_reader_params.tablet; @@ -999,6 +945,18 @@ void OlapScanner::_collect_profile_before_close() { COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter, stats.short_circuit_cond_input_rows); COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows); + + COUNTER_UPDATE(local_state->_predicate_lm_stage1_input_rows_counter, + stats.predicate_lm_stage1_input_rows); + COUNTER_UPDATE(local_state->_predicate_lm_stage1_output_rows_counter, + stats.predicate_lm_stage1_output_rows); + COUNTER_UPDATE(local_state->_predicate_lm_stage2_by_rowids_batches_counter, + stats.predicate_lm_stage2_by_rowids_batches); + COUNTER_UPDATE(local_state->_predicate_lm_stage2_by_all_rows_batches_counter, + stats.predicate_lm_stage2_by_all_rows_batches); + COUNTER_UPDATE(local_state->_predicate_lm_stage2_rows_read_counter, + stats.predicate_lm_stage2_rows_read); + COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered); COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered); COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered); diff --git a/be/src/exec/scan/predicate_lm_utils.h b/be/src/exec/scan/predicate_lm_utils.h new file mode 100644 index 00000000000000..b805a7aaab44e8 --- /dev/null +++ b/be/src/exec/scan/predicate_lm_utils.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "storage/tablet/tablet_schema.h" +#include "util/string_util.h" + +namespace doris { + +inline Status parse_predicate_lm_stage1_cols_to_column_ids(const std::string& cols, + const TabletSchemaSPtr& tablet_schema, + std::vector* column_ids) { + column_ids->clear(); + if (cols.empty()) { + return Status::OK(); + } + + std::vector parts = doris::split(cols, ","); + std::vector missing; + missing.reserve(parts.size()); + + for (const auto& part : parts) { + std::string_view name_sv = doris::trim(std::string_view(part)); + if (name_sv.empty()) { + continue; + } + + if (name_sv.size() >= 2 && name_sv.front() == '`' && name_sv.back() == '`') { + name_sv = name_sv.substr(1, name_sv.size() - 2); + name_sv = doris::trim(name_sv); + } + if (name_sv.empty()) { + continue; + } + + std::string name(name_sv); + + int32_t cid = tablet_schema->field_index(name); + if (cid < 0) { + cid = tablet_schema->field_index(doris::to_lower(name)); + } + + if (cid < 0) { + missing.emplace_back(std::move(name)); + continue; + } + + column_ids->push_back(static_cast(cid)); + } + + if (!missing.empty()) { + return Status::Error( + "predicate_lm_stage1_cols contains non-existing columns: {}", + doris::join(missing, ",")); + } + + std::sort(column_ids->begin(), column_ids->end()); + column_ids->erase(std::unique(column_ids->begin(), column_ids->end()), column_ids->end()); + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/storage/olap_common.h b/be/src/storage/olap_common.h index 2a8893a1b0e939..5ee17b1b69d531 100644 --- a/be/src/storage/olap_common.h +++ b/be/src/storage/olap_common.h @@ -341,6 +341,14 @@ struct OlapReaderStatistics { int64_t vec_cond_input_rows = 0; int64_t short_circuit_cond_input_rows = 0; int64_t expr_cond_input_rows = 0; + + // Multi-stage predicate lazy materialization stats (SegmentIterator) + int64_t predicate_lm_stage1_input_rows = 0; + int64_t predicate_lm_stage1_output_rows = 0; + int64_t predicate_lm_stage2_by_rowids_batches = 0; + int64_t predicate_lm_stage2_by_all_rows_batches = 0; + int64_t predicate_lm_stage2_rows_read = 0; + int64_t rows_vec_del_cond_filtered = 0; int64_t vec_cond_ns = 0; int64_t short_cond_ns = 0; diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index 2b24ccb39ecdff..0b749fd5aa02f9 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -3242,6 +3242,9 @@ Status SegmentIterator::_next_batch_internal(Block* block) { stage1_size = _evaluate_short_circuit_predicate(_sel_rowid_idx_stage1.data(), stage1_size); + _opts.stats->predicate_lm_stage1_input_rows += rows_read; + _opts.stats->predicate_lm_stage1_output_rows += stage1_size; + if (stage1_size > 0) { bool do_stage2 = _enable_multi_stage_predicate_lazy_materialization && !_late_predicate_column_ids.empty() && @@ -3256,6 +3259,10 @@ Status SegmentIterator::_next_batch_internal(Block* block) { survival_ratio <= _predicate_lm_stage1_survival_ratio_threshold; if (stage2_by_rowids) { + + _opts.stats->predicate_lm_stage2_by_rowids_batches += 1; + _opts.stats->predicate_lm_stage2_rows_read += stage1_size; + RETURN_IF_ERROR(_read_columns_by_rowids( _late_predicate_column_ids, _block_rowids, _sel_rowid_idx_stage1.data(), stage1_size, @@ -3293,6 +3300,10 @@ Status SegmentIterator::_next_batch_internal(Block* block) { _selected_size = stage2_size; } else { uint16_t stage2_rows_read = rows_read; + + _opts.stats->predicate_lm_stage2_by_all_rows_batches += 1; + _opts.stats->predicate_lm_stage2_rows_read += stage2_rows_read; + RETURN_IF_ERROR(_read_columns_by_index(_late_predicate_column_ids, rows_read, stage2_rows_read, /*read_rowids=*/false)); diff --git a/be/test/exec/scan/predicate_lm_utils_test.cpp b/be/test/exec/scan/predicate_lm_utils_test.cpp new file mode 100644 index 00000000000000..8961ae55a89247 --- /dev/null +++ b/be/test/exec/scan/predicate_lm_utils_test.cpp @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/scan/predicate_lm_utils.h" + +#include + +namespace doris { + +static TabletSchemaSPtr build_tablet_schema_for_test() { + auto schema = std::make_shared(); + + { + TabletColumn col(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, + FieldType::OLAP_FIELD_TYPE_INT); + col.set_name("k"); + schema->append_column(col); + } + { + TabletColumn col(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, + FieldType::OLAP_FIELD_TYPE_INT); + col.set_name("a"); + schema->append_column(col); + } + { + TabletColumn col(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, + FieldType::OLAP_FIELD_TYPE_INT); + col.set_name("b"); + schema->append_column(col); + } + + return schema; +} + +TEST(PredicateLmUtilsTest, ParseEmptyString) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("", schema, &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + EXPECT_TRUE(cids.empty()); +} + +TEST(PredicateLmUtilsTest, ParseBasicColumns) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("a,b", schema, &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + // schema is [k,a,b] => a=1, b=2 + ASSERT_EQ(2u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); + EXPECT_EQ(static_cast(2), cids[1]); +} + +TEST(PredicateLmUtilsTest, ParseTrimBackticksDedupAndCaseInsensitive) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids(" `A` , b , a ", schema, &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + // Dedup + sorted + ASSERT_EQ(2u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); + EXPECT_EQ(static_cast(2), cids[1]); +} + +TEST(PredicateLmUtilsTest, ParseUnknownColumnsShouldFail) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("a,not_exist", schema, &cids); + EXPECT_FALSE(st.ok()); + EXPECT_EQ(ErrorCode::INVALID_ARGUMENT, st.code()); + EXPECT_TRUE(st.to_string().find("predicate_lm_stage1_cols") != std::string::npos) + << st.to_string(); + EXPECT_TRUE(st.to_string().find("not_exist") != std::string::npos) << st.to_string(); +} + +} // namespace doris diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index be7b42cfe7736f..49412bab9d6c9f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -27,6 +27,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.rewrite.eageraggregation.EagerAggHints.Action; +import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Assertions; @@ -329,4 +330,27 @@ public void testAnnSessionVariableChecker() throws Exception { Assertions.assertTrue(nprobeException.getMessage().contains("ivf_nprobe must be >= 1")); Assertions.assertEquals(2, sv.ivfNprobe); } + + @Test + public void testMultiStagePredicateLmSessionVariablesForwardToThrift() throws Exception { + connectContext.setThreadLocalInfo(); + SessionVariable sv = connectContext.getSessionVariable(); + + VariableMgr.setVar(sv, new SetVar(SetType.SESSION, + SessionVariable.ENABLE_MULTI_STAGE_PREDICATE_LM, new StringLiteral("true"))); + VariableMgr.setVar(sv, new SetVar(SetType.SESSION, + SessionVariable.PREDICATE_LM_STAGE1_COLS, new StringLiteral("a,b"))); + + Map forwardVars = sv.getForwardVariables(); + Assertions.assertEquals("true", + forwardVars.get(SessionVariable.ENABLE_MULTI_STAGE_PREDICATE_LM)); + Assertions.assertEquals("a,b", + forwardVars.get(SessionVariable.PREDICATE_LM_STAGE1_COLS)); + + TQueryOptions opts = sv.toThrift(); + Assertions.assertTrue(opts.isSetEnableMultiStagePredicateLm()); + Assertions.assertTrue(opts.isEnableMultiStagePredicateLm()); + Assertions.assertTrue(opts.isSetPredicateLmStage1Cols()); + Assertions.assertEquals("a,b", opts.getPredicateLmStage1Cols()); + } } diff --git a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy new file mode 100644 index 00000000000000..0f10598b1cd951 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.Base64 +import java.util.regex.Pattern +import groovy.json.JsonSlurper + +suite("test_multi_stage_predicate_lm") { + def getProfileList = { + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/rest/v1/query_profile").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() + } + + def getProfile = { id -> + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() + } + + def getProfileWithToken = { token -> + String profileId = "" + int attempts = 0 + while (attempts < 10 && (profileId == null || profileId == "")) { + List profileData = new JsonSlurper().parseText(getProfileList()).data.rows + for (def profileItem in profileData) { + if (profileItem["Sql Statement"].toString().contains(token)) { + profileId = profileItem["Profile ID"].toString() + break + } + } + if (profileId == null || profileId == "") { + Thread.sleep(300) + } + attempts++ + } + assertTrue(profileId != null && profileId != "") + Thread.sleep(800) + return getProfile(profileId).toString() + } + + def extractProfileBlockMetrics = { String profileText, String blockName -> + List lines = profileText.readLines() + + Map metrics = [:] + boolean inBlock = false + int blockIndent = -1 + + lines.each { line -> + if (!inBlock) { + def m = line =~ /^(\s*)\s+${Pattern.quote(blockName)}:/ + if (m.find()) { + inBlock = true + blockIndent = m.group(1).length() + } + } else { + def indent = (line =~ /^(\s*)/)[0][1].length() + if (indent > blockIndent) { + def kv = line =~ /^\s*-\s*([^:]+):\s*(.+)$/ + if (kv.matches()) { + metrics[kv[0][1].trim()] = kv[0][2].trim() + } + } else { + inBlock = false + } + } + } + + return metrics + } + + def parseLongOrZero = { String v -> + if (v == null) { + return 0L + } + def s = v.trim() + if (s.isEmpty()) { + return 0L + } + return Long.parseLong(s) + } + + def tbl = "tbl_multi_stage_predicate_lm" + + sql """ DROP TABLE IF EXISTS ${tbl} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tbl} ( + `k` INT NOT NULL, + `a` INT NULL, + `b` INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2" + ); + """ + + sql """ + INSERT INTO ${tbl} + SELECT + number AS k, + number % 20 AS a, + number % 3 AS b + FROM numbers("number" = "200"); + """ + + def vars = sql """ show variables like '%multi_stage_predicate%'; """ + assertTrue(vars.toString().contains("enable_multi_stage_predicate_lm")) + + // Baseline (feature off) + sql """ set enable_multi_stage_predicate_lm = false; """ + sql """ set predicate_lm_stage1_cols = ''; """ + + qt_baseline_cnt """ SELECT count(*) FROM ${tbl} WHERE a = 1 AND b = 2; """ + order_qt_baseline_rows """ SELECT k FROM ${tbl} WHERE a = 1 AND b = 2 ORDER BY k LIMIT 10; """ + + // Enable profile for strong assertions + sql """ set enable_profile=true; """ + + // Feature on + choose stage1 cols via session variable + sql """ set enable_multi_stage_predicate_lm = true; """ + sql """ set predicate_lm_stage1_cols = 'a'; """ + + qt_lm_on_cnt """ SELECT count(*) FROM ${tbl} WHERE a = 1 AND b = 2; """ + order_qt_lm_on_rows """ SELECT k FROM ${tbl} WHERE a = 1 AND b = 2 ORDER BY k LIMIT 10; """ + + // Strong assert 1: selective stage1 predicate => stage2 by rowids + def tokenRowids = "test_multi_stage_predicate_lm_rowids_" + System.currentTimeMillis() + sql """ SELECT /* ${tokenRowids} */ count(*) FROM ${tbl} WHERE a = 1 AND b = 2; """ + def profileRowids = getProfileWithToken(tokenRowids) + def metricsRowids = extractProfileBlockMetrics(profileRowids, "SegmentIterator") + + assertTrue(metricsRowids.containsKey("PredicateLMStage2ByRowIdsBatches"), + "Profile missing PredicateLMStage2ByRowIdsBatches\n" + profileRowids) + assertTrue(metricsRowids.containsKey("PredicateLMStage2ByAllRowsBatches"), + "Profile missing PredicateLMStage2ByAllRowsBatches\n" + profileRowids) + + assertTrue(parseLongOrZero(metricsRowids["PredicateLMStage2ByRowIdsBatches"]) > 0, + "Expected stage2-by-rowids but got: " + metricsRowids.toString() + "\n" + profileRowids) + + // Strong assert 2: non-selective stage1 predicate => stage2 all-rows fallback + // Use a>=0 to make stage1 survival ratio ~= 1.0 (> default threshold 0.8) + def tokenAllRows = "test_multi_stage_predicate_lm_allrows_" + System.currentTimeMillis() + sql """ SELECT /* ${tokenAllRows} */ count(*) FROM ${tbl} WHERE a >= 0 AND b = 2; """ + def profileAllRows = getProfileWithToken(tokenAllRows) + def metricsAllRows = extractProfileBlockMetrics(profileAllRows, "SegmentIterator") + + assertTrue(metricsAllRows.containsKey("PredicateLMStage2ByAllRowsBatches"), + "Profile missing PredicateLMStage2ByAllRowsBatches\n" + profileAllRows) + assertTrue(parseLongOrZero(metricsAllRows["PredicateLMStage2ByAllRowsBatches"]) > 0, + "Expected stage2-by-all-rows but got: " + metricsAllRows.toString() + "\n" + profileAllRows) + + // Stage1 cols parsing: whitespace/backticks/dedup should be tolerated + sql """ set predicate_lm_stage1_cols = ' a ,`b`, a '; """ + qt_lm_on_cnt2 """ SELECT count(*) FROM ${tbl} WHERE a = 1 AND b = 2; """ + + // Per-statement override via SET_VAR hint + order_qt_lm_hint_rows """ + SELECT /*+ SET_VAR(enable_multi_stage_predicate_lm=true,predicate_lm_stage1_cols='a') */ + k + FROM ${tbl} + WHERE a = 1 AND b = 2 + ORDER BY k + LIMIT 10; + """ + + // Invalid stage1 cols should fail when the feature is enabled. + test { + sql """ + SELECT /*+ SET_VAR(enable_multi_stage_predicate_lm=true,predicate_lm_stage1_cols='not_exist') */ + count(*) + FROM ${tbl} + WHERE a = 1 AND b = 2; + """ + exception("predicate_lm_stage1_cols contains non-existing columns") + } +} From 7e0d0821f5802c3fe8013a5c6b3aa0f84805c09f Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Thu, 25 Jun 2026 17:10:37 +0800 Subject: [PATCH 03/10] fix be build --- be/src/exec/scan/olap_scanner.cpp | 1 + be/src/storage/segment/segment_iterator.cpp | 5 ----- be/src/storage/segment/segment_iterator.h | 22 +++++++++++++++------ 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 523cd0d37d5c45..5a8cdbbce4513d 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -104,6 +104,7 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param .collection_statistics {}, .ann_topn_runtime {}, .condition_cache_digest = parent->get_condition_cache_digest(), + .predicate_lm_stage1_column_ids {}, .binlog_scan_type = params.binlog_scan_type}), _start_tso(params.start_tso), _end_tso(params.end_tso) { diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index 0b749fd5aa02f9..8300e20855336e 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -3407,11 +3407,6 @@ Status SegmentIterator::_next_batch_internal(Block* block) { RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); } } - } else { - _fill_column_nothing(); - if (_is_need_expr_eval) { - RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block)); - } } } else if (_is_need_expr_eval) { DCHECK(!_predicate_column_ids.empty()); diff --git a/be/src/storage/segment/segment_iterator.h b/be/src/storage/segment/segment_iterator.h index 82d7206a6fb0b0..874f365a2275d6 100644 --- a/be/src/storage/segment/segment_iterator.h +++ b/be/src/storage/segment/segment_iterator.h @@ -237,6 +237,8 @@ class SegmentIterator : public RowwiseIterator { const std::vector>& short_circuit_predicates, const std::vector>& vectorized_predicates, bool include_delete_condition_columns); + void _convert_dict_code_for_predicate_if_necessary_impl(std::shared_ptr predicate); + Status _apply_read_limit_to_selected_rows(Block* block, uint16_t& selected_size); void _collect_runtime_filter_predicate(); Status _output_non_pred_columns(Block* block); @@ -298,12 +300,6 @@ class SegmentIterator : public RowwiseIterator { uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t selected_size, const IColumn::Filter& filter); - // Dictionary column should do something to initial. - void _convert_dict_code_for_predicate_if_necessary(); - - void _convert_dict_code_for_predicate_if_necessary_impl( - std::shared_ptr predicate); - bool _check_apply_by_inverted_index(std::shared_ptr pred); void _output_index_result_column(const std::vector& expr_ctxs, @@ -389,6 +385,10 @@ class SegmentIterator : public RowwiseIterator { bool _is_need_short_eval = false; bool _is_need_expr_eval = false; + // Multi-stage predicate lazy materialization (experimental) + bool _enable_multi_stage_predicate_lazy_materialization = false; + double _predicate_lm_stage1_survival_ratio_threshold = 0.8; + // fields for vectorization execution std::vector _vec_pred_column_ids; // keep columnId of columns for vectorized predicate evaluation @@ -400,13 +400,21 @@ class SegmentIterator : public RowwiseIterator { MutableColumns _current_return_columns; std::vector> _pre_eval_block_predicate; std::vector> _short_cir_eval_predicate; + std::vector> _late_pre_eval_block_predicate; + std::vector> _late_short_cir_eval_predicate; std::vector _delete_range_column_ids; std::vector _delete_bloom_filter_column_ids; + // when lazy materialization is enabled, segmentIter need to read data at least twice // first, read predicate columns by various index // second, read non-predicate columns // so we need a field to stand for columns first time to read std::vector _predicate_column_ids; + // stage2 predicate columns for multi-stage predicate lazy materialization + std::vector _late_predicate_column_ids; + bool _is_need_vec_eval_late = false; + bool _is_need_short_eval_late = false; + std::vector _common_expr_column_ids; // TODO: Should use std::vector std::vector _columns_to_filter; @@ -445,6 +453,8 @@ class SegmentIterator : public RowwiseIterator { // used for compaction, record selectd rowids of current batch uint16_t _selected_size; std::vector _sel_rowid_idx; + std::vector _sel_rowid_idx_stage1; + std::vector _sel_rowid_idx_stage2; // Rows already produced by this iterator. Used together with // _opts.read_limit to compute the remaining per-batch budget. From 0548d0a6d408724f0b4000e000ba917abddfa169 Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Thu, 25 Jun 2026 17:26:50 +0800 Subject: [PATCH 04/10] fix style --- be/src/storage/segment/segment_iterator.cpp | 8 +++---- .../org/apache/doris/qe/SessionVariable.java | 24 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index 8300e20855336e..de37b29e31a1fb 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -3164,7 +3164,7 @@ Status SegmentIterator::_next_batch_internal(Block* block) { _selected_size = 0; RETURN_IF_ERROR(_read_columns_by_index(_predicate_column_ids, nrows_read_limit, _selected_size, - /*read_rowids=*/true)); + /*read_rowids=*/true)); _replace_version_col_if_needed(_predicate_column_ids, _selected_size); _update_lsn_col_if_needed(_predicate_column_ids, _selected_size); _update_tso_col_if_needed(_predicate_column_ids, _selected_size); @@ -3240,7 +3240,7 @@ Status SegmentIterator::_next_batch_internal(Block* block) { uint16_t stage1_size = _evaluate_vectorization_predicate(_sel_rowid_idx_stage1.data(), rows_read); stage1_size = _evaluate_short_circuit_predicate(_sel_rowid_idx_stage1.data(), - stage1_size); + stage1_size); _opts.stats->predicate_lm_stage1_input_rows += rows_read; _opts.stats->predicate_lm_stage1_output_rows += stage1_size; @@ -3305,8 +3305,8 @@ Status SegmentIterator::_next_batch_internal(Block* block) { _opts.stats->predicate_lm_stage2_rows_read += stage2_rows_read; RETURN_IF_ERROR(_read_columns_by_index(_late_predicate_column_ids, - rows_read, stage2_rows_read, - /*read_rowids=*/false)); + rows_read, stage2_rows_read, + /*read_rowids=*/false)); _replace_version_col_if_needed(_late_predicate_column_ids, stage2_rows_read); _update_lsn_col_if_needed(_late_predicate_column_ids, stage2_rows_read); _update_tso_col_if_needed(_late_predicate_column_ids, stage2_rows_read); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 241ae64b186020..218ab6f014f1cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2980,21 +2980,21 @@ public static boolean isEagerAggregationOnJoin() { public boolean enableShuffleKeyPrune = true; @VarAttrDef.VarAttr( - name = ENABLE_MULTI_STAGE_PREDICATE_LM, - fuzzy = true, - description = {"控制 SegmentIterator 是否启用多阶段谓词延迟物化(实验特性)。默认为 false。", - "Controls whether to enable multi-stage predicate lazy materialization in SegmentIterator " - + "(experimental). The default value is false."}, - needForward = true) + name = ENABLE_MULTI_STAGE_PREDICATE_LM, + fuzzy = true, + description = {"控制 SegmentIterator 是否启用多阶段谓词延迟物化(实验特性)。默认为 false。", + "Controls whether to enable multi-stage predicate lazy materialization in SegmentIterator " + + "(experimental). The default value is false."}, + needForward = true) public boolean enableMultiStagePredicateLm = false; @VarAttrDef.VarAttr( - name = PREDICATE_LM_STAGE1_COLS, - fuzzy = true, - description = {"人工指定的多阶段谓词延迟物化中 stage1 参与过滤的列名列表,逗号分隔,例如 'a,b,c'。默认为空。", - "Stage1 predicate columns for multi-stage predicate LM, comma-separated, e.g. 'a,b,c'. " - + "Default is empty."}, - needForward = true) + name = PREDICATE_LM_STAGE1_COLS, + fuzzy = true, + description = {"人工指定的多阶段谓词延迟物化中 stage1 参与过滤的列名列表,逗号分隔,例如 'a,b,c'。默认为空。", + "Stage1 predicate columns for multi-stage predicate LM, comma-separated, e.g. 'a,b,c'. " + + "Default is empty."}, + needForward = true) public String predicateLmStage1Cols = ""; @VarAttrDef.VarAttr(name = ENABLE_PREFER_CACHED_ROWSET, needForward = false, From e952b8a70d9af3ef039aeb4aa88aae6a87a04f1e Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Thu, 25 Jun 2026 19:12:03 +0800 Subject: [PATCH 05/10] fix logic & regression test --- be/src/exec/scan/olap_scanner.cpp | 10 +++++++++- .../test_multi_stage_predicate_lm.groovy | 6 +++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 5a8cdbbce4513d..7e5897fb5d2407 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -221,10 +221,18 @@ Status OlapScanner::_prepare_impl() { tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc); } + const auto& qopts = _state->query_options(); + + // Map FE session variable enable_multi_stage_predicate_lm -> storage read option flag. + // This flag is what SegmentIterator actually checks. + if (qopts.__isset.enable_multi_stage_predicate_lm) { + _tablet_reader_params.enable_multi_stage_predicate_lazy_materialization = + qopts.enable_multi_stage_predicate_lm; + } + // Map FE session variable predicate_lm_stage1_cols -> ColumnId list // NOTE: only meaningful when multi-stage predicate LM is enabled. if (_tablet_reader_params.enable_multi_stage_predicate_lazy_materialization) { - const auto& qopts = _state->query_options(); if (qopts.__isset.predicate_lm_stage1_cols && !qopts.predicate_lm_stage1_cols.empty()) { std::vector stage1_column_ids; RETURN_IF_ERROR(parse_predicate_lm_stage1_cols_to_column_ids( diff --git a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy index 0f10598b1cd951..40c77789985876 100644 --- a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy +++ b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy @@ -163,10 +163,10 @@ suite("test_multi_stage_predicate_lm") { assertTrue(parseLongOrZero(metricsRowids["PredicateLMStage2ByRowIdsBatches"]) > 0, "Expected stage2-by-rowids but got: " + metricsRowids.toString() + "\n" + profileRowids) - // Strong assert 2: non-selective stage1 predicate => stage2 all-rows fallback - // Use a>=0 to make stage1 survival ratio ~= 1.0 (> default threshold 0.8) + // Strong assert 2: high-survival stage1 predicate => stage2 all-rows fallback + // Use a < 19 so stage1 survival ratio ~= 0.95 (> default threshold 0.8) while still being a non-trivial predicate. def tokenAllRows = "test_multi_stage_predicate_lm_allrows_" + System.currentTimeMillis() - sql """ SELECT /* ${tokenAllRows} */ count(*) FROM ${tbl} WHERE a >= 0 AND b = 2; """ + sql """ SELECT /* ${tokenAllRows} */ count(*) FROM ${tbl} WHERE a < 19 AND b = 2; """ def profileAllRows = getProfileWithToken(tokenAllRows) def metricsAllRows = extractProfileBlockMetrics(profileAllRows, "SegmentIterator") From 92ec52413e06270db241e1bfe08b4218449b4901 Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Thu, 25 Jun 2026 19:17:03 +0800 Subject: [PATCH 06/10] add regression test out file --- .../test_multi_stage_predicate_lm.out | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 regression-test/data/correctness_p0/test_multi_stage_predicate_lm.out diff --git a/regression-test/data/correctness_p0/test_multi_stage_predicate_lm.out b/regression-test/data/correctness_p0/test_multi_stage_predicate_lm.out new file mode 100644 index 00000000000000..c27bdbc10370e7 --- /dev/null +++ b/regression-test/data/correctness_p0/test_multi_stage_predicate_lm.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !baseline_cnt -- +3 + +-- !baseline_rows -- +101 +161 +41 + +-- !lm_on_cnt -- +3 + +-- !lm_on_rows -- +101 +161 +41 + +-- !lm_on_cnt2 -- +3 + +-- !lm_hint_rows -- +101 +161 +41 + From ceecbf25aac9421f3f14034b1647ada41c65f791 Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Thu, 25 Jun 2026 20:02:33 +0800 Subject: [PATCH 07/10] add more config --- be/src/exec/scan/olap_scanner.cpp | 6 ++++ .../org/apache/doris/qe/SessionVariable.java | 30 +++++++++++++++++++ .../apache/doris/qe/SessionVariablesTest.java | 6 ++++ gensrc/thrift/PaloInternalService.thrift | 1 + 4 files changed, 43 insertions(+) diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 7e5897fb5d2407..22f69e4a57b9b1 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -230,6 +230,12 @@ Status OlapScanner::_prepare_impl() { qopts.enable_multi_stage_predicate_lm; } + // Map FE session variable predicate_lm_stage1_survival_ratio_threshold -> stage2 strategy threshold. + if (qopts.__isset.predicate_lm_stage1_survival_ratio_threshold) { + _tablet_reader_params.predicate_lm_stage1_survival_ratio_threshold = + qopts.predicate_lm_stage1_survival_ratio_threshold; + } + // Map FE session variable predicate_lm_stage1_cols -> ColumnId list // NOTE: only meaningful when multi-stage predicate LM is enabled. if (_tablet_reader_params.enable_multi_stage_predicate_lazy_materialization) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 218ab6f014f1cd..551d46273571d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -890,6 +890,8 @@ public String toString() { public static final String ENABLE_MULTI_STAGE_PREDICATE_LM = "enable_multi_stage_predicate_lm"; public static final String PREDICATE_LM_STAGE1_COLS = "predicate_lm_stage1_cols"; + public static final String PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD = + "predicate_lm_stage1_survival_ratio_threshold"; public static final String HOT_VALUE_COLLECT_COUNT = "hot_value_collect_count"; @VarAttrDef.VarAttr(name = HOT_VALUE_COLLECT_COUNT, needForward = true, @@ -2997,6 +2999,19 @@ public static boolean isEagerAggregationOnJoin() { needForward = true) public String predicateLmStage1Cols = ""; + @VarAttrDef.VarAttr( + name = PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD, + fuzzy = true, + description = {"多阶段谓词延迟物化中 stage1 的存活率阈值,用于选择 stage2 策略。" + + "当 stage1 存活率 <= 阈值时倾向按 rowids 读取 stage2 谓词列;" + + "当 stage1 存活率 > 阈值时倾向读全量行以避免随机读。范围 [0,1],默认 0.8。", + "Stage1 survival ratio threshold for multi-stage predicate LM. " + + "If survival_ratio <= threshold, stage2 prefers by-rowids; otherwise by-all-rows. " + + "Range [0,1], default 0.8."}, + needForward = true, + checker = "checkPredicateLmStage1SurvivalRatioThreshold") + public double predicateLmStage1SurvivalRatioThreshold = 0.8; + @VarAttrDef.VarAttr(name = ENABLE_PREFER_CACHED_ROWSET, needForward = false, description = {"是否启用 prefer cached rowset 功能", "Whether to enable prefer cached rowset feature"}) @@ -5688,6 +5703,7 @@ public TQueryOptions toThrift() { tResult.setEnableMultiStagePredicateLm(enableMultiStagePredicateLm); tResult.setPredicateLmStage1Cols(predicateLmStage1Cols); + tResult.setPredicateLmStage1SurvivalRatioThreshold(predicateLmStage1SurvivalRatioThreshold); tResult.setEnableExtendedRegex(enableExtendedRegex); if (fileCacheQueryLimitPercent > 0) { @@ -6236,6 +6252,20 @@ public void checkSkewRewriteAggBucketNum(String bucketNumStr) { } } + public void checkPredicateLmStage1SurvivalRatioThreshold(String value) { + final double v; + try { + v = Double.parseDouble(value); + } catch (NumberFormatException e) { + throw new InvalidParameterException( + PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD + " must be a valid number in range [0, 1]"); + } + if (Double.isNaN(v) || v < 0.0 || v > 1.0) { + throw new InvalidParameterException( + PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD + " should be in range [0, 1], got " + v); + } + } + public void checkSkewRewriteJoinSaltExplodeFactor(String factorStr) { try { int factor = Integer.parseInt(factorStr); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index 49412bab9d6c9f..0a78b74ceee73e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -340,17 +340,23 @@ public void testMultiStagePredicateLmSessionVariablesForwardToThrift() throws Ex SessionVariable.ENABLE_MULTI_STAGE_PREDICATE_LM, new StringLiteral("true"))); VariableMgr.setVar(sv, new SetVar(SetType.SESSION, SessionVariable.PREDICATE_LM_STAGE1_COLS, new StringLiteral("a,b"))); + VariableMgr.setVar(sv, new SetVar(SetType.SESSION, + SessionVariable.PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD, new StringLiteral("0.9"))); Map forwardVars = sv.getForwardVariables(); Assertions.assertEquals("true", forwardVars.get(SessionVariable.ENABLE_MULTI_STAGE_PREDICATE_LM)); Assertions.assertEquals("a,b", forwardVars.get(SessionVariable.PREDICATE_LM_STAGE1_COLS)); + Assertions.assertEquals("0.9", + forwardVars.get(SessionVariable.PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD)); TQueryOptions opts = sv.toThrift(); Assertions.assertTrue(opts.isSetEnableMultiStagePredicateLm()); Assertions.assertTrue(opts.isEnableMultiStagePredicateLm()); Assertions.assertTrue(opts.isSetPredicateLmStage1Cols()); Assertions.assertEquals("a,b", opts.getPredicateLmStage1Cols()); + Assertions.assertTrue(opts.isSetPredicateLmStage1SurvivalRatioThreshold()); + Assertions.assertEquals(0.9, opts.getPredicateLmStage1SurvivalRatioThreshold()); } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 504a18a7327307..66111184296355 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -503,6 +503,7 @@ struct TQueryOptions { 224: optional bool enable_multi_stage_predicate_lm = false 225: optional string predicate_lm_stage1_cols = "" + 226: optional double predicate_lm_stage1_survival_ratio_threshold = 0.8 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. From ffeef07cb6c53ef16238a26436be726521ad6cee Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Fri, 26 Jun 2026 11:22:23 +0800 Subject: [PATCH 08/10] improve usage & regression test --- .../org/apache/doris/qe/SessionVariable.java | 20 ++++++------ .../test_multi_stage_predicate_lm.groovy | 32 ++++++++++++++++++- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 551d46273571d3..943aed5f547add 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -3000,16 +3000,16 @@ public static boolean isEagerAggregationOnJoin() { public String predicateLmStage1Cols = ""; @VarAttrDef.VarAttr( - name = PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD, - fuzzy = true, - description = {"多阶段谓词延迟物化中 stage1 的存活率阈值,用于选择 stage2 策略。" - + "当 stage1 存活率 <= 阈值时倾向按 rowids 读取 stage2 谓词列;" - + "当 stage1 存活率 > 阈值时倾向读全量行以避免随机读。范围 [0,1],默认 0.8。", - "Stage1 survival ratio threshold for multi-stage predicate LM. " - + "If survival_ratio <= threshold, stage2 prefers by-rowids; otherwise by-all-rows. " - + "Range [0,1], default 0.8."}, - needForward = true, - checker = "checkPredicateLmStage1SurvivalRatioThreshold") + name = PREDICATE_LM_STAGE1_SURVIVAL_RATIO_THRESHOLD, + fuzzy = true, + description = {"多阶段谓词延迟物化中 stage1 的存活率阈值,用于选择 stage2 策略。" + + "当 stage1 存活率 <= 阈值时倾向按 rowids 读取 stage2 谓词列;" + + "当 stage1 存活率 > 阈值时倾向读全量行以避免随机读。范围 [0,1],默认 0.8。", + "Stage1 survival ratio threshold for multi-stage predicate LM. " + + "If survival_ratio <= threshold, stage2 prefers by-rowids; otherwise by-all-rows. " + + "Range [0,1], default 0.8."}, + needForward = true, + checker = "checkPredicateLmStage1SurvivalRatioThreshold") public double predicateLmStage1SurvivalRatioThreshold = 0.8; @VarAttrDef.VarAttr(name = ENABLE_PREFER_CACHED_ROWSET, needForward = false, diff --git a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy index 40c77789985876..d9c73631b11a36 100644 --- a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy +++ b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy @@ -19,7 +19,7 @@ import java.util.Base64 import java.util.regex.Pattern import groovy.json.JsonSlurper -suite("test_multi_stage_predicate_lm") { +suite("test_multi_stage_predicate_lm", "p0") { def getProfileList = { def dst = 'http://' + context.config.feHttpAddress def conn = new URL(dst + "/rest/v1/query_profile").openConnection() @@ -175,6 +175,36 @@ suite("test_multi_stage_predicate_lm") { assertTrue(parseLongOrZero(metricsAllRows["PredicateLMStage2ByAllRowsBatches"]) > 0, "Expected stage2-by-all-rows but got: " + metricsAllRows.toString() + "\n" + profileAllRows) + // Strong assert 3: threshold tuning should change stage2 strategy decision + // 3.1 Force stage2-by-rowids even for a high-survival stage1 case + sql """ set predicate_lm_stage1_survival_ratio_threshold = 1.0; """ + def tokenForceRowids = "test_multi_stage_predicate_lm_force_rowids_" + System.currentTimeMillis() + sql """ SELECT /* ${tokenForceRowids} */ count(*) FROM ${tbl} WHERE a < 19 AND b = 2; """ + def profileForceRowids = getProfileWithToken(tokenForceRowids) + def metricsForceRowids = extractProfileBlockMetrics(profileForceRowids, "SegmentIterator") + + assertTrue(metricsForceRowids.containsKey("PredicateLMStage2ByRowIdsBatches"), + "Profile missing PredicateLMStage2ByRowIdsBatches\n" + profileForceRowids) + assertTrue(parseLongOrZero(metricsForceRowids["PredicateLMStage2ByRowIdsBatches"]) > 0, + "Expected stage2-by-rowids after setting threshold=1.0 but got: " + + metricsForceRowids.toString() + "\n" + profileForceRowids) + + // 3.2 Force stage2-by-all-rows even for a selective stage1 case + sql """ set predicate_lm_stage1_survival_ratio_threshold = 0.0; """ + def tokenForceAllRows = "test_multi_stage_predicate_lm_force_allrows_" + System.currentTimeMillis() + sql """ SELECT /* ${tokenForceAllRows} */ count(*) FROM ${tbl} WHERE a = 1 AND b = 2; """ + def profileForceAllRows = getProfileWithToken(tokenForceAllRows) + def metricsForceAllRows = extractProfileBlockMetrics(profileForceAllRows, "SegmentIterator") + + assertTrue(metricsForceAllRows.containsKey("PredicateLMStage2ByAllRowsBatches"), + "Profile missing PredicateLMStage2ByAllRowsBatches\n" + profileForceAllRows) + assertTrue(parseLongOrZero(metricsForceAllRows["PredicateLMStage2ByAllRowsBatches"]) > 0, + "Expected stage2-by-all-rows after setting threshold=0.0 but got: " + + metricsForceAllRows.toString() + "\n" + profileForceAllRows) + + // Reset to default for subsequent cases + sql """ set predicate_lm_stage1_survival_ratio_threshold = 0.8; """ + // Stage1 cols parsing: whitespace/backticks/dedup should be tolerated sql """ set predicate_lm_stage1_cols = ' a ,`b`, a '; """ qt_lm_on_cnt2 """ SELECT count(*) FROM ${tbl} WHERE a = 1 AND b = 2; """ From 7b25a59ece52f71dde9e2bbca05910f1a2df723e Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Fri, 26 Jun 2026 14:32:30 +0800 Subject: [PATCH 09/10] add db & table support --- be/src/exec/scan/olap_scanner.cpp | 5 +- be/src/exec/scan/predicate_lm_utils.h | 130 +++++++++++++++--- be/test/exec/scan/predicate_lm_utils_test.cpp | 68 +++++++-- .../apache/doris/planner/OlapScanNode.java | 1 + gensrc/thrift/PlanNodes.thrift | 1 + .../test_multi_stage_predicate_lm.groovy | 104 ++++++++++++-- 6 files changed, 270 insertions(+), 39 deletions(-) diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 22f69e4a57b9b1..6ff2fb9db05a61 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -242,7 +242,10 @@ Status OlapScanner::_prepare_impl() { if (qopts.__isset.predicate_lm_stage1_cols && !qopts.predicate_lm_stage1_cols.empty()) { std::vector stage1_column_ids; RETURN_IF_ERROR(parse_predicate_lm_stage1_cols_to_column_ids( - qopts.predicate_lm_stage1_cols, tablet_schema, &stage1_column_ids)); + qopts.predicate_lm_stage1_cols, tablet_schema, + olap_scan_node.__isset.db_name ? std::string_view(olap_scan_node.db_name) + : std::string_view(), + olap_scan_node.table_name, &stage1_column_ids)); _tablet_reader_params.predicate_lm_stage1_column_ids = std::move(stage1_column_ids); } } diff --git a/be/src/exec/scan/predicate_lm_utils.h b/be/src/exec/scan/predicate_lm_utils.h index b805a7aaab44e8..371e27e232ef97 100644 --- a/be/src/exec/scan/predicate_lm_utils.h +++ b/be/src/exec/scan/predicate_lm_utils.h @@ -28,53 +28,147 @@ namespace doris { +namespace predicate_lm_utils_detail { + +inline std::string to_lower_trimmed(std::string_view sv) { + sv = doris::trim(sv); + return doris::to_lower(std::string(sv)); +} + +inline std::string normalize_table_name_for_match(std::string_view table_name_sv) { + table_name_sv = doris::trim(table_name_sv); + std::string s(table_name_sv); + + // Strip rollup suffix: "tbl(rollup)" => "tbl". + if (!s.empty() && s.back() == ')') { + auto pos = s.find_last_of('('); + if (pos != std::string::npos) { + s.resize(pos); + } + } + + return to_lower_trimmed(std::string_view(s)); +} + +inline bool ends_with(std::string_view s, std::string_view suffix) { + if (suffix.size() > s.size()) { + return false; + } + return s.substr(s.size() - suffix.size()) == suffix; +} + +inline std::string normalize_identifier_piece(std::string_view sv) { + sv = doris::trim(sv); + if (sv.size() >= 2 && sv.front() == '`' && sv.back() == '`') { + sv = sv.substr(1, sv.size() - 2); + sv = doris::trim(sv); + } + return std::string(sv); +} + +inline bool qualifier_matches_current_table(const std::string& normalized_current_full_table_name, + const std::string& normalized_current_table_only, + const std::string& qualifier_raw) { + std::string qualifier = to_lower_trimmed(std::string_view(qualifier_raw)); + if (qualifier.empty()) { + return true; + } + + // qualifier="tbl" + if (qualifier.find('.') == std::string::npos) { + return qualifier == normalized_current_table_only; + } + + // qualifier="db.tbl" (also allow suffix match for catalog.db.tbl) + if (qualifier == normalized_current_full_table_name) { + return true; + } + std::string with_dot = "." + qualifier; + return ends_with(normalized_current_full_table_name, with_dot); +} + +} // namespace predicate_lm_utils_detail + inline Status parse_predicate_lm_stage1_cols_to_column_ids(const std::string& cols, const TabletSchemaSPtr& tablet_schema, + std::string_view current_db_name, + std::string_view current_table_name, std::vector* column_ids) { column_ids->clear(); if (cols.empty()) { return Status::OK(); } + const std::string normalized_db = predicate_lm_utils_detail::to_lower_trimmed(current_db_name); + const std::string normalized_tbl = + predicate_lm_utils_detail::normalize_table_name_for_match(current_table_name); + const std::string normalized_current_table_only = normalized_tbl; + + std::string normalized_current_full_table_name; + if (!normalized_db.empty()) { + normalized_current_full_table_name.reserve(normalized_db.size() + 1 + normalized_tbl.size()); + normalized_current_full_table_name.append(normalized_db); + normalized_current_full_table_name.push_back('.'); + normalized_current_full_table_name.append(normalized_tbl); + } else { + normalized_current_full_table_name = normalized_tbl; + } + std::vector parts = doris::split(cols, ","); - std::vector missing; - missing.reserve(parts.size()); for (const auto& part : parts) { - std::string_view name_sv = doris::trim(std::string_view(part)); - if (name_sv.empty()) { + std::string_view token_sv = doris::trim(std::string_view(part)); + if (token_sv.empty()) { continue; } - if (name_sv.size() >= 2 && name_sv.front() == '`' && name_sv.back() == '`') { - name_sv = name_sv.substr(1, name_sv.size() - 2); - name_sv = doris::trim(name_sv); + // Support qualified identifiers: tbl.col / db.tbl.col + // (Backticks are supported on each identifier piece, e.g. `db`.`tbl`.`col`) + std::vector dot_parts = doris::split(std::string(token_sv), "."); + std::vector ident_parts; + ident_parts.reserve(dot_parts.size()); + for (const auto& dot_part : dot_parts) { + auto piece = predicate_lm_utils_detail::normalize_identifier_piece(std::string_view(dot_part)); + if (!piece.empty()) { + ident_parts.emplace_back(std::move(piece)); + } } - if (name_sv.empty()) { + if (ident_parts.empty()) { continue; } - std::string name(name_sv); + std::string col_name = std::move(ident_parts.back()); + ident_parts.pop_back(); + + std::string qualifier; + if (!ident_parts.empty()) { + qualifier.reserve(64); + for (size_t i = 0; i < ident_parts.size(); ++i) { + if (i > 0) { + qualifier.push_back('.'); + } + qualifier.append(ident_parts[i]); + } + } + + if (!predicate_lm_utils_detail::qualifier_matches_current_table( + normalized_current_full_table_name, normalized_current_table_only, qualifier)) { + continue; + } - int32_t cid = tablet_schema->field_index(name); + int32_t cid = tablet_schema->field_index(col_name); if (cid < 0) { - cid = tablet_schema->field_index(doris::to_lower(name)); + cid = tablet_schema->field_index(doris::to_lower(col_name)); } + // Ignore unknown columns (do not fail the query). if (cid < 0) { - missing.emplace_back(std::move(name)); continue; } column_ids->push_back(static_cast(cid)); } - if (!missing.empty()) { - return Status::Error( - "predicate_lm_stage1_cols contains non-existing columns: {}", - doris::join(missing, ",")); - } - std::sort(column_ids->begin(), column_ids->end()); column_ids->erase(std::unique(column_ids->begin(), column_ids->end()), column_ids->end()); return Status::OK(); diff --git a/be/test/exec/scan/predicate_lm_utils_test.cpp b/be/test/exec/scan/predicate_lm_utils_test.cpp index 8961ae55a89247..31f32f1c2899fa 100644 --- a/be/test/exec/scan/predicate_lm_utils_test.cpp +++ b/be/test/exec/scan/predicate_lm_utils_test.cpp @@ -50,7 +50,7 @@ TEST(PredicateLmUtilsTest, ParseEmptyString) { auto schema = build_tablet_schema_for_test(); std::vector cids; - Status st = parse_predicate_lm_stage1_cols_to_column_ids("", schema, &cids); + Status st = parse_predicate_lm_stage1_cols_to_column_ids("", schema, "db", "tbl", &cids); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_TRUE(cids.empty()); } @@ -59,7 +59,7 @@ TEST(PredicateLmUtilsTest, ParseBasicColumns) { auto schema = build_tablet_schema_for_test(); std::vector cids; - Status st = parse_predicate_lm_stage1_cols_to_column_ids("a,b", schema, &cids); + Status st = parse_predicate_lm_stage1_cols_to_column_ids("a,b", schema, "db", "tbl", &cids); EXPECT_TRUE(st.ok()) << st.to_string(); // schema is [k,a,b] => a=1, b=2 @@ -72,7 +72,7 @@ TEST(PredicateLmUtilsTest, ParseTrimBackticksDedupAndCaseInsensitive) { auto schema = build_tablet_schema_for_test(); std::vector cids; - Status st = parse_predicate_lm_stage1_cols_to_column_ids(" `A` , b , a ", schema, &cids); + Status st = parse_predicate_lm_stage1_cols_to_column_ids(" `A` , b , a ", schema, "db", "tbl", &cids); EXPECT_TRUE(st.ok()) << st.to_string(); // Dedup + sorted @@ -81,16 +81,64 @@ TEST(PredicateLmUtilsTest, ParseTrimBackticksDedupAndCaseInsensitive) { EXPECT_EQ(static_cast(2), cids[1]); } -TEST(PredicateLmUtilsTest, ParseUnknownColumnsShouldFail) { +TEST(PredicateLmUtilsTest, ParseUnknownColumnsShouldBeIgnored) { auto schema = build_tablet_schema_for_test(); std::vector cids; - Status st = parse_predicate_lm_stage1_cols_to_column_ids("a,not_exist", schema, &cids); - EXPECT_FALSE(st.ok()); - EXPECT_EQ(ErrorCode::INVALID_ARGUMENT, st.code()); - EXPECT_TRUE(st.to_string().find("predicate_lm_stage1_cols") != std::string::npos) - << st.to_string(); - EXPECT_TRUE(st.to_string().find("not_exist") != std::string::npos) << st.to_string(); + Status st = parse_predicate_lm_stage1_cols_to_column_ids("a,not_exist", schema, "db", "tbl", &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + // schema is [k,a,b] => a=1 + ASSERT_EQ(1u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); +} + +TEST(PredicateLmUtilsTest, ParseTableQualifiedColumns) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("tbl.a,other.b,b", schema, "db", "tbl", &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + // other.b should be ignored, tbl.a applies, b is unqualified and applies. + ASSERT_EQ(2u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); + EXPECT_EQ(static_cast(2), cids[1]); +} + +TEST(PredicateLmUtilsTest, ParseDbTableQualifiedColumns) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("db1.tbl.a,db2.tbl.b,tbl.b", schema, "db1", "tbl", &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + // db2.tbl.b ignored, db1.tbl.a applies, tbl.b (table-qualified) applies. + ASSERT_EQ(2u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); + EXPECT_EQ(static_cast(2), cids[1]); +} + +TEST(PredicateLmUtilsTest, ParseDbTableQualifiedColumnsWithCatalogPrefixedDbName) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("db1.tbl.a", schema, "catalog.db1", "tbl", &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + ASSERT_EQ(1u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); +} + +TEST(PredicateLmUtilsTest, ParseQualifiedColumnsWithRollupSuffixInCurrentTableName) { + auto schema = build_tablet_schema_for_test(); + + std::vector cids; + Status st = parse_predicate_lm_stage1_cols_to_column_ids("db1.tbl.a", schema, "db1", "tbl(rollup)", &cids); + EXPECT_TRUE(st.ok()) << st.to_string(); + + ASSERT_EQ(1u, cids.size()); + EXPECT_EQ(static_cast(1), cids[0]); } } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 2f93e50d3d5bb2..2050e14103a890 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1295,6 +1295,7 @@ protected void toThrift(TPlanNode msg) { tableName = tableName + "(" + getSelectedIndexName() + ")"; } msg.olap_scan_node.setTableName(tableName); + msg.olap_scan_node.setDbName(olapTable.getQualifiedDbName()); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); // Set MOR value predicate pushdown flag based on session variable diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4f8826bdb3f6f4..6e95358571782d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1008,6 +1008,7 @@ struct TOlapScanNode { // Only partitions that are candidates for pruning are included; partitions FE // does not want pruned (e.g. default catch-all) are omitted from this list. 27: optional list partition_boundaries + 28: optional string db_name } struct TEqJoinCondition { diff --git a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy index d9c73631b11a36..d316408b580c67 100644 --- a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy +++ b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy @@ -219,14 +219,98 @@ suite("test_multi_stage_predicate_lm", "p0") { LIMIT 10; """ - // Invalid stage1 cols should fail when the feature is enabled. - test { - sql """ - SELECT /*+ SET_VAR(enable_multi_stage_predicate_lm=true,predicate_lm_stage1_cols='not_exist') */ - count(*) - FROM ${tbl} - WHERE a = 1 AND b = 2; - """ - exception("predicate_lm_stage1_cols contains non-existing columns") - } + // Unknown stage1 cols should be ignored (do not fail the query). + def invalidStage1ColsRes = sql """ + SELECT /*+ SET_VAR(enable_multi_stage_predicate_lm=true,predicate_lm_stage1_cols='not_exist') */ + count(*) + FROM ${tbl} + WHERE a = 1 AND b = 2; + """ + assertEquals("3", invalidStage1ColsRes[0][0].toString()) + + // Qualified stage1 cols: support `table.col` and `db.table.col` scoping + def tblQual = "tbl_multi_stage_predicate_lm_qual" + + sql """ DROP TABLE IF EXISTS ${tblQual} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tblQual} ( + `k` INT NOT NULL, + `a` INT NULL, + `b` INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2" + ); + """ + + // Make `a` non-selective and `b` selective to differentiate stage1 choice. + // Rows: 10000, a is always 1, b alternates 0/1. + sql """ + INSERT INTO ${tblQual} + SELECT + number AS k, + 1 AS a, + number % 2 AS b + FROM numbers("number" = "10000"); + """ + + def currentDbRes = sql """ SELECT database(); """ + def currentDb = currentDbRes[0][0].toString() + + // table-qualified matches current table => stage1=b (survival_ratio ~= 0.5) => stage2-by-rowids + sql """ set predicate_lm_stage1_survival_ratio_threshold = 0.8; """ + sql """ set predicate_lm_stage1_cols = '${tblQual}.b'; """ + + def tokenTableQualified = "test_multi_stage_predicate_lm_table_qualified_" + System.currentTimeMillis() + def cntTableQualified = sql """ SELECT /* ${tokenTableQualified} */ count(*) FROM ${tblQual} WHERE a = 1 AND b = 0; """ + assertEquals("5000", cntTableQualified[0][0].toString()) + + def profileTableQualified = getProfileWithToken(tokenTableQualified) + def metricsTableQualified = extractProfileBlockMetrics(profileTableQualified, "SegmentIterator") + assertTrue(metricsTableQualified.containsKey("PredicateLMStage2ByRowIdsBatches"), + "Profile missing PredicateLMStage2ByRowIdsBatches\n" + profileTableQualified) + assertTrue(parseLongOrZero(metricsTableQualified["PredicateLMStage2ByRowIdsBatches"]) > 0, + "Expected stage2-by-rowids for table-qualified stage1 cols but got: " + + metricsTableQualified.toString() + "\n" + profileTableQualified) + + // mismatched table qualifier should be ignored => default stage1=a (survival_ratio ~= 1.0) => stage2-by-all-rows + sql """ set predicate_lm_stage1_cols = 'other_tbl.b'; """ + + def tokenTableMismatch = "test_multi_stage_predicate_lm_table_mismatch_" + System.currentTimeMillis() + def cntTableMismatch = sql """ SELECT /* ${tokenTableMismatch} */ count(*) FROM ${tblQual} WHERE a = 1 AND b = 0; """ + assertEquals("5000", cntTableMismatch[0][0].toString()) + + def profileTableMismatch = getProfileWithToken(tokenTableMismatch) + def metricsTableMismatch = extractProfileBlockMetrics(profileTableMismatch, "SegmentIterator") + assertTrue(metricsTableMismatch.containsKey("PredicateLMStage2ByAllRowsBatches"), + "Profile missing PredicateLMStage2ByAllRowsBatches\n" + profileTableMismatch) + assertTrue(parseLongOrZero(metricsTableMismatch["PredicateLMStage2ByAllRowsBatches"]) > 0, + "Expected stage2-by-all-rows for mismatched table-qualified stage1 cols but got: " + + metricsTableMismatch.toString() + "\n" + profileTableMismatch) + + // db.table-qualified matches current db/table => stage1=b => stage2-by-rowids + sql """ set predicate_lm_stage1_cols = '${currentDb}.${tblQual}.b'; """ + + def tokenDbTableQualified = "test_multi_stage_predicate_lm_db_table_qualified_" + System.currentTimeMillis() + def cntDbTableQualified = sql """ SELECT /* ${tokenDbTableQualified} */ count(*) FROM ${tblQual} WHERE a = 1 AND b = 0; """ + assertEquals("5000", cntDbTableQualified[0][0].toString()) + + def profileDbTableQualified = getProfileWithToken(tokenDbTableQualified) + def metricsDbTableQualified = extractProfileBlockMetrics(profileDbTableQualified, "SegmentIterator") + assertTrue(metricsDbTableQualified.containsKey("PredicateLMStage2ByRowIdsBatches"), + "Profile missing PredicateLMStage2ByRowIdsBatches\n" + profileDbTableQualified) + assertTrue(parseLongOrZero(metricsDbTableQualified["PredicateLMStage2ByRowIdsBatches"]) > 0, + "Expected stage2-by-rowids for db.table-qualified stage1 cols but got: " + + metricsDbTableQualified.toString() + "\n" + profileDbTableQualified) + + // Reset for cleanliness + sql """ set predicate_lm_stage1_cols = ''; """ + sql """ set enable_multi_stage_predicate_lm = false; """ + sql """ set predicate_lm_stage1_survival_ratio_threshold = 0.8; """ + sql """ set enable_profile = false; """ + } From f1d5dea50fa34b5bf71cf76b246aa618c90c1931 Mon Sep 17 00:00:00 2001 From: yuanyuhao Date: Fri, 26 Jun 2026 16:39:40 +0800 Subject: [PATCH 10/10] fix some corner case --- be/src/storage/segment/segment_iterator.cpp | 9 +++++++++ .../test_multi_stage_predicate_lm.groovy | 18 +++++++++++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index de37b29e31a1fb..b1f91a63ea9fe3 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -2068,6 +2068,15 @@ Status SegmentIterator::_vec_init_lazy_materialization() { _is_pred_column[cid] = true; } } + + // If stage1 columns are NOT explicitly configured and there is no runtime filter column, + // fall back to the single-stage behavior (equivalent to disabling multi-stage predicate LM). + // Rationale: choosing an arbitrary predicate column as stage1 is hard to reason about and + // may cause performance regressions. + if (_enable_multi_stage_predicate_lazy_materialization && + _opts.predicate_lm_stage1_column_ids.empty() && runtime_filter_cids.empty()) { + _enable_multi_stage_predicate_lazy_materialization = false; + } if (_enable_multi_stage_predicate_lazy_materialization) { if (!_opts.predicate_lm_stage1_column_ids.empty()) { diff --git a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy index d316408b580c67..3865a341a5df0e 100644 --- a/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy +++ b/regression-test/suites/correctness_p0/test_multi_stage_predicate_lm.groovy @@ -277,19 +277,27 @@ suite("test_multi_stage_predicate_lm", "p0") { "Expected stage2-by-rowids for table-qualified stage1 cols but got: " + metricsTableQualified.toString() + "\n" + profileTableQualified) - // mismatched table qualifier should be ignored => default stage1=a (survival_ratio ~= 1.0) => stage2-by-all-rows + // mismatched table qualifier should be ignored. + // When stage1 cols are not explicitly applicable and there is no runtime filter, + // the implementation falls back to single-stage behavior (no stage2). sql """ set predicate_lm_stage1_cols = 'other_tbl.b'; """ - + def tokenTableMismatch = "test_multi_stage_predicate_lm_table_mismatch_" + System.currentTimeMillis() def cntTableMismatch = sql """ SELECT /* ${tokenTableMismatch} */ count(*) FROM ${tblQual} WHERE a = 1 AND b = 0; """ assertEquals("5000", cntTableMismatch[0][0].toString()) - + def profileTableMismatch = getProfileWithToken(tokenTableMismatch) def metricsTableMismatch = extractProfileBlockMetrics(profileTableMismatch, "SegmentIterator") assertTrue(metricsTableMismatch.containsKey("PredicateLMStage2ByAllRowsBatches"), "Profile missing PredicateLMStage2ByAllRowsBatches\n" + profileTableMismatch) - assertTrue(parseLongOrZero(metricsTableMismatch["PredicateLMStage2ByAllRowsBatches"]) > 0, - "Expected stage2-by-all-rows for mismatched table-qualified stage1 cols but got: " + assertTrue(metricsTableMismatch.containsKey("PredicateLMStage2ByRowIdsBatches"), + "Profile missing PredicateLMStage2ByRowIdsBatches\n" + profileTableMismatch) + + assertEquals(0L, parseLongOrZero(metricsTableMismatch["PredicateLMStage2ByAllRowsBatches"]), + "Expected no stage2 (single-stage fallback) but got: " + + metricsTableMismatch.toString() + "\n" + profileTableMismatch) + assertEquals(0L, parseLongOrZero(metricsTableMismatch["PredicateLMStage2ByRowIdsBatches"]), + "Expected no stage2 (single-stage fallback) but got: " + metricsTableMismatch.toString() + "\n" + profileTableMismatch) // db.table-qualified matches current db/table => stage1=b => stage2-by-rowids