diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java deleted file mode 100644 index d71f4bd4d5e950..00000000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.rules.logical; - -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.Intersect; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.core.RelFactories; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.util.Util; -import org.immutables.value.Value; - -import java.util.List; - -import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; - -/** - * Planner rule that replaces distinct {@link Intersect} with a distinct {@link Aggregate} on a SEMI - * {@link Join}. - * - *

Only handle the case of input size 2. - */ -@Value.Enclosing -public class ReplaceIntersectWithSemiJoinRule - extends RelRule { - - public static final ReplaceIntersectWithSemiJoinRule INSTANCE = - ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT - .toRule(); - - private ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) { - super(config); - } - - @Override - public boolean matches(RelOptRuleCall call) { - Intersect intersect = call.rel(0); - return !intersect.all && intersect.getInputs().size() == 2; - } - - @Override - public void onMatch(RelOptRuleCall call) { - Intersect intersect = call.rel(0); - RelNode left = intersect.getInput(0); - RelNode right = intersect.getInput(1); - - RelBuilder relBuilder = call.builder(); - List keys = Util.range(left.getRowType().getFieldCount()); - List conditions = generateEqualsCondition(relBuilder, left, right, keys); - - relBuilder.push(left); - relBuilder.push(right); - relBuilder - .join(JoinRelType.SEMI, conditions) - .aggregate( - relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray())); - RelNode rel = relBuilder.build(); - call.transformTo(rel); - } - - /** Rule configuration. */ - @Value.Immutable(singleton = false) - public interface ReplaceIntersectWithSemiJoinRuleConfig extends RelRule.Config { - ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT = - ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig - .builder() - .build() - .withOperandSupplier(b0 -> b0.operand(Intersect.class).anyInputs()) - .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) - .withDescription("ReplaceIntersectWithSemiJoinRule"); - - @Override - default ReplaceIntersectWithSemiJoinRule toRule() { - return new ReplaceIntersectWithSemiJoinRule(this); - } - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index e840808b5e1c24..88b7ece5b46207 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -329,9 +329,9 @@ object FlinkBatchRuleSets { FlinkSemiAntiJoinFilterTransposeRule.INSTANCE, // set operators - ReplaceIntersectWithSemiJoinRule.INSTANCE, + CoreRules.INTERSECT_TO_SEMI_JOIN, RewriteIntersectAllRule.INSTANCE, - ReplaceMinusWithAntiJoinRule.INSTANCE, + MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule, RewriteMinusAllRule.INSTANCE ) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index bc06d2d688baac..aedc4fb32bf0f1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -321,9 +321,9 @@ object FlinkStreamRuleSets { FlinkSemiAntiJoinFilterTransposeRule.INSTANCE, // set operators - ReplaceIntersectWithSemiJoinRule.INSTANCE, + CoreRules.INTERSECT_TO_SEMI_JOIN, RewriteIntersectAllRule.INSTANCE, - ReplaceMinusWithAntiJoinRule.INSTANCE, + MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule, RewriteMinusAllRule.INSTANCE ) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml index fc8dbccb4f8f0b..6c09f8e0c5a1dd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml @@ -32,7 +32,7 @@ LogicalIntersect(all=[false]) ($0, 1)]) +- LogicalAggregate(group=[{0, 1, 2}]) - +- LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $3), IS NOT DISTINCT FROM($1, $4), IS NOT DISTINCT FROM($2, $5))], joinType=[semi]) + +- LogicalJoin(condition=[AND(OR(AND(IS NULL($0), IS NULL($3)), =($0, $3)), OR(AND(IS NULL($1), IS NULL($4)), =($1, $4)), OR(AND(IS NULL($2), IS NULL($5)), =($2, $5)))], joinType=[semi]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +- LogicalProject(d=[$0], e=[$1], f=[$2]) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala index 35cee9fd601114..53415f95ff01d8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala @@ -22,10 +22,14 @@ import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContex import org.apache.flink.table.planner.utils.TableTestBase import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.rules.CoreRules import org.apache.calcite.tools.RuleSets import org.junit.jupiter.api.{BeforeEach, Test} -/** Test for [[ReplaceIntersectWithSemiJoinRule]]. */ +/** + * Former test for [[ReplaceIntersectWithSemiJoinRule]] which now replaced + * by Calcite's [[CoreRules#INTERSECT_TO_SEMI_JOIN]]. + */ class ReplaceIntersectWithSemiJoinRuleTest extends TableTestBase { private val util = batchTestUtil() @@ -38,7 +42,7 @@ class ReplaceIntersectWithSemiJoinRuleTest extends TableTestBase { FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) - .add(RuleSets.ofList(ReplaceIntersectWithSemiJoinRule.INSTANCE)) + .add(RuleSets.ofList(CoreRules.INTERSECT_TO_SEMI_JOIN)) .build() ) util.replaceBatchProgram(programs)