[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback#12151
[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback#12151brijrajk wants to merge 2 commits into
Conversation
4a56662 to
9bf19dc
Compare
|
Run Gluten Clickhouse CI on x86 |
@brijrajk, thanks for the PR. Could you rebase the code to see if the CI failures go away? |
9bf19dc to
009a9a8
Compare
|
Done — rebased onto current main and force-pushed. Fresh CI triggered. |
009a9a8 to
3148dbe
Compare
| override def apply(plan: SparkPlan): SparkPlan = { | ||
| if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) { | ||
| return plan | ||
| } | ||
| plan match { |
| val df = spark.sql(sqlString) | ||
| // Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217) | ||
| df.collect | ||
| // All 200003 rows match the bloom filter built from the same data. | ||
| assert(df.count() == 200003L) |
|
@brijrajk, could you first check if Copilot's comments make sense? |
|
Thanks for flagging this, @philo-he! Both of Copilot's comments were valid: 1. Patcher active when native bloom filter is disabled When Added a second guard: 2. Combined into |
|
@brijrajk, thanks for the update. Could you check if my following understanding is correct? Besides the |
|
@philo-he You are absolutely right. We confirmed it with a test case. How threshold and cost work
Test case confirming the failure testGluten(
"Test bloom_filter_agg whole-stage fallback when both stages fall back",
Issue12013) {
...
if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
// threshold=1: Stage 0's inherent transition cost of 1 meets the threshold, so
// ExpandFallbackPolicy promotes Stage 0 to a whole-stage fallback as well.
// Stage 0 runs as Spark and produces Spark-format bytes. Stage 1 also falls back.
// The patcher must NOT rewrite BloomFilterMightContain -> VeloxBloomFilterMightContain
// in this case.
withSQLConf(
GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
SQLConf.ANSI_ENABLED.key -> "false"
) {
val df = spark.sql(sqlString)
assert(df.collect().length == 200003L)
}
}
}Output
Proposed fix The root cause is that Do you see any concerns with this approach, or is there a cleaner way you would handle it? |
25c7fd9 to
2727774
Compare
| * This rule runs as a second fallback-policy pass, after `ExpandFallbackPolicy`, so it only acts | ||
| * when the plan is already wrapped in a `FallbackNode`. | ||
| */ | ||
| case class BloomFilterMightContainFallbackPatcher() extends Rule[SparkPlan] { |
There was a problem hiding this comment.
I don't recall why BloomFilterMightContainJointRewriteRule was made a physical rule, but can you try turning it to a logical rule anyway? So such a patcher rule can be avoided?
There was a problem hiding this comment.
Done — BloomFilterMightContainJointRewriteRule is now a Rule[LogicalPlan] registered via injectOptimizerRule, modelled after CollectRewriteRule. The patcher is gone. Running as an optimizer rule ensures both substitutions (BloomFilterAggregate → VeloxBloomFilterAggregate and BloomFilterMightContain → VeloxBloomFilterMightContain) are captured in the originalPlan snapshot before ExpandFallbackPolicy takes it, so the byte format stays consistent regardless of which stages fall back. This also fixes the threshold=1 case where Stage 0 itself falls back (the patcher would incorrectly rewrite the filter side while Stage 0 was producing Spark-format bytes).
f64edd1 to
cac891f
Compare
rdtr
left a comment
There was a problem hiding this comment.
could CallerInfo.isBloomFilterStatFunction() and inBloomFilterStatFunctionCall() be removed now with this PR?
| // from the original vanilla Spark plan which contains BloomFilterMightContain (not the Velox | ||
| // variant). If Stage 0 (bloom_filter_agg subquery) already ran natively it produced Velox- | ||
| // format bytes, which BloomFilterImpl.readFrom() cannot deserialize. BloomFilterMightContain- | ||
| // FallbackPatcher patches the fallback plan to use VeloxBloomFilterMightContain so Stage 1 |
There was a problem hiding this comment.
I think Patcher is now gone so this comment is outdated?
There was a problem hiding this comment.
Fixed — updated the comment to describe the optimizer rule approach instead.
cac891f to
59c6c50
Compare
299f4f8 to
9d096a3
Compare
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
c06593c to
aca2904
Compare
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
1fe11de to
51185c7
Compare
|
Run Gluten Clickhouse CI on x86 |
51185c7 to
04f38a3
Compare
|
Run Gluten Clickhouse CI on x86 |
04f38a3 to
c34c4e6
Compare
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
|
8a1ef81 to
cd58f6f
Compare
|
Run Gluten Clickhouse CI on x86 |
cd58f6f to
bd52842
Compare
|
Run Gluten Clickhouse CI on x86 |
bd52842 to
e2c7eb4
Compare
|
Run Gluten Clickhouse CI on x86 |
TPC-DS golden regenerationFollowing @zhztheplayer's suggestion to make
|
e2c7eb4 to
9d85377
Compare
|
Run Gluten Clickhouse CI on x86 |
…QE fallback (incl. SPARK-54336)
BloomFilterMightContainJointRewriteRule keeps a bloom-filter producer
(bloom_filter_agg) and its consumer (might_contain) on the same serialized byte
format, so they never end up mismatched (Velox version=1 vs Spark version=0) when
AQE promotes an individual-stage fallback to a whole-stage fallback -- the original
GLUTEN-12013 crash (java.io.IOException: Unexpected Bloom filter version number).
The rule runs as a Rule[LogicalPlan] via injectOptimizerRule (Operator Optimization
batch), so the substitution is captured in the originalPlan snapshot that
ExpandFallbackPolicy uses when promoting a stage fallback to a whole-stage AQE
fallback -- both sides stay consistent regardless of which stages fall back to JVM
execution.
- might_contain(ScalarSubquery(...), col) with a plain column value: rewrite both
the inner aggregate and the outer might-contain to their Velox forms.
- might_contain(ScalarSubquery(...), <non-column>) -- e.g. a literal, as in
SPARK-54336: might_contain((SELECT bloom_filter_agg(col) FROM t), 0L): leave both
vanilla (version=0), which also preserves vanilla's NULL-on-empty-input
semantics. Rewriting only the outer side caused kBloomFilterV1 == version (1 vs. 0).
- Standalone BloomFilterAggregate (e.g. DataFrame.stat.bloomFilter()) is never
matched, so its bytes stay Spark-native (fixes GlutenDataFrameStatSuite).
Because the rule runs before InjectRuntimeFilter and MergeScalarSubqueries,
DPP/runtime-filter might_contain expressions and ScalarSubqueryReference nodes are
never observed here, so no special handling is needed for them.
Adds GlutenBloomFilterFallbackSuite (gluten-ut/test) covering: only-filter-stage
fallback (threshold=2), both-stages fallback (threshold=1),
DataFrame.stat.bloomFilter() standalone usage, native-bloom-filter-disabled
early-exit, and the SPARK-54336 literal-value query. Also removes the now-unused
CallerInfo.isBloomFilterStatFunction() helper.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… 4.0/4.1 BloomFilterMightContainJointRewriteRule now runs as a logical optimizer rule and rewrites BloomFilterAggregate/BloomFilterMightContain to their Velox forms (velox_bloom_filter_agg / velox_might_contain) for the affected TPC-DS queries. Regenerate the spark40 and spark41 plan-stability golden files accordingly. Operator structure (simplified.txt) is unchanged for non-bloom queries. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
9d85377 to
932ff73
Compare
|
Run Gluten Clickhouse CI on x86 |
|
@zhztheplayer — quick update: CI is now fully green on this PR ✅ (62/62 checks pass; approved & mergeable). The earlier |
What changes are proposed in this pull request?
Fixes #12013, and the
SPARK-54336regression that the issue's query shape exposes.Background
BloomFilterMightContainJointRewriteRulerewrites a bloom-filter producer(
bloom_filter_agg) and its consumer (might_contain) to their Velox variants so Veloxevaluates them natively. It is a
Rule[LogicalPlan]registered viainjectOptimizerRule,which lands in Spark's Operator Optimization batch. Running there ensures the
substitution is baked into the
originalPlansnapshot thatExpandFallbackPolicycaptures when it promotes an individual-stage fallback to a whole-stage AQE fallback — so
both sides keep the same serialized byte format even if a stage reverts to JVM execution.
That is the original GLUTEN-12013 crash (
java.io.IOException: Unexpected Bloom filter version number).The
SPARK-54336crashSpark 4.0.2 / 4.1 added
BloomFilterAggregateQuerySuite."SPARK-54336":The
might_containvalue here is a literal (0L), not a column. The earlier revisionwrapped only the outer
might_containinVeloxBloomFilterMightContain(whichexpects version=1 bytes) but left the inner
bloom_filter_aggvanilla. Vanillabloom_filter_agghas no Substrait mapping, so it runs in the JVM and emits version=0bytes, which the Velox might-contain then fails to deserialize:
Fix
Keep the producer and consumer on the same byte format — rewrite them together, or
not at all:
might_contain(ScalarSubquery(...), col)(plain column value): rewrite both theinner aggregate and the outer might-contain to their Velox forms (version=1). This is the
user-facing filter path protected across whole-stage AQE fallback (GLUTEN-12013).
might_contain(ScalarSubquery(...), <non-column>)(e.g. a literal, as inSPARK-54336): leave both vanilla (version=0). This also preserves vanilla'sNULL-on-empty-input semantics, somight_contain((SELECT bloom_filter_agg(x) FROM empty), v)still returnsNULLrather thanfalse.Standalone
BloomFilterAggregate(e.g.DataFrame.stat.bloomFilter()) is never matched,so its collected bytes stay in Spark-native format.
Because this rule runs in the Operator Optimization batch — before
InjectRuntimeFilterandMergeScalarSubqueries— it never observes DPP/runtime-filtermight_containexpressions (which hash the key withxxhash64) orScalarSubqueryReferencenodes (created by subquery merging). Those are produceddownstream and are unaffected by this rule, so no special handling is needed for them.
Files changed
BloomFilterMightContainJointRewriteRule.scala— column-valuedmight_containrewritesboth sides to Velox; a non-column value leaves both vanilla.
VeloxRuleApi.scala— registers the rule viainjectOptimizerRule(replacing theearlier
injectPreTransform/Rule[SparkPlan]+ fallback patcher approach).CallerInfo.scala(gluten-core) — removes the now-unusedisBloomFilterStatFunction()helper (and
inBloomFilterStatFunctionCall); the rule keys off the expression pattern,not the call site, so a standalone
BloomFilterAggregateis excluded inherently.GlutenBloomFilterFallbackSuite.scala(gluten-ut/test) — regression tests for thewhole-stage-fallback and literal-value (
SPARK-54336) scenarios, plus theDataFrame.stat.bloomFilter()andnative bloom filter disabledguards.gluten-ut/spark40andgluten-ut/spark41under
tpcds-plan-stability/gluten-approved-plans-{v1_4,v2_7,modified}/. Because therule now runs in the Operator Optimization batch, the optimized plans of the TPC-DS
queries that use runtime bloom filters (
q2, q10, q16, q24a, q24b, q32, q37, q40, q59, q64, q69, q80, q82, q85, q92, q94, q95,v2.7 q10a/q64/q80a, and the twomodifiedvariants) now carry
velox_bloom_filter_agg/velox_might_containinstead of thevanilla forms. The
explain.txt/simplified.txtgoldens are regenerated accordingly.Operator structure is unchanged for non-bloom queries, and no TPC-H goldens change.
The final patch is intentionally split into two commits:
How was this patch tested?
Verified against the Velox backend on Spark 4.0:
GlutenBloomFilterAggregateQuerySuiteCGOff— incl.SPARK-54336andmight_contain on bloom_filter_agg with empty inputSPARK-54336previously crashed)GlutenBloomFilterFallbackSuite(gluten-ut/test)gluten-ut/spark40+gluten-ut/spark41)SPARK_GENERATE_GOLDEN_FILES=1)GlutenBloomFilterFallbackSuitetests, guarded withrequireBloomFilterAggMightContainJointFallback():threshold=2) —bloom_filter_aggruns natively(Velox bytes), the filter stage falls back; asserts
velox_might_containis in theoptimized plan and the query succeeds.
threshold=1) — both sides execute via JNI in JVM row-mode,producing/consuming Velox-format bytes consistently.
DataFrame.stat.bloomFilter()— standalone aggregate stays vanilla;readFrom()succeeds.
SPARK-54336(new) —might_contain((SELECT bloom_filter_agg(col) FROM t), 0L);both sides stay vanilla and the query returns the correct result without the version
mismatch.
Was this patch authored or co-authored using generative AI tooling?
Yes. Claude Code was used as an AI assistant during development.