Skip to content
4 changes: 2 additions & 2 deletions .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ jobs:
export VELOX_DEPENDENCY_SOURCE=BUNDLED
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
git clone -b fix/print-sink-multi-parallelism https://github.com/ggjh-159/velox4j.git
cd velox4j
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
Expand All @@ -41,6 +38,7 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.FlinkRuntimeException;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -71,33 +69,48 @@ public Transformation<RowData> buildVeloxSource(
throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
}

// Pulls print-identifier/standard-error from RowDataPrintFunction via reflection.
// Flink 1.19.x field names: sinkIdentifier (print-identifier), target (standard-error, true =
// stderr).
// Package-private for direct unit testing.
static String[] extractPrintOptions(Transformation<RowData> transformation) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the readability of String[] is bad, may we can use class PringOptions instead?

SimpleOperatorFactory operatorFactory =
(SimpleOperatorFactory) ((LegacySinkTransformation) transformation).getOperatorFactory();
SinkOperator sinkOp = (SinkOperator) operatorFactory.getOperator();
Object rowDataPrintFn = sinkOp.getUserFunction();
try {
Field writerField = rowDataPrintFn.getClass().getDeclaredField("writer");
writerField.setAccessible(true);
Object writer = writerField.get(rowDataPrintFn);
Field idField = writer.getClass().getDeclaredField("sinkIdentifier");
idField.setAccessible(true);
Field stdErrField = writer.getClass().getDeclaredField("target");
stdErrField.setAccessible(true);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ReflectUtils.getObjectField() should be simpler

String printIdentifier = (String) idField.get(writer);
boolean isStdErr = stdErrField.getBoolean(writer);
return new String[] {
printIdentifier == null ? "" : printIdentifier, Boolean.toString(isStdErr)
};
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new FlinkRuntimeException("Failed to extract print sink options", e);
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Transformation buildVeloxSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
Transformation inputTrans = (Transformation) transformation.getInputs().get(0);
InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType();
Configuration config = (Configuration) parameters.get(Configuration.class.getName());
String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
String printPath;
if (logDir != null) {
printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
} else {
String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
if (flinkHomeDir == null) {
String flinkConfDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (flinkConfDir == null) {
throw new FlinkRuntimeException(
"Can not get flink home directory, please set FLINK_HOME.");
}
printPath = String.format("file://%s/../log/%s", flinkConfDir, "taskmanager.out");
} else {
printPath = String.format("file://%s/log/%s", flinkHomeDir, "taskmanager.out");
}
}

String[] printOpts = extractPrintOptions(transformation);
String printIdentifier = printOpts[0];
boolean isStdErr = Boolean.parseBoolean(printOpts[1]);

RowType inputColumns = (RowType) LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
PrintTableHandle tableHandle = new PrintTableHandle("print-table", inputColumns, printPath);
PrintTableHandle tableHandle =
new PrintTableHandle("print-table", inputColumns, printIdentifier, isStdErr);
TableWriteNode tableWriteNode =
new TableWriteNode(
PlanNodeIdGenerator.newId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ public class VeloxConnectorConfig {
"connector-from-elements",
"connector-print");
private static final String keyTaskIndex = "task_index";
private static final String keyParallelism = "parallelism";
private static final String keyQueryUUId = "query_uuid";

public static ConnectorConfig getConfig(RuntimeContext context) {
Map<String, String> configMap = new HashMap<>();
TaskInfo taskInfo = context.getTaskInfo();
configMap.put(keyTaskIndex, String.valueOf(taskInfo.getIndexOfThisSubtask()));
configMap.put(keyParallelism, String.valueOf(taskInfo.getNumberOfParallelSubtasks()));
configMap.put(
keyQueryUUId,
UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.velox;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

import org.junit.jupiter.api.Test;

import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PrintSinkFactoryTest {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlutenStreamingTestBase.runAndCheck() still read taskmanager.out, please check whether it should be fixed together?


private static final String ROWDATA_PRINT_FUNCTION_CN =
"org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction";

@SuppressWarnings("unchecked")
private static SinkFunction<RowData> newRowDataPrintFunction(String identifier, boolean isStdErr)
throws Exception {
Class<?> cls = Class.forName(ROWDATA_PRINT_FUNCTION_CN);
Constructor<?> ctor =
cls.getDeclaredConstructor(
org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter.class,
String.class,
boolean.class);
ctor.setAccessible(true);
return (SinkFunction<RowData>) ctor.newInstance(null, identifier, isStdErr);
}

private static LegacySinkTransformation<RowData> buildSinkTransformation(
SinkFunction<RowData> userFunction) {
SinkOperator sinkOp = new SinkOperator(userFunction, -1);
SimpleOperatorFactory<Object> factory = SimpleOperatorFactory.of(sinkOp);
Transformation<RowData> input = new StubTransformation();
return new LegacySinkTransformation<>(input, "print-sink", factory, 1);
}

private static final class StubTransformation extends Transformation<RowData> {
StubTransformation() {
super("stub", InternalTypeInfo.of(RowType.of(new IntType())), 1);
}

@Override
public List<Transformation<?>> getInputs() {
return Collections.emptyList();
}

@Override
protected List<Transformation<?>> getTransitivePredecessorsInternal() {
return Collections.emptyList();
}
}

private static final class OtherSinkFunction extends RichSinkFunction<RowData> {}

@Test
void testMatchAcceptsRowDataPrintFunction() throws Exception {
PrintSinkFactory factory = new PrintSinkFactory();
assertTrue(factory.match(buildSinkTransformation(newRowDataPrintFunction("foo", false))));
}

@Test
void testMatchRejectsNonPrintSinkFunction() {
PrintSinkFactory factory = new PrintSinkFactory();
assertFalse(factory.match(buildSinkTransformation(new OtherSinkFunction())));
}

@Test
void testMatchRejectsNonLegacySinkTransformation() {
PrintSinkFactory factory = new PrintSinkFactory();
assertFalse(factory.match(new StubTransformation()));
}

@Test
void testExtractPrintOptionsReadsIdentifierAndStderr() throws Exception {
LegacySinkTransformation<RowData> tx =
buildSinkTransformation(newRowDataPrintFunction("foo", true));
String[] opts = PrintSinkFactory.extractPrintOptions(tx);
assertEquals("foo", opts[0]);
assertEquals("true", opts[1]);
}

@Test
void testExtractPrintOptionsDefaultsWhenUnset() throws Exception {
LegacySinkTransformation<RowData> tx =
buildSinkTransformation(newRowDataPrintFunction(null, false));
String[] opts = PrintSinkFactory.extractPrintOptions(tx);
assertEquals("", opts[0]);
assertEquals("false", opts[1]);
}
}
Loading