From 37d4051d010f55c8cd150ede1f9a56a476dc8bbd Mon Sep 17 00:00:00 2001 From: brijrajk <22271048+brijrajk@users.noreply.github.com> Date: Tue, 23 Jun 2026 00:34:53 +0530 Subject: [PATCH] [GLUTEN-10992][VL] Fix MatchError for KeyGroupedPartitioning in native shuffle When Spark 4.0's V2 bucketing shuffle (spark.sql.v2.bucketing.shuffle.enabled=true) is used in a join where only one side reports partitioning, Spark generates a ShuffleExchangeExec with KeyGroupedPartitioning as its output. The default case in VeloxSparkPlanExecApi.genColumnarShuffleExchange created a ColumnarShuffleExchangeExec for this node, which then crashed with a scala.MatchError in ExecUtil.genShuffleDependency because KeyGroupedPartitioning was not handled in the native partitioning match. Fix by adding an explicit KeyGroupedPartitioning case to genColumnarShuffleExchange that marks the shuffle for fallback to vanilla Spark. Also harden ExecUtil.genShuffleDependency with an explicit wildcard that throws GlutenNotSupportException instead of a cryptic MatchError for any future unknown partitioning types. The exception now embeds the full partitioning toString (expressions, numPartitions) to aid debugging. Add a dedicated GlutenKeyGroupedPartitioningSuite test (spark40 and spark41) that asserts the KeyGroupedPartitioning shuffle falls back to a vanilla ShuffleExchangeExec and is never offloaded to ColumnarShuffleExchangeExec. Co-Authored-By: Claude Sonnet 4.6 Co-Authored-By: Claude Opus 4.8 (1M context) --- .../velox/VeloxSparkPlanExecApi.scala | 6 +++ .../spark/sql/execution/utils/ExecUtil.scala | 4 ++ .../GlutenKeyGroupedPartitioningSuite.scala | 46 +++++++++++++++++++ .../GlutenKeyGroupedPartitioningSuite.scala | 46 +++++++++++++++++++ 4 files changed, 102 insertions(+) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index be21337d994..b4f027acc68 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -470,6 +470,12 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { } } } + case _: KeyGroupedPartitioning => + FallbackTags.add( + shuffle, + ValidationResult.failed( + "KeyGroupedPartitioning is not supported by Gluten native shuffle")) + shuffle.withNewChildren(child :: Nil) case _ => ColumnarShuffleExchangeExec(shuffle, child, null) } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala index dcfa0ee525c..d2f6c2a74b8 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.utils import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches} import org.apache.gluten.config.ShuffleWriterType +import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.iterator.Iterators import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.runtime.Runtimes @@ -172,6 +173,9 @@ object ExecUtil { // range partitioning fall back to row-based partition id computation case RangePartitioning(orders, n) => new NativePartitioning(GlutenShuffleUtils.RangePartitioningShortName, n) + case other => + throw new GlutenNotSupportException( + s"Partitioning $other is not supported by native shuffle") } val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] && diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala index afe0cb7969c..3826e50cc81 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala @@ -21,6 +21,7 @@ import org.apache.gluten.execution.SortMergeJoinExecTransformer import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, Row} +import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions.Expressions.{bucket, days, identity, years} @@ -1072,6 +1073,51 @@ class GlutenKeyGroupedPartitioningSuite } } + testGluten( + "GLUTEN-10992: KeyGroupedPartitioning shuffle falls back to vanilla Spark") { + val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + + sql( + s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + createTable(purchases, purchasesColumns, Array.empty) + sql( + s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp))") + + // With V2 bucketing shuffle enabled and only one side reporting partitioning, Spark + // shuffles the other side with a ShuffleExchangeExec whose output partitioning is + // KeyGroupedPartitioning. Gluten native shuffle does not support it, so the exchange + // must fall back to vanilla Spark. Offloading it to ColumnarShuffleExchangeExec would + // crash with a scala.MatchError in ExecUtil.genShuffleDependency (GLUTEN-10992). + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") { + val df = createJoinTestDF(Seq("id" -> "item_id")) + val plan = df.queryExecution.executedPlan + + val keyGroupedShuffles = collect(plan) { + case s: ShuffleExchangeExec + if s.outputPartitioning.isInstanceOf[KeyGroupedPartitioning] => + s + } + assert( + keyGroupedShuffles.nonEmpty, + "KeyGroupedPartitioning shuffle should fall back to a vanilla ShuffleExchangeExec") + + val columnarKeyGroupedShuffles = collectAllShuffles(plan) + .filter(_.outputPartitioning.isInstanceOf[KeyGroupedPartitioning]) + assert( + columnarKeyGroupedShuffles.isEmpty, + "KeyGroupedPartitioning must not be offloaded to ColumnarShuffleExchangeExec") + + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) + } + } + testGluten("SPARK-41471: shuffle one side: only one side reports partitioning") { val items_partitions = Array(identity("id")) createTable(items, itemsColumns, items_partitions) diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala index 8e2e4ca47f0..5f6793e0e85 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/connector/GlutenKeyGroupedPartitioningSuite.scala @@ -21,6 +21,7 @@ import org.apache.gluten.execution.SortMergeJoinExecTransformer import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, Row} +import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions.Expressions.{bucket, days, identity, years} @@ -1072,6 +1073,51 @@ class GlutenKeyGroupedPartitioningSuite } } + testGluten( + "GLUTEN-10992: KeyGroupedPartitioning shuffle falls back to vanilla Spark") { + val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + + sql( + s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + createTable(purchases, purchasesColumns, Array.empty) + sql( + s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp))") + + // With V2 bucketing shuffle enabled and only one side reporting partitioning, Spark + // shuffles the other side with a ShuffleExchangeExec whose output partitioning is + // KeyGroupedPartitioning. Gluten native shuffle does not support it, so the exchange + // must fall back to vanilla Spark. Offloading it to ColumnarShuffleExchangeExec would + // crash with a scala.MatchError in ExecUtil.genShuffleDependency (GLUTEN-10992). + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") { + val df = createJoinTestDF(Seq("id" -> "item_id")) + val plan = df.queryExecution.executedPlan + + val keyGroupedShuffles = collect(plan) { + case s: ShuffleExchangeExec + if s.outputPartitioning.isInstanceOf[KeyGroupedPartitioning] => + s + } + assert( + keyGroupedShuffles.nonEmpty, + "KeyGroupedPartitioning shuffle should fall back to a vanilla ShuffleExchangeExec") + + val columnarKeyGroupedShuffles = collectAllShuffles(plan) + .filter(_.outputPartitioning.isInstanceOf[KeyGroupedPartitioning]) + assert( + columnarKeyGroupedShuffles.isEmpty, + "KeyGroupedPartitioning must not be offloaded to ColumnarShuffleExchangeExec") + + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) + } + } + testGluten("SPARK-41471: shuffle one side: only one side reports partitioning") { val items_partitions = Array(identity("id")) createTable(items, itemsColumns, items_partitions)