From 24a206f77dc4ad257448e03a1c6a27947853f802 Mon Sep 17 00:00:00 2001 From: ggjh-159 Date: Wed, 17 Jun 2026 16:43:08 +0800 Subject: [PATCH] fix(print): write to stdout/stderr instead of file --- velox/connectors/print/CMakeLists.txt | 4 + velox/connectors/print/PrintConnector.cpp | 8 +- velox/connectors/print/PrintSink.cpp | 122 ++++++++---------- velox/connectors/print/PrintSink.h | 29 ++--- velox/connectors/print/PrintTableHandle.cpp | 29 ++++- velox/connectors/print/PrintTableHandle.h | 18 ++- velox/connectors/print/tests/CMakeLists.txt | 32 +++++ .../connectors/print/tests/PrintSinkTest.cpp | 41 ++++++ .../print/tests/PrintTableHandleTest.cpp | 69 ++++++++++ 9 files changed, 251 insertions(+), 101 deletions(-) create mode 100644 velox/connectors/print/tests/CMakeLists.txt create mode 100644 velox/connectors/print/tests/PrintSinkTest.cpp create mode 100644 velox/connectors/print/tests/PrintTableHandleTest.cpp diff --git a/velox/connectors/print/CMakeLists.txt b/velox/connectors/print/CMakeLists.txt index 56d2e441326..62549f756e6 100644 --- a/velox/connectors/print/CMakeLists.txt +++ b/velox/connectors/print/CMakeLists.txt @@ -23,3 +23,7 @@ velox_add_library( PrintConnector.cpp) velox_link_libraries(velox_print_connector velox_common_io velox_connector) + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() diff --git a/velox/connectors/print/PrintConnector.cpp b/velox/connectors/print/PrintConnector.cpp index dea071aa72a..72535004093 100644 --- a/velox/connectors/print/PrintConnector.cpp +++ b/velox/connectors/print/PrintConnector.cpp @@ -27,9 +27,11 @@ std::unique_ptr PrintConnector::createDataSink( CommitStrategy /** commitStrategy */) { std::shared_ptr printTableHandle = std::dynamic_pointer_cast(connectorInsertTableHandle); - ; return std::make_unique( - inputType, printTableHandle->path(), connectorQueryCtx); + inputType, + printTableHandle->printIdentifier(), + printTableHandle->isStdErr(), + connectorQueryCtx); } -} // namespace facebook::velox::connector::print \ No newline at end of file +} // namespace facebook::velox::connector::print diff --git a/velox/connectors/print/PrintSink.cpp b/velox/connectors/print/PrintSink.cpp index e74a3007d52..f719041f147 100644 --- a/velox/connectors/print/PrintSink.cpp +++ b/velox/connectors/print/PrintSink.cpp @@ -14,96 +14,79 @@ * limitations under the License. */ #include "velox/connectors/print/PrintSink.h" -#include "velox/dwio/common/WriterFactory.h" -#include "velox/type/Type.h" +#include "velox/connectors/utils/StringFormatter.h" #include "velox/type/tz/TimeZoneMap.h" + #include -#include +#include +#include +#include namespace facebook::velox::connector::print { +// Process-wide mutex serializing stdout/stderr writes across subtasks. +namespace { +std::mutex& printSinkWriteMutex() { + static std::mutex m; + return m; +} +} // namespace + PrintSink::PrintSink( const RowTypePtr& inputType, - const std::string& path, + const std::string& printIdentifier, + bool isStdErr, const ConnectorQueryCtx* queryCtx) : inputType_(inputType), - outputType_(createOutputType()), queryCtx_(queryCtx), - writer_(createWriter(path)), - formatter_(createFormatter(inputType_, tz::locateZone(queryCtx->sessionTimezone()))) {} + formatter_(createFormatter(inputType_, tz::locateZone(queryCtx->sessionTimezone()))), + prefix_([&] { + const auto* props = queryCtx->sessionProperties(); + int parallelism = 1; + int taskIndex = 0; + if (props != nullptr) { + parallelism = props->get("parallelism", 1); + taskIndex = props->get("task_index", 0); + } + return computePrefix(printIdentifier, parallelism, taskIndex); + }()), + isStdErr_(isStdErr) {} -std::unique_ptr PrintSink::createWriter( - const std::string& path) { - std::unordered_map rawConfigs; - auto fs = filesystems::getFileSystem( - path, std::make_shared(std::move(rawConfigs))); - if (fs->exists(path)) { - fs->remove(path); +std::string PrintSink::computePrefix( + const std::string& printIdentifier, + int parallelism, + int taskIndex) { + std::string prefix = printIdentifier; + if (parallelism > 1) { + if (!prefix.empty()) { + prefix += ":"; + } + prefix += std::to_string(taskIndex + 1); } - std::unique_ptr writeFileSink = - dwio::common::FileSink::create( - path, - { - .bufferWrite = false, - .pool = queryCtx_->memoryPool(), - .metricLogger = dwio::common::MetricsLog::voidLog(), - .stats = &ioStats_, - }); - auto writerFactory = - dwio::common::getWriterFactory(dwio::common::FileFormat::TEXT); - std::shared_ptr options = - writerFactory->createWriterOptions(); - if (options->schema == nullptr) { - options->schema = outputType_; + if (!prefix.empty()) { + prefix += "> "; } - if (options->memoryPool == nullptr) { - options->memoryPool = queryCtx_->connectorMemoryPool(); - } - return writerFactory->createWriter(std::move(writeFileSink), options); + return prefix; } -const RowTypePtr PrintSink::createOutputType() { - std::vector fieldNames; - std::vector fieldTypes; - fieldNames.emplace_back("result"); - fieldTypes.emplace_back(std::make_shared()); - return std::make_shared( - std::move(fieldNames), std::move(fieldTypes)); -} +void PrintSink::appendData(RowVectorPtr input) { + VELOX_CHECK_NOT_NULL(formatter_); + const auto& inputFields = inputType_->children(); + VELOX_CHECK_EQ(input->childrenSize(), inputFields.size()); -const RowVectorPtr PrintSink::formatToSingleStringVector( - const RowVectorPtr& input) { - VELOX_CHECK_EQ(input->childrenSize(), inputType_->children().size()); - auto output = - RowVector::create(outputType_, input->size(), queryCtx_->memoryPool()); - RowVectorPtr rowVector = std::dynamic_pointer_cast(output); - VELOX_CHECK(rowVector != nullptr); - VELOX_CHECK_EQ(rowVector->childrenSize(), 1); - auto outputField = - std::dynamic_pointer_cast>(rowVector->childAt(0)); - VELOX_CHECK(outputField != nullptr); - const std::vector inputFields = input->children(); - for (size_t i = 0; i < input->size(); ++i) { + std::ostream& stream = isStdErr_ ? static_cast(std::cerr) + : static_cast(std::cout); + std::lock_guard lock(printSinkWriteMutex()); + for (auto i = 0; i < input->size(); ++i) { std::stringstream ss; formatter_->toString(input, inputType_, i, ss); - const std::string sValue = ss.str(); - outputField->set(i, StringView(sValue.data(), sValue.size())); + stream << prefix_ << ss.str() << std::endl; } - return rowVector; -} - -void PrintSink::appendData(RowVectorPtr input) { - VELOX_CHECK(writer_ != nullptr); - writer_->write(formatToSingleStringVector(input)); - writer_->flush(); } std::vector PrintSink::close() { - std::vector res; - VELOX_CHECK(writer_ != nullptr); - writer_->close(); finished = true; - return res; + return {}; } bool PrintSink::finish() { @@ -114,8 +97,7 @@ bool PrintSink::finish() { void PrintSink::abort() {} connector::DataSink::Stats PrintSink::stats() const { - connector::DataSink::Stats stats; - return stats; + return connector::DataSink::Stats{}; } -} // namespace facebook::velox::connector::print \ No newline at end of file +} // namespace facebook::velox::connector::print diff --git a/velox/connectors/print/PrintSink.h b/velox/connectors/print/PrintSink.h index 1e0caac51c4..6052b8d91de 100644 --- a/velox/connectors/print/PrintSink.h +++ b/velox/connectors/print/PrintSink.h @@ -17,7 +17,6 @@ #pragma once #include "velox/connectors/Connector.h" -#include "velox/dwio/common/Writer.h" #include "velox/connectors/utils/StringFormatter.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" @@ -25,11 +24,13 @@ namespace facebook::velox::connector::print { +// DataSink that writes rows to stdout or stderr. class PrintSink : public DataSink { public: PrintSink( const RowTypePtr& inputType, - const std::string& path, + const std::string& printIdentifier, + bool isStdErr, const ConnectorQueryCtx* queryCtx); void appendData(RowVectorPtr input) override; @@ -42,23 +43,21 @@ class PrintSink : public DataSink { connector::DataSink::Stats stats() const override; - io::IoStatistics ioStats_; + // Computes the output prefix from the print-identifier, parallelism and + // task index. Mirrors Flink's PrintSinkOutputWriter.open() prefix logic. + static std::string computePrefix( + const std::string& printIdentifier, + int parallelism, + int taskIndex); private: + const RowTypePtr inputType_; - const RowTypePtr outputType_; const ConnectorQueryCtx* queryCtx_; - const std::unique_ptr writer_; const std::shared_ptr formatter_; - bool finished = true; - - std::unique_ptr createWriter(const std::string& path); - const RowTypePtr createOutputType(); - /// Format the input fields' data to a single string of flink-style. e.g. Row(1,2,3) -> +I[1, 2, 3], - /// Row(1,Array(2,3)) -> +I[1, [2, 3]], Row(1,Map(2=3, 3=4)) -> +I[1, {2=3, 3=4}]. - const RowVectorPtr formatToSingleStringVector(const RowVectorPtr& input); - const std::shared_ptr createStringFormatter( - const TypePtr& type); + const std::string prefix_; + const bool isStdErr_; + bool finished = false; }; -} // namespace facebook::velox::connector::print \ No newline at end of file +} // namespace facebook::velox::connector::print diff --git a/velox/connectors/print/PrintTableHandle.cpp b/velox/connectors/print/PrintTableHandle.cpp index 9cd9ea52967..a1d717fd28a 100644 --- a/velox/connectors/print/PrintTableHandle.cpp +++ b/velox/connectors/print/PrintTableHandle.cpp @@ -21,8 +21,12 @@ namespace facebook::velox::connector::print { PrintTableHandle::PrintTableHandle( std::string tableName, const RowTypePtr& dataColumns, - const std::string& path) - : tableName_(tableName), dataColumns_(dataColumns), path_(path) {} + const std::string& printIdentifier, + bool isStdErr) + : tableName_(tableName), + dataColumns_(dataColumns), + printIdentifier_(printIdentifier), + isStdErr_(isStdErr) {} std::string PrintTableHandle::toString() const { std::stringstream out; @@ -30,17 +34,20 @@ std::string PrintTableHandle::toString() const { if (dataColumns_) { out << ", data columns: " << dataColumns_->toString(); } - out << ", path" << path_; + out << ", printIdentifier: " << printIdentifier_ + << ", isStdErr: " << (isStdErr_ ? "true" : "false"); return out.str(); } folly::dynamic PrintTableHandle::serialize() const { folly::dynamic obj = folly::dynamic::object; + obj["name"] = "PrintTableHandle"; obj["tableName"] = tableName_; if (dataColumns_) { obj["dataColumns"] = dataColumns_->serialize(); } - obj["path"] = path_; + obj["printIdentifier"] = printIdentifier_; + obj["isStdErr"] = isStdErr_; return obj; } @@ -52,8 +59,16 @@ ConnectorInsertTableHandlePtr PrintTableHandle::create( if (auto it = obj.find("dataColumns"); it != obj.items().end()) { dataColumns = ISerializable::deserialize(it->second, context); } - const auto& path = obj["path"].asString(); - return std::make_shared(tableName, dataColumns, path); + std::string printIdentifier; + if (auto it = obj.find("printIdentifier"); it != obj.items().end()) { + printIdentifier = it->second.asString(); + } + bool isStdErr = false; + if (auto it = obj.find("isStdErr"); it != obj.items().end()) { + isStdErr = it->second.asBool(); + } + return std::make_shared( + tableName, dataColumns, printIdentifier, isStdErr); } void PrintTableHandle::registerSerDe() { @@ -61,4 +76,4 @@ void PrintTableHandle::registerSerDe() { registry.Register("PrintTableHandle", create); } -} // namespace facebook::velox::connector::print \ No newline at end of file +} // namespace facebook::velox::connector::print diff --git a/velox/connectors/print/PrintTableHandle.h b/velox/connectors/print/PrintTableHandle.h index f1f6b1328dc..7bb153bfad6 100644 --- a/velox/connectors/print/PrintTableHandle.h +++ b/velox/connectors/print/PrintTableHandle.h @@ -29,9 +29,10 @@ class PrintTableHandle : public ConnectorInsertTableHandle { PrintTableHandle( std::string tableName, const RowTypePtr& dataColumns, - const std::string& path); + const std::string& printIdentifier, + bool isStdErr); - const std::string& tableName() { + const std::string& tableName() const { return tableName_; } @@ -39,8 +40,12 @@ class PrintTableHandle : public ConnectorInsertTableHandle { return dataColumns_; } - const std::string& path() { - return path_; + const std::string& printIdentifier() const { + return printIdentifier_; + } + + bool isStdErr() const { + return isStdErr_; } std::string toString() const override; @@ -56,7 +61,8 @@ class PrintTableHandle : public ConnectorInsertTableHandle { private: const std::string tableName_; const RowTypePtr dataColumns_; - const std::string path_; + const std::string printIdentifier_; + const bool isStdErr_; }; -} // namespace facebook::velox::connector::print \ No newline at end of file +} // namespace facebook::velox::connector::print diff --git a/velox/connectors/print/tests/CMakeLists.txt b/velox/connectors/print/tests/CMakeLists.txt new file mode 100644 index 00000000000..c68ffac2851 --- /dev/null +++ b/velox/connectors/print/tests/CMakeLists.txt @@ -0,0 +1,32 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_print_sink_test PrintSinkTest.cpp) +add_executable(velox_print_table_handle_test PrintTableHandleTest.cpp) + +add_test(velox_print_sink_test velox_print_sink_test) +add_test(velox_print_table_handle_test velox_print_table_handle_test) + +target_link_libraries( + velox_print_sink_test + velox_print_connector + GTest::gtest + GTest::gtest_main) + +target_link_libraries( + velox_print_table_handle_test + velox_print_connector + velox_core + GTest::gtest + GTest::gtest_main) diff --git a/velox/connectors/print/tests/PrintSinkTest.cpp b/velox/connectors/print/tests/PrintSinkTest.cpp new file mode 100644 index 00000000000..0d0c6f99fd4 --- /dev/null +++ b/velox/connectors/print/tests/PrintSinkTest.cpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/connectors/print/PrintSink.h" + +namespace facebook::velox::connector::print::test { + +TEST(PrintSinkTest, PrefixNoIdentifierSingleParallelism) { + EXPECT_EQ("", PrintSink::computePrefix("", 1, 0)); +} + +TEST(PrintSinkTest, PrefixNoIdentifierMultiParallelism) { + EXPECT_EQ("1> ", PrintSink::computePrefix("", 2, 0)); + EXPECT_EQ("2> ", PrintSink::computePrefix("", 2, 1)); +} + +TEST(PrintSinkTest, PrefixWithIdentifierSingleParallelism) { + EXPECT_EQ("foo> ", PrintSink::computePrefix("foo", 1, 0)); +} + +TEST(PrintSinkTest, PrefixWithIdentifierMultiParallelism) { + EXPECT_EQ("foo:1> ", PrintSink::computePrefix("foo", 2, 0)); + EXPECT_EQ("foo:2> ", PrintSink::computePrefix("foo", 2, 1)); +} + +} // namespace facebook::velox::connector::print::test diff --git a/velox/connectors/print/tests/PrintTableHandleTest.cpp b/velox/connectors/print/tests/PrintTableHandleTest.cpp new file mode 100644 index 00000000000..c2d032a2640 --- /dev/null +++ b/velox/connectors/print/tests/PrintTableHandleTest.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/connectors/print/PrintTableHandle.h" +#include "velox/type/Type.h" + +namespace facebook::velox::connector::print::test { + +class PrintTableHandleTest : public testing::Test { + protected: + static void SetUpTestSuite() { + Type::registerSerDe(); + PrintTableHandle::registerSerDe(); + } + + static RowTypePtr sampleRowType() { + return ROW({{"a", BIGINT()}, {"b", VARCHAR()}}); + } +}; + +// Discriminator key is required by velox4j's PolymorphicDeserializer. +TEST_F(PrintTableHandleTest, SerializeIncludesDiscriminator) { + PrintTableHandle handle("t", sampleRowType(), "foo", true); + auto obj = handle.serialize(); + ASSERT_EQ(obj["name"].asString(), "PrintTableHandle"); + ASSERT_EQ(obj["tableName"].asString(), "t"); + ASSERT_EQ(obj["printIdentifier"].asString(), "foo"); + ASSERT_TRUE(obj["isStdErr"].asBool()); + ASSERT_TRUE(obj.count("dataColumns") > 0); +} + +TEST_F(PrintTableHandleTest, RoundTripWithAllFields) { + PrintTableHandle handle("t", sampleRowType(), "foo", true); + auto obj = handle.serialize(); + auto clone = ISerializable::deserialize(obj); + ASSERT_EQ(clone->tableName(), "t"); + ASSERT_EQ(clone->printIdentifier(), "foo"); + ASSERT_TRUE(clone->isStdErr()); + ASSERT_EQ(clone->dataColumns()->toString(), sampleRowType()->toString()); + ASSERT_EQ(clone->toString(), handle.toString()); +} + +// Optional fields fall back to defaults when JSON omits them. +TEST_F(PrintTableHandleTest, RoundTripWithMissingOptionalFields) { + PrintTableHandle handle("t", sampleRowType(), "", false); + auto obj = handle.serialize(); + obj.erase("printIdentifier"); + obj.erase("isStdErr"); + auto clone = ISerializable::deserialize(obj); + ASSERT_TRUE(clone->printIdentifier().empty()); + ASSERT_FALSE(clone->isStdErr()); +} + +} // namespace facebook::velox::connector::print::test