Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/connectors/print/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ velox_add_library(
PrintConnector.cpp)

velox_link_libraries(velox_print_connector velox_common_io velox_connector)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
8 changes: 5 additions & 3 deletions velox/connectors/print/PrintConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ std::unique_ptr<DataSink> PrintConnector::createDataSink(
CommitStrategy /** commitStrategy */) {
std::shared_ptr<PrintTableHandle> printTableHandle =
std::dynamic_pointer_cast<PrintTableHandle>(connectorInsertTableHandle);
;
return std::make_unique<PrintSink>(
inputType, printTableHandle->path(), connectorQueryCtx);
inputType,
printTableHandle->printIdentifier(),
printTableHandle->isStdErr(),
connectorQueryCtx);
}

} // namespace facebook::velox::connector::print
} // namespace facebook::velox::connector::print
122 changes: 52 additions & 70 deletions velox/connectors/print/PrintSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,96 +14,79 @@
* limitations under the License.
*/
#include "velox/connectors/print/PrintSink.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/type/Type.h"
#include "velox/connectors/utils/StringFormatter.h"
#include "velox/type/tz/TimeZoneMap.h"

#include <fmt/format.h>
#include <memory>
#include <iostream>
#include <mutex>
#include <sstream>

namespace facebook::velox::connector::print {

// Process-wide mutex serializing stdout/stderr writes across subtasks.
namespace {
std::mutex& printSinkWriteMutex() {
static std::mutex m;
return m;
}
} // namespace

PrintSink::PrintSink(
const RowTypePtr& inputType,
const std::string& path,
const std::string& printIdentifier,
bool isStdErr,
const ConnectorQueryCtx* queryCtx)
: inputType_(inputType),
outputType_(createOutputType()),
queryCtx_(queryCtx),
writer_(createWriter(path)),
formatter_(createFormatter(inputType_, tz::locateZone(queryCtx->sessionTimezone()))) {}
formatter_(createFormatter(inputType_, tz::locateZone(queryCtx->sessionTimezone()))),
prefix_([&] {
const auto* props = queryCtx->sessionProperties();
int parallelism = 1;
int taskIndex = 0;
if (props != nullptr) {
parallelism = props->get<int>("parallelism", 1);
taskIndex = props->get<int>("task_index", 0);
}
return computePrefix(printIdentifier, parallelism, taskIndex);
}()),
isStdErr_(isStdErr) {}

std::unique_ptr<dwio::common::Writer> PrintSink::createWriter(
const std::string& path) {
std::unordered_map<std::string, std::string> rawConfigs;
auto fs = filesystems::getFileSystem(
path, std::make_shared<const config::ConfigBase>(std::move(rawConfigs)));
if (fs->exists(path)) {
fs->remove(path);
std::string PrintSink::computePrefix(
const std::string& printIdentifier,
int parallelism,
int taskIndex) {
std::string prefix = printIdentifier;
if (parallelism > 1) {
if (!prefix.empty()) {
prefix += ":";
}
prefix += std::to_string(taskIndex + 1);
}
std::unique_ptr<dwio::common::FileSink> writeFileSink =
dwio::common::FileSink::create(
path,
{
.bufferWrite = false,
.pool = queryCtx_->memoryPool(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = &ioStats_,
});
auto writerFactory =
dwio::common::getWriterFactory(dwio::common::FileFormat::TEXT);
std::shared_ptr<dwio::common::WriterOptions> options =
writerFactory->createWriterOptions();
if (options->schema == nullptr) {
options->schema = outputType_;
if (!prefix.empty()) {
prefix += "> ";
}
if (options->memoryPool == nullptr) {
options->memoryPool = queryCtx_->connectorMemoryPool();
}
return writerFactory->createWriter(std::move(writeFileSink), options);
return prefix;
}

const RowTypePtr PrintSink::createOutputType() {
std::vector<std::string> fieldNames;
std::vector<TypePtr> fieldTypes;
fieldNames.emplace_back("result");
fieldTypes.emplace_back(std::make_shared<const VarcharType>());
return std::make_shared<const RowType>(
std::move(fieldNames), std::move(fieldTypes));
}
void PrintSink::appendData(RowVectorPtr input) {
VELOX_CHECK_NOT_NULL(formatter_);
const auto& inputFields = inputType_->children();
VELOX_CHECK_EQ(input->childrenSize(), inputFields.size());

const RowVectorPtr PrintSink::formatToSingleStringVector(
const RowVectorPtr& input) {
VELOX_CHECK_EQ(input->childrenSize(), inputType_->children().size());
auto output =
RowVector::create(outputType_, input->size(), queryCtx_->memoryPool());
RowVectorPtr rowVector = std::dynamic_pointer_cast<RowVector>(output);
VELOX_CHECK(rowVector != nullptr);
VELOX_CHECK_EQ(rowVector->childrenSize(), 1);
auto outputField =
std::dynamic_pointer_cast<FlatVector<StringView>>(rowVector->childAt(0));
VELOX_CHECK(outputField != nullptr);
const std::vector<VectorPtr> inputFields = input->children();
for (size_t i = 0; i < input->size(); ++i) {
std::ostream& stream = isStdErr_ ? static_cast<std::ostream&>(std::cerr)
: static_cast<std::ostream&>(std::cout);
std::lock_guard<std::mutex> lock(printSinkWriteMutex());
for (auto i = 0; i < input->size(); ++i) {
std::stringstream ss;
formatter_->toString(input, inputType_, i, ss);
const std::string sValue = ss.str();
outputField->set(i, StringView(sValue.data(), sValue.size()));
stream << prefix_ << ss.str() << std::endl;
}
return rowVector;
}

void PrintSink::appendData(RowVectorPtr input) {
VELOX_CHECK(writer_ != nullptr);
writer_->write(formatToSingleStringVector(input));
writer_->flush();
}

std::vector<std::string> PrintSink::close() {
std::vector<std::string> res;
VELOX_CHECK(writer_ != nullptr);
writer_->close();
finished = true;
return res;
return {};
}

bool PrintSink::finish() {
Expand All @@ -114,8 +97,7 @@ bool PrintSink::finish() {
void PrintSink::abort() {}

connector::DataSink::Stats PrintSink::stats() const {
connector::DataSink::Stats stats;
return stats;
return connector::DataSink::Stats{};
}

} // namespace facebook::velox::connector::print
} // namespace facebook::velox::connector::print
29 changes: 14 additions & 15 deletions velox/connectors/print/PrintSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
#pragma once

#include "velox/connectors/Connector.h"
#include "velox/dwio/common/Writer.h"
#include "velox/connectors/utils/StringFormatter.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include <cmath>

namespace facebook::velox::connector::print {

// DataSink that writes rows to stdout or stderr.
class PrintSink : public DataSink {
public:
PrintSink(
const RowTypePtr& inputType,
const std::string& path,
const std::string& printIdentifier,
bool isStdErr,
const ConnectorQueryCtx* queryCtx);

void appendData(RowVectorPtr input) override;
Expand All @@ -42,23 +43,21 @@ class PrintSink : public DataSink {

connector::DataSink::Stats stats() const override;

io::IoStatistics ioStats_;
// Computes the output prefix from the print-identifier, parallelism and
// task index. Mirrors Flink's PrintSinkOutputWriter.open() prefix logic.
static std::string computePrefix(
const std::string& printIdentifier,
int parallelism,
int taskIndex);

private:

const RowTypePtr inputType_;
const RowTypePtr outputType_;
const ConnectorQueryCtx* queryCtx_;
const std::unique_ptr<dwio::common::Writer> writer_;
const std::shared_ptr<StringFormatter> formatter_;
bool finished = true;

std::unique_ptr<dwio::common::Writer> createWriter(const std::string& path);
const RowTypePtr createOutputType();
/// Format the input fields' data to a single string of flink-style. e.g. Row(1,2,3) -> +I[1, 2, 3],
/// Row(1,Array(2,3)) -> +I[1, [2, 3]], Row(1,Map(2=3, 3=4)) -> +I[1, {2=3, 3=4}].
const RowVectorPtr formatToSingleStringVector(const RowVectorPtr& input);
const std::shared_ptr<StringFormatter> createStringFormatter(
const TypePtr& type);
const std::string prefix_;
const bool isStdErr_;
bool finished = false;
};

} // namespace facebook::velox::connector::print
} // namespace facebook::velox::connector::print
29 changes: 22 additions & 7 deletions velox/connectors/print/PrintTableHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,33 @@ namespace facebook::velox::connector::print {
PrintTableHandle::PrintTableHandle(
std::string tableName,
const RowTypePtr& dataColumns,
const std::string& path)
: tableName_(tableName), dataColumns_(dataColumns), path_(path) {}
const std::string& printIdentifier,
bool isStdErr)
: tableName_(tableName),
dataColumns_(dataColumns),
printIdentifier_(printIdentifier),
isStdErr_(isStdErr) {}

std::string PrintTableHandle::toString() const {
std::stringstream out;
out << "table: " << tableName_;
if (dataColumns_) {
out << ", data columns: " << dataColumns_->toString();
}
out << ", path" << path_;
out << ", printIdentifier: " << printIdentifier_
<< ", isStdErr: " << (isStdErr_ ? "true" : "false");
return out.str();
}

folly::dynamic PrintTableHandle::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["name"] = "PrintTableHandle";
obj["tableName"] = tableName_;
if (dataColumns_) {
obj["dataColumns"] = dataColumns_->serialize();
}
obj["path"] = path_;
obj["printIdentifier"] = printIdentifier_;
obj["isStdErr"] = isStdErr_;
return obj;
}

Expand All @@ -52,13 +59,21 @@ ConnectorInsertTableHandlePtr PrintTableHandle::create(
if (auto it = obj.find("dataColumns"); it != obj.items().end()) {
dataColumns = ISerializable::deserialize<RowType>(it->second, context);
}
const auto& path = obj["path"].asString();
return std::make_shared<const PrintTableHandle>(tableName, dataColumns, path);
std::string printIdentifier;
if (auto it = obj.find("printIdentifier"); it != obj.items().end()) {
printIdentifier = it->second.asString();
}
bool isStdErr = false;
if (auto it = obj.find("isStdErr"); it != obj.items().end()) {
isStdErr = it->second.asBool();
}
return std::make_shared<const PrintTableHandle>(
tableName, dataColumns, printIdentifier, isStdErr);
}

void PrintTableHandle::registerSerDe() {
auto& registry = DeserializationWithContextRegistryForSharedPtr();
registry.Register("PrintTableHandle", create);
}

} // namespace facebook::velox::connector::print
} // namespace facebook::velox::connector::print
18 changes: 12 additions & 6 deletions velox/connectors/print/PrintTableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,23 @@ class PrintTableHandle : public ConnectorInsertTableHandle {
PrintTableHandle(
std::string tableName,
const RowTypePtr& dataColumns,
const std::string& path);
const std::string& printIdentifier,
bool isStdErr);

const std::string& tableName() {
const std::string& tableName() const {
return tableName_;
}

const RowTypePtr& dataColumns() const {
return dataColumns_;
}

const std::string& path() {
return path_;
const std::string& printIdentifier() const {
return printIdentifier_;
}

bool isStdErr() const {
return isStdErr_;
}

std::string toString() const override;
Expand All @@ -56,7 +61,8 @@ class PrintTableHandle : public ConnectorInsertTableHandle {
private:
const std::string tableName_;
const RowTypePtr dataColumns_;
const std::string path_;
const std::string printIdentifier_;
const bool isStdErr_;
};

} // namespace facebook::velox::connector::print
} // namespace facebook::velox::connector::print
32 changes: 32 additions & 0 deletions velox/connectors/print/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_print_sink_test PrintSinkTest.cpp)
add_executable(velox_print_table_handle_test PrintTableHandleTest.cpp)

add_test(velox_print_sink_test velox_print_sink_test)
add_test(velox_print_table_handle_test velox_print_table_handle_test)

target_link_libraries(
velox_print_sink_test
velox_print_connector
GTest::gtest
GTest::gtest_main)

target_link_libraries(
velox_print_table_handle_test
velox_print_connector
velox_core
GTest::gtest
GTest::gtest_main)
Loading