Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
DatabaseIf database = pair.first;
OlapTable table = pair.second;
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
boolean isDeletePartialUpdate = isPartialUpdate && sink.getDMLCommandType() == DMLCommandType.DELETE;
TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = sink.getPartialUpdateNewRowPolicy();

LogicalPlan child = ((LogicalPlan) sink.child());
Expand All @@ -194,7 +195,7 @@ private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
// 1. bind target columns: from sink's column names to target tables' Columns
Pair<List<Column>, Integer> bindColumnsResult =
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol,
sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT);
sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT, isDeletePartialUpdate);
List<Column> bindColumns = bindColumnsResult.first;
int extraColumnsNum = bindColumnsResult.second;

Expand Down Expand Up @@ -279,7 +280,7 @@ private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
}

Map<String, NamedExpression> columnToOutput = getColumnToOutput(
ctx, table, isPartialUpdate, boundSink, child);
ctx, table, isPartialUpdate, isDeletePartialUpdate, boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(
table.getFullSchema(), child, columnToOutput);
List<Column> columns = new ArrayList<>(table.getFullSchema().size());
Expand Down Expand Up @@ -371,7 +372,8 @@ private LogicalProject<?> getOutputProjectByCoercion(List<Column> tableSchema, L

private static Map<String, NamedExpression> getColumnToOutput(
MatchingContext<? extends UnboundLogicalSink<Plan>> ctx,
TableIf table, boolean isPartialUpdate, LogicalTableSink<?> boundSink, LogicalPlan child) {
TableIf table, boolean isPartialUpdate, boolean isDeletePartialUpdate,
LogicalTableSink<?> boundSink, LogicalPlan child) {
// we need to insert all the columns of the target table
// although some columns are not mentions.
// so we add a projects to supply the default value.
Expand Down Expand Up @@ -494,6 +496,18 @@ private static Map<String, NamedExpression> getColumnToOutput(
// if processed in upper for loop, will lead to not found slot error
// It's the same reason for moving the processing of materialized columns down.
for (Column column : generatedColumns) {
if (isDeletePartialUpdate) {
NamedExpression childOutput = columnToChildOutput.get(column);
if (childOutput == null) {
continue;
}
Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
childOutput, DataType.fromCatalogType(column.getType())), column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
continue;
}
Map<String, String> currentSessionVars =
ctx.connectContext.getSessionVariable().getAffectQueryResultInPlanVariables();
try (AutoCloseSessionVariable autoClose = new AutoCloseSessionVariable(ctx.connectContext,
Expand Down Expand Up @@ -704,7 +718,7 @@ private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx)
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false, false,
boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
Expand Down Expand Up @@ -780,7 +794,7 @@ private Plan bindIcebergTableSink(MatchingContext<UnboundIcebergTableSink<Plan>>
+ "Expected " + boundSink.getCols().size() + " columns but got " + child.getOutput().size());
}

Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false, false,
boundSink, child);

// For static partition columns, add constant expressions from PARTITION clause
Expand Down Expand Up @@ -905,7 +919,7 @@ private Plan bindMaxComputeTableSink(MatchingContext<UnboundMaxComputeTableSink<
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false, false,
boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
Expand Down Expand Up @@ -1132,7 +1146,7 @@ private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, bo

// bindTargetColumns means bind sink node's target columns' names to target table's columns
private Pair<List<Column>, Integer> bindTargetColumns(OlapTable table, List<String> colsName,
boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit) {
boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit, boolean isDeletePartialUpdate) {
// if the table set sequence column in stream load phase, the sequence map column is null, we query it.
if (colsName.isEmpty()) {
// ATTN: group commit without column list should return all base index column
Expand All @@ -1150,7 +1164,7 @@ private Pair<List<Column>, Integer> bindTargetColumns(OlapTable table, List<Stri
++extraColumnsNum;
processedColsName.add(col.getName());
}
} else if (col.isGeneratedColumn()) {
} else if (col.isGeneratedColumn() && !isDeletePartialUpdate) {
++extraColumnsNum;
processedColsName.add(col.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,60 @@ protected void runBeforeAll() throws Exception {
+ "properties(\n"
+ " \"replication_num\"=\"1\"\n"
+ ")");
createTable("create table gen_value (\n"
+ " a int,\n"
+ " b int,\n"
+ " c int as (b + 1),\n"
+ " d int\n"
+ ")\n"
+ "unique key(a)\n"
+ "distributed by hash(a) buckets 4\n"
+ "properties(\n"
+ " \"replication_num\"=\"1\",\n"
+ " \"enable_unique_key_merge_on_write\" = \"true\",\n"
+ " \"enable_mow_light_delete\" = \"false\"\n"
+ ")");
createTable("create table gen_key (\n"
+ " a int,\n"
+ " c int as (b + 1),\n"
+ " b int,\n"
+ " d int\n"
+ ")\n"
+ "unique key(a, c)\n"
+ "distributed by hash(a) buckets 4\n"
+ "properties(\n"
+ " \"replication_num\"=\"1\",\n"
+ " \"enable_unique_key_merge_on_write\" = \"true\",\n"
+ " \"enable_mow_light_delete\" = \"false\"\n"
+ ")");
createTable("create table gen_value_required (\n"
+ " a int,\n"
+ " b int,\n"
+ " c int as (b + 1),\n"
+ " d int not null\n"
+ ")\n"
+ "unique key(a)\n"
+ "distributed by hash(a) buckets 4\n"
+ "properties(\n"
+ " \"replication_num\"=\"1\",\n"
+ " \"enable_unique_key_merge_on_write\" = \"true\",\n"
+ " \"enable_mow_light_delete\" = \"false\"\n"
+ ")");
createTable("create table gen_variant (\n"
+ " id int not null,\n"
+ " create_time datetime not null,\n"
+ " order_no varchar(128) not null,\n"
+ " receive_address_detail varchar(1024) not null default \"{}\",\n"
+ " d int not null,\n"
+ " new_col variant as (receive_address_detail) null\n"
+ ")\n"
+ "unique key(id, create_time, order_no)\n"
+ "distributed by hash(order_no) buckets 4\n"
+ "properties(\n"
+ " \"replication_num\"=\"1\",\n"
+ " \"enable_unique_key_merge_on_write\" = \"true\",\n"
+ " \"enable_mow_light_delete\" = \"false\"\n"
+ ")");
}

@Test
Expand Down Expand Up @@ -106,4 +160,39 @@ public void testFromClauseDelete() throws AnalysisException {
)
);
}

@Test
public void testDeletePartialUpdateWithGeneratedValueColumn() {
assertDeletePartialUpdateAnalyze("delete from gen_value t using src where t.a = src.k1");
}

@Test
public void testDeletePartialUpdateWithGeneratedKeyColumn() {
assertDeletePartialUpdateAnalyze("delete from gen_key t using src where t.a = src.k1");
}

@Test
public void testDeletePartialUpdateWithNotNullValueColumn() {
assertDeletePartialUpdateAnalyze("delete from gen_value_required t using src where t.a = src.k1");
}

@Test
public void testDeletePartialUpdateWithVariantGeneratedColumn() {
assertDeletePartialUpdateAnalyze("delete from gen_variant t using src where t.id = src.k1");
}

private void assertDeletePartialUpdateAnalyze(String sql) {
LogicalPlan parsed = new NereidsParser().parseSingle(sql);
Assertions.assertInstanceOf(DeleteFromUsingCommand.class, parsed);
DeleteFromUsingCommand command = ((DeleteFromUsingCommand) parsed);
LogicalPlan plan = command.completeQueryPlan(connectContext, command.getLogicalQuery());
PlanChecker.from(connectContext, plan)
.analyze(plan)
.rewrite()
.matches(
logicalOlapTableSink(
logicalProject()
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@
1 2 3
10 2 12

-- !delete_mow_without_light_delete_generated_column --
2 20 21 200

-- !delete_mow_without_light_delete_generated_key_column --
2 21 20 200

-- !delete_mow_without_light_delete_generated_column_with_not_null_value --
2 20 21 200

-- !delete_mow_without_light_delete_variant_generated_column --
2 2026-06-08T11:00 order-2 {"city":"bj"} 200

Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,101 @@ suite("test_generated_column_delete") {
) delete from test_par_gen_col_unique t1 using cte where t1.c=cte.c and t1.b=cte.b"""
qt_delete_query_cte_select "select * from test_par_gen_col_unique order by a,b,c"

multi_sql """
drop table if exists test_gen_col_mow_delete_partial_update;
create table test_gen_col_mow_delete_partial_update (
a int,
b int,
c int as (b + 1),
d int
)
unique key(a)
distributed by hash(a) buckets 1
properties (
"enable_unique_key_merge_on_write" = "true",
"enable_mow_light_delete" = "false",
"replication_num" = "1"
);
insert into test_gen_col_mow_delete_partial_update(a, b, d) values (1, 10, 100), (2, 20, 200);
"""
sql "delete from test_gen_col_mow_delete_partial_update where a = 1;"
qt_delete_mow_without_light_delete_generated_column """
select * from test_gen_col_mow_delete_partial_update order by a;
"""

multi_sql """
drop table if exists test_gen_key_mow_delete_partial_update;
create table test_gen_key_mow_delete_partial_update (
a int,
c int as (b + 1),
b int,
d int
)
unique key(a, c)
distributed by hash(a) buckets 1
properties (
"enable_unique_key_merge_on_write" = "true",
"enable_mow_light_delete" = "false",
"replication_num" = "1"
);
insert into test_gen_key_mow_delete_partial_update(a, b, d) values (1, 10, 100), (2, 20, 200);
"""
sql "delete from test_gen_key_mow_delete_partial_update where a = 1;"
qt_delete_mow_without_light_delete_generated_key_column """
select * from test_gen_key_mow_delete_partial_update order by a;
"""

multi_sql """
drop table if exists test_gen_col_mow_delete_partial_update_not_null_value;
create table test_gen_col_mow_delete_partial_update_not_null_value (
a int,
b int,
c int as (b + 1),
d int not null
)
unique key(a)
distributed by hash(a) buckets 1
properties (
"enable_unique_key_merge_on_write" = "true",
"enable_mow_light_delete" = "false",
"replication_num" = "1"
);
insert into test_gen_col_mow_delete_partial_update_not_null_value(a, b, d)
values (1, 10, 100), (2, 20, 200);
"""
sql "delete from test_gen_col_mow_delete_partial_update_not_null_value where a = 1;"
qt_delete_mow_without_light_delete_generated_column_with_not_null_value """
select * from test_gen_col_mow_delete_partial_update_not_null_value order by a;
"""

multi_sql """
drop table if exists test_gen_variant_mow_delete_partial_update;
create table test_gen_variant_mow_delete_partial_update (
id int not null,
create_time datetime not null,
order_no varchar(128) not null,
receive_address_detail varchar(1024) not null default "{}",
d int not null,
new_col variant as (receive_address_detail) null
)
unique key(id, create_time, order_no)
distributed by hash(order_no) buckets 1
properties (
"enable_unique_key_merge_on_write" = "true",
"enable_mow_light_delete" = "false",
"replication_num" = "1"
);
insert into test_gen_variant_mow_delete_partial_update(
id, create_time, order_no, receive_address_detail, d
) values
(1, '2026-06-08 10:00:00', 'order-1', '{"city":"sh"}', 100),
(2, '2026-06-08 11:00:00', 'order-2', '{"city":"bj"}', 200);
"""
sql "delete from test_gen_variant_mow_delete_partial_update where id = 1;"
qt_delete_mow_without_light_delete_variant_generated_column """
select id, create_time, order_no, receive_address_detail, d
from test_gen_variant_mow_delete_partial_update order by id;
"""


}
Loading