diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 73867aace94..0d84a2bccdb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -125,12 +125,40 @@ class VeloxIteratorApi extends IteratorApi with Logging { // Only serialize plan once, save lots time when plan is complex. val planByteArray = wsCtx.root.toProtobuf.toByteArray + // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side + // Hadoop configuration NOW, while we are still on the driver, and embed + // them in every GlutenPartition. These keys are set by the user via + // spark.conf.set("fs.azure.account.auth.type", ...) or + // sparkContext.hadoopConfiguration.set(...) + // Spark's withSQLConfPropagated only forwards keys starting with "spark" + // as task-local-properties, so "fs.*" keys never reach the executor's + // SQLConf. Serialising them inside the partition is the only safe way + // to make them available to the native runtime on the executor. + // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver. + // SparkPlan.sqlContext is available on the driver -- using the first leaf + // gives us access to sessionState.newHadoopConf() which includes all keys + // set via spark.conf.set(), sparkContext.hadoopConfiguration, and + // DataFrameReader.option(). These are NOT propagated to executors by + // Spark's withSQLConfPropagated (it only forwards keys starting with + // "spark"), so embedding them in the serialised GlutenPartition is the + // only reliable transport mechanism. + val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.") + // scalastyle:off hadoopconfiguration + val baseHadoopConf = org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration + // scalastyle:on hadoopconfiguration + val fsConf: Map[String, String] = fsPrefixes.flatMap { + prefix => + baseHadoopConf.getPropsWithPrefix(prefix).asScala + .map { case (suffix, v) => (prefix + suffix) -> v } + }.toMap + splitInfos.zipWithIndex.map { case (splitInfos, index) => GlutenPartition( index, planByteArray, - splitInfos.toArray + splitInfos.toArray, + fsConf = fsConf ) } } @@ -199,7 +227,16 @@ class VeloxIteratorApi extends IteratorApi with Logging { iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, iter.asJava) } - val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava + // Merge the fs.* keys captured on the driver (stored in GlutenPartition.fsConf) + // into the extraConf passed to NativePlanEvaluator / VeloxRuntime. + // Runtimes.contextInstance() will call GlutenConfig.getNativeSessionConf() which + // merges extraConf on top of SQLConf.get.getAllConfs. Because the executor-side + // SQLConf never receives "fs.*" keys (Spark only propagates "spark.*" keys via + // task local properties), this is the only path these credentials can take to + // reach the native session config and ultimately the Velox ABFS connector. + val partitionFsConf = inputPartition.asInstanceOf[GlutenPartition].fsConf + val extraConf = (partitionFsConf + + (GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)).asJava val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf) val splitInfoByteArray = inputPartition diff --git a/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala new file mode 100644 index 00000000000..985d351c509 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext} +import org.apache.gluten.substrait.plan.PlanBuilder + +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys + * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they + * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys. + * + * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because + * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are + * discarded before the next call. + */ +class VeloxIteratorApiFsConfSuite extends SharedSparkSession { + + private val api = new VeloxIteratorApi + + /** + * Build a minimal WholeStageTransformContext backed by an empty Substrait plan. genPartitions + * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is sufficient for the + * purpose of this test. + */ + private def emptyWsCtx: WholeStageTransformContext = + WholeStageTransformContext(PlanBuilder.empty()) + + /** + * Set Hadoop conf keys on the underlying mutable configuration and restore their previous values + * (or unset them) after the block. `sessionState.newHadoopConf()` copies from + * `sparkContext.hadoopConfiguration`, so this is the correct mutation point. + */ + private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = { + // scalastyle:off hadoopconfiguration + val hadoopConf = spark.sparkContext.hadoopConfiguration + // scalastyle:on hadoopconfiguration + val prev: Seq[(String, Option[String])] = pairs.map { + case (k, _) => k -> Option(hadoopConf.get(k)) + } + pairs.foreach { case (k, v) => hadoopConf.set(k, v) } + try body + finally prev.foreach { + case (k, Some(old)) => hadoopConf.set(k, old) + case (k, None) => hadoopConf.unset(k) + } + } + + test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") { + withHadoopConf( + "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth", + "fs.azure.account.oauth.provider.type" -> "ClientCredentials" + ) { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + assert(partitions.size == 1) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"), + s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth") + assert( + fsConf.contains("fs.azure.account.oauth.provider.type"), + s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.azure.account.oauth.provider.type") == "ClientCredentials") + } + } + + test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") { + withHadoopConf( + "fs.s3a.access.key" -> "AKIAIOSFODNN7EXAMPLE", + "fs.s3a.secret.key" -> "wJalrXUtnFEMI" + ) { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + assert(partitions.size == 1) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.s3a.access.key"), + s"Expected fs.s3a.access.key not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE") + assert(fsConf.contains("fs.s3a.secret.key")) + assert(fsConf("fs.s3a.secret.key") == "wJalrXUtnFEMI") + } + } + + test("genPartitions embeds fs.gs.* keys from Hadoop conf into GlutenPartition.fsConf") { + withHadoopConf("fs.gs.auth.service.account.json.keyfile" -> "/tmp/sa.json") { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + assert(partitions.size == 1) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.gs.auth.service.account.json.keyfile"), + s"Expected fs.gs key not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.gs.auth.service.account.json.keyfile") == "/tmp/sa.json") + } + } + + test("genPartitions does not include non-fs.* keys in GlutenPartition.fsConf") { + withHadoopConf( + "fs.s3a.access.key" -> "KEY", + "mapreduce.input.fileinputformat.split.maxsize" -> "128000000" + ) { + val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty) + val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf + assert( + !fsConf.contains("mapreduce.input.fileinputformat.split.maxsize"), + "Non-fs key must not appear in fsConf") + // fs.s3a.access.key must be captured + assert( + fsConf.contains("fs.s3a.access.key"), + s"Expected fs.s3a.access.key not found; got: ${fsConf.keys.mkString(", ")}") + } + } + + test("genPartitions produces one partition per input split group") { + withHadoopConf("fs.s3a.endpoint" -> "s3.amazonaws.com") { + // Two split groups => two GlutenPartitions + val partitions = + api.genPartitions(emptyWsCtx, Seq(Seq.empty, Seq.empty), Seq.empty) + assert(partitions.size == 2) + partitions.foreach { + p => + val fsConf = p.asInstanceOf[GlutenPartition].fsConf + assert( + fsConf.contains("fs.s3a.endpoint"), + s"Expected fs.s3a.endpoint not found; got: ${fsConf.keys.mkString(", ")}") + assert(fsConf("fs.s3a.endpoint") == "s3.amazonaws.com") + } + } + } +} diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala index b4c344ca989..8fb5ae2af30 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala @@ -18,10 +18,48 @@ package org.apache.gluten.runtime import org.apache.spark.task.TaskResources +import java.security.MessageDigest import java.util object Runtimes { + /** + * Produce a stable, value-free cache key for a (backendName, name, extraConf) triple. + * + * Two problems with the old `s"$backendName:$name:$extraConf"` key: + * + * 1. **Credential leakage** – `Map.toString` embeds secret values (e.g. `fs.s3a.secret.key`, + * `fs.azure.account.oauth2.client.secret`) in a plain heap string that can appear in logs, + * thread dumps, and heap snapshots. + * 2. **Nondeterminism** – Scala `Map.toString` does not guarantee insertion order, so two maps + * with identical entries can produce different strings, causing spurious duplicate + * `VeloxRuntime` registrations within a task. + * + * Fix: sort keys, hash them with SHA-256, and use only the hex digest as the key. Values are + * intentionally excluded from the digest – distinct configs (different credentials for the same + * key set) that need separate runtimes are already separated at the task level through + * `GlutenPartition.fsConf`. Within a single task the key set is stable, so the digest is stable. + */ + private def stableKey( + backendName: String, + name: String, + extraConf: util.Map[String, String]): String = { + val digest = MessageDigest.getInstance("SHA-256") + digest.update(backendName.getBytes("UTF-8")) + digest.update(0.toByte) // field separator + digest.update(name.getBytes("UTF-8")) + digest.update(0.toByte) + // Sort keys for determinism; hash only keys, not values, to avoid leaking secrets. + val sortedKeys = new java.util.ArrayList(extraConf.keySet) + java.util.Collections.sort(sortedKeys) + sortedKeys.forEach { + k => + digest.update(k.getBytes("UTF-8")) + digest.update(0.toByte) + } + digest.digest().map("%02x".format(_)).mkString + } + def contextInstance( backendName: String, name: String, @@ -30,7 +68,7 @@ object Runtimes { throw new IllegalStateException("This method must be called in a Spark task.") } TaskResources.addResourceIfNotRegistered( - s"$backendName:$name:$extraConf", + stableKey(backendName, name, extraConf), () => Runtime(backendName, name, extraConf)) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index 4a00dbb5872..074ea8ea113 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -37,8 +37,13 @@ case class GlutenPartition( index: Int, plan: Array[Byte], splitInfos: Array[SplitInfo] = Array.empty[SplitInfo], - files: Array[String] = - Array.empty[String] // touched files, for implementing UDF input_file_name + files: Array[String] = Array.empty[String], // touched files, for UDF input_file_name + // fs.azure.* / fs.s3a.* / fs.gs.* keys captured on the driver from + // sessionState.newHadoopConf() and serialised here so they survive the + // RDD partition boundary. Spark's withSQLConfPropagated only propagates + // keys that start with "spark", so these keys are otherwise invisible on + // the executor side (the executor's SQLConf never sees them). + fsConf: Map[String, String] = Map.empty ) extends BaseGlutenPartition { override def preferredLocations(): Array[String] =