diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index d7da7498ba70a8..8c580171b21151 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -183,6 +183,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { DatabaseIf database = pair.first; OlapTable table = pair.second; boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS; + boolean isDeletePartialUpdate = isPartialUpdate && sink.getDMLCommandType() == DMLCommandType.DELETE; TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = sink.getPartialUpdateNewRowPolicy(); LogicalPlan child = ((LogicalPlan) sink.child()); @@ -194,7 +195,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { // 1. bind target columns: from sink's column names to target tables' Columns Pair, Integer> bindColumnsResult = bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol, - sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT); + sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT, isDeletePartialUpdate); List bindColumns = bindColumnsResult.first; int extraColumnsNum = bindColumnsResult.second; @@ -279,7 +280,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { } Map columnToOutput = getColumnToOutput( - ctx, table, isPartialUpdate, boundSink, child); + ctx, table, isPartialUpdate, isDeletePartialUpdate, boundSink, child); LogicalProject fullOutputProject = getOutputProjectByCoercion( table.getFullSchema(), child, columnToOutput); List columns = new ArrayList<>(table.getFullSchema().size()); @@ -371,7 +372,8 @@ private LogicalProject getOutputProjectByCoercion(List tableSchema, L private static Map getColumnToOutput( MatchingContext> ctx, - TableIf table, boolean isPartialUpdate, LogicalTableSink boundSink, LogicalPlan child) { + TableIf table, boolean isPartialUpdate, boolean isDeletePartialUpdate, + LogicalTableSink boundSink, LogicalPlan child) { // we need to insert all the columns of the target table // although some columns are not mentions. // so we add a projects to supply the default value. @@ -494,6 +496,18 @@ private static Map getColumnToOutput( // if processed in upper for loop, will lead to not found slot error // It's the same reason for moving the processing of materialized columns down. for (Column column : generatedColumns) { + if (isDeletePartialUpdate) { + NamedExpression childOutput = columnToChildOutput.get(column); + if (childOutput == null) { + continue; + } + Alias output = new Alias(TypeCoercionUtils.castIfNotSameType( + childOutput, DataType.fromCatalogType(column.getType())), column.getName()); + columnToOutput.put(column.getName(), output); + columnToReplaced.put(column.getName(), output.toSlot()); + replaceMap.put(output.toSlot(), output.child()); + continue; + } Map currentSessionVars = ctx.connectContext.getSessionVariable().getAffectQueryResultInPlanVariables(); try (AutoCloseSessionVariable autoClose = new AutoCloseSessionVariable(ctx.connectContext, @@ -704,7 +718,7 @@ private Plan bindHiveTableSink(MatchingContext> ctx) if (boundSink.getCols().size() != child.getOutput().size()) { throw new AnalysisException("insert into cols should be corresponding to the query output"); } - Map columnToOutput = getColumnToOutput(ctx, table, false, + Map columnToOutput = getColumnToOutput(ctx, table, false, false, boundSink, child); LogicalProject fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); return boundSink.withChildAndUpdateOutput(fullOutputProject); @@ -780,7 +794,7 @@ private Plan bindIcebergTableSink(MatchingContext> + "Expected " + boundSink.getCols().size() + " columns but got " + child.getOutput().size()); } - Map columnToOutput = getColumnToOutput(ctx, table, false, + Map columnToOutput = getColumnToOutput(ctx, table, false, false, boundSink, child); // For static partition columns, add constant expressions from PARTITION clause @@ -905,7 +919,7 @@ private Plan bindMaxComputeTableSink(MatchingContext columnToOutput = getColumnToOutput(ctx, table, false, + Map columnToOutput = getColumnToOutput(ctx, table, false, false, boundSink, child); LogicalProject fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); return boundSink.withChildAndUpdateOutput(fullOutputProject); @@ -1132,7 +1146,7 @@ private List bindPartitionIds(OlapTable table, List partitions, bo // bindTargetColumns means bind sink node's target columns' names to target table's columns private Pair, Integer> bindTargetColumns(OlapTable table, List colsName, - boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit) { + boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit, boolean isDeletePartialUpdate) { // if the table set sequence column in stream load phase, the sequence map column is null, we query it. if (colsName.isEmpty()) { // ATTN: group commit without column list should return all base index column @@ -1150,7 +1164,7 @@ private Pair, Integer> bindTargetColumns(OlapTable table, List