Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,8 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(),
oldTable -> op.getTableChanges(),
suspendMaterializedTable);
suspendMaterializedTable,
op.getSinkModifyQuery());
operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);

Expand Down Expand Up @@ -1010,7 +1011,10 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(

AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, oldTable -> tableChanges, oldMaterializedTable);
tableIdentifier,
oldTable -> tableChanges,
oldMaterializedTable,
op.getSinkModifyQuery());

operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
Expand All @@ -1029,7 +1033,10 @@ private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedT
AlterMaterializedTableChangeOperation op) {

return new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(), oldTable -> List.of(), oldMaterializedTable);
op.getTableIdentifier(),
oldTable -> List.of(),
oldMaterializedTable,
op.getSinkModifyQuery());
}

private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ private ResultFetcher executeOperationInStatementSetState(
TableEnvironmentInternal tableEnv, OperationHandle handle, Operation operation) {
if (operation instanceof EndStatementSetOperation) {
return callEndStatementSetOperation(tableEnv, handle);
} else if (operation instanceof ModifyOperation) {
} else if (operation instanceof ModifyOperation
&& !(operation instanceof MaterializedTableOperation)) {
sessionContext.addStatementSetOperation((ModifyOperation) operation);
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
} else {
Expand All @@ -518,7 +519,7 @@ private ResultFetcher executeOperation(
} else if (op instanceof EndStatementSetOperation) {
throw new SqlExecutionException(
"No Statement Set to submit. 'END' statement should be used after 'BEGIN STATEMENT SET'.");
} else if (op instanceof ModifyOperation) {
} else if (op instanceof ModifyOperation && !(op instanceof MaterializedTableOperation)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason MaterializedTableOperation is not a ModifyOperation

however after that we call callModifyOperations ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaterializedTableOperation is a marker shared by SUSPEND, RESUME, REFRESH and DROP too. Those have no definition query, so they can't implement the ModifyOperation contract (getChild() / accept(visitor)) in any meaningful way. More importantly, if the marker were a ModifyOperation, every MT op would fall into the callModifyOperations branch and execute as an INSERT.

That's what the && !(op instanceof MaterializedTableOperation) guard enforces. So only the query-bearing operations implement ModifyOperation, and purely so the planner can translate them into a sink for EXPLAIN. Execution still goes through the manager for all of them.

return callModifyOperations(
tableEnv, handle, Collections.singletonList((ModifyOperation) op));
} else if (op instanceof CompileAndExecutePlanOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.TABLE;
import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND;
import static org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE;
import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.executeStatement;
Expand Down Expand Up @@ -293,6 +294,40 @@ void testConvertTableLeftSuspendedWhenRefreshJobFailsToStart() throws Exception
assertThat(suspendedMaterializedTable.getRefreshStatus()).isSameAs(RefreshStatus.SUSPENDED);
}

@Test
void testExplainConvertTableToMaterializedTable() throws Exception {
// pre-create a regular table that CREATE OR ALTER would convert in place.
String createRegularTableDDL =
"CREATE TABLE users_shops (\n"
+ " user_id BIGINT,\n"
+ " shop_id BIGINT,\n"
+ " payment_amount_cents BIGINT\n"
+ ")\n"
+ "WITH ('connector' = 'values')";
OperationHandle createTableHandle =
executeStatement(service, sessionHandle, createRegularTableDDL);
awaitOperationTermination(service, sessionHandle, createTableHandle);

String explainDDL =
"EXPLAIN CREATE OR ALTER MATERIALIZED TABLE users_shops"
+ " FRESHNESS = INTERVAL '30' SECOND"
+ " AS SELECT user_id, shop_id, payment_amount_cents FROM datagenSource";

// EXPLAIN succeeds and returns a single non-empty plan. The plan content itself is asserted
// by the planner tests in TableSinkTest.
OperationHandle explainHandle = executeStatement(service, sessionHandle, explainDDL);
awaitOperationTermination(service, sessionHandle, explainHandle);
List<RowData> explainResults = fetchAllResults(service, sessionHandle, explainHandle);
assertThat(explainResults).hasSize(1);
assertThat(explainResults.get(0).getString(0).toString()).isNotBlank();

// EXPLAIN is side-effect-free: the entry is still a regular table, not converted.
assertThat(
service.getTable(sessionHandle, getObjectIdentifier("users_shops"))
.getTableKind())
.isSameAs(TABLE);
}

private SessionHandle initializeSession() {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
String catalogDDL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,83 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E
dropMaterializedTable(getObjectIdentifier("my_materialized_table"));
}

@Test
void testExplainCreateOrAlterMaterializedTableOnExistingTable() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", List.of(), Map.of(), RefreshMode.FULL);
ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops");
ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier);

// CREATE OR ALTER on an existing table behaves like an alter-with-query. Re-state the same
// definition so EXPLAIN plans the sink over the (unchanged) table.
String explainDDL =
"EXPLAIN CREATE OR ALTER MATERIALIZED TABLE users_shops"
+ " PARTITIONED BY (ds)"
+ " WITH ('format' = 'debezium-json')"
+ " FRESHNESS = INTERVAL '30' SECOND"
+ " REFRESH_MODE = FULL"
+ " AS SELECT"
+ " user_id,"
+ " shop_id,"
+ " ds,"
+ " COUNT(order_id) AS order_cnt"
+ " FROM ("
+ " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
+ " ) AS tmp"
+ " GROUP BY (user_id, shop_id, ds)";

// EXPLAIN succeeds (the storage options carried over from the existing table avoid a
// "Missing required options" failure) and returns a single non-empty plan. The plan content
// itself is asserted by the planner tests in TableSinkTest.
OperationHandle explainHandle = executeStatement(explainDDL);
awaitOperationTermination(service, sessionHandle, explainHandle);
List<RowData> explainResults = fetchAllResults(service, sessionHandle, explainHandle);
assertThat(explainResults).hasSize(1);
assertThat(explainResults.get(0).getString(0).toString()).isNotBlank();

// EXPLAIN is side-effect-free: the existing materialized table is unchanged.
ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier);
assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema());
assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());

dropMaterializedTable(userShopsIdentifier);
}

@Test
void testExplainAlterMaterializedTableAsQuery() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", List.of(), Map.of(), RefreshMode.FULL);
ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops");
ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier);

String explainDDL =
"EXPLAIN ALTER MATERIALIZED TABLE users_shops"
+ " AS SELECT"
+ " user_id,"
+ " shop_id,"
+ " ds,"
+ " COUNT(order_id) AS order_cnt"
+ " FROM ("
+ " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
+ " ) AS tmp"
+ " GROUP BY (user_id, shop_id, ds)";

// EXPLAIN succeeds and returns a single non-empty plan. The plan content itself is asserted
// by the planner tests in TableSinkTest.
OperationHandle explainHandle = executeStatement(explainDDL);
awaitOperationTermination(service, sessionHandle, explainHandle);
List<RowData> explainResults = fetchAllResults(service, sessionHandle, explainHandle);
assertThat(explainResults).hasSize(1);
assertThat(explainResults.get(0).getString(0).toString()).isNotBlank();

// EXPLAIN is side-effect-free: the materialized table definition is unchanged.
ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier);
assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema());
assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());

dropMaterializedTable(userShopsIdentifier);
}

private void setupSavepointDir(Path temporaryPath) throws Exception {
String savepointDir = "file://" + temporaryPath.toAbsolutePath();
String setupSavepointDDL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3138,16 +3138,25 @@ SqlNode SqlRichExplain() :
stmt = RichSqlInsert()
|
stmt = SqlCreate()
|
stmt = SqlAlterMaterializedTable()
)
{
if ((stmt instanceof SqlCreate)
&& !(stmt instanceof SqlCreateTableAs)
&& !(stmt instanceof SqlReplaceTableAs)) {
&& !(stmt instanceof SqlReplaceTableAs)
&& !(stmt instanceof SqlCreateOrAlterMaterializedTable)) {
throw SqlUtil.newContextException(
getPos(),
ParserResource.RESOURCE.explainCreateOrReplaceStatementUnsupported());
}

if ((stmt instanceof SqlAlterMaterializedTable) && !(stmt instanceof SqlAlterMaterializedTableAsQuery)) {
throw SqlUtil.newContextException(
getPos(),
ParserResource.RESOURCE.explainAlterMaterializedTableUnsupported());
}

return new SqlRichExplain(getPos(), stmt, explainDetails);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public interface ParserResource {
"Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
Resources.ExInst<ParseException> explainCreateOrReplaceStatementUnsupported();

@Resources.BaseMessage(
"Unsupported ALTER MATERIALIZED TABLE statement for EXPLAIN. The statement must define a query using the AS clause.")
Resources.ExInst<ParseException> explainAlterMaterializedTableUnsupported();

@Resources.BaseMessage(
"Columns identifiers without types in the schema are supported on CTAS/RTAS statements only.")
Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3170,29 +3170,40 @@ void testAnalyzeTable() {
.fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*");
}

@Test
void testExplainCreateTableNoSupported() {
this.sql("EXPLAIN CREATE TABLE t (id int^)^")
@ParameterizedTest
@CsvSource({"CREATE", "CREATE OR REPLACE"})
void testExplainCreateTableNotSupported(final String operation) {
this.sql(String.format("EXPLAIN %s TABLE t (id int^)^", operation))
.fails(
"Unsupported CREATE OR REPLACE statement for EXPLAIN\\. The statement must define a query using the AS clause \\(i\\.e\\. CTAS/RTAS statements\\)\\.");
}

@Test
void testExplainCreateTableAsSelect() {
this.sql("EXPLAIN CREATE TABLE t AS SELECT * FROM b")
.ok("EXPLAIN CREATE TABLE `T`\nAS\nSELECT *\nFROM `B`");
}

@Test
void testExplainCreateOrReplaceTableAsSelect() {
this.sql("EXPLAIN CREATE OR REPLACE TABLE t AS SELECT * FROM b")
.ok("EXPLAIN CREATE OR REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
@ParameterizedTest
@CsvSource({
"ALTER MATERIALIZED TABLE t ^SUSPEND^",
"ALTER MATERIALIZED TABLE t ^RESUME^",
"ALTER MATERIALIZED TABLE t ADD category_id STRING METADATA ^VIRTUAL^",
"ALTER MATERIALIZED TABLE t MODIFY measurement double ^METADATA^"
})
void testExplainAlterMaterializedTableWithoutAsQueryNotSupported(
final String alterMaterializedTable) {
this.sql(String.format("EXPLAIN %s", alterMaterializedTable))
.fails(
"Unsupported ALTER MATERIALIZED TABLE statement for EXPLAIN\\. The statement must define a query using the AS clause.");
}

@Test
void testExplainReplaceTableAsSelect() {
this.sql("EXPLAIN REPLACE TABLE t AS SELECT * FROM b")
.ok("EXPLAIN REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
@ParameterizedTest
@CsvSource({
"CREATE",
"CREATE OR REPLACE",
"REPLACE",
"CREATE MATERIALIZED",
"ALTER MATERIALIZED",
"CREATE OR ALTER MATERIALIZED",
})
void testExplain(final String operation) {
this.sql(String.format("EXPLAIN %s TABLE t AS SELECT * FROM b", operation))
.ok(String.format("EXPLAIN %s TABLE `T`\nAS\nSELECT *\nFROM `B`", operation));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;

/**
* Class that implements visitor pattern. It allows type safe logic on top of tree of {@link
Expand All @@ -39,4 +42,10 @@ public interface ModifyOperationVisitor<T> {
T visit(CreateTableASOperation ctas);

T visit(ReplaceTableAsOperation rtas);

T visit(CreateMaterializedTableOperation createMaterializedTableOperation);

T visit(AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation);

T visit(ConvertTableToMaterializedTableOperation convertTableToMaterializedTableOperation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.operations.QueryOperation;

import java.util.List;
import java.util.function.Function;
Expand All @@ -40,8 +41,9 @@ public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTab
public AlterMaterializedTableAsQueryOperation(
ObjectIdentifier tableIdentifier,
Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangesForTable,
ResolvedCatalogMaterializedTable oldTable) {
super(tableIdentifier, tableChangesForTable, oldTable);
ResolvedCatalogMaterializedTable oldTable,
QueryOperation sinkModifyQuery) {
super(tableIdentifier, tableChangesForTable, oldTable, sinkModifyQuery);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.ModifyOperationVisitor;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;

import java.util.ArrayList;
Expand All @@ -50,9 +53,11 @@
* Alter materialized table with new table definition and table changes represents the modification.
*/
@Internal
public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation {
public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation
implements ModifyOperation {

private final Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangeForTable;
private final QueryOperation sinkModifyQuery;
private ResolvedCatalogMaterializedTable oldTable;
private MaterializedTableChangeHandler handler;
private CatalogMaterializedTable newTable;
Expand All @@ -62,9 +67,23 @@ public AlterMaterializedTableChangeOperation(
ObjectIdentifier tableIdentifier,
Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangeForTable,
ResolvedCatalogMaterializedTable oldTable) {
// Metadata-only changes (options, schema, distribution, ...) carry no sink-modifying query.
this(tableIdentifier, tableChangeForTable, oldTable, null);
}

public AlterMaterializedTableChangeOperation(
ObjectIdentifier tableIdentifier,
Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangeForTable,
ResolvedCatalogMaterializedTable oldTable,
QueryOperation sinkModifyQuery) {
super(tableIdentifier);
this.tableChangeForTable = tableChangeForTable;
this.oldTable = oldTable;
this.sinkModifyQuery = sinkModifyQuery;
}

public QueryOperation getSinkModifyQuery() {
return sinkModifyQuery;
}

public List<TableChange> getTableChanges() {
Expand All @@ -76,7 +95,7 @@ public List<TableChange> getTableChanges() {

public AlterMaterializedTableChangeOperation copyAsTableChangeOperation() {
return new AlterMaterializedTableChangeOperation(
tableIdentifier, tableChangeForTable, oldTable);
tableIdentifier, tableChangeForTable, oldTable, sinkModifyQuery);
}

public CatalogMaterializedTable getNewTable() {
Expand Down Expand Up @@ -144,6 +163,16 @@ public String asSummaryString() {
"%s %s\n%s", getOperationName(), tableIdentifier.asSummaryString(), changes);
}

@Override
public QueryOperation getChild() {
return this.sinkModifyQuery;
}

@Override
public <T> T accept(final ModifyOperationVisitor<T> visitor) {
return visitor.visit(this);
}

/** Hook for subclasses to provide a different new-table builder. */
protected CatalogMaterializedTable computeNewTable() {
return MaterializedTableChangeHandler.buildNewMaterializedTable(getHandlerWithChanges());
Expand Down
Loading