Commit f28c056

[SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions
### What changes were proposed in this pull request?

Within a `KeyedPartitioning` partition, all rows share the same key value, so the key expressions are trivially sorted (ascending) within each partition. This PR makes two plan nodes expose that structural guarantee via `outputOrdering`:

- **`DataSourceV2ScanExecBase`**: when `outputPartitioning` is a `KeyedPartitioning` and the source reports no ordering via `SupportsReportOrdering`, derive one ascending `SortOrder` per key expression. When the source does report ordering, it is returned as-is.
- **`GroupPartitionsExec`**:
  - *Non-coalescing* (every group has ≤ 1 input partition): pass through `child.outputOrdering` unchanged.
  - *Coalescing without reducers*: re-derive ordering from the output `KeyedPartitioning` key expressions; a join may embed multiple `KeyedPartitioning`s with different expressions — expose equivalences via `sameOrderExpressions`.
  - *Coalescing with reducers*: fall back to `super.outputOrdering` (empty), because merged partitions share only the reduced key.

### Why are the changes needed?

Before this change, `outputOrdering` on both nodes returned an empty sequence (unless `SupportsReportOrdering` was implemented), even though the within-partition ordering was structurally guaranteed by the partitioning itself. As a result, `EnsureRequirements` would insert a redundant `SortExec` before `SortMergeJoin` inputs that are already in key order.

### Does this PR introduce _any_ user-facing change?

Yes. Queries involving storage-partitioned joins (v2 bucketing) no longer add a redundant `SortExec` before `SortMergeJoin` when the join keys match the partition keys, reducing CPU and memory overhead.

### How was this patch tested?

- New unit test class `GroupPartitionsExecSuite` covering all four `outputOrdering` branches (non-coalescing, coalescing without reducers with single and multi-key, join `sameOrderExpressions`, coalescing with reducers).
- New SQL integration tests in `KeyGroupedPartitioningSuite` (SPARK-56241):
  - Scan with `KeyedPartitioning` reports key-derived `outputOrdering`.
  - Non-coalescing `GroupPartitionsExec` (non-identical key sets) passes through child ordering — no pre-join `SortExec`.
  - Coalescing `GroupPartitionsExec` derives ordering from key expressions — no pre-join `SortExec`.
- Updated expected output in `DataSourceV2Suite` for the case where a source is partitioned by a key with no reported ordering — groupBy on the partition key no longer requires a sort.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
1 parent 042440d commit f28c056
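
The premise of the commit message (a constant key inside a partition means the partition is trivially sorted by that key) can be illustrated with a small sketch. It uses plain Scala collections in place of Spark partitions; `Item` and `isSortedById` are hypothetical names introduced only for this illustration and are not part of Spark.

```scala
// Illustrative model only: a Seq stands in for the rows of one partition.
object ConstantKeySortedness {
  // Hypothetical row type with a partition key `id` and a payload column.
  final case class Item(id: Int, name: String)

  // Returns true when `rows` is in non-decreasing order of `id`.
  def isSortedById(rows: Seq[Item]): Boolean =
    rows.sliding(2).forall {
      case Seq(a, b) => a.id <= b.id
      case _ => true // zero or one row: nothing to compare
    }

  // A keyed partition: every row carries the same key, in arbitrary arrival order.
  val keyedPartition: Seq[Item] = Seq(Item(1, "ab"), Item(1, "aa"))

  // A partition merged by a reducer: rows with different original keys are
  // concatenated, so the original key is no longer constant.
  val reducedPartition: Seq[Item] = Seq(Item(2, "bb"), Item(1, "aa"))
}
```

Under these assumptions, `isSortedById(keyedPartition)` holds regardless of the order in which rows arrive, while `isSortedById(reducedPartition)` does not. This mirrors why the reducer case falls back to an empty ordering.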

5 files changed

Lines changed: 261 additions & 11 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala

Lines changed: 17 additions & 5 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, RowOrdering, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -104,11 +104,23 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
   }

   /**
-   * Returns the output ordering from the data source if available, otherwise falls back
-   * to the default (no ordering). This allows data sources to report their natural ordering
-   * through `SupportsReportOrdering`.
+   * Returns the output ordering for this scan. When the source reports ordering via
+   * `SupportsReportOrdering`, that ordering is returned as-is. Otherwise, when the output
+   * partitioning is a `KeyedPartitioning`, each partition contains rows where the key expressions
+   * evaluate to a single constant value, so the data is trivially sorted by those expressions
+   * within the partition.
    */
-  override def outputOrdering: Seq[SortOrder] = ordering.getOrElse(super.outputOrdering)
+  override def outputOrdering: Seq[SortOrder] = {
+    val reportedOrdering = ordering.getOrElse(Seq.empty)
+    if (reportedOrdering.nonEmpty) {
+      reportedOrdering
+    } else {
+      outputPartitioning match {
+        case k: KeyedPartitioning => k.expressions.map(SortOrder(_, Ascending))
+        case _ => Seq.empty
+      }
+    }
+  }

   override def supportsColumnar: Boolean = {
     scan.columnarSupportMode() match {
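
The decision made by the new `outputOrdering` in the scan node can be sketched without a Spark dependency. The types below are simplified stand-ins written for this illustration (expressions are plain strings), not Spark's real `SortOrder` or `KeyedPartitioning` classes, and the `reported` parameter is an assumed substitute for the optional `ordering` field.

```scala
// Simplified stand-ins for Spark's physical-plan types; NOT the real Spark API.
object ScanOrderingSketch {
  sealed trait SortDirection
  case object Ascending extends SortDirection
  case object Descending extends SortDirection

  // Expressions are modelled as plain column names.
  final case class SortOrder(child: String, direction: SortDirection)

  sealed trait Partitioning
  final case class KeyedPartitioning(expressions: Seq[String]) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  // Mirrors the branch structure in the diff: a non-empty reported ordering wins,
  // otherwise one ascending SortOrder is derived per key expression.
  def outputOrdering(
      reported: Option[Seq[SortOrder]],
      partitioning: Partitioning): Seq[SortOrder] = {
    val reportedOrdering = reported.getOrElse(Seq.empty)
    if (reportedOrdering.nonEmpty) {
      reportedOrdering
    } else {
      partitioning match {
        case KeyedPartitioning(exprs) => exprs.map(e => SortOrder(e, Ascending))
        case _ => Seq.empty
      }
    }
  }
}
```

One consequence of checking `nonEmpty` rather than `isDefined`, visible in this sketch, is that a source reporting an empty ordering is treated the same as one reporting none: the key-derived ordering is still used.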

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala

Lines changed: 22 additions & 2 deletions
@@ -184,11 +184,31 @@ case class GroupPartitionsExec(
     copy(child = newChild)

   override def outputOrdering: Seq[SortOrder] = {
-    // when multiple partitions are grouped together, ordering inside partitions is not preserved
     if (groupedPartitions.forall(_._2.size <= 1)) {
+      // No coalescing: each output partition is exactly one input partition. The child's
+      // within-partition ordering is fully preserved (including any key-derived ordering that
+      // `DataSourceV2ScanExecBase` already prepended).
       child.outputOrdering
     } else {
-      super.outputOrdering
+      // Coalescing: multiple input partitions are merged into one output partition. The child's
+      // within-partition ordering is lost due to concatenation, so we rederive ordering purely from
+      // the key expressions. A join may embed multiple `KeyedPartitioning`s (one per join side)
+      // within a single expression tree; they share the same partitionKeys but carry different
+      // expressions. Collect them all and expose each position's equivalent expressions via
+      // `sameOrderExpressions` so the planner can use any of them for ordering checks.
+      outputPartitioning match {
+        case p: Partitioning with Expression if reducers.isEmpty =>
+          // Without reducers all merged partitions share the same original key value, so the key
+          // expressions remain constant within the output partition.
+          val keyedPartitionings = p.collect { case k: KeyedPartitioning => k }
+          keyedPartitionings.map(_.expressions).transpose.map { exprs =>
+            SortOrder(exprs.head, Ascending, sameOrderExpressions = exprs.tail)
+          }
+        case _ =>
+          // With reducers, merged partitions share only the reduced key, not the original key
+          // expressions, which can take different values within the output partition.
+          super.outputOrdering
+      }
     }
   }
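
The three branches above, and in particular the `transpose` step that turns per-side key expressions into `sameOrderExpressions`, can be sketched in isolation. This is a simplified model under stated assumptions: `SortOrder` here is a stand-in with string expressions and implicit ascending direction, and the parameters `groupSizes`, `keyExprsPerSide`, and `hasReducers` are hypothetical substitutes for `groupedPartitions`, the collected `KeyedPartitioning`s, and `reducers`. The real code also falls back to an empty ordering when the output partitioning is not an expression tree, which this sketch does not model.

```scala
// Simplified stand-ins for Spark's plan types; NOT the real Spark API.
object GroupedOrderingSketch {
  // Expressions are plain strings; every ordering is implicitly ascending.
  final case class SortOrder(child: String, sameOrderExpressions: Seq[String] = Seq.empty)

  /**
   * @param groupSizes      number of input partitions merged into each output partition
   * @param childOrdering   ordering reported by the child plan
   * @param keyExprsPerSide key expressions of each embedded keyed partitioning
   *                        (one inner Seq per join side, all assumed to have equal length)
   * @param hasReducers     whether key reducers were applied while grouping
   */
  def outputOrdering(
      groupSizes: Seq[Int],
      childOrdering: Seq[SortOrder],
      keyExprsPerSide: Seq[Seq[String]],
      hasReducers: Boolean): Seq[SortOrder] = {
    if (groupSizes.forall(_ <= 1)) {
      // Non-coalescing: the child's ordering survives unchanged.
      childOrdering
    } else if (!hasReducers) {
      // Coalescing without reducers: position i of every side is the same key,
      // so transpose groups equivalent expressions together.
      keyExprsPerSide.transpose.map(exprs => SortOrder(exprs.head, exprs.tail))
    } else {
      // Coalescing with reducers: the original keys are not constant any more.
      Seq.empty
    }
  }
}
```

For a join whose sides are keyed by `a` and `b` respectively, `keyExprsPerSide` is `Seq(Seq("a"), Seq("b"))`, and the transpose yields a single `SortOrder("a", Seq("b"))`, which corresponds to the `sameOrderExpressions` assertion in `GroupPartitionsExecSuite`.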

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala

Lines changed: 3 additions & 2 deletions
@@ -313,8 +313,9 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
           Seq(
             // with no partitioning and no order, we expect shuffling AND sorting
             (None, None, (true, true), (true, true)),
-            // partitioned by i and no order, we expect NO shuffling BUT sorting
-            (Some("i"), None, (false, true), (false, true)),
+            // partitioned by i and no order,
+            // we expect NO shuffling AND sorting for groupBy BUT sorting for window function
+            (Some("i"), None, (false, false), (false, true)),
             // partitioned by i and in-partition sorted by i,
             // we expect NO shuffling AND sorting for groupBy but sorting for window function
             (Some("i"), Some("i"), (false, false), (false, true)),

sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala

Lines changed: 113 additions & 2 deletions
@@ -22,14 +22,14 @@ import java.util.Collections
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, TransformExpression}
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.distributions.Distributions
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.expressions.Expressions._
-import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SparkPlan}
+import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, GroupPartitionsExec}
 import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -3568,4 +3568,115 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with
       }
     }
   }
+
+  test("SPARK-56241: scan with KeyedPartitioning reports key-derived outputOrdering") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        "(3, 'cc', 30.0, cast('2021-01-01' as timestamp)), " +
+        "(1, 'aa', 10.0, cast('2022-01-01' as timestamp)), " +
+        "(2, 'bb', 20.0, cast('2022-01-01' as timestamp))")
+
+    val df = sql(s"SELECT id, name FROM testcat.ns.$items")
+    val plan = df.queryExecution.executedPlan
+    val scans = collectScans(plan)
+    assert(scans.size === 1)
+    // The scan's outputOrdering should include an ascending sort on the partition key `id`,
+    // derived from the KeyedPartitioning - regardless of SupportsReportOrdering.
+    val ordering = scans.head.outputOrdering
+    assert(ordering.length === 1)
+    assert(ordering.head.direction === Ascending)
+    // identity transforms are unwrapped to AttributeReferences by V2ExpressionUtils.
+    val keyExpr = ordering.head.child
+    assert(keyExpr.isInstanceOf[AttributeReference])
+    assert(keyExpr.asInstanceOf[AttributeReference].name === "id")
+  }
+
+  test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " +
+    "no pre-join SortExec needed before SortMergeJoin") {
+    // Non-identical key sets force GroupPartitionsExec to be inserted on both sides align them,
+    // but each group has exactly one partition — no coalescing.
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
+        "(2, 'bb', 20.0, cast('2021-01-01' as timestamp)), " +
+        "(3, 'cc', 30.0, cast('2021-01-01' as timestamp))")
+
+    val purchases_partitions = Array(identity("item_id"))
+    createTable(purchases, purchasesColumns, purchases_partitions)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+        "(1, 100.0, cast('2021-01-01' as timestamp)), " +
+        "(2, 200.0, cast('2021-01-01' as timestamp))")
+
+    val df = sql(
+      s"""
+         |${selectWithMergeJoinHint("i", "p")}
+         |i.id, i.name
+         |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id
+         |""".stripMargin)
+
+    checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb")))
+
+    val plan = df.queryExecution.executedPlan
+    val groupPartitions = collectGroupPartitions(plan)
+    assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan")
+    assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)),
+      "expected non-coalescing GroupPartitionsExec")
+    // GroupPartitionsExec passes through the child's key-derived outputOrdering.
+    // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before
+    // the SMJ.
+    val smjs = collect(plan) { case j: SortMergeJoinExec => j }
+    assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan")
+    smjs.foreach { smj =>
+      val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s })
+      assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " +
+        "non-coalescing GroupPartitions")
+    }
+  }
+
+  test("SPARK-56241: GroupPartitionsExec coalescing derives ordering from key expressions, " +
+    "no pre-join SortExec needed before SortMergeJoin") {
+    // Duplicate key 1 on both sides causes coalescing.
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
+        "(1, 'ab', 11.0, cast('2021-06-01' as timestamp)), " +
+        "(2, 'bb', 20.0, cast('2021-01-01' as timestamp))")
+
+    val purchases_partitions = Array(identity("item_id"))
+    createTable(purchases, purchasesColumns, purchases_partitions)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+        "(1, 100.0, cast('2021-01-01' as timestamp)), " +
+        "(1, 110.0, cast('2021-06-01' as timestamp)), " +
+        "(2, 200.0, cast('2021-01-01' as timestamp))")
+
+    val df = sql(
+      s"""
+         |${selectWithMergeJoinHint("i", "p")}
+         |i.id, i.name
+         |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id
+         |""".stripMargin)
+
+    checkAnswer(df, Seq(
+      Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"),
+      Row(2, "bb")))
+
+    val plan = df.queryExecution.executedPlan
+    val groupPartitions = collectGroupPartitions(plan)
+    assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan")
+    assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)),
+      "expected coalescing GroupPartitionsExec")
+    // GroupPartitionsExec derives outputOrdering from the key expressions after coalescing.
+    // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before
+    // the SMJ.
+    val smjs = collect(plan) { case j: SortMergeJoinExec => j }
+    assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan")
+    smjs.foreach { smj =>
+      val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s })
+      assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " +
+        "from coalesced partition key")
+    }
+  }
 }
Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, PartitioningCollection}
+import org.apache.spark.sql.execution.DummySparkPlan
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.IntegerType
+
+class GroupPartitionsExecSuite extends SharedSparkSession {
+
+  private val exprA = AttributeReference("a", IntegerType)()
+  private val exprB = AttributeReference("b", IntegerType)()
+
+  private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a))
+  private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b))
+
+  test("SPARK-56241: non-coalescing passes through child ordering unchanged") {
+    // Each partition has a distinct key — no coalescing happens.
+    val partitionKeys = Seq(row(1), row(2), row(3))
+    val childOrdering = Seq(SortOrder(exprA, Ascending))
+    val child = DummySparkPlan(
+      outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys),
+      outputOrdering = childOrdering)
+    val gpe = GroupPartitionsExec(child)
+
+    assert(gpe.groupedPartitions.forall(_._2.size <= 1), "expected non-coalescing")
+    assert(gpe.outputOrdering === childOrdering)
+  }
+
+  test("SPARK-56241: coalescing without reducers returns key-derived ordering") {
+    // Key 1 appears on partitions 0 and 2, causing coalescing.
+    val partitionKeys = Seq(row(1), row(2), row(1))
+    val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys))
+    val gpe = GroupPartitionsExec(child)
+
+    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
+    val ordering = gpe.outputOrdering
+    assert(ordering.length === 1)
+    assert(ordering.head.child === exprA)
+    assert(ordering.head.direction === Ascending)
+    assert(ordering.head.sameOrderExpressions.isEmpty)
+  }
+
+  test("SPARK-56241: coalescing without reducers returns one SortOrder per key expression") {
+    // Multi-key partition: key (1,10) appears on partitions 0 and 2, causing coalescing.
+    val partitionKeys = Seq(row(1, 10), row(2, 20), row(1, 10))
+    val child = DummySparkPlan(
+      outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys))
+    val gpe = GroupPartitionsExec(child)
+
+    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
+    val ordering = gpe.outputOrdering
+    assert(ordering.length === 2)
+    assert(ordering.head.child === exprA)
+    assert(ordering(1).child === exprB)
+    assert(ordering.head.sameOrderExpressions.isEmpty)
+    assert(ordering(1).sameOrderExpressions.isEmpty)
+  }
+
+  test("SPARK-56241: coalescing join case exposes sameOrderExpressions across join sides") {
+    // PartitioningCollection wraps two KeyedPartitionings (one per join side), sharing the same
+    // partition keys. Key 1 coalesces partitions 0 and 2.
+    val partitionKeys = Seq(row(1), row(2), row(1))
+    val leftKP = KeyedPartitioning(Seq(exprA), partitionKeys)
+    val rightKP = KeyedPartitioning(Seq(exprB), partitionKeys)
+    val child = DummySparkPlan(outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP)))
+    val gpe = GroupPartitionsExec(child)
+
+    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
+    val ordering = gpe.outputOrdering
+    assert(ordering.length === 1)
+    assert(ordering.head.child === exprA)
+    assert(ordering.head.sameOrderExpressions === Seq(exprB))
+  }
+
+  test("SPARK-56241: coalescing with reducers returns empty ordering") {
+    // When reducers are present, the original key expressions are not constant within the merged
+    // partition, so outputOrdering falls back to the default (empty).
+    val partitionKeys = Seq(row(1), row(2), row(1))
+    val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys))
+    // reducers = Some(Seq(None)) - None element means identity reducer; the important thing is
+    // that reducers.isDefined, which triggers the fallback.
+    val gpe = GroupPartitionsExec(child, reducers = Some(Seq(None)))
+
+    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
+    assert(gpe.outputOrdering === Nil)
+  }
+}
