diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index d9d60bbaad46..94f2fa6e3a8d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -524,6 +524,9 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders boolean defaultEnableDynamicFilteringSemiJoin = _config.getProperty( CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN, CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN); + boolean defaultEnableRuntimeFilterJoin = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_RUNTIME_FILTER_JOIN, + CommonConstants.Broker.DEFAULT_ENABLE_RUNTIME_FILTER_JOIN); boolean defaultUsePhysicalOptimizer = _config.getProperty( CommonConstants.Broker.CONFIG_OF_USE_PHYSICAL_OPTIMIZER, CommonConstants.Broker.DEFAULT_USE_PHYSICAL_OPTIMIZER); @@ -576,6 +579,7 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders .defaultUseLeafServerForIntermediateStage(defaultUseLeafServerForIntermediateStage) .defaultEnableGroupTrim(defaultEnableGroupTrim) .defaultEnableDynamicFilteringSemiJoin(defaultEnableDynamicFilteringSemiJoin) + .defaultEnableRuntimeFilterJoin(defaultEnableRuntimeFilterJoin) .defaultUsePhysicalOptimizer(defaultUsePhysicalOptimizer) .defaultUseLiteMode(defaultUseLiteMode) .defaultRunInBroker(defaultRunInBroker) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 9702380739f7..ac8b5e3c0401 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -836,4 +836,36 @@ public static int getSortExchangeCopyThreshold(Map<String, String> options, int } return i; } + + /** + * Resolves whether the additive INNER-join probe-side runtime filter is enabled for this query. + * The {@link QueryOptionKey#RUNTIME_FILTER_JOIN} query option (if present) overrides the cluster + * default. This is an enable/disable switch: {@code off}/{@code false} disable, {@code on}/{@code true} + * enable (defaulting to the AUTO tier); any other value throws. The per-join reducer mode (in / bloom / + * auto) is selected via the {@code runtime_filter} join hint, not this option. NOTE: this only resolves + * the cluster/query-level default — a per-join {@code runtime_filter} hint can still force-enable when + * this is false. + * + * @param options query options, looked up by {@link QueryOptionKey#RUNTIME_FILTER_JOIN} + * @param defaultValue value returned when the option is not set + * @return {@code true} when the option enables the runtime filter, {@code false} when it disables it + * @throws IllegalArgumentException if the option is set to anything but on/off/true/false (case-insensitive) + */ + public static boolean getRuntimeFilterJoinEnabled(Map<String, String> options, boolean defaultValue) { + String value = options.get(QueryOptionKey.RUNTIME_FILTER_JOIN); + if (value == null) { + return defaultValue; + } + switch (value.toLowerCase(java.util.Locale.ROOT)) { + case "off": + case "false": + return false; + case "on": + case "true": + return true; + default: + throw new IllegalArgumentException(QueryOptionKey.RUNTIME_FILTER_JOIN + + " must be one of: on, off (or true/false), got: " + value); + } + } } diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 8c2917d09686..c59b192d96b3 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -42,6 +42,7 @@ message PlanNode { ExplainNode explainNode = 16; EnrichedJoinNode enrichedJoinNode = 17; UnnestNode unnestNode = 18; + RuntimeFilterNode runtimeFilterNode = 19; } } @@ -227,6 +228,18 @@ message ValueNode { repeated LiteralRow literalRows = 1; } +enum RuntimeFilterType { + IN = 0; + BLOOM = 1; + AUTO = 2; +} + +message RuntimeFilterNode { + repeated int32 probeKeys = 1; + repeated int32 buildKeys = 2; + RuntimeFilterType filterType 
= 3; +} + message UnnestNode { // Expressions that evaluate to arrays/multi-values to be unnested repeated Expression arrayExprs = 1; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RuntimeFilterJoinIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RuntimeFilterJoinIntegrationTest.java new file mode 100644 index 000000000000..e5b2bee19565 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RuntimeFilterJoinIntegrationTest.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.custom; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +/// End-to-end correctness coverage for the additive INNER-join probe-side runtime filter +/// ({@code PinotJoinToInnerRuntimeFilterRule}). Uses self-joins over a single table so a selective +/// filter on the build (right) side makes the build small; the probe (left) side is then reduced by the +/// runtime filter. The core invariant tested: results MUST be identical with the runtime filter on (in +/// every mode: in / bloom / auto) and off — the filter is a no-false-negative reducer and the real hash +/// join remains the source of truth. Covers exact-IN, bloom (numeric + string keys), AUTO, multi-key, +/// empty build, and null keys. 
+@Test(suiteName = "CustomClusterIntegrationTest") +public class RuntimeFilterJoinIntegrationTest extends CustomDataQueryClusterIntegrationTest { + private static final String TABLE_NAME = "RuntimeFilterJoinIntegrationTest"; + private static final String ID = "id"; // unique INT key + private static final String LID = "lid"; // unique LONG key + private static final String DKEY = "dkey"; // unique DOUBLE key (id + 0.5) + private static final String SKEY = "skey"; // STRING key, 25 distinct values (groups of 4) + private static final String NKEY = "nkey"; // nullable LONG key (null when id % 7 == 0) + private static final String NANKEY = "nankey"; // DOUBLE key that is NaN for id < 3, else distinct (id + 1000) + private static final String CAT = "cat"; // category: "rare" for id < 5, else "common" + private static final String VAL = "val"; // INT value == id + private static final int NUM_DOCS = 100; + private static final int NUM_RARE = 5; // ids 0..4 + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + protected long getCountStarResult() { + return NUM_DOCS; + } + + @Override + public TableConfig createOfflineTableConfig() { + // Null handling on so NKEY genuinely stores NULLs (exercises the build-side IS NOT NULL filter). 
+ return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setNullHandlingEnabled(true).build(); + } + + @Override + public Schema createSchema() { + return new Schema.SchemaBuilder() + .setSchemaName(getTableName()) + .addSingleValueDimension(ID, DataType.INT) + .addSingleValueDimension(LID, DataType.LONG) + .addSingleValueDimension(DKEY, DataType.DOUBLE) + .addSingleValueDimension(SKEY, DataType.STRING) + .addSingleValueDimension(NKEY, DataType.LONG) + .addSingleValueDimension(NANKEY, DataType.DOUBLE) + .addSingleValueDimension(CAT, DataType.STRING) + .addMetric(VAL, DataType.INT) + .build(); + } + + @Override + public List<File> createAvroFiles() + throws IOException { + org.apache.avro.Schema nullableLong = org.apache.avro.Schema.createUnion( + List.of(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG))); + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + avroSchema.setFields(List.of( + new org.apache.avro.Schema.Field(ID, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null, + null), + new org.apache.avro.Schema.Field(LID, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, + null), + new org.apache.avro.Schema.Field(DKEY, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), null, + null), + new org.apache.avro.Schema.Field(SKEY, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, + null), + new org.apache.avro.Schema.Field(NKEY, nullableLong, null, null), + new org.apache.avro.Schema.Field(NANKEY, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), + null, null), + new org.apache.avro.Schema.Field(CAT, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, + null), + new org.apache.avro.Schema.Field(VAL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null, + null))); + + try 
(AvroFilesAndWriters avroFilesAndWriters = createAvroFilesAndWriters(avroSchema)) { + List<DataFileWriter<GenericData.Record>> writers = avroFilesAndWriters.getWriters(); + for (int id = 0; id < NUM_DOCS; id++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(ID, id); + record.put(LID, (long) id); + record.put(DKEY, id + 0.5); + record.put(SKEY, "key_" + (id % 25)); + record.put(NKEY, id % 7 == 0 ? null : (long) id); + record.put(NANKEY, id < 3 ? Double.NaN : (double) (id + 1000)); + record.put(CAT, id < NUM_RARE ? "rare" : "common"); + record.put(VAL, id); + writers.get(id % getNumAvroFiles()).append(record); // spread rows round-robin over the writers + } + return avroFilesAndWriters.getAvroFiles(); + } + } + + /// Runs an MSE query, surfacing any broker/server exception. + private JsonNode runMse(String query) + throws Exception { + setUseMultiStageQueryEngine(true); + JsonNode response = postQuery(query); + JsonNode exceptions = response.get("exceptions"); + assertTrue(exceptions == null || exceptions.isEmpty(), + "query failed: " + query + " -> " + response.toPrettyString()); + return response.get("resultTable"); + } + + /// Asserts that the same query (built by injecting the runtime_filter hint into {@code selectBody}) + /// produces identical results in every mode and with the filter off (the baseline). The {@code rest} + /// is the FROM/WHERE/ORDER BY tail shared by all variants. 
+ private void assertAllModesMatchBaseline(String selectBody, String rest) + throws Exception { + JsonNode baseline = runMse("SELECT " + selectBody + " " + rest); + for (String mode : new String[]{"in", "bloom", "auto"}) { + String hinted = "SELECT /*+ joinOptions(runtime_filter='" + mode + "') */ " + selectBody + " " + rest; + JsonNode withFilter = runMse(hinted); + assertEquals(withFilter, baseline, + "runtime_filter='" + mode + "' must produce identical results to the baseline for: " + rest); + } + } + + @Test + public void testCountSelfJoinSelectiveBuild() + throws Exception { + // Build (t2) filtered to the 5 rare ids; probe (t1) reduced to those ids by the runtime filter. + assertAllModesMatchBaseline("COUNT(*)", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + ID + " = t2." + ID + + " WHERE t2." + CAT + " = 'rare'"); + } + + @Test + public void testCountAndSumValueMatchesBaseline() + throws Exception { + assertAllModesMatchBaseline("COUNT(*), SUM(t1." + VAL + ")", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + ID + " = t2." + ID + + " WHERE t2." + CAT + " = 'rare'"); + } + + @Test + public void testProjectedRowsMatchBaseline() + throws Exception { + // Compares the actual row set (ordered), catching wrong-row bugs the COUNT cannot. + assertAllModesMatchBaseline("t1." + ID + ", t1." + VAL, + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + ID + " = t2." + ID + + " WHERE t2." + CAT + " = 'rare' ORDER BY t1." + ID); + } + + @Test + public void testLongKeyMatchesBaseline() + throws Exception { + assertAllModesMatchBaseline("COUNT(*)", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + LID + " = t2." + LID + + " WHERE t2." + CAT + " = 'rare'"); + } + + @Test + public void testStringKeyMatchesBaseline() + throws Exception { + // Build filtered to a single id -> a single skey; probe reduced to that skey (4 rows share it). 
+ // Exercises the bloom path for a STRING key (no BETWEEN range predicate). + assertAllModesMatchBaseline("COUNT(*)", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + SKEY + " = t2." + SKEY + + " WHERE t2." + ID + " = 0"); + } + + @Test + public void testMultiKeyMatchesBaseline() + throws Exception { + assertAllModesMatchBaseline("COUNT(*)", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + ID + " = t2." + ID + + " AND t1." + LID + " = t2." + LID + " WHERE t2." + CAT + " = 'rare'"); + } + + @Test + public void testMixedTypeKeyMatchesBaseline() + throws Exception { + // Join an INT key to a LONG key (id == lid for every row). Calcite coerces both sides to a common + // type, so the probe-leaf key column and the build-key set share a stored type end-to-end; the filter + // must stay sound. Guards the bloom path (which keys its IdSet on the build stored type) against a + // probe/build type skew. + assertAllModesMatchBaseline("COUNT(*)", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + ID + " = t2." + LID + + " WHERE t2." + CAT + " = 'rare'"); + } + + @Test + public void testNaNKeyMatchesBaseline() + throws Exception { + // NaN join keys on both sides (ids 0,1,2 have nankey = NaN); the build (t2 where id < 3) is all-NaN. + // Confirms the exact-IN and bloom reducers agree with the MSE hash join on NaN equality: whatever the + // join does with NaN = NaN, the filter must preserve it (no false negatives). If the leaf IN/bloom + // dropped a probe NaN row the join keeps, the on/off counts would diverge and this would fail. + String rest = "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + NANKEY + " = t2." + NANKEY + + " WHERE t2." + ID + " < 3"; + assertAllModesMatchBaseline("COUNT(*)", rest); + // Pin the regime so the parity check is not vacuous: the MSE hash join treats NaN = NaN, so the 3 NaN + // probe rows join the 3 NaN build rows -> 9 pairs, and the reducer preserves all of them. 
+ JsonNode baseline = runMse("SELECT COUNT(*) " + rest); + assertEquals(baseline.get("rows").get(0).get(0).asLong(), 9L, + "the MSE hash join matches NaN = NaN (3 x 3); if this changes, revisit the exact-IN NaN handling"); + } + + @Test + public void testEmptyBuildYieldsEmptyResult() + throws Exception { + // Build side matches nothing -> the probe is pruned entirely (constant-false). Result must be empty + // in every mode and the baseline. + String rest = "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + ID + " = t2." + ID + + " WHERE t2." + CAT + " = 'does_not_exist'"; + assertAllModesMatchBaseline("t1." + ID, rest + " ORDER BY t1." + ID); + JsonNode baseline = runMse("SELECT t1." + ID + " " + rest); + assertEquals(baseline.get("rows").size(), 0, "empty build must yield no rows"); + } + + @Test + public void testDoubleKeyMatchesBaseline() + throws Exception { + // DOUBLE join key exercises the FLOAT/DOUBLE exact-IN and bloom-BETWEEN paths end-to-end. + assertAllModesMatchBaseline("COUNT(*), SUM(t1." + VAL + ")", + "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + DKEY + " = t2." + DKEY + + " WHERE t2." + CAT + " = 'rare'"); + } + + @Test + public void testNullKeysMatchBaseline() + throws Exception { + // Nullable join key with genuine NULLs on both sides. An INNER join drops NULL keys; the build-side + // IS NOT NULL filter plus the reducer must preserve on==off parity (NULL rows never join either way). + String rest = "FROM " + TABLE_NAME + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + NKEY + " = t2." + NKEY + + " WHERE t2." + CAT + " = 'rare' ORDER BY t1." + ID; + JsonNode baseline = runMse("SET enableNullHandling=true; SELECT t1." + ID + " " + rest); + for (String mode : new String[]{"in", "bloom", "auto"}) { + JsonNode withFilter = runMse("SET enableNullHandling=true; SELECT /*+ joinOptions(runtime_filter='" + mode + + "') */ t1." 
+ ID + " " + rest); + assertEquals(withFilter, baseline, "null-key runtime_filter='" + mode + "' must match baseline"); + } + // Of the 5 rare ids (0..4), id 0 has a NULL nkey and is dropped; ids 1..4 join -> 4 rows. + assertEquals(baseline.get("rows").size(), 4); + } + + @Test + public void testExactCountValues() + throws Exception { + // Pin the concrete expected counts (not just on==off) so a silent reduction-to-zero would be caught. + JsonNode rare = runMse("SELECT /*+ joinOptions(runtime_filter='in') */ COUNT(*) FROM " + TABLE_NAME + " t1 JOIN " + + TABLE_NAME + " t2 ON t1." + ID + " = t2." + ID + " WHERE t2." + CAT + " = 'rare'"); + assertEquals(rare.get("rows").get(0).get(0).asLong(), NUM_RARE); + + JsonNode stringKey = runMse("SELECT /*+ joinOptions(runtime_filter='bloom') */ COUNT(*) FROM " + TABLE_NAME + + " t1 JOIN " + TABLE_NAME + " t2 ON t1." + SKEY + " = t2." + SKEY + " WHERE t2." + ID + " = 0"); + // skey "key_0" is shared by ids 0,25,50,75 in t1; t2 has only id 0 -> 4 joined rows. + assertEquals(stringKey.get("rows").get(0).get(0).asLong(), 4L); + } +} diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRuntimeFilterJoin.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRuntimeFilterJoin.java new file mode 100644 index 000000000000..027adeed6e4b --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRuntimeFilterJoin.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; +import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import 
org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +/** + * Benchmarks the additive INNER-join probe-side runtime filter ({@code PinotJoinToInnerRuntimeFilterRule}) + * on the canonical large-fact ⋈ small/selective-dim shape. It is a self-join of a 1M-row table with + * unique keys (so the join is 1:1) where a {@code WHERE} on the build (right) side controls how + * selective — i.e. how small — the build is, via {@code t2.intCol % buildMod = 0}: + *
<ul> + * <li>{@code buildMod = 10000} -> 0.01% of the build survives (100 rows): the runtime filter reduces + * the probe to those 100 keys before it is shuffled into the join — the intended win. AUTO uses the + * exact-IN tier here.</li> + * <li>{@code buildMod = 1} -> 100% build (every row): the filter cannot reduce the probe (every key + * matches) and only adds the build-key broadcast + reducer overhead — the regression/cost case the + * feature must be left off for. AUTO uses the bloom tier here (build > maxInSize).</li> + * </ul> + * {@code filter = off} is the baseline (no hint); {@code filter = auto} enables the runtime filter via the + * join hint (the cluster default stays off). + * + * <p>
The {@code select*} methods project a realistically wide probe row (two numerics plus three + * strings) through the join, so the probe-side shuffle — the thing the feature reduces, and the stated + * motivation of a wide fact table being shuffled needlessly — dominates. The {@code countJoinInt} method + * (only the join key crosses the network) is included as a reference for the overhead floor when there is + * almost nothing to shuffle: it isolates the case where the reducer cannot pay for itself. + */ +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Benchmark) +public class BenchmarkRuntimeFilterJoin extends BaseClusterIntegrationTest { + + public static void main(String[] args) + throws Exception { + ChainedOptionsBuilder opt = new OptionsBuilder() + .include(BenchmarkRuntimeFilterJoin.class.getSimpleName()); + new Runner(opt.build()).run(); + } + + /// Number of rows on each segment. + private final int _rowsPerSegment = 100_000; + @Param({"10"}) + private int _segments; + + /// Reducer mode: "off" (baseline, no hint) or a runtime_filter hint value ("auto", "in", "bloom"). + @Param({"off", "auto"}) + private String _filter; + + /// Build selectivity: the build keeps rows where {@code intCol % buildMod == 0}. The two values are the + /// extremes — 10000 (~0.01% build, the win, AUTO uses exact IN) and 1 (100% build, the cost, AUTO uses + /// bloom). The ~1% mid-point (buildMod=100) is omitted because it straddles the AUTO IN/bloom threshold. 
+ @Param({"10000", "1"}) + private int _buildMod; + + private JsonNode query(String query) // posts the query to the broker; throws if the response reports an exception + throws Exception { + JsonNode result = postQuery(query, + ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true), + null, + getExtraQueryProperties()); + JsonNode exceptions = result.path("exceptions").get(0); // null when the field is absent or empty + if (exceptions != null) { + throw new RuntimeException(exceptions.path("message").asText(exceptions.toString())); + } + return result.path("resultTable").path("rows"); // MissingNode instead of an NPE when no table is returned + } + + @Override + protected Map<String, String> getExtraQueryProperties() { + return Map.of("useMultistageEngine", "true", "dropResults", "true"); + } + + private String hint() { + return _filter.equals("off") ? "" : "/*+ joinOptions(runtime_filter='" + _filter + "') */ "; + } + + /// Projects the probe rows through an INT-key join, so the probe shuffle dominates: the runtime filter's + /// probe reduction is the main lever here. + @Benchmark + public JsonNode selectJoinInt() + throws Exception { + @Language("sql") + String query = "SET useMultistageEngine=true;" + + "SET maxRowsInJoin=1000000000;" + + "SELECT " + hint() + PROBE_PROJECTION + " " + + "FROM MyTable t1 " + + "JOIN MyTable t2 " + + "ON t1.intCol = t2.intCol " + + "WHERE t2.intCol % " + _buildMod + " = 0"; + return query(query); + } + + /// Same projected shape on a STRING key, which exercises the bloom-on-string reducer (no range predicate). + @Benchmark + public JsonNode selectJoinStr() + throws Exception { + @Language("sql") + String query = "SET useMultistageEngine=true;" + + "SET maxRowsInJoin=1000000000;" + + "SELECT " + hint() + PROBE_PROJECTION + " " + + "FROM MyTable t1 " + + "JOIN MyTable t2 " + + "ON t1.strCol = t2.strCol " + + "WHERE t2.intCol % " + _buildMod + " = 0"; + return query(query); + } + + /// Reference: only the join key crosses the network (count, no projection), so there is little to + /// shuffle — this isolates the reducer's fixed overhead (the floor below which the feature cannot win). 
+ @Benchmark + public JsonNode countJoinInt() + throws Exception { + @Language("sql") + String query = "SET useMultistageEngine=true;" + + "SET maxRowsInJoin=1000000000;" + + "SELECT " + hint() + "count(*) " + + "FROM MyTable t1 " + + "JOIN MyTable t2 " + + "ON t1.intCol = t2.intCol " + + "WHERE t2.intCol % " + _buildMod + " = 0"; + return query(query); + } + + @Setup + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + CompletableFuture clusterStarted = CompletableFuture.runAsync(() -> { + try { + startZk(); + startController(); + startBroker(); + startServers(2); + addSchema(SCHEMA); + addTableConfig(TABLE_CONFIG); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + CompletableFuture[] futures = new CompletableFuture[_segments]; + for (int i = 0; i < _segments; i++) { + futures[i] = buildSegment("segment" + i, (long) i * _rowsPerSegment); + } + CompletableFuture.allOf(futures).join(); + + clusterStarted.join(); + uploadSegments(TABLE_NAME, _tarDir); + waitForAllDocsLoaded(60000); + } + + @Override + protected long getCountStarResult() { + return _rowsPerSegment * (long) _segments; + } + + private CompletableFuture buildSegment(String segmentName, long keyBase) { + return CompletableFuture.runAsync(() -> { + try { + LazyDataGenerator rows = createDataGenerator(keyBase); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + config.setOutDir(_segmentDir.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(segmentName); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GeneratedDataRecordReader(rows)) { + driver.init(config, recordReader); + driver.build(); + } + + File indexDir = new File(_segmentDir, segmentName); + File segmentTarFile = new File(_tarDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + 
TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + /// Deterministic, globally-unique keys ({@code keyBase + index}) so the self-join is 1:1 and the build + /// selectivity (and thus the probe reduction) is exact rather than statistical. + private LazyDataGenerator createDataGenerator(long keyBase) { + return new LazyDataGenerator() { + @Override + public int size() { + return _rowsPerSegment; + } + + @Override + public GenericRow next(GenericRow row, int index) { + long value = keyBase + index; + row.putValue(INT_COL_NAME, (int) value); + row.putValue(LONG_COL_NAME, value); + row.putValue(DOUBLE_COL_NAME, (double) value); + row.putValue(STRING_COL_NAME, "value" + value); + row.putValue(STR2_COL_NAME, "the_quick_brown_fox_jumps_over_" + value); + row.putValue(STR3_COL_NAME, "lorem_ipsum_dolor_sit_amet_consectetur_" + value); + return row; + } + + @Override + public void rewind() { + } + }; + } + + @TearDown + public void tearDown() + throws IOException { + stopServer(); + stopBroker(); + stopController(); + stopZk(); + FileUtils.deleteQuietly(_tempDir); + } + + private static final String TABLE_NAME = "MyTable"; + private static final String INT_COL_NAME = "intCol"; + private static final String LONG_COL_NAME = "longCol"; + private static final String DOUBLE_COL_NAME = "doubleCol"; + private static final String STRING_COL_NAME = "strCol"; + private static final String STR2_COL_NAME = "str2Col"; + private static final String STR3_COL_NAME = "str3Col"; + + /// The wide probe-row projection that crosses the network on the probe side of the join. 
+ private static final String PROBE_PROJECTION = + "t1.intCol, t1.longCol, t1.doubleCol, t1.strCol, t1.str2Col, t1.str3Col"; + + private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setInvertedIndexColumns(List.of(INT_COL_NAME, STRING_COL_NAME)) + .build(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT) + .addSingleValueDimension(LONG_COL_NAME, FieldSpec.DataType.LONG) + .addSingleValueDimension(DOUBLE_COL_NAME, FieldSpec.DataType.DOUBLE) + .addSingleValueDimension(STRING_COL_NAME, FieldSpec.DataType.STRING) + .addSingleValueDimension(STR2_COL_NAME, FieldSpec.DataType.STRING) + .addSingleValueDimension(STR3_COL_NAME, FieldSpec.DataType.STRING) + .build(); +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index 05258e3b0007..e0f3319b4c2a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -125,6 +125,14 @@ public static class JoinHintOptions { */ public static final String APPEND_DISTINCT_TO_SEMI_JOIN_PROJECT = "append_distinct_to_semi_join_project"; + /** + * Controls the additive INNER-join probe-side runtime filter. Supported values (case-insensitive): + * {@code off} (disable), {@code in} (exact IN list), {@code bloom} (bloom filter), {@code auto} + * (exact IN below a threshold, else bloom). When absent, the cluster/query-level default applies. + * See {@code PinotJoinToInnerRuntimeFilterRule}. 
+ */ + public static final String RUNTIME_FILTER = "runtime_filter"; + @Nullable public static Map getJoinHintOptions(Join join) { return PinotHintStrategyTable.getHintOptions(join.getHints(), JOIN_HINT_OPTIONS); @@ -155,6 +163,50 @@ public static Boolean isColocatedByJoinKeys(Join join) { String hint = PinotHintStrategyTable.getHintOption(join.getHints(), JOIN_HINT_OPTIONS, IS_COLOCATED_BY_JOIN_KEYS); return hint != null ? Boolean.parseBoolean(hint) : null; } + + /** + * Parses the {@link #RUNTIME_FILTER} hint of the given join into a {@link RuntimeFilterMode}. The result is + * {@code null} when the hint is absent, in which case the cluster/query-level default applies. + */ + @Nullable + public static RuntimeFilterMode getRuntimeFilterMode(Join join) { + String hintValue = PinotHintStrategyTable.getHintOption(join.getHints(), JOIN_HINT_OPTIONS, RUNTIME_FILTER); + return RuntimeFilterMode.fromHint(hintValue); + } + } + + /** + * Mode for the additive INNER-join probe-side runtime filter, resolved from the {@link + * JoinHintOptions#RUNTIME_FILTER} hint or the cluster/query-level default. + */ + public enum RuntimeFilterMode { + OFF, IN, BLOOM, AUTO; + + /** + * Parses a hint value. Returns {@code null} for a {@code null}/empty hint (defer to the default); + * throws on an unrecognized value. 
+ */ + @Nullable + public static RuntimeFilterMode fromHint(@Nullable String hint) { + if (hint == null || hint.isEmpty()) { + return null; + } + switch (hint.toLowerCase()) { + case "off": + case "false": + return OFF; + case "in": + return IN; + case "bloom": + return BLOOM; + case "auto": + case "true": + return AUTO; + default: + throw new IllegalArgumentException("Unsupported runtime_filter hint value: " + hint + + " (expected one of: off, in, bloom, auto)"); + } + } } /** diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/RuntimeFilterRel.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/RuntimeFilterRel.java new file mode 100644 index 000000000000..20d524b0d271 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/RuntimeFilterRel.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.calcite.rel.logical; + +import java.util.List; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.type.RelDataType; + + +/** + * {@code RuntimeFilterRel} represents an additive probe-side runtime filter for an INNER equi-join. + * + *

It is created by {@code PinotJoinToInnerRuntimeFilterRule} and sits at the top of the probe + * (left) leaf subtree, below the join's left exchange. It is a pass-through for the probe rows (its + * row type equals the probe's), so the inner join keeps running unchanged in its intermediate stage. + * + *

The two inputs mirror the SEMI dynamic-broadcast layout (where the pipeline-breaker receive is the + * second input of the consuming node): + *

    + *
  • {@code left} (input 0) is the probe pipeline.
  • + *
  • {@code right} (input 1) is a {@code PIPELINE_BREAKER} exchange carrying the build-side join + * keys. After fragmentation it becomes a separate stage feeding a pipeline-breaker receive.
  • + *
+ * + *

{@code probeKeys} index this node's (probe) row type; {@code buildKeys} index the build-key + * exchange's row type. They are positionally aligned. {@code filterType} is the resolved reducer + * strategy. The reducer never introduces false negatives, so it can be omitted at any time without + * affecting correctness — the real hash join remains the source of truth. + */ +public class RuntimeFilterRel extends BiRel { + private final List _probeKeys; + private final List _buildKeys; + private final RuntimeFilterType _filterType; + + public RuntimeFilterRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode probe, RelNode buildKeyExchange, + List probeKeys, List buildKeys, RuntimeFilterType filterType) { + super(cluster, traitSet, probe, buildKeyExchange); + _probeKeys = List.copyOf(probeKeys); + _buildKeys = List.copyOf(buildKeys); + _filterType = filterType; + } + + public static RuntimeFilterRel create(RelNode probe, RelNode buildKeyExchange, List probeKeys, + List buildKeys, RuntimeFilterType filterType) { + RelOptCluster cluster = probe.getCluster(); + RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); + return new RuntimeFilterRel(cluster, traitSet, probe, buildKeyExchange, probeKeys, buildKeys, filterType); + } + + public List getProbeKeys() { + return _probeKeys; + } + + public List getBuildKeys() { + return _buildKeys; + } + + public RuntimeFilterType getFilterType() { + return _filterType; + } + + @Override + protected RelDataType deriveRowType() { + // Pass-through: the probe rows flow through unchanged. 
+ return left.getRowType(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + assert inputs.size() == 2; + return new RuntimeFilterRel(getCluster(), traitSet, inputs.get(0), inputs.get(1), _probeKeys, _buildKeys, + _filterType); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("probeKeys", _probeKeys) + .item("buildKeys", _buildKeys) + .item("filterType", _filterType); + } + + /** + * The tiered reducer strategy for the probe-side runtime filter. + */ + public enum RuntimeFilterType { + IN, BLOOM, AUTO + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToInnerRuntimeFilterRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToInnerRuntimeFilterRule.java new file mode 100644 index 000000000000..934e476e1300 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToInnerRuntimeFilterRule.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.calcite.rel.rules; + +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions.RuntimeFilterMode; +import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; +import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; +import org.apache.pinot.calcite.rel.logical.RuntimeFilterRel; +import org.apache.pinot.calcite.rel.logical.RuntimeFilterRel.RuntimeFilterType; +import org.apache.pinot.spi.utils.CommonConstants; + + +/** + * Special Pinot rule that adds an additive probe-side runtime filter to an equi-INNER-JOIN. + * + *

This is the INNER-join counterpart of {@link PinotJoinToDynamicBroadcastRule}. The SEMI rule + * replaces the join with a leaf {@code IN} filter (sound only because a semi-join emits left + * columns only). An inner join projects both sides, so the join must keep running as a real + * intermediate-stage hash join. This rule therefore keeps the join and both of its exchanges intact + * and only adds a {@link RuntimeFilterRel} on top of the probe (left) leaf subtree, carrying a + * {@code PIPELINE_BREAKER} exchange of the build-side join keys. At runtime the probe leaf + * scan ANDs in a no-false-negative reducer (exact {@code IN} and/or bloom) built from those keys, + * dropping probe rows that cannot match before they are shuffled into the join. + * + *

Before (after exchange insertion): + *

+ *           [ Inner Join ]
+ *           /            \
+ *      [xChange L]    [xChange R]
+ *         /                \
+ *    [probe leaf]      [build subtree]
+ * 
+ * After: + *
+ *           [ Inner Join ]                         (unchanged — still shuffles both sides)
+ *           /            \
+ *      [xChange L]    [xChange R]
+ *         /                \
+ *   [RuntimeFilter]    [build subtree]
+ *      /        \
+ * [probe leaf]  [PIPELINE_BREAKER xChange]
+ *                      |
+ *                [build keys: Project(rightKeys) -> Filter(notNull) -> limit(maxBuildRows + 1)]
+ * 
+ * + *

Disabled by default; enabled per-cluster/query (then defaulting to {@code AUTO}) or per-join via + * the {@code runtime_filter} join hint. Restricted to a leaf-pushable probe (TableScan with optional + * single-in single-out Project/Filter). Multi-key joins use an exact IN per key; the bloom tier is + * single-key only. + */ +public class PinotJoinToInnerRuntimeFilterRule extends RelOptRule { + /** + * Placeholder instance registered in {@code PinotQueryRuleSets#POST_LOGICAL_RULES} to fix this rule's + * position in the post-logical sequence (right after the SEMI dynamic-broadcast rule). + * {@code QueryEnvironment#getTraitProgram} swaps it for a per-query instance carrying the resolved + * enable flag, because a Calcite rule cannot read query options at match time so the cluster/query-level + * enable state must be injected via the constructor. + */ + public static final PinotJoinToInnerRuntimeFilterRule INSTANCE = + new PinotJoinToInnerRuntimeFilterRule(PinotRuleUtils.PINOT_REL_FACTORY, false); + + private final boolean _queryLevelEnabled; + + public PinotJoinToInnerRuntimeFilterRule(RelBuilderFactory factory, boolean queryLevelEnabled) { + super(operand(Join.class, any()), factory, null); + _queryLevelEnabled = queryLevelEnabled; + } + + @Override + public boolean matches(RelOptRuleCall call) { + Join join = call.rel(0); + + // Resolve enablement: an explicit hint wins; otherwise fall back to the cluster/query-level default. + RuntimeFilterMode hintMode = PinotHintOptions.JoinHintOptions.getRuntimeFilterMode(join); + boolean enabled = hintMode != null ? hintMode != RuntimeFilterMode.OFF : _queryLevelEnabled; + if (!enabled) { + return false; + } + + // Lookup joins keep the right table local; a runtime filter does not apply. 
+ if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { + return false; + } + + // Only equi-INNER joins with at least one equi-key (non-equi conditions are allowed — the join still + // runs, we only reduce the probe by the equi-keys). Multi-key uses an exact IN per key, which is a + // sound reducer: a matching probe row equals some build tuple, so it passes every per-key IN; the + // (less selective) per-key form may admit cross-key false positives, which the real join discards. + JoinInfo joinInfo = join.analyzeCondition(); + if (join.getJoinType() != JoinRelType.INNER || joinInfo.leftKeys.isEmpty()) { + return false; + } + + // Both sides must already be behind exchanges (i.e. exchange insertion has run), and the probe + // (left) subtree must be pushable to a single leaf scan. Reject if already rewritten (idempotence). + RelNode left = PinotRuleUtils.unboxRel(join.getLeft()); + RelNode right = PinotRuleUtils.unboxRel(join.getRight()); + if (!(left instanceof Exchange) || !(right instanceof Exchange)) { + return false; + } + RelNode probe = PinotRuleUtils.unboxRel(left.getInput(0)); + if (probe instanceof RuntimeFilterRel) { + return false; + } + return PinotRuleUtils.canPushRuntimeFilterToLeaf(probe); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Join join = call.rel(0); + Exchange left = (Exchange) PinotRuleUtils.unboxRel(join.getLeft()); + Exchange right = (Exchange) PinotRuleUtils.unboxRel(join.getRight()); + + JoinInfo joinInfo = join.analyzeCondition(); + List probeKeys = joinInfo.leftKeys; + List rightKeys = joinInfo.rightKeys; + + RelNode probe = left.getInput(); + RelNode buildInput = right.getInput(); + + // Build the non-null build-key sub-tree: + // Project(rightKeys) -> Filter(key IS NOT NULL ...) 
-> limit(maxBuildRows + 1) + // Intentionally no DISTINCT here: a DISTINCT would introduce a plain Calcite LogicalAggregate, which + // is not converted at this point because the aggregate-conversion rules run earlier than this rule, + // and the leaf-side IN/bloom dedups the keys anyway. The leaf fetch cap of maxBuildRows + 1 bounds the + // pipeline-breaker memory at the source; the runtime abandons the filter if the cap was hit. + RelBuilder builder = call.builder(); + builder.push(buildInput); + List keyRefs = new ArrayList<>(rightKeys.size()); + for (int rightKey : rightKeys) { + keyRefs.add(builder.field(rightKey)); + } + builder.project(keyRefs); + List notNullConditions = new ArrayList<>(rightKeys.size()); + for (int i = 0; i < rightKeys.size(); i++) { + notNullConditions.add(builder.isNotNull(builder.field(i))); + } + builder.filter(notNullConditions); + // Cap the build-key broadcast. The leaf abandons the filter if this cap is hit (the truncated key set + // would be incomplete), so both sides MUST use the same constant. + builder.limit(0, CommonConstants.Broker.DEFAULT_RUNTIME_FILTER_MAX_BUILD_ROWS + 1); + RelNode buildKeys = builder.build(); + + // Wrap in a PIPELINE_BREAKER exchange: colocated -> SINGLETON (pre-partitioned), else BROADCAST. + boolean colocatedByJoinKeys = Boolean.TRUE.equals(PinotHintOptions.JoinHintOptions.isColocatedByJoinKeys(join)); + PinotLogicalExchange buildKeyExchange; + if (colocatedByJoinKeys) { + buildKeyExchange = PinotLogicalExchange.create(buildKeys, RelDistributions.SINGLETON, + PinotRelExchangeType.PIPELINE_BREAKER, true); + } else { + buildKeyExchange = PinotLogicalExchange.create(buildKeys, RelDistributions.BROADCAST_DISTRIBUTED, + PinotRelExchangeType.PIPELINE_BREAKER, false); + } + + // The projected build keys occupy positions 0..n-1 in the pipeline-breaker schema. 
+ List buildKeyPositions = new ArrayList<>(rightKeys.size()); + for (int i = 0; i < rightKeys.size(); i++) { + buildKeyPositions.add(i); + } + + RuntimeFilterType filterType = resolveFilterType(join); + RuntimeFilterRel runtimeFilter = + RuntimeFilterRel.create(probe, buildKeyExchange, probeKeys, buildKeyPositions, filterType); + + // Keep the join and both exchanges intact; only swap the left exchange's input with the runtime + // filter wrapper (which is a pass-through for the probe rows). + RelNode newLeft = left.copy(left.getTraitSet(), List.of(runtimeFilter)); + call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), newLeft, join.getRight(), join.getJoinType(), + join.isSemiJoinDone())); + } + + private static RuntimeFilterType resolveFilterType(Join join) { + RuntimeFilterMode hintMode = PinotHintOptions.JoinHintOptions.getRuntimeFilterMode(join); + if (hintMode == null) { + // Enabled via the cluster/query-level default without an explicit mode -> AUTO (tiered). + return RuntimeFilterType.AUTO; + } + switch (hintMode) { + case IN: + return RuntimeFilterType.IN; + case BLOOM: + return RuntimeFilterType.BLOOM; + case AUTO: + return RuntimeFilterType.AUTO; + default: + // OFF never reaches here (matches() returns false). 
+ throw new IllegalStateException("Unexpected runtime filter mode: " + hintMode); + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 8d05d3b782bc..d9c96f7e1d6e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -284,6 +284,11 @@ private PinotQueryRuleSets() { // apply dynamic broadcast rule after exchange is inserted PinotJoinToDynamicBroadcastRule.INSTANCE, + // additive probe-side runtime filter for inner joins, after exchange insertion. This placeholder + // instance is swapped for a per-query-configured one in QueryEnvironment#getTraitProgram (it needs + // the resolved enable flag); see PinotJoinToInnerRuntimeFilterRule#INSTANCE. + PinotJoinToInnerRuntimeFilterRule.INSTANCE, + // remove exchanges when there's duplicates PinotExchangeEliminationRule.INSTANCE, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRuleUtils.java index 6ed696c5d6b2..d7c70520e0d6 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRuleUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRuleUtils.java @@ -130,6 +130,32 @@ public static boolean canPushDynamicBroadcastToLeaf(RelNode relNode) { } } + /** + * Determines whether an additive INNER-join probe-side runtime filter can be pushed onto the probe + * leaf scan rooted at {@code relNode}. + * + *

Unlike {@link #canPushDynamicBroadcastToLeaf(RelNode)} (used for SEMI joins), this does NOT + * recurse into a nested {@link Join}: a reducer is only sound when the probe subtree represents a + * single relation whose row representation is unchanged (TableScan, optionally with single-in + * single-out Project/Filter). A nested join would introduce a second relation and could let the + * reducer drop rows that should survive, so it is rejected. This implements the long-standing + * INNER-join TODO #4 on {@link #canPushDynamicBroadcastToLeaf(RelNode)} by construction: the + * pipeline-breaker edge is a sibling, never threaded through the probe pipeline, and only probe-side + * columns are ever referenced by the pushed predicate. + */ + public static boolean canPushRuntimeFilterToLeaf(RelNode relNode) { + relNode = PinotRuleUtils.unboxRel(relNode); + if (relNode instanceof TableScan) { + return true; + } else if (relNode instanceof Project || relNode instanceof Filter) { + return canPushRuntimeFilterToLeaf(relNode.getInput(0)); + } else { + // Reject Join/Aggregate/Window/Sort/SetOp/etc. — they change cardinality or introduce a second + // relation, so a probe-side reducer there could drop matching rows. + return false; + } + } + public static String extractFunctionName(RexCall function) { SqlKind funcSqlKind = function.getOperator().getKind(); return funcSqlKind == SqlKind.OTHER_FUNCTION ? 
function.getOperator().getName() : funcSqlKind.name(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 5801a0b6e5e9..6e6316675013 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -60,6 +60,7 @@ import org.apache.pinot.calcite.rel.rules.PinotEnrichedJoinRule; import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule; import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule; +import org.apache.pinot.calcite.rel.rules.PinotJoinToInnerRuntimeFilterRule; import org.apache.pinot.calcite.rel.rules.PinotRelDistributionTraitRule; import org.apache.pinot.calcite.rel.rules.PinotRuleUtils; import org.apache.pinot.calcite.rel.rules.PinotSortExchangeCopyRule; @@ -216,8 +217,11 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { _envConfig.defaultSortExchangeCopyLimit()); boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(options, _envConfig.defaultUsePhysicalOptimizer()); + boolean enableRuntimeFilterJoin = QueryOptionsUtils.getRuntimeFilterJoinEnabled(options, + _envConfig.defaultEnableRuntimeFilterJoin()); HepProgram traitProgram = getTraitProgram( - workerManager, _envConfig, usePhysicalOptimizer, useRuleSet, sortExchangeCopyLimit); + workerManager, _envConfig, usePhysicalOptimizer, useRuleSet, sortExchangeCopyLimit, + enableRuntimeFilterJoin); SqlExplainFormat format = SqlExplainFormat.DOT; if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) { SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode(); @@ -650,7 +654,8 @@ private static boolean isRuleSkipped(String ruleName, Set skipRuleSet, S } private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config, - boolean 
usePhysicalOptimizer, Set useRuleSet, int sortExchangeCopyLimit) { + boolean usePhysicalOptimizer, Set useRuleSet, int sortExchangeCopyLimit, + boolean enableRuntimeFilterJoin) { HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); PinotRuleSet ruleSet = config.getRuleSet(); @@ -660,9 +665,10 @@ private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, // ---- // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule. if (!usePhysicalOptimizer) { - // POST_LOGICAL list comes from PinotRuleSet; we copy it because we may need to - // swap every PinotSortExchangeCopyRule with one configured for the per-query - // (or broker-config) sortExchangeCopyLimit if it differs from the rule's default. + // POST_LOGICAL list comes from PinotRuleSet; we copy it because a couple of its rules are swapped + // for per-query-configured instances (the rule classes themselves are positioned in PinotRuleSet): + // - PinotSortExchangeCopyRule, configured with the per-query/broker sortExchangeCopyLimit; + // - PinotJoinToInnerRuntimeFilterRule, configured with the resolved runtime-filter enable flag. List postLogical = new ArrayList<>(ruleSet.rulesFor(Phase.POST_LOGICAL)); if (sortExchangeCopyLimit != PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.config.getFetchLimitThreshold()) { PinotSortExchangeCopyRule overridden = ImmutablePinotSortExchangeCopyRule.Config.builder() @@ -672,6 +678,8 @@ private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, .toRule(); postLogical.replaceAll(r -> r instanceof PinotSortExchangeCopyRule ? overridden : r); } + postLogical.replaceAll(r -> r instanceof PinotJoinToInnerRuntimeFilterRule + ? 
new PinotJoinToInnerRuntimeFilterRule(PinotRuleUtils.PINOT_REL_FACTORY, enableRuntimeFilterJoin) : r); for (RelOptRule relOptRule : postLogical) { if (isEligibleQueryPostRule(relOptRule, config)) { hepProgramBuilder.addRuleInstance(relOptRule); @@ -826,6 +834,17 @@ default boolean defaultEnableDynamicFilteringSemiJoin() { return CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN; } + /** + * Whether the additive INNER-join probe-side runtime filter is enabled by default. Disabled by + * default; can be overridden per-query by {@link + * CommonConstants.Broker.Request.QueryOptionKey#RUNTIME_FILTER_JOIN} or force-enabled per-join via + * the {@code runtime_filter} join hint. + */ + @Value.Default + default boolean defaultEnableRuntimeFilterJoin() { + return CommonConstants.Broker.DEFAULT_ENABLE_RUNTIME_FILTER_JOIN; + } + /** * Whether to use physical optimizer by default. * diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java index 4624bc067ef4..895b4efe5b77 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java @@ -36,6 +36,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -235,6 +236,11 @@ public PlanNode visitUnnest(UnnestNode node, Void context) { return defaultNode(node); } + @Override + public PlanNode visitRuntimeFilter(RuntimeFilterNode node, Void 
context) { + return defaultNode(node); + } + private List simplifyChildren(List children) { List simplifiedChildren = null; for (int i = 0; i < children.size(); i++) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java index 2d95b7568e4f..07452cd6726a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java @@ -39,6 +39,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -177,6 +178,14 @@ public StringBuilder visitEnrichedJoin(EnrichedJoinNode node, Context context) { return context._builder; } + @Override + public StringBuilder visitRuntimeFilter(RuntimeFilterNode node, Context context) { + appendInfo(node, context).append('\n'); + node.getInputs().get(0).visit(this, context.next(true, context._host, context._workerId)); + node.getInputs().get(1).visit(this, context.next(false, context._host, context._workerId)); + return context._builder; + } + @Override public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context context) { appendInfo(node, context).append('\n'); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java index b9af3463ea0f..8719266c8b4d 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java @@ -41,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -231,6 +232,29 @@ public PlanNode visitJoin(JoinNode node, PlanNode context) { return node.withInputs(children); } + @Nullable + @Override + public PlanNode visitRuntimeFilter(RuntimeFilterNode node, PlanNode context) { + if (context.getClass() != RuntimeFilterNode.class) { + return null; + } + RuntimeFilterNode otherNode = (RuntimeFilterNode) context; + if (!node.getProbeKeys().equals(otherNode.getProbeKeys())) { + return null; + } + if (!node.getBuildKeys().equals(otherNode.getBuildKeys())) { + return null; + } + if (node.getType() != otherNode.getType()) { + return null; + } + List children = mergeChildren(node, context); + if (children == null) { + return null; + } + return node.withInputs(children); + } + @Nullable @Override public PlanNode visitEnrichedJoin(EnrichedJoinNode node, PlanNode context) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeSorter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeSorter.java index 86fccdff0279..506c3fbcbe53 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeSorter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeSorter.java @@ -35,6 +35,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import 
org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -161,6 +162,11 @@ public PlanNode visitUnnest(UnnestNode node, Comparator comparator) { return defaultNode(node, comparator); } + @Override + public PlanNode visitRuntimeFilter(RuntimeFilterNode node, Comparator comparator) { + return defaultNode(node, comparator); + } + private List applyToChildren(List children, Comparator comparator) { List simplifiedChildren = null; for (int i = 0; i < children.size(); i++) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java index f8b099831a70..71d2d596bfa8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java @@ -32,6 +32,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -260,6 +261,18 @@ public Boolean visitJoin(JoinNode node1, PlanNode node2) { && node1.getJoinStrategy() == that.getJoinStrategy(); } + @Override + public Boolean visitRuntimeFilter(RuntimeFilterNode node1, PlanNode node2) { + if (!(node2 instanceof RuntimeFilterNode)) { + return false; + } + 
RuntimeFilterNode that = (RuntimeFilterNode) node2; + return areBaseNodesEquivalent(node1, node2) + && Objects.equals(node1.getProbeKeys(), that.getProbeKeys()) + && Objects.equals(node1.getBuildKeys(), that.getBuildKeys()) + && node1.getType() == that.getType(); + } + @Override public Boolean visitEnrichedJoin(EnrichedJoinNode node1, PlanNode node2) { if (!(node2 instanceof EnrichedJoinNode)) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java index 3ecb4c06cd9e..13650132d204 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java @@ -39,6 +39,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -133,6 +134,11 @@ public PlanNode visitEnrichedJoin(EnrichedJoinNode node, Context context) { return visitJoin(node, context); } + @Override + public PlanNode visitRuntimeFilter(RuntimeFilterNode node, Context context) { + return process(node, context); + } + @Override public PlanNode visitMailboxReceive(MailboxReceiveNode node, Context context) { throw new UnsupportedOperationException("MailboxReceiveNode should not be visited by PlanNodeFragmenter"); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java index eb9b2228a638..c0574a75a3b7 100644 
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java @@ -64,6 +64,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -209,6 +210,17 @@ public Void visitEnrichedJoin(EnrichedJoinNode node, Void context) { return null; } + @Override + public Void visitRuntimeFilter(RuntimeFilterNode node, Void context) { + visitChildren(node); + + List inputs = readAlreadyPushedChildren(node); + PinotExplainedRelNode explained = new PinotExplainedRelNode(_builder.getCluster(), + "RuntimeFilter", Map.of(), node.getDataSchema(), inputs); + _builder.push(explained); + return null; + } + @Override public Void visitMailboxReceive(MailboxReceiveNode node, Void context) { visitChildren(node); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index 66271d5c0067..77bff8a68be8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -65,6 +65,7 @@ import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange; import org.apache.pinot.calcite.rel.logical.PinotLogicalTableScan; import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; +import org.apache.pinot.calcite.rel.logical.RuntimeFilterRel; import 
org.apache.pinot.calcite.rel.rules.PinotRuleUtils; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -81,6 +82,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNode.NodeHint; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -153,6 +155,8 @@ public PlanNode toPlanNode(RelNode node) { result = convertLogicalAggregate((PinotLogicalAggregate) node); } else if (node instanceof LogicalSort) { result = convertLogicalSort((LogicalSort) node); + } else if (node instanceof RuntimeFilterRel) { + result = convertRuntimeFilter((RuntimeFilterRel) node); } else if (node instanceof Exchange) { result = convertLogicalExchange((Exchange) node); } else if (node instanceof PinotLogicalEnrichedJoin) { @@ -623,6 +627,25 @@ private ExchangeNode convertLogicalExchange(Exchange node) { _hashFunction); } + private RuntimeFilterNode convertRuntimeFilter(RuntimeFilterRel node) { + return new RuntimeFilterNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), null, + convertInputs(node.getInputs()), node.getProbeKeys(), node.getBuildKeys(), + convertRuntimeFilterType(node.getFilterType())); + } + + private static RuntimeFilterNode.Type convertRuntimeFilterType(RuntimeFilterRel.RuntimeFilterType filterType) { + switch (filterType) { + case IN: + return RuntimeFilterNode.Type.IN; + case BLOOM: + return RuntimeFilterNode.Type.BLOOM; + case AUTO: + return RuntimeFilterNode.Type.AUTO; + default: + throw new IllegalStateException("Unsupported RuntimeFilterType: " + filterType); + } + } + private SetOpNode convertLogicalSetOp(SetOp node) { return new SetOpNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.fromRelHints(node.getHints()), convertInputs(node.getInputs()), SetOpNode.SetOpType.fromObject(node), node.all); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java index bbd8f6c1ccf3..bec3064ebc30 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java @@ -36,6 +36,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -86,6 +87,11 @@ public PlanNode visitEnrichedJoin(EnrichedJoinNode node, Context context) { return visitJoin(node, context); } + @Override + public PlanNode visitRuntimeFilter(RuntimeFilterNode node, Context context) { + return process(node, context); + } + @Override public PlanNode visitMailboxReceive(MailboxReceiveNode node, Context context) { throw new UnsupportedOperationException("MailboxReceiveNode should not be visited by StageFragmenter"); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java index ada9d1745035..01b0d5cc31b8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java @@ -39,6 +39,7 @@ import 
org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -122,6 +123,13 @@ public Void visitEnrichedJoin(EnrichedJoinNode node, DispatchablePlanContext con return null; } + @Override + public Void visitRuntimeFilter(RuntimeFilterNode node, DispatchablePlanContext context) { + node.getInputs().forEach(input -> input.visit(this, context)); + getOrCreateDispatchablePlanMetadata(node, context); + return null; + } + @Override public Void visitMailboxReceive(MailboxReceiveNode node, DispatchablePlanContext context) { node.getSender().visit(this, context); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java index e1cbf62aec04..e7491cb310a5 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java @@ -113,4 +113,14 @@ public T visitUnnest(UnnestNode node, C context) { node.getInputs().get(0).visit(this, context); return process(node, context); } + + @Override + public T visitRuntimeFilter(RuntimeFilterNode node, C context) { + // input[0] is the probe pipeline, input[1] is the pipeline-breaker receive carrying the build keys. + // Both must be traversed so the pipeline breaker is discovered and its mailbox assigned, mirroring + // how visitJoin traverses the SEMI dynamic-broadcast receive. 
+ node.getInputs().get(0).visit(this, context); + node.getInputs().get(1).visit(this, context); + return process(node, context); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java index ec89c854ba79..07d7b7cf2ab6 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java @@ -68,6 +68,8 @@ public interface PlanNodeVisitor { T visitUnnest(UnnestNode node, C context); + T visitRuntimeFilter(RuntimeFilterNode node, C context); + /** * A depth-first visitor that visits all children of a node before visiting the node itself. * @@ -250,5 +252,12 @@ public T visitUnnest(UnnestNode node, C context) { visitChildren(node, context); return postChildren(node, context); } + + @Override + public T visitRuntimeFilter(RuntimeFilterNode node, C context) { + preChildren(node, context); + visitChildren(node, context); + return postChildren(node, context); + } } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/RuntimeFilterNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/RuntimeFilterNode.java new file mode 100644 index 000000000000..da37d1b58dc1 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/RuntimeFilterNode.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.plannode; + +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.DataSchema; + + +/** + * {@code RuntimeFilterNode} carries a probe-side runtime filter for an INNER equi-join. + * + *

<p>It is the additive counterpart of the SEMI-join dynamic broadcast: instead of replacing the join + * with a leaf filter, the inner join keeps running in its intermediate stage while this node, placed at + * the top of the probe (left) leaf stage, builds a reducer from the build (right) side's join keys and + * pushes it onto the probe leaf scan. Rows that cannot match are dropped before they are + * shuffled into the join. + * + *

<p>Structure (mirrors the SEMI dynamic-broadcast layout, where the pipeline-breaker receive is the + * second input of the consuming node): + *

<ul> + *   <li>{@code input[0]} is the probe pipeline (table scan, optionally projected/filtered). The node is + * a pass-through for these rows, so its {@link #getDataSchema() data schema} equals the probe + * pipeline's output schema.</li> + *   <li>{@code input[1]} is a {@link MailboxReceiveNode} reading a {@code PIPELINE_BREAKER} exchange that + * carries the build-side join keys.</li> + * </ul> + * + *

<p>{@code probeKeys} index this node's (probe) row type; {@code buildKeys} index {@code input[1]}'s + * (build-key) row type. {@code type} is the resolved reducer strategy. + * + *

<p>The reducer never introduces false negatives, so the node can always be dropped (empty build, + * over-threshold, unsupported leaf, mixed-version) without affecting correctness — the real hash join + * remains the source of truth. + * + *

This class is immutable. + */ +public class RuntimeFilterNode extends BasePlanNode { + private final List _probeKeys; + private final List _buildKeys; + private final Type _type; + + public RuntimeFilterNode(int stageId, DataSchema dataSchema, @Nullable NodeHint nodeHint, List inputs, + List probeKeys, List buildKeys, Type type) { + super(stageId, dataSchema, nodeHint, inputs); + _probeKeys = List.copyOf(probeKeys); + _buildKeys = List.copyOf(buildKeys); + _type = type; + } + + /** + * Indexes into this node's (probe) row type identifying the columns the reducer filters on. + */ + public List getProbeKeys() { + return _probeKeys; + } + + /** + * Indexes into {@code input[1]}'s (build-key) row type identifying the columns the reducer is built + * from. Aligned positionally with {@link #getProbeKeys()}. + */ + public List getBuildKeys() { + return _buildKeys; + } + + /** + * The resolved reducer strategy. + */ + public Type getType() { + return _type; + } + + @Override + public String explain() { + return "RUNTIME_FILTER(type=" + _type + ", probeKeys=" + _probeKeys + ", buildKeys=" + _buildKeys + ")"; + } + + @Override + public T visit(PlanNodeVisitor visitor, C context) { + return visitor.visitRuntimeFilter(this, context); + } + + @Override + public PlanNode withInputs(List inputs) { + return new RuntimeFilterNode(_stageId, _dataSchema, _nodeHint, inputs, _probeKeys, _buildKeys, _type); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RuntimeFilterNode)) { + return false; + } + if (!super.equals(o)) { + return false; + } + RuntimeFilterNode that = (RuntimeFilterNode) o; + return _type == that._type && Objects.equals(_probeKeys, that._probeKeys) && Objects.equals(_buildKeys, + that._buildKeys); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), _probeKeys, _buildKeys, _type); + } + + /** + * The tiered reducer strategy for the probe-side runtime filter. + *

    + *
  • {@link #IN} — always emit an exact {@code IN} predicate.
  • + *
  • {@link #BLOOM} — always emit a bloom predicate.
  • + *
  • {@link #AUTO} — emit an exact {@code IN} at/below a build-key-row threshold, else a bloom.
  • + *
+ */ + public enum Type { + IN, BLOOM, AUTO + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index c91e46d8f5fb..bcf698730856 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -42,6 +42,7 @@ import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -84,6 +85,8 @@ public static PlanNode process(Plan.PlanNode protoNode) { return deserializeEnrichedJoinNode(protoNode); case UNNESTNODE: return deserializeUnnestNode(protoNode); + case RUNTIMEFILTERNODE: + return deserializeRuntimeFilterNode(protoNode); default: throw new IllegalStateException("Unsupported PlanNode type: " + protoNode.getNodeCase()); } @@ -255,6 +258,26 @@ private static UnnestNode deserializeUnnestNode(Plan.PlanNode protoNode) { extractInputs(protoNode), arrayExprs, context); } + private static RuntimeFilterNode deserializeRuntimeFilterNode(Plan.PlanNode protoNode) { + Plan.RuntimeFilterNode protoRuntimeFilterNode = protoNode.getRuntimeFilterNode(); + return new RuntimeFilterNode(protoNode.getStageId(), extractDataSchema(protoNode), extractNodeHint(protoNode), + extractInputs(protoNode), protoRuntimeFilterNode.getProbeKeysList(), protoRuntimeFilterNode.getBuildKeysList(), + convertRuntimeFilterType(protoRuntimeFilterNode.getFilterType())); + } + + private static RuntimeFilterNode.Type 
convertRuntimeFilterType(Plan.RuntimeFilterType filterType) { + switch (filterType) { + case IN: + return RuntimeFilterNode.Type.IN; + case BLOOM: + return RuntimeFilterNode.Type.BLOOM; + case AUTO: + return RuntimeFilterNode.Type.AUTO; + default: + throw new IllegalStateException("Unsupported RuntimeFilterType: " + filterType); + } + } + private static DataSchema extractDataSchema(Plan.DataSchema protoDataSchema) { String[] columnNames = protoDataSchema.getColumnNamesList().toArray(new String[0]); int numColumns = columnNames.length; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index f2283ed09f51..68d2c96e7e07 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -42,6 +42,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode.NodeHint; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -280,6 +281,17 @@ public Void visitExplained(ExplainedNode node, Plan.PlanNode.Builder builder) { return null; } + @Override + public Void visitRuntimeFilter(RuntimeFilterNode node, Plan.PlanNode.Builder builder) { + Plan.RuntimeFilterNode runtimeFilterNode = Plan.RuntimeFilterNode.newBuilder() + .addAllProbeKeys(node.getProbeKeys()) + .addAllBuildKeys(node.getBuildKeys()) + .setFilterType(convertRuntimeFilterType(node.getType())) + .build(); + builder.setRuntimeFilterNode(runtimeFilterNode); + return null; + } + @Override public Void 
visitUnnest(UnnestNode node, Plan.PlanNode.Builder builder) { UnnestNode.TableFunctionContext context = node.getTableFunctionContext(); @@ -469,6 +481,19 @@ private static List convertLiteralRows(List e.visit(this, isIntermediateStage)); + return null; + } + @Override public Void visitMailboxReceive(MailboxReceiveNode node, Boolean isIntermediateStage) { node.getInputs().forEach(e -> e.visit(this, isIntermediateStage)); diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/PinotJoinToInnerRuntimeFilterRuleTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/PinotJoinToInnerRuntimeFilterRuleTest.java new file mode 100644 index 000000000000..8fc9fdcb1365 --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/PinotJoinToInnerRuntimeFilterRuleTest.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelDistribution; +import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.core.routing.MockRoutingManagerFactory; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.query.planner.PlanFragment; +import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; +import org.apache.pinot.query.planner.physical.DispatchableSubPlan; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; +import org.apache.pinot.query.planner.serde.PlanNodeDeserializer; +import org.apache.pinot.query.planner.serde.PlanNodeSerializer; +import org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +/** + * Planner tests for {@link org.apache.pinot.calcite.rel.rules.PinotJoinToInnerRuntimeFilterRule} — the + * additive INNER-join probe-side runtime filter. Validates when the rule fires, the resulting plan + * shape (join + both exchanges preserved, an extra pipeline-breaker edge added), serde round-trips, and + * the negative cases (default off, off-hint, outer join, non-leaf probe). 
+ */ +public class PinotJoinToInnerRuntimeFilterRuleTest extends QueryEnvironmentTestBase { + + private static List collectAllNodes(DispatchableSubPlan plan) { + List all = new ArrayList<>(); + for (DispatchablePlanFragment fragment : plan.getQueryStages()) { + PlanFragment planFragment = fragment.getPlanFragment(); + if (planFragment != null && planFragment.getFragmentRoot() != null) { + collect(planFragment.getFragmentRoot(), all); + } + } + return all; + } + + private static void collect(PlanNode node, List out) { + out.add(node); + for (PlanNode input : node.getInputs()) { + collect(input, out); + } + } + + private static RuntimeFilterNode findRuntimeFilter(DispatchableSubPlan plan) { + for (PlanNode node : collectAllNodes(plan)) { + if (node instanceof RuntimeFilterNode) { + return (RuntimeFilterNode) node; + } + } + return null; + } + + private static boolean hasJoin(DispatchableSubPlan plan) { + return collectAllNodes(plan).stream().anyMatch(n -> n instanceof JoinNode); + } + + @Test + public void testRuleFiresWithHintAndPreservesJoinAndPipelineBreaker() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='in') */ a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + RuntimeFilterNode runtimeFilter = findRuntimeFilter(plan); + assertNotNull(runtimeFilter, "expected a RuntimeFilterNode for an INNER join with runtime_filter hint"); + // The real hash join must still be present (the filter is additive, not a replacement). + assertTrue(hasJoin(plan), "the inner join must be preserved"); + // input[1] must be a PIPELINE_BREAKER receive carrying the build keys. 
+ assertEquals(runtimeFilter.getInputs().size(), 2); + PlanNode buildInput = runtimeFilter.getInputs().get(1); + assertTrue(buildInput instanceof MailboxReceiveNode, "RuntimeFilterNode input[1] must be a MailboxReceiveNode"); + assertEquals(((MailboxReceiveNode) buildInput).getExchangeType(), PinotRelExchangeType.PIPELINE_BREAKER); + // Single equi-key: one probe key, one build key. + assertEquals(runtimeFilter.getProbeKeys().size(), 1); + assertEquals(runtimeFilter.getBuildKeys().size(), 1); + assertEquals(runtimeFilter.getType(), RuntimeFilterNode.Type.IN); + // Pin the probe-key -> leaf-select-list alignment by VALUE (not just count): the probe key must point + // at the join-key column (col3) in the probe output schema, NOT blindly at position 0 (the query + // selects col1, joins on col3). The build key is the projected key at position 0. + assertEquals(runtimeFilter.getDataSchema().getColumnName(runtimeFilter.getProbeKeys().get(0)), "col3"); + assertEquals(runtimeFilter.getBuildKeys().get(0).intValue(), 0); + } + + @Test + public void testAutoHintYieldsAutoType() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='auto') */ a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + RuntimeFilterNode runtimeFilter = findRuntimeFilter(plan); + assertNotNull(runtimeFilter); + assertEquals(runtimeFilter.getType(), RuntimeFilterNode.Type.AUTO); + } + + @Test + public void testBloomHintYieldsBloomType() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='bloom') */ a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + RuntimeFilterNode runtimeFilter = findRuntimeFilter(plan); + assertNotNull(runtimeFilter); + assertEquals(runtimeFilter.getType(), RuntimeFilterNode.Type.BLOOM); + } + + @Test + public void testSerdeRoundTripWithRuntimeFilter() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='bloom') */ a.col1, b.col2 FROM a 
JOIN b ON a.col3 = b.col3"); + // Sanity: the plan really contains the new node, so this exercises its serde. + assertNotNull(findRuntimeFilter(plan)); + for (DispatchablePlanFragment fragment : plan.getQueryStages()) { + PlanNode stagePlan = fragment.getPlanFragment().getFragmentRoot(); + PlanNode roundTripped = PlanNodeDeserializer.process(PlanNodeSerializer.process(stagePlan)); + assertEquals(roundTripped, stagePlan); + } + } + + @Test + public void testNonColocatedUsesBroadcastPipelineBreaker() { + // Default (non-colocated) path: the build keys are BROADCAST to the probe leaf. The colocated + // (is_colocated_by_join_keys) path emits a SINGLETON pre-partitioned pipeline breaker exactly like + // PinotJoinToDynamicBroadcastRule; exercising it end-to-end requires real table colocation (matching + // sender/receiver worker counts), so it is covered by the shared dynamic-broadcast machinery. + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='in') */ a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + MailboxReceiveNode pb = (MailboxReceiveNode) findRuntimeFilter(plan).getInputs().get(1); + assertEquals(pb.getExchangeType(), PinotRelExchangeType.PIPELINE_BREAKER); + assertEquals(pb.getDistributionType(), RelDistribution.Type.BROADCAST_DISTRIBUTED); + } + + @Test + public void testMultiKeyUsesRuntimeFilter() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='in') */ a.col1, b.col2 FROM a JOIN b " + + "ON a.col3 = b.col3 AND a.col7 = b.col7"); + RuntimeFilterNode runtimeFilter = findRuntimeFilter(plan); + assertNotNull(runtimeFilter, "multi-key INNER join should still get a runtime filter (exact IN per key)"); + assertEquals(runtimeFilter.getProbeKeys().size(), 2); + assertEquals(runtimeFilter.getBuildKeys().size(), 2); + assertTrue(hasJoin(plan)); + } + + @Test + public void testNoRuntimeFilterByDefault() { + DispatchableSubPlan plan = + 
_queryEnvironment.planQuery("SELECT a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + assertNull(findRuntimeFilter(plan)); + } + + @Test + public void testOffHintDisablesRuntimeFilter() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='off') */ a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + assertNull(findRuntimeFilter(plan)); + } + + @Test + public void testNoRuntimeFilterForLeftJoin() { + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='in') */ a.col1, b.col2 FROM a LEFT JOIN b ON a.col3 = b.col3"); + assertNull(findRuntimeFilter(plan), "runtime filter must only apply to INNER joins"); + } + + @Test + public void testNoRuntimeFilterForNonLeafProbe() { + // The probe (left) side is a GROUP BY subquery (not leaf-pushable), so the filter cannot be pushed. + DispatchableSubPlan plan = _queryEnvironment.planQuery( + "SELECT /*+ joinOptions(runtime_filter='in') */ x.col3, b.col2 " + + "FROM (SELECT col3 FROM a GROUP BY col3) x JOIN b ON x.col3 = b.col3"); + assertNull(findRuntimeFilter(plan), "runtime filter must not be pushed past a non-leaf (aggregate) probe"); + } + + @Test + public void testClusterFlagEnablesRuntimeFilterWithoutHint() { + QueryEnvironment flagEnabledEnv = runtimeFilterEnabledEnvironment(); + DispatchableSubPlan plan = + flagEnabledEnv.planQuery("SELECT a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + RuntimeFilterNode runtimeFilter = findRuntimeFilter(plan); + assertNotNull(runtimeFilter, "cluster flag should enable the runtime filter without a hint"); + // No explicit hint -> AUTO (tiered) is the default mode. 
+ assertEquals(runtimeFilter.getType(), RuntimeFilterNode.Type.AUTO); + } + + @Test + public void testQueryOptionCanDisableEvenWithFlagOn() { + QueryEnvironment flagEnabledEnv = runtimeFilterEnabledEnvironment(); + DispatchableSubPlan plan = flagEnabledEnv.planQuery( + "SET runtimeFilterJoin = 'off'; SELECT a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3"); + assertNull(findRuntimeFilter(plan), "query option 'off' should disable the runtime filter even with flag on"); + } + + /** + * Builds a {@link QueryEnvironment} identical to the base one but with the runtime-filter-join cluster + * default enabled. + */ + private static QueryEnvironment runtimeFilterEnabledEnvironment() { + MockRoutingManagerFactory factory = new MockRoutingManagerFactory(1, 2); + for (Map.Entry entry : TABLE_SCHEMAS.entrySet()) { + factory.registerTable(entry.getValue(), entry.getKey()); + } + for (Map.Entry> entry : SERVER1_SEGMENTS.entrySet()) { + for (String segment : entry.getValue()) { + factory.registerSegment(1, entry.getKey(), segment); + } + } + for (Map.Entry> entry : SERVER2_SEGMENTS.entrySet()) { + for (String segment : entry.getValue()) { + factory.registerSegment(2, entry.getKey(), segment); + } + } + RoutingManager routingManager = factory.buildRoutingManager(null); + TableCache tableCache = factory.buildTableCache(); + return new QueryEnvironment(QueryEnvironment.configBuilder() + .requestId(-1L) + .database(CommonConstants.DEFAULT_DATABASE) + .tableCache(tableCache) + .workerManager(new WorkerManager("Broker_localhost", "localhost", 3, routingManager)) + .defaultEnableRuntimeFilterJoin(true) + .build()); + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializerTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializerTest.java new file mode 100644 index 000000000000..72b5e2931bdc --- /dev/null +++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializerTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.serde; + +import org.apache.pinot.common.proto.Plan; +import org.testng.annotations.Test; + +import static org.testng.Assert.expectThrows; + + +public class PlanNodeDeserializerTest { + + /** + * A {@link Plan.PlanNode} whose {@code node} oneof is unset deserializes to the {@code default} branch + * and throws. This pins the mixed-version contract for the new {@code RuntimeFilterNode} variant (proto + * tag 19): an older server that does not know the variant lands in this same default branch and fails + * the query gracefully (a per-query error, caught by the query runner) rather than misbehaving. The + * default-OFF {@code pinot.broker.enable.runtime.filter.join} flag is the guard that prevents this from + * happening on a not-yet-fully-upgraded cluster. 
+ */ + @Test + public void testUnknownNodeCaseThrowsIllegalState() { + Plan.PlanNode protoNode = Plan.PlanNode.newBuilder().setStageId(0).build(); + expectThrows(IllegalStateException.class, () -> PlanNodeDeserializer.process(protoNode)); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java index cbe9a870722f..7e3d88fe1a2d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java @@ -41,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -198,6 +199,12 @@ private MailboxReceiveNode getPipelineBreakerNode(BasePlanNode node) { return (MailboxReceiveNode) planNode; } } + } else if (currentNode instanceof RuntimeFilterNode) { + // The additive INNER-join runtime filter carries its build-key pipeline breaker as input[1]. 
+ PlanNode planNode = currentNode.getInputs().get(1); + if (planNode instanceof MailboxReceiveNode) { + return (MailboxReceiveNode) planNode; + } } nodeStack.addAll(currentNode.getInputs()); } @@ -377,6 +384,27 @@ public ObjectNode visitUnnest(UnnestNode node, Context context) { return recursiveCase(node, MultiStageOperator.Type.UNNEST, context); } + @Override + public ObjectNode visitRuntimeFilter(RuntimeFilterNode node, Context context) { + // The probe leaf stage collapses RuntimeFilterNode (and the probe pipeline below input[0]) into a + // single LEAF operator at execution; its build-key pipeline breaker (input[1]) contributes a + // separate child sub-tree. This mirrors the SEMI dynamic-broadcast LEAF handling in recursiveCase, + // where the boundary node's actual operator type is LEAF rather than the node's own type. + OperatorTypeDescriptor type = _stageStats.getOperatorType(_index); + if (type == MultiStageOperator.Type.LEAF) { + int selfIndex = _index; + ObjectNode pipelineBreakerResultNode = extractPipelineBreakerResult(node, context); + if (pipelineBreakerResultNode != null) { + return selfNode( + MultiStageOperator.Type.LEAF, context, selfIndex, new JsonNode[] {pipelineBreakerResultNode}, false); + } + return selfNode(MultiStageOperator.Type.LEAF, context, _index, new JsonNode[0]); + } + // Not collapsed into a leaf (not expected for the additive INNER runtime filter, which always sits + // atop a leaf-pushable probe). Fall back to visiting the probe pipeline transparently. 
+ return node.getInputs().get(0).visit(this, context); + } + public static class Context { private final int _parallelism; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java index f8ee41aae95e..641537289687 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java @@ -36,6 +36,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -383,5 +384,17 @@ public MultiStageOperator visitUnnest(UnnestNode node, OpChainExecutionContext c return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), child); } } + + @Override + public MultiStageOperator visitRuntimeFilter(RuntimeFilterNode node, + OpChainExecutionContext context) { + // RuntimeFilterNode is normally collapsed into the LeafOperator (it sits at the leaf-stage + // boundary, intercepted in visit() above). It is only dispatched here when the leaf-stage boundary + // landed strictly inside the probe subtree (e.g. a Filter->Project->Filter->TableScan probe). In + // that case the node is a pure pass-through for the probe rows (input[1] is the build-key pipeline + // breaker, consumed separately at leaf compile), so build the op-chain from input[0] only. The + // runtime filter is a droppable optimization, so an otherwise-valid query must still succeed. 
+ return visit(node.getInputs().get(0), context); + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 3e3c64ccedfe..9c69ea4f2cb2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.plan.server; import com.google.common.base.Preconditions; +import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; @@ -30,6 +31,7 @@ import javax.annotation.Nullable; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.function.TransformFunctionType; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; @@ -45,8 +47,11 @@ import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.utils.idset.IdSet; +import org.apache.pinot.core.query.utils.idset.IdSets; import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.runtime.operator.MultiStageOperator; @@ -309,6 +314,300 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t } } + /** + * Attach an additive INNER-join probe-side runtime filter to the given 
leaf {@link PinotQuery}. + * + *

<p>{@code probeKeys} index the leaf select list (which equals the probe pipeline's output row type), + * {@code buildKeys} index {@code dataSchema} (the build-side join keys). The reducer never + * introduces false negatives, so the real hash join in the intermediate stage remains the source of + * truth and the filter can be omitted at any time without affecting correctness. + * + *

<p>Tiering (the bloom tier is single-key only): + *

<ul> + *
<li>{@code IN} — always an exact {@code IN} list (index-accelerated, drives segment pruning).</li> + *
<li>{@code BLOOM} — a serialized bloom ({@code IN_ID_SET}) plus a {@code BETWEEN(min, max)} + * range predicate (for numeric keys) to enable cheap range-based segment pruning.</li> + *
<li>{@code AUTO} — exact {@code IN} at/below {@code maxInSize} build-key rows, else bloom.</li> + *
</ul>
+ * Build sides larger than {@code maxBuildRows}, or any predicate (bloom or exact {@code IN}) whose + * estimated serialized size exceeds {@code maxBytes}, are abandoned (no filter), which is always correct. + */ + static void attachRuntimeFilter(PinotQuery pinotQuery, List probeKeys, List buildKeys, + List dataContainer, DataSchema dataSchema, RuntimeFilterNode.Type type) { + attachRuntimeFilter(pinotQuery, probeKeys, buildKeys, dataContainer, dataSchema, type, + CommonConstants.Broker.DEFAULT_RUNTIME_FILTER_MAX_IN_SIZE, CommonConstants.Broker.DEFAULT_RUNTIME_FILTER_FPP, + CommonConstants.Broker.DEFAULT_RUNTIME_FILTER_MAX_BYTES, + CommonConstants.Broker.DEFAULT_RUNTIME_FILTER_MAX_BUILD_ROWS); + } + + /** + * Sizing-parameterized variant (package-private for testing). {@code maxBuildRows} MUST equal the value + * the planner used for its leaf fetch cap (both use + * {@link CommonConstants.Broker#DEFAULT_RUNTIME_FILTER_MAX_BUILD_ROWS}); see the truncation note below. + */ + static void attachRuntimeFilter(PinotQuery pinotQuery, List probeKeys, List buildKeys, + List dataContainer, DataSchema dataSchema, RuntimeFilterNode.Type type, int maxInSize, double fpp, + int maxBytes, int maxBuildRows) { + // CORRECTNESS: the planner caps the build-key stage at maxBuildRows + 1, which TRUNCATES the key set. + // If the cap was hit (rawCount > maxBuildRows) the set is incomplete, so a reducer built from it could + // drop probe rows that should join (false negative). Abandon the filter — the join is still correct. + // This threshold MUST match the planner's fetch cap. + if (dataContainer.size() > maxBuildRows) { + return; + } + // Drop build rows with any null join key: a null key never matches an INNER equi-join, so excluding + // it is sound (no false negatives), and it keeps the leaf builders (which cast unconditionally) + // null-safe regardless of how the planner's IS NOT NULL filter behaved under null handling. 
+ List rows = retainNonNullKeyRows(dataContainer, buildKeys); + int rowCount = rows.size(); + + // Empty (or all-null) build side: nothing can match -> a constant-false predicate prunes the probe. + if (rowCount == 0) { + attachDynamicFilter(pinotQuery, probeKeys, buildKeys, rows, dataSchema); + return; + } + + // Decide the tier. The bloom tier is single-key only; BIG_DECIMAL is not supported by IdSet. + boolean useBloom; + switch (type) { + case IN: + useBloom = false; + break; + case BLOOM: + useBloom = buildKeys.size() == 1; + break; + case AUTO: + default: + useBloom = buildKeys.size() == 1 && rowCount > maxInSize; + break; + } + if (useBloom) { + FieldSpec.DataType storedType = dataSchema.getColumnDataType(buildKeys.get(0)).getStoredType().toDataType(); + if (storedType == FieldSpec.DataType.BIG_DECIMAL) { + useBloom = false; + } + } + + if (!useBloom) { + // Apply the same footprint ceiling the bloom tier honors. The exact-IN path (IN mode, or any + // multi-key, or AUTO below the threshold) emits up to maxBuildRows literals per key, AND'd across + // keys, which can be multi-MB. Abandon if the estimated serialized size exceeds maxBytes so there is + // one consistent ceiling regardless of tier; dropping the filter keeps the join correct. + if (estimateExactInBytes(rows, buildKeys, dataSchema) > maxBytes) { + return; + } + attachDynamicFilter(pinotQuery, probeKeys, buildKeys, rows, dataSchema); + return; + } + + Expression bloomPredicate = + buildBloomPredicate(pinotQuery.getSelectList().get(probeKeys.get(0)), buildKeys.get(0), rows, dataSchema, + rowCount, fpp, maxBytes); + if (bloomPredicate != null) { + andIntoFilter(pinotQuery, bloomPredicate); + } + // else: bloom exceeded maxBytes -> abandon (no filter); the join remains the source of truth. + } + + /** + * Returns the build rows whose every join-key column is non-null. A null key cannot match an INNER + * equi-join, so dropping such rows is sound and keeps the leaf filter builders null-safe. 
+ */ + private static List retainNonNullKeyRows(List dataContainer, List buildKeys) { + List result = new ArrayList<>(dataContainer.size()); + for (Object[] row : dataContainer) { + boolean allNonNull = true; + for (int buildKey : buildKeys) { + if (row[buildKey] == null) { + allNonNull = false; + break; + } + } + if (allNonNull) { + result.add(row); + } + } + return result; + } + + /** Rough per-literal overhead (proto Expression + Literal wrapping) used by {@link #estimateExactInBytes}. */ + private static final int IN_LITERAL_OVERHEAD_BYTES = 8; + + /** + * Estimates the serialized footprint (in bytes) of the exact-IN predicate this build set would produce: + * one IN list per key over all rows. Used to abandon the exact-IN tier when it would exceed the same + * {@code maxBytes} ceiling the bloom tier honors. Fixed-width types are O(1); STRING/BYTES sum lengths. + */ + private static long estimateExactInBytes(List rows, List buildKeys, DataSchema dataSchema) { + int numRows = rows.size(); + long bytes = 0; + for (int buildKey : buildKeys) { + FieldSpec.DataType storedType = dataSchema.getColumnDataType(buildKey).getStoredType().toDataType(); + switch (storedType) { + case INT: + case FLOAT: + bytes += (long) numRows * (IN_LITERAL_OVERHEAD_BYTES + 4); + break; + case LONG: + case DOUBLE: + bytes += (long) numRows * (IN_LITERAL_OVERHEAD_BYTES + 8); + break; + case STRING: + for (Object[] row : rows) { + bytes += IN_LITERAL_OVERHEAD_BYTES + ((String) row[buildKey]).length(); + } + break; + case BYTES: + for (Object[] row : rows) { + bytes += IN_LITERAL_OVERHEAD_BYTES + ((ByteArray) row[buildKey]).length(); + } + break; + case BIG_DECIMAL: + // Variable-length: approximate the serialized form as the unscaled magnitude bytes plus scale. 
+ for (Object[] row : rows) { + bytes += IN_LITERAL_OVERHEAD_BYTES + 4 + (((BigDecimal) row[buildKey]).unscaledValue().bitLength() / 8 + 1); + } + break; + default: + bytes += (long) numRows * (IN_LITERAL_OVERHEAD_BYTES + 16); + break; + } + } + return bytes; + } + + /** + * Builds a single-key bloom predicate: {@code IN_ID_SET(probeCol, '') = 1}, AND'd (for numeric + * keys) with {@code BETWEEN(probeCol, min, max)} to enable range-based segment pruning. Returns + * {@code null} if the serialized bloom would exceed {@code maxBytes} (caller abandons the filter). + */ + @Nullable + private static Expression buildBloomPredicate(Expression probeColExpr, int buildKey, List dataContainer, + DataSchema dataSchema, int rowCount, double fpp, int maxBytes) { + FieldSpec.DataType storedType = dataSchema.getColumnDataType(buildKey).getStoredType().toDataType(); + int expectedInsertions = Math.max(1, rowCount); + IdSet idSet = IdSets.create(storedType, -1, expectedInsertions, fpp); + Expression rangePredicate = null; + switch (storedType) { + case INT: { + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + for (Object[] row : dataContainer) { + int value = (int) row[buildKey]; + idSet.add(value); + min = Math.min(min, value); + max = Math.max(max, value); + } + rangePredicate = betweenPredicate(probeColExpr, RequestUtils.getLiteralExpression(min), + RequestUtils.getLiteralExpression(max)); + break; + } + case LONG: { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (Object[] row : dataContainer) { + long value = (long) row[buildKey]; + idSet.add(value); + min = Math.min(min, value); + max = Math.max(max, value); + } + rangePredicate = betweenPredicate(probeColExpr, RequestUtils.getLiteralExpression(min), + RequestUtils.getLiteralExpression(max)); + break; + } + case FLOAT: { + float min = Float.POSITIVE_INFINITY; + float max = Float.NEGATIVE_INFINITY; + boolean hasNaN = false; + for (Object[] row : dataContainer) { + float value = (float) 
row[buildKey]; + idSet.add(value); + // NaN must NOT poison the range bounds: a finite BETWEEN(min, max) would drop probe NaN rows + // that should match a build NaN (false negative), and BETWEEN(NaN, NaN) drops everything. Keep + // NaN in the bloom (membership), but skip the range predicate entirely if any NaN is present. + if (Float.isNaN(value)) { + hasNaN = true; + } else { + min = Math.min(min, value); + max = Math.max(max, value); + } + } + if (!hasNaN) { + rangePredicate = betweenPredicate(probeColExpr, RequestUtils.getLiteralExpression(min), + RequestUtils.getLiteralExpression(max)); + } + break; + } + case DOUBLE: { + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + boolean hasNaN = false; + for (Object[] row : dataContainer) { + double value = (double) row[buildKey]; + idSet.add(value); + if (Double.isNaN(value)) { + hasNaN = true; + } else { + min = Math.min(min, value); + max = Math.max(max, value); + } + } + if (!hasNaN) { + rangePredicate = betweenPredicate(probeColExpr, RequestUtils.getLiteralExpression(min), + RequestUtils.getLiteralExpression(max)); + } + break; + } + case STRING: + for (Object[] row : dataContainer) { + idSet.add((String) row[buildKey]); + } + break; + case BYTES: + for (Object[] row : dataContainer) { + idSet.add(((ByteArray) row[buildKey]).getBytes()); + } + break; + default: + // Unsupported stored type for bloom (e.g. BIG_DECIMAL is filtered earlier) — abandon. + return null; + } + if (idSet.getSerializedSizeInBytes() > maxBytes) { + return null; + } + String base64IdSet; + try { + base64IdSet = idSet.toBase64String(); + } catch (IOException e) { + // Serialization failed — abandon the filter (the join remains the source of truth). 
+ return null; + } + Expression inIdSet = RequestUtils.getFunctionExpression(TransformFunctionType.IN_ID_SET.getName(), probeColExpr, + RequestUtils.getLiteralExpression(base64IdSet)); + Expression bloomEq = + RequestUtils.getFunctionExpression(FilterKind.EQUALS.name(), inIdSet, RequestUtils.getLiteralExpression(1)); + if (rangePredicate == null) { + return bloomEq; + } + return RequestUtils.getFunctionExpression(FilterKind.AND.name(), bloomEq, rangePredicate); + } + + private static Expression betweenPredicate(Expression column, Expression min, Expression max) { + return RequestUtils.getFunctionExpression(FilterKind.BETWEEN.name(), column, min, max); + } + + /** + * ANDs the given predicate into the query's existing filter (or sets it if there is none). + */ + private static void andIntoFilter(PinotQuery pinotQuery, Expression predicate) { + Expression existing = pinotQuery.getFilterExpression(); + if (existing != null) { + pinotQuery.setFilterExpression(RequestUtils.getFunctionExpression(FilterKind.AND.name(), existing, predicate)); + } else { + pinotQuery.setFilterExpression(predicate); + } + } + /** * attach the dynamic filter to the given PinotQuery. 
*/ diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java index 29b369c3f2fd..18ca991d98bd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java @@ -42,6 +42,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; import org.apache.pinot.query.planner.plannode.SetOpNode; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; @@ -247,6 +248,36 @@ public Void visitEnrichedJoin(EnrichedJoinNode node, ServerPlanRequestContext co return null; } + @Override + public Void visitRuntimeFilter(RuntimeFilterNode node, ServerPlanRequestContext context) { + // Additive INNER-join probe-side runtime filter. input[0] is the probe pipeline (which builds the + // leaf PinotQuery), input[1] is the PIPELINE_BREAKER receive carrying the build-side join + // keys. Mirrors the SEMI dynamic-broadcast path in visitJoin, except the inner join itself stays in + // its intermediate stage; here we only AND a no-false-negative reducer onto the probe leaf scan. 
+ List inputs = node.getInputs(); + PlanNode probe = inputs.get(0); + PlanNode buildKeyReceive = inputs.get(1); + Preconditions.checkState(buildKeyReceive instanceof MailboxReceiveNode + && ((MailboxReceiveNode) buildKeyReceive).getExchangeType() == PinotRelExchangeType.PIPELINE_BREAKER, + "RuntimeFilterNode second input must be a PIPELINE_BREAKER MailboxReceiveNode"); + if (visit(probe, context)) { + PipelineBreakerResult pipelineBreakerResult = context.getPipelineBreakerResult(); + int resultMapId = pipelineBreakerResult.getNodeIdMap().get(buildKeyReceive); + List blocks = pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, List.of()); + List resultDataContainer = new ArrayList<>(); + DataSchema dataSchema = buildKeyReceive.getDataSchema(); + for (MseBlock block : blocks) { + if (block.isData()) { + resultDataContainer.addAll(((MseBlock.Data) block).asRowHeap().getRows()); + } + } + // TODO: we should keep query stats here as well + ServerPlanRequestUtils.attachRuntimeFilter(context.getPinotQuery(), node.getProbeKeys(), node.getBuildKeys(), + resultDataContainer, dataSchema, node.getType()); + } + return null; + } + @Override public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) { throw new UnsupportedOperationException("Leaf stage should not visit MailboxReceiveNode!"); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtilsTest.java new file mode 100644 index 000000000000..4c507e1f2b58 --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtilsTest.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan.server; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.Function; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.query.planner.plannode.RuntimeFilterNode; +import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.sql.FilterKind; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +/** + * Unit tests for the probe-side runtime-filter construction in {@link ServerPlanRequestUtils}: the + * tiering decision (exact IN vs bloom), empty/multi-key handling, type fallbacks, and that an existing + * leaf filter is preserved. These exercise the no-false-negative reducer in isolation. 
+ */ +public class ServerPlanRequestUtilsTest { + + private static PinotQuery queryWithProbeColumns(String... columns) { + PinotQuery pinotQuery = new PinotQuery(); + List selectList = new ArrayList<>(); + for (String column : columns) { + selectList.add(RequestUtils.getIdentifierExpression(column)); + } + pinotQuery.setSelectList(selectList); + return pinotQuery; + } + + private static List intRows(int... values) { + List rows = new ArrayList<>(); + for (int value : values) { + rows.add(new Object[]{value}); + } + return rows; + } + + /** Wraps each scalar build-key value as a single-column build row. */ + private static List rowsOf(Object... values) { + List rows = new ArrayList<>(); + for (Object value : values) { + rows.add(new Object[]{value}); + } + return rows; + } + + /** Returns the first function (DFS) whose operator equals {@code operator}, or null. */ + private static Function findFunction(Expression expression, String operator) { + if (expression == null || !expression.isSetFunctionCall()) { + return null; + } + Function function = expression.getFunctionCall(); + if (function.getOperator().equalsIgnoreCase(operator)) { + return function; + } + for (Expression operand : function.getOperands()) { + Function found = findFunction(operand, operator); + if (found != null) { + return found; + } + } + return null; + } + + @Test + public void testExactInSingleKey() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(5, 1, 3), buildSchema, + RuntimeFilterNode.Type.IN); + + Expression filter = pinotQuery.getFilterExpression(); + assertNotNull(filter); + Function in = filter.getFunctionCall(); + assertEquals(in.getOperator(), FilterKind.IN.name()); + List operands = in.getOperands(); + // First operand is the probe column; the rest are the sorted build keys. 
+ assertEquals(operands.get(0), RequestUtils.getIdentifierExpression("fk")); + assertEquals(operands.size(), 4); + assertEquals(operands.get(1).getLiteral().getIntValue(), 1); + assertEquals(operands.get(2).getLiteral().getIntValue(), 3); + assertEquals(operands.get(3).getLiteral().getIntValue(), 5); + } + + @Test + public void testAutoSmallUsesExactIn() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(7, 8, 9), buildSchema, + RuntimeFilterNode.Type.AUTO); + // Few build-key rows -> AUTO chooses an exact IN (index-accelerated). + assertEquals(pinotQuery.getFilterExpression().getFunctionCall().getOperator(), FilterKind.IN.name()); + } + + @Test + public void testEmptyBuildYieldsConstantFalse() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), new ArrayList<>(), buildSchema, + RuntimeFilterNode.Type.AUTO); + // Empty build side -> nothing can match -> constant false predicate prunes the whole probe. 
+ Expression filter = pinotQuery.getFilterExpression(); + assertNotNull(filter); + assertTrue(filter.isSetLiteral(), "empty build should produce a constant literal filter"); + assertEquals(filter.getLiteral().getBoolValue(), false); + } + + @Test + public void testBloomSingleKeyEmitsInIdSetAndRange() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(10, 20, 30), buildSchema, + RuntimeFilterNode.Type.BLOOM); + + Expression filter = pinotQuery.getFilterExpression(); + assertNotNull(filter); + // Bloom predicate: AND(EQUALS(inIdSet(fk, ''), 1), BETWEEN(fk, 10, 30)). + assertNotNull(findFunction(filter, "inIdSet"), "bloom path must emit an inIdSet transform"); + Function between = findFunction(filter, FilterKind.BETWEEN.name()); + assertNotNull(between, "bloom path must emit a BETWEEN range predicate for numeric keys"); + assertEquals(between.getOperands().get(1).getLiteral().getIntValue(), 10); + assertEquals(between.getOperands().get(2).getLiteral().getIntValue(), 30); + } + + @Test + public void testBigDecimalBloomFallsBackToExactIn() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}); + List rows = new ArrayList<>(); + rows.add(new Object[]{new java.math.BigDecimal("1.5")}); + rows.add(new Object[]{new java.math.BigDecimal("2.5")}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), rows, buildSchema, + RuntimeFilterNode.Type.BLOOM); + // BIG_DECIMAL is unsupported by IdSet -> falls back to an exact IN (no inIdSet). 
+ assertEquals(pinotQuery.getFilterExpression().getFunctionCall().getOperator(), FilterKind.IN.name()); + } + + @Test + public void testMultiKeyEmitsExactInPerKey() { + PinotQuery pinotQuery = queryWithProbeColumns("fk1", "fk2"); + DataSchema buildSchema = + new DataSchema(new String[]{"k1", "k2"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); + List rows = new ArrayList<>(); + rows.add(new Object[]{1, 100L}); + rows.add(new Object[]{2, 200L}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0, 1), List.of(0, 1), rows, buildSchema, + RuntimeFilterNode.Type.AUTO); + // Multi-key -> exact IN per key, ANDed together (no bloom for composite keys). + Expression filter = pinotQuery.getFilterExpression(); + assertEquals(filter.getFunctionCall().getOperator(), FilterKind.AND.name()); + assertEquals(filter.getFunctionCall().getOperands().size(), 2); + for (Expression operand : filter.getFunctionCall().getOperands()) { + assertEquals(operand.getFunctionCall().getOperator(), FilterKind.IN.name()); + } + } + + @Test + public void testAutoTiersToBloomAboveThreshold() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT}); + // maxInSize = 2, 3 build-key rows -> AUTO crosses the exact-IN threshold and uses a bloom. 
+ ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(1, 2, 3), buildSchema, + RuntimeFilterNode.Type.AUTO, 2, 0.01, 16 * 1024 * 1024, 1000); + assertNotNull(findFunction(pinotQuery.getFilterExpression(), "inIdSet"), + "AUTO above maxInSize build-key rows must switch to a bloom"); + } + + @Test + public void testAutoBelowThresholdStaysExactIn() { + PinotQuery pinotQuery = queryWithProbeColumns("fk"); + DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT}); + ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(1, 2, 3), buildSchema, + RuntimeFilterNode.Type.AUTO, 10, 0.01, 16 * 1024 * 1024, 1000); + assertEquals(pinotQuery.getFilterExpression().getFunctionCall().getOperator(), FilterKind.IN.name()); + } + + @Test + public void testAutoMultiKeyStaysExactInAboveThreshold() { + PinotQuery pinotQuery = queryWithProbeColumns("fk1", "fk2"); + DataSchema buildSchema = + new DataSchema(new String[]{"k1", "k2"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); + List rows = new ArrayList<>(); + rows.add(new Object[]{1, 10L}); + rows.add(new Object[]{2, 20L}); + rows.add(new Object[]{3, 30L}); + // Even above the threshold, multi-key stays exact-IN per key (the bloom tier is single-key only). 
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0, 1), List.of(0, 1), rows, buildSchema,
+        RuntimeFilterNode.Type.AUTO, 1, 0.01, 16 * 1024 * 1024, 1000);
+    Expression filter = pinotQuery.getFilterExpression();
+    assertNull(findFunction(filter, "inIdSet"), "multi-key must not use a bloom");
+    assertEquals(filter.getFunctionCall().getOperator(), FilterKind.AND.name());
+    for (Expression operand : filter.getFunctionCall().getOperands()) {
+      assertEquals(operand.getFunctionCall().getOperator(), FilterKind.IN.name());
+    }
+  }
+
+  @Test
+  public void testBuildOverMaxRowsAbandonsFilter() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT});
+    // rowCount (3) > maxBuildRows (2): the planner cap was hit, so the key set may be truncated/incomplete
+    // -> the filter MUST be abandoned (no false negatives), leaving the query unfiltered.
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(1, 2, 3), buildSchema,
+        RuntimeFilterNode.Type.IN, 10000, 0.01, 16 * 1024 * 1024, 2);
+    assertNull(pinotQuery.getFilterExpression(), "a truncated (over-cap) build must abandon the filter");
+  }
+
+  @Test
+  public void testFloatBloomEmitsRangeWhenNoNaN() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.FLOAT});
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{1.5f});
+    rows.add(new Object[]{3.5f});
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), rows, buildSchema,
+        RuntimeFilterNode.Type.BLOOM, 10000, 0.01, 16 * 1024 * 1024, 1000);
+    Expression filter = pinotQuery.getFilterExpression();
+    assertNotNull(findFunction(filter, "inIdSet"));
+    assertNotNull(findFunction(filter, FilterKind.BETWEEN.name()), "finite float bloom must include a range");
+  }
+
+  @Test
+  public void testFloatNaNBuildKeyOmitsRangePredicate() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.FLOAT});
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{1.5f});
+    rows.add(new Object[]{Float.NaN});
+    rows.add(new Object[]{3.5f});
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), rows, buildSchema,
+        RuntimeFilterNode.Type.BLOOM, 10000, 0.01, 16 * 1024 * 1024, 1000);
+    Expression filter = pinotQuery.getFilterExpression();
+    // A NaN build key must keep the bloom but DROP the BETWEEN range — a finite range would exclude probe
+    // NaN rows that should match (false negative), and BETWEEN(NaN, NaN) would drop everything.
+    assertNotNull(findFunction(filter, "inIdSet"), "bloom must still be emitted with a NaN build key");
+    assertNull(findFunction(filter, FilterKind.BETWEEN.name()),
+        "a NaN build key must omit the BETWEEN range predicate to avoid false negatives");
+  }
+
+  @Test
+  public void testNullBuildKeysAreSkipped() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.LONG});
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{1L});
+    rows.add(new Object[]{null}); // null key must be skipped (cannot match an INNER join)
+    rows.add(new Object[]{3L});
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), rows, buildSchema,
+        RuntimeFilterNode.Type.IN);
+    Function in = pinotQuery.getFilterExpression().getFunctionCall();
+    assertEquals(in.getOperator(), FilterKind.IN.name());
+    // fk + the two non-null keys (1, 3); the null is dropped, no NPE.
+    assertEquals(in.getOperands().size(), 3);
+    assertEquals(in.getOperands().get(1).getLiteral().getLongValue(), 1L);
+    assertEquals(in.getOperands().get(2).getLiteral().getLongValue(), 3L);
+  }
+
+  @Test
+  public void testAllNullBuildKeysYieldConstantFalse() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.LONG});
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{null});
+    rows.add(new Object[]{null});
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), rows, buildSchema,
+        RuntimeFilterNode.Type.AUTO);
+    Expression filter = pinotQuery.getFilterExpression();
+    assertTrue(filter.isSetLiteral(), "an all-null build side cannot match anything");
+    assertEquals(filter.getLiteral().getBoolValue(), false);
+  }
+
+  @Test
+  public void testBloomOverMaxBytesAbandonsFilter() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT});
+    // maxBytes = 1 forces the serialized bloom over the cap -> the filter is abandoned (no exception,
+    // no predicate); the real hash join remains the source of truth.
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(1, 2, 3), buildSchema,
+        RuntimeFilterNode.Type.BLOOM, 10000, 0.01, 1, 1000);
+    assertNull(pinotQuery.getFilterExpression(), "a bloom exceeding maxBytes must abandon the filter");
+  }
+
+  /** Every storable single join-key type, with a small build set, for the exact-IN footprint cap. */
+  @DataProvider(name = "exactInCapTypes")
+  public static Object[][] exactInCapTypes() {
+    return new Object[][]{
+        {ColumnDataType.INT, rowsOf(1, 2, 3)},
+        {ColumnDataType.LONG, rowsOf(1L, 2L, 3L)},
+        {ColumnDataType.FLOAT, rowsOf(1.0f, 2.0f)},
+        {ColumnDataType.DOUBLE, rowsOf(1.0, 2.0)},
+        {ColumnDataType.STRING, rowsOf("alpha", "beta")},
+        {ColumnDataType.BYTES, rowsOf(new ByteArray(new byte[]{1}), new ByteArray(new byte[]{2}))},
+        {ColumnDataType.BIG_DECIMAL, rowsOf(new BigDecimal("1.5"), new BigDecimal("2.5"))}
+    };
+  }
+
+  @Test(dataProvider = "exactInCapTypes")
+  public void testExactInCapHonoredForEachKeyType(ColumnDataType keyType, List<Object[]> buildRows) {
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{keyType});
+    // A 1-byte ceiling forces every non-empty build set over the cap; this drives estimateExactInBytes
+    // through each per-type branch (STRING/BYTES/BIG_DECIMAL walk every row) and must abandon the filter.
+    PinotQuery abandon = queryWithProbeColumns("fk");
+    ServerPlanRequestUtils.attachRuntimeFilter(abandon, List.of(0), List.of(0), buildRows, buildSchema,
+        RuntimeFilterNode.Type.IN, 10000, 0.01, 1, 1000);
+    assertNull(abandon.getFilterExpression(), "exact IN over maxBytes must abandon the filter for " + keyType);
+    // A generous ceiling keeps the same exact IN for that type.
+    PinotQuery keep = queryWithProbeColumns("fk");
+    ServerPlanRequestUtils.attachRuntimeFilter(keep, List.of(0), List.of(0), buildRows, buildSchema,
+        RuntimeFilterNode.Type.IN, 10000, 0.01, 1 << 20, 1000);
+    assertEquals(keep.getFilterExpression().getFunctionCall().getOperator(), FilterKind.IN.name(),
+        "exact IN under maxBytes must be kept for " + keyType);
+  }
+
+  @Test
+  public void testExactInCapBoundaryIsStrict() {
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT});
+    // Three INT keys estimate to 3 * (IN_LITERAL_OVERHEAD_BYTES + 4) = 36 bytes. The guard is a strict
+    // '>' so a ceiling equal to the footprint keeps the filter and one byte under abandons it.
+    PinotQuery atLimit = queryWithProbeColumns("fk");
+    ServerPlanRequestUtils.attachRuntimeFilter(atLimit, List.of(0), List.of(0), intRows(1, 2, 3), buildSchema,
+        RuntimeFilterNode.Type.IN, 10000, 0.01, 36, 1000);
+    assertEquals(atLimit.getFilterExpression().getFunctionCall().getOperator(), FilterKind.IN.name(),
+        "footprint == maxBytes must be kept (strict > boundary)");
+    PinotQuery overLimit = queryWithProbeColumns("fk");
+    ServerPlanRequestUtils.attachRuntimeFilter(overLimit, List.of(0), List.of(0), intRows(1, 2, 3), buildSchema,
+        RuntimeFilterNode.Type.IN, 10000, 0.01, 35, 1000);
+    assertNull(overLimit.getFilterExpression(), "footprint one byte over maxBytes must abandon");
+  }
+
+  @Test
+  public void testMultiKeyExactInOverMaxBytesAbandonsFilter() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk1", "fk2");
+    DataSchema buildSchema =
+        new DataSchema(new String[]{"k1", "k2"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{1, 10L});
+    rows.add(new Object[]{2, 20L});
+    // Multi-key always uses exact IN (per key, AND'd); a tiny maxBytes must abandon the whole filter.
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0, 1), List.of(0, 1), rows, buildSchema,
+        RuntimeFilterNode.Type.AUTO, 10000, 0.01, 1, 1000);
+    assertNull(pinotQuery.getFilterExpression(), "a multi-key exact IN exceeding maxBytes must abandon the filter");
+  }
+
+  @Test
+  public void testExistingFilterPreserved() {
+    PinotQuery pinotQuery = queryWithProbeColumns("fk");
+    Expression existing = RequestUtils.getFunctionExpression(FilterKind.GREATER_THAN.name(),
+        RequestUtils.getIdentifierExpression("fk"), RequestUtils.getLiteralExpression(0));
+    pinotQuery.setFilterExpression(existing);
+    DataSchema buildSchema = new DataSchema(new String[]{"k"}, new ColumnDataType[]{ColumnDataType.INT});
+    ServerPlanRequestUtils.attachRuntimeFilter(pinotQuery, List.of(0), List.of(0), intRows(1, 2), buildSchema,
+        RuntimeFilterNode.Type.IN);
+    // The runtime filter is ANDed with the pre-existing WHERE predicate.
+    Expression filter = pinotQuery.getFilterExpression();
+    assertEquals(filter.getFunctionCall().getOperator(), FilterKind.AND.name());
+    assertNotNull(findFunction(filter, FilterKind.GREATER_THAN.name()));
+    assertNotNull(findFunction(filter, FilterKind.IN.name()));
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b81f4fb28289..a6cf254d6678 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -609,6 +609,31 @@ public static class Broker {
         "pinot.broker.enable.dynamic.filtering.semijoin";
     public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true;
 
+    /**
+     * Enables the additive probe-side runtime filter for equi-INNER joins (build the small build side's
+     * join keys into a reducer pushed onto the probe leaf scan). Disabled by default; can be
+     * force-enabled per-join via the {@code runtime_filter} join hint.
+     * <p>NOTE: requires all servers to support {@code RuntimeFilterNode}; enabling it (or using the
+     * {@code runtime_filter} hint) on a mixed-version cluster mid-rolling-upgrade will cause query
+     * failures on not-yet-upgraded servers.
+     */
+    public static final String CONFIG_OF_BROKER_ENABLE_RUNTIME_FILTER_JOIN =
+        "pinot.broker.enable.runtime.filter.join";
+    public static final boolean DEFAULT_ENABLE_RUNTIME_FILTER_JOIN = false;
+    // Runtime-filter sizing knobs are fixed defaults (not cluster-configurable). They affect
+    // only filter selectivity/size, never correctness (the real hash join is the source of truth). The
+    // planner and the leaf both read these same constants so their decisions stay aligned.
+    /** Build-key row count at/below which the {@code AUTO} runtime filter uses an exact IN list rather
+     * than a bloom (the build keys are not deduplicated, so this counts rows, not distinct values). */
+    public static final int DEFAULT_RUNTIME_FILTER_MAX_IN_SIZE = 10_000;
+    /** Max build-key rows materialized for a runtime filter, enforced as a planner fetch cap; above it the
+     * filter is abandoned because the truncated key set would be incomplete (would risk false negatives). */
+    public static final int DEFAULT_RUNTIME_FILTER_MAX_BUILD_ROWS = 1 << 20;
+    /** Target false-positive probability for the bloom runtime filter tier. */
+    public static final double DEFAULT_RUNTIME_FILTER_FPP = 0.01;
+    /** Max runtime-filter size (serialized bloom, or estimated exact-IN bytes); above this the filter is abandoned. */
+    public static final int DEFAULT_RUNTIME_FILTER_MAX_BYTES = 16 * 1024 * 1024;
+
     /**
      * Whether to use physical optimizer by default.
* This value can always be overridden by {@link Request.QueryOptionKey#USE_PHYSICAL_OPTIMIZER} query option @@ -1028,6 +1053,13 @@ public static class QueryOptionKey { /// Option to customize the value of [Broker#CONFIG_OF_SORT_EXCHANGE_COPY_THRESHOLD] public static final String SORT_EXCHANGE_COPY_THRESHOLD = "sortExchangeCopyThreshold"; + /// Per-query enable/disable override for [Broker#CONFIG_OF_BROKER_ENABLE_RUNTIME_FILTER_JOIN]. + /// Accepts `off`/`false` (disable) or `on`/`true` (enable, defaulting to the AUTO tier). The + /// per-join reducer mode (in / bloom / auto) is selected via the `runtime_filter` join hint, not + /// this option. Disabling on a mixed-version cluster is the safe default; enabling + /// mid-rolling-upgrade can fail on not-yet-upgraded servers. + public static final String RUNTIME_FILTER_JOIN = "runtimeFilterJoin"; + // Vector search query options /** Number of inverted-list probes for IVF-based vector indexes. Higher values improve recall