Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ object FlinkBatchRuleSets {
FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,

// set operators
ReplaceIntersectWithSemiJoinRule.INSTANCE,
CoreRules.INTERSECT_TO_SEMI_JOIN,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My AI says

  • Verify that the expanded join condition doesn't negatively impact query performance in production workloads
  • Confirm that ReplaceMinusWithAntiJoinRule removal (mentioned but not shown) is equally safe
  • Consider adding a comment in the rule sets explaining why the Calcite rules are preferred over custom implementations

RewriteIntersectAllRule.INSTANCE,
ReplaceMinusWithAntiJoinRule.INSTANCE,
MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
RewriteMinusAllRule.INSTANCE
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ object FlinkStreamRuleSets {
FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,

// set operators
ReplaceIntersectWithSemiJoinRule.INSTANCE,
CoreRules.INTERSECT_TO_SEMI_JOIN,
RewriteIntersectAllRule.INSTANCE,
ReplaceMinusWithAntiJoinRule.INSTANCE,
MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
RewriteMinusAllRule.INSTANCE
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
<Resource name="optimized rel plan">
<![CDATA[
LogicalAggregate(group=[{0}])
+- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
+- LogicalJoin(condition=[OR(AND(IS NULL($0), IS NULL($1)), =($0, $1))], joinType=[semi])
:- LogicalProject(c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalProject(f=[$2])
Expand All @@ -57,7 +57,7 @@ LogicalIntersect(all=[false])
<Resource name="optimized rel plan">
<![CDATA[
LogicalAggregate(group=[{0}])
+- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
+- LogicalJoin(condition=[OR(AND(IS NULL($0), IS NULL($1)), =($0, $1))], joinType=[semi])
:- LogicalProject(c=[$2])
: +- LogicalFilter(condition=[=(1, 0)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
Expand All @@ -83,7 +83,7 @@ LogicalIntersect(all=[false])
<Resource name="optimized rel plan">
<![CDATA[
LogicalAggregate(group=[{0}])
+- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
+- LogicalJoin(condition=[OR(AND(IS NULL($0), IS NULL($1)), =($0, $1))], joinType=[semi])
:- LogicalProject(c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalProject(f=[$2])
Expand Down Expand Up @@ -112,7 +112,7 @@ LogicalProject(c=[$2])
LogicalProject(c=[$2])
+- LogicalFilter(condition=[>($0, 1)])
+- LogicalAggregate(group=[{0, 1, 2}])
+- LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $3), IS NOT DISTINCT FROM($1, $4), IS NOT DISTINCT FROM($2, $5))], joinType=[semi])
+- LogicalJoin(condition=[AND(OR(AND(IS NULL($0), IS NULL($3)), =($0, $3)), OR(AND(IS NULL($1), IS NULL($4)), =($1, $4)), OR(AND(IS NULL($2), IS NULL($5)), =($2, $5)))], joinType=[semi])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalProject(d=[$0], e=[$1], f=[$2])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContex
import org.apache.flink.table.planner.utils.TableTestBase

import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.RuleSets
import org.junit.jupiter.api.{BeforeEach, Test}

/** Test for [[ReplaceIntersectWithSemiJoinRule]]. */
/**
* Former test for [[ReplaceIntersectWithSemiJoinRule]], which is now replaced
* by Calcite's [[CoreRules#INTERSECT_TO_SEMI_JOIN]].
*/
class ReplaceIntersectWithSemiJoinRuleTest extends TableTestBase {

private val util = batchTestUtil()
Expand All @@ -38,7 +42,7 @@ class ReplaceIntersectWithSemiJoinRuleTest extends TableTestBase {
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(RuleSets.ofList(ReplaceIntersectWithSemiJoinRule.INSTANCE))
.add(RuleSets.ofList(CoreRules.INTERSECT_TO_SEMI_JOIN))
.build()
)
util.replaceBatchProgram(programs)
Expand Down