Commit 1c9e6ff
[SPARK-55715][SQL] Keep outputOrdering when GroupPartitionsExec coalesces partitions
### What changes were proposed in this pull request?

#### Background

`GroupPartitionsExec` coalesces multiple input partitions that share the same partition key into a single output partition. Before this PR, `outputOrdering` was always discarded after coalescing: even when the child reported ordering (e.g. via `SupportsReportOrdering`), or when ordering was derived from `KeyedPartitioning` key expressions (via `spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled`), coalescing by simple concatenation destroyed the within-partition ordering. This forced `EnsureRequirements` to inject an extra `SortExec` before `SortMergeJoinExec`, defeating the purpose of using a storage-partitioned join.

#### k-way merge: SortedMergeCoalescedRDD

This PR introduces `SortedMergeCoalescedRDD`, a new RDD that coalesces partitions by performing a k-way merge instead of simple concatenation. When multiple input partitions share the same key, a priority-queue-based merge interleaves their rows in sorted order, producing a single output partition whose row order matches the child's `outputOrdering`.

`GroupPartitionsExec.doExecute()` uses `SortedMergeCoalescedRDD` when all of the following hold:

1. `spark.sql.sources.v2.bucketing.preserveOrderingOnCoalesce.enabled` is `true`.
2. The child reports a non-empty `outputOrdering`.
3. The child subtree is safe for concurrent partition reads (`childIsSafeForKWayMerge`).
4. At least one output partition actually coalesces multiple input partitions.

When the config is enabled, the k-way merge is always applied, regardless of whether the parent operator actually requires the ordering. Making this dynamic (merge-sorting only when required) will be addressed in a follow-up ticket.
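The priority-queue merge described above is standard k-way merging. A language-agnostic sketch (Python here for brevity; the PR's actual implementation is the Scala `SortedMergeIterator` below) shows the O(n log k) structure: the heap holds at most one entry per input iterator.

```python
import heapq

_SENTINEL = object()  # marks an exhausted iterator; any value is allowed in the inputs

def k_way_merge(sorted_iters, key=lambda x: x):
    """Merge k locally sorted iterables into one sorted stream in O(n log k).

    Heap entries are (sort key, iterator index, element); the index breaks
    ties so elements themselves never need to be comparable.
    """
    heap = []
    iters = [iter(it) for it in sorted_iters]
    # Seed the heap with the first element from each non-empty iterator.
    for idx, it in enumerate(iters):
        first = next(it, _SENTINEL)
        if first is not _SENTINEL:
            heapq.heappush(heap, (key(first), idx, first))
    while heap:
        _, idx, element = heapq.heappop(heap)
        yield element
        # Refill from whichever iterator the popped element came from.
        nxt = next(iters[idx], _SENTINEL)
        if nxt is not _SENTINEL:
            heapq.heappush(heap, (key(nxt), idx, nxt))

# Two locally sorted "partitions" merge into one sorted output partition.
merged = list(k_way_merge([[1, 5, 9], [2, 6, 10]]))  # [1, 2, 5, 6, 9, 10]
```

Empty inputs simply contribute nothing to the heap, which mirrors how the merge handles empty parent partitions.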
#### Why k-way merge safety matters: SafeForKWayMerge

Unlike `CoalescedRDD`, which processes input partitions sequentially, `SortedMergeCoalescedRDD` opens all N input partition iterators upfront and interleaves reads across them, all on a single JVM thread within a single Spark task. A `SparkPlan` object is shared across all partition computations, so any plan node that stores per-partition mutable state in an instance field, rather than inside the partition's iterator closure, is aliased across all N concurrent computations. The last writer wins, and any computation that reads or frees state based on its own earlier write will operate on incorrect state (a use-after-free).

To avoid this class of bugs, `GroupPartitionsExec` uses a whitelist approach via a new marker trait, `SafeForKWayMerge`. Nodes implementing this trait guarantee that all per-partition mutable state is captured inside the partition's iterator closure (e.g. via the `PartitionEvaluatorFactory` pattern), never in shared plan-node instance fields. Unknown node types fall through to unsafe, causing a silent fallback to simple sequential coalescing.

The following nodes implement `SafeForKWayMerge`:

- `DataSourceV2ScanExecBase` (leaf nodes reading from V2 sources)
- `ProjectExec`, `FilterExec` (stateless row-by-row operators)
- `WholeStageCodegenExec`, `InputAdapter` (code-gen wrappers that delegate to the above)

#### GroupPartitionsExec.outputOrdering

`GroupPartitionsExec.outputOrdering` is updated to reflect which ordering is actually preserved:

1. **No coalescing** (all groups have ≤ 1 partition): `child.outputOrdering` is passed through unchanged.
2. **Coalescing with k-way merge** (config enabled + `childIsSafeForKWayMerge`): `child.outputOrdering` is returned in full; the k-way merge produces a fully sorted output partition.
3. **Coalescing without k-way merge, no reducers**: only sort orders whose expression is a partition key expression are returned. These key expressions evaluate to the same constant value within every merged partition (all merged splits share the same key), so their sort orders remain valid after concatenation. This is the ordering preserved by the existing `spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled` config.
4. **Coalescing without k-way merge, with reducers**: `super.outputOrdering` (empty); the reduced key can take different values within the output partition, so no ordering is guaranteed.

#### DataSourceRDD: concurrent-reader metrics support

`SortedMergeCoalescedRDD` opens multiple `PartitionReader`s concurrently within a single Spark task. The existing `DataSourceRDD` assumed at most one active reader per task at a time, so only the last reader's custom metrics were reported (the earlier readers' metrics were overwritten and lost).

`DataSourceRDD` is refactored to support concurrent readers:

- A new `TaskState` class (one per task) holds an `ArrayBuffer[PartitionIterator[_]]` (`partitionIterators`) tracking all readers opened for the task, the Spark input metrics (`InputMetrics`), and a `closedMetrics` map accumulating final metric values from already-closed readers.
- `mergeAndUpdateCustomMetrics()` runs in two phases: (1) drain closed iterators into `closedMetrics`; (2) merge live readers' current values with `closedMetrics` via the new `CustomTaskMetric.mergeWith()` and push the result to the Spark UI accumulators.
- This works correctly in all three execution modes: a single partition per task, sequential coalescing (one reader at a time), and concurrent k-way merge (N readers simultaneously).

#### CustomTaskMetric.mergeWith

A new default method, `mergeWith(CustomTaskMetric other)`, is added to `CustomTaskMetric`. The default implementation sums the two values, which is correct for count-type metrics. Data sources with non-additive metrics (e.g. max, average) should override this method.
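As a rough model of that reporting-time merge, the following Python sketch uses hypothetical names (`FakeReader`, `current_metrics`); the real logic lives in `DataSourceRDD`, and the default `mergeWith` behavior (summation) is inlined here as `+`:

```python
class FakeReader:
    """Stand-in for a live PartitionReader exposing current metric values."""

    def __init__(self, metrics):
        self._metrics = metrics

    def current_metrics(self):
        return dict(self._metrics)


def merge_and_update_custom_metrics(live_readers, closed_metrics):
    """Phase 2 of the merge: start from the accumulated final values of
    already-closed readers (phase 1 drains those into closed_metrics), then
    fold in each live reader's current values. Addition models the default
    mergeWith, correct for count-type metrics; non-additive metrics (max,
    average) would override it.
    """
    merged = dict(closed_metrics)
    for reader in live_readers:
        for name, value in reader.current_metrics().items():
            merged[name] = merged.get(name, 0) + value
    return merged
```

With one closed reader's `rowsRead` already drained and two readers still live, all three contributions survive, instead of the last live reader's value overwriting the rest.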
This replaces the previously proposed `PartitionReader.initMetricsValues` mechanism (which threaded prior metric values into the next reader's constructor) with a cleaner, pull-based merge at reporting time. `PartitionReader.initMetricsValues` is deprecated, as it is no longer needed.

### Why are the changes needed?

Without this fix, `GroupPartitionsExec` always discards ordering when coalescing, forcing `EnsureRequirements` to inject an extra `SortExec` before `SortMergeJoinExec` even when the data is already sorted by the join key within each partition. With `SortedMergeCoalescedRDD`, the full child ordering is preserved end-to-end, eliminating these redundant sorts and making storage-partitioned joins with ordering fully efficient.

`spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled` (introduced earlier) preserves only sort orders over partition key expressions, which remain constant within a merged partition. This PR goes further: by performing a k-way merge, the full `outputOrdering`, including secondary sort columns beyond the partition key, is preserved end-to-end.

### Does this PR introduce _any_ user-facing change?

Yes. A new SQL configuration is added:

- `spark.sql.sources.v2.bucketing.preserveOrderingOnCoalesce.enabled` (default: `false`): when enabled, `GroupPartitionsExec` uses a k-way merge to coalesce partitions while preserving the full child ordering, avoiding extra sort steps for operations like `SortMergeJoin`.

### How was this patch tested?

- **`SortedMergeCoalescedRDDSuite`**: unit tests for the new RDD covering correctness of the k-way merge, empty partitions, single partitions, and ordering guarantees.
- **`GroupPartitionsExecSuite`**: unit tests covering all four branches of `outputOrdering` (no coalescing; k-way merge enabled; key-expression ordering only; reducers present).
- **`KeyGroupedPartitioningSuite`**: SQL-level tests verifying that no extra `SortExec` is injected when `SortedMergeCoalescedRDD` is used, and a new test (`SPARK-55715: Custom metrics of sorted-merge coalesced partitions`) verifying that per-scan custom metrics are correctly reported across concurrent readers in the k-way merge case.
- **`BufferedRowsReader` hardening**: the test-framework reader in `InMemoryBaseTable` now tracks a `closed` flag and throws `IllegalStateException` for reads, double-closes, or metric fetches on a closed reader. This ensures future tests catch reader lifecycle bugs that were previously hidden by the no-op `close()`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
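Returning to the `BufferedRowsReader` hardening above: the closed-flag guard amounts to the following fail-fast pattern (a Python sketch with hypothetical names; the real reader is Scala test code in `InMemoryBaseTable` and throws `IllegalStateException`):

```python
class GuardedReader:
    """Reader that fails fast on use-after-close, double-close, or metric
    fetches after close, instead of silently succeeding."""

    def __init__(self, rows):
        self._rows = iter(rows)
        self._rows_read = 0
        self._closed = False

    def _check_open(self, op):
        if self._closed:
            # The Scala counterpart throws IllegalStateException here.
            raise RuntimeError(f"{op} called on a closed reader")

    def next_row(self):
        self._check_open("next_row")
        row = next(self._rows, None)
        if row is not None:
            self._rows_read += 1
        return row

    def metrics(self):
        self._check_open("metrics")
        return {"rowsRead": self._rows_read}

    def close(self):
        self._check_open("close")  # double-close is also a lifecycle bug
        self._closed = True
```

A no-op `close()` would let an interleaving bug (e.g. one of N concurrent readers freed twice) go unnoticed; the guard turns it into an immediate test failure.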
Parent: 038c839 · Commit: 1c9e6ff

16 files changed: 979 additions, 131 deletions

New file (172 additions, 0 deletions):
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.util.Utils

/**
 * An RDD that coalesces partitions while preserving ordering through a k-way merge.
 *
 * Unlike CoalescedRDD, which simply concatenates partitions, this RDD performs a sorted
 * merge of multiple input partitions to maintain ordering. This is useful when input
 * partitions are locally sorted and we want to preserve that ordering after coalescing.
 *
 * The merge is performed using a priority queue (min-heap), which provides O(n log k)
 * time complexity, where n is the total number of elements and k is the number of
 * partitions being merged.
 *
 * @param prev The parent RDD
 * @param numPartitions The number of output partitions after coalescing
 * @param partitionCoalescer The coalescer defining how to group input partitions
 * @param ordering The ordering to maintain during the merge
 * @tparam T The element type
 */
private[spark] class SortedMergeCoalescedRDD[T: ClassTag](
    @transient var prev: RDD[T],
    numPartitions: Int,
    partitionCoalescer: PartitionCoalescer,
    ordering: Ordering[T])
  extends RDD[T](prev.context, Nil) {

  override def getPartitions: Array[Partition] = {
    partitionCoalescer.coalesce(numPartitions, prev).zipWithIndex.map {
      case (pg, i) =>
        val parentIndices = pg.partitions.map(_.index).toSeq
        new SortedMergePartition(i, prev, parentIndices, pg.prefLoc)
    }
  }

  override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
    val mergePartition = partition.asInstanceOf[SortedMergePartition]
    val parentPartitions = mergePartition.parents

    if (parentPartitions.isEmpty) {
      Iterator.empty
    } else if (parentPartitions.length == 1) {
      // No merge needed for a single partition
      firstParent[T].iterator(parentPartitions.head, context)
    } else {
      // Perform the k-way merge
      new SortedMergeIterator[T](
        parentPartitions.map(p => firstParent[T].iterator(p, context)),
        ordering)
    }
  }

  override def getDependencies: Seq[Dependency[_]] = {
    Seq(new NarrowDependency(prev) {
      def getParents(id: Int): Seq[Int] =
        partitions(id).asInstanceOf[SortedMergePartition].parentsIndices
    })
  }

  override def getPreferredLocations(partition: Partition): Seq[String] = {
    partition.asInstanceOf[SortedMergePartition].prefLoc.toSeq
  }

  override def clearDependencies(): Unit = {
    super.clearDependencies()
    prev = null
  }
}

/**
 * Partition for SortedMergeCoalescedRDD that tracks which parent partitions to merge.
 *
 * @param idx the index of this coalesced partition
 * @param rdd the RDD this partition belongs to
 * @param parentsIndices indices of the parent partitions coalesced into this partition
 * @param prefLoc the preferred location for this partition
 */
private[spark] class SortedMergePartition(
    idx: Int,
    @transient private val rdd: RDD[_],
    val parentsIndices: Seq[Int],
    val prefLoc: Option[String] = None) extends Partition {
  override val index: Int = idx
  var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the references to the parent partitions at the time of task serialization
    parents = parentsIndices.map(rdd.partitions(_))
    oos.defaultWriteObject()
  }
}

/**
 * Iterator that performs a k-way merge of sorted iterators.
 *
 * Uses a priority queue (min-heap) to efficiently find the next smallest element
 * across all input iterators according to the specified ordering. This provides
 * O(n log k) time complexity, where n is the total number of elements and k is
 * the number of iterators being merged.
 *
 * @param iterators The sequence of sorted iterators to merge
 * @param ordering The ordering to use for comparison
 * @tparam T The element type
 */
private[spark] class SortedMergeIterator[T](
    iterators: Seq[Iterator[T]],
    ordering: Ordering[T]) extends Iterator[T] {

  // Priority queue entry: (current element, iterator index)
  private case class QueueEntry(element: T, iteratorIdx: Int)

  // Min-heap ordered by element according to the provided ordering
  private implicit val queueOrdering: Ordering[QueueEntry] = new Ordering[QueueEntry] {
    override def compare(x: QueueEntry, y: QueueEntry): Int = {
      // Reversed, because scala.collection.mutable.PriorityQueue is a max-heap by default
      ordering.compare(y.element, x.element)
    }
  }

  private val queue = mutable.PriorityQueue.empty[QueueEntry]

  // Seed the queue with the first element from each non-empty iterator
  iterators.zipWithIndex.foreach { case (iter, idx) =>
    if (iter.hasNext) {
      queue.enqueue(QueueEntry(iter.next(), idx))
    }
  }

  override def hasNext: Boolean = queue.nonEmpty

  override def next(): T = {
    if (!hasNext) {
      throw new NoSuchElementException("next on empty iterator")
    }

    val entry = queue.dequeue()
    val result = entry.element

    // If the source iterator has more elements, push its next element onto the queue
    val iter = iterators(entry.iteratorIdx)
    if (iter.hasNext) {
      queue.enqueue(QueueEntry(iter.next(), entry.iteratorIdx))
    }

    result
  }
}
```
New file (219 additions, 0 deletions):
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import org.apache.spark.{SharedSparkContext, SparkFunSuite}

class SortedMergeCoalescedRDDSuite extends SparkFunSuite with SharedSparkContext {

  test("SPARK-55715: k-way merge maintains ordering - integers") {
    // Create an RDD with 4 partitions, each sorted
    val data = Seq(
      Seq(1, 5, 9, 13),   // partition 0
      Seq(2, 6, 10, 14),  // partition 1
      Seq(3, 7, 11, 15),  // partition 2
      Seq(4, 8, 12, 16)   // partition 3
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    // Coalesce to 2 partitions using sorted merge
    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1), Seq(2, 3)))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 2, coalescer, Ordering.Int)

    // Verify per-partition contents: group (0, 1) merges elements from partitions 0 and 1,
    // group (2, 3) merges elements from partitions 2 and 3
    val partitionData = merged
      .mapPartitionsWithIndex { (idx, iter) => Iterator.single((idx, iter.toSeq)) }
      .collect().toMap

    assert(partitionData(0) === Seq(1, 2, 5, 6, 9, 10, 13, 14))
    assert(partitionData(1) === Seq(3, 4, 7, 8, 11, 12, 15, 16))
  }

  test("SPARK-55715: k-way merge handles empty partitions") {
    val data = Seq(
      Seq(1, 5, 9),    // partition 0
      Seq.empty[Int],  // partition 1 - empty
      Seq(3, 7, 11),   // partition 2
      Seq.empty[Int]   // partition 3 - empty
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1, 2, 3)))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 1, coalescer, Ordering.Int)

    val result = merged.collect()
    assert(result === Seq(1, 3, 5, 7, 9, 11))
  }

  test("SPARK-55715: k-way merge handles all-empty partitions in a group") {
    val data = Seq(
      Seq.empty[Int],  // partition 0 - empty
      Seq.empty[Int],  // partition 1 - empty
      Seq(1, 2, 3)     // partition 2 - non-empty (different group)
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1), Seq(2)))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 2, coalescer, Ordering.Int)

    val partitionData = merged
      .mapPartitionsWithIndex { (idx, iter) => Iterator.single((idx, iter.toSeq)) }
      .collect().toMap

    assert(partitionData(0) === Seq.empty)
    assert(partitionData(1) === Seq(1, 2, 3))
  }

  test("SPARK-55715: k-way merge with single partition per group - no merge needed") {
    val data = Seq(1, 2, 3, 4, 5, 6)
    val rdd = sc.parallelize(data, 3)

    // Each group has only 1 partition - should just pass through
    val coalescer = new TestPartitionCoalescer(Seq(Seq(0), Seq(1), Seq(2)))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 3, coalescer, Ordering.Int)

    assert(merged.collect() === data)
  }

  test("SPARK-55715: k-way merge with reverse ordering") {
    val data = Seq(
      Seq(13, 9, 5, 1),   // partition 0 - descending
      Seq(14, 10, 6, 2),  // partition 1 - descending
      Seq(15, 11, 7, 3),  // partition 2 - descending
      Seq(16, 12, 8, 4)   // partition 3 - descending
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    // Use reverse ordering
    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1, 2, 3)))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 1, coalescer, Ordering.Int.reverse)

    val result = merged.collect()
    assert(result === (1 to 16).reverse)
  }

  test("SPARK-55715: k-way merge with many partitions") {
    val numPartitions = 20
    val rowsPerPartition = 50

    // Create sorted data where partition i starts at i and increments by numPartitions
    val data = (0 until numPartitions).map { partIdx =>
      (0 until rowsPerPartition).map(i => partIdx + i * numPartitions)
    }

    val rdd = sc.parallelize(data, numPartitions).flatMap(identity)

    // Coalesce all partitions into one
    val coalescer = new TestPartitionCoalescer(Seq(0 until numPartitions))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 1, coalescer, Ordering.Int)

    val result = merged.collect()
    assert(result.length === numPartitions * rowsPerPartition)
    assert(result === result.sorted)
  }

  test("SPARK-55715: k-way merge preserves duplicate elements across partitions") {
    val data = Seq(
      Seq(1, 2, 3),  // partition 0
      Seq(1, 2, 3),  // partition 1 - identical to partition 0
      Seq(2, 2, 4)   // partition 2 - contains a repeated value within the partition
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1, 2)))
    val merged = new SortedMergeCoalescedRDD[Int](rdd, 1, coalescer, Ordering.Int)

    val result = merged.collect()
    assert(result === Seq(1, 1, 2, 2, 2, 2, 3, 3, 4))
  }

  test("SPARK-55715: k-way merge with strings") {
    val data = Seq(
      Seq("apple", "cherry", "grape"),
      Seq("banana", "date", "kiwi"),
      Seq("apricot", "fig", "mango")
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1, 2)))
    val merged = new SortedMergeCoalescedRDD[String](rdd, 1, coalescer, Ordering.String)

    val result = merged.collect()
    assert(result === result.sorted)
  }

  test("SPARK-55715: k-way merge with tuples") {
    val data = Seq(
      Seq((1, "a"), (3, "c"), (5, "e")),
      Seq((2, "b"), (4, "d"), (6, "f")),
      Seq((1, "z"), (4, "y"), (7, "x"))
    )

    val rdd = sc.parallelize(data, data.size).flatMap(identity)

    implicit val tupleOrdering: Ordering[(Int, String)] = Ordering.by[(Int, String), Int](_._1)

    val coalescer = new TestPartitionCoalescer(Seq(Seq(0, 1, 2)))
    val merged = new SortedMergeCoalescedRDD[(Int, String)](rdd, 1, coalescer, tupleOrdering)

    val result = merged.collect()
    val expected = data.flatten.sortBy(_._1)
    assert(result === expected)
  }

  test("SPARK-55715: SortedMergeIterator - next() on empty iterator throws " +
    "NoSuchElementException") {
    val iter = new SortedMergeIterator[Int](Seq.empty, Ordering.Int)
    assert(!iter.hasNext)
    intercept[NoSuchElementException] { iter.next() }
  }

  test("SPARK-55715: SortedMergeIterator - empty iterators list") {
    val iter = new SortedMergeIterator[Int](Seq(Iterator.empty, Iterator.empty), Ordering.Int)
    assert(!iter.hasNext)
    assert(iter.toSeq === Seq.empty)
  }

  test("SPARK-55715: SortedMergeIterator - single iterator passes through unchanged") {
    val iter = new SortedMergeIterator[Int](Seq(Iterator(3, 1, 2)), Ordering.Int)
    assert(iter.toSeq === Seq(3, 1, 2))
  }
}

/**
 * Test partition coalescer that groups partitions according to a predefined plan.
 */
class TestPartitionCoalescer(grouping: Seq[Seq[Int]]) extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    grouping.map { partitionIndices =>
      val pg = new PartitionGroup(None)
      partitionIndices.foreach { idx =>
        pg.partitions += parent.partitions(idx)
      }
      pg
    }.toArray
  }
}
```
