Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef, CTERelationRef, LeafNode, LogicalPlan, OneRowRelation, Project, Subquery, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, NO_GROUPING_AGGREGATE_REFERENCE, SCALAR_SUBQUERY, SCALAR_SUBQUERY_REFERENCE, TreePattern}
import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, CTE, NO_GROUPING_AGGREGATE_REFERENCE, SCALAR_SUBQUERY, SCALAR_SUBQUERY_REFERENCE, TreePattern}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -157,8 +157,11 @@ object MergeSubplans extends Rule[LogicalPlan] {
// This rule does a whole plan traversal, no need to run on subqueries.
case _: Subquery => plan

// Plans with CTEs are not supported for now.
case _: WithCTE => plan
// Plans with CTEs are not supported for now. We check containsPattern instead of
// just the top-level node because a Limit or other operator may wrap the WithCTE
// (e.g. from Dataset.show()), and creating an outer WithCTE with CTE defs that
// reference inner CTE defs causes ReplaceCTERefWithRepartition to crash.
case _ if plan.containsPattern(CTE) => plan

case _ => extractCommonScalarSubqueries(plan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.DeduplicateRelations
import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.Inner
Expand Down Expand Up @@ -66,7 +67,10 @@ object ReplaceCTERefWithRepartition extends Rule[LogicalPlan] {
replaceWithRepartition(child, cteMap)

case ref: CTERelationRef =>
val cteDefPlan = cteMap(ref.cteId)
val cteDefPlan = cteMap.getOrElse(ref.cteId,
throw SparkException.internalError(
s"No CTERelationDef found for CTERelationRef(cteId=${ref.cteId})."))

if (ref.outputSet == cteDefPlan.outputSet) {
cteDefPlan
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.TestRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand All @@ -39,4 +40,14 @@ class InlineCTESuite extends PlanTest {
).analyze
comparePlans(Optimize.execute(plan), plan)
}

test("SPARK-52818: ReplaceCTERefWithRepartition asserts on orphaned CTERelationRef") {
  // Build a CTE definition and a reference that points at it, but never place either of them
  // under a WithCTE node, so the reference is "orphaned".
  val definition = CTERelationDef(OneRowRelation().select(rand(0).as("a")))
  val orphanRef = CTERelationRef(
    definition.id,
    definition.resolved,
    definition.output,
    definition.isStreaming)
  // Running the rule over a plan that contains the orphaned reference must fail with a
  // SparkException whose message names the missing definition.
  val thrown = intercept[SparkException] {
    ReplaceCTERefWithRepartition(orphanRef.select($"a"))
  }
  assert(thrown.getMessage.contains("No CTERelationDef found"))
}
}
27 changes: 27 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,33 @@ abstract class CTEInlineSuiteBase
case _ => false
})
}

test("SPARK-52818: MergeSubplans should not create nested WithCTE with cross-scope refs") {
  // Regression scenario: the CTE below is non-deterministic and is referenced from several
  // scalar subqueries, so it is not inlined and a WithCTE stays in the plan. Calling .show()
  // puts a Limit on top, which means the root node is no longer the WithCTE and MergeSubplans
  // runs. Merging the scalar subqueries would then build an outer WithCTE whose definitions
  // refer to the inner WithCTE's definitions, and ReplaceCTERefWithRepartition would crash
  // while handling the outer definitions before the inner ones are in its map.
  withTempView("t") {
    val rows = Seq(("a", "b"), ("c", "d"))
    rows.toDF("c1", "c2").createOrReplaceTempView("t")

    val query =
      """WITH cte AS (
        | SELECT c1, c2, monotonically_increasing_id() AS id FROM t
        |),
        |agg1 AS (
        | SELECT c1, count(*) / (SELECT count(c1) FROM cte) AS r
        | FROM cte WHERE c1 IS NOT NULL GROUP BY c1
        |),
        |agg2 AS (
        | SELECT c2, count(*) / (SELECT count(c2) FROM cte) AS r
        | FROM cte WHERE c2 IS NOT NULL GROUP BY c2
        |)
        |SELECT b.c1, a1.r, a2.r FROM cte b
        |LEFT JOIN agg1 a1 ON b.c1 = a1.c1
        |LEFT JOIN agg2 a2 ON b.c2 = a2.c2
        |""".stripMargin

    // The test passes as long as planning and execution complete without throwing.
    sql(query).show()
  }
}
}

class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite
Expand Down