From af6b23b6d91c22a5462cc97d8d29de5fe43af34c Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 25 Jun 2026 14:03:51 +0100 Subject: [PATCH 1/7] [VL] Passing hadoop related configurations to native Signed-off-by: Yuan --- .../backendsapi/velox/VeloxIteratorApi.scala | 42 ++++++++++++++++++- .../GlutenWholeStageColumnarRDD.scala | 9 +++- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 73867aace94..707cad0fbbf 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging { // Only serialize plan once, save lots time when plan is complex. val planByteArray = wsCtx.root.toProtobuf.toByteArray + // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side + // Hadoop configuration NOW, while we are still on the driver, and embed + // them in every GlutenPartition. These keys are set by the user via + // spark.conf.set("fs.azure.account.auth.type", ...) or + // sparkContext.hadoopConfiguration.set(...) + // Spark's withSQLConfPropagated only forwards keys starting with "spark" + // as task-local-properties, so "fs.*" keys never reach the executor's + // SQLConf. Serialising them inside the partition is the only safe way + // to make them available to the native runtime on the executor. + // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver. + // SparkPlan.sqlContext is available on the driver -- using the first leaf + // gives us access to sessionState.newHadoopConf() which includes all keys + // set via spark.conf.set(), sparkContext.hadoopConfiguration, and + // DataFrameReader.option(). 
These are NOT propagated to executors by + // Spark's withSQLConfPropagated (it only forwards keys starting with + // "spark"), so embedding them in the serialised GlutenPartition is the + // only reliable transport mechanism. + val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.") + val hadoopConf = leaves.headOption + .map(_ => org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf()) + .getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration) + val fsConf = { + hadoopConf.iterator().asScala + .filter(e => fsPrefixes.exists(e.getKey.startsWith)) + .map(e => e.getKey -> e.getValue) + .toMap + } + splitInfos.zipWithIndex.map { case (splitInfos, index) => GlutenPartition( index, planByteArray, - splitInfos.toArray + splitInfos.toArray, + fsConf = fsConf ) } } @@ -199,7 +228,16 @@ class VeloxIteratorApi extends IteratorApi with Logging { iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, iter.asJava) } - val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava + // Merge the fs.* keys captured on the driver (stored in GlutenPartition.fsConf) + // into the extraConf passed to NativePlanEvaluator / VeloxRuntime. + // Runtimes.contextInstance() will call GlutenConfig.getNativeSessionConf() which + // merges extraConf on top of SQLConf.get.getAllConfs. Because the executor-side + // SQLConf never receives "fs.*" keys (Spark only propagates "spark.*" keys via + // task local properties), this is the only path these credentials can take to + // reach the native session config and ultimately the Velox ABFS connector. 
+ val partitionFsConf = inputPartition.asInstanceOf[GlutenPartition].fsConf + val extraConf = (partitionFsConf + + (GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)).asJava val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf) val splitInfoByteArray = inputPartition diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index 4a00dbb5872..074ea8ea113 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -37,8 +37,13 @@ case class GlutenPartition( index: Int, plan: Array[Byte], splitInfos: Array[SplitInfo] = Array.empty[SplitInfo], - files: Array[String] = - Array.empty[String] // touched files, for implementing UDF input_file_name + files: Array[String] = Array.empty[String], // touched files, for UDF input_file_name + // fs.azure.* / fs.s3a.* / fs.gs.* keys captured on the driver from + // sessionState.newHadoopConf() and serialised here so they survive the + // RDD partition boundary. Spark's withSQLConfPropagated only propagates + // keys that start with "spark", so these keys are otherwise invisible on + // the executor side (the executor's SQLConf never sees them). 
+ fsConf: Map[String, String] = Map.empty ) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = From 1d6f0877f56e5c03c7dfaa0690aef915136c1945 Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 25 Jun 2026 14:11:59 +0100 Subject: [PATCH 2/7] add test Signed-off-by: Yuan --- .../velox/VeloxIteratorApiFsConfSuite.scala | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala diff --git a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala new file mode 100644 index 00000000000..21985f8b137 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext} +import org.apache.gluten.substrait.plan.PlanBuilder + +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys + * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they + * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys. + */ +class VeloxIteratorApiFsConfSuite extends SharedSparkSession { + + private val api = new VeloxIteratorApi + + /** + * Build a minimal WholeStageTransformContext backed by an empty Substrait plan. genPartitions + * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is sufficient for the + * purpose of this test. + */ + private def emptyWsCtx: WholeStageTransformContext = + WholeStageTransformContext(PlanBuilder.empty()) + + test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") { + val hadoopConf = spark.sparkContext.hadoopConfiguration + hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", "OAuth") + hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials") + try { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + assert(partitions.size == 1) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"), + s"Expected fs.azure key not found; got: $fsConf") + assert( + fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth") + assert( + fsConf.contains("fs.azure.account.oauth.provider.type"), + s"Expected fs.azure key not found; got: $fsConf") + } finally { + hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") + 
hadoopConf.unset("fs.azure.account.oauth.provider.type") + } + } + + test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") { + val hadoopConf = spark.sparkContext.hadoopConfiguration + hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") + hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI") + try { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + assert(partitions.size == 1) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not found; got: $fsConf") + assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE") + assert(fsConf.contains("fs.s3a.secret.key")) + } finally { + hadoopConf.unset("fs.s3a.access.key") + hadoopConf.unset("fs.s3a.secret.key") + } + } + + test("genPartitions embeds fs.gs.* keys from Hadoop conf into GlutenPartition.fsConf") { + val hadoopConf = spark.sparkContext.hadoopConfiguration + hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/tmp/sa.json") + try { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + assert(partitions.size == 1) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.gs.auth.service.account.json.keyfile"), + s"Expected fs.gs key not found; got: $fsConf") + assert(fsConf("fs.gs.auth.service.account.json.keyfile") == "/tmp/sa.json") + } finally { + hadoopConf.unset("fs.gs.auth.service.account.json.keyfile") + } + } + + test("genPartitions does not include non-fs.* keys in GlutenPartition.fsConf") { + val hadoopConf = spark.sparkContext.hadoopConfiguration + hadoopConf.set("fs.s3a.access.key", "KEY") + hadoopConf.set("spark.some.conf", "value") + hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", "128000000") + try { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + 
assert(!fsConf.contains("spark.some.conf"), "Non-fs key must not appear in fsConf") + assert( + !fsConf.contains("mapreduce.input.fileinputformat.split.maxsize"), + "Non-fs key must not appear in fsConf") + assert(fsConf.contains("fs.s3a.access.key")) + } finally { + hadoopConf.unset("fs.s3a.access.key") + hadoopConf.unset("spark.some.conf") + hadoopConf.unset("mapreduce.input.fileinputformat.split.maxsize") + } + } + + test("genPartitions produces empty fsConf when no fs.* keys are set") { + // Use a key guaranteed not to exist in the Hadoop conf under any test profile. + val hadoopConf = spark.sparkContext.hadoopConfiguration + val uniqueKey = "fs.azure.__test_only_unique_key__" + hadoopConf.unset(uniqueKey) + // Count only keys matching our prefixes. + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert(!fsConf.contains(uniqueKey)) + } +} From 6595e1f7a5feb9f89b3707e1c65b2292add3aa1d Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 25 Jun 2026 16:16:04 +0100 Subject: [PATCH 3/7] fix Signed-off-by: Yuan --- .../velox/VeloxIteratorApiFsConfSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala index 21985f8b137..00a4f5f474b 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala @@ -39,7 +39,7 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { WholeStageTransformContext(PlanBuilder.empty()) test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") { - val hadoopConf = spark.sparkContext.hadoopConfiguration + val hadoopConf = 
spark.sessionState.newHadoopConf() hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", "OAuth") hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials") try { @@ -61,7 +61,7 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { } test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") { - val hadoopConf = spark.sparkContext.hadoopConfiguration + val hadoopConf = spark.sessionState.newHadoopConf() hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI") try { @@ -78,7 +78,7 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { } test("genPartitions embeds fs.gs.* keys from Hadoop conf into GlutenPartition.fsConf") { - val hadoopConf = spark.sparkContext.hadoopConfiguration + val hadoopConf = spark.sessionState.newHadoopConf() hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/tmp/sa.json") try { val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) @@ -94,7 +94,7 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { } test("genPartitions does not include non-fs.* keys in GlutenPartition.fsConf") { - val hadoopConf = spark.sparkContext.hadoopConfiguration + val hadoopConf = spark.sessionState.newHadoopConf() hadoopConf.set("fs.s3a.access.key", "KEY") hadoopConf.set("spark.some.conf", "value") hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", "128000000") @@ -115,7 +115,7 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { test("genPartitions produces empty fsConf when no fs.* keys are set") { // Use a key guaranteed not to exist in the Hadoop conf under any test profile. - val hadoopConf = spark.sparkContext.hadoopConfiguration + val hadoopConf = spark.sessionState.newHadoopConf() val uniqueKey = "fs.azure.__test_only_unique_key__" hadoopConf.unset(uniqueKey) // Count only keys matching our prefixes. 
From 9b41c7fd22080a600793ffa58ef9f77efceb82d2 Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 25 Jun 2026 18:02:50 +0100 Subject: [PATCH 4/7] fix Signed-off-by: Yuan --- .../velox/VeloxIteratorApiFsConfSuite.scala | 110 +++++++++++------- 1 file changed, 65 insertions(+), 45 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala index 00a4f5f474b..e05e6b5b257 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala @@ -25,6 +25,10 @@ import org.apache.spark.sql.test.SharedSparkSession * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys. + * + * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because + * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are + * discarded before the next call. */ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { @@ -38,89 +42,105 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { private def emptyWsCtx: WholeStageTransformContext = WholeStageTransformContext(PlanBuilder.empty()) + /** + * Set Hadoop conf keys on the underlying mutable configuration and restore their previous values + * (or unset them) after the block. `sessionState.newHadoopConf()` copies from + * `sparkContext.hadoopConfiguration`, so this is the correct mutation point. 
+ */ + private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = { + // scalastyle:off hadoopconfiguration + val hadoopConf = spark.sparkContext.hadoopConfiguration + // scalastyle:off hadoopconfiguration + val prev: Seq[(String, Option[String])] = pairs.map { + case (k, _) => k -> Option(hadoopConf.get(k)) + } + pairs.foreach { case (k, v) => hadoopConf.set(k, v) } + try body + finally prev.foreach { + case (k, Some(old)) => hadoopConf.set(k, old) + case (k, None) => hadoopConf.unset(k) + } + } + test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") { - val hadoopConf = spark.sessionState.newHadoopConf() - hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", "OAuth") - hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials") - try { + withHadoopConf( + "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth", + "fs.azure.account.oauth.provider.type" -> "ClientCredentials" + ) { val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) assert(partitions.size == 1) val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf assert( fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"), - s"Expected fs.azure key not found; got: $fsConf") - assert( - fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth") + s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth") assert( fsConf.contains("fs.azure.account.oauth.provider.type"), - s"Expected fs.azure key not found; got: $fsConf") - } finally { - hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") - hadoopConf.unset("fs.azure.account.oauth.provider.type") + s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.azure.account.oauth.provider.type") == "ClientCredentials") } } 
test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") { - val hadoopConf = spark.sessionState.newHadoopConf() - hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") - hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI") - try { + withHadoopConf( + "fs.s3a.access.key" -> "AKIAIOSFODNN7EXAMPLE", + "fs.s3a.secret.key" -> "wJalrXUtnFEMI" + ) { val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) assert(partitions.size == 1) val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf - assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not found; got: $fsConf") + assert( + fsConf.contains("fs.s3a.access.key"), + s"Expected fs.s3a.access.key not found; got: ${fsConf.keys.mkString(", ")}") assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE") assert(fsConf.contains("fs.s3a.secret.key")) - } finally { - hadoopConf.unset("fs.s3a.access.key") - hadoopConf.unset("fs.s3a.secret.key") + assert(fsConf("fs.s3a.secret.key") == "wJalrXUtnFEMI") } } test("genPartitions embeds fs.gs.* keys from Hadoop conf into GlutenPartition.fsConf") { - val hadoopConf = spark.sessionState.newHadoopConf() - hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/tmp/sa.json") - try { + withHadoopConf("fs.gs.auth.service.account.json.keyfile" -> "/tmp/sa.json") { val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) assert(partitions.size == 1) val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf assert( fsConf.contains("fs.gs.auth.service.account.json.keyfile"), - s"Expected fs.gs key not found; got: $fsConf") + s"Expected fs.gs key not found; got: ${fsConf.keys.mkString(", ")}") assert(fsConf("fs.gs.auth.service.account.json.keyfile") == "/tmp/sa.json") - } finally { - hadoopConf.unset("fs.gs.auth.service.account.json.keyfile") } } test("genPartitions does not include non-fs.* keys in GlutenPartition.fsConf") { - val hadoopConf = spark.sessionState.newHadoopConf() - 
hadoopConf.set("fs.s3a.access.key", "KEY") - hadoopConf.set("spark.some.conf", "value") - hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", "128000000") - try { + withHadoopConf( + "fs.s3a.access.key" -> "KEY", + "mapreduce.input.fileinputformat.split.maxsize" -> "128000000" + ) { val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf - assert(!fsConf.contains("spark.some.conf"), "Non-fs key must not appear in fsConf") assert( !fsConf.contains("mapreduce.input.fileinputformat.split.maxsize"), "Non-fs key must not appear in fsConf") - assert(fsConf.contains("fs.s3a.access.key")) - } finally { - hadoopConf.unset("fs.s3a.access.key") - hadoopConf.unset("spark.some.conf") - hadoopConf.unset("mapreduce.input.fileinputformat.split.maxsize") + // fs.s3a.access.key must be captured + assert( + fsConf.contains("fs.s3a.access.key"), + s"Expected fs.s3a.access.key not found; got: ${fsConf.keys.mkString(", ")}") } } - test("genPartitions produces empty fsConf when no fs.* keys are set") { - // Use a key guaranteed not to exist in the Hadoop conf under any test profile. - val hadoopConf = spark.sessionState.newHadoopConf() - val uniqueKey = "fs.azure.__test_only_unique_key__" - hadoopConf.unset(uniqueKey) - // Count only keys matching our prefixes. 
- val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) - val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf - assert(!fsConf.contains(uniqueKey)) + test("genPartitions produces one partition per input split group") { + withHadoopConf("fs.s3a.endpoint" -> "s3.amazonaws.com") { + // Two split groups => two GlutenPartitions + val partitions = + api.genPartitions(emptyWsCtx, Seq(Seq.empty, Seq.empty), Seq.empty) + assert(partitions.size == 2) + partitions.foreach { + p => + val fsConf = p.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.s3a.endpoint"), + s"Expected fs.s3a.endpoint not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.s3a.endpoint") == "s3.amazonaws.com") + } + } } } From 3e5abcd907600ccbe69b985c634b42e4a6dfc878 Mon Sep 17 00:00:00 2001 From: Yuan Date: Fri, 26 Jun 2026 15:17:28 +0100 Subject: [PATCH 5/7] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala index e05e6b5b257..985d351c509 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala @@ -50,7 +50,7 @@ class VeloxIteratorApiFsConfSuite extends SharedSparkSession { private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = { // scalastyle:off hadoopconfiguration val hadoopConf = spark.sparkContext.hadoopConfiguration - // scalastyle:off hadoopconfiguration + // scalastyle:on hadoopconfiguration val prev: Seq[(String, Option[String])] = pairs.map { 
case (k, _) => k -> Option(hadoopConf.get(k)) } From 05661be435cf42aea2d42edb2e88777520286973 Mon Sep 17 00:00:00 2001 From: Yuan Date: Mon, 29 Jun 2026 10:30:18 +0100 Subject: [PATCH 6/7] remove credential log Signed-off-by: Yuan --- .../org/apache/gluten/runtime/Runtimes.scala | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala index b4c344ca989..8fb5ae2af30 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala @@ -18,10 +18,48 @@ package org.apache.gluten.runtime import org.apache.spark.task.TaskResources +import java.security.MessageDigest import java.util object Runtimes { + /** + * Produce a stable, secret-free cache key for a (backendName, name, extraConf) triple. + * + * Two problems with the old `s"$backendName:$name:$extraConf"` key: + * + * 1. **Credential leakage** – `Map.toString` embeds secret values (e.g. `fs.s3a.secret.key`, + * `fs.azure.account.oauth2.client.secret`) in a plain heap string that can appear in logs, + * thread dumps, and heap snapshots. + * 2. **Nondeterminism** – `Map.toString` does not guarantee insertion order, so two maps + * with identical entries can produce different strings, causing spurious duplicate + * `VeloxRuntime` registrations within a task. + * + * Fix: sort keys, hash each key and its value with SHA-256, and use only the hex digest as the + * key. Values take part in the digest because two configs with the same key set but different + * values (other credentials, another flag value) must not share a runtime. The digest is + * one-way, so the raw values never appear in the key string. 
+ */ + private def stableKey( + backendName: String, + name: String, + extraConf: util.Map[String, String]): String = { + val digest = MessageDigest.getInstance("SHA-256") + digest.update(backendName.getBytes("UTF-8")) + digest.update(0.toByte) // field separator + digest.update(name.getBytes("UTF-8")) + digest.update(0.toByte) + // Sorted for determinism; values are hashed too so same-key configs do not collide. + val sortedKeys = new java.util.ArrayList(extraConf.keySet) + java.util.Collections.sort(sortedKeys) + sortedKeys.forEach { + k => + digest.update((k + "\u0000" + extraConf.get(k)).getBytes("UTF-8")) + digest.update(0.toByte) + } + digest.digest().map("%02x".format(_)).mkString + } + def contextInstance( backendName: String, name: String, @@ -30,7 +68,7 @@ object Runtimes { throw new IllegalStateException("This method must be called in a Spark task.") } TaskResources.addResourceIfNotRegistered( - s"$backendName:$name:$extraConf", + stableKey(backendName, name, extraConf), () => Runtime(backendName, name, extraConf)) } From 6f59d14f68521631bcb16d7de447de2ee2c79c37 Mon Sep 17 00:00:00 2001 From: Yuan Date: Mon, 29 Jun 2026 20:10:46 +0100 Subject: [PATCH 7/7] improve on hadoop config gen Signed-off-by: Yuan --- .../backendsapi/velox/VeloxIteratorApi.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 707cad0fbbf..0d84a2bccdb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -143,15 +143,14 @@ class VeloxIteratorApi extends IteratorApi with Logging { // "spark"), so embedding them in the serialised GlutenPartition is the // only reliable transport mechanism. 
val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.") - val hadoopConf = leaves.headOption - .map(_ => org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf()) - .getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration) - val fsConf = { - hadoopConf.iterator().asScala - .filter(e => fsPrefixes.exists(e.getKey.startsWith)) - .map(e => e.getKey -> e.getValue) - .toMap - } + // scalastyle:off hadoopconfiguration + val baseHadoopConf = org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration + // scalastyle:on hadoopconfiguration + val fsConf: Map[String, String] = fsPrefixes.flatMap { + prefix => + baseHadoopConf.getPropsWithPrefix(prefix).asScala + .map { case (suffix, v) => (prefix + suffix) -> v } + }.toMap splitInfos.zipWithIndex.map { case (splitInfos, index) =>