From e498f0383f8540913604b8c2001f93b7881e54e4 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 13 Mar 2026 14:24:26 +0800 Subject: [PATCH 1/6] [CORE] Fix AdaptiveSparkPlanExec accessibility in columnar write optimization Refactor GlutenWriterColumnarRules to preserve AdaptiveSparkPlanExec in plan hierarchy, enabling shuffle IDs retrieval that was broken by Spark PR #51432. Original implementation wrapped AdaptiveSparkPlanExec with ColumnarToCarrierRow, hiding it from external pattern matching. New approach wraps the input plan first, then creates a new AdaptiveSparkPlanExec with the wrapped child and supportsColumnar=false. Added GlutenWriterColumnarRulesSuite for Spark 4.0 and 4.1 to verify the fix. --- .../GlutenWriterColumnarRules.scala | 25 ++++---- .../GlutenWriterColumnarRulesSuite.scala | 59 +++++++++++++++++++ .../GlutenWriterColumnarRulesSuite.scala | 59 +++++++++++++++++++ 3 files changed, 133 insertions(+), 10 deletions(-) create mode 100644 gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala create mode 100644 gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 606fa377b8a7..7a7a5b65bb2e 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -79,22 +79,27 @@ object GlutenWriterColumnarRules { // So FakeRowAdaptor will always consumes columnar data, // thus avoiding the case of c2r->aqe->r2c->writer case aqe: AdaptiveSparkPlanExec => - command.withNewChildren( - Array( - BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow( - AdaptiveSparkPlanExec( - aqe.inputPlan, - aqe.context, - aqe.preprocessingRules, - aqe.isSubquery, - supportsColumnar = true - )))) + val newChild = BackendsApiManager.getSparkPlanExecApiInstance + .genColumnarToCarrierRow(aqe.inputPlan) + command.withNewChildren(Array(wrapAqeWithColumnarToRow(newChild, aqe))) case other => command.withNewChildren( Array(BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(other))) } } + private def wrapAqeWithColumnarToRow( + newChild: SparkPlan, + aqe: AdaptiveSparkPlanExec): AdaptiveSparkPlanExec = { + aqe.inputPlan.logicalLink.foreach(newChild.setLogicalLink) + AdaptiveSparkPlanExec( + newChild, + aqe.context, + aqe.preprocessingRules, + aqe.isSubquery, + supportsColumnar = false) + } + case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(p: SparkPlan): SparkPlan = p match { diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala new file mode 100644 index 000000000000..fcd5ce3d44b8 --- /dev/null +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec +import org.apache.spark.sql.{Dataset, GlutenSQLTestsBaseTrait, QueryTest, SaveMode} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.util.QueryExecutionListener + +class GlutenWriterColumnarRulesSuite extends QueryTest with SharedSparkSession with GlutenSQLTestsBaseTrait { + + override protected def shouldRun(testName: String): Boolean = true + + test("AdaptiveSparkPlanExec should be accessible in noop write plan") { + var capturedPlan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + capturedPlan = qe.executedPlan + } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + + spark.listenerManager.register(listener) + try { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + val df = Dataset.ofRows(spark, spark.range(100).repartition(10).logicalPlan) + df.write.format("noop").mode(SaveMode.Overwrite).save() + + assert(capturedPlan != null, "Plan should have been captured by the listener") + capturedPlan match { + case OverwriteByExpressionExec(AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _ ,_), _, _) => + case _ => + fail("Expected AdaptiveSparkPlanExec to be accessible in the plan structure") + } + } + } finally { + spark.listenerManager.unregister(listener) + } + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala new file mode 100644 index 000000000000..fcd5ce3d44b8 --- /dev/null +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec +import org.apache.spark.sql.{Dataset, GlutenSQLTestsBaseTrait, QueryTest, SaveMode} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.util.QueryExecutionListener + +class GlutenWriterColumnarRulesSuite extends QueryTest with SharedSparkSession with GlutenSQLTestsBaseTrait { + + override protected def shouldRun(testName: String): Boolean = true + + test("AdaptiveSparkPlanExec should be accessible in noop write plan") { + var capturedPlan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + capturedPlan = qe.executedPlan + } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + + spark.listenerManager.register(listener) + try { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + val df = Dataset.ofRows(spark, spark.range(100).repartition(10).logicalPlan) + df.write.format("noop").mode(SaveMode.Overwrite).save() + + assert(capturedPlan != null, "Plan should have been captured by the listener") + capturedPlan match { + case OverwriteByExpressionExec(AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _ ,_), _, _) => + case _ => + fail("Expected AdaptiveSparkPlanExec to be accessible in the plan structure") + } + } + } finally { + spark.listenerManager.unregister(listener) + } + } +} From d81d9580ba3adeba6d2f2fdb8debaafd5b5ec621 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 13 Mar 2026 14:44:45 +0800 Subject: [PATCH 2/6] fix format --- .../execution/GlutenWriterColumnarRulesSuite.scala | 11 +++++++++-- .../execution/GlutenWriterColumnarRulesSuite.scala | 11 +++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala index fcd5ce3d44b8..c1256f745eca 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec + import org.apache.spark.sql.{Dataset, GlutenSQLTestsBaseTrait, QueryTest, SaveMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec @@ -24,7 +25,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.util.QueryExecutionListener -class GlutenWriterColumnarRulesSuite extends QueryTest with SharedSparkSession with GlutenSQLTestsBaseTrait { +class GlutenWriterColumnarRulesSuite + extends QueryTest + with SharedSparkSession + with GlutenSQLTestsBaseTrait { override protected def shouldRun(testName: String): Boolean = true @@ -47,7 +51,10 @@ class GlutenWriterColumnarRulesSuite extends QueryTest with SharedSparkSession w assert(capturedPlan != null, "Plan should have been captured by the listener") capturedPlan match { - case OverwriteByExpressionExec(AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _ ,_), _, _) => + case OverwriteByExpressionExec( + AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _, _), + _, + _) => case _ => fail("Expected AdaptiveSparkPlanExec to be accessible in the plan structure") } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala index fcd5ce3d44b8..c1256f745eca 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec + import org.apache.spark.sql.{Dataset, GlutenSQLTestsBaseTrait, QueryTest, SaveMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec @@ -24,7 +25,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.util.QueryExecutionListener -class GlutenWriterColumnarRulesSuite extends QueryTest with SharedSparkSession with GlutenSQLTestsBaseTrait { +class GlutenWriterColumnarRulesSuite + extends QueryTest + with SharedSparkSession + with GlutenSQLTestsBaseTrait { override protected def shouldRun(testName: String): Boolean = true @@ -47,7 +51,10 @@ class GlutenWriterColumnarRulesSuite extends QueryTest with SharedSparkSession w assert(capturedPlan != null, "Plan should have been captured by the listener") capturedPlan match { - case OverwriteByExpressionExec(AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _ ,_), _, _) => + case OverwriteByExpressionExec( + AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _, _), + _, + _) => case _ => fail("Expected AdaptiveSparkPlanExec to be accessible in the plan structure") } From 6b2afb44c6a712ff8fdae41ae8ea724e95525ebc Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 14 Mar 2026 09:20:47 +0800 Subject: [PATCH 3/6] fix --- .../spark/sql/execution/GlutenWriterColumnarRulesSuite.scala | 3 ++- .../spark/sql/execution/GlutenWriterColumnarRulesSuite.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala index c1256f745eca..307a4578273d 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec -import org.apache.spark.sql.{Dataset, GlutenSQLTestsBaseTrait, QueryTest, SaveMode} +import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, QueryTest, SaveMode} +import org.apache.spark.sql.classic.Dataset import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec import org.apache.spark.sql.internal.SQLConf diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala index c1256f745eca..307a4578273d 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec -import org.apache.spark.sql.{Dataset, GlutenSQLTestsBaseTrait, QueryTest, SaveMode} +import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, QueryTest, SaveMode} +import org.apache.spark.sql.classic.Dataset import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec import org.apache.spark.sql.internal.SQLConf From 9f63ddfac50c98059400dbdd4ce089208c4c0238 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 14 Mar 2026 17:44:10 +0800 Subject: [PATCH 4/6] fix --- .../adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 9 +++++---- .../adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 9 +++++---- .../adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 9 +++++---- .../adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 9 +++++---- .../adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 9 +++++---- .../adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 9 +++++---- 6 files changed, 30 insertions(+), 24 deletions(-) diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 618dc07431f7..f141dd619583 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1255,12 +1255,13 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[V2TableWriteExec]) val childPlan = plan.asInstanceOf[V2TableWriteExec].child - assert(childPlan.isInstanceOf[ColumnarToCarrierRowExecBase]) + assert(childPlan.isInstanceOf[AdaptiveSparkPlanExec]) assert( childPlan - .asInstanceOf[ColumnarToCarrierRowExecBase] - .child - .isInstanceOf[AdaptiveSparkPlanExec]) + .asInstanceOf[AdaptiveSparkPlanExec] + .children + .head + .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 0c5eae6e586c..0b8bcf7798f9 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1259,12 +1259,13 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[V2TableWriteExec]) val childPlan = plan.asInstanceOf[V2TableWriteExec].child - assert(childPlan.isInstanceOf[ColumnarToCarrierRowExecBase]) + assert(childPlan.isInstanceOf[AdaptiveSparkPlanExec]) assert( childPlan - .asInstanceOf[ColumnarToCarrierRowExecBase] - .child - .isInstanceOf[AdaptiveSparkPlanExec]) + .asInstanceOf[AdaptiveSparkPlanExec] + .children + .head + .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6acdfed41940..ff1edb1cb7c8 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1208,12 +1208,13 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[V2TableWriteExec]) val childPlan = plan.asInstanceOf[V2TableWriteExec].child - assert(childPlan.isInstanceOf[ColumnarToCarrierRowExecBase]) + assert(childPlan.isInstanceOf[AdaptiveSparkPlanExec]) assert( childPlan - .asInstanceOf[ColumnarToCarrierRowExecBase] - .child - .isInstanceOf[AdaptiveSparkPlanExec]) + .asInstanceOf[AdaptiveSparkPlanExec] + .children + .head + .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6acdfed41940..ff1edb1cb7c8 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1208,12 +1208,13 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[V2TableWriteExec]) val childPlan = plan.asInstanceOf[V2TableWriteExec].child - assert(childPlan.isInstanceOf[ColumnarToCarrierRowExecBase]) + assert(childPlan.isInstanceOf[AdaptiveSparkPlanExec]) assert( childPlan - .asInstanceOf[ColumnarToCarrierRowExecBase] - .child - .isInstanceOf[AdaptiveSparkPlanExec]) + .asInstanceOf[AdaptiveSparkPlanExec] + .children + .head + .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 67e9baed0401..518338b6fce3 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1214,12 +1214,13 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[V2TableWriteExec]) val childPlan = plan.asInstanceOf[V2TableWriteExec].child - assert(childPlan.isInstanceOf[ColumnarToCarrierRowExecBase]) + assert(childPlan.isInstanceOf[AdaptiveSparkPlanExec]) assert( childPlan - .asInstanceOf[ColumnarToCarrierRowExecBase] - .child - .isInstanceOf[AdaptiveSparkPlanExec]) + .asInstanceOf[AdaptiveSparkPlanExec] + .children + .head + .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6ebefbe15a43..7635c42724f2 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1218,12 +1218,13 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[V2TableWriteExec]) val childPlan = plan.asInstanceOf[V2TableWriteExec].child - assert(childPlan.isInstanceOf[ColumnarToCarrierRowExecBase]) + assert(childPlan.isInstanceOf[AdaptiveSparkPlanExec]) assert( childPlan - .asInstanceOf[ColumnarToCarrierRowExecBase] - .child - .isInstanceOf[AdaptiveSparkPlanExec]) + .asInstanceOf[AdaptiveSparkPlanExec] + .children + .head + .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) } From df71eb9de6999c4fd3886024202b79ec06165aeb Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 14 Mar 2026 20:53:33 +0800 Subject: [PATCH 5/6] fix: 2026-03-14T10:50:21.5090604Z - Gluten - SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands *** FAILED *** 2026-03-14T10:50:21.5092177Z java.util.NoSuchElementException: head of empty list 2026-03-14T10:50:21.5093060Z at scala.collection.immutable.Nil$.head(List.scala:652) 2026-03-14T10:50:21.5093744Z at scala.collection.immutable.Nil$.head(List.scala:651) 2026-03-14T10:50:21.5095020Z at org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite.$anonfun$new$111(VeloxAdaptiveQueryExecSuite.scala:1223) 2026-03-14T10:50:21.5096851Z at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) 2026-03-14T10:50:21.5097881Z at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86) 2026-03-14T10:50:21.5098989Z at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83) 2026-03-14T10:50:21.5099940Z at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97) 2026-03-14T10:50:21.5100822Z at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:312) 2026-03-14T10:50:21.5101862Z at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:310) 2026-03-14T10:50:21.5103121Z at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.withTable(AdaptiveQueryExecSuite.scala:62) --- .../execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 3 +-- .../execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 3 +-- .../execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 3 +-- .../execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 3 +-- .../execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 3 +-- .../execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | 3 +-- 6 files changed, 6 insertions(+), 12 deletions(-) diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index f141dd619583..f4450b0c6404 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1259,8 +1259,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT assert( childPlan .asInstanceOf[AdaptiveSparkPlanExec] - .children - .head + .inputPlan .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 0b8bcf7798f9..f768209f5480 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1263,8 +1263,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT assert( childPlan .asInstanceOf[AdaptiveSparkPlanExec] - .children - .head + .inputPlan .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index ff1edb1cb7c8..f25584380aaf 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1212,8 +1212,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT assert( childPlan .asInstanceOf[AdaptiveSparkPlanExec] - .children - .head + .inputPlan .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index ff1edb1cb7c8..f25584380aaf 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1212,8 +1212,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT assert( childPlan .asInstanceOf[AdaptiveSparkPlanExec] - .children - .head + .inputPlan .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 518338b6fce3..8ac43cdc94cc 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1218,8 +1218,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT assert( childPlan .asInstanceOf[AdaptiveSparkPlanExec] - .children - .head + .inputPlan .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 7635c42724f2..dd57813fca07 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -1222,8 +1222,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT assert( childPlan .asInstanceOf[AdaptiveSparkPlanExec] - .children - .head + .inputPlan .isInstanceOf[ColumnarToCarrierRowExecBase]) spark.listenerManager.unregister(listener) From a66aff7c04c53ad43e2ee7c37da4360d37b61319 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 15 Mar 2026 09:31:21 +0800 Subject: [PATCH 6/6] Remove test --- .../GlutenWriterColumnarRules.scala | 4 +- .../GlutenWriterColumnarRulesSuite.scala | 67 ------------------- .../GlutenWriterColumnarRulesSuite.scala | 67 ------------------- 3 files changed, 2 insertions(+), 136 deletions(-) delete mode 100644 gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala delete mode 100644 gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 7a7a5b65bb2e..9b4e06172a4a 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -81,14 +81,14 @@ object GlutenWriterColumnarRules { case aqe: AdaptiveSparkPlanExec => val newChild = BackendsApiManager.getSparkPlanExecApiInstance .genColumnarToCarrierRow(aqe.inputPlan) - command.withNewChildren(Array(wrapAqeWithColumnarToRow(newChild, aqe))) + command.withNewChildren(Array(wrapColumnarToRowWithAqe(newChild, aqe))) case other => command.withNewChildren( Array(BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(other))) } } - private def wrapAqeWithColumnarToRow( + private def wrapColumnarToRowWithAqe( newChild: SparkPlan, aqe: AdaptiveSparkPlanExec): AdaptiveSparkPlanExec = { aqe.inputPlan.logicalLink.foreach(newChild.setLogicalLink) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala deleted file mode 100644 index 307a4578273d..000000000000 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, QueryTest, SaveMode} -import org.apache.spark.sql.classic.Dataset -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec -import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenWriterColumnarRulesSuite - extends QueryTest - with SharedSparkSession - with GlutenSQLTestsBaseTrait { - - override protected def shouldRun(testName: String): Boolean = true - - test("AdaptiveSparkPlanExec should be accessible in noop write plan") { - var capturedPlan: SparkPlan = null - val listener = new QueryExecutionListener { - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - capturedPlan = qe.executedPlan - } - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - spark.listenerManager.register(listener) - try { - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { - val df = Dataset.ofRows(spark, spark.range(100).repartition(10).logicalPlan) - df.write.format("noop").mode(SaveMode.Overwrite).save() - - assert(capturedPlan != null, "Plan should have been captured by the listener") - capturedPlan match { - case OverwriteByExpressionExec( - AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _, _), - _, - _) => - case _ => - fail("Expected AdaptiveSparkPlanExec to be accessible in the plan structure") - } - } - } finally { - spark.listenerManager.unregister(listener) - } - } -} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala deleted file mode 100644 index 307a4578273d..000000000000 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenWriterColumnarRulesSuite.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, QueryTest, SaveMode} -import org.apache.spark.sql.classic.Dataset -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec -import org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenWriterColumnarRulesSuite - extends QueryTest - with SharedSparkSession - with GlutenSQLTestsBaseTrait { - - override protected def shouldRun(testName: String): Boolean = true - - test("AdaptiveSparkPlanExec should be accessible in noop write plan") { - var capturedPlan: SparkPlan = null - val listener = new QueryExecutionListener { - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - capturedPlan = qe.executedPlan - } - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - spark.listenerManager.register(listener) - try { - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { - val df = Dataset.ofRows(spark, spark.range(100).repartition(10).logicalPlan) - df.write.format("noop").mode(SaveMode.Overwrite).save() - - assert(capturedPlan != null, "Plan should have been captured by the listener") - capturedPlan match { - case OverwriteByExpressionExec( - AdaptiveSparkPlanExec(_: VeloxColumnarToCarrierRowExec, _, _, _, _), - _, - _) => - case _ => - fail("Expected AdaptiveSparkPlanExec to be accessible in the plan structure") - } - } - } finally { - spark.listenerManager.unregister(listener) - } - } -}