From 5d85f7b3a5d433c84a7dfd691c13191f0b76301d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 19 Jun 2026 00:08:33 +0000 Subject: [PATCH 1/2] [SPARK-56032][SQL][FOLLOWUP] Skip FilterExec CSE codegen when the only common subexpression is a leaf ### What changes were proposed in this pull request? A follow-up of the FilterExec whole-stage-codegen subexpression elimination (CSE) work. The CSE path is taken whenever otherPreds contain a common subexpression, where "common subexpression" is anything EquivalentExpressions counts more than once -- which includes bare leaf columns. `c BETWEEN lo AND hi` lowers to `c >= lo AND c <= hi`, so any BETWEEN (or a column used in several conjuncts) makes its column a common subexpression. Taking the CSE path then emits the eager inputVarsEvalCode prologue, which decodes every otherPred-referenced column at the top of the row loop. Caching a bare column load gains nothing (the non-CSE path already loads columns lazily), so this is pure overhead and defeats short-circuiting. This PR requires a non-leaf common subexpression before taking the CSE path. ### Why are the changes needed? TPC-DS q28 filters as `ss_quantity BETWEEN ... AND (ss_list_price BETWEEN ... OR ss_coupon_amt BETWEEN ... OR ss_wholesale_cost BETWEEN ...)`. Its only repeated expressions are the bare columns, so the gate wrongly took the CSE path and eagerly decoded the high-precision decimal columns on every row -- including rows the cheap ss_quantity integer predicate would reject -- allocating a BigInteger/BigDecimal per row. A 3TB q28 run showed ~40% slowdown that this change removes. ### Does this PR introduce any user-facing change? No. Codegen-only change; query results are unchanged. ### How was this patch tested? New unit test in WholeStageCodegenSuite asserting that, for the q28 BETWEEN shape (only leaf common subexpressions), CSE-enabled codegen is identical to CSE-disabled codegen (i.e. it falls back to the lazy, short-circuiting path). The existing FilterExec CSE tests, which use genuine non-leaf common subexpressions, still exercise the CSE path and pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude (Claude Code) Co-authored-by: Isaac --- .../execution/basicPhysicalOperators.scala | 12 +++++- .../execution/WholeStageCodegenSuite.scala | 42 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 88c74ab7adc41..353d6853c67de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -316,12 +316,22 @@ case class FilterExec(condition: Expression, child: SparkPlan) // (e.g. decoding a decimal column for rows a cheaper earlier predicate would reject), so we // fall back to `generatePredicateCode`. // + // A *leaf* common subexpression -- a bare column referenced more than once -- does not count. + // `c BETWEEN lo AND hi` lowers to `c >= lo AND c <= hi`, so any `BETWEEN` (or a column used in + // several conjuncts) makes that column a common subexpression, but caching a column load saves + // nothing: the non-CSE path already loads each column lazily into a variable on demand. Taking + // the CSE path for it would only add the eager prologue that decodes every referenced column up + // front. Require a *non-leaf* common subexpression so filters like TPC-DS q28 + // (`ss_quantity BETWEEN ... AND (ss_list_price BETWEEN ... OR ...)`, whose only repeats are the + // bare columns) keep the lazy, short-circuiting path. + // // `subexpressionElimination.filterExec.enabled` additionally gates this path so it can be // turned off independently of subexpression elimination elsewhere. val (prologueCode, predicateCode) = if (conf.subexpressionEliminationEnabled && conf.subexpressionEliminationFilterExecEnabled && otherPreds.nonEmpty && - otherPredsEquivalentExpressions.getCommonSubexpressions.nonEmpty) { + otherPredsEquivalentExpressions.getCommonSubexpressions + .exists(!_.isInstanceOf[LeafExpression])) { // Pre-evaluate input variables before CSE analysis: CSE clears // ctx.currentVars[i].code as a side effect; without this pre-evaluation, Janino // fails when otherPreds reference the same input columns that CSE already diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 886df9184aca4..e013e59597f69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -1225,6 +1225,48 @@ class WholeStageCodegenSuite extends SharedSparkSession "CSE-disabled codegen (i.e. fall back to the lazy, short-circuiting non-CSE path)") } + test("SPARK-56032: FilterExec skips CSE codegen when the only common subexpression is a leaf") { + // `c BETWEEN lo AND hi` lowers to `c >= lo AND c <= hi`, so a column used in a BETWEEN (or in + // several conjuncts) becomes a "common subexpression" -- but it is a bare leaf column whose + // load CSE cannot meaningfully cache, since the non-CSE path already loads columns lazily. The + // gate must not take the CSE path for it: doing so emits the eager prologue that decodes every + // referenced column (the decimals `p1`/`p2` here) up front, defeating the cheap `q` filter's + // short-circuiting. This is the TPC-DS q28 shape. Verify the leaf-only case falls back to the + // same code as CSE-disabled; `p1`/`p2` stand in for q28's decimal columns whose eager decode + // is the cost, though the fallback is type-independent. + val schema = StructType(Seq( + StructField("q", IntegerType, nullable = true), + StructField("p1", IntegerType, nullable = true), + StructField("p2", IntegerType, nullable = true))) + val data = spark.sparkContext.parallelize(Seq( + Row(4, 10, 7), Row(1, 10, 7), Row(null, 10, 7), + Row(5, 100, 7), Row(6, 100, 100), Row(3, 9, 1))) + val expected = Seq(Row(4, 10, 7), Row(5, 100, 7), Row(3, 9, 1)) + + def filterCode(cseEnabled: Boolean): String = { + withSQLConf( + SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> cseEnabled.toString, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val df = spark.createDataFrame(data, schema) + // The only repeated expressions are the bare columns q, p1, p2 (each referenced by the two + // halves of its BETWEEN). No non-leaf expression is shared. + val filtered = df.where( + "q IS NOT NULL AND q BETWEEN 2 AND 6 AND (p1 BETWEEN 8 AND 18 OR p2 BETWEEN 5 AND 9)") + val plan = filtered.queryExecution.executedPlan + assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]), + "Filter should be in whole-stage codegen") + checkAnswer(filtered, expected) + codegenString(plan) + } + } + + def normalize(code: String): String = code.replaceAll("#\\d+", "#") + assert(normalize(filterCode(cseEnabled = true)) == normalize(filterCode(cseEnabled = false)), + "With only leaf common subexpressions, CSE-enabled FilterExec codegen should be identical " + + "to CSE-disabled codegen (i.e. fall back to the lazy, short-circuiting non-CSE path)") + } + test("SPARK-56032: subexpressionElimination.filterExec.enabled gates FilterExec CSE " + "independently of subexpression elimination") { // The conf disables CSE specifically for FilterExec while leaving subexpression elimination From 88c9f71ed2777ef7a6845a28528aa3cc5d8a5b31 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 19 Jun 2026 00:19:13 +0000 Subject: [PATCH 2/2] [SPARK-56032][SQL][FOLLOWUP] Gate FilterExec CSE on CollapseProject.isCheap Use the canonical CollapseProject.isCheap to decide whether a common subexpression is worth eliminating, instead of an ad-hoc leaf check. A repeated expression that is cheap to recompute (a bare column, a foldable, an Alias/ExtractValue of cheap children) gains nothing from CSE, so requiring a non-cheap common subexpression avoids the eager prologue for those. CollapseProject.isCheap matches Attribute; the gate runs on bound predicates, so teach isCheap that BoundReference (the codegen-bound form of Attribute) is equally cheap. This keeps the single EquivalentExpressions analysis shared by the gate and the CSE codegen. Co-authored-by: Isaac --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 3 ++- .../sql/execution/basicPhysicalOperators.scala | 15 ++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 87500f0ca5149..68a83a55c8daa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1561,7 +1561,8 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { * Check if the given expression is cheap that we can inline it. */ def isCheap(e: Expression): Boolean = e match { - case _: Attribute | _: OuterReference => true + // `BoundReference` is the codegen-bound form of an `Attribute`; a slot read, equally cheap. + case _: Attribute | _: OuterReference | _: BoundReference => true case _ if e.foldable => true // PythonUDF is handled by the rule ExtractPythonUDFs case _: PythonUDF => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 353d6853c67de..3eadfe7b865e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -316,12 +317,12 @@ case class FilterExec(condition: Expression, child: SparkPlan) // (e.g. decoding a decimal column for rows a cheaper earlier predicate would reject), so we // fall back to `generatePredicateCode`. // - // A *leaf* common subexpression -- a bare column referenced more than once -- does not count. - // `c BETWEEN lo AND hi` lowers to `c >= lo AND c <= hi`, so any `BETWEEN` (or a column used in - // several conjuncts) makes that column a common subexpression, but caching a column load saves - // nothing: the non-CSE path already loads each column lazily into a variable on demand. Taking - // the CSE path for it would only add the eager prologue that decodes every referenced column up - // front. Require a *non-leaf* common subexpression so filters like TPC-DS q28 + // A *cheap* common subexpression does not count. `c BETWEEN lo AND hi` lowers to + // `c >= lo AND c <= hi`, so any `BETWEEN` (or a column referenced in several conjuncts) makes + // that column a common subexpression, but caching a cheap load saves nothing: the non-CSE path + // already loads each column lazily into a variable on demand. Taking the CSE path for it would + // only add the eager prologue that decodes every referenced column up front. Require a + // non-cheap common subexpression (per `CollapseProject.isCheap`) so filters like TPC-DS q28 // (`ss_quantity BETWEEN ... AND (ss_list_price BETWEEN ... OR ...)`, whose only repeats are the // bare columns) keep the lazy, short-circuiting path. // @@ -331,7 +332,7 @@ case class FilterExec(condition: Expression, child: SparkPlan) if (conf.subexpressionEliminationEnabled && conf.subexpressionEliminationFilterExecEnabled && otherPreds.nonEmpty && otherPredsEquivalentExpressions.getCommonSubexpressions - .exists(!_.isInstanceOf[LeafExpression])) { + .exists(!CollapseProject.isCheap(_))) { // Pre-evaluate input variables before CSE analysis: CSE clears // ctx.currentVars[i].code as a side effect; without this pre-evaluation, Janino // fails when otherPreds reference the same input columns that CSE already