[SPARK-56467][SQL] Route scalar subquery partition filters into DSv2 runtime filtering#55335

Open
anton5798 wants to merge 3 commits into apache:master from anton5798:scalar-subquery-dsv2-pruning

Conversation


@anton5798 anton5798 commented Apr 14, 2026

### What changes were proposed in this pull request?

Scalar subquery filters on partition columns (e.g., `WHERE d_date_sk = (SELECT min(d_date_sk) FROM ...)`) are excluded from pushdown in DSv2 at every stage. The filter lands as a `FilterExec` above `BatchScanExec`, evaluated row-by-row. The scan reads all partitions -- no partition pruning occurs.

DSv1 already handles this: `FileSourceStrategy` puts subquery filters in `partitionFilters`, `isDynamicFilter` classifies them as dynamic, and `getPartitionPruningFilterFromBroadcast` calls `ScalarSubquery.toLiteral` at execution time for partition pruning via `listFiles()`.

This PR routes partition-column scalar subquery filters into `BatchScanExec.runtimeFilters`, leveraging the existing `SupportsRuntimeV2Filtering.filter()` infrastructure:

- **DataSourceV2Strategy**: When the scan implements `SupportsRuntimeV2Filtering`, extract subquery filters from `postScanFilters` whose references are a subset of the partition columns. Add them to `runtimeFilters` alongside existing DPP filters. They remain in `postScanFilters` as a correctness safety net (the V2 `filter()` is advisory).
- **BatchScanExec**: In `filteredPartitions`, non-DPP runtime filters are literalized (replacing `ExecScalarSubquery` with its resolved literal) and translated to V2 predicates via `translateFilterV2`.
- **InMemoryTableWithV2Filter** (test infra): Added `=` predicate handling in `filter()` alongside the existing `IN`, plus a `case _ =>` catch-all.

No new interfaces, no config flags, no connector changes needed.
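The extraction step described above can be sketched with simplified stand-in types. Note that `Expr`, `hasScalarSubquery`, and `extractRuntimeSubqueryFilters` are hypothetical names for illustration, not the actual Catalyst classes or the PR's code:

```scala
// Toy model of the filter-routing decision in DataSourceV2Strategy.
// Expr stands in for Catalyst's Expression; hasScalarSubquery stands in for
// containsPattern(SCALAR_SUBQUERY); Set[String] stands in for AttributeSet.
case class Expr(references: Set[String], hasScalarSubquery: Boolean)

def extractRuntimeSubqueryFilters(
    postScanFilters: Seq[Expr],
    partitionAttrs: Set[String]): Seq[Expr] = {
  if (partitionAttrs.isEmpty) Seq.empty
  else postScanFilters.filter { f =>
    // Only scalar subquery filters whose references are all partition
    // columns can be literalized and pushed as V2 runtime filters.
    f.hasScalarSubquery && f.references.subsetOf(partitionAttrs)
  }
}

val filters = Seq(
  Expr(Set("part"), hasScalarSubquery = true),        // routed to runtimeFilters
  Expr(Set("part", "val"), hasScalarSubquery = true), // mixed refs: FilterExec only
  Expr(Set("part"), hasScalarSubquery = false))       // plain filter: pushed down earlier

val routed = extractRuntimeSubqueryFilters(filters, Set("part"))
println(routed.size) // prints 1
```

The key invariant is the subset check: a filter referencing any non-partition column cannot prune partitions and stays in `FilterExec` alone.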

### Why are the changes needed?

TPC-DS queries with scalar subquery partition filters (e.g., Q5, Q12, Q16, Q20, Q37, Q77, Q80, Q92, Q94, Q95) read all partitions in DSv2 scans even though the subquery resolves to a single value at runtime. This causes significant I/O overhead that DSv1 avoids.

### Does this PR introduce _any_ user-facing change?

No API changes. Queries with scalar subquery filters on partition columns will now benefit from partition pruning in DSv2 scans, reducing I/O.

### How was this patch tested?

New unit test in `DataSourceV2SQLSuiteV2Filter`:

- Creates a 10-partition table and a dimension table
- Runs `SELECT * FROM t WHERE part = (SELECT max(val) FROM dim)`
- Asserts query correctness, scalar subquery presence in `runtimeFilters`, and exactly 1 partition after pruning

### Was this patch authored or co-authored using generative AI tooling?

Yes, co-authored with Claude Code.


@cloud-fan cloud-fan left a comment


Review Summary

**Prior state and problem:** DSv2 scans (`BatchScanExec`) only supported partition pruning via Dynamic Partition Pruning (DPP). Scalar subquery filters on partition columns (e.g., `WHERE d_date_sk = (SELECT min(d_date_sk) FROM ...)`) were evaluated row-by-row by `FilterExec` above the scan, with no partition pruning. DSv1 already handled this: `FileSourceStrategy` includes scalar subquery partition filters in `partitionFilters`, evaluated at runtime for file listing.

**Design approach:** Route scalar subquery partition filters into the existing `runtimeFilters` infrastructure. This mirrors the DSv1 approach while respecting the V2 architecture (where partition pruning goes through the connector's `SupportsRuntimeV2Filtering.filter()` API). The filters remain in `postScanFilters` as a safety net: the V2 `filter()` is advisory, so `FilterExec` still evaluates them. Subquery reuse (`ReuseExchangeAndSubquery` / `ReuseAdaptiveSubquery`) ensures the duplicated subquery expression is only executed once.

**Implementation sketch:**

- **DataSourceV2Strategy**: Extracts scalar subquery filters whose column references are a subset of `filterAttributes` (partition columns). Appends them to `runtimeFilters` alongside DPP filters.
- **BatchScanExec.filteredPartitions**: New path for non-DPP runtime filters: each `ExecScalarSubquery` is replaced with its resolved `Literal`, then the expression is translated to a V2 `Predicate` via `translateFilterV2`.
- **InMemoryTableWithV2Filter** (test infra): Adds `=` predicate handling alongside the existing `IN`, plus a `case _ =>` catch-all.
- **Test**: Verifies query correctness, scalar subquery presence in `runtimeFilters`, and the partition count after pruning.
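The literalization step can be sketched with a toy expression tree. `Node`, `ScalarSub`, and `literalize` below are hypothetical stand-ins for illustration, not Catalyst's `Expression`, `ExecScalarSubquery`, or the PR's actual code:

```scala
// Toy expression tree mirroring the literalization in BatchScanExec:
// each resolved scalar subquery node is replaced by its literal result
// before the filter is translated to a V2 Predicate.
sealed trait Node
case class Lit(v: Any) extends Node
case class Col(name: String) extends Node
case class Eq(left: Node, right: Node) extends Node
// `result` stands in for the value available after subquery execution.
case class ScalarSub(result: Any) extends Node

def literalize(n: Node): Node = n match {
  case Eq(l, r)     => Eq(literalize(l), literalize(r))
  case ScalarSub(v) => Lit(v) // the ScalarSubquery.toLiteral step
  case other        => other
}

val filter = Eq(Col("part"), ScalarSub(42))
val literalized = literalize(filter)
println(literalized) // Eq(Col(part),Lit(42))
```

After this rewrite the filter contains only columns and literals, so it is translatable to a connector-facing V2 predicate.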

```scala
val partitionSet = AttributeSet(partitionAttrs)
if (partitionSet.nonEmpty) {
  postScanFilters.filter { f =>
    SubqueryExpression.hasSubquery(f) &&
```


`SubqueryExpression.hasSubquery(f)` matches all subquery types (scalar, IN, EXISTS), but only scalar subqueries can be literalized in `BatchScanExec`. Non-scalar subqueries would be harmlessly skipped (they'd fail `translateFilterV2`), but they'd be unnecessarily added to `runtimeFilters`.

The DSv1 analogue in `FileSourceStrategy` is more precise: it specifically keeps scalar subquery expressions while filtering out other subquery types (lines 194-195).

Consider using `f.containsPattern(SCALAR_SUBQUERY)` instead, which makes the intent explicit and avoids placing non-scalar subquery expressions in `runtimeFilters`.


```scala
override protected val catalogAndNamespace = "testv2filter.ns1.ns2."

test("SPARK-XXXXX: scalar subquery filters on partition columns are pushed into runtimeFilters") {
```


Placeholder JIRA ID; should be `SPARK-56467`.

Suggested change:

```diff
-test("SPARK-XXXXX: scalar subquery filters on partition columns are pushed into runtimeFilters") {
+test("SPARK-56467: scalar subquery filters on partition columns are pushed into runtimeFilters") {
```

```scala
if (p.children().length == 2) {
  val filterRef = p.children()(0).asInstanceOf[FieldReference].references.head
  if (filterRef.toString.equals(ref.toString)) {
    val matchingKey = p.children()(1).asInstanceOf[LiteralValue[_]].value.toString
```


If the scalar subquery returns NULL, `toLiteral` produces `Literal(null, ...)`. After V2 translation, the `=` predicate's second child would be `LiteralValue(null, ...)`. Calling `.value.toString` here would NPE for null values.

This is test infrastructure only (the `FilterExec` safety net handles NULL correctly in production), but worth guarding to avoid confusing test failures:

Suggested change:

```diff
-val matchingKey = p.children()(1).asInstanceOf[LiteralValue[_]].value.toString
+val matchingKey = p.children()(1).asInstanceOf[LiteralValue[_]].value
+if (matchingKey != null) {
+  data = data.filter(partition => {
+    val key = partition.asInstanceOf[BufferedRows].keyString()
+    key == matchingKey.toString
+  })
+} else {
+  data = Seq.empty // NULL = anything is always false
+}
```

- Use `containsPattern(SCALAR_SUBQUERY)` instead of `hasSubquery` to match only scalar subqueries, consistent with the DSv1 path in `FileSourceStrategy`
- Add NULL guard in `InMemoryTableWithV2Filter` equality predicate to avoid NPE on null literal values
- Replace placeholder `SPARK-XXXXX` with `SPARK-56467` in the test name

Co-authored-by: Isaac
@anton5798 (Contributor, Author)

Thanks for the review! Addressed all three points:

1. `containsPattern(SCALAR_SUBQUERY)`: Switched from `hasSubquery` to match only scalar subqueries, consistent with the DSv1 path in `FileSourceStrategy`.
2. NULL guard in test infra: Added a null check in the `InMemoryTableWithV2Filter` equality predicate to avoid an NPE when the scalar subquery evaluates to NULL.
3. JIRA ID: Replaced the placeholder `SPARK-XXXXX` with `SPARK-56467`.

Co-authored-by: Isaac

@szehon-ho szehon-ho left a comment


thanks! just a few suggestions, lgtm otherwise

```scala
// Non-DPP runtime filters (scalar subqueries on partition columns) are resolved here:
// each ExecScalarSubquery is replaced with its literal value, then the expression
// is translated to a V2 Predicate for the connector's filter() call.
val scalarSubqueryV2Filters = runtimeFilters.flatMap {
```


nit: this could be in the same block as above?


@szehon-ho szehon-ho Apr 15, 2026


also, wdyt of making it a utility method for better readability:

```scala
val dataSourceFilters = runtimeFilters.flatMap {
  case DynamicPruningExpression(e) =>
    DataSourceV2Strategy.translateRuntimeFilterV2(e)
  case f =>
    DataSourceV2Strategy.translateScalarSubqueryFilterV2(f)
}
```

`translateRuntimeFilterV2` has an unfortunate name, as it only handles dynamic (DPP) filters.

on `DataSourceV2Strategy`:

```scala
protected[sql] def translateScalarSubqueryFilterV2(expr: Expression): Option[Predicate] = {
  val literalized = expr.transform {
    case s: ScalarSubquery => s.toLiteral
  }
  translateFilterV2(literalized)
}
```

```scala
// partition pruning via SupportsRuntimeV2Filtering.filter().
val scalarSubqueryFilters = relation.scan match {
  case s: SupportsRuntimeV2Filtering =>
    val partitionAttrs = V2ExpressionUtils.resolveRefs[Attribute](
```


nit: technically it's not partition-only, as the API can return any attribute in `filterAttributes`, right? should we rename?

```scala
case s: SupportsRuntimeV2Filtering =>
  val partitionAttrs = V2ExpressionUtils.resolveRefs[Attribute](
    s.filterAttributes.toImmutableArraySeq, relation)
  val partitionSet = AttributeSet(partitionAttrs)
```

@szehon-ho szehon-ho Apr 14, 2026


also, might be cleaner to have a `lazy val` on `DataSourceV2ScanRelation` for `runtimeFilterAttributes`. But just code preference, wdyt?
