[FLINK] Fix print connector: write to stdout/stderr instead of file#49

Merged
zhanglistar merged 1 commit into bigo-sg:gluten-0530 from ggjh-159:fix/print-sink-multi-parallelism
Jun 23, 2026

ggjh-159 commented Jun 18, 2026

Fixes: apache/gluten#12306
Related PRs: bigo-sg/velox4j#39, apache/gluten#12320

Summary

The print connector wrote rows through a dwio Writer against a file path resolved by the planner (<logdir>/taskmanager.out). Under multi-parallelism every subtask opened the same file and the writer truncated it on open, so output from N-1 subtasks was lost. Whatever survived was interleaved, because nothing serialized the writes.
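
The truncate-on-open failure can be reproduced outside the connector. The following is a minimal sketch, not the old sink's code: it uses std::ofstream as a stand-in for the dwio Writer, and writeLikeOldSink and readAll are hypothetical helper names introduced only for this illustration.

```cpp
#include <cassert>
#include <fstream>
#include <sstream>
#include <string>

// Hypothetical stand-in for the old sink: each "subtask" opens the shared
// path in truncating mode, writes one line, and closes the file.
void writeLikeOldSink(const std::string& path, const std::string& line) {
  std::ofstream out(path, std::ios::out | std::ios::trunc);
  assert(out.is_open() && "sketch assumes the path is writable");
  out << line << '\n';
}

// Reads the whole file back so the surviving content can be inspected.
std::string readAll(const std::string& path) {
  std::ifstream in(path);
  std::ostringstream content;
  content << in.rdbuf();
  return content.str();
}
```

Calling writeLikeOldSink twice on the same path, once per simulated subtask, leaves only the second line in the file: the second open discards what the first writer produced.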

This PR switches the sink to write to std::cout / std::cerr (TM stdout is already redirected to taskmanager.out by Flink) and removes the file plumbing:

  • PrintSink no longer owns a dwio Writer. appendData() formats each row via StringFormatter into a std::stringstream and, guarded by a process-wide std::mutex, writes prefix_ + body + '\n' to the stream selected by isStdErr_.
  • PrintSink::computePrefix(printIdentifier, parallelism, taskIndex) mirrors Flink's PrintSinkOutputWriter.open() prefix logic: when parallelism > 1 the prefix is <printIdentifier>:<taskIndex+1>> (or <taskIndex+1>> when the identifier is empty); when parallelism == 1 it is <printIdentifier>> for a non-empty identifier and empty otherwise.
  • PrintTableHandle drops path and gains printIdentifier / isStdErr. serialize() now emits a "name": "PrintTableHandle" discriminator so the Java side (velox4j PolymorphicDeserializer) can round-trip the handle.
  • Constructor reads parallelism / task_index from ConnectorQueryCtx::sessionProperties() via a lambda IIFE in the member-init list, so the prefix is const and computed exactly once.
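
The prefix rule and the serialized write described in the bullets above can be sketched as follows. This is an illustrative approximation, not the code merged in this PR: computePrefix is written as a free function rather than a PrintSink member, formatRow is a hypothetical stand-in for StringFormatter, printMutex and writeRow are made-up names, and the sketch assumes no trailing space after the closing `>` because the description does not state one.

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <vector>

// Prefix rule as described in the PR text: identifier first, the 1-based
// subtask number only when parallelism > 1, and '>' after any non-empty prefix.
std::string computePrefix(const std::string& printIdentifier,
                          int32_t parallelism,
                          int32_t taskIndex) {
  assert(parallelism >= 1 && taskIndex >= 0 && taskIndex < parallelism);
  std::string prefix = printIdentifier;
  if (parallelism > 1) {
    if (!prefix.empty()) {
      prefix += ':';
    }
    prefix += std::to_string(taskIndex + 1);
  }
  if (!prefix.empty()) {
    prefix += '>';  // Assumption: no trailing space.
  }
  return prefix;
}

// Hypothetical stand-in for StringFormatter: joins fields with ", ".
std::string formatRow(const std::vector<std::string>& fields) {
  std::stringstream body;
  for (size_t i = 0; i < fields.size(); ++i) {
    if (i > 0) {
      body << ", ";
    }
    body << fields[i];
  }
  return body.str();
}

// One mutex for the whole process, shared by every sink instance.
std::mutex& printMutex() {
  static std::mutex mutex;
  return mutex;
}

// Emits prefix + body + '\n' while holding the lock, so lines from
// concurrent subtasks cannot interleave within a line.
void writeRow(std::ostream& out, const std::string& prefix, const std::string& body) {
  std::lock_guard<std::mutex> lock(printMutex());
  out << prefix << body << '\n';
}

// Selects the target stream the way an isStdErr flag would.
void printRow(bool isStdErr, const std::string& prefix, const std::string& body) {
  writeRow(isStdErr ? std::cerr : std::cout, prefix, body);
}
```

Under these assumptions, computePrefix("", 2, 0) yields `1>` and computePrefix("bids", 2, 1) yields `bids:2>`, where bids is a made-up identifier. Building the full line before taking the lock keeps the critical section down to a single stream write.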

Test

  • Unit tests
  • End-to-end via the companion gluten-flink PR (parallelism.default = 2, nexmark q0, 10000 events): every bid row reaches <logdir>/flink-*-taskexecutor-x-*.out, no truncation, and each line carries the 1> / 2> subtask prefix.

@ggjh-159 ggjh-159 changed the title [flink] Fix print connector: write to stdout/stderr instead of file [FLINK] Fix print connector: write to stdout/stderr instead of file Jun 18, 2026

@KevinyhZou KevinyhZou left a comment

LGTM

@zhanglistar zhanglistar merged commit 81c1e08 into bigo-sg:gluten-0530 Jun 23, 2026