diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f304..d3e60394a32 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + cd velox4j && git reset --hard 97fc1edafebd0f505e613d260f77f92f5252d048 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java index ae0d94fb3cc..6533935bc75 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -23,6 +23,7 @@ import org.apache.gluten.util.ReflectUtils; import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig; import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; import io.github.zhztheplayer.velox4j.plan.PlanNode; import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; @@ -32,17 +33,25 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Map; public class NexmarkSourceFactory implements VeloxSourceSinkFactory { - private static final Logger LOG = LoggerFactory.getLogger(NexmarkSourceFactory.class); + private static final ObjectMapper MAPPER = + new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) + .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE); @SuppressWarnings("rawtypes") @Override @@ -76,9 +85,6 @@ public Transformation buildVeloxSource( Object generatorConfig = ReflectUtils.getObjectField( nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); - Long maxEvents = - (Long) - ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents"); PlanNode tableScan = new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of()); GlutenStreamSource sourceOp = @@ -88,8 +94,7 @@ public Transformation buildVeloxSource( Map.of(id, outputType), id, new NexmarkConnectorSplit( - "connector-nexmark", - maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE : maxEvents.intValue()), + "connector-nexmark", toVeloxNexmarkGeneratorConfig(generatorConfig)), RowData.class)); return new LegacySourceTransformation( @@ -106,4 +111,13 @@ public Transformation buildVeloxSink( Transformation transformation, Map parameters) { throw new UnsupportedOperationException("Unimplemented method 'buildSink'"); } + + private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object javaConfig) { + try { + String json = MAPPER.writeValueAsString(javaConfig); + return MAPPER.readValue(json, NexmarkGeneratorConfig.class); + } catch (JsonProcessingException e) { + throw new TableException("Failed to convert nexmark NexmarkGeneratorConfig to velox4j", e); + } + } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java index 737d6bab7e7..e9b9a24623d 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java @@ -20,6 +20,7 @@ import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator; import org.apache.gluten.util.LogicalTypeConverter; import org.apache.gluten.util.PlanNodeIdGenerator; +import org.apache.gluten.util.ReflectUtils; import io.github.zhztheplayer.velox4j.connector.CommitStrategy; import io.github.zhztheplayer.velox4j.connector.PrintTableHandle; @@ -30,9 +31,6 @@ import io.github.zhztheplayer.velox4j.type.RowType; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; @@ -71,33 +69,54 @@ public Transformation buildVeloxSource( throw new FlinkRuntimeException("Unimplemented method 'buildSource'"); } + // Pulls print-identifier/standard-error from RowDataPrintFunction via reflection. + // Flink 1.19.x field names: sinkIdentifier (print-identifier), target (standard-error, true = + // stderr). + static PrintOptions extractPrintOptions(Transformation transformation) { + SimpleOperatorFactory operatorFactory = + (SimpleOperatorFactory) ((LegacySinkTransformation) transformation).getOperatorFactory(); + SinkOperator sinkOp = (SinkOperator) operatorFactory.getOperator(); + Object rowDataPrintFn = sinkOp.getUserFunction(); + Object writer = + ReflectUtils.getObjectField(rowDataPrintFn.getClass(), rowDataPrintFn, "writer"); + String printIdentifier = + (String) ReflectUtils.getObjectField(writer.getClass(), writer, "sinkIdentifier"); + boolean isStdErr = (boolean) ReflectUtils.getObjectField(writer.getClass(), writer, "target"); + return new PrintOptions(printIdentifier == null ? "" : printIdentifier, isStdErr); + } + + static final class PrintOptions { + private final String printIdentifier; + private final boolean stdErr; + + PrintOptions(String printIdentifier, boolean stdErr) { + this.printIdentifier = printIdentifier; + this.stdErr = stdErr; + } + + public String getPrintIdentifier() { + return printIdentifier; + } + + public boolean isStdErr() { + return stdErr; + } + } + @SuppressWarnings({"rawtypes", "unchecked"}) @Override public Transformation buildVeloxSink( Transformation transformation, Map parameters) { Transformation inputTrans = (Transformation) transformation.getInputs().get(0); InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType(); - Configuration config = (Configuration) parameters.get(Configuration.class.getName()); - String logDir = config.get(CoreOptions.FLINK_LOG_DIR); - String printPath; - if (logDir != null) { - printPath = String.format("file://%s/%s", logDir, "taskmanager.out"); - } else { - String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR); - if (flinkHomeDir == null) { - String flinkConfDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); - if (flinkConfDir == null) { - throw new FlinkRuntimeException( - "Can not get flink home directory, please set FLINK_HOME."); - } - printPath = String.format("file://%s/../log/%s", flinkConfDir, "taskmanager.out"); - } else { - printPath = String.format("file://%s/log/%s", flinkHomeDir, "taskmanager.out"); - } - } + + PrintOptions printOpts = extractPrintOptions(transformation); + RowType inputColumns = (RowType) LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType()); RowType ignore = new RowType(List.of("num"), List.of(new BigIntType())); - PrintTableHandle tableHandle = new PrintTableHandle("print-table", inputColumns, printPath); + PrintTableHandle tableHandle = + new PrintTableHandle( + "print-table", inputColumns, printOpts.getPrintIdentifier(), printOpts.isStdErr()); TableWriteNode tableWriteNode = new TableWriteNode( PlanNodeIdGenerator.newId(), diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java index 13b195b0bd3..df9da7512a6 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java @@ -38,12 +38,14 @@ public class VeloxConnectorConfig { "connector-from-elements", "connector-print"); private static final String keyTaskIndex = "task_index"; + private static final String keyParallelism = "parallelism"; private static final String keyQueryUUId = "query_uuid"; public static ConnectorConfig getConfig(RuntimeContext context) { Map configMap = new HashMap<>(); TaskInfo taskInfo = context.getTaskInfo(); configMap.put(keyTaskIndex, String.valueOf(taskInfo.getIndexOfThisSubtask())); + configMap.put(keyParallelism, String.valueOf(taskInfo.getNumberOfParallelSubtasks())); configMap.put( keyQueryUUId, UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes()) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index 9adae67e215..393a0064d53 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -16,7 +16,6 @@ */ package org.apache.gluten.table.runtime.stream.common; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.Table; @@ -29,6 +28,8 @@ import org.apache.flink.types.Row; import org.apache.flink.util.FlinkRuntimeException; +import com.sun.jna.Library; +import com.sun.jna.Native; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,23 @@ public class GlutenStreamingTestBase extends StreamingTestBase { private static final String EXECUTION_PLAN_PREIFX = "== Physical Execution Plan =="; private static final long timeoutMS = 30000; + // dup2 fd=1 onto a file: Velox print sink writes to std::cout, which bypasses System.setOut and + // goes straight to the process's fd=1. + private interface CLibrary extends Library { + int dup(int oldfd); + + int dup2(int oldfd, int newfd); + + int open(String path, int flags, int mode); + + int close(int fd); + } + + private static final CLibrary C_LIB = Native.load("c", CLibrary.class); + private static final int O_WRONLY = 1; + private static final int O_CREAT = 0100; + private static final int O_TRUNC = 01000; + @BeforeAll public static void setup() throws Exception { LOG.info("GlutenStreamingTestBase setup"); @@ -114,42 +132,57 @@ protected String explainExecutionPlan(String query) { protected void runAndCheck(String query, List expected) { String printResultDirPath = System.getProperty("user.dir") + "/log/"; - tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath); - String printResultFilePath = String.format("%s%s", printResultDirPath, "taskmanager.out"); + new File(printResultDirPath).mkdirs(); + String printResultFilePath = printResultDirPath + "taskmanager.out"; File printResultFile = new File(printResultFilePath); - boolean deleteResultFile = true; if (printResultFile.exists()) { - deleteResultFile = printResultFile.delete(); + printResultFile.delete(); } - Table table = tEnv().sqlQuery(query); - createPrintSinkTable("printT", table.getResolvedSchema()); - String newQuery = String.format("insert into %s %s", "printT", query); - TableResult tableResult = tEnv().executeSql(newQuery); - assertTrue(tableResult.getJobClient().isPresent()); + + int savedStdout = C_LIB.dup(1); + int fileFd = C_LIB.open(printResultFilePath, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fileFd < 0) { + C_LIB.close(savedStdout); + throw new FlinkRuntimeException("Failed to open " + printResultFilePath); + } + C_LIB.dup2(fileFd, 1); try { + Table table = tEnv().sqlQuery(query); + createPrintSinkTable("printT", table.getResolvedSchema()); + String newQuery = String.format("insert into %s %s", "printT", query); + TableResult tableResult = tEnv().executeSql(newQuery); + assertTrue(tableResult.getJobClient().isPresent()); JobClient jobClient = tableResult.getJobClient().get(); - if (deleteResultFile) { - try { - long startTime = System.currentTimeMillis(); - while (!printResultFile.exists()) { - if (System.currentTimeMillis() - startTime > timeoutMS) { - break; - } - Thread.sleep(10); + try { + long startTime = System.currentTimeMillis(); + while (printResultFile.length() == 0) { + if (System.currentTimeMillis() - startTime > timeoutMS) { + break; } - long fileSize = -1L; - startTime = System.currentTimeMillis(); - while (printResultFile.length() > fileSize) { - if (System.currentTimeMillis() - startTime > timeoutMS) { - break; - } - fileSize = printResultFile.length(); - Thread.sleep(3000); + Thread.sleep(10); + } + long fileSize = -1L; + startTime = System.currentTimeMillis(); + while (printResultFile.length() > fileSize) { + if (System.currentTimeMillis() - startTime > timeoutMS) { + break; } - } finally { - jobClient.cancel(); + fileSize = printResultFile.length(); + Thread.sleep(3000); } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new FlinkRuntimeException(ie); + } finally { + jobClient.cancel(); } + } finally { + C_LIB.dup2(savedStdout, 1); + C_LIB.close(fileFd); + C_LIB.close(savedStdout); + } + + try { List result = new ArrayList<>(); try (FileReader fr = new FileReader(printResultFile); BufferedReader br = new BufferedReader(fr)) { diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java new file mode 100644 index 00000000000..46c2e8091cd --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.velox; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.sink.SinkOperator; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Constructor; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PrintSinkFactoryTest { + + private static final String ROWDATA_PRINT_FUNCTION_CN = + "org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction"; + + @SuppressWarnings("unchecked") + private static SinkFunction newRowDataPrintFunction(String identifier, boolean isStdErr) + throws Exception { + Class cls = Class.forName(ROWDATA_PRINT_FUNCTION_CN); + Constructor ctor = + cls.getDeclaredConstructor( + org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter.class, + String.class, + boolean.class); + ctor.setAccessible(true); + return (SinkFunction) ctor.newInstance(null, identifier, isStdErr); + } + + private static LegacySinkTransformation buildSinkTransformation( + SinkFunction userFunction) { + SinkOperator sinkOp = new SinkOperator(userFunction, -1); + SimpleOperatorFactory factory = SimpleOperatorFactory.of(sinkOp); + Transformation input = new StubTransformation(); + return new LegacySinkTransformation<>(input, "print-sink", factory, 1); + } + + private static final class StubTransformation extends Transformation { + StubTransformation() { + super("stub", InternalTypeInfo.of(RowType.of(new IntType())), 1); + } + + @Override + public List> getInputs() { + return Collections.emptyList(); + } + + @Override + protected List> getTransitivePredecessorsInternal() { + return Collections.emptyList(); + } + } + + private static final class OtherSinkFunction extends RichSinkFunction {} + + @Test + void testMatchAcceptsRowDataPrintFunction() throws Exception { + PrintSinkFactory factory = new PrintSinkFactory(); + assertTrue(factory.match(buildSinkTransformation(newRowDataPrintFunction("foo", false)))); + } + + @Test + void testMatchRejectsNonPrintSinkFunction() { + PrintSinkFactory factory = new PrintSinkFactory(); + assertFalse(factory.match(buildSinkTransformation(new OtherSinkFunction()))); + } + + @Test + void testMatchRejectsNonLegacySinkTransformation() { + PrintSinkFactory factory = new PrintSinkFactory(); + assertFalse(factory.match(new StubTransformation())); + } + + @Test + void testExtractPrintOptionsReadsIdentifierAndStderr() throws Exception { + LegacySinkTransformation tx = + buildSinkTransformation(newRowDataPrintFunction("foo", true)); + PrintSinkFactory.PrintOptions opts = PrintSinkFactory.extractPrintOptions(tx); + assertEquals("foo", opts.getPrintIdentifier()); + assertTrue(opts.isStdErr()); + } + + @Test + void testExtractPrintOptionsDefaultsWhenUnset() throws Exception { + LegacySinkTransformation tx = + buildSinkTransformation(newRowDataPrintFunction(null, false)); + PrintSinkFactory.PrintOptions opts = PrintSinkFactory.extractPrintOptions(tx); + assertEquals("", opts.getPrintIdentifier()); + assertFalse(opts.isStdErr()); + } +} diff --git a/gluten-flink/ut/src/test/resources/nexmark/q12.sql b/gluten-flink/ut/src/test/resources/nexmark/q12.sql deleted file mode 100755 index f2cda4f463b..00000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q12.sql +++ /dev/null @@ -1,20 +0,0 @@ -CREATE TABLE nexmark_q12 ( - bidder BIGINT, - bid_count BIGINT, - starttime TIMESTAMP(3), - endtime TIMESTAMP(3) -) WITH ( - 'connector' = 'blackhole' -); - -CREATE VIEW B AS SELECT *, PROCTIME() as p_time FROM bid; - -INSERT INTO nexmark_q12 -SELECT - bidder, - count(*) as bid_count, - window_start AS starttime, - window_end AS endtime -FROM TABLE( - TUMBLE(TABLE B, DESCRIPTOR(p_time), INTERVAL '10' SECOND)) -GROUP BY bidder, window_start, window_end;