
[Umbrella] Cost-based optimization for the multi-stage query engine #18740

@gortiz


Motivation

The multi-stage query engine (MSE) plans queries with Apache Calcite, but optimization today is purely
rule-based (HepPlanner): join order is whatever the SQL says, join strategies are chosen via
explicit hints, and the planner has no cardinality information (PinotTable does not expose a
Calcite Statistic, and QueryEnvironment still carries a long-standing "TODO: add support for cost
factory"). Calcite already ships the machinery for cost-based decisions; what Pinot is
missing is statistics at the broker and the integration to consume them.

This issue tracks the work to introduce cost-based optimization incrementally, gated and off by
default at every step.

Why it matters (measured)

Median latencies on TPC-H SF=1 (single-server quickstart, 150 samples per variant, bootstrap 95% CIs
in brackets), for queries deliberately written in a poor syntactic join order (smallest tables first):

query                                         literal SQL order    with cost-based reorder    hand-optimized SQL
customer⋈orders⋈lineitem (filtered)           549 ms [516, 570]    462 ms [438, 479]          412 ms [390, 430]
region⋈nation⋈supplier⋈lineitem (filtered)    253 ms [244, 277]    80 ms [80, 81]             207 ms [204, 211]

In the second query the optimizer found a bushy plan (reduce the dimension chain first, then a
single probe over the fact table) that is 2.6× faster than the hand-optimized left-deep SQL
and 3.2× faster than the literal order. Query results are identical across all variants, and the
reorder phase adds no measurable planning overhead when it leaves the plan unchanged.
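The headline speedups for the second query follow directly from the medians in its table row:

```latex
\frac{207\ \text{ms}}{80\ \text{ms}} \approx 2.6\times \ \text{(vs. hand-optimized SQL)},
\qquad
\frac{253\ \text{ms}}{80\ \text{ms}} \approx 3.2\times \ \text{(vs. literal SQL order)}
```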

Design overview

Three pillars (detailed design in the linked doc / PR descriptions):

  1. Broker statistics subsystem. Per-segment stats are collected from the ZooKeeper metadata the broker
    already watches (row count, size, time boundaries; effectively free) and persisted off-heap in an
    embedded SQLite store (bounded heap, warm restart, CRC-based reconciliation). Per-column stats
    (NDV, min/max, avg byte size) come later via a bounded server fan-out behind a swappable
    ColumnStatsSource. Every stat carries a confidence tier so that table types where raw
    doc counts are biased (upsert/dedup, consuming segments, hybrid time-boundary overlap) degrade
    to today's behavior instead of producing silently wrong plans.
  2. Cost model. Rows-dominated for logical join ordering; extended later with columnar byte
    sizes for exchange-aware decisions (shuffle/broadcast cost is the dominant term in MSE).
  3. Planner integration. PinotTable#getStatistic() + a chained RelMetadataProvider
    (row counts, selectivity incl. time-range estimation from segment time boundaries) feed
    Calcite's RelMetadataQuery; a gated join-reorder phase (useJoinReorder query option,
    default off) runs between the logical and physical planning phases with strict eligibility
    gates (inner joins only, no hinted joins, all scans must have trusted row counts, join-count
    cap, fall back to the original plan on any error).
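The following is a minimal sketch of the pillar 3 eligibility gates. It assumes a simplified view of the logical plan as a list of joins plus a list of scans. JoinType, JoinInfo, ScanInfo and ReorderEligibility are hypothetical names used for illustration; they are not Pinot or Calcite classes. The reorder itself is passed in as a function, so the sketch shows only the gating and the error fallback:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical, simplified plan facts; not Pinot or Calcite classes.
enum JoinType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

record JoinInfo(JoinType type, boolean hinted) {}

record ScanInfo(String table, boolean rowCountTrusted) {}

final class ReorderEligibility {
  private final int maxJoins; // join-count cap (value chosen by the caller)

  ReorderEligibility(int maxJoins) {
    this.maxJoins = maxJoins;
  }

  /** Returns the reason reordering is vetoed, or empty when every gate passes. */
  Optional<String> vetoReason(List<JoinInfo> joins, List<ScanInfo> scans) {
    if (joins.isEmpty()) {
      return Optional.of("no joins to reorder");
    }
    if (joins.size() > maxJoins) {
      return Optional.of("join count " + joins.size() + " exceeds cap " + maxJoins);
    }
    for (JoinInfo join : joins) {
      if (join.type() != JoinType.INNER) {
        return Optional.of("non-inner join: " + join.type());
      }
      if (join.hinted()) {
        return Optional.of("hinted join");
      }
    }
    for (ScanInfo scan : scans) {
      if (!scan.rowCountTrusted()) {
        return Optional.of("untrusted row count for " + scan.table());
      }
    }
    return Optional.empty();
  }

  /** Applies the reorder only when eligible; any failure falls back to the original plan. */
  <P> P maybeReorder(P original, List<JoinInfo> joins, List<ScanInfo> scans, UnaryOperator<P> reorder) {
    if (vetoReason(joins, scans).isPresent()) {
      return original;
    }
    try {
      P reordered = reorder.apply(original);
      return reordered != null ? reordered : original;
    } catch (RuntimeException e) {
      return original;
    }
  }
}
```

The sketch returns a veto reason rather than a boolean. That makes it easy to surface fallback reasons later as broker metrics or EXPLAIN annotations.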

Phase 1 — statistics foundation + join reordering (implemented in #18741; can be split into stacked PRs on request)

  • PR: statistics contracts (PinotStatisticsProvider, TableStatistics, ColumnStatistics,
    StatConfidence; broker-side StatsStore / ColumnStatsSource SPIs)
  • PR: SQLite-backed StatsStore (WAL, Flyway-migrated schema, corruption auto-recovery)
  • PR: broker T0 stats collection from ZK segment metadata (pinot.broker.stats.enabled,
    default false) + table-type semantics (hybrid merge at the time boundary, upsert/dedup and
    consuming-segment confidence)
  • PR: planner wiring (PinotTable#getStatistic(), statistics provider through
    QueryEnvironment.Config)
  • PR: RelMetadataProvider with stats-backed row counts and selectivity (incl. time-range
    selectivity from segment time boundaries)
  • PR: rows-dominated RelOptCost implementation
  • PR: gated cost-based join-reorder phase + guardrails (join cap, hint veto, error fallback)
    and plan-level tests
  • PR (independent): quickstart fixes discovered along the way (some quickstarts dropped -configFile;
    support for multiple -bootstrapTableDir arguments)
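One way to derive time-range selectivity from segment time boundaries is sketched below. It assumes rows are spread uniformly between each segment's start and end time. SegmentTimeStats and TimeRangeSelectivity are hypothetical names, and the estimator in the PR may weigh segments differently:

```java
import java.util.List;

// Hypothetical per-segment metadata: row count plus time boundaries in epoch millis.
record SegmentTimeStats(long rowCount, long startMs, long endMs) {}

final class TimeRangeSelectivity {
  private TimeRangeSelectivity() {}

  /**
   * Estimates the fraction of rows whose time column falls in [lo, hi], assuming rows are
   * uniformly distributed between each segment's start and end time. Returns 1.0 (no
   * filtering assumed) when there are no rows to reason about.
   */
  static double estimate(List<SegmentTimeStats> segments, long lo, long hi) {
    double totalRows = 0;
    double matchingRows = 0;
    for (SegmentTimeStats s : segments) {
      totalRows += s.rowCount();
      long overlapStart = Math.max(lo, s.startMs());
      long overlapEnd = Math.min(hi, s.endMs());
      if (overlapStart > overlapEnd) {
        continue; // segment lies entirely outside the queried range
      }
      long span = s.endMs() - s.startMs();
      // A zero-width segment inside the range contributes all of its rows.
      double fraction = span == 0 ? 1.0 : (double) (overlapEnd - overlapStart) / span;
      matchingRows += s.rowCount() * fraction;
    }
    return totalRows == 0 ? 1.0 : matchingRows / totalRows;
  }
}
```

Consider segment A with 100 rows over [0, 100] and segment B with 300 rows over [100, 200]. A filter on [150, 200] skips A entirely and keeps half of B, so the estimate is 150 / 400 = 0.375.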

Phase 2 — column statistics and standalone payoffs

  • Server endpoint audit/extension for lean per-segment column stats (NDV, min/max, avg bytes)
  • ColumnStatsSource broker-pull implementation (rate-limited, jittered, debounced on
    rebalance storms; crc-delta fetch)
  • Selectivity refinement: NDV-based equality selectivity, min/max range selectivity,
    null-sentinel handling (the default null value stored for numeric columns pollutes min, so
    per-column trust must be tracked)
  • Broker-side min/max segment pruning for the single-stage engine (the per-column min/max in
    the off-heap store removes the historical reason this lives only on servers — see the
    existing TODO in ColumnValueSegmentPruner)
  • Stats observability: broker metrics (stats age, store size, fallback reasons), EXPLAIN
    annotations showing the estimates used, admin purge endpoint
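The selectivity refinements above match standard uniform-distribution estimators. An equality predicate on a column with NDV distinct values gets 1/NDV. A range predicate gets the fraction of [min, max] that it covers. The sketch below assumes values are uniformly distributed. ColumnStats, the minTrusted flag and the fallback constants are hypothetical. The flag stands in for the per-column trust that guards against a min polluted by the default null value:

```java
// Hypothetical column statistics; minTrusted is false when min may be a null sentinel.
record ColumnStats(long ndv, double min, double max, boolean minTrusted) {}

final class ColumnSelectivity {
  // Illustrative fallbacks used when statistics are missing or untrusted.
  static final double DEFAULT_EQUALS = 0.15;
  static final double DEFAULT_RANGE = 0.5;

  private ColumnSelectivity() {}

  /** Selectivity of col = literal, assuming values are uniformly distributed. */
  static double equality(ColumnStats stats) {
    if (stats == null || stats.ndv() <= 0) {
      return DEFAULT_EQUALS;
    }
    return 1.0 / stats.ndv();
  }

  /** Selectivity of lo <= col <= hi, as the fraction of [min, max] the range covers. */
  static double range(ColumnStats stats, double lo, double hi) {
    if (stats == null || !stats.minTrusted()) {
      return DEFAULT_RANGE;
    }
    double width = stats.max() - stats.min();
    if (width <= 0) {
      // Single-valued column: the predicate keeps either all rows or none.
      return (lo <= stats.min() && stats.min() <= hi) ? 1.0 : 0.0;
    }
    double overlap = Math.min(hi, stats.max()) - Math.max(lo, stats.min());
    return overlap <= 0 ? 0.0 : Math.min(1.0, overlap / width);
  }
}
```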

Phase 3 — cost-based physical optimization

The current physical optimizer applies a well-defined sequence of transformation rules and has
served the engine well; by construction it produces a single plan rather than comparing
alternatives by cost. Two tracks:

  • Short term: build-side normalization — use row counts to place the smaller input on the
    hash-build side of a join. Benchmarks above show the logical reorderer minimizes
    intermediate cardinality but cannot account for the engine's build-side convention (the
    remaining 462 vs 412 ms gap in query 1); this is a small, well-contained improvement.
  • Longer term: a design proposal for a cost-based physical planning phase — a two-phase
    search (bottom-up enumeration of server/distribution candidates per operator, top-down
    worker assignment) where colocation, broadcast-vs-hash, aggregate placement and parallelism
    become cost-driven decisions sharing the same statistics and cost model. This deserves its
    own design document and discussion with the community before any code.
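The sketch below shows build-side normalization for an inner hash join. For illustration only, it assumes the right input is the hash-build side. JoinInput, HashJoinSides and BuildSideNormalizer are hypothetical names. For outer joins, a swap would also have to flip the join type, which this sketch does not attempt:

```java
// Hypothetical join input with an estimated row count (null when unknown).
record JoinInput(String name, Double estimatedRows) {}

// Result of normalization: which input probes, which builds, and whether they were swapped.
record HashJoinSides(JoinInput probe, JoinInput build, boolean swapped) {}

final class BuildSideNormalizer {
  private BuildSideNormalizer() {}

  /**
   * Puts the smaller estimated input on the build side (assumed here to be the right input).
   * Keeps the original order when either estimate is missing or when the right input is
   * already the smaller (or equal) one.
   */
  static HashJoinSides normalize(JoinInput left, JoinInput right) {
    Double leftRows = left.estimatedRows();
    Double rightRows = right.estimatedRows();
    if (leftRows == null || rightRows == null || leftRows >= rightRows) {
      return new HashJoinSides(left, right, false);
    }
    return new HashJoinSides(right, left, true);
  }
}
```

With a missing estimate, the sketch keeps the SQL order. This follows the proposal's principle that, without trusted statistics, behavior degrades to today's behavior.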

Cross-cutting

  • Vendor extensibility: statistics source, cost model and rule sets pluggable via the existing
    RuleSetCustomizer-style SPI pattern
  • Documentation (config reference, design notes, operations guide for the stats store)
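The following sketch illustrates what a pluggable cost model could look like. It uses a hypothetical single-method interface with a rows-dominated default, in which a join costs the rows it reads plus the rows it produces. CostModel, RowsDominatedCostModel, JoinStep and PlanCosting are illustrative names. The actual RelOptCost implementation and the shape of the SPI are not specified here:

```java
import java.util.List;

// Hypothetical extension point: a vendor could supply its own implementation.
interface CostModel {
  double joinCost(double leftRows, double rightRows, double outputRows);
}

// Rows-dominated default: a join costs the rows it consumes plus the rows it emits.
final class RowsDominatedCostModel implements CostModel {
  @Override
  public double joinCost(double leftRows, double rightRows, double outputRows) {
    return leftRows + rightRows + outputRows;
  }
}

// Estimated input and output cardinalities of one join in a candidate plan.
record JoinStep(double leftRows, double rightRows, double outputRows) {}

final class PlanCosting {
  private PlanCosting() {}

  /** Total cost of a candidate plan as the sum of its join costs under the given model. */
  static double planCost(CostModel model, List<JoinStep> joins) {
    double total = 0;
    for (JoinStep j : joins) {
      total += model.joinCost(j.leftRows(), j.rightRows(), j.outputRows());
    }
    return total;
  }
}
```

An exchange-aware model that also weighs columnar byte sizes, as planned for pillar 2, would only need another implementation of the same interface.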

Compatibility and safety

  • Everything is off by default (pinot.broker.stats.enabled=false; useJoinReorder query
    option default false) and broker-local — no wire-format or mixed-version concerns.
  • Stats-store failures never fail a query: all read paths degrade to the no-stats behavior.
  • Note for reviewers: enabling stats collection changes the cardinality estimates seen by
    all MSE planner rules, which can change plan choices but never query results; the join-reorder
    phase is additionally gated by its own option.
  • New dependencies: org.xerial:sqlite-jdbc, org.flywaydb:flyway-core (broker only,
    LICENSE-binary updated).
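The "never fail a query" guarantee can be sketched as a wrapper. It turns every store error, and every stat whose confidence is too low for planning, into "no statistics". Confidence, TableRowCount, RowCountStore and SafeStats are hypothetical names, and the real StatConfidence tiers may differ:

```java
import java.util.Optional;
import java.util.OptionalDouble;

// Hypothetical confidence tiers; the real StatConfidence values may differ.
enum Confidence { EXACT, ESTIMATED, UNTRUSTED }

record TableRowCount(double rows, Confidence confidence) {}

// Hypothetical store interface; lookups may throw (e.g. on SQLite I/O errors).
interface RowCountStore {
  Optional<TableRowCount> lookup(String table) throws Exception;
}

final class SafeStats {
  private final RowCountStore store;

  SafeStats(RowCountStore store) {
    this.store = store;
  }

  /**
   * Returns a row count only when the store answers and the stat is trusted enough to plan
   * with. Failures and untrusted stats yield empty, which callers treat as "no statistics".
   */
  OptionalDouble rowCount(String table) {
    try {
      Optional<TableRowCount> stat = store.lookup(table);
      if (stat.isPresent() && stat.get().confidence() != Confidence.UNTRUSTED) {
        return OptionalDouble.of(stat.get().rows());
      }
    } catch (Exception e) {
      // Deliberately swallowed: a stats-store failure must never fail the query.
    }
    return OptionalDouble.empty();
  }
}
```

A caller that sees an empty result plans exactly as it does today. A corrupt or unavailable store therefore costs only plan quality, never availability.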

Labels: feature (New functionality), multi-stage (Related to the multi-stage query engine)