Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
require(filteringKeys.size == 1, "DPP Filters should only have a single broadcasting key " +
"since there are no usage for multiple broadcasting keys at the moment.")
val indices = Seq(joinKeys.indexOf(filteringKeys.head))
lazy val hasBenefit = pruningHasBenefit(
pruningKey, partScan, filteringKeys.head, filteringPlan)
// A filtering side that is eligible only because it is already materialized (a LocalRelation
// or a checkpoint-derived LogicalRDD) carries no selective predicate. pruningHasBenefit's
// fallback filtering ratio is justified only "when CBO stats are missing, but there is a
// predicate that is likely to be selective", which does not hold here, so it would assume a
// benefit that may not exist. Without an estimated benefit such a side is only injected when it
// can reuse a broadcast (onlyInBroadcast), never as a standalone always-applied subquery.
lazy val hasBenefit = hasSelectivePredicate(filteringPlan) &&

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve stats-backed standalone pruning

This hasSelectivePredicate(filteringPlan) && pruningHasBenefit(...) guard short-circuits before pruningHasBenefit can consult column statistics. A checkpoint-derived LogicalRDD can retain valid attributeStats even though it no longer contains a Filter. I reproduced this with a 100-partition table, key-side NDV = 1, fact-side NDV > 1, fallbackFilterRatio=0, reuseBroadcastOnly=false, and no reusable broadcast: the parent injects standalone DPP and prunes the scan, while this head removes DPP and scans all 100 partitions with identical results. Please gate only the fallback-ratio path on hasSelectivePredicate, while preserving the NDV-backed benefit path.

pruningHasBenefit(pruningKey, partScan, filteringKeys.head, filteringPlan)
if (reuseEnabled || hasBenefit) {
// insert a DynamicPruning wrapper to identify the subquery during query planning
Filter(
Expand Down Expand Up @@ -200,35 +206,43 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join


/**
* Search for a selective filtering operation, a LocalRelation, or a checkpoint-derived
* LogicalRDD.
*
* LocalRelation rows are already locally available. A checkpoint-derived LogicalRDD establishes
* an explicit checkpoint boundary and can be used as a broadcast build side for DPP without
* evaluating the computation upstream of that boundary again.
*
* InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its
* presence does not guarantee the cached data has been materialized, and missing or evicted
* blocks may require evaluating the upstream computation again.
* Whether the plan carries a predicate that is likely to filter it down to a small, selective
* key set. This is the only signal we have that dynamic partition pruning would actually prune a
* meaningful number of partitions, so it also governs whether the estimated benefit may be
* trusted (see `insertPredicate`).
*/
private def hasSelectivePredicateOrLocalOrCheckpointedInput(plan: LogicalPlan): Boolean = {
private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
  // A node qualifies only when it is a Filter whose condition passes isLikelySelective;
  // every other operator contributes nothing to the answer.
  def isSelectiveFilter(node: LogicalPlan): Boolean = node match {
    case filter: Filter => isLikelySelective(filter.condition)
    case _ => false
  }

  // True as soon as any node of the plan tree is a selective Filter.
  plan.exists(isSelectiveFilter)
}

/**
* Whether the plan contains an already-materialized filtering side whose keys are available
* without re-running upstream computation: a LocalRelation (rows already available locally) or a
* checkpoint-derived LogicalRDD (an explicit checkpoint boundary that can serve as a broadcast
* build side without re-evaluating the computation above it).
*
* InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its presence
* does not guarantee the cached data has been materialized, and missing or evicted blocks may
* require evaluating the upstream computation again.
*/
private def hasMaterializedInput(plan: LogicalPlan): Boolean = {
  // A node counts as materialized when it is a LocalRelation, or a LogicalRDD that reports
  // itself as a checkpointed input. Any other operator (including a LogicalRDD that is not
  // checkpoint-derived) does not qualify.
  def isMaterialized(node: LogicalPlan): Boolean = node match {
    case _: LocalRelation => true
    case rdd: LogicalRDD if rdd.isCheckpointedInput => true
    case _ => false
  }

  // True as soon as any node of the plan tree is a materialized input.
  plan.exists(isMaterialized)
}

/**
* To be able to prune partitions on a join key, the filtering side needs to
* meet the following requirements:
* (1) it can not be a stream
* (2) it needs to contain a selective predicate, a LocalRelation, or a checkpoint-derived
* LogicalRDD
* To be able to prune partitions on a join key, the filtering side must not be a stream and must
* be able to cheaply supply a small set of pruning keys, either because it is selectively
* filtered or because it is already materialized.
*/
private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
!plan.isStreaming && hasSelectivePredicateOrLocalOrCheckpointedInput(plan)
!plan.isStreaming && (hasSelectivePredicate(plan) || hasMaterializedInput(plan))
}

private def prune(plan: LogicalPlan): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1762,6 +1762,40 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
}
}

// Regression test: joins a partitioned table against a LocalRelation that lists every partition
// key, with broadcast joins disabled and reuse-broadcast-only turned off, and checks that no
// dynamic pruning predicate is planned while the query still returns every row.
test("SPARK-54593: a materialized filtering side is not injected as a standalone DPP subquery " +
"without an estimated pruning benefit") {
// Broadcast joins are disabled, so the DPP filter cannot reuse a broadcast exchange. A
// materialized filtering side carries no selective predicate, so its pruning benefit cannot be
// estimated; it must therefore not be injected as an always-applied standalone subquery (which
// would re-execute the filtering side and prune nothing for a non-selective side).
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("events") {
// Three rows spread over three (hour, category) partitions: (hour1, a), (hour1, b), (hour2, a).
Seq((1, "hour1", "a"), (2, "hour1", "b"), (3, "hour2", "a"))
.toDF("id", "hour", "category")
.write
.partitionBy("hour", "category")
.format(tableFormat)
.mode("overwrite")
.saveAsTable("events")

// A materialized filtering side that covers every partition: it offers no pruning benefit.
val sampledKeys = Seq("hour1||a", "hour1||b", "hour2||a").toDF("hc_key")
// Precondition: the filtering side must optimize to a LocalRelation, otherwise the test would
// not exercise the materialized-input path at all.
assert(sampledKeys.queryExecution.optimizedPlan.exists(_.isInstanceOf[LocalRelation]))

val events = spark.table("events").as("events")
val sampled = sampledKeys.as("sampled")
// The join key on the partitioned side is an expression over both partition columns,
// compared against the single key column of the local relation.
val df = events
.join(sampled, concat_ws("||", $"events.hour", $"events.category") === $"sampled.hc_key")
.select($"events.id")

// Expect no DPP predicate in either form -- presumably the helper asserts the absence of both a
// subquery-based and a broadcast-based pruning filter; confirm against its definition.
checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false)
// All three rows match a sampled key, so the full table must come back.
checkAnswer(df, Row(1) :: Row(2) :: Row(3) :: Nil)
}
}
}

test("DPP does not treat a non-checkpointed LogicalRDD as a selective filtering side") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
Expand Down