Skip to content

Commit 7e489d0

Browse files
committed
refine logic
1 parent 2e8b8ca commit 7e489d0

2 files changed

Lines changed: 41 additions & 19 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -191,22 +191,21 @@ case class GroupPartitionsExec(
191191
child.outputOrdering
192192
} else {
193193
// Coalescing: multiple input partitions are merged into one output partition. The child's
194-
// within-partition ordering is lost due to concatenation, so we re-derive ordering purely
195-
// from the key expressions. For example, if two input partitions both belong to the same
196-
// output group (same partition key value) and hold [1, 3] and [2, 5] respectively (each
197-
// sorted ascending), concatenating them yields [1, 3, 2, 5] which is no longer sorted.
198-
// A join may embed multiple `KeyedPartitioning`s (one per join side) within a single
199-
// expression tree; they share the same partitionKeys but carry different expressions.
200-
// Collect them all and expose each position's equivalent expressions via
201-
// `sameOrderExpressions` so the planner can use any of them for ordering checks.
194+
// within-partition ordering is lost due to concatenation -- for example, if two input
195+
// partitions both belong to the same output group (same partition key value) and hold
196+
// [1, 3] and [2, 5] respectively (each sorted ascending), concatenating them yields
197+
// [1, 3, 2, 5], which is no longer sorted. Only sort orders over partition key expressions
198+
// (which are constant across all merged partitions) remain valid.
202199
outputPartitioning match {
203200
case p: Partitioning with Expression if reducers.isEmpty =>
204201
// Without reducers all merged partitions share the same original key value, so the key
205-
// expressions remain constant within the output partition.
202+
// expressions remain constant within the output partition. The child's outputOrdering
203+
// should already be in sync with the partitioning (either reported by the source or
204+
// derived from it in DataSourceV2ScanExecBase), so we only need to keep the sort orders
205+
// whose expression is a partition key expression -- all others are lost by concatenation.
206206
val keyedPartitionings = p.collect { case k: KeyedPartitioning => k }
207-
keyedPartitionings.map(_.expressions).transpose.map { exprs =>
208-
SortOrder(exprs.head, Ascending, sameOrderExpressions = exprs.tail)
209-
}
207+
val keyExprs = ExpressionSet(keyedPartitionings.flatMap(_.expressions))
208+
child.outputOrdering.filter(order => keyExprs.contains(order.child))
210209
case _ =>
211210
// With reducers, merged partitions share only the reduced key, not the original key
212211
// expressions, which can take different values within the output partition.

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class GroupPartitionsExecSuite extends SharedSparkSession {
2828

2929
private val exprA = AttributeReference("a", IntegerType)()
3030
private val exprB = AttributeReference("b", IntegerType)()
31+
private val exprC = AttributeReference("c", IntegerType)()
3132

3233
private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a))
3334
private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b))
@@ -45,10 +46,12 @@ class GroupPartitionsExecSuite extends SharedSparkSession {
4546
assert(gpe.outputOrdering === childOrdering)
4647
}
4748

48-
test("SPARK-56241: coalescing without reducers returns key-derived ordering") {
49+
test("SPARK-56241: coalescing without reducers keeps key-expression orders from child") {
4950
// Key 1 appears on partitions 0 and 2, causing coalescing.
5051
val partitionKeys = Seq(row(1), row(2), row(1))
51-
val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys))
52+
val child = DummySparkPlan(
53+
outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys),
54+
outputOrdering = Seq(SortOrder(exprA, Ascending)))
5255
val gpe = GroupPartitionsExec(child)
5356

5457
assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
@@ -59,11 +62,12 @@ class GroupPartitionsExecSuite extends SharedSparkSession {
5962
assert(ordering.head.sameOrderExpressions.isEmpty)
6063
}
6164

62-
test("SPARK-56241: coalescing without reducers returns one SortOrder per key expression") {
65+
test("SPARK-56241: coalescing without reducers keeps one SortOrder per key expression") {
6366
// Multi-key partition: key (1,10) appears on partitions 0 and 2, causing coalescing.
6467
val partitionKeys = Seq(row(1, 10), row(2, 20), row(1, 10))
6568
val child = DummySparkPlan(
66-
outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys))
69+
outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys),
70+
outputOrdering = Seq(SortOrder(exprA, Ascending), SortOrder(exprB, Ascending)))
6771
val gpe = GroupPartitionsExec(child)
6872

6973
assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
@@ -75,13 +79,16 @@ class GroupPartitionsExecSuite extends SharedSparkSession {
7579
assert(ordering(1).sameOrderExpressions.isEmpty)
7680
}
7781

78-
test("SPARK-56241: coalescing join case exposes sameOrderExpressions across join sides") {
82+
test("SPARK-56241: coalescing join case preserves sameOrderExpressions from child") {
7983
// PartitioningCollection wraps two KeyedPartitionings (one per join side), sharing the same
80-
// partition keys. Key 1 coalesces partitions 0 and 2.
84+
// partition keys. Key 1 coalesces partitions 0 and 2. The child (e.g. SortMergeJoinExec)
85+
// already carries sameOrderExpressions linking both sides' key expressions.
8186
val partitionKeys = Seq(row(1), row(2), row(1))
8287
val leftKP = KeyedPartitioning(Seq(exprA), partitionKeys)
8388
val rightKP = KeyedPartitioning(Seq(exprB), partitionKeys)
84-
val child = DummySparkPlan(outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP)))
89+
val child = DummySparkPlan(
90+
outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP)),
91+
outputOrdering = Seq(SortOrder(exprA, Ascending, sameOrderExpressions = Seq(exprB))))
8592
val gpe = GroupPartitionsExec(child)
8693

8794
assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
@@ -91,6 +98,22 @@ class GroupPartitionsExecSuite extends SharedSparkSession {
9198
assert(ordering.head.sameOrderExpressions === Seq(exprB))
9299
}
93100

101+
test("SPARK-56241: coalescing drops non-key sort orders from child") {
102+
// exprA is the partition key; exprC is a non-key expression the child also reports as sorted
103+
// (e.g. a secondary sort within each partition). After coalescing, exprC ordering is lost
104+
// by concatenation, so only the exprA order should survive.
105+
val partitionKeys = Seq(row(1), row(2), row(1))
106+
val child = DummySparkPlan(
107+
outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys),
108+
outputOrdering = Seq(SortOrder(exprA, Ascending), SortOrder(exprC, Ascending)))
109+
val gpe = GroupPartitionsExec(child)
110+
111+
assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
112+
val ordering = gpe.outputOrdering
113+
assert(ordering.length === 1)
114+
assert(ordering.head.child === exprA)
115+
}
116+
94117
test("SPARK-56241: coalescing with reducers returns empty ordering") {
95118
// When reducers are present, the original key expressions are not constant within the merged
96119
// partition, so outputOrdering falls back to the default (empty).

0 commit comments

Comments
 (0)