Skip to content

Commit abd393e

Browse files
committed
[SPARK-52818][SQL] Fix MergeSubplans creating nested WithCTE with cross-scope CTE references
### What changes were proposed in this pull request? When a non-deterministic CTE (e.g. using `monotonically_increasing_id()`) is referenced in scalar subqueries and the result is displayed with `.show()` (which adds a `Limit`), `MergeSubplans` can create an outer `WithCTE` whose CTE defs reference CTE defs from an inner `WithCTE`. This causes `ReplaceCTERefWithRepartition` to crash with `NoSuchElementException` because it processes the outer CTE defs before the inner CTE defs have been added to the map. The root cause: `MergeSubplans.apply` checks `case _: WithCTE => plan` to skip plans with CTEs, but this only matches when `WithCTE` is the **top-level** node. When `.show()` wraps the plan in `GlobalLimit(LocalLimit(WithCTE(...)))`, the top-level node is `GlobalLimit`, so `MergeSubplans` runs and creates a new outer `WithCTE` around the existing inner one — producing nested `WithCTE` nodes with cross-scope references. The fix has two parts: 1. **`MergeSubplans`**: Change `case _: WithCTE => plan` to `case _ if plan.containsPattern(CTE) => plan` to skip plans that contain `WithCTE` **anywhere**, not just at the top level. 2. **`ReplaceCTERefWithRepartition`**: Add a defensive guard `if cteMap.contains(ref.cteId)` so that orphaned `CTERelationRef` nodes don't crash with `NoSuchElementException`. ### Why are the changes needed? Bug fix. The query crashes with `java.util.NoSuchElementException: key not found` in `ReplaceCTERefWithRepartition`. ### Does this PR introduce _any_ user-facing change? Yes, queries that previously crashed now work correctly. ### How was this patch tested? New tests in `CTEInlineSuite` and `InlineCTESuite`. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code
1 parent e7229c7 commit abd393e

4 files changed

Lines changed: 49 additions & 4 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeSubplans.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
2222
import org.apache.spark.sql.catalyst.expressions._
2323
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef, CTERelationRef, LeafNode, LogicalPlan, OneRowRelation, Project, Subquery, WithCTE}
2424
import org.apache.spark.sql.catalyst.rules.Rule
25-
import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, NO_GROUPING_AGGREGATE_REFERENCE, SCALAR_SUBQUERY, SCALAR_SUBQUERY_REFERENCE, TreePattern}
25+
import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, CTE, NO_GROUPING_AGGREGATE_REFERENCE, SCALAR_SUBQUERY, SCALAR_SUBQUERY_REFERENCE, TreePattern}
2626
import org.apache.spark.sql.internal.SQLConf
2727
import org.apache.spark.sql.types.DataType
2828

@@ -157,8 +157,11 @@ object MergeSubplans extends Rule[LogicalPlan] {
157157
// This rule does a whole plan traversal, no need to run on subqueries.
158158
case _: Subquery => plan
159159

160-
// Plans with CTEs are not supported for now.
161-
case _: WithCTE => plan
160+
// Plans with CTEs are not supported for now. We check containsPattern instead of
161+
// just the top-level node because a Limit or other operator may wrap the WithCTE
162+
// (e.g. from Dataset.show()), and creating an outer WithCTE with CTE defs that
163+
// reference inner CTE defs causes ReplaceCTERefWithRepartition to crash.
164+
case _ if plan.containsPattern(CTE) => plan
162165

163166
case _ => extractCommonScalarSubqueries(plan)
164167
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
1919

2020
import scala.collection.mutable
2121

22+
import org.apache.spark.SparkException
2223
import org.apache.spark.sql.catalyst.analysis.DeduplicateRelations
2324
import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression}
2425
import org.apache.spark.sql.catalyst.plans.Inner
@@ -66,7 +67,10 @@ object ReplaceCTERefWithRepartition extends Rule[LogicalPlan] {
6667
replaceWithRepartition(child, cteMap)
6768

6869
case ref: CTERelationRef =>
69-
val cteDefPlan = cteMap(ref.cteId)
70+
val cteDefPlan = cteMap.getOrElse(ref.cteId,
71+
throw SparkException.internalError(
72+
s"No CTERelationDef found for CTERelationRef(cteId=${ref.cteId})."))
73+
7074
if (ref.outputSet == cteDefPlan.outputSet) {
7175
cteDefPlan
7276
} else {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTESuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import org.apache.spark.SparkException
2021
import org.apache.spark.sql.catalyst.analysis.TestRelation
2122
import org.apache.spark.sql.catalyst.dsl.expressions._
2223
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -39,4 +40,14 @@ class InlineCTESuite extends PlanTest {
3940
).analyze
4041
comparePlans(Optimize.execute(plan), plan)
4142
}
43+
44+
test("SPARK-52818: ReplaceCTERefWithRepartition asserts on orphaned CTERelationRef") {
45+
val cteDef = CTERelationDef(OneRowRelation().select(rand(0).as("a")))
46+
val cteRef = CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming)
47+
// A CTERelationRef without an enclosing WithCTE should produce a clear error.
48+
val e = intercept[SparkException] {
49+
ReplaceCTERefWithRepartition(cteRef.select($"a"))
50+
}
51+
assert(e.getMessage.contains("No CTERelationDef found"))
52+
}
4253
}

sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,33 @@ abstract class CTEInlineSuiteBase
847847
case _ => false
848848
})
849849
}
850+
851+
test("SPARK-52818: MergeSubplans should not create nested WithCTE with cross-scope refs") {
852+
// A non-deterministic CTE referenced in multiple scalar subqueries is not inlined, leaving
853+
// a WithCTE. .show() adds a Limit, so the top node is not WithCTE and MergeSubplans runs,
854+
// merging scalar subqueries into a new outer WithCTE whose CTE defs reference the inner
855+
// WithCTE's defs. ReplaceCTERefWithRepartition then crashes processing the outer defs
856+
// before the inner ones are in the map.
857+
withTempView("t") {
858+
Seq(("a", "b"), ("c", "d")).toDF("c1", "c2").createOrReplaceTempView("t")
859+
sql(
860+
"""WITH cte AS (
861+
| SELECT c1, c2, monotonically_increasing_id() AS id FROM t
862+
|),
863+
|agg1 AS (
864+
| SELECT c1, count(*) / (SELECT count(c1) FROM cte) AS r
865+
| FROM cte WHERE c1 IS NOT NULL GROUP BY c1
866+
|),
867+
|agg2 AS (
868+
| SELECT c2, count(*) / (SELECT count(c2) FROM cte) AS r
869+
| FROM cte WHERE c2 IS NOT NULL GROUP BY c2
870+
|)
871+
|SELECT b.c1, a1.r, a2.r FROM cte b
872+
|LEFT JOIN agg1 a1 ON b.c1 = a1.c1
873+
|LEFT JOIN agg2 a2 ON b.c2 = a2.c2
874+
|""".stripMargin).show()
875+
}
876+
}
850877
}
851878

852879
class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite

0 commit comments

Comments
 (0)