
Commit 4e57577 (1 parent: 3c81799)

Fix DPP regression for Hive scans and add DynamicPartitionPruningHiveScanSuite

7 files changed: 569 additions & 3 deletions


gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala

Lines changed: 0 additions & 1 deletion

@@ -202,7 +202,6 @@ object OffloadOthers {
         case plan: FileSourceScanExec =>
           ScanTransformerFactory.createFileSourceScanTransformer(plan)
         case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-          // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
           HiveTableScanExecTransformer(plan)
         case plan: CoalesceExec =>
           ColumnarCoalesceExec(plan.numPartitions, plan.child)

gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala

Lines changed: 15 additions & 2 deletions

@@ -76,8 +76,21 @@ case class HiveTableScanExecTransformer(
   override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
     partitionWithReadFileFormats

-  override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] =
-    distinctReadFileFormats
+  override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = {
+    if (!relation.isPartitioned) {
+      Set(fileFormat)
+    } else {
+      // Use statically pruned partitions from the relation instead of prunedPartitions
+      // to avoid triggering DPP (Dynamic Partition Pruning) subquery evaluation during
+      // validation, when those subqueries have not yet been executed.
+      relation.prunedPartitions match {
+        case Some(partitions) if partitions.nonEmpty =>
+          partitions.map(p => getReadFileFormat(p.storage)).toSet
+        case _ =>
+          Set(fileFormat)
+      }
+    }
+  }

   override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema
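A minimal sketch of the pattern behind this fix, assuming Spark's HiveTableRelation API (the distinctFormats helper and its parameters are illustrative, not Gluten code): HiveTableScanExec.prunedPartitions is a lazy val whose evaluation forces the scan's partition-pruning predicates, including any DPP subqueries that have not yet run, so validation-time code should read only the statically pruned partitions carried on the relation.

import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, HiveTableRelation}

// Illustrative helper, not Gluten code: derive the set of file formats a scan
// will actually read, using only relation.prunedPartitions (static pruning).
// Touching HiveTableScanExec.prunedPartitions here would evaluate
// DynamicPruningExpression subqueries that may not have executed yet.
def distinctFormats[F](
    relation: HiveTableRelation,
    tableFormat: F)(formatOf: CatalogStorageFormat => F): Set[F] =
  relation.prunedPartitions match {
    case Some(parts) if parts.nonEmpty =>
      parts.map(p => formatOf(p.storage)).toSet // per-partition storage formats
    case _ =>
      Set(tableFormat) // unpartitioned, or no static pruning info: table default
  }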

gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 7 additions & 0 deletions

@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQ
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
+import org.apache.spark.sql.hive.{GlutenDynamicPartitionPruningHiveScanSuiteAEOff, GlutenDynamicPartitionPruningHiveScanSuiteAEOn}
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming._

@@ -925,6 +926,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan]
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOff]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOn]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
   enableSuite[GlutenExpressionsSchemaSuite]
   enableSuite[GlutenExtraStrategiesSuite]
   enableSuite[GlutenFileBasedDataSourceSuite]
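The two excluded tests are not lost coverage: the new suite that follows re-implements both as testGluten cases whose assertions match Gluten's ColumnarSubqueryBroadcastExec as well as Spark's SubqueryBroadcastExec.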
Lines changed: 216 additions & 0 deletions (new file)

@@ -0,0 +1,216 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.hive

import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformerBase}

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.internal.SQLConf

abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase
  extends DynamicPartitionPruningHiveScanSuiteBase
  with GlutenSQLTestsTrait {

  import testImplicits._

  override def beforeAll(): Unit = {
    prepareWorkDir()
    super.beforeAll()
    spark.sparkContext.setLogLevel("WARN")
  }

  override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
    flatMap(plan) {
      case s: FileSourceScanExecTransformer =>
        s.partitionFilters.collect { case d: DynamicPruningExpression => d.child }
      case s: FileSourceScanExec =>
        s.partitionFilters.collect { case d: DynamicPruningExpression => d.child }
      case h: HiveTableScanExec =>
        h.partitionPruningPred.collect { case d: DynamicPruningExpression => d.child }
      case h: HiveTableScanExecTransformer =>
        h.partitionPruningPred.collect { case d: DynamicPruningExpression => d.child }
      case _ => Nil
    }
  }

  override def checkPartitionPruningPredicate(
      df: DataFrame,
      withSubquery: Boolean,
      withBroadcast: Boolean): Unit = {
    df.collect()

    val plan = df.queryExecution.executedPlan
    val dpExprs = collectDynamicPruningExpressions(plan)
    val hasSubquery = dpExprs.exists {
      case InSubqueryExec(_, _: SubqueryExec, _, _, _, _) => true
      case _ => false
    }
    val subqueryBroadcast = dpExprs.collect {
      case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b
      case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => b
    }

    val hasFilter = if (withSubquery) "Should" else "Shouldn't"
    assert(
      hasSubquery == withSubquery,
      s"$hasFilter trigger DPP with a subquery duplicate:\n${df.queryExecution}")
    val hasBroadcast = if (withBroadcast) "Should" else "Shouldn't"
    assert(
      subqueryBroadcast.nonEmpty == withBroadcast,
      s"$hasBroadcast trigger DPP with a reused broadcast exchange:\n${df.queryExecution}")

    subqueryBroadcast.foreach {
      s =>
        s.child match {
          case _: ReusedExchangeExec => // reuse check ok.
          case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok.
          case b: BroadcastExchangeLike =>
            val hasReuse = plan.find {
              case ReusedExchangeExec(_, e) => e eq b
              case _ => false
            }.isDefined
            assert(hasReuse, s"$s\nshould have been reused in\n$plan")
          case a: AdaptiveSparkPlanExec =>
            val broadcastQueryStage = collectFirst(a) { case b: BroadcastQueryStageExec => b }
            val broadcastPlan = broadcastQueryStage.get.broadcast
            val hasReuse = find(plan) {
              case ReusedExchangeExec(_, e) => e eq broadcastPlan
              case b: BroadcastExchangeLike => b eq broadcastPlan
              case _ => false
            }.isDefined
            assert(hasReuse, s"$s\nshould have been reused in\n$plan")
          case _ =>
            fail(s"Invalid child node found in\n$s")
        }
    }

    val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec]
    subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach {
      s =>
        val subquery = s match {
          case r: ReusedSubqueryExec => r.child
          case o => o
        }
        assert(
          subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == isMainQueryAdaptive)
    }
  }

  override def checkDistinctSubqueries(df: DataFrame, n: Int): Unit = {
    df.collect()

    val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
      case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) =>
        b.indices
      case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) =>
        b.indices
    }
    assert(buf.distinct.size == n)
  }

  override def checkUnpushedFilters(df: DataFrame): Boolean = {
    find(df.queryExecution.executedPlan) {
      case FilterExec(condition, _) =>
        splitConjunctivePredicates(condition).exists {
          case _: DynamicPruningExpression => true
          case _ => false
        }
      case transformer: FilterExecTransformerBase =>
        splitConjunctivePredicates(transformer.cond).exists {
          case _: DynamicPruningExpression => true
          case _ => false
        }
      case _ => false
    }.isDefined
  }

  testGluten("Make sure dynamic pruning works on uncorrelated queries") {
    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
      val df = sql("""
                     |SELECT d.store_id,
                     |       SUM(f.units_sold),
                     |       (SELECT SUM(f.units_sold)
                     |        FROM fact_stats f JOIN dim_stats d ON d.store_id = f.store_id
                     |        WHERE d.country = 'US') AS total_prod
                     |FROM fact_stats f JOIN dim_stats d ON d.store_id = f.store_id
                     |WHERE d.country = 'US'
                     |GROUP BY 1
        """.stripMargin)
      checkAnswer(df, Row(4, 50, 70) :: Row(5, 10, 70) :: Row(6, 10, 70) :: Nil)

      val plan = df.queryExecution.executedPlan
      val countSubqueryBroadcasts =
        collectWithSubqueries(plan) {
          case _: SubqueryBroadcastExec => 1
          case _: ColumnarSubqueryBroadcastExec => 1
        }.sum

      val countReusedSubqueryBroadcasts =
        collectWithSubqueries(plan) {
          case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
          case ReusedSubqueryExec(_: ColumnarSubqueryBroadcastExec) => 1
        }.sum

      assert(countSubqueryBroadcasts == 1)
      assert(countReusedSubqueryBroadcasts == 1)
    }
  }

  testGluten("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec") {
    withTable("duplicate_keys") {
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
        Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US"))
          .toDF("store_id", "country")
          .write
          .format(tableFormat)
          .saveAsTable("duplicate_keys")

        val df = sql("""
                       |SELECT date_id, product_id FROM fact_sk f
                       |JOIN duplicate_keys s
                       |ON f.store_id = s.store_id WHERE s.country = 'US' AND date_id > 1050
          """.stripMargin)

        checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = true)

        val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
          case s: ColumnarSubqueryBroadcastExec => s
        }
        assert(subqueryBroadcastExecs.size === 1)
        subqueryBroadcastExecs.foreach {
          subqueryBroadcastExec =>
            assert(subqueryBroadcastExec.metrics("numOutputRows").value === 1)
        }

        checkAnswer(df, Row(1060, 2) :: Row(1060, 2) :: Row(1060, 2) :: Nil)
      }
    }
  }
}

class GlutenDynamicPartitionPruningHiveScanSuiteAEOff
  extends GlutenDynamicPartitionPruningHiveScanSuiteBase
  with DisableAdaptiveExecutionSuite

class GlutenDynamicPartitionPruningHiveScanSuiteAEOn
  extends GlutenDynamicPartitionPruningHiveScanSuiteBase
  with EnableAdaptiveExecutionSuite
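The AEOff/AEOn variants run the same assertions with adaptive query execution disabled and enabled, respectively, mirroring the structure of Spark's own DynamicPartitionPruning suites. A rough sketch of the concept (an approximation, not the mixins' actual source, which wrap every registered test per suite):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Pin spark.sql.adaptive.enabled for a block of assertions so that
// plan-shape checks are deterministic, restoring the previous value after.
def withAqe[T](spark: SparkSession, enabled: Boolean)(body: => T): T = {
  val key = SQLConf.ADAPTIVE_EXECUTION_ENABLED.key // "spark.sql.adaptive.enabled"
  val prev = spark.conf.getOption(key)
  spark.conf.set(key, enabled.toString)
  try body
  finally prev match {
    case Some(v) => spark.conf.set(key, v)
    case None => spark.conf.unset(key)
  }
}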

gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 7 additions & 0 deletions

@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQ
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
+import org.apache.spark.sql.hive.{GlutenDynamicPartitionPruningHiveScanSuiteAEOff, GlutenDynamicPartitionPruningHiveScanSuiteAEOn}
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming._

@@ -898,6 +899,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan]
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOff]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOn]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
   enableSuite[GlutenExpressionsSchemaSuite]
   enableSuite[GlutenExtraStrategiesSuite]
   enableSuite[GlutenFileBasedDataSourceSuite]
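The spark41 settings change mirrors the spark40 one above; only the surrounding hunk positions differ.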
