Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,15 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)

// Compression codec applied by the columnar batch serializer (e.g. for broadcast).
// The value is lower-cased with Locale.ROOT *before* checkValues runs, so the setting is
// case-insensitive; keep `transform` ahead of `checkValues` to preserve that.
val COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION =
  buildConf("spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression")
    .internal()
    .doc(
      "Compression codec used by the columnar batch serializer (e.g. broadcast). " +
        "Supported values (case-insensitive): none, zstd, zlib, snappy, lz4, gzip.")
    .stringConf
    .transform(_.toLowerCase(Locale.ROOT))
    .checkValues(Set("none", "zstd", "zlib", "snappy", "lz4", "gzip"))
    .createWithDefault("zstd")

val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS =
buildConf("spark.gluten.velox.abandonDedupHashMap.minRows")
.experimental()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,40 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
}
}
}

test("test columnarBatchSerializerCompression") {
  // Exercise the same join once for every codec accepted by the serializer compression config.
  val codecs = Seq("none", "zstd", "zlib", "snappy", "lz4", "gzip")

  for (codec <- codecs) {
    val confs = Seq(
      GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key -> "16",
      VeloxConfig.VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP.key -> "true",
      VeloxConfig.COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION.key -> codec
    )

    withSQLConf(confs: _*) {
      withTable("t1", "t2") {
        // t1 is a plain table, t2 is partitioned by the join key c1.
        val setupStatements = Seq(
          """
            |CREATE TABLE t1 USING PARQUET
            |AS SELECT id as c1, id as c2 FROM range(10)
            |""".stripMargin,
          """
            |CREATE TABLE t2 USING PARQUET PARTITIONED BY (c1)
            |AS SELECT id as c1, id as c2 FROM range(30)
            |""".stripMargin
        )
        setupStatements.foreach(ddl => spark.sql(ddl))

        val joined = spark.sql("""
                                 |SELECT t1.c2
                                 |FROM t1, t2
                                 |WHERE t1.c1 = t2.c1
                                 |AND t1.c2 < 4
                                 |""".stripMargin)

        // The result must not depend on the codec in use.
        checkAnswer(joined, Seq(Row(0), Row(1), Row(2), Row(3)))

        // Exactly one columnar subquery broadcast is expected in the executed plan.
        val broadcastNodes = collectWithSubqueries(joined.queryExecution.executedPlan) {
          case node: ColumnarSubqueryBroadcastExec => node
        }
        assert(broadcastNodes.size == 1)
      }
    }
  }
}
}
4 changes: 3 additions & 1 deletion cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,9 @@ std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerial
return std::make_unique<VeloxGpuColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
}
#endif
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
auto compressionKind =
veloxCfg_->get<std::string>(kColumnarBatchSerializerCompression, kColumnarBatchSerializerCompressionDefault);
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, cSchema, compressionKind);
}

void VeloxRuntime::enableDumping() {
Expand Down
5 changes: 5 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress";
const std::string kCompressionKind = "spark.io.compression.codec";
/// The compression codec to use for spilling. Use kCompressionKind if not set.
const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";

/// Config key selecting the compression kind used by the columnar batch serializer (e.g. broadcast).
const std::string kColumnarBatchSerializerCompression =
"spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression";
/// Default compression kind paired with kColumnarBatchSerializerCompression.
const std::string kColumnarBatchSerializerCompressionDefault = "zstd";
const std::string kMaxPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
const std::string kMaxPartialAggregationMemory = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
#include "velox/common/compression/Compression.h"
#include "velox/common/memory/Memory.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/arrow/Bridge.h"
Expand All @@ -48,7 +49,8 @@ std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema)
struct ArrowSchema* cSchema,
const std::string& compressionKind)
: ColumnarBatchSerializer(arrowPool), veloxPool_(std::move(veloxPool)) {
// serializeColumnarBatches don't need rowType_
if (cSchema != nullptr) {
Expand All @@ -58,6 +60,7 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
arena_ = std::make_unique<StreamArena>(veloxPool_.get());
serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
options_.useLosslessTimestamp = true;
options_.compressionKind = facebook::velox::common::stringToCompressionKind(compressionKind);
}

void VeloxColumnarBatchSerializer::append(const std::shared_ptr<ColumnarBatch>& batch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer {
VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema);
struct ArrowSchema* cSchema,
const std::string& compressionKind = "none");

void append(const std::shared_ptr<ColumnarBatch>& batch) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ object GlutenConfig extends ConfigRegistry {
"spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct",
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks",
"spark.gluten.sql.columnar.backend.velox.preferredBatchBytes",
"spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
"spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan",
"spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression"
)

/** Get dynamic configs. */
Expand Down
Loading