Skip to content

Add probe-side runtime filters for inner joins in the multi-stage engine#18848

Open
yashmayya wants to merge 1 commit into
apache:masterfrom
yashmayya:inner-join-runtime-filter
Open

Add probe-side runtime filters for inner joins in the multi-stage engine#18848
yashmayya wants to merge 1 commit into
apache:masterfrom
yashmayya:inner-join-runtime-filter

Conversation

@yashmayya

@yashmayya yashmayya commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

What

Adds probe-side runtime filters for equi-INNER joins in the multi-stage query engine (MSE). When
the build (right) side of a hash join is small or selective, the planner builds a reducer from its
join keys and pushes it down to the probe (left) leaf scan, so the probe table drops rows
that cannot possibly match before they are shuffled across the network into the join.

This is the INNER-join counterpart of the existing SEMI-join dynamic broadcast
(PinotJoinToDynamicBroadcastRule). It is disabled by default.

Why

For the classic fact ⋈ dim shape — a large fact table joined to a small dimension table (or a heavily
filtered build side) — the MSE today hash-shuffles the entire probe (fact) side into the join stage,
even though only the rows whose join key appears on the (tiny) build side can contribute to the result.
That wastes scan, serialization, and network bandwidth proportional to the whole fact table rather than
to the matching subset.

The SEMI-join path already solves the analogous problem by replacing the join with a leaf IN filter,
but that rewrite is only legal for semi-joins (which emit left columns only). An inner join projects
columns from both sides, so the join must still run. This PR therefore makes the filter additive:
the real hash join is left completely intact, and we only add a reducer on the probe leaf.

How it works

After exchange insertion (POST_LOGICAL), PinotJoinToInnerRuntimeFilterRule rewrites an eligible
inner join:

        [ Inner Join ]   (unchanged — still hash-shuffles both sides)
        /            \
   [xChange L]    [xChange R]
       /                \
 [RuntimeFilter]    [build subtree]
    /        \
[probe leaf] [PIPELINE_BREAKER xChange]
                   |
            [ build keys: Project(rightKeys) -> Filter(IS NOT NULL) -> limit(maxBuildRows + 1) ]
  • The join and both of its HASH exchanges are kept verbatim — execution and results are identical to
    before; the filter is purely additive.
  • A new RuntimeFilterRel / RuntimeFilterNode is grafted on top of the probe leaf subtree
    (input[0] = probe pipeline, pass-through; input[1] = a PIPELINE_BREAKER mailbox carrying the
    build-side join keys). The pipeline breaker runs the build side first and ships its keys to
    the probe-leaf worker, reusing the same mechanism as the SEMI dynamic broadcast.
  • At the probe leaf, ServerPlanRequestUtils.attachRuntimeFilter ANDs a tiered, no-false-negative
    reducer onto the V1 leaf query:
    • Exact IN for small key sets (at/below a build-key-row threshold), or for multi-key /
      BIG_DECIMAL keys. This is index-accelerated and drives segment pruning.
    • Bloom filter (IN_ID_SET) above the threshold, plus a BETWEEN(min, max) range predicate for
      numeric keys to enable cheap range-based segment pruning. Bloom keeps the wire/heap footprint bounded
      for high-cardinality build sides.
  • Because the real hash join is the source of truth, the reducer can be abandoned at any point
    (empty/over-cap build, oversized bloom, unsupported subtree, mixed-version cluster) with no effect on
    results. Bloom false positives are simply re-checked and discarded by the join.

This mirrors runtime/dynamic filtering in Trino, Impala, and Spark's InjectRuntimeFilter.

When it helps

  • Large fact table joined to a small or selectively-filtered dimension/build side.
  • The probe side is a leaf scan (table scan, optionally with single-input Project/Filter), so the
    filter can be pushed all the way down to segment scan.

It is not beneficial (and is best left off) when the build side is large or non-selective, or the
probe is cheap — there is no automatic selectivity-based gate yet, so enablement is opt-in (see below).

How to use

Per-join hint (selects the reducer mode):

SELECT /*+ joinOptions(runtime_filter='auto') */ ...
FROM fact JOIN dim ON fact.key = dim.key
WHERE dim.attr = 'x'

runtime_filter accepts off | in | bloom | auto (exact IN below the threshold, else bloom).

Cluster-wide default (enable/disable only; defaults to auto when on):

pinot.broker.enable.runtime.filter.join=true

Per-query override: SET runtimeFilterJoin='on' (or off).

Thresholds & defaults

There is one user-facing switch (the enable flag / query option / hint). Every sizing threshold below is
a fixed constant in this first version — each affects only the filter's selectivity and size, never
correctness (the real hash join re-checks every surviving row), so they are intentionally not
cluster-configurable yet.

Knob Default Evaluated at Role
pinot.broker.enable.runtime.filter.join false broker config Cluster-wide enable. Overridable per query by runtimeFilterJoin and per join by the runtime_filter hint.
runtimeFilterJoin (query option) unset per query on/off enable switch; when on, defaults to the auto tier.
runtime_filter (join hint) unset per join off / in / bloom / auto — selects the reducer mode for that join.
max IN size 10000 leaf (runtime) auto emits an exact IN at/below this many build-key rows, and a bloom above it.
max build rows 1048576 (2^20) planner + leaf The build-key stage is capped at maxBuildRows + 1 rows. If the cap is hit the key set may be truncated/incomplete, so the leaf abandons the filter (results stay correct). This also bounds the pipeline-breaker memory. The planner cap and the leaf abandon read the same constant, so they cannot diverge.
bloom FPP 0.01 leaf (runtime) Target false-positive probability for the bloom tier. False positives only admit a few extra probe rows, which the join then discards.
bloom max bytes 16 MB leaf (runtime) If the serialized bloom would exceed this, the filter is abandoned (no predicate emitted).

Notes:

  • max IN size, bloom FPP, and bloom max bytes are applied on the probe-leaf server once the build
    keys are materialized; max build rows is enforced both as the planner's fetch cap on the build-key
    stage and as the leaf's truncation guard.
  • The build keys are not de-duplicated (no DISTINCT), so max IN size compares against the
    build-key row count, not the distinct-value count — the leaf IN/bloom dedups implicitly.
  • Exceeding max build rows, an oversized bloom, an empty/all-null build, or an unsupported probe shape
    all simply drop the filter — never a wrong result.

Correctness & safety

  • No false negatives. Exact IN is exact; a bloom never reports present-as-absent and its false
    positives are discarded by the real join; the BETWEEN(min, max) bounds cover every build key.
  • Null keys are excluded (they cannot match an inner equi-join) both at the planner (IS NOT NULL)
    and defensively at the leaf.
  • NaN float/double build keys keep the bloom membership but skip the range predicate (a finite
    range would wrongly drop probe NaN rows).
  • Truncation-safe. The build-key stage is capped at maxBuildRows + 1; if the cap is hit the key set
    is incomplete, so the filter is abandoned (the planner cap and the leaf abandon use the same constant).
  • Mixed-version. The only wire change is the new RuntimeFilterNode proto variant; the default-off
    flag is the guard. Enabling the flag (or using the hint) mid-rolling-upgrade can fail queries on
    not-yet-upgraded servers — documented on the config constant.
  • The reducer is built and applied entirely at the planner and the leaf-stage query, so it requires no
    changes to join execution itself.

Testing

  • PinotJoinToInnerRuntimeFilterRuleTest — rule firing/plan shape, probe-key/build-key value alignment,
    multi-key, hint/flag/query-option gating, pipeline-breaker distribution, negative cases.
  • ServerPlanRequestUtilsTest — exact-IN, bloom + range-prune, AUTO tiering, multi-key, empty/all-null
    build, null-key skip, NaN range omission, maxBytes/maxBuildRows abandon, existing-filter merge.
  • PlanNodeDeserializerTest — mixed-version graceful failure on the new proto variant.
  • RuntimeFilterJoinIntegrationTest — end-to-end cluster self-joins asserting results are identical
    with the filter on (in/bloom/auto) and off, across INT/LONG/DOUBLE/STRING/mixed-type/null/multi-key/
    empty-build cases.
  • Full pinot-query-planner (1321) and pinot-query-runtime (4431) suites pass — no regressions.

Limitations / future work

  • No automatic, statistics-driven enablement yet (opt-in via hint/flag); a future change can gate it on
    cardinality/selectivity estimates.
  • The build side is materialized a second time for the key broadcast; a shared spool would avoid this.
  • Bloom is single-key (composite-key tuple-encoding deferred); partitioned (both-sides-hash) joins use
    the broadcast path. Only the logical (HEP) planner is wired; wiring it into the v2 MSE physical
    optimizer is a follow-up.

@yashmayya yashmayya added multi-stage Related to the multi-stage query engine feature New functionality labels Jun 24, 2026
@codecov-commenter

codecov-commenter commented Jun 24, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 60.48387% with 147 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.78%. Comparing base (dc4e957) to head (278ee24).

Files with missing lines Patch % Lines
...ry/runtime/plan/server/ServerPlanRequestUtils.java 67.21% 37 Missing and 3 partials ⚠️
.../runtime/plan/server/ServerPlanRequestVisitor.java 0.00% 18 Missing ⚠️
...he/pinot/query/planner/explain/PlanNodeMerger.java 0.00% 13 Missing ⚠️
...e/pinot/query/runtime/InStageStatsTreeBuilder.java 0.00% 12 Missing ⚠️
...e/rel/rules/PinotJoinToInnerRuntimeFilterRule.java 85.07% 5 Missing and 5 partials ⚠️
...inot/query/planner/plannode/RuntimeFilterNode.java 54.54% 6 Missing and 4 partials ⚠️
.../query/planner/logical/EquivalentStagesFinder.java 0.00% 7 Missing ⚠️
.../query/planner/logical/PlanNodeToRelConverter.java 0.00% 6 Missing ⚠️
...ry/planner/explain/PhysicalExplainPlanVisitor.java 0.00% 4 Missing ⚠️
...inot/query/planner/serde/PlanNodeDeserializer.java 60.00% 3 Missing and 1 partial ⚠️
... and 12 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18848      +/-   ##
============================================
- Coverage     64.81%   64.78%   -0.03%     
  Complexity     1322     1322              
============================================
  Files          3393     3396       +3     
  Lines        211246   211618     +372     
  Branches      33208    33276      +68     
============================================
+ Hits         136917   137102     +185     
- Misses        63284    63442     +158     
- Partials      11045    11074      +29     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.78% <60.48%> (-0.03%) ⬇️
temurin 64.78% <60.48%> (-0.03%) ⬇️
unittests 64.78% <60.48%> (-0.03%) ⬇️
unittests1 57.02% <60.81%> (ø)
unittests2 37.09% <5.64%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yashmayya yashmayya force-pushed the inner-join-runtime-filter branch 3 times, most recently from 248d799 to 89fb53c Compare June 24, 2026 23:03
@gortiz

gortiz commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Really nice piece of work — the additive design is clean, the no-false-negative reasoning is carefully laid out, and the correctness coverage (planner/leaf cap sharing a constant, null-key exclusion on both sides, NaN range omission, graceful mixed-version proto fallback, visitor completeness enforced by the non-default interface method) is thorough. A few things I'd like to discuss before merge, all around sizing/footprint and validation rather than correctness:

1. The exact-IN tier has no footprint cap. maxBytes (16 MB) only guards the bloom path, and maxInSize only chooses bloom-vs-IN for single-key AUTO. For runtime_filter='in', and for any multi-key join (including AUTO multi-key above threshold), we fall to attachDynamicFilter and emit an exact IN of up to maxBuildRows (≈1M) literals per key, AND'd across keys — a multi-MB filter expression embedded in the leaf PinotQuery with no abandon guard. Could we have the exact-IN path honor the same maxBytes abandon as bloom, so there's one consistent footprint ceiling regardless of tier? That keeps the "no new config in v1" stance while closing the gap.

2. Bloom sizing knobs aren't tunable. MAX_IN_SIZE, MAX_BUILD_ROWS, FPP, and MAX_BYTES are fixed constants with no CONFIG_OF_* keys — the only runtime-reachable attachRuntimeFilter hardcodes the defaults (the parameterized overload is test-only). I understand the intent (they affect selectivity/size, not correctness) and I'm fine deferring real config to a follow-up. Just flagging it so it's a conscious decision, and tying back to (1): MAX_BYTES is the one knob that actually bounds footprint, and right now it doesn't apply to the tier that can get largest.

3. No benchmark. This is fundamentally a perf optimization (the motivation is all scan/serialization/network savings), but there's no JMH or cluster measurement — all tests assert result parity, not speedup. Since there's no selectivity gate yet and the feature can regress when the build side is large or the probe is cheap, a benchmark on the canonical large-fact ⋈ small-dim shape — showing the win at high selectivity and the cost at low selectivity / large build — would both justify the feature and tell us how real the hazard in (1) is. pinot-perf already has MSE/join scaffolding to build on. Reasonable to defer for an opt-in v1, but worth at least one data point.

4. Minor / please confirm: NaN handling is correct for the bloom (membership kept, range dropped), but the exact-IN tier just emits IN(probeCol, …, NaN, …) via the reused computeInOperands. Since float/double keys are now first-class here, can you confirm the leaf IN predicate and the MSE hash join agree on NaN = NaN (i.e. either both match it or both drop it)? If they disagree, exact-IN could drop a probe NaN row the join would keep. Likely consistent with the existing SEMI path, but worth a sentence given how carefully NaN is handled elsewhere.

None of these block correctness — the join remaining the source of truth makes the feature safe. (1) and (4) are the ones I'd most like resolved.

When the build side of an equi-inner-join is small/selective, build a reducer (exact IN below a
threshold, else a bloom filter plus a min/max range predicate) from its distinct join keys and push
it down to the probe-side leaf scan via a pipeline-breaker edge, so the probe (fact) table drops
non-matching rows before they are shuffled into the join. The real hash join still runs and remains
the source of truth, so the filter is a no-false-negative optimization that can be omitted at any time.

Generalizes the existing SEMI dynamic-broadcast machinery to inner joins (the join is kept and the
filter is additive). Disabled by default; enabled via the runtime_filter join hint or the
pinot.broker.enable.runtime.filter.join cluster flag / runtimeFilterJoin query option.
@yashmayya yashmayya force-pushed the inner-join-runtime-filter branch from 89fb53c to 278ee24 Compare June 25, 2026 21:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New functionality multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants