Skip to content

Commit d9cd19c

Browse files
committed
[SPARK-56250][SQL] Remove confusing defensive code in SortExec.rowSorter and add warning comment
[SPARK-52609](https://issues.apache.org/jira/browse/SPARK-52609) added some defensive code to SortExec. The defensive has some issues and was fixed by [SPARK-56203](https://issues.apache.org/jira/browse/SPARK-56203). But actually the defensive code is not for Spark itself but to guard multithreading access to SortExec that isn't an issue to Spark itself. The defensive code could easily confuse others. After rethinking about it, it might be better to revert it and add some warning comment. To simplify the code and reduce confusion. No Existing tests Generated-by: Claude Sonnet 4.6 Closes #55048 from viirya/revert-sort-exec-defensive-code. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit 842d4dd) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent 49b1882 commit d9cd19c

1 file changed

Lines changed: 13 additions & 13 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,10 @@ case class SortExec(
6262
"peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
6363
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
6464

65-
// Each task has its own instance of UnsafeExternalRowSorter. It is created in the
66-
// createSorter method and stored in a ThreadLocal variable.
67-
private[sql] var rowSorter: ThreadLocal[UnsafeExternalRowSorter] = _
65+
// WARNING: This is a shared mutable var on the SortExec instance. Do not access it from
66+
// multiple threads concurrently - Spark operators do not guarantee thread-safety and one
67+
// task's sorter could overwrite another's, causing a race condition.
68+
private[sql] var rowSorter: UnsafeExternalRowSorter = _
6869

6970
/**
7071
* This method gets invoked only once for each SortExec instance to initialize an
@@ -99,14 +100,13 @@ case class SortExec(
99100
}
100101

101102
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
102-
val newRowSorter = UnsafeExternalRowSorter.create(
103+
rowSorter = UnsafeExternalRowSorter.create(
103104
schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
104105

105106
if (testSpillFrequency > 0) {
106-
newRowSorter.setTestSpillFrequency(testSpillFrequency)
107+
rowSorter.setTestSpillFrequency(testSpillFrequency)
107108
}
108-
rowSorter.set(newRowSorter)
109-
rowSorter.get()
109+
rowSorter
110110
}
111111

112112
protected override def doExecute(): RDD[InternalRow] = {
@@ -196,14 +196,14 @@ case class SortExec(
196196
}
197197

198198
/**
199-
* In SortExec, we overwrites cleanupResources to close UnsafeExternalRowSorter.
199+
* In SortExec, we overwrite cleanupResources to close UnsafeExternalRowSorter.
200+
* There's possible for rowSorter to be null here, for example, in the scenario of empty iterator
201+
* in the current task, the downstream physical node (like SortMergeJoinExec) will trigger
202+
* cleanupResources before rowSorter is initialized in createSorter.
200203
*/
201204
override protected[sql] def cleanupResources(): Unit = {
202-
if (rowSorter != null && rowSorter.get() != null) {
203-
// There's possible for rowSorter is null here, for example, in the scenario of empty
204-
// iterator in the current task, the downstream physical node(like SortMergeJoinExec) will
205-
// trigger cleanupResources before rowSorter initialized in createSorter.
206-
rowSorter.get().cleanupResources()
205+
if (rowSorter != null) {
206+
rowSorter.cleanupResources()
207207
}
208208
super.cleanupResources()
209209
}

0 commit comments

Comments
 (0)