From 4dad7b41d679d71aecf74bcc31401d028afc7244 Mon Sep 17 00:00:00 2001 From: "chenhao.chen" Date: Wed, 24 Jun 2026 17:50:10 +0800 Subject: [PATCH 1/4] add default compress for VeloxColumnarBatchSerializer to reduce memory usage --- .../apache/gluten/config/VeloxConfig.scala | 9 +++++ .../gluten/execution/VeloxHashJoinSuite.scala | 36 +++++++++++++++++++ cpp/velox/compute/VeloxRuntime.cc | 4 ++- cpp/velox/config/VeloxConfig.h | 5 +++ .../VeloxColumnarBatchSerializer.cc | 5 ++- .../serializer/VeloxColumnarBatchSerializer.h | 3 +- .../apache/gluten/config/GlutenConfig.scala | 3 +- 7 files changed, 61 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 1f87ec000a0..ccd219e984e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -631,6 +631,15 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(false) + val COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION = + buildConf("spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression") + .internal() + .doc("which compression for the columnar batch serializer (e.g. broadcast).") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set("none", "zstd", "zlib", "snappy", "zstd", "lz4", "gzip")) + .createWithDefault("zstd") + val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS = buildConf("spark.gluten.velox.abandonDedupHashMap.minRows") .experimental() diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index a83e0ebf0e6..91af6a32764 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -445,4 +445,40 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { } } } + + test("test columnarBatchSerializerCompression") { + Seq("none", "zstd", "zlib", "snappy", "zstd", "lz4", "gzip").foreach( + compression => + withSQLConf( + GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key -> "16", + VeloxConfig.VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP.key -> "true", + VeloxConfig.COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION.key -> compression + ) { + withTable("t1", "t2") { + spark.sql(""" + |CREATE TABLE t1 USING PARQUET + |AS SELECT id as c1, id as c2 FROM range(10) + |""".stripMargin) + + spark.sql(""" + |CREATE TABLE t2 USING PARQUET PARTITIONED BY (c1) + |AS SELECT id as c1, id as c2 FROM range(30) + |""".stripMargin) + + val df = spark.sql(""" + |SELECT t1.c2 + |FROM t1, t2 + |WHERE t1.c1 = t2.c1 + |AND t1.c2 < 4 + |""".stripMargin) + + checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil) + + val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) { + case subqueryBroadcast: ColumnarSubqueryBroadcastExec => subqueryBroadcast + } + assert(subqueryBroadcastExecs.size == 1) + } + }) + } } diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index f13430bd0c5..237048553a0 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -629,7 +629,9 @@ std::unique_ptr VeloxRuntime::createColumnarBatchSerial return std::make_unique(arrowPool, veloxPool, cSchema); } #endif - return std::make_unique(arrowPool, veloxPool, cSchema); + auto compressionKind = + veloxCfg_->get(kColumnarBatchSerializerCompression, kColumnarBatchSerializerCompressionDefault); + return std::make_unique(arrowPool, veloxPool, cSchema, compressionKind); } void VeloxRuntime::enableDumping() { diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index d88c4361938..bcddf1e44ac 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -50,6 +50,11 @@ const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress"; const std::string kCompressionKind = "spark.io.compression.codec"; /// The compression codec to use for spilling. Use kCompressionKind if not set. const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec"; + +// Which compression kind to use for the columnar batch serializer (e.g. broadcast). +const std::string kColumnarBatchSerializerCompression = + "spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression"; +const std::string kColumnarBatchSerializerCompressionDefault = "zstd"; const std::string kMaxPartialAggregationMemoryRatio = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio"; const std::string kMaxPartialAggregationMemory = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory"; diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc index 6df1ab509c0..479c9a5c3ec 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc +++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc @@ -25,6 +25,7 @@ #include "memory/ArrowMemory.h" #include "memory/VeloxColumnarBatch.h" +#include "velox/common/compression/Compression.h" #include "velox/common/memory/Memory.h" #include "velox/vector/FlatVector.h" #include "velox/vector/arrow/Bridge.h" @@ -48,7 +49,8 @@ std::unique_ptr toByteStream(uint8_t* data, int32_t size) { VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer( arrow::MemoryPool* arrowPool, std::shared_ptr veloxPool, - struct ArrowSchema* cSchema) + struct ArrowSchema* cSchema, + const std::string& compressionKind) : ColumnarBatchSerializer(arrowPool), veloxPool_(std::move(veloxPool)) { // serializeColumnarBatches don't need rowType_ if (cSchema != nullptr) { @@ -58,6 +60,7 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer( arena_ = std::make_unique(veloxPool_.get()); serde_ = std::make_unique(); options_.useLosslessTimestamp = true; + options_.compressionKind = facebook::velox::common::stringToCompressionKind(compressionKind); } void VeloxColumnarBatchSerializer::append(const std::shared_ptr& batch) { diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h index 860c3ec5361..4548a1601f6 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h +++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h @@ -42,7 +42,8 @@ class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer { VeloxColumnarBatchSerializer( arrow::MemoryPool* arrowPool, std::shared_ptr veloxPool, - struct ArrowSchema* cSchema); + struct ArrowSchema* cSchema, + const std::string& compressionKind = "none"); void append(const std::shared_ptr& batch) override; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 053d7151255..416dde0b76d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -528,7 +528,8 @@ object GlutenConfig extends ConfigRegistry { "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct", "spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks", "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes", - "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan" + "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan", + "spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression" ) /** Get dynamic configs. */ From 22ad4fdf956a2fe8101125709ef25c02cca35cb4 Mon Sep 17 00:00:00 2001 From: "chenhao.chen" Date: Fri, 26 Jun 2026 11:33:39 +0800 Subject: [PATCH 2/4] add default compress for VeloxColumnarBatchSerializer to reduce memory usage --- .../scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index 91af6a32764..c80950aa1fd 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -447,7 +447,7 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { } test("test columnarBatchSerializerCompression") { - Seq("none", "zstd", "zlib", "snappy", "zstd", "lz4", "gzip").foreach( + Seq("none", "zstd", "zlib", "snappy", "lz4", "gzip").foreach( compression => withSQLConf( GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key -> "16", From 6ef833abbe0ac6836c7ad0357ca73132c4324758 Mon Sep 17 00:00:00 2001 From: "chenhao.chen" Date: Fri, 26 Jun 2026 11:51:00 +0800 Subject: [PATCH 3/4] add default compress for VeloxColumnarBatchSerializer to reduce memory usage --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index ccd219e984e..66f31e3f037 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -637,7 +637,7 @@ object VeloxConfig extends ConfigRegistry { .doc("which compression for the columnar batch serializer (e.g. broadcast).") .stringConf .transform(_.toLowerCase(Locale.ROOT)) - .checkValues(Set("none", "zstd", "zlib", "snappy", "zstd", "lz4", "gzip")) + .checkValues(Set("none", "zstd", "zlib", "snappy", "lz4", "gzip")) .createWithDefault("zstd") val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS = From 4e706968d353889353df82fbde16e385b02af517 Mon Sep 17 00:00:00 2001 From: "chenhao.chen" Date: Fri, 26 Jun 2026 14:47:15 +0800 Subject: [PATCH 4/4] default to none --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 2 +- cpp/velox/config/VeloxConfig.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 66f31e3f037..dcb79a462fe 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -638,7 +638,7 @@ object VeloxConfig extends ConfigRegistry { .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValues(Set("none", "zstd", "zlib", "snappy", "lz4", "gzip")) - .createWithDefault("zstd") + .createWithDefault("none") val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS = buildConf("spark.gluten.velox.abandonDedupHashMap.minRows") diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index bcddf1e44ac..ad51066f401 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -54,7 +54,7 @@ const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.vel // Which compression kind to use for the columnar batch serializer (e.g. broadcast). const std::string kColumnarBatchSerializerCompression = "spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression"; -const std::string kColumnarBatchSerializerCompressionDefault = "zstd"; +const std::string kColumnarBatchSerializerCompressionDefault = "none"; const std::string kMaxPartialAggregationMemoryRatio = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio"; const std::string kMaxPartialAggregationMemory = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";