Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ Status OlapScanLocalState::_init_profile() {
_rows_short_circuit_cond_input_counter =
ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT);
_rows_expr_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsExprPredInput", TUnit::UNIT);

_predicate_lm_stage1_input_rows_counter =
ADD_COUNTER(_segment_profile, "PredicateLMStage1InputRows", TUnit::UNIT);
_predicate_lm_stage1_output_rows_counter =
ADD_COUNTER(_segment_profile, "PredicateLMStage1OutputRows", TUnit::UNIT);
_predicate_lm_stage2_by_rowids_batches_counter =
ADD_COUNTER(_segment_profile, "PredicateLMStage2ByRowIdsBatches", TUnit::UNIT);
_predicate_lm_stage2_by_all_rows_batches_counter =
ADD_COUNTER(_segment_profile, "PredicateLMStage2ByAllRowsBatches", TUnit::UNIT);
_predicate_lm_stage2_rows_read_counter =
ADD_COUNTER(_segment_profile, "PredicateLMStage2RowsRead", TUnit::UNIT);

_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
_expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime");
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/operator/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
RuntimeProfile::Counter* _rows_vec_cond_input_counter = nullptr;
RuntimeProfile::Counter* _rows_short_circuit_cond_input_counter = nullptr;
RuntimeProfile::Counter* _rows_expr_cond_input_counter = nullptr;

// Multi-stage predicate lazy materialization (SegmentIterator)
RuntimeProfile::Counter* _predicate_lm_stage1_input_rows_counter = nullptr;
RuntimeProfile::Counter* _predicate_lm_stage1_output_rows_counter = nullptr;
RuntimeProfile::Counter* _predicate_lm_stage2_by_rowids_batches_counter = nullptr;
RuntimeProfile::Counter* _predicate_lm_stage2_by_all_rows_batches_counter = nullptr;
RuntimeProfile::Counter* _predicate_lm_stage2_rows_read_counter = nullptr;

RuntimeProfile::Counter* _vec_cond_timer = nullptr;
RuntimeProfile::Counter* _short_cond_timer = nullptr;
RuntimeProfile::Counter* _expr_filter_timer = nullptr;
Expand Down
44 changes: 44 additions & 0 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "core/data_type/data_type_number.h"
#include "exec/common/variant_util.h"
#include "exec/operator/olap_scan_operator.h"
#include "exec/scan/predicate_lm_utils.h"
#include "exec/scan/scan_node.h"
#include "exprs/function_filter.h"
#include "exprs/vexpr.h"
Expand All @@ -66,6 +67,7 @@
#include "util/debug_points.h"
#endif
#include "util/json/path_in_data.h"
#include "util/string_util.h"

namespace doris {
#include "common/compile_check_avoid_begin.h"
Expand Down Expand Up @@ -102,6 +104,7 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param
.collection_statistics {},
.ann_topn_runtime {},
.condition_cache_digest = parent->get_condition_cache_digest(),
.predicate_lm_stage1_column_ids {},
.binlog_scan_type = params.binlog_scan_type}),
_start_tso(params.start_tso),
_end_tso(params.end_tso) {
Expand Down Expand Up @@ -218,6 +221,35 @@ Status OlapScanner::_prepare_impl() {
tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc);
}

const auto& qopts = _state->query_options();

// Map FE session variable enable_multi_stage_predicate_lm -> storage read option flag.
// This flag is what SegmentIterator actually checks.
if (qopts.__isset.enable_multi_stage_predicate_lm) {
_tablet_reader_params.enable_multi_stage_predicate_lazy_materialization =
qopts.enable_multi_stage_predicate_lm;
}

// Map FE session variable predicate_lm_stage1_survival_ratio_threshold -> stage2 strategy threshold.
if (qopts.__isset.predicate_lm_stage1_survival_ratio_threshold) {
_tablet_reader_params.predicate_lm_stage1_survival_ratio_threshold =
qopts.predicate_lm_stage1_survival_ratio_threshold;
}

// Map FE session variable predicate_lm_stage1_cols -> ColumnId list
// NOTE: only meaningful when multi-stage predicate LM is enabled.
if (_tablet_reader_params.enable_multi_stage_predicate_lazy_materialization) {
if (qopts.__isset.predicate_lm_stage1_cols && !qopts.predicate_lm_stage1_cols.empty()) {
std::vector<ColumnId> stage1_column_ids;
RETURN_IF_ERROR(parse_predicate_lm_stage1_cols_to_column_ids(
qopts.predicate_lm_stage1_cols, tablet_schema,
olap_scan_node.__isset.db_name ? std::string_view(olap_scan_node.db_name)
: std::string_view(),
olap_scan_node.table_name, &stage1_column_ids));
_tablet_reader_params.predicate_lm_stage1_column_ids = std::move(stage1_column_ids);
}
}

if (_tablet_reader_params.rs_splits.empty()) {
// Non-pipeline mode, Tablet : Scanner = 1 : 1
// acquire tablet rowset readers at the beginning of the scan node
Expand Down Expand Up @@ -931,6 +963,18 @@ void OlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter,
stats.short_circuit_cond_input_rows);
COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows);

COUNTER_UPDATE(local_state->_predicate_lm_stage1_input_rows_counter,
stats.predicate_lm_stage1_input_rows);
COUNTER_UPDATE(local_state->_predicate_lm_stage1_output_rows_counter,
stats.predicate_lm_stage1_output_rows);
COUNTER_UPDATE(local_state->_predicate_lm_stage2_by_rowids_batches_counter,
stats.predicate_lm_stage2_by_rowids_batches);
COUNTER_UPDATE(local_state->_predicate_lm_stage2_by_all_rows_batches_counter,
stats.predicate_lm_stage2_by_all_rows_batches);
COUNTER_UPDATE(local_state->_predicate_lm_stage2_rows_read_counter,
stats.predicate_lm_stage2_rows_read);

COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered);
COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered);
COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered);
Expand Down
177 changes: 177 additions & 0 deletions be/src/exec/scan/predicate_lm_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
#include "storage/tablet/tablet_schema.h"
#include "util/string_util.h"

namespace doris {

namespace predicate_lm_utils_detail {

inline std::string to_lower_trimmed(std::string_view sv) {
sv = doris::trim(sv);
return doris::to_lower(std::string(sv));
}

inline std::string normalize_table_name_for_match(std::string_view table_name_sv) {
table_name_sv = doris::trim(table_name_sv);
std::string s(table_name_sv);

// Strip rollup suffix: "tbl(rollup)" => "tbl".
if (!s.empty() && s.back() == ')') {
auto pos = s.find_last_of('(');
if (pos != std::string::npos) {
s.resize(pos);
}
}

return to_lower_trimmed(std::string_view(s));
}

inline bool ends_with(std::string_view s, std::string_view suffix) {
if (suffix.size() > s.size()) {
return false;
}
return s.substr(s.size() - suffix.size()) == suffix;
}

inline std::string normalize_identifier_piece(std::string_view sv) {
sv = doris::trim(sv);
if (sv.size() >= 2 && sv.front() == '`' && sv.back() == '`') {
sv = sv.substr(1, sv.size() - 2);
sv = doris::trim(sv);
}
return std::string(sv);
}

inline bool qualifier_matches_current_table(const std::string& normalized_current_full_table_name,
const std::string& normalized_current_table_only,
const std::string& qualifier_raw) {
std::string qualifier = to_lower_trimmed(std::string_view(qualifier_raw));
if (qualifier.empty()) {
return true;
}

// qualifier="tbl"
if (qualifier.find('.') == std::string::npos) {
return qualifier == normalized_current_table_only;
}

// qualifier="db.tbl" (also allow suffix match for catalog.db.tbl)
if (qualifier == normalized_current_full_table_name) {
return true;
}
std::string with_dot = "." + qualifier;
return ends_with(normalized_current_full_table_name, with_dot);
}

} // namespace predicate_lm_utils_detail

inline Status parse_predicate_lm_stage1_cols_to_column_ids(const std::string& cols,
const TabletSchemaSPtr& tablet_schema,
std::string_view current_db_name,
std::string_view current_table_name,
std::vector<ColumnId>* column_ids) {
column_ids->clear();
if (cols.empty()) {
return Status::OK();
}

const std::string normalized_db = predicate_lm_utils_detail::to_lower_trimmed(current_db_name);
const std::string normalized_tbl =
predicate_lm_utils_detail::normalize_table_name_for_match(current_table_name);
const std::string normalized_current_table_only = normalized_tbl;

std::string normalized_current_full_table_name;
if (!normalized_db.empty()) {
normalized_current_full_table_name.reserve(normalized_db.size() + 1 + normalized_tbl.size());
normalized_current_full_table_name.append(normalized_db);
normalized_current_full_table_name.push_back('.');
normalized_current_full_table_name.append(normalized_tbl);
} else {
normalized_current_full_table_name = normalized_tbl;
}

std::vector<std::string> parts = doris::split(cols, ",");

for (const auto& part : parts) {
std::string_view token_sv = doris::trim(std::string_view(part));
if (token_sv.empty()) {
continue;
}

// Support qualified identifiers: tbl.col / db.tbl.col
// (Backticks are supported on each identifier piece, e.g. `db`.`tbl`.`col`)
std::vector<std::string> dot_parts = doris::split(std::string(token_sv), ".");
std::vector<std::string> ident_parts;
ident_parts.reserve(dot_parts.size());
for (const auto& dot_part : dot_parts) {
auto piece = predicate_lm_utils_detail::normalize_identifier_piece(std::string_view(dot_part));
if (!piece.empty()) {
ident_parts.emplace_back(std::move(piece));
}
}
if (ident_parts.empty()) {
continue;
}

std::string col_name = std::move(ident_parts.back());
ident_parts.pop_back();

std::string qualifier;
if (!ident_parts.empty()) {
qualifier.reserve(64);
for (size_t i = 0; i < ident_parts.size(); ++i) {
if (i > 0) {
qualifier.push_back('.');
}
qualifier.append(ident_parts[i]);
}
}

if (!predicate_lm_utils_detail::qualifier_matches_current_table(
normalized_current_full_table_name, normalized_current_table_only, qualifier)) {
continue;
}

int32_t cid = tablet_schema->field_index(col_name);
if (cid < 0) {
cid = tablet_schema->field_index(doris::to_lower(col_name));
}

// Ignore unknown columns (do not fail the query).
if (cid < 0) {
continue;
}

column_ids->push_back(static_cast<ColumnId>(cid));
}

std::sort(column_ids->begin(), column_ids->end());
column_ids->erase(std::unique(column_ids->begin(), column_ids->end()), column_ids->end());
return Status::OK();
}

} // namespace doris
16 changes: 16 additions & 0 deletions be/src/storage/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@ class StorageReadOptions {
std::shared_ptr<ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;

// Multi-stage predicate lazy materialization (experimental):
// Stage1 reads a subset of predicate columns by index, evaluates predicates to produce
// an intermediate surviving row set; stage2 reads remaining predicate columns only for
// surviving rows (by rowids) and continues filtering.
//
// When disabled (default), SegmentIterator keeps the existing behavior: all predicate
// columns are read in the first pass.
bool enable_multi_stage_predicate_lazy_materialization = false;
// ColumnIds (tablet schema ordinal ids) to read/evaluate in stage1.
// If empty and enable_multi_stage_predicate_lazy_materialization=true, SegmentIterator will
// fallback to a conservative heuristic (currently: runtime-filter predicate columns).
std::vector<ColumnId> predicate_lm_stage1_column_ids;
// If stage1 survival ratio is greater than this threshold, stage2 rowid reads may be
// less beneficial. SegmentIterator may choose a conservative evaluation path.
double predicate_lm_stage1_survival_ratio_threshold = 0.8;

// Cache for sparse column data to avoid redundant reads
// col_unique_id -> cached column_ptr
std::unordered_map<int32_t, ColumnPtr> sparse_column_cache;
Expand Down
8 changes: 8 additions & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ struct OlapReaderStatistics {
int64_t vec_cond_input_rows = 0;
int64_t short_circuit_cond_input_rows = 0;
int64_t expr_cond_input_rows = 0;

// Multi-stage predicate lazy materialization stats (SegmentIterator)
int64_t predicate_lm_stage1_input_rows = 0;
int64_t predicate_lm_stage1_output_rows = 0;
int64_t predicate_lm_stage2_by_rowids_batches = 0;
int64_t predicate_lm_stage2_by_all_rows_batches = 0;
int64_t predicate_lm_stage2_rows_read = 0;

int64_t rows_vec_del_cond_filtered = 0;
int64_t vec_cond_ns = 0;
int64_t short_cond_ns = 0;
Expand Down
11 changes: 11 additions & 0 deletions be/src/storage/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ void BetaRowsetReader::reset_read_options() {
_read_options.col_id_to_predicates.clear();
_read_options.del_predicates_for_zone_map.clear();
_read_options.key_ranges.clear();

_read_options.enable_multi_stage_predicate_lazy_materialization = false;
_read_options.predicate_lm_stage1_column_ids.clear();
_read_options.predicate_lm_stage1_survival_ratio_threshold = 0.8;
}

RowsetReaderSharedPtr BetaRowsetReader::clone() {
Expand Down Expand Up @@ -116,6 +120,13 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.commit_tso = _rowset->rowset_meta()->commit_tso();
_read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
_read_options.read_limit = _topn_limit;

_read_options.enable_multi_stage_predicate_lazy_materialization =
_read_context->enable_multi_stage_predicate_lazy_materialization;
_read_options.predicate_lm_stage1_column_ids = _read_context->predicate_lm_stage1_column_ids;
_read_options.predicate_lm_stage1_survival_ratio_threshold =
_read_context->predicate_lm_stage1_survival_ratio_threshold;

if (_read_context->lower_bound_keys != nullptr) {
for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) {
_read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i),
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ struct RowsetReaderContext {

// General LIMIT budget forwarded to SegmentIterator.
int64_t general_read_limit = -1;

// Multi-stage predicate lazy materialization (experimental)
bool enable_multi_stage_predicate_lazy_materialization = false;
std::vector<ColumnId> predicate_lm_stage1_column_ids;
double predicate_lm_stage1_survival_ratio_threshold = 0.8;
};

} // namespace doris
Expand Down
Loading
Loading