Commit 521449b
[SPARK-55715][SQL] Keep outputOrdering when GroupPartitionsExec coalesces partitions
### What changes were proposed in this pull request?
#### Background
`GroupPartitionsExec` coalesces multiple input partitions that share the same partition key
into a single output partition. Before this PR, `outputOrdering` was always discarded after
coalescing: even when the child reported ordering (e.g. via `SupportsReportOrdering`) or
when ordering was derived from `KeyedPartitioning` key expressions (via
`spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled`), coalescing by simple
concatenation destroyed the within-partition ordering. This forced `EnsureRequirements` to
inject an extra `SortExec` before `SortMergeJoinExec`, defeating the purpose of using a
storage-partitioned join.
#### k-way merge: SortedMergeCoalescedRDD
This PR introduces `SortedMergeCoalescedRDD`, a new RDD that coalesces partitions by
performing a k-way merge instead of simple concatenation. When multiple input partitions
share the same key, a priority-queue-based merge interleaves their rows in sorted order,
producing a single output partition whose row order matches the child's `outputOrdering`.
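
As a rough, language-neutral illustration of the priority-queue merge described above (not the actual `SortedMergeCoalescedRDD` code, which operates on Spark rows and orderings), the sketch below merges already-sorted inputs in Python. The function name `sorted_merge_coalesce` and the `key` parameter are hypothetical.

```python
import heapq
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking an exhausted input iterator


def sorted_merge_coalesce(
    partitions: List[Iterable[T]], key: Callable[[T], object]
) -> Iterator[T]:
    """Merge inputs that are each sorted by `key` into one sorted stream."""
    # All input iterators are opened upfront, as in a k-way merge.
    iterators = [iter(p) for p in partitions]
    heap = []
    for idx, it in enumerate(iterators):
        first = next(it, _DONE)
        if first is not _DONE:
            # idx breaks ties, so rows themselves are never compared.
            heap.append((key(first), idx, first))
    heapq.heapify(heap)

    while heap:
        _, idx, row = heapq.heappop(heap)
        yield row
        # Refill the heap from the input that just produced a row.
        following = next(iterators[idx], _DONE)
        if following is not _DONE:
            heapq.heappush(heap, (key(following), idx, following))
```

The heap holds at most one entry per input partition, so memory stays proportional to the number of merged partitions rather than the number of rows. Python's standard library offers the same behavior through `heapq.merge`; the explicit version is shown only to make the mechanism visible.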
`GroupPartitionsExec.doExecute()` uses `SortedMergeCoalescedRDD` when all of the following
hold:
1. `spark.sql.sources.v2.bucketing.preserveOrderingOnCoalesce.enabled` is `true`.
2. The child reports a non-empty `outputOrdering`.
3. The child subtree is safe for concurrent partition reads (`childIsSafeForKWayMerge`).
4. At least one output partition actually coalesces multiple input partitions.
When these conditions hold, the k-way merge is always applied, regardless of whether the
parent operator actually requires the ordering. Making this dynamic (only merge-sort when
required) will be addressed in a follow-up ticket.
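
The four conditions listed above can be read as a single predicate. The following is a minimal sketch under that reading; the `MergeDecisionInputs` type and its field names are hypothetical and do not correspond to identifiers in the patch.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MergeDecisionInputs:
    config_enabled: bool       # preserveOrderingOnCoalesce.enabled
    child_ordering: List[str]  # stand-in for the child's outputOrdering
    child_is_safe: bool        # stand-in for childIsSafeForKWayMerge
    group_sizes: List[int]     # input partitions per output partition


def use_sorted_merge(d: MergeDecisionInputs) -> bool:
    """True only when all four conditions hold."""
    return (
        d.config_enabled
        and len(d.child_ordering) > 0
        and d.child_is_safe
        and any(n > 1 for n in d.group_sizes)
    )
```

Note that nothing in the predicate refers to the parent operator, which matches the statement that the merge is applied whether or not the ordering is required downstream.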
#### Why k-way merge safety matters: SafeForKWayMerge
Unlike `CoalescedRDD`, which processes input partitions sequentially, `SortedMergeCoalescedRDD`
opens all N input partition iterators upfront and interleaves reads across them — all on a
single JVM thread within a single Spark task. A `SparkPlan` object is shared across all
partition computations, so any plan node that stores per-partition mutable state in an
instance field rather than inside the partition's iterator closure is aliased across all N
concurrent computations. The last writer wins, and any computation that reads or frees state
based on its own earlier write will operate on incorrect state (a use-after-free).
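
The aliasing hazard can be reproduced in miniature. In this toy Python model (not Spark code; `UnsafeNode`, `SafeNode`, and `interleave` are hypothetical names), one node keeps its per-partition state in an instance field and the other keeps it inside the iterator itself.

```python
class UnsafeNode:
    """Keeps per-partition state in a field shared by every partition."""

    def __init__(self):
        self.current_partition = None  # shared, mutable instance field

    def compute(self, partition_id, rows):
        self.current_partition = partition_id  # last writer wins
        for row in rows:
            # May observe a value written by a different partition.
            yield (self.current_partition, row)


class SafeNode:
    """Keeps per-partition state local to each partition's iterator."""

    def compute(self, partition_id, rows):
        current_partition = partition_id  # captured per iterator
        for row in rows:
            yield (current_partition, row)


def interleave(iterators):
    """Round-robin over all iterators, as a k-way merge might."""
    out, active = [], list(iterators)
    while active:
        still_active = []
        for it in active:
            try:
                out.append(next(it))
                still_active.append(it)
            except StopIteration:
                pass
        active = still_active
    return out
```

Read sequentially, both nodes tag every row with the right partition. Read in interleaved fashion on one thread, the unsafe node tags partition 0's second row with partition 1, because partition 1 overwrote the shared field in between.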
To avoid this class of bugs, `GroupPartitionsExec` uses a whitelist approach via a new
marker trait `SafeForKWayMerge`. Nodes implementing this trait guarantee that all
per-partition mutable state is captured inside the partition's iterator closure (e.g. via the
`PartitionEvaluatorFactory` pattern), never in shared plan-node instance fields. Unknown
node types fall through to `unsafe`, causing a silent fallback to simple sequential
coalescing. The following nodes implement `SafeForKWayMerge`:
- `DataSourceV2ScanExecBase` (leaf nodes reading from V2 sources)
- `ProjectExec`, `FilterExec` (stateless row-by-row operators)
- `WholeStageCodegenExec`, `InputAdapter` (code-gen wrappers that delegate to the above)
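
A whitelist check of this kind might look like the sketch below. It assumes that every node in the child subtree must be whitelisted; the actual traversal in `childIsSafeForKWayMerge` may differ. `PlanNode` is a hypothetical stand-in for `SparkPlan`, and node types are modeled as plain strings.

```python
from dataclasses import dataclass, field
from typing import List

# Node types listed above as implementing SafeForKWayMerge.
SAFE_NODE_TYPES = {
    "DataSourceV2ScanExecBase",
    "ProjectExec",
    "FilterExec",
    "WholeStageCodegenExec",
    "InputAdapter",
}


@dataclass
class PlanNode:
    node_type: str
    children: List["PlanNode"] = field(default_factory=list)


def child_is_safe_for_k_way_merge(node: PlanNode) -> bool:
    """Unknown node types are treated as unsafe (whitelist semantics)."""
    return node.node_type in SAFE_NODE_TYPES and all(
        child_is_safe_for_k_way_merge(c) for c in node.children
    )
```

Because the default answer is "unsafe", adding a new operator to Spark cannot silently enable the k-way merge over it; someone has to opt the operator in explicitly.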
#### GroupPartitionsExec.outputOrdering
`GroupPartitionsExec.outputOrdering` is updated to reflect what ordering is preserved:
1. **No coalescing** (all groups ≤ 1 partition): `child.outputOrdering` is passed through unchanged.
2. **Coalescing + k-way merge** (config enabled + `childIsSafeForKWayMerge`): `child.outputOrdering`
is returned in full, because the k-way merge produces a fully sorted output partition.
3. **Coalescing without k-way merge, no reducers**: only sort orders whose expression is a
partition key expression are returned. These key expressions evaluate to the same constant
value within every merged partition (all merged splits share the same key), so their sort
orders remain valid after concatenation. This is the ordering preserved by the existing
`spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled` config.
4. **Coalescing without k-way merge, with reducers**: `super.outputOrdering` (empty) — the
reduced key can take different values within the output partition, so no ordering is guaranteed.
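
The four branches above can be summarized as a small decision function. This is a simplified model, not the patch's implementation: sort orders are `(expression, direction)` tuples, all parameter names are hypothetical, and the interaction with the existing key-ordering config is not modeled.

```python
from typing import List, Set, Tuple

SortOrder = Tuple[str, str]  # (expression, direction), e.g. ("key", "ASC")


def output_ordering(
    child_ordering: List[SortOrder],
    group_sizes: List[int],
    k_way_merge: bool,
    has_reducers: bool,
    partition_key_exprs: Set[str],
) -> List[SortOrder]:
    # 1. No coalescing: pass the child's ordering through unchanged.
    if all(n <= 1 for n in group_sizes):
        return list(child_ordering)
    # 2. Coalescing with k-way merge: the full ordering survives.
    if k_way_merge:
        return list(child_ordering)
    # 3. Concatenation, no reducers: keep only orders over partition keys,
    #    which are constant within each merged partition.
    if not has_reducers:
        return [o for o in child_ordering if o[0] in partition_key_exprs]
    # 4. Concatenation with reducers: nothing is guaranteed.
    return []
```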
#### DataSourceRDD: concurrent-reader metrics support
`SortedMergeCoalescedRDD` opens multiple `PartitionReader`s concurrently within a single
Spark task. The existing `DataSourceRDD` assumed at most one active reader per task at a
time, causing only the last reader's custom metrics to be reported (the previous readers'
metrics were overwritten and lost).
`DataSourceRDD` is refactored to support concurrent readers:
- A new `TaskState` class (one per task) holds an `ArrayBuffer[PartitionIterator[_]]`
(`partitionIterators`) tracking all readers opened for the task, Spark input metrics
(`InputMetrics`, owned exclusively to avoid thread-safety issues with `setBytesRead`),
and a `closedMetrics` map accumulating final metric values from already-closed readers.
- `mergeAndUpdateCustomMetrics()` runs in two phases: (1) drain closed iterators into
`closedMetrics`; (2) merge live readers' current values with `closedMetrics` via the new
`CustomTaskMetric.mergeWith()` and push the result to the Spark UI accumulators.
- This works correctly in all three execution modes: single partition per task, sequential
coalescing (one reader at a time), and concurrent k-way merge (N readers simultaneously).
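
The two-phase merge can be sketched as follows. This is a toy model with additive metrics only; `FakeReader` and the dictionary-based metric representation are hypothetical and stand in for `PartitionIterator` and `CustomTaskMetric`. The sketch assumes a reader's final metric values are captured when it is closed.

```python
class FakeReader:
    """Stand-in for a partition reader with one additive metric."""

    def __init__(self):
        self.rows_read = 0
        self.final_metrics = None  # snapshot taken on close

    def read(self, n):
        self.rows_read += n

    def current_metrics(self):
        return {"rows_read": self.rows_read}

    def close(self):
        self.final_metrics = self.current_metrics()

    @property
    def closed(self):
        return self.final_metrics is not None


class TaskState:
    """One instance per task; tracks every reader opened for that task."""

    def __init__(self):
        self.partition_iterators = []
        self.closed_metrics = {}

    def merge_and_update_custom_metrics(self):
        # Phase 1: drain closed readers into closed_metrics exactly once.
        live = []
        for reader in self.partition_iterators:
            if reader.closed:
                for name, value in reader.final_metrics.items():
                    self.closed_metrics[name] = self.closed_metrics.get(name, 0) + value
            else:
                live.append(reader)
        self.partition_iterators = live
        # Phase 2: merge live readers' current values with closed_metrics.
        merged = dict(self.closed_metrics)
        for reader in live:
            for name, value in reader.current_metrics().items():
                merged[name] = merged.get(name, 0) + value
        return merged
```

Draining closed readers out of the live list is what keeps repeated calls from counting a closed reader twice.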
#### CustomTaskMetric.mergeWith
A new default method `mergeWith(CustomTaskMetric other)` is added to `CustomTaskMetric`.
The default implementation sums the two values, which is correct for count-type metrics.
Data sources with non-additive metrics (e.g. max, average) should override this method.
This replaces the previously proposed `PartitionReader.initMetricsValues` mechanism (which
threaded prior metric values into the next reader's constructor) with a cleaner, pull-based
merge at reporting time. `PartitionReader.initMetricsValues` is removed as it was never
released and is no longer needed.
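
To illustrate why non-additive metrics need an override, here is a small Python analogue of the summing default and a max-style override. The real `CustomTaskMetric` is a Java interface; the class shapes, constructor, and the return type of `merge_with` below are assumptions made for the sketch.

```python
from functools import reduce


class CustomTaskMetric:
    """Toy analogue of a task metric with a name and a numeric value."""

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def merge_with(self, other):
        # Default: sum the two values, which suits count-type metrics.
        return type(self)(self.name, self.value + other.value)


class MaxTaskMetric(CustomTaskMetric):
    """A non-additive metric overrides the merge."""

    def merge_with(self, other):
        return type(self)(self.name, max(self.value, other.value))


def merge_all(metrics):
    """Fold metrics from several readers into one reported value."""
    return reduce(lambda a, b: a.merge_with(b), metrics)
```

With three readers reporting 3, 5, and 2, the default merge reports 10, while the max-style metric reports 5; summing the latter would report a value no reader ever observed.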
### Why are the changes needed?
Without this fix, `GroupPartitionsExec` always discards ordering when coalescing, forcing
`EnsureRequirements` to inject an extra `SortExec` before `SortMergeJoinExec` even when the
data is already sorted by the join key within each partition. With `SortedMergeCoalescedRDD`,
the full child ordering is preserved end-to-end, eliminating these redundant sorts and making
storage-partitioned joins with ordering fully efficient.
`spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled` (introduced earlier)
preserves only sort orders over partition key expressions, which remain constant within a
merged partition. This PR goes further: by performing a k-way merge, the full `outputOrdering`
— including secondary sort columns beyond the partition key — is preserved end-to-end.
### Does this PR introduce _any_ user-facing change?
Yes. A new SQL configuration is added:
- `spark.sql.sources.v2.bucketing.preserveOrderingOnCoalesce.enabled` (default: `false`):
when enabled, `GroupPartitionsExec` uses a k-way merge to coalesce partitions while
preserving the full child ordering, avoiding extra sort steps for operations like
`SortMergeJoin`.
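
As a hedged usage sketch, the settings below show how the new flag might be supplied on a `spark-submit` command line. It assumes the existing `spark.sql.sources.v2.bucketing.enabled` flag must also be on for storage-partitioned joins; other settings may be needed depending on the workload. The helper `to_submit_args` is hypothetical.

```python
# Settings for a storage-partitioned join that keeps the child ordering.
SPJ_ORDERING_CONF = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.sources.v2.bucketing.preserveOrderingOnCoalesce.enabled": "true",
}


def to_submit_args(conf):
    """Render a config mapping as spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args
```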
### How was this patch tested?
- **`SortedMergeCoalescedRDDSuite`**: unit tests for the new RDD covering correctness of the
k-way merge, empty partitions, single partition, and ordering guarantees.
- **`GroupPartitionsExecSuite`**: unit tests covering all four branches of `outputOrdering`
(no coalescing; k-way merge enabled; key-expression ordering only; reducers present).
- **`KeyGroupedPartitioningSuite`**: SQL-level tests verifying that no extra `SortExec` is
injected when `SortedMergeCoalescedRDD` is used, and a new test (`SPARK-55715: Custom
metrics of sorted-merge coalesced partitions`) that verifies per-scan custom metrics are
correctly reported across concurrent readers in the k-way merge case.
- **`BufferedRowsReader` hardening**: the test-framework reader in `InMemoryBaseTable` now
tracks a `closed` flag and throws `IllegalStateException` for reads, double-closes, or
metric fetches on a closed reader. This ensures that future tests catch reader lifecycle
bugs that were previously hidden by the no-op `close()`.
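
The hardening pattern described in the last bullet can be sketched as below. This is a Python analogue, not the test reader itself: `HardenedReader` is a hypothetical name, and `RuntimeError` stands in for Java's `IllegalStateException`.

```python
class HardenedReader:
    """Reader that fails loudly on any use after close()."""

    def __init__(self, rows):
        self._rows = iter(rows)
        self._rows_read = 0
        self._closed = False

    def _check_open(self, action):
        if self._closed:
            raise RuntimeError(f"cannot {action}: reader is already closed")

    def next_row(self):
        """Return the next row, or None when the input is exhausted."""
        self._check_open("read")
        row = next(self._rows, None)
        if row is not None:
            self._rows_read += 1
        return row

    def metrics(self):
        self._check_open("fetch metrics")
        return {"rows_read": self._rows_read}

    def close(self):
        self._check_open("close")  # a second close is also an error
        self._closed = True
```

A strict reader like this makes lifecycle mistakes, such as polling metrics from a reader that a concurrent merge has already closed, fail immediately in tests rather than pass unnoticed.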
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6

1 parent 038c839 commit 521449b
16 files changed
Lines changed: 973 additions & 140 deletions
File tree
- core/src
- main/scala/org/apache/spark/rdd
- test/scala/org/apache/spark/rdd
- sql
- catalyst/src
- main
- java/org/apache/spark/sql/connector
- metric
- read
- scala/org/apache/spark/sql/internal
- test/scala/org/apache/spark/sql/connector/catalog
- core/src
- main/scala/org/apache/spark/sql/execution
- datasources/v2
- test/scala/org/apache/spark/sql
- connector
- execution/datasources
- v2
- test