[FLINK-39991] Replace Flink ReplaceIntersectWithSemiJoinRule with Cal…#28533
liuyongvs wants to merge 1 commit
| <Resource name="optimized exec plan"> | ||
| <![CDATA[ | ||
| Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey]) | ||
| Join(joinType=[LeftSemiJoin], where=[((random IS NULL AND random0 IS NULL) OR (random = random0))], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey]) |
Comment: IS NOT DISTINCT FROM(random, random0) is equivalent to ((random IS NULL AND random0 IS NULL) OR (random = random0)).
Root Cause Analysis of XML Test Differences
The Difference
After replacing Flink's ReplaceIntersectWithSemiJoinRule with Calcite's IntersectToSemiJoinRule, the join condition in XML test files changed from:
IS NOT DISTINCT FROM($0, $1)
to:
OR(AND(IS NULL($0), IS NULL($1)), =($0, $1))
Both forms are semantically equivalent.
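To make the equivalence concrete, here is a minimal sketch that models SQL three-valued logic with a nullable `Boolean` (null stands for UNKNOWN). The class and method names are hypothetical and not part of Flink or Calcite. It also shows one nuance: for a null and a non-null operand the expanded form evaluates to UNKNOWN rather than FALSE, so the two forms agree as join conditions, where only TRUE keeps a row.

```java
import java.util.Objects;

public class IsNotDistinctFromDemo {
    // SQL "=": UNKNOWN (null) if either operand is NULL.
    public static Boolean sqlEquals(Integer a, Integer b) {
        if (a == null || b == null) {
            return null;
        }
        return a.equals(b);
    }

    // SQL AND under three-valued logic.
    public static Boolean sqlAnd(Boolean x, Boolean y) {
        if (Boolean.FALSE.equals(x) || Boolean.FALSE.equals(y)) {
            return false;
        }
        if (x == null || y == null) {
            return null;
        }
        return true;
    }

    // SQL OR under three-valued logic.
    public static Boolean sqlOr(Boolean x, Boolean y) {
        if (Boolean.TRUE.equals(x) || Boolean.TRUE.equals(y)) {
            return true;
        }
        if (x == null || y == null) {
            return null;
        }
        return false;
    }

    // IS NOT DISTINCT FROM: null-safe equality, never UNKNOWN.
    public static boolean isNotDistinctFrom(Integer a, Integer b) {
        return Objects.equals(a, b);
    }

    // OR(AND(IS NULL(a), IS NULL(b)), =(a, b))
    public static Boolean expanded(Integer a, Integer b) {
        return sqlOr(sqlAnd(a == null, b == null), sqlEquals(a, b));
    }

    // A join or filter keeps a row only when the condition is TRUE.
    public static boolean keepsRow(Boolean condition) {
        return Boolean.TRUE.equals(condition);
    }

    // Compares both forms over a small sample that includes NULL.
    public static boolean agreeOnAllSamples() {
        Integer[] samples = {null, 1, 2};
        for (Integer a : samples) {
            for (Integer b : samples) {
                if (isNotDistinctFrom(a, b) != keepsRow(expanded(a, b))) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agreeOnAllSamples());
    }
}
```

The sample set is deliberately tiny; it is meant to illustrate the truth table, not to prove the equivalence for every type.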
Root Cause: collapseExpandedIsNotDistinctFromExpr Fails Due to Asymmetric removeNullabilityCast
The key difference between the two rules is how they construct the join condition, which determines whether RelBuilder.join() can collapse the expanded form back to IS NOT DISTINCT FROM.
Old Rule (Flink) — Collapse Succeeds
ReplaceIntersectWithSemiJoinRule calls SetOpRewriteUtil.generateEqualsCondition(), which constructs conditions using bare makeInputRef:
// SetOpRewriteUtil.java:59-71
RexNode leftRex = rexBuilder.makeInputRef(leftTypes.get(key), key);
RexNode rightRex = rexBuilder.makeInputRef(rightTypes.get(key), leftTypes.size() + key);
relBuilder.or(
rexBuilder.makeCall(EQUALS, leftRex, rightRex), // =($0, $1)
relBuilder.and(
relBuilder.isNull(leftRex), // IS NULL($0)
relBuilder.isNull(rightRex))); // IS NULL($1)
When RelBuilder.join() is called, it internally invokes collapseExpandedIsNotDistinctFromExpr (RelBuilder.java:3198). The IS_NULL operands ($0, $1) and the EQUALS operands ($0, $1) are identical — removeNullabilityCast has no effect on bare InputRef nodes. The equality check in doCollapseExpandedIsNotDistinctFrom (RelOptUtil.java:1883) passes, and the condition is collapsed to IS NOT DISTINCT FROM($0, $1).
New Rule (Calcite) — Collapse Fails
IntersectToSemiJoinRule calls builder.isNotDistinctFrom(), which internally calls RelOptUtil.isDistinctFromInternal() (RelOptUtil.java:2417-2425). This generates:
OR(AND(IS_NULL(CAST($0)), IS_NULL(CAST($1))),
IS_TRUE(=(CAST($0), CAST($1))))
The CAST nodes come from the rule's use of makeCast(leastFieldType, ...) (IntersectToSemiJoinRule.java:125-128) to align field types to the Intersect's leastRowType.
When collapseExpandedIsNotDistinctFromExpr processes this condition, it reaches doCollapseExpandedIsNotDistinctFrom (RelOptUtil.java:1873-1890):
// RelOptUtil.java:1875-1881
final RexNode isNullInput0 = ifNull0Call.getOperands().get(0); // CAST($0) — no transform
final RexNode isNullInput1 = ifNull1Call.getOperands().get(0); // CAST($1) — no transform
final RexNode equalsInput0 = RexUtil
.removeNullabilityCast(rexBuilder.getTypeFactory(), equalsCall.getOperands().get(0)); // $0 — CAST removed!
final RexNode equalsInput1 = RexUtil
.removeNullabilityCast(rexBuilder.getTypeFactory(), equalsCall.getOperands().get(1)); // $1 — CAST removed!
removeNullabilityCast strips CAST nodes that only change nullability (e.g., CAST($0 AS INT NOT NULL) → $0). It is applied only to the EQUALS operands, not to the IS_NULL operands. This creates an asymmetry:
isNullInput0 = CAST($0 AS type) (untouched)
equalsInput0 = $0 (CAST stripped by removeNullabilityCast)
Since CAST($0) ≠ $0, the equality check fails (RelOptUtil.java:1883-1884), and the collapse returns the original expression unchanged.
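The asymmetry can be reproduced with a toy model that does not depend on Calcite. Everything below (`Expr`, `ref`, `cast`, `canCollapse`) is a hypothetical stand-in for the real `RexNode` machinery, assuming only what the analysis above states: the cast is stripped from the EQUALS operands but not from the IS_NULL operands before they are compared.

```java
import java.util.Objects;

public class CollapseAsymmetryDemo {
    // Toy expression: an input reference, optionally wrapped in a nullability-only CAST.
    public static final class Expr {
        final int index;
        final boolean castWrapped;

        Expr(int index, boolean castWrapped) {
            this.index = index;
            this.castWrapped = castWrapped;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Expr)) {
                return false;
            }
            Expr other = (Expr) o;
            return index == other.index && castWrapped == other.castWrapped;
        }

        @Override
        public int hashCode() {
            return Objects.hash(index, castWrapped);
        }
    }

    public static Expr ref(int index) {
        return new Expr(index, false);
    }

    public static Expr cast(Expr e) {
        return new Expr(e.index, true);
    }

    // Stand-in for RexUtil.removeNullabilityCast: unwraps the toy CAST.
    public static Expr removeNullabilityCast(Expr e) {
        return e.castWrapped ? ref(e.index) : e;
    }

    // Mirrors the described check: only the EQUALS operands are unwrapped.
    public static boolean canCollapse(Expr isNull0, Expr isNull1, Expr eq0, Expr eq1) {
        Expr equalsInput0 = removeNullabilityCast(eq0);
        Expr equalsInput1 = removeNullabilityCast(eq1);
        return isNull0.equals(equalsInput0) && isNull1.equals(equalsInput1);
    }

    // Old rule: bare input refs everywhere, so the operands match.
    public static boolean oldRuleCollapses() {
        return canCollapse(ref(0), ref(1), ref(0), ref(1));
    }

    // New rule: CAST on every operand, but only the EQUALS side is unwrapped.
    public static boolean newRuleCollapses() {
        return canCollapse(cast(ref(0)), cast(ref(1)), cast(ref(0)), cast(ref(1)));
    }

    public static void main(String[] args) {
        System.out.println("old rule collapses: " + oldRuleCollapses());
        System.out.println("new rule collapses: " + newRuleCollapses());
    }
}
```

In this model, applying the same unwrapping to the IS_NULL operands would make the second case match as well, but whether that is the right fix in Calcite is outside what this sketch can show.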
After the failed collapse, simplifyUnknownAsFalse (RelBuilder.java:3201) takes over:
IS_TRUE(=($0, $1)) → =($0, $1) (simplification removes IS_TRUE wrapper)
Remaining CAST nodes are also simplified away
The final output is: OR(AND(IS NULL($0), IS NULL($1)), =($0, $1))
f5edddb to b592bc8
… with Calcite IntersectToSemiJoinRule
| // set operators | ||
| ReplaceIntersectWithSemiJoinRule.INSTANCE, | ||
| CoreRules.INTERSECT_TO_SEMI_JOIN, |
My AI says:
- Verify that the expanded join condition doesn't negatively impact query performance in production workloads
- Confirm that ReplaceMinusWithAntiJoinRule removal (mentioned but not shown) is equally safe
- Consider adding a comment in the rule sets explaining why the Calcite rules are preferred over custom implementations
What is the purpose of the change
This is a follow-up PR after the Calcite upgrade to 1.40.0.
There is 1 Calcite class which might be removed from the Flink codebase.
Brief change log
Replace ReplaceIntersectWithSemiJoinRule with Calcite IntersectToSemiJoinRule
Verifying this change
existing tests
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation
Was generative AI tooling used to co-author this PR?