From 4048d6f83d12203fbe09e14b5933c1d241b7f80e Mon Sep 17 00:00:00 2001 From: liuyongvs Date: Wed, 24 Jun 2026 15:10:37 +0800 Subject: [PATCH] [FLINK-39985] Replace ReplaceMinusWithAntiJoinRule with Calcite MinusToAntiJoinRule Calcite 1.40 provides MinusToAntiJoinRule which is a superset of Flink ReplaceMinusWithAntiJoinRule: it supports n-way inputs (vs. 2-way only) and uses IS NOT DISTINCT FROM for NULL-safe comparison (semantically equivalent to Flink OR(=, AND(IS NULL, IS NULL))). This commit removes the Flink-specific rule and uses the Calcite rule instead. --- .../logical/ReplaceMinusWithAntiJoinRule.java | 95 ------------------- .../logical/SimplifyJoinConditionRule.java | 18 +++- .../plan/rules/FlinkBatchRuleSets.scala | 2 +- .../plan/rules/FlinkStreamRuleSets.scala | 2 +- .../ReplaceMinusWithAntiJoinRuleTest.scala | 8 +- 5 files changed, 25 insertions(+), 100 deletions(-) delete mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java deleted file mode 100644 index 35c719e38466e..0000000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.rules.logical; - -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.core.Minus; -import org.apache.calcite.rel.core.RelFactories; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.util.Util; -import org.immutables.value.Value; - -import java.util.List; - -import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; - -/** - * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Minus} (SQL keyword: - * EXCEPT) with a distinct {@link org.apache.calcite.rel.core.Aggregate} on an ANTI {@link - * org.apache.calcite.rel.core.Join}. - * - *

Only handle the case of input size 2. - */ -@Value.Enclosing -public class ReplaceMinusWithAntiJoinRule - extends RelRule { - - public static final ReplaceMinusWithAntiJoinRule INSTANCE = - ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.DEFAULT.toRule(); - - private ReplaceMinusWithAntiJoinRule(ReplaceMinusWithAntiJoinRuleConfig config) { - super(config); - } - - @Override - public boolean matches(RelOptRuleCall call) { - Minus minus = call.rel(0); - return !minus.all && minus.getInputs().size() == 2; - } - - @Override - public void onMatch(RelOptRuleCall call) { - Minus minus = call.rel(0); - RelNode left = minus.getInput(0); - RelNode right = minus.getInput(1); - - RelBuilder relBuilder = call.builder(); - List keys = Util.range(left.getRowType().getFieldCount()); - List conditions = generateEqualsCondition(relBuilder, left, right, keys); - - relBuilder.push(left); - relBuilder.push(right); - relBuilder - .join(JoinRelType.ANTI, conditions) - .aggregate( - relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray())); - RelNode rel = relBuilder.build(); - call.transformTo(rel); - } - - /** Rule configuration. */ - @Value.Immutable(singleton = false) - public interface ReplaceMinusWithAntiJoinRuleConfig extends RelRule.Config { - ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig DEFAULT = - ImmutableReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.builder() - .build() - .withOperandSupplier(b0 -> b0.operand(Minus.class).anyInputs()) - .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) - .withDescription("ReplaceMinusWithAntiJoinRule"); - - @Override - default ReplaceMinusWithAntiJoinRule toRule() { - return new ReplaceMinusWithAntiJoinRule(this); - } - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java index bfd07eda7e0ab..943dcdeac39f2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java @@ -21,8 +21,10 @@ import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import org.immutables.value.Value; @@ -30,7 +32,8 @@ /** * Planner rule that apply various simplifying transformations on join condition. e.g. reduce same * expressions: a=b AND b=a -> a=b, simplify boolean expressions: x = 1 AND FALSE -> FALSE, simplify - * cast expressions: CAST('123' as integer) -> 123 + * cast expressions: CAST('123' as integer) -> 123, collapse expanded IS NOT DISTINCT FROM + * expressions: OR(AND(IS NULL(a), IS NULL(b)), a = b) -> IS NOT DISTINCT FROM(a, b) */ @Value.Enclosing public class SimplifyJoinConditionRule @@ -55,6 +58,19 @@ public void onMatch(RelOptRuleCall call) { join.getCluster().getRexBuilder(), condition, join.getCluster().getPlanner().getExecutor()); + + // After simplification, redundant CASTs may have been removed, making it possible + // to collapse expanded IS NOT DISTINCT FROM expressions + // (e.g., OR(AND(IS NULL(a), IS NULL(b)), a = b) -> IS NOT DISTINCT FROM(a, b)). + // This is needed because MinusToAntiJoinRule generates conditions with makeCast, + // which prevents collapseExpandedIsNotDistinctFromExpr from working in RelBuilder.join() + // (the CASTs are removed later by RexSimplify, but after collapse has already failed). + if (simpleCondExp instanceof RexCall) { + simpleCondExp = + RelOptUtil.collapseExpandedIsNotDistinctFromExpr( + (RexCall) simpleCondExp, join.getCluster().getRexBuilder()); + } + RexNode newCondExp = RexUtil.pullFactors(join.getCluster().getRexBuilder(), simpleCondExp); if (newCondExp.equals(condition)) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 58aeac3b0fab1..4b5453cc9015f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -331,7 +331,7 @@ object FlinkBatchRuleSets { // set operators ReplaceIntersectWithSemiJoinRule.INSTANCE, RewriteIntersectAllRule.INSTANCE, - ReplaceMinusWithAntiJoinRule.INSTANCE, + MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule, RewriteMinusAllRule.INSTANCE ) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 4f07715e78c68..deaf98ecee1ac 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -323,7 +323,7 @@ object FlinkStreamRuleSets { // set operators ReplaceIntersectWithSemiJoinRule.INSTANCE, RewriteIntersectAllRule.INSTANCE, - ReplaceMinusWithAntiJoinRule.INSTANCE, + MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule, RewriteMinusAllRule.INSTANCE ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala index d8001aba4e476..ab71b32181a0d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala @@ -22,10 +22,12 @@ import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContex import org.apache.flink.table.planner.utils.TableTestBase import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.logical.LogicalMinus +import org.apache.calcite.rel.rules.MinusToAntiJoinRule import org.apache.calcite.tools.RuleSets import org.junit.jupiter.api.{BeforeEach, Test} -/** Test for [[ReplaceMinusWithAntiJoinRule]]. */ +/** Test for [[MinusToAntiJoinRule]]. */ class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase { private val util = batchTestUtil() @@ -38,7 +40,9 @@ class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase { FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) - .add(RuleSets.ofList(ReplaceMinusWithAntiJoinRule.INSTANCE)) + .add(RuleSets.ofList( + MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule, + SimplifyJoinConditionRule.INSTANCE)) .build() ) util.replaceBatchProgram(programs)