diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index ca7c8442d5f90..b3799217a6819 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -111,7 +111,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join "since there are no usage for multiple broadcasting keys at the moment.") val indices = Seq(joinKeys.indexOf(filteringKeys.head)) lazy val hasBenefit = pruningHasBenefit( - pruningKey, partScan, filteringKeys.head, filteringPlan) + pruningKey, partScan, filteringKeys.head, filteringPlan, hasSelectivePredicate(filteringPlan)) if (reuseEnabled || hasBenefit) { // insert a DynamicPruning wrapper to identify the subquery during query planning Filter( @@ -134,45 +134,58 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join * in bytes of the plan on the other side of the join. We estimate the filtering ratio * using column statistics if they are available, otherwise we use the config value of * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`. + * + * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate + * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A + * filtering side that is eligible only because it is already materialized (a LocalRelation or a + * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on + * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not + * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio, + * when available, is always honored regardless of `hasSelectivePredicate`. */ private def pruningHasBenefit( partExpr: Expression, partPlan: LogicalPlan, otherExpr: Expression, - otherPlan: LogicalPlan): Boolean = { + otherPlan: LogicalPlan, + hasSelectivePredicate: Boolean): Boolean = { // get the distinct counts of an attribute for a given table def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = { plan.stats.attributeStats.get(attr).flatMap(_.distinctCount) } - // the default filtering ratio when CBO stats are missing, but there is a - // predicate that is likely to be selective - val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio - // the filtering ratio based on the type of the join condition and on the column statistics - val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match { - // filter out expressions with more than one attribute on any side of the operator - case (leftAttr :: Nil, rightAttr :: Nil) - if conf.dynamicPartitionPruningUseStats => - // get the CBO stats for each attribute in the join condition - val partDistinctCount = distinctCounts(leftAttr, partPlan) - val otherDistinctCount = distinctCounts(rightAttr, otherPlan) - val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 && - otherDistinctCount.isDefined - if (!availableStats) { - fallbackRatio - } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) { - // there is likely an estimation error, so we fallback - fallbackRatio - } else { - 1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble - } - case _ => fallbackRatio + // the filtering ratio derived from column statistics, when reliable stats are available + val statsBasedRatio: Option[Double] = + (partExpr.references.toList, otherExpr.references.toList) match { + // filter out expressions with more than one attribute on any side of the operator + case (leftAttr :: Nil, rightAttr :: Nil) + if conf.dynamicPartitionPruningUseStats => + // get the CBO stats for each attribute in the join condition + val partDistinctCount = distinctCounts(leftAttr, partPlan) + val otherDistinctCount = distinctCounts(rightAttr, otherPlan) + val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 && + otherDistinctCount.isDefined + if (!availableStats) { + None + } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) { + // there is likely an estimation error, so there is no reliable stats-based ratio + None + } else { + Some(1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble) + } + case _ => None + } + + // Without a reliable stats-based ratio, fall back to the configured ratio only when there is a + // predicate likely to be selective; otherwise there is no evidence of a pruning benefit. + val filterRatio = statsBasedRatio.orElse { + if (hasSelectivePredicate) Some(conf.dynamicPartitionPruningFallbackFilterRatio) else None } - val estimatePruningSideSize = filterRatio * partPlan.stats.sizeInBytes.toFloat - val overhead = calculatePlanOverhead(otherPlan) - estimatePruningSideSize > overhead + filterRatio.exists { ratio => + ratio * partPlan.stats.sizeInBytes.toFloat > calculatePlanOverhead(otherPlan) + } } /** @@ -200,20 +213,30 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join /** - * Search for a selective filtering operation, a LocalRelation, or a checkpoint-derived - * LogicalRDD. - * - * LocalRelation rows are already locally available. A checkpoint-derived LogicalRDD establishes - * an explicit checkpoint boundary and can be used as a broadcast build side for DPP without - * evaluating the computation upstream of that boundary again. - * - * InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its - * presence does not guarantee the cached data has been materialized, and missing or evicted - * blocks may require evaluating the upstream computation again. + * Whether the plan carries a predicate that is likely to filter it down to a small, selective + * key set. This is the only signal we have that dynamic partition pruning would actually prune a + * meaningful number of partitions, so it also governs whether the estimated benefit may be + * trusted (see `insertPredicate`). */ - private def hasSelectivePredicateOrLocalOrCheckpointedInput(plan: LogicalPlan): Boolean = { + private def hasSelectivePredicate(plan: LogicalPlan): Boolean = { plan.exists { case f: Filter => isLikelySelective(f.condition) + case _ => false + } + } + + /** + * Whether the plan contains an already-materialized filtering side whose keys are available + * without re-running upstream computation: a LocalRelation (rows already available locally) or a + * checkpoint-derived LogicalRDD (an explicit checkpoint boundary that can serve as a broadcast + * build side without re-evaluating the computation above it). + * + * InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its presence + * does not guarantee the cached data has been materialized, and missing or evicted blocks may + * require evaluating the upstream computation again. + */ + private def hasMaterializedInput(plan: LogicalPlan): Boolean = { + plan.exists { case _: LocalRelation => true case r: LogicalRDD => r.isCheckpointedInput case _ => false @@ -221,14 +244,12 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } /** - * To be able to prune partitions on a join key, the filtering side needs to - * meet the following requirements: - * (1) it can not be a stream - * (2) it needs to contain a selective predicate, a LocalRelation, or a checkpoint-derived - * LogicalRDD + * To be able to prune partitions on a join key, the filtering side must not be a stream and must + * be able to cheaply supply a small set of pruning keys, either because it is selectively + * filtered or because it is already materialized. */ private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = { - !plan.isStreaming && hasSelectivePredicateOrLocalOrCheckpointedInput(plan) + !plan.isStreaming && (hasSelectivePredicate(plan) || hasMaterializedInput(plan)) } private def prune(plan: LogicalPlan): LogicalPlan = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index d303a03ba64b7..af855c54e0bbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1762,6 +1762,79 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat } } + test("SPARK-54593: a materialized filtering side is not injected as a standalone DPP subquery " + + "without an estimated pruning benefit") { + // Broadcast joins are disabled, so the DPP filter cannot reuse a broadcast exchange. A + // materialized filtering side carries no selective predicate, so its pruning benefit cannot be + // estimated; it must therefore not be injected as an always-applied standalone subquery (which + // would re-execute the filtering side and prune nothing for a non-selective side). + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("events") { + Seq((1, "hour1", "a"), (2, "hour1", "b"), (3, "hour2", "a")) + .toDF("id", "hour", "category") + .write + .partitionBy("hour", "category") + .format(tableFormat) + .mode("overwrite") + .saveAsTable("events") + + // A materialized filtering side that covers every partition: it offers no pruning benefit. + val sampledKeys = Seq("hour1||a", "hour1||b", "hour2||a").toDF("hc_key") + assert(sampledKeys.queryExecution.optimizedPlan.exists(_.isInstanceOf[LocalRelation])) + + val events = spark.table("events").as("events") + val sampled = sampledKeys.as("sampled") + val df = events + .join(sampled, concat_ws("||", $"events.hour", $"events.category") === $"sampled.hc_key") + .select($"events.id") + + checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false) + checkAnswer(df, Row(1) :: Row(2) :: Row(3) :: Nil) + } + } + } + + test("SPARK-54593: a materialized filtering side keeps statistics-backed standalone DPP") { + // A checkpoint-derived LogicalRDD has no Filter but can retain column statistics. When those + // statistics establish a pruning benefit, DPP must still be injected as a standalone subquery + // when no broadcast can be reused -- the materialized-input handling gates only the + // no-statistics fallback ratio, not the statistics-based benefit. + withSQLConf( + SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + // The fallback ratio offers no benefit, so DPP can only come from the statistics-based ratio. + SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0", + // No broadcast can be reused, so an injected filter must be a standalone subquery. + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false", + SQLConf.CBO_ENABLED.key -> "true") { + withTable("dim_one") { + // A dimension with a single distinct store_id, with column statistics computed. + spark.range(20).selectExpr("1 AS store_id") + .write.format(tableFormat).saveAsTable("dim_one") + sql("ANALYZE TABLE dim_one COMPUTE STATISTICS FOR COLUMNS store_id") + + // Checkpoint a projection of it: a LogicalRDD with no Filter that retains the NDV (= 1). + val keys = spark.table("dim_one").select("store_id").localCheckpoint(eager = true) + val keysPlan = keys.queryExecution.optimizedPlan + assert(keysPlan.exists { + case r: LogicalRDD => r.isCheckpointedInput + case _ => false + }) + assert(keysPlan.stats.attributeStats.values.exists(_.distinctCount.contains(BigInt(1))), + s"checkpointed side should retain the NDV statistic:\n$keysPlan") + + // fact_stats is partitioned by store_id (NDV > 1) and has column statistics, so the + // statistics-based ratio establishes a pruning benefit for this materialized side. + val df = spark.table("fact_stats").join(keys, "store_id").select("date_id") + + checkPartitionPruningPredicate(df, withSubquery = true, withBroadcast = false) + } + } + } + test("DPP does not treat a non-checkpointed LogicalRDD as a selective filtering side") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {