diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java
deleted file mode 100644
index 35c719e38466eb..00000000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.rules.logical;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.util.Util;
-import org.immutables.value.Value;
-
-import java.util.List;
-
-import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
-
-/**
- * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Minus} (SQL keyword:
- * EXCEPT) with a distinct {@link org.apache.calcite.rel.core.Aggregate} on an ANTI {@link
- * org.apache.calcite.rel.core.Join}.
- *
- *
Only handle the case of input size 2.
- */
-@Value.Enclosing
-public class ReplaceMinusWithAntiJoinRule
- extends RelRule {
-
- public static final ReplaceMinusWithAntiJoinRule INSTANCE =
- ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.DEFAULT.toRule();
-
- private ReplaceMinusWithAntiJoinRule(ReplaceMinusWithAntiJoinRuleConfig config) {
- super(config);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- Minus minus = call.rel(0);
- return !minus.all && minus.getInputs().size() == 2;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- Minus minus = call.rel(0);
- RelNode left = minus.getInput(0);
- RelNode right = minus.getInput(1);
-
- RelBuilder relBuilder = call.builder();
- List keys = Util.range(left.getRowType().getFieldCount());
- List conditions = generateEqualsCondition(relBuilder, left, right, keys);
-
- relBuilder.push(left);
- relBuilder.push(right);
- relBuilder
- .join(JoinRelType.ANTI, conditions)
- .aggregate(
- relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray()));
- RelNode rel = relBuilder.build();
- call.transformTo(rel);
- }
-
- /** Rule configuration. */
- @Value.Immutable(singleton = false)
- public interface ReplaceMinusWithAntiJoinRuleConfig extends RelRule.Config {
- ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig DEFAULT =
- ImmutableReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.builder()
- .build()
- .withOperandSupplier(b0 -> b0.operand(Minus.class).anyInputs())
- .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
- .withDescription("ReplaceMinusWithAntiJoinRule");
-
- @Override
- default ReplaceMinusWithAntiJoinRule toRule() {
- return new ReplaceMinusWithAntiJoinRule(this);
- }
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java
index bfd07eda7e0aba..943dcdeac39f26 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.java
@@ -21,8 +21,10 @@
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.immutables.value.Value;
@@ -30,7 +32,8 @@
/**
* Planner rule that apply various simplifying transformations on join condition. e.g. reduce same
* expressions: a=b AND b=a -> a=b, simplify boolean expressions: x = 1 AND FALSE -> FALSE, simplify
- * cast expressions: CAST('123' as integer) -> 123
+ * cast expressions: CAST('123' as integer) -> 123, collapse expanded IS NOT DISTINCT FROM
+ * expressions: OR(AND(IS NULL(a), IS NULL(b)), a = b) -> IS NOT DISTINCT FROM(a, b)
*/
@Value.Enclosing
public class SimplifyJoinConditionRule
@@ -55,6 +58,19 @@ public void onMatch(RelOptRuleCall call) {
join.getCluster().getRexBuilder(),
condition,
join.getCluster().getPlanner().getExecutor());
+
+ // After simplification, redundant CASTs may have been removed, making it possible
+ // to collapse expanded IS NOT DISTINCT FROM expressions
+ // (e.g., OR(AND(IS NULL(a), IS NULL(b)), a = b) -> IS NOT DISTINCT FROM(a, b)).
+ // This is needed because MinusToAntiJoinRule generates conditions with makeCast,
+ // which prevents collapseExpandedIsNotDistinctFromExpr from working in RelBuilder.join()
+ // (the CASTs are removed later by RexSimplify, but after collapse has already failed).
+ if (simpleCondExp instanceof RexCall) {
+ simpleCondExp =
+ RelOptUtil.collapseExpandedIsNotDistinctFromExpr(
+ (RexCall) simpleCondExp, join.getCluster().getRexBuilder());
+ }
+
RexNode newCondExp = RexUtil.pullFactors(join.getCluster().getRexBuilder(), simpleCondExp);
if (newCondExp.equals(condition)) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 58aeac3b0fab16..4b5453cc9015f2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -331,7 +331,7 @@ object FlinkBatchRuleSets {
// set operators
ReplaceIntersectWithSemiJoinRule.INSTANCE,
RewriteIntersectAllRule.INSTANCE,
- ReplaceMinusWithAntiJoinRule.INSTANCE,
+ MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
RewriteMinusAllRule.INSTANCE
)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 4f07715e78c681..deaf98ecee1acf 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -323,7 +323,7 @@ object FlinkStreamRuleSets {
// set operators
ReplaceIntersectWithSemiJoinRule.INSTANCE,
RewriteIntersectAllRule.INSTANCE,
- ReplaceMinusWithAntiJoinRule.INSTANCE,
+ MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
RewriteMinusAllRule.INSTANCE
)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
index d8001aba4e4769..ab71b32181a0da 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
@@ -22,10 +22,12 @@ import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContex
import org.apache.flink.table.planner.utils.TableTestBase
import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.rules.MinusToAntiJoinRule
import org.apache.calcite.tools.RuleSets
import org.junit.jupiter.api.{BeforeEach, Test}
-/** Test for [[ReplaceMinusWithAntiJoinRule]]. */
+/** Test for [[MinusToAntiJoinRule]]. */
class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase {
private val util = batchTestUtil()
@@ -38,7 +40,9 @@ class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase {
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(RuleSets.ofList(ReplaceMinusWithAntiJoinRule.INSTANCE))
+ .add(RuleSets.ofList(
+ MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
+ SimplifyJoinConditionRule.INSTANCE))
.build()
)
util.replaceBatchProgram(programs)