Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
cd velox4j && git reset --hard 97fc1edafebd0f505e613d260f77f92f5252d048
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.gluten.util.ReflectUtils;

import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig;
import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
Expand All @@ -32,17 +33,25 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(NexmarkSourceFactory.class);
private static final ObjectMapper MAPPER =
new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);

@SuppressWarnings("rawtypes")
@Override
Expand Down Expand Up @@ -76,9 +85,6 @@ public Transformation<RowData> buildVeloxSource(
Object generatorConfig =
ReflectUtils.getObjectField(
nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig");
Long maxEvents =
(Long)
ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents");
PlanNode tableScan =
new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of());
GlutenStreamSource sourceOp =
Expand All @@ -88,8 +94,7 @@ public Transformation<RowData> buildVeloxSource(
Map.of(id, outputType),
id,
new NexmarkConnectorSplit(
"connector-nexmark",
maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE : maxEvents.intValue()),
"connector-nexmark", toVeloxNexmarkGeneratorConfig(generatorConfig)),
RowData.class));

return new LegacySourceTransformation<RowData>(
Expand All @@ -106,4 +111,13 @@ public Transformation<RowData> buildVeloxSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
throw new UnsupportedOperationException("Unimplemented method 'buildSink'");
}

private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object javaConfig) {
try {
String json = MAPPER.writeValueAsString(javaConfig);
return MAPPER.readValue(json, NexmarkGeneratorConfig.class);
} catch (JsonProcessingException e) {
throw new TableException("Failed to convert nexmark NexmarkGeneratorConfig to velox4j", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
import org.apache.gluten.util.ReflectUtils;

import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
import io.github.zhztheplayer.velox4j.connector.PrintTableHandle;
Expand All @@ -30,9 +31,6 @@
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
Expand Down Expand Up @@ -71,33 +69,54 @@ public Transformation<RowData> buildVeloxSource(
throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
}

// Pulls print-identifier/standard-error from RowDataPrintFunction via reflection.
// Flink 1.19.x field names: sinkIdentifier (print-identifier), target (standard-error, true =
// stderr).
static PrintOptions extractPrintOptions(Transformation<RowData> transformation) {
SimpleOperatorFactory operatorFactory =
(SimpleOperatorFactory) ((LegacySinkTransformation) transformation).getOperatorFactory();
SinkOperator sinkOp = (SinkOperator) operatorFactory.getOperator();
Object rowDataPrintFn = sinkOp.getUserFunction();
Object writer =
ReflectUtils.getObjectField(rowDataPrintFn.getClass(), rowDataPrintFn, "writer");
String printIdentifier =
(String) ReflectUtils.getObjectField(writer.getClass(), writer, "sinkIdentifier");
boolean isStdErr = (boolean) ReflectUtils.getObjectField(writer.getClass(), writer, "target");
return new PrintOptions(printIdentifier == null ? "" : printIdentifier, isStdErr);
}

static final class PrintOptions {
private final String printIdentifier;
private final boolean stdErr;

PrintOptions(String printIdentifier, boolean stdErr) {
this.printIdentifier = printIdentifier;
this.stdErr = stdErr;
}

public String getPrintIdentifier() {
return printIdentifier;
}

public boolean isStdErr() {
return stdErr;
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Transformation buildVeloxSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
Transformation inputTrans = (Transformation) transformation.getInputs().get(0);
InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType();
Configuration config = (Configuration) parameters.get(Configuration.class.getName());
String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
String printPath;
if (logDir != null) {
printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
} else {
String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
if (flinkHomeDir == null) {
String flinkConfDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (flinkConfDir == null) {
throw new FlinkRuntimeException(
"Can not get flink home directory, please set FLINK_HOME.");
}
printPath = String.format("file://%s/../log/%s", flinkConfDir, "taskmanager.out");
} else {
printPath = String.format("file://%s/log/%s", flinkHomeDir, "taskmanager.out");
}
}

PrintOptions printOpts = extractPrintOptions(transformation);

RowType inputColumns = (RowType) LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
PrintTableHandle tableHandle = new PrintTableHandle("print-table", inputColumns, printPath);
PrintTableHandle tableHandle =
new PrintTableHandle(
"print-table", inputColumns, printOpts.getPrintIdentifier(), printOpts.isStdErr());
TableWriteNode tableWriteNode =
new TableWriteNode(
PlanNodeIdGenerator.newId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ public class VeloxConnectorConfig {
"connector-from-elements",
"connector-print");
private static final String keyTaskIndex = "task_index";
private static final String keyParallelism = "parallelism";
private static final String keyQueryUUId = "query_uuid";

public static ConnectorConfig getConfig(RuntimeContext context) {
Map<String, String> configMap = new HashMap<>();
TaskInfo taskInfo = context.getTaskInfo();
configMap.put(keyTaskIndex, String.valueOf(taskInfo.getIndexOfThisSubtask()));
configMap.put(keyParallelism, String.valueOf(taskInfo.getNumberOfParallelSubtasks()));
configMap.put(
keyQueryUUId,
UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.gluten.table.runtime.stream.common;

import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
Expand All @@ -29,6 +28,8 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;

import com.sun.jna.Library;
import com.sun.jna.Native;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,6 +48,23 @@ public class GlutenStreamingTestBase extends StreamingTestBase {
private static final String EXECUTION_PLAN_PREIFX = "== Physical Execution Plan ==";
private static final long timeoutMS = 30000;

// dup2 fd=1 onto a file: Velox print sink writes to std::cout, which bypasses System.setOut and
// goes straight to the process's fd=1.
private interface CLibrary extends Library {
int dup(int oldfd);

int dup2(int oldfd, int newfd);

int open(String path, int flags, int mode);

int close(int fd);
}

private static final CLibrary C_LIB = Native.load("c", CLibrary.class);
private static final int O_WRONLY = 1;
private static final int O_CREAT = 0100;
private static final int O_TRUNC = 01000;

@BeforeAll
public static void setup() throws Exception {
LOG.info("GlutenStreamingTestBase setup");
Expand Down Expand Up @@ -114,42 +132,57 @@ protected String explainExecutionPlan(String query) {

protected void runAndCheck(String query, List<String> expected) {
String printResultDirPath = System.getProperty("user.dir") + "/log/";
tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath);
String printResultFilePath = String.format("%s%s", printResultDirPath, "taskmanager.out");
new File(printResultDirPath).mkdirs();
String printResultFilePath = printResultDirPath + "taskmanager.out";
File printResultFile = new File(printResultFilePath);
boolean deleteResultFile = true;
if (printResultFile.exists()) {
deleteResultFile = printResultFile.delete();
printResultFile.delete();
}
Table table = tEnv().sqlQuery(query);
createPrintSinkTable("printT", table.getResolvedSchema());
String newQuery = String.format("insert into %s %s", "printT", query);
TableResult tableResult = tEnv().executeSql(newQuery);
assertTrue(tableResult.getJobClient().isPresent());

int savedStdout = C_LIB.dup(1);
int fileFd = C_LIB.open(printResultFilePath, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fileFd < 0) {
C_LIB.close(savedStdout);
throw new FlinkRuntimeException("Failed to open " + printResultFilePath);
}
C_LIB.dup2(fileFd, 1);
try {
Table table = tEnv().sqlQuery(query);
createPrintSinkTable("printT", table.getResolvedSchema());
String newQuery = String.format("insert into %s %s", "printT", query);
TableResult tableResult = tEnv().executeSql(newQuery);
assertTrue(tableResult.getJobClient().isPresent());
JobClient jobClient = tableResult.getJobClient().get();
if (deleteResultFile) {
try {
long startTime = System.currentTimeMillis();
while (!printResultFile.exists()) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
Thread.sleep(10);
try {
long startTime = System.currentTimeMillis();
while (printResultFile.length() == 0) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
long fileSize = -1L;
startTime = System.currentTimeMillis();
while (printResultFile.length() > fileSize) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
fileSize = printResultFile.length();
Thread.sleep(3000);
Thread.sleep(10);
}
long fileSize = -1L;
startTime = System.currentTimeMillis();
while (printResultFile.length() > fileSize) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
} finally {
jobClient.cancel();
fileSize = printResultFile.length();
Thread.sleep(3000);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new FlinkRuntimeException(ie);
} finally {
jobClient.cancel();
}
} finally {
C_LIB.dup2(savedStdout, 1);
C_LIB.close(fileFd);
C_LIB.close(savedStdout);
}

try {
List<String> result = new ArrayList<>();
try (FileReader fr = new FileReader(printResultFile);
BufferedReader br = new BufferedReader(fr)) {
Expand Down
Loading
Loading