[SPARK-54593][SQL] Fix DPP eligibility for materialized filtering sides #56535
Open
sunchao wants to merge 1 commit into
Co-authored-by: Tri Tam Hoang <tritam.hoang@gmail.com>
Co-authored-by: Dustin Smith <Dustin.William.Smith@gmail.com>
cc @cloud-fan @MaxGekk @viirya would appreciate reviews on this follow-up, especially alongside #56603. Both address regressions from #56071, but at different layers:
Why are the changes needed?
PR #56071 allowed dynamic partition pruning (DPP) when the filtering side contained locally available rows (`LocalRelation`) or a checkpoint-derived `LogicalRDD`. That check was too broad: a materialized leaf does not guarantee that the operators above it return the same rows every time they run.

For example, consider a fact table partitioned by `p` and a filtering plan that applies stateful user code above a checkpoint. The original query contains one evaluation of `keys`, but DPP may introduce another evaluation to decide which fact-table partitions to scan:

- [...] `1` and prunes the fact table to partition `p = 1`.
- [...] `2` and needs the row from partition `p = 2`.
- `p = 2` has already been pruned, so Spark can incorrectly return no rows.

The same mismatch can happen when DPP binds to a matching sibling broadcast elsewhere in the physical plan. A subquery or other non-repeatable operator above the materialized leaf has the same fundamental problem.
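The double-evaluation hazard described above can be reproduced in a toy model without Spark. The sketch below is only an illustration under stated assumptions: plain Scala collections stand in for the fact table and the filtering plan, and every name in it (`DppMismatchSketch`, `statefulKeys`, `joinWithPruning`, `joinWithoutPruning`) is hypothetical rather than taken from this PR or from Spark.

```scala
// Toy model only: plain Scala collections stand in for Spark plans.
object DppMismatchSketch {
  // Fact table partitioned by p: partition value -> rows stored in that partition.
  val fact: Map[Int, Seq[String]] =
    Map(1 -> Seq("row-in-p1"), 2 -> Seq("row-in-p2"))

  // A stateful "filtering plan": each evaluation returns the next integer (1, then 2, ...).
  def statefulKeys(): () => Set[Int] = {
    var calls = 0
    () => { calls += 1; Set(calls) }
  }

  // Join with pruning: keys is evaluated once to choose partitions,
  // then evaluated again to perform the join itself.
  def joinWithPruning(keys: () => Set[Int]): Seq[String] = {
    val pruneKeys = keys() // first evaluation, used only for pruning
    val scanned = fact.toSeq.filter { case (p, _) => pruneKeys.contains(p) }
    val joinKeys = keys()  // second evaluation, used by the join
    scanned.filter { case (p, _) => joinKeys.contains(p) }.flatMap(_._2)
  }

  // Join without pruning: keys is evaluated exactly once.
  def joinWithoutPruning(keys: () => Set[Int]): Seq[String] = {
    val joinKeys = keys()
    fact.toSeq.filter { case (p, _) => joinKeys.contains(p) }.flatMap(_._2)
  }

  def main(args: Array[String]): Unit = {
    println(joinWithoutPruning(statefulKeys())) // one matching row
    println(joinWithPruning(statefulKeys()))    // no rows: p = 2 was pruned away
  }
}
```

With a single evaluation the join finds a row. With the extra pruning evaluation, the partition the join needs has already been dropped, so the result is empty.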
There is a second issue with lazy checkpoints. The checkpoint marker records that a `LogicalRDD` came from `checkpoint()` or `localCheckpoint()`, but a lazy checkpoint is not actually materialized until its first action completes. Treating the marker alone as proof of materialization can therefore duplicate the original upstream computation before its lineage has been truncated.

This is a follow-up to #56071. The materialized-input approach originated in #53263 (SPARK-54554) and was extended to `LocalRelation` and `LogicalRDD` in #53324 (SPARK-54593). This follow-up credits @mc8max and @dwsmith1983 as co-authors, as requested in the attribution discussion on #56071.

What changes were proposed in this PR?
- [...] `LogicalRDD` to be both provenance-marked and actually materialized according to `RDD.isCheckpointed`.
- [...] `Project`, `Filter`, `Union`, and `SubqueryAlias` nodes.
- [...] `mapPartitions`, scalar subqueries, standalone DPP, lazy checkpoint materialization, and the sibling-broadcast wrong-result shape with adaptive execution both disabled and enabled.

This only narrows the materialized-input eligibility added on unreleased `master`. The older DPP path for plans with a selective `Filter` is unchanged.

Generated-by: OpenAI Codex
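The gap between being marked for checkpointing and actually being materialized, which is what the `RDD.isCheckpointed` requirement targets, can be observed directly on an RDD. The following is a minimal sketch, not code from this PR: it assumes Spark is on the classpath and a local master is acceptable, and the object name `LazyCheckpointSketch`, the `observe` helper, and the temporary checkpoint directory are illustrative choices.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Hypothetical helper object; only the RDD and SparkContext calls are real Spark API.
object LazyCheckpointSketch {
  // Returns (isCheckpointed right after checkpoint(), isCheckpointed after the first action).
  def observe(): (Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("lazy-checkpoint-sketch")
      .getOrCreate()
    try {
      // Reliable checkpointing needs a checkpoint directory; a temp dir is assumed here.
      spark.sparkContext.setCheckpointDir(
        Files.createTempDirectory("ckpt-sketch").toString)

      val rdd = spark.sparkContext.parallelize(1 to 3).map(_ * 2)
      rdd.checkpoint()                     // only marks the RDD for checkpointing
      val beforeAction = rdd.isCheckpointed
      rdd.count()                          // first action triggers the checkpoint write
      val afterAction = rdd.isCheckpointed
      (beforeAction, afterAction)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = observe()
    println(s"isCheckpointed before first action: $before, after: $after")
  }
}
```

The flag is expected to be `false` immediately after `checkpoint()` and `true` once the first action has completed. This is why a provenance marker alone does not show that the data has been materialized.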
How was this PR tested?
- `build/sbt 'sql/testOnly org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOff org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOn'` (82 passed, 2 ignored)
- `build/sbt 'sql/testOnly org.apache.spark.sql.DatasetSuite -- -z "Dataset.checkpoint() - basic"'` (4 passed)
- `build/sbt sql/scalastyle sql/Test/scalastyle` (0 errors and 0 warnings)