Commit 842d4dd

[SPARK-56250][SQL] Remove confusing defensive code in SortExec.rowSorter and add warning comment
### What changes were proposed in this pull request?

[SPARK-52609](https://issues.apache.org/jira/browse/SPARK-52609) added some defensive code to SortExec. That defensive code had some issues, which were fixed by [SPARK-56203](https://issues.apache.org/jira/browse/SPARK-56203). However, the defensive code does not protect Spark itself: it guards against multithreaded access to SortExec, which is not an issue for Spark. The defensive code could easily confuse others. After rethinking it, it seems better to revert it and add a warning comment instead.

### Why are the changes needed?

To simplify the code and reduce confusion.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

Closes #55048 from viirya/revert-sort-exec-defensive-code.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
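
To illustrate the trade-off described above, here is a minimal, self-contained sketch of how a plain `var` and a `ThreadLocal` behave when a second thread writes to them. It is not Spark code: `FakeSorter`, `PlainVarHolder`, `ThreadLocalHolder`, and `runOnNewThread` are hypothetical names invented for this example, standing in for `UnsafeExternalRowSorter` and the two shapes of `SortExec.rowSorter`.

```scala
object SharedVarVsThreadLocal {
  // Hypothetical stand-in for UnsafeExternalRowSorter; not a Spark class.
  final class FakeSorter(val owner: String)

  // Hypothetical holder mirroring the post-commit shape: one shared mutable var.
  final class PlainVarHolder {
    var sorter: FakeSorter = _
  }

  // Hypothetical holder mirroring the reverted shape: one slot per thread.
  final class ThreadLocalHolder {
    val sorter: ThreadLocal[FakeSorter] = new ThreadLocal[FakeSorter]()
  }

  /** Runs `body` on a fresh thread and waits for it to finish. */
  def runOnNewThread(body: => Unit): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit = body
    })
    t.start()
    t.join() // join() makes the other thread's writes visible to the caller
  }

  /** Returns the owners seen by the calling thread after another thread has written. */
  def demo(): (String, String) = {
    val plain = new PlainVarHolder
    val local = new ThreadLocalHolder

    plain.sorter = new FakeSorter("main")
    local.sorter.set(new FakeSorter("main"))

    runOnNewThread {
      plain.sorter = new FakeSorter("other")    // overwrites the shared reference
      local.sorter.set(new FakeSorter("other")) // only fills the other thread's slot
    }

    (plain.sorter.owner, local.sorter.get().owner)
  }
}
```

In this sketch the calling thread ends up seeing the other thread's value through the plain `var`, while its `ThreadLocal` slot is untouched. That is the overwrite hazard the new warning comment describes, and it only arises if a single instance is accessed from multiple threads.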
1 parent 043df82 commit 842d4dd

1 file changed

Lines changed: 9 additions & 13 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

@@ -62,13 +62,10 @@ case class SortExec(
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
 
-  // Each task thread has its own UnsafeExternalRowSorter instance stored here.
-  // Using a stable lazy val (rather than a reassigned var) ensures that the ThreadLocal
-  // object itself is never replaced: concurrent tasks on different threads each get their
-  // own independent slot in the same ThreadLocal, so one task can never observe or clobber
-  // another task's sorter reference.
-  @transient private[sql] lazy val rowSorter: ThreadLocal[UnsafeExternalRowSorter] =
-    new ThreadLocal[UnsafeExternalRowSorter]()
+  // WARNING: This is a shared mutable var on the SortExec instance. Do not access it from
+  // multiple threads concurrently - Spark operators do not guarantee thread-safety and one
+  // task's sorter could overwrite another's, causing a race condition.
+  private[sql] var rowSorter: UnsafeExternalRowSorter = _
 
   /**
    * This method gets invoked only once for each SortExec instance to initialize an
@@ -101,14 +98,13 @@ case class SortExec(
     }
 
     val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
-    val newRowSorter = UnsafeExternalRowSorter.create(
+    rowSorter = UnsafeExternalRowSorter.create(
       schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
 
     if (testSpillFrequency > 0) {
-      newRowSorter.setTestSpillFrequency(testSpillFrequency)
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
     }
-    rowSorter.set(newRowSorter)
-    rowSorter.get()
+    rowSorter
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -204,8 +200,8 @@ case class SortExec(
    * cleanupResources before rowSorter is initialized in createSorter.
    */
   override protected[sql] def cleanupResources(): Unit = {
-    if (rowSorter.get() != null) {
-      rowSorter.get().cleanupResources()
+    if (rowSorter != null) {
+      rowSorter.cleanupResources()
     }
     super.cleanupResources()
   }
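
The null check in `cleanupResources` matters because, per the doc comment in the hunk above, cleanup may run before `createSorter` has assigned `rowSorter`. The following is a minimal sketch of that pattern in isolation. `FakeSorter` and `FakeSortOperator` are hypothetical names for this example, not Spark classes, and the real `SortExec` also calls `super.cleanupResources()`, which is omitted here.

```scala
object NullGuardedCleanup {
  // Hypothetical stand-in for UnsafeExternalRowSorter; records whether cleanup ran.
  final class FakeSorter {
    var cleaned: Boolean = false
    def cleanupResources(): Unit = {
      cleaned = true
    }
  }

  // Hypothetical operator with the same field shape as the post-commit SortExec.
  final class FakeSortOperator {
    // Starts as null until createSorter() runs, like `var rowSorter: ... = _`.
    var rowSorter: FakeSorter = _

    def createSorter(): FakeSorter = {
      rowSorter = new FakeSorter
      rowSorter
    }

    // Safe to call before createSorter(): the guard skips the uninitialized field.
    def cleanupResources(): Unit = {
      if (rowSorter != null) {
        rowSorter.cleanupResources()
      }
    }
  }
}
```

Calling `cleanupResources()` on a fresh `FakeSortOperator` is a no-op, while calling it after `createSorter()` forwards to the sorter. Without the guard, the first case would throw a `NullPointerException`.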
