Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.immutables.value.Value;

/**
* Planner rule that apply various simplifying transformations on join condition. e.g. reduce same
* expressions: a=b AND b=a -> a=b, simplify boolean expressions: x = 1 AND FALSE -> FALSE, simplify
* cast expressions: CAST('123' as integer) -> 123
* cast expressions: CAST('123' as integer) -> 123, collapse expanded IS NOT DISTINCT FROM
* expressions: OR(AND(IS NULL(a), IS NULL(b)), a = b) -> IS NOT DISTINCT FROM(a, b)
*/
@Value.Enclosing
public class SimplifyJoinConditionRule
Expand All @@ -55,6 +58,19 @@ public void onMatch(RelOptRuleCall call) {
join.getCluster().getRexBuilder(),
condition,
join.getCluster().getPlanner().getExecutor());

// After simplification, redundant CASTs may have been removed, making it possible
// to collapse expanded IS NOT DISTINCT FROM expressions
// (e.g., OR(AND(IS NULL(a), IS NULL(b)), a = b) -> IS NOT DISTINCT FROM(a, b)).
// This is needed because MinusToAntiJoinRule generates conditions with makeCast,
// which prevents collapseExpandedIsNotDistinctFromExpr from working in RelBuilder.join()
// (the CASTs are removed later by RexSimplify, but after collapse has already failed).
if (simpleCondExp instanceof RexCall) {
simpleCondExp =
RelOptUtil.collapseExpandedIsNotDistinctFromExpr(
(RexCall) simpleCondExp, join.getCluster().getRexBuilder());
}

RexNode newCondExp = RexUtil.pullFactors(join.getCluster().getRexBuilder(), simpleCondExp);

if (newCondExp.equals(condition)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ object FlinkBatchRuleSets {
// set operators
ReplaceIntersectWithSemiJoinRule.INSTANCE,
RewriteIntersectAllRule.INSTANCE,
ReplaceMinusWithAntiJoinRule.INSTANCE,
MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
RewriteMinusAllRule.INSTANCE
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ object FlinkStreamRuleSets {
// set operators
ReplaceIntersectWithSemiJoinRule.INSTANCE,
RewriteIntersectAllRule.INSTANCE,
ReplaceMinusWithAntiJoinRule.INSTANCE,
MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
RewriteMinusAllRule.INSTANCE
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContex
import org.apache.flink.table.planner.utils.TableTestBase

import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.logical.LogicalMinus
import org.apache.calcite.rel.rules.MinusToAntiJoinRule
import org.apache.calcite.tools.RuleSets
import org.junit.jupiter.api.{BeforeEach, Test}

/** Test for [[ReplaceMinusWithAntiJoinRule]]. */
/** Test for [[MinusToAntiJoinRule]]. */
class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase {

private val util = batchTestUtil()
Expand All @@ -38,7 +40,9 @@ class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase {
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(RuleSets.ofList(ReplaceMinusWithAntiJoinRule.INSTANCE))
.add(RuleSets.ofList(
MinusToAntiJoinRule.Config.DEFAULT.withOperandFor(classOf[LogicalMinus]).toRule,
SimplifyJoinConditionRule.INSTANCE))
.build()
)
util.replaceBatchProgram(programs)
Expand Down