From 2a8aa6ad51fa45e8fe1d596fc8e7ac28786d9214 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 26 Mar 2026 17:36:44 +0100 Subject: [PATCH 1/5] [SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Within a `KeyedPartitioning` partition, all rows share the same key value, so the key expressions are trivially sorted (ascending) within each partition. This PR makes two plan nodes expose that structural guarantee via `outputOrdering`: - **`DataSourceV2ScanExecBase`**: when `outputPartitioning` is a `KeyedPartitioning` and the source reports no ordering via `SupportsReportOrdering`, derive one ascending `SortOrder` per key expression. When the source does report ordering, it is returned as-is. - **`GroupPartitionsExec`**: - *Non-coalescing* (every group has ≤ 1 input partition): pass through `child.outputOrdering` unchanged. - *Coalescing without reducers*: re-derive ordering from the output `KeyedPartitioning` key expressions; a join may embed multiple `KeyedPartitioning`s with different expressions — expose equivalences via `sameOrderExpressions`. - *Coalescing with reducers*: fall back to `super.outputOrdering` (empty), because merged partitions share only the reduced key. ### Why are the changes needed? Before this change, `outputOrdering` on both nodes returned an empty sequence (unless `SupportsReportOrdering` was implemented), even though the within- partition ordering was structurally guaranteed by the partitioning itself. As a result, `EnsureRequirements` would insert a redundant `SortExec` before `SortMergeJoin` inputs that are already in key order. ### Does this PR introduce _any_ user-facing change? Yes. 
Queries involving storage-partitioned joins (v2 bucketing) no longer add a redundant `SortExec` before `SortMergeJoin` when the join keys match the partition keys, reducing CPU and memory overhead. ### How was this patch tested? - New unit test class `GroupPartitionsExecSuite` covering all four `outputOrdering` branches (non-coalescing, coalescing without reducers with single and multi-key, join `sameOrderExpressions`, coalescing with reducers). - New SQL integration tests in `KeyGroupedPartitioningSuite` (SPARK-56241): - Scan with `KeyedPartitioning` reports key-derived `outputOrdering`. - Non-coalescing `GroupPartitionsExec` (non-identical key sets) passes through child ordering — no pre-join `SortExec`. - Coalescing `GroupPartitionsExec` derives ordering from key expressions — no pre-join `SortExec`. - Updated expected output in `DataSourceV2Suite` for the case where a source is partitioned by a key with no reported ordering — groupBy on the partition key no longer requires a sort. ### Was this patch authored or co-authored using generative AI tooling? 
Generated-by: Claude Sonnet 4.6 --- .../v2/DataSourceV2ScanExecBase.scala | 22 +++- .../datasources/v2/GroupPartitionsExec.scala | 24 +++- .../sql/connector/DataSourceV2Suite.scala | 5 +- .../KeyGroupedPartitioningSuite.scala | 115 +++++++++++++++++- .../v2/GroupPartitionsExecSuite.scala | 106 ++++++++++++++++ 5 files changed, 261 insertions(+), 11 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 877e65341c1c8..494617dfc4b44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, RowOrdering, SortOrder} import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning import org.apache.spark.sql.catalyst.util.truncatedString @@ -104,11 +104,23 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { } /** - * Returns the output ordering from the data source if available, otherwise falls back - * to the default (no ordering). This allows data sources to report their natural ordering - * through `SupportsReportOrdering`. + * Returns the output ordering for this scan. When the source reports ordering via + * `SupportsReportOrdering`, that ordering is returned as-is. 
Otherwise, when the output + * partitioning is a `KeyedPartitioning`, each partition contains rows where the key expressions + * evaluate to a single constant value, so the data is trivially sorted by those expressions + * within the partition. */ - override def outputOrdering: Seq[SortOrder] = ordering.getOrElse(super.outputOrdering) + override def outputOrdering: Seq[SortOrder] = { + val reportedOrdering = ordering.getOrElse(Seq.empty) + if (reportedOrdering.nonEmpty) { + reportedOrdering + } else { + outputPartitioning match { + case k: KeyedPartitioning => k.expressions.map(SortOrder(_, Ascending)) + case _ => Seq.empty + } + } + } override def supportsColumnar: Boolean = { scan.columnarSupportMode() match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 7ed394df8b300..7a09afddef49c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -184,11 +184,31 @@ case class GroupPartitionsExec( copy(child = newChild) override def outputOrdering: Seq[SortOrder] = { - // when multiple partitions are grouped together, ordering inside partitions is not preserved if (groupedPartitions.forall(_._2.size <= 1)) { + // No coalescing: each output partition is exactly one input partition. The child's + // within-partition ordering is fully preserved (including any key-derived ordering that + // `DataSourceV2ScanExecBase` already prepended). child.outputOrdering } else { - super.outputOrdering + // Coalescing: multiple input partitions are merged into one output partition. The child's + // within-partition ordering is lost due to concatenation, so we rederive ordering purely from + // the key expressions. 
A join may embed multiple `KeyedPartitioning`s (one per join side) + // within a single expression tree; they share the same partitionKeys but carry different + // expressions. Collect them all and expose each position's equivalent expressions via + // `sameOrderExpressions` so the planner can use any of them for ordering checks. + outputPartitioning match { + case p: Partitioning with Expression if reducers.isEmpty => + // Without reducers all merged partitions share the same original key value, so the key + // expressions remain constant within the output partition. + val keyedPartitionings = p.collect { case k: KeyedPartitioning => k } + keyedPartitionings.map(_.expressions).transpose.map { exprs => + SortOrder(exprs.head, Ascending, sameOrderExpressions = exprs.tail) + } + case _ => + // With reducers, merged partitions share only the reduced key, not the original key + // expressions, which can take different values within the output partition. + super.outputOrdering + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index a09b7e0827c49..cc9d1f4c48812 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -313,8 +313,9 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS Seq( // with no partitioning and no order, we expect shuffling AND sorting (None, None, (true, true), (true, true)), - // partitioned by i and no order, we expect NO shuffling BUT sorting - (Some("i"), None, (false, true), (false, true)), + // partitioned by i and no order, + // we expect NO shuffling AND sorting for groupBy BUT sorting for window function + (Some("i"), None, (false, false), (false, true)), // partitioned by i and in-partition sorted by i, // we expect NO shuffling AND sorting for groupBy but 
sorting for window function (Some("i"), Some("i"), (false, false), (false, true)), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 688196b47502e..43b9b058bdee3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -22,14 +22,14 @@ import java.util.Collections import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, TransformExpression} import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.connector.expressions.Expressions._ -import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SparkPlan} +import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SortExec, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, GroupPartitionsExec} import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.SortMergeJoinExec @@ -3568,4 +3568,115 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } } + + test("SPARK-56241: scan with KeyedPartitioning reports key-derived outputOrdering") { + val 
items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(3, 'cc', 30.0, cast('2021-01-01' as timestamp)), " + + "(1, 'aa', 10.0, cast('2022-01-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2022-01-01' as timestamp))") + + val df = sql(s"SELECT id, name FROM testcat.ns.$items") + val plan = df.queryExecution.executedPlan + val scans = collectScans(plan) + assert(scans.size === 1) + // The scan's outputOrdering should include an ascending sort on the partition key `id`, + // derived from the KeyedPartitioning - regardless of SupportsReportOrdering. + val ordering = scans.head.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.direction === Ascending) + // identity transforms are unwrapped to AttributeReferences by V2ExpressionUtils. + val keyExpr = ordering.head.child + assert(keyExpr.isInstanceOf[AttributeReference]) + assert(keyExpr.asInstanceOf[AttributeReference].name === "id") + } + + test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " + + "no pre-join SortExec needed before SortMergeJoin") { + // Non-identical key sets force GroupPartitionsExec to be inserted on both sides to align them, + // but each group has exactly one partition — no coalescing. 
+ val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2021-01-01' as timestamp)), " + + "(3, 'cc', 30.0, cast('2021-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 100.0, cast('2021-01-01' as timestamp)), " + + "(2, 200.0, cast('2021-01-01' as timestamp))") + + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)), + "expected non-coalescing GroupPartitionsExec") + // GroupPartitionsExec passes through the child's key-derived outputOrdering. + // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before + // the SMJ. + val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " + + "non-coalescing GroupPartitions") + } + } + + test("SPARK-56241: GroupPartitionsExec coalescing derives ordering from key expressions, " + + "no pre-join SortExec needed before SortMergeJoin") { + // Duplicate key 1 on both sides causes coalescing. 
+ val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " + + "(1, 'ab', 11.0, cast('2021-06-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2021-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 100.0, cast('2021-01-01' as timestamp)), " + + "(1, 110.0, cast('2021-06-01' as timestamp)), " + + "(2, 200.0, cast('2021-01-01' as timestamp))") + + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq( + Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"), + Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)), + "expected coalescing GroupPartitionsExec") + // GroupPartitionsExec derives outputOrdering from the key expressions after coalescing. + // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before + // the SMJ. 
+ val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " + + "from coalesced partition key") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala new file mode 100644 index 0000000000000..012c0b1027d49 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, PartitioningCollection} +import org.apache.spark.sql.execution.DummySparkPlan +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.IntegerType + +class GroupPartitionsExecSuite extends SharedSparkSession { + + private val exprA = AttributeReference("a", IntegerType)() + private val exprB = AttributeReference("b", IntegerType)() + + private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a)) + private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b)) + + test("SPARK-56241: non-coalescing passes through child ordering unchanged") { + // Each partition has a distinct key — no coalescing happens. + val partitionKeys = Seq(row(1), row(2), row(3)) + val childOrdering = Seq(SortOrder(exprA, Ascending)) + val child = DummySparkPlan( + outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys), + outputOrdering = childOrdering) + val gpe = GroupPartitionsExec(child) + + assert(gpe.groupedPartitions.forall(_._2.size <= 1), "expected non-coalescing") + assert(gpe.outputOrdering === childOrdering) + } + + test("SPARK-56241: coalescing without reducers returns key-derived ordering") { + // Key 1 appears on partitions 0 and 2, causing coalescing. 
+ val partitionKeys = Seq(row(1), row(2), row(1)) + val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys)) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + assert(ordering.head.direction === Ascending) + assert(ordering.head.sameOrderExpressions.isEmpty) + } + + test("SPARK-56241: coalescing without reducers returns one SortOrder per key expression") { + // Multi-key partition: key (1,10) appears on partitions 0 and 2, causing coalescing. + val partitionKeys = Seq(row(1, 10), row(2, 20), row(1, 10)) + val child = DummySparkPlan( + outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys)) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 2) + assert(ordering.head.child === exprA) + assert(ordering(1).child === exprB) + assert(ordering.head.sameOrderExpressions.isEmpty) + assert(ordering(1).sameOrderExpressions.isEmpty) + } + + test("SPARK-56241: coalescing join case exposes sameOrderExpressions across join sides") { + // PartitioningCollection wraps two KeyedPartitionings (one per join side), sharing the same + // partition keys. Key 1 coalesces partitions 0 and 2. 
+ val partitionKeys = Seq(row(1), row(2), row(1)) + val leftKP = KeyedPartitioning(Seq(exprA), partitionKeys) + val rightKP = KeyedPartitioning(Seq(exprB), partitionKeys) + val child = DummySparkPlan(outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP))) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + assert(ordering.head.sameOrderExpressions === Seq(exprB)) + } + + test("SPARK-56241: coalescing with reducers returns empty ordering") { + // When reducers are present, the original key expressions are not constant within the merged + // partition, so outputOrdering falls back to the default (empty). + val partitionKeys = Seq(row(1), row(2), row(1)) + val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys)) + // reducers = Some(Seq(None)) - None element means identity reducer; the important thing is + // that reducers.isDefined, which triggers the fallback. 
+ val gpe = GroupPartitionsExec(child, reducers = Some(Seq(None))) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + assert(gpe.outputOrdering === Nil) + } +} From aae75741cdc0f8d47e3a0eec7c96626a87c13ae7 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sat, 28 Mar 2026 10:00:16 +0100 Subject: [PATCH 2/5] fix review findings --- .../datasources/v2/DataSourceV2ScanExecBase.scala | 12 ++++-------- .../datasources/v2/GroupPartitionsExec.scala | 11 +++++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 494617dfc4b44..f557fa0cac2ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -111,14 +111,10 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { * within the partition. 
*/ override def outputOrdering: Seq[SortOrder] = { - val reportedOrdering = ordering.getOrElse(Seq.empty) - if (reportedOrdering.nonEmpty) { - reportedOrdering - } else { - outputPartitioning match { - case k: KeyedPartitioning => k.expressions.map(SortOrder(_, Ascending)) - case _ => Seq.empty - } + (ordering, outputPartitioning) match { + case (Some(o), _) => o + case (_, k: KeyedPartitioning) => k.expressions.map(SortOrder(_, Ascending)) + case _ => Seq.empty } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 7a09afddef49c..137db2f7f77c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -191,10 +191,13 @@ case class GroupPartitionsExec( child.outputOrdering } else { // Coalescing: multiple input partitions are merged into one output partition. The child's - // within-partition ordering is lost due to concatenation, so we rederive ordering purely from - // the key expressions. A join may embed multiple `KeyedPartitioning`s (one per join side) - // within a single expression tree; they share the same partitionKeys but carry different - // expressions. Collect them all and expose each position's equivalent expressions via + // within-partition ordering is lost due to concatenation, so we re-derive ordering purely + // from the key expressions. For example, if two input partitions both belong to the same + // output group (same partition key value) and hold [1, 3] and [2, 5] respectively (each + // sorted ascending), concatenating them yields [1, 3, 2, 5] which is no longer sorted. 
+ // A join may embed multiple `KeyedPartitioning`s (one per join side) within a single + // expression tree; they share the same partitionKeys but carry different expressions. + // Collect them all and expose each position's equivalent expressions via // `sameOrderExpressions` so the planner can use any of them for ordering checks. outputPartitioning match { case p: Partitioning with Expression if reducers.isEmpty => From cf295d8a1ea59611c9395a4c938594d39862eaf6 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sat, 28 Mar 2026 11:21:17 +0100 Subject: [PATCH 3/5] refine logic --- .../datasources/v2/GroupPartitionsExec.scala | 23 ++++++------ .../sql/connector/DataSourceV2Suite.scala | 5 +-- .../v2/GroupPartitionsExecSuite.scala | 37 +++++++++++++++---- 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 137db2f7f77c1..71c1b1f23e8b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -191,22 +191,21 @@ case class GroupPartitionsExec( child.outputOrdering } else { // Coalescing: multiple input partitions are merged into one output partition. The child's - // within-partition ordering is lost due to concatenation, so we re-derive ordering purely - // from the key expressions. For example, if two input partitions both belong to the same - // output group (same partition key value) and hold [1, 3] and [2, 5] respectively (each - // sorted ascending), concatenating them yields [1, 3, 2, 5] which is no longer sorted. - // A join may embed multiple `KeyedPartitioning`s (one per join side) within a single - // expression tree; they share the same partitionKeys but carry different expressions. 
- // Collect them all and expose each position's equivalent expressions via - // `sameOrderExpressions` so the planner can use any of them for ordering checks. + // within-partition ordering is lost due to concatenation -- for example, if two input + // partitions both belong to the same output group (same partition key value) and hold + // [1, 3] and [2, 5] respectively (each sorted ascending), concatenating them yields + // [1, 3, 2, 5] which is no longer sorted. Only sort orders over partition key expressions + // (which are constant across all merged partitions) remain valid. outputPartitioning match { case p: Partitioning with Expression if reducers.isEmpty => // Without reducers all merged partitions share the same original key value, so the key - // expressions remain constant within the output partition. + // expressions remain constant within the output partition. The child's outputOrdering + // should already be in sync with the partitioning (either reported by the source or + // derived from it in DataSourceV2ScanExecBase), so we only need to keep the sort orders + // whose expression is a partition key expression -- all others are lost by concatenation. val keyedPartitionings = p.collect { case k: KeyedPartitioning => k } - keyedPartitionings.map(_.expressions).transpose.map { exprs => - SortOrder(exprs.head, Ascending, sameOrderExpressions = exprs.tail) - } + val keyExprs = ExpressionSet(keyedPartitionings.flatMap(_.expressions)) + child.outputOrdering.filter(order => keyExprs.contains(order.child)) case _ => // With reducers, merged partitions share only the reduced key, not the original key // expressions, which can take different values within the output partition. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index cc9d1f4c48812..a09b7e0827c49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -313,9 +313,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS Seq( // with no partitioning and no order, we expect shuffling AND sorting (None, None, (true, true), (true, true)), - // partitioned by i and no order, - // we expect NO shuffling AND sorting for groupBy BUT sorting for window function - (Some("i"), None, (false, false), (false, true)), + // partitioned by i and no order, we expect NO shuffling BUT sorting + (Some("i"), None, (false, true), (false, true)), // partitioned by i and in-partition sorted by i, // we expect NO shuffling AND sorting for groupBy but sorting for window function (Some("i"), Some("i"), (false, false), (false, true)), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala index 012c0b1027d49..00705ee9676f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala @@ -28,6 +28,7 @@ class GroupPartitionsExecSuite extends SharedSparkSession { private val exprA = AttributeReference("a", IntegerType)() private val exprB = AttributeReference("b", IntegerType)() + private val exprC = AttributeReference("c", IntegerType)() private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a)) private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b)) @@ -45,10 +46,12 @@ class 
GroupPartitionsExecSuite extends SharedSparkSession { assert(gpe.outputOrdering === childOrdering) } - test("SPARK-56241: coalescing without reducers returns key-derived ordering") { + test("SPARK-56241: coalescing without reducers keeps key-expression orders from child") { // Key 1 appears on partitions 0 and 2, causing coalescing. val partitionKeys = Seq(row(1), row(2), row(1)) - val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys)) + val child = DummySparkPlan( + outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys), + outputOrdering = Seq(SortOrder(exprA, Ascending))) val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") @@ -59,11 +62,12 @@ class GroupPartitionsExecSuite extends SharedSparkSession { assert(ordering.head.sameOrderExpressions.isEmpty) } - test("SPARK-56241: coalescing without reducers returns one SortOrder per key expression") { + test("SPARK-56241: coalescing without reducers keeps one SortOrder per key expression") { // Multi-key partition: key (1,10) appears on partitions 0 and 2, causing coalescing. 
val partitionKeys = Seq(row(1, 10), row(2, 20), row(1, 10)) val child = DummySparkPlan( - outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys)) + outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys), + outputOrdering = Seq(SortOrder(exprA, Ascending), SortOrder(exprB, Ascending))) val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") @@ -75,13 +79,16 @@ class GroupPartitionsExecSuite extends SharedSparkSession { assert(ordering(1).sameOrderExpressions.isEmpty) } - test("SPARK-56241: coalescing join case exposes sameOrderExpressions across join sides") { + test("SPARK-56241: coalescing join case preserves sameOrderExpressions from child") { // PartitioningCollection wraps two KeyedPartitionings (one per join side), sharing the same - // partition keys. Key 1 coalesces partitions 0 and 2. + // partition keys. Key 1 coalesces partitions 0 and 2. The child (e.g. SortMergeJoinExec) + // already carries sameOrderExpressions linking both sides' key expressions. val partitionKeys = Seq(row(1), row(2), row(1)) val leftKP = KeyedPartitioning(Seq(exprA), partitionKeys) val rightKP = KeyedPartitioning(Seq(exprB), partitionKeys) - val child = DummySparkPlan(outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP))) + val child = DummySparkPlan( + outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP)), + outputOrdering = Seq(SortOrder(exprA, Ascending, sameOrderExpressions = Seq(exprB)))) val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") @@ -91,6 +98,22 @@ class GroupPartitionsExecSuite extends SharedSparkSession { assert(ordering.head.sameOrderExpressions === Seq(exprB)) } + test("SPARK-56241: coalescing drops non-key sort orders from child") { + // exprA is the partition key; exprC is a non-key sort order the child also reports + // (e.g. a secondary sort within each partition). 
After coalescing, exprC ordering is lost + // by concatenation, so only the exprA order should survive. + val partitionKeys = Seq(row(1), row(2), row(1)) + val child = DummySparkPlan( + outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys), + outputOrdering = Seq(SortOrder(exprA, Ascending), SortOrder(exprC, Ascending))) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + } + test("SPARK-56241: coalescing with reducers returns empty ordering") { // When reducers are present, the original key expressions are not constant within the merged // partition, so outputOrdering falls back to the default (empty). From 492a875fbe3383a361425aba9478d72b7a869678 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 30 Mar 2026 11:03:27 +0200 Subject: [PATCH 4/5] add new spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled config --- .../org/apache/spark/sql/internal/SQLConf.scala | 15 +++++++++++++++ .../datasources/v2/DataSourceV2ScanExecBase.scala | 10 ++++++---- .../connector/KeyGroupedPartitioningSuite.scala | 7 +++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 22f5b3f6c7928..331fa7cd9ca0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2165,6 +2165,18 @@ object SQLConf { .booleanConf .createWithDefault(false) + val V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED = + buildConf("spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled") + .doc("When enabled, Spark derives output ordering from the partition key expressions of " + + "a V2 data source that reports a KeyedPartitioning but does not 
report explicit ordering " + + "via SupportsReportOrdering. Within a single partition all rows share the same key " + + s"value, so the data is trivially sorted by those expressions. Requires " + + s"${V2_BUCKETING_ENABLED.key} to be enabled.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(true) + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets") .doc("The maximum number of buckets allowed.") .version("2.4.0") @@ -7731,6 +7743,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingAllowSorting: Boolean = getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED) + def v2BucketingPartitionKeyOrderingEnabled: Boolean = + getConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED) + def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index f557fa0cac2ea..a1a6c6e022482 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -106,14 +106,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { /** * Returns the output ordering for this scan. When the source reports ordering via * `SupportsReportOrdering`, that ordering is returned as-is. Otherwise, when the output - * partitioning is a `KeyedPartitioning`, each partition contains rows where the key expressions - * evaluate to a single constant value, so the data is trivially sorted by those expressions - * within the partition. 
+ * partitioning is a `KeyedPartitioning` and + * `spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled` is on, each partition + * contains rows where the key expressions evaluate to a single constant value, so the data + * is trivially sorted by those expressions within the partition. */ override def outputOrdering: Seq[SortOrder] = { (ordering, outputPartitioning) match { case (Some(o), _) => o - case (_, k: KeyedPartitioning) => k.expressions.map(SortOrder(_, Ascending)) + case (_, k: KeyedPartitioning) if conf.v2BucketingPartitionKeyOrderingEnabled => + k.expressions.map(SortOrder(_, Ascending)) case _ => Seq.empty } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 43b9b058bdee3..796f1aad9d15f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3590,6 +3590,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with val keyExpr = ordering.head.child assert(keyExpr.isInstanceOf[AttributeReference]) assert(keyExpr.asInstanceOf[AttributeReference].name === "id") + + // With the config disabled the derivation is suppressed and ordering falls back to empty. 
+ withSQLConf(V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "false") { + val scansDisabled = collectScans(df.queryExecution.executedPlan) + assert(scansDisabled.size === 1) + assert(scansDisabled.head.outputOrdering.isEmpty) + } } test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " + From 930f1e7d12748adc197b6eddcafdb634cc2fc65b Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 31 Mar 2026 10:41:19 +0200 Subject: [PATCH 5/5] address review findings --- .../apache/spark/sql/internal/SQLConf.scala | 19 ++- .../datasources/v2/GroupPartitionsExec.scala | 11 +- .../KeyGroupedPartitioningSuite.scala | 116 +++++++++--------- .../v2/GroupPartitionsExecSuite.scala | 51 +++++--- 4 files changed, 117 insertions(+), 80 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 331fa7cd9ca0d..5639b6bbfbf4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2175,7 +2175,21 @@ object SQLConf { .version("4.2.0") .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf - .createWithDefault(true) + .createWithDefault(false) + + val V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED = + buildConf("spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled") + .doc("When enabled, Spark preserves sort orders over partition key expressions when " + + "GroupPartitionsExec coalesces multiple input partitions into one output partition. " + + "Because all merged partitions share the same partition key value, sort orders over " + + "those key expressions remain valid after the merge. This applies both to ordering " + + "reported via SupportsReportOrdering and to key-derived ordering from " + + s"${V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key}.
Requires " + + s"${V2_BUCKETING_ENABLED.key} to be enabled.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets") .doc("The maximum number of buckets allowed.") @@ -7746,6 +7760,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingPartitionKeyOrderingEnabled: Boolean = getConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED) + def v2BucketingPreserveKeyOrderingOnCoalesceEnabled: Boolean = + getConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED) + def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 71c1b1f23e8b9..81981c29b2b31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -192,12 +192,13 @@ case class GroupPartitionsExec( } else { // Coalescing: multiple input partitions are merged into one output partition. The child's // within-partition ordering is lost due to concatenation -- for example, if two input - // partitions both belong to the same output group (same partition key value) and hold - // [1, 3] and [2, 5] respectively (each sorted ascending), concatenating them yields - // [1, 3, 2, 5] which is no longer sorted. Only sort orders over partition key expressions - // (which are constant across all merged partitions) remain valid. 
+ // partitions both share key=A and hold rows (A,1),(A,3) and (A,2),(A,5) respectively (each + // sorted ascending by the data column), concatenating them yields (A,1),(A,3),(A,2),(A,5) + // which is no longer sorted by the data column. Only sort orders over partition key + // expressions remain valid -- they evaluate to the same value (A) in every merged partition. outputPartitioning match { - case p: Partitioning with Expression if reducers.isEmpty => + case p: Partitioning with Expression + if reducers.isEmpty && conf.v2BucketingPreserveKeyOrderingOnCoalesceEnabled => // Without reducers all merged partitions share the same original key value, so the key // expressions remain constant within the output partition. The child's outputOrdering // should already be in sync with the partitioning (either reported by the source or diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 796f1aad9d15f..44cb3a23cfa7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3581,21 +3581,19 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with val plan = df.queryExecution.executedPlan val scans = collectScans(plan) assert(scans.size === 1) - // The scan's outputOrdering should include an ascending sort on the partition key `id`, - // derived from the KeyedPartitioning - regardless of SupportsReportOrdering. - val ordering = scans.head.outputOrdering - assert(ordering.length === 1) - assert(ordering.head.direction === Ascending) + // With the config disabled (default), ordering derivation is suppressed. + assert(scans.head.outputOrdering.isEmpty) + // When enabled, the scan derives an ascending sort on the partition key `id`. 
// identity transforms are unwrapped to AttributeReferences by V2ExpressionUtils. - val keyExpr = ordering.head.child - assert(keyExpr.isInstanceOf[AttributeReference]) - assert(keyExpr.asInstanceOf[AttributeReference].name === "id") - - // With the config disabled the derivation is suppressed and ordering falls back to empty. - withSQLConf(V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "false") { - val scansDisabled = collectScans(df.queryExecution.executedPlan) - assert(scansDisabled.size === 1) - assert(scansDisabled.head.outputOrdering.isEmpty) + withSQLConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true") { + val scansEnabled = collectScans(df.queryExecution.executedPlan) + assert(scansEnabled.size === 1) + val ordering = scansEnabled.head.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.direction === Ascending) + val keyExpr = ordering.head.child + assert(keyExpr.isInstanceOf[AttributeReference]) + assert(keyExpr.asInstanceOf[AttributeReference].name === "id") } } @@ -3616,29 +3614,31 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with "(1, 100.0, cast('2021-01-01' as timestamp)), " + "(2, 200.0, cast('2021-01-01' as timestamp))") - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} - |i.id, i.name - |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id - |""".stripMargin) - - checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb"))) - - val plan = df.queryExecution.executedPlan - val groupPartitions = collectGroupPartitions(plan) - assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") - assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)), - "expected non-coalescing GroupPartitionsExec") // GroupPartitionsExec passes through the child's key-derived outputOrdering. // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before // the SMJ. 
- val smjs = collect(plan) { case j: SortMergeJoinExec => j } - assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") - smjs.foreach { smj => - val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) - assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " + - "non-coalescing GroupPartitions") + withSQLConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true") { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)), + "expected non-coalescing GroupPartitionsExec") + val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " + + "non-coalescing GroupPartitions") + } } } @@ -3659,31 +3659,35 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with "(1, 110.0, cast('2021-06-01' as timestamp)), " + "(2, 200.0, cast('2021-01-01' as timestamp))") - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} - |i.id, i.name - |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id - |""".stripMargin) - - checkAnswer(df, Seq( - Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"), - Row(2, "bb"))) - - val plan = df.queryExecution.executedPlan - val groupPartitions = collectGroupPartitions(plan) - assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") - 
assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)), - "expected coalescing GroupPartitionsExec") // GroupPartitionsExec derives outputOrdering from the key expressions after coalescing. // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before // the SMJ. - val smjs = collect(plan) { case j: SortMergeJoinExec => j } - assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") - smjs.foreach { smj => - val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) - assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " + - "from coalesced partition key") + withSQLConf( + SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq( + Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"), + Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)), + "expected coalescing GroupPartitionsExec") + val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " + + "from coalesced partition key") + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala index 
00705ee9676f3..c37e051929555 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, PartitioningCollection} import org.apache.spark.sql.execution.DummySparkPlan +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.IntegerType @@ -55,11 +56,16 @@ class GroupPartitionsExecSuite extends SharedSparkSession { val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") - val ordering = gpe.outputOrdering - assert(ordering.length === 1) - assert(ordering.head.child === exprA) - assert(ordering.head.direction === Ascending) - assert(ordering.head.sameOrderExpressions.isEmpty) + // With the config disabled (default), no ordering is derived and outputOrdering is empty. + assert(gpe.outputOrdering === Nil) + // When enabled, the key-expression order is preserved through coalescing.
+ withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") { + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + assert(ordering.head.direction === Ascending) + assert(ordering.head.sameOrderExpressions.isEmpty) + } } test("SPARK-56241: coalescing without reducers keeps one SortOrder per key expression") { @@ -71,12 +77,15 @@ class GroupPartitionsExecSuite extends SharedSparkSession { val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") - val ordering = gpe.outputOrdering - assert(ordering.length === 2) - assert(ordering.head.child === exprA) - assert(ordering(1).child === exprB) - assert(ordering.head.sameOrderExpressions.isEmpty) - assert(ordering(1).sameOrderExpressions.isEmpty) + assert(gpe.outputOrdering === Nil) + withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") { + val ordering = gpe.outputOrdering + assert(ordering.length === 2) + assert(ordering.head.child === exprA) + assert(ordering(1).child === exprB) + assert(ordering.head.sameOrderExpressions.isEmpty) + assert(ordering(1).sameOrderExpressions.isEmpty) + } } test("SPARK-56241: coalescing join case preserves sameOrderExpressions from child") { @@ -92,10 +101,13 @@ class GroupPartitionsExecSuite extends SharedSparkSession { val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") - val ordering = gpe.outputOrdering - assert(ordering.length === 1) - assert(ordering.head.child === exprA) - assert(ordering.head.sameOrderExpressions === Seq(exprB)) + assert(gpe.outputOrdering === Nil) + withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") { + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + assert(ordering.head.sameOrderExpressions === Seq(exprB)) + } } 
test("SPARK-56241: coalescing drops non-key sort orders from child") { @@ -109,9 +121,12 @@ class GroupPartitionsExecSuite extends SharedSparkSession { val gpe = GroupPartitionsExec(child) assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") - val ordering = gpe.outputOrdering - assert(ordering.length === 1) - assert(ordering.head.child === exprA) + assert(gpe.outputOrdering === Nil) + withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") { + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + } } test("SPARK-56241: coalescing with reducers returns empty ordering") {