From 99a24cd4166cf0341758b36a25017094e195ffac Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 22 Jun 2026 15:49:41 +0800 Subject: [PATCH 01/16] [FLINK] Support ORC filesystem sink format --- .github/workflows/flink.yml | 2 +- gluten-flink/ut/pom.xml | 6 ++++++ gluten-flink/ut/src/test/resources/nexmark/q10.sql | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f304..4c412dcfcb2 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + cd velox4j && git reset --hard acad51ca0c9f678c4b36d6424fdd3d19c1c3eaae git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml index 03bda070346..4e369a74a21 100644 --- a/gluten-flink/ut/pom.xml +++ b/gluten-flink/ut/pom.xml @@ -194,6 +194,12 @@ ${flink.version} test + + org.apache.flink + flink-orc + ${flink.version} + test + diff --git a/gluten-flink/ut/src/test/resources/nexmark/q10.sql b/gluten-flink/ut/src/test/resources/nexmark/q10.sql index 6d13d7c85f6..49aaaca4a2d 100644 --- a/gluten-flink/ut/src/test/resources/nexmark/q10.sql +++ b/gluten-flink/ut/src/test/resources/nexmark/q10.sql @@ -9,7 +9,7 @@ CREATE TABLE nexmark_q10 ( ) PARTITIONED BY (dt, hm) WITH ( 'connector' = 'filesystem', 'path' = 'file:///tmp/data/output/bid/', - 'format' = 'csv', + 'format' = 'orc', 'sink.partition-commit.trigger' = 'partition-time', 'sink.partition-commit.delay' = '1 min', 'sink.partition-commit.policy.kind' = 'success-file', From 364f32925e46e067a1d4b1ad2f90d8f8623dd30a Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 22 Jun 2026 16:04:26 +0800 Subject: [PATCH 02/16] [FLINK] Add Nexmark q10 ORC test dependencies --- gluten-flink/ut/pom.xml | 6 ++++++ .../gluten/table/runtime/stream/custom/NexmarkTest.java | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml index 4e369a74a21..bc7d1b1723f 100644 --- a/gluten-flink/ut/pom.xml +++ b/gluten-flink/ut/pom.xml @@ -200,6 +200,12 @@ ${flink.version} test + + org.apache.hadoop + hadoop-client + 2.7.4 + test + diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java index 476c4cba4d0..3fc3c7df4a2 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java @@ -70,7 +70,7 @@ public class NexmarkTest { } }; - private static final int KAFKA_PORT = 9092; + private static final int KAFKA_PORT = Integer.getInteger("gluten.flink.ut.kafka.port", 9092); private static String topicName = "nexmark"; @RegisterExtension @@ -82,7 +82,7 @@ public class NexmarkTest { private static final Map KAFKA_VARIABLES = new HashMap<>() { { - put("BOOTSTRAP_SERVERS", "localhost:9092"); + put("BOOTSTRAP_SERVERS", "localhost:" + KAFKA_PORT); put("NEXMARK_TABLE", "kafka"); } }; @@ -242,6 +242,10 @@ private List getQueries() { } } + String query = System.getProperty("gluten.flink.ut.nexmark.query"); + if (query != null && !query.isEmpty()) { + return queryFiles.stream().filter(query::equals).sorted().collect(Collectors.toList()); + } return queryFiles.stream().sorted().collect(Collectors.toList()); } catch (URISyntaxException | IOException e) { From a14c680083adc9c3f541a1cd07d5cfe1f4e6389f Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 22 Jun 2026 16:36:38 +0800 Subject: [PATCH 03/16] Update Flink ORC velox4j dependency --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 4c412dcfcb2..eb6a74ea3b8 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard acad51ca0c9f678c4b36d6424fdd3d19c1c3eaae + cd velox4j && git reset --hard 1154e82145f2c13279265130d782322063e5fda2 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 10172d9d28cea4a85ef81aaa55355fd126ee74f2 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 22 Jun 2026 18:32:18 +0800 Subject: [PATCH 04/16] Update Flink CI Velox4J revision --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index eb6a74ea3b8..54fb4288f2f 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 1154e82145f2c13279265130d782322063e5fda2 + cd velox4j && git reset --hard 0794a28389f81a21285df006b6018b1b561606ce git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From c9b4e0af765a5cd8acb0ca5e5219d9242f3ae01c Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 10:11:41 +0800 Subject: [PATCH 05/16] [FLINK] Verify ORC sink output read-back --- .../operators/GlutenOneInputOperator.java | 55 ++++++++++++++----- gluten-flink/ut/pom.xml | 18 ++++++ .../runtime/stream/custom/NexmarkTest.java | 47 ++++++++++++++++ 3 files changed, 106 insertions(+), 14 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 09b059f3bc2..08bf43c41f4 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -195,24 +195,46 @@ private void drainTaskOutput() { while (true) { UpIterator.State state = task.advance(); if (state == UpIterator.State.AVAILABLE) { - final StatefulElement statefulElement = task.statefulGet(); - try { - if (statefulElement.isWatermark()) { - StatefulWatermark watermark = statefulElement.asWatermark(); - output.emitWatermark(new Watermark(watermark.getTimestamp())); - } else { - outputBridge.collect( - output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); - } - } finally { - statefulElement.close(); - } + processAvailableElement(); } else { break; } } } + private void processAvailableElement() { + final StatefulElement statefulElement = task.statefulGet(); + try { + if (statefulElement.isWatermark()) { + StatefulWatermark watermark = statefulElement.asWatermark(); + output.emitWatermark(new Watermark(watermark.getTimestamp())); + } else { + outputBridge.collect( + output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + } + } finally { + statefulElement.close(); + } + } + + private void finishTask() { + while (true) { + UpIterator.State state = task.advance(); + switch (state) { + case AVAILABLE: + processAvailableElement(); + break; + case BLOCKED: + task.waitFor(); + break; + case FINISHED: + return; + default: + throw new IllegalStateException("Unknown Velox task state: " + state); + } + } + } + public GlutenOneInputOperator cloneWithInputOutputClasses( StatefulPlanNode plan, Class newInClass, Class newOutClass) { return new GlutenOneInputOperator<>( @@ -238,13 +260,18 @@ public void processWatermark2(Watermark mark) throws Exception { @Override public void close() throws Exception { + if (inputQueue != null) { + inputQueue.noMoreInput(); + } if (task != null) { - task.close(); + finishTask(); } if (inputQueue != null) { - inputQueue.noMoreInput(); inputQueue.close(); } + if (task != null) { + task.close(); + } if (sessionResource != null) { sessionResource.close(); } diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml index bc7d1b1723f..b33a0b2c232 100644 --- a/gluten-flink/ut/pom.xml +++ b/gluten-flink/ut/pom.xml @@ -200,6 +200,24 @@ ${flink.version} test + + org.apache.orc + orc-core + 1.9.2 + test + + + org.apache.hive + hive-storage-api + 2.8.1 + test + + + com.google.protobuf + protobuf-java + 3.25.5 + test + org.apache.hadoop hadoop-client diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java index 3fc3c7df4a2..a54578b0c16 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java @@ -27,6 +27,9 @@ import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; import com.salesforce.kafka.test.listeners.PlainListener; +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -40,6 +43,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -179,6 +183,7 @@ private static void clearEnvironment(StreamTableEnvironment tEnv) { private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boolean kafkaSource) throws ExecutionException, InterruptedException, TimeoutException { String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + queryFileName); + long queryStartMillis = System.currentTimeMillis(); String[] sqlStatements = queryContent.split(";"); assertThat(sqlStatements.length).isGreaterThanOrEqualTo(2); @@ -200,11 +205,53 @@ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boo assertThat(checkJobRunningStatus(insertResult, 30000) == true); } else { waitForJobCompletion(insertResult, 30000); + if ("q10.sql".equals(queryFileName)) { + verifyQ10OrcOutput(queryStartMillis); + } } } assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty()); } + private void verifyQ10OrcOutput(long queryStartMillis) { + Path outputDir = Paths.get("/tmp/data/output/bid"); + assertTrue("Q10 ORC output directory should exist", Files.exists(outputDir)); + try { + List outputFiles; + try (java.util.stream.Stream files = Files.walk(outputDir)) { + outputFiles = + files + .filter(Files::isRegularFile) + .filter(path -> isModifiedAfter(path, queryStartMillis)) + .sorted() + .collect(Collectors.toList()); + } + assertThat(outputFiles).isNotEmpty(); + long rowCount = 0; + for (Path outputFile : outputFiles) { + Reader reader = + OrcFile.createReader( + new org.apache.hadoop.fs.Path(outputFile.toUri()), + OrcFile.readerOptions(new Configuration())); + assertThat(reader.getSchema().getFieldNames()) + .containsExactly("auction", "bidder", "price", "dateTime", "extra"); + rowCount += reader.getNumberOfRows(); + } + assertThat(rowCount).isGreaterThan(0); + } catch (IOException e) { + throw new RuntimeException("Failed to read back Q10 ORC output", e); + } + } + + private boolean isModifiedAfter(Path path, long queryStartMillis) { + try { + FileTime lastModifiedTime = Files.getLastModifiedTime(path); + return lastModifiedTime.toMillis() >= queryStartMillis; + } catch (IOException e) { + throw new RuntimeException("Failed to inspect output file " + path, e); + } + } + private void waitForJobCompletion(TableResult result, long timeoutMs) throws InterruptedException, ExecutionException, TimeoutException { assertTrue(result.getJobClient().isPresent()); From 2fbcbcdb46e3357ffcd8e59bf22392d94ee3178d Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 10:14:07 +0800 Subject: [PATCH 06/16] Update Flink CI Velox4J ORC revision --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 54fb4288f2f..389ae82e22e 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 0794a28389f81a21285df006b6018b1b561606ce + cd velox4j && git reset --hard 42c6e2c36eb23a4a2b78e790d75b09ba973912a0 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 4e421420fe7b7e487a19d8632dd35a5f4c1bb3ac Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 10:15:50 +0800 Subject: [PATCH 07/16] Update Flink CI Velox4J revision --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 389ae82e22e..f271977fc8d 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 42c6e2c36eb23a4a2b78e790d75b09ba973912a0 + cd velox4j && git reset --hard bd8708eb07a691445807c9445390ed1cb39c3874 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 7edeec4143cfb1852b11a9e77694c8c96e0a57bc Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 11:55:38 +0800 Subject: [PATCH 08/16] [FLINK] Gracefully finish two-input Velox operators --- .../operators/GlutenTwoInputOperator.java | 55 +++++++++++++++---- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 3f73ad4bbd8..8e8d8f0f206 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -175,24 +175,46 @@ private void drainTaskOutput() { while (true) { UpIterator.State state = task.advance(); if (state == UpIterator.State.AVAILABLE) { - final StatefulElement element = task.statefulGet(); - try { - if (element.isWatermark()) { - StatefulWatermark watermark = element.asWatermark(); - output.emitWatermark(new Watermark(watermark.getTimestamp())); - } else { - outputBridge.collect( - output, element.asRecord(), sessionResource.getAllocator(), outputType); - } - } finally { - element.close(); - } + processAvailableElement(); } else { break; } } } + private void processAvailableElement() { + final StatefulElement element = task.statefulGet(); + try { + if (element.isWatermark()) { + StatefulWatermark watermark = element.asWatermark(); + output.emitWatermark(new Watermark(watermark.getTimestamp())); + } else { + outputBridge.collect( + output, element.asRecord(), sessionResource.getAllocator(), outputType); + } + } finally { + element.close(); + } + } + + private void finishTask() { + while (true) { + UpIterator.State state = task.advance(); + switch (state) { + case AVAILABLE: + processAvailableElement(); + break; + case BLOCKED: + task.waitFor(); + break; + case FINISHED: + return; + default: + throw new IllegalStateException("Unknown Velox task state: " + state); + } + } + } + @Override public void processWatermark(Watermark mark) throws Exception { task.notifyWatermark(mark.getTimestamp()); @@ -213,6 +235,15 @@ public void processWatermark2(Watermark mark) throws Exception { @Override public void close() throws Exception { + if (leftInputQueue != null) { + leftInputQueue.noMoreInput(); + } + if (rightInputQueue != null) { + rightInputQueue.noMoreInput(); + } + if (task != null) { + finishTask(); + } if (leftInputQueue != null) { leftInputQueue.close(); } From 3744e44d484c58829e6907e812134b997636da1a Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 11:59:40 +0800 Subject: [PATCH 09/16] [FLINK] Guard Velox operator close cleanup --- .../operators/GlutenOneInputOperator.java | 41 +++++++++----- .../operators/GlutenTwoInputOperator.java | 56 ++++++++++++------- 2 files changed, 63 insertions(+), 34 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 08bf43c41f4..57049b72bd2 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -260,20 +260,33 @@ public void processWatermark2(Watermark mark) throws Exception { @Override public void close() throws Exception { - if (inputQueue != null) { - inputQueue.noMoreInput(); - } - if (task != null) { - finishTask(); - } - if (inputQueue != null) { - inputQueue.close(); - } - if (task != null) { - task.close(); - } - if (sessionResource != null) { - sessionResource.close(); + try { + if (inputQueue != null) { + inputQueue.noMoreInput(); + } + if (task != null) { + finishTask(); + } + } finally { + try { + if (inputQueue != null) { + inputQueue.close(); + } + } finally { + try { + if (task != null) { + task.close(); + } + } finally { + try { + if (sessionResource != null) { + sessionResource.close(); + } + } finally { + super.close(); + } + } + } } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 8e8d8f0f206..2bfcdaab2e8 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -235,26 +235,42 @@ public void processWatermark2(Watermark mark) throws Exception { @Override public void close() throws Exception { - if (leftInputQueue != null) { - leftInputQueue.noMoreInput(); - } - if (rightInputQueue != null) { - rightInputQueue.noMoreInput(); - } - if (task != null) { - finishTask(); - } - if (leftInputQueue != null) { - leftInputQueue.close(); - } - if (rightInputQueue != null) { - rightInputQueue.close(); - } - if (task != null) { - task.close(); - } - if (sessionResource != null) { - sessionResource.close(); + try { + if (leftInputQueue != null) { + leftInputQueue.noMoreInput(); + } + if (rightInputQueue != null) { + rightInputQueue.noMoreInput(); + } + if (task != null) { + finishTask(); + } + } finally { + try { + if (leftInputQueue != null) { + leftInputQueue.close(); + } + } finally { + try { + if (rightInputQueue != null) { + rightInputQueue.close(); + } + } finally { + try { + if (task != null) { + task.close(); + } + } finally { + try { + if (sessionResource != null) { + sessionResource.close(); + } + } finally { + super.close(); + } + } + } + } } } From 1122fe20f067e7a1cc27a94ebf6eae2382ac996c Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 12:06:51 +0800 Subject: [PATCH 10/16] [FLINK] Simplify Velox operator close cleanup --- .../runtime/operators/GlutenCloseables.java | 52 ++++++++++++++++ .../operators/GlutenOneInputOperator.java | 45 +++++++------- .../operators/GlutenTwoInputOperator.java | 62 +++++++++---------- 3 files changed, 102 insertions(+), 57 deletions(-) create mode 100644 gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java new file mode 100644 index 00000000000..1fe982ba457 --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.operators; + +final class GlutenCloseables { + private GlutenCloseables() {} + + static void runWithCleanup(ThrowingRunnable action, ThrowingRunnable... cleanups) + throws Exception { + Exception failure = null; + try { + action.run(); + } catch (Exception e) { + failure = e; + } + + for (ThrowingRunnable cleanup : cleanups) { + try { + cleanup.run(); + } catch (Exception e) { + if (failure == null) { + failure = e; + } else { + failure.addSuppressed(e); + } + } + } + + if (failure != null) { + throw failure; + } + } + + @FunctionalInterface + interface ThrowingRunnable { + void run() throws Exception; + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 57049b72bd2..c461ed44abd 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -260,34 +260,31 @@ public void processWatermark2(Watermark mark) throws Exception { @Override public void close() throws Exception { - try { - if (inputQueue != null) { - inputQueue.noMoreInput(); - } - if (task != null) { - finishTask(); - } - } finally { - try { - if (inputQueue != null) { - inputQueue.close(); - } - } finally { - try { + GlutenCloseables.runWithCleanup( + () -> { + if (inputQueue != null) { + inputQueue.noMoreInput(); + } + if (task != null) { + finishTask(); + } + }, + () -> { + if (inputQueue != null) { + inputQueue.close(); + } + }, + () -> { if (task != null) { task.close(); } - } finally { - try { - if (sessionResource != null) { - sessionResource.close(); - } - } finally { - super.close(); + }, + () -> { + if (sessionResource != null) { + sessionResource.close(); } - } - } - } + }, + super::close); } @Override diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 2bfcdaab2e8..5eaa66b3aff 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -235,43 +235,39 @@ public void processWatermark2(Watermark mark) throws Exception { @Override public void close() throws Exception { - try { - if (leftInputQueue != null) { - leftInputQueue.noMoreInput(); - } - if (rightInputQueue != null) { - rightInputQueue.noMoreInput(); - } - if (task != null) { - finishTask(); - } - } finally { - try { - if (leftInputQueue != null) { - leftInputQueue.close(); - } - } finally { - try { + GlutenCloseables.runWithCleanup( + () -> { + if (leftInputQueue != null) { + leftInputQueue.noMoreInput(); + } + if (rightInputQueue != null) { + rightInputQueue.noMoreInput(); + } + if (task != null) { + finishTask(); + } + }, + () -> { + if (leftInputQueue != null) { + leftInputQueue.close(); + } + }, + () -> { if (rightInputQueue != null) { rightInputQueue.close(); } - } finally { - try { - if (task != null) { - task.close(); - } - } finally { - try { - if (sessionResource != null) { - sessionResource.close(); - } - } finally { - super.close(); - } + }, + () -> { + if (task != null) { + task.close(); } - } - } - } + }, + () -> { + if (sessionResource != null) { + sessionResource.close(); + } + }, + super::close); } @Override From bcfc8e07c72cd3c876b62e2ec3ab6a08fde81fbf Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 12:19:08 +0800 Subject: [PATCH 11/16] [FLINK] Test two-input operator close cleanup --- .../GlutenTwoInputOperatorCloseTest.java | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java new file mode 100644 index 00000000000..ebdbdeb941b --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.operators; + +import io.github.zhztheplayer.velox4j.connector.ExternalStreams; +import io.github.zhztheplayer.velox4j.data.RowVector; +import io.github.zhztheplayer.velox4j.iterator.UpIterator; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.stateful.StatefulElement; +import io.github.zhztheplayer.velox4j.type.BigIntType; +import io.github.zhztheplayer.velox4j.type.RowType; + +import org.apache.flink.table.data.RowData; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class GlutenTwoInputOperatorCloseTest { + private static final RowType ROW_TYPE = new RowType(List.of("c0"), List.of(new BigIntType())); + + @Test + public void testCloseFinishesBeforeClosingResources() throws Exception { + List calls = new ArrayList<>(); + GlutenTwoInputOperator operator = newOperator(); + FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls); + FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, "right", calls); + FakeSerialTask task = new FakeSerialTask(calls, false); + setField(operator, "leftInputQueue", leftQueue); + setField(operator, "rightInputQueue", rightQueue); + setField(operator, "task", task); + + operator.close(); + + assertThat(calls) + .containsExactly( + "left.noMoreInput", + "right.noMoreInput", + "task.advance", + "left.close", + "right.close", + "task.close"); + } + + @Test + public void testCloseCleansResourcesWhenFinishFails() throws Exception { + List calls = new ArrayList<>(); + GlutenTwoInputOperator operator = newOperator(); + FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls); + FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, "right", calls); + FakeSerialTask task = new FakeSerialTask(calls, true); + setField(operator, "leftInputQueue", leftQueue); + setField(operator, "rightInputQueue", rightQueue); + setField(operator, "task", task); + + assertThatThrownBy(operator::close).isInstanceOf(RuntimeException.class).hasMessage("finish"); + + assertThat(calls) + .containsExactly( + "left.noMoreInput", + "right.noMoreInput", + "task.advance", + "left.close", + "right.close", + "task.close"); + } + + private static GlutenTwoInputOperator newOperator() { + StatefulPlanNode plan = new StatefulPlanNode("join", null); + return new GlutenTwoInputOperator<>( + plan, + "left", + "right", + ROW_TYPE, + ROW_TYPE, + Map.of("join", ROW_TYPE), + RowData.class, + RowData.class); + } + + private static void setField(Object target, String name, Object value) throws Exception { + Field field = GlutenTwoInputOperator.class.getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } + + private static class FakeBlockingQueue extends ExternalStreams.BlockingQueue { + private final String name; + private final List calls; + + private FakeBlockingQueue(long id, String name, List calls) { + super(id); + this.name = name; + this.calls = calls; + } + + @Override + public void put(RowVector vector) { + throw new UnsupportedOperationException(); + } + + @Override + public void noMoreInput() { + calls.add(name + ".noMoreInput"); + } + + @Override + public void close() { + calls.add(name + ".close"); + } + } + + private static class FakeSerialTask extends SerialTask { + private final List calls; + private final boolean failAdvance; + + private FakeSerialTask(List calls, boolean failAdvance) { + super(null, 3); + this.calls = calls; + this.failAdvance = failAdvance; + } + + @Override + public UpIterator.State advance() { + calls.add("task.advance"); + if (failAdvance) { + throw new RuntimeException("finish"); + } + return UpIterator.State.FINISHED; + } + + @Override + public void waitFor() { + throw new UnsupportedOperationException(); + } + + @Override + public StatefulElement statefulGet() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + calls.add("task.close"); + } + } +} From 3ac0d5096fd2eb0451b7a588272aab4dc7d4a31a Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 12:29:07 +0800 Subject: [PATCH 12/16] [FLINK] Add dedicated Nexmark ORC q10 query --- .../runtime/stream/custom/NexmarkTest.java | 3 ++- .../ut/src/test/resources/nexmark/q10.sql | 2 +- .../ut/src/test/resources/nexmark/q10_orc.sql | 23 +++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java index a54578b0c16..cffb8b1809b 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java @@ -170,6 +170,7 @@ private static void clearEnvironment(StreamTableEnvironment tEnv) { String sql = String.format("drop table if exists %s", tableName); tEnv.executeSql(sql); } + tEnv.executeSql("drop table if exists nexmark_q10_orc"); for (String view : VIEWS) { String sql = String.format("drop view if exists %s", view); tEnv.executeSql(sql); @@ -205,7 +206,7 @@ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boo assertThat(checkJobRunningStatus(insertResult, 30000) == true); } else { waitForJobCompletion(insertResult, 30000); - if ("q10.sql".equals(queryFileName)) { + if ("q10_orc.sql".equals(queryFileName)) { verifyQ10OrcOutput(queryStartMillis); } } diff --git a/gluten-flink/ut/src/test/resources/nexmark/q10.sql b/gluten-flink/ut/src/test/resources/nexmark/q10.sql index 49aaaca4a2d..6d13d7c85f6 100644 --- a/gluten-flink/ut/src/test/resources/nexmark/q10.sql +++ b/gluten-flink/ut/src/test/resources/nexmark/q10.sql @@ -9,7 +9,7 @@ CREATE TABLE nexmark_q10 ( ) PARTITIONED BY (dt, hm) WITH ( 'connector' = 'filesystem', 'path' = 'file:///tmp/data/output/bid/', - 'format' = 'orc', + 'format' = 'csv', 'sink.partition-commit.trigger' = 'partition-time', 'sink.partition-commit.delay' = '1 min', 'sink.partition-commit.policy.kind' = 'success-file', diff --git a/gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql b/gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql new file mode 100644 index 00000000000..6b00f387868 --- /dev/null +++ b/gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql @@ -0,0 +1,23 @@ +CREATE TABLE nexmark_q10_orc ( + auction BIGINT, + bidder BIGINT, + price BIGINT, + `dateTime` TIMESTAMP(3), + extra VARCHAR, + dt STRING, + hm STRING +) PARTITIONED BY (dt, hm) WITH ( + 'connector' = 'filesystem', + 'path' = 'file:///tmp/data/output/bid/', + 'format' = 'orc', + 'sink.partition-commit.trigger' = 'partition-time', + 'sink.partition-commit.delay' = '1 min', + 'sink.partition-commit.policy.kind' = 'success-file', + 'partition.time-extractor.timestamp-pattern' = '$dt $hm:00', + 'sink.rolling-policy.rollover-interval' = '1min', + 'sink.rolling-policy.check-interval' = '1min' +); + +INSERT INTO nexmark_q10_orc +SELECT auction, bidder, price, `dateTime`, extra, DATE_FORMAT(`dateTime`, 'yyyy-MM-dd'), DATE_FORMAT(`dateTime`, 'HH:mm') +FROM bid; From d428c3670a09dac5331dd6b7096673e19dd73429 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 13:15:01 +0800 Subject: [PATCH 13/16] [FLINK] Avoid finishing Velox tasks during close --- .../operators/GlutenOneInputOperator.java | 22 +-- .../operators/GlutenTwoInputOperator.java | 40 ++++-- .../GlutenOneInputOperatorCloseTest.java | 135 ++++++++++++++++++ .../GlutenTwoInputOperatorCloseTest.java | 59 +++++--- 4 files changed, 214 insertions(+), 42 deletions(-) create mode 100644 gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index c461ed44abd..87ba296d576 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -39,6 +39,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -49,7 +50,7 @@ /** Calculate operator in gluten, which will call Velox to run. */ public class GlutenOneInputOperator extends TableStreamOperator - implements OneInputStreamOperator, GlutenOperator { + implements OneInputStreamOperator, BoundedOneInput, GlutenOperator { private final StatefulPlanNode glutenPlan; private final String id; @@ -258,17 +259,20 @@ public void processWatermark2(Watermark mark) throws Exception { throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator"); } + @Override + public void endInput() throws Exception { + if (inputQueue != null) { + inputQueue.noMoreInput(); + } + if (task != null) { + finishTask(); + } + } + @Override public void close() throws Exception { GlutenCloseables.runWithCleanup( - () -> { - if (inputQueue != null) { - inputQueue.noMoreInput(); - } - if (task != null) { - finishTask(); - } - }, + () -> {}, () -> { if (inputQueue != null) { inputQueue.close(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 5eaa66b3aff..824a442c17f 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -50,7 +51,7 @@ * instead of flink RowData. */ public class GlutenTwoInputOperator extends AbstractStreamOperator - implements TwoInputStreamOperator, GlutenOperator { + implements TwoInputStreamOperator, BoundedMultiInput, GlutenOperator { private static final Logger LOG = LoggerFactory.getLogger(GlutenTwoInputOperator.class); @@ -73,6 +74,8 @@ public class GlutenTwoInputOperator extends AbstractStreamOperator private VectorOutputBridge outputBridge; private String description; private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder(); + private boolean leftInputEnded; + private boolean rightInputEnded; public GlutenTwoInputOperator( StatefulPlanNode plan, @@ -233,20 +236,33 @@ public void processWatermark2(Watermark mark) throws Exception { processElementInternal(); } + @Override + public void endInput(int inputId) throws Exception { + switch (inputId) { + case 1: + leftInputEnded = true; + if (leftInputQueue != null) { + leftInputQueue.noMoreInput(); + } + break; + case 2: + rightInputEnded = true; + if (rightInputQueue != null) { + rightInputQueue.noMoreInput(); + } + break; + default: + throw new IllegalArgumentException("Unknown input id: " + inputId); + } + if (leftInputEnded && rightInputEnded && task != null) { + finishTask(); + } + } + @Override public void close() throws Exception { GlutenCloseables.runWithCleanup( - () -> { - if (leftInputQueue != null) { - leftInputQueue.noMoreInput(); - } - if (rightInputQueue != null) { - rightInputQueue.noMoreInput(); - } - if (task != null) { - finishTask(); - } - }, + () -> {}, () -> { if (leftInputQueue != null) { leftInputQueue.close(); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java new file mode 100644 index 00000000000..d1e59ba5365 --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.operators; + +import io.github.zhztheplayer.velox4j.connector.ExternalStreams; +import io.github.zhztheplayer.velox4j.data.RowVector; +import io.github.zhztheplayer.velox4j.iterator.UpIterator; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.stateful.StatefulElement; +import io.github.zhztheplayer.velox4j.type.BigIntType; +import io.github.zhztheplayer.velox4j.type.RowType; + +import org.apache.flink.table.data.RowData; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class GlutenOneInputOperatorCloseTest { + private static final RowType ROW_TYPE = new RowType(List.of("c0"), List.of(new BigIntType())); + + @Test + public void testEndInputFinishesTask() throws Exception { + List calls = new ArrayList<>(); + GlutenOneInputOperator operator = newOperator(); + FakeBlockingQueue inputQueue = new FakeBlockingQueue(1, calls); + FakeSerialTask task = new FakeSerialTask(calls); + setField(operator, "inputQueue", inputQueue); + setField(operator, "task", task); + + operator.endInput(); + + assertThat(calls).containsExactly("input.noMoreInput", "task.advance"); + } + + @Test + public void testCloseOnlyCleansResources() throws Exception { + List calls = new ArrayList<>(); + GlutenOneInputOperator operator = newOperator(); + FakeBlockingQueue inputQueue = new FakeBlockingQueue(1, calls); + FakeSerialTask task = new FakeSerialTask(calls); + setField(operator, "inputQueue", inputQueue); + setField(operator, "task", task); + + operator.close(); + + assertThat(calls).containsExactly("input.close", "task.close"); + } + + private static GlutenOneInputOperator newOperator() { + StatefulPlanNode plan = new StatefulPlanNode("project", null); + return new GlutenOneInputOperator<>( + plan, "input", ROW_TYPE, Map.of("project", ROW_TYPE), RowData.class, RowData.class); + } + + private static void setField(Object target, String name, Object value) throws Exception { + Field field = GlutenOneInputOperator.class.getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } + + private static class FakeBlockingQueue extends ExternalStreams.BlockingQueue { + private final List calls; + + private FakeBlockingQueue(long id, List calls) { + super(id); + this.calls = calls; + } + + @Override + public void put(RowVector vector) { + throw new UnsupportedOperationException(); + } + + @Override + public void noMoreInput() { + calls.add("input.noMoreInput"); + } + + @Override + public void close() { + calls.add("input.close"); + } + } + + private static class FakeSerialTask extends SerialTask { + private final List calls; + + private FakeSerialTask(List calls) { + super(null, 2); + this.calls = calls; + } + + @Override + public UpIterator.State advance() { + calls.add("task.advance"); + return UpIterator.State.FINISHED; + } + + @Override + public void waitFor() { + throw new UnsupportedOperationException(); + } + + @Override + public StatefulElement statefulGet() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + calls.add("task.close"); + } + } +} diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java index ebdbdeb941b..167473b5cb2 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java @@ -41,7 +41,7 @@ public class GlutenTwoInputOperatorCloseTest { private static final RowType ROW_TYPE = new RowType(List.of("c0"), List.of(new BigIntType())); @Test - public void testCloseFinishesBeforeClosingResources() throws Exception { + public void testEndInputFinishesAfterBothInputsEnd() throws Exception { List calls = new ArrayList<>(); GlutenTwoInputOperator operator = newOperator(); FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls); @@ -51,39 +51,47 @@ public void testCloseFinishesBeforeClosingResources() throws Exception { setField(operator, "rightInputQueue", rightQueue); setField(operator, "task", task); - operator.close(); + operator.endInput(1); + + assertThat(calls).containsExactly("left.noMoreInput"); + + operator.endInput(2); - assertThat(calls) - .containsExactly( - "left.noMoreInput", - "right.noMoreInput", - "task.advance", - "left.close", - "right.close", - "task.close"); + assertThat(calls).containsExactly("left.noMoreInput", "right.noMoreInput", "task.advance"); } @Test - public void testCloseCleansResourcesWhenFinishFails() throws Exception { + public void testCloseOnlyCleansResources() throws Exception { List calls = new ArrayList<>(); GlutenTwoInputOperator operator = newOperator(); FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls); FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, "right", calls); - FakeSerialTask task = new FakeSerialTask(calls, true); + FakeSerialTask task = new FakeSerialTask(calls, false); + setField(operator, "leftInputQueue", leftQueue); + setField(operator, "rightInputQueue", rightQueue); + setField(operator, "task", task); + + operator.close(); + + assertThat(calls).containsExactly("left.close", "right.close", "task.close"); + } + + @Test + public void testCloseCleansResourcesWhenCloseFails() throws Exception { + List calls = new ArrayList<>(); + GlutenTwoInputOperator operator = newOperator(); + FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls, true); + FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, "right", calls); + FakeSerialTask task = new FakeSerialTask(calls, false); setField(operator, "leftInputQueue", leftQueue); setField(operator, "rightInputQueue", rightQueue); setField(operator, "task", task); - assertThatThrownBy(operator::close).isInstanceOf(RuntimeException.class).hasMessage("finish"); + assertThatThrownBy(operator::close) + .isInstanceOf(RuntimeException.class) + .hasMessage("left.close"); - assertThat(calls) - .containsExactly( - "left.noMoreInput", - "right.noMoreInput", - "task.advance", - "left.close", - "right.close", - "task.close"); + assertThat(calls).containsExactly("left.close", "right.close", "task.close"); } private static GlutenTwoInputOperator newOperator() { @@ -108,11 +116,17 @@ private static void setField(Object target, String name, Object value) throws Ex private static class FakeBlockingQueue extends ExternalStreams.BlockingQueue { private final String name; private final List calls; + private final boolean failClose; private FakeBlockingQueue(long id, String name, List calls) { + this(id, name, calls, false); + } + + private FakeBlockingQueue(long id, String name, List calls, boolean failClose) { super(id); this.name = name; this.calls = calls; + this.failClose = failClose; } @Override @@ -128,6 +142,9 @@ public void noMoreInput() { @Override public void close() { calls.add(name + ".close"); + if (failClose) { + throw new RuntimeException(name + ".close"); + } } } From b5664a96c4d31cfec5a6b9174bf4a2d089ad6a3f Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 16:29:56 +0800 Subject: [PATCH 14/16] [FLINK] Update Velox4J ORC revision --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index f271977fc8d..caabef4d219 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard bd8708eb07a691445807c9445390ed1cb39c3874 + cd velox4j && git reset --hard ef7d100ec6577bf64575dbafb4597911bbdfb473 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From eca1f222c5f7fa36ad8645a12348c8f57b205f86 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 18:14:35 +0800 Subject: [PATCH 15/16] [FLINK] Update Velox4J gluten-0530 revision --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index caabef4d219..1de4e2eb5d6 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard ef7d100ec6577bf64575dbafb4597911bbdfb473 + cd velox4j && git reset --hard 3880c78098df12f6f01ffb4fca26f1b9cf66d358 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 647a12f23cb10a9216b8c48db78957cdebdde981 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Wed, 24 Jun 2026 13:16:38 +0800 Subject: [PATCH 16/16] Update flink.yml --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 1de4e2eb5d6..45b4fd40f90 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 3880c78098df12f6f01ffb4fca26f1b9cf66d358 + cd velox4j && git reset --hard f6ea2d7f9c79a3476827dd7fd4c16a2b67a17cc3 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd ..