[Umbrella] Cost-based optimization for the multi-stage query engine
Motivation
The multi-stage query engine plans queries with Apache Calcite, but optimization today is purely
rule-based (HepPlanner): join order is whatever the SQL says, join strategies are chosen via
explicit hints, and the planner has no cardinality information (PinotTable does not expose a
Calcite Statistic; there is a long-standing TODO: add support for cost factory in QueryEnvironment). Calcite already ships the machinery for cost-based decisions — what Pinot is
missing is statistics at the broker and the integration to consume them.
This issue tracks the work to introduce cost-based optimization incrementally, gated and off by
default at every step.
Why it matters (measured)
On TPC-H SF=1 (single-server quickstart, 150 samples per variant, medians with bootstrap 95% CIs),
for queries deliberately written in a poor syntactic join order (smallest tables first):
| query | literal SQL order | with cost-based reorder | hand-optimized SQL |
|---|---|---|---|
| customer⋈orders⋈lineitem (filtered) | 549 ms [516, 570] | 462 ms [438, 479] | 412 ms [390, 430] |
| region⋈nation⋈supplier⋈lineitem (filtered) | 253 ms [244, 277] | 80 ms [80, 81] | 207 ms [204, 211] |
In the second query the optimizer found a bushy plan (reduce the dimension chain first, then a
single probe over the fact table) that is 2.6× faster than the hand-optimized left-deep SQL
and 3.2× faster than the literal order. Results are identical in all variants, and the reorder
phase adds no measurable planning overhead when it decides to change nothing.
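To make the bushy-plan intuition concrete, here is a minimal rows-only sketch. The base-table sizes are the standard TPC-H SF=1 row counts; everything else is an assumption for illustration: the filter (one of the five regions kept), the uniform key distribution, the fact-first left-deep order, and the class and method names. It does not reproduce the benchmark queries above, and intermediate row counts are only a proxy for wall-clock time.

```java
/** Rows-only comparison of two join shapes for region, nation, supplier, lineitem. */
final class JoinShapeCost {
    // TPC-H SF=1 base-table row counts.
    static final long NATION = 25, SUPPLIER = 10_000, LINEITEM = 6_001_215;

    // ASSUMPTION: the filter keeps one of the five regions and keys are uniform.
    static final double REGION_FILTER = 1.0 / 5;

    /** Hypothetical fact-first left-deep plan: ((lineitem JOIN supplier) JOIN nation) JOIN region. */
    static long leftDeepIntermediateRows() {
        long j1 = LINEITEM;                        // each lineitem row matches one supplier
        long j2 = j1;                              // each supplier matches one nation
        long j3 = Math.round(j2 * REGION_FILTER);  // the region filter applies only at the end
        return j1 + j2 + j3;
    }

    /** Bushy plan: lineitem JOIN (supplier JOIN (nation JOIN filtered region)). */
    static long bushyIntermediateRows() {
        long d1 = Math.round(NATION * REGION_FILTER);    // nations in the kept region
        long d2 = Math.round(SUPPLIER * REGION_FILTER);  // suppliers in those nations
        long j = Math.round(LINEITEM * REGION_FILTER);   // single probe over the fact table
        return d1 + d2 + j;
    }
}
```

Under these assumptions the left-deep shape produces 13,202,673 intermediate rows and the bushy shape 1,202,248, because the bushy shape applies the selective dimension chain before touching the fact table.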
Design overview
Three pillars (detailed design in the linked doc / PR descriptions):
Broker statistics subsystem. Per-segment stats collected from ZooKeeper metadata the broker
already watches (row count, size, time boundaries — effectively free), persisted off-heap in an
embedded SQLite store (bounded heap, warm restart, crc-based reconciliation). Per-column stats
(NDV, min/max, avg byte size) come later via a bounded server fan-out behind a swappable ColumnStatsSource. Every stat carries a confidence tier so that table types where raw
doc counts are biased (upsert/dedup, consuming segments, hybrid time-boundary overlap) degrade
to today's behavior instead of producing silently-wrong plans.
Cost model. Rows-dominated for logical join ordering; extended later with columnar byte
sizes for exchange-aware decisions (shuffle/broadcast cost is the dominant term in MSE).
Planner integration. PinotTable#getStatistic() + a chained RelMetadataProvider
(row counts, selectivity incl. time-range estimation from segment time boundaries) feed
Calcite's RelMetadataQuery; a gated join-reorder phase (useJoinReorder query option,
default off) runs between the logical and physical planning phases with strict eligibility
gates (inner joins only, no hinted joins, all scans must have trusted row counts, join-count
cap, fall back to the original plan on any error).
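The eligibility gates listed for the join-reorder phase could be sketched roughly as follows. This is an illustration only, not the code in the PR: `JoinReorderGate`, `JoinInfo`, `eligible`, `reorderOrFallback` and the `maxJoins` parameter are hypothetical names, and the plan is left as a generic type so the sketch does not depend on Calcite.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of the join-reorder gates; not the PR's actual classes. */
final class JoinReorderGate {
    enum JoinType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    static final class JoinInfo {
        final JoinType type;
        final boolean hinted;
        JoinInfo(JoinType type, boolean hinted) { this.type = type; this.hinted = hinted; }
    }

    /** Returns true only if every gate passes; any failing gate keeps the original plan. */
    static boolean eligible(boolean useJoinReorder, List<JoinInfo> joins,
                            List<Boolean> scanRowCountTrusted, int maxJoins) {
        if (!useJoinReorder) return false;            // query option, default off
        if (joins.size() > maxJoins) return false;    // join-count cap
        for (JoinInfo j : joins) {
            // inner joins only, and a hint on any join vetoes reordering
            if (j.type != JoinType.INNER || j.hinted) return false;
        }
        for (boolean trusted : scanRowCountTrusted) {
            if (!trusted) return false;               // all scans need trusted row counts
        }
        return true;
    }

    /** Runs the reorder step and falls back to the original plan on any runtime error. */
    static <P> P reorderOrFallback(P original, UnaryOperator<P> reorder) {
        try {
            P result = reorder.apply(original);
            return result != null ? result : original;
        } catch (RuntimeException e) {
            return original;
        }
    }
}
```

Keeping the gate a pure predicate and the fallback a thin wrapper means a rejected or failed reorder is indistinguishable from the phase not running at all.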
Phase 1 — statistics foundation + join reordering (implemented in #18741; can be split into stacked PRs on request)
PR: statistics types and SPIs (PinotStatisticsProvider, TableStatistics, ColumnStatistics, StatConfidence; broker-side StatsStore/ColumnStatsSource SPIs)
PR: StatsStore (WAL, Flyway-migrated schema, corruption auto-recovery)
PR: broker T0 stats collection from ZK segment metadata (pinot.broker.stats.enabled,
default false) + table-type semantics (hybrid merge at the time boundary, upsert/dedup and
consuming-segment confidence)
PR: planner wiring (PinotTable#getStatistic(), statistics provider through QueryEnvironment.Config)
PR: RelMetadataProvider with stats-backed row counts and selectivity (incl. time-range
selectivity from segment time boundaries)
PR: rows-dominated RelOptCost implementation
PR: gated cost-based join-reorder phase + guardrails (join cap, hint veto, error fallback)
and plan-level tests
PR (independent): quickstart fixes discovered along the way (-configFile dropped by some
quickstarts; multiple -bootstrapTableDir support)
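One plausible way to derive time-range selectivity from segment time boundaries is to weight each segment's row count by the fraction of its time range that overlaps the query range. The sketch below assumes rows are spread uniformly within a segment's [start, end] interval and that bounds are inclusive; the class name, the array layout and the "no information means selectivity 1.0" fallback are all assumptions, not the PR's implementation.

```java
/** Hypothetical time-range selectivity estimate from per-segment time boundaries. */
final class TimeRangeSelectivity {
    /**
     * @param segments each entry is {startMs, endMs, rowCount}, bounds inclusive
     * @return estimated fraction of rows whose time falls in [queryStart, queryEnd]
     */
    static double estimate(long[][] segments, long queryStart, long queryEnd) {
        double totalRows = 0;
        double selectedRows = 0;
        for (long[] s : segments) {
            long start = s[0], end = s[1], rows = s[2];
            totalRows += rows;
            long lo = Math.max(start, queryStart);
            long hi = Math.min(end, queryEnd);
            if (lo > hi) continue;                        // no overlap with this segment
            double span = (double) (end - start) + 1;     // inclusive segment width
            double overlap = (double) (hi - lo) + 1;      // inclusive overlap width
            selectedRows += rows * (overlap / span);      // ASSUMPTION: uniform within segment
        }
        // ASSUMPTION: with no row information, do not claim any reduction.
        return totalRows == 0 ? 1.0 : selectedRows / totalRows;
    }
}
```

For two segments covering [0, 99] with 100 rows and [100, 199] with 300 rows, a query range of [50, 149] selects half of each segment, giving an estimate of 0.5.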
Phase 2 — column statistics and standalone payoffs
Server endpoint audit/extension for lean per-segment column stats (NDV, min/max, avg bytes)
ColumnStatsSource broker-pull implementation (rate-limited, jittered, debounced on rebalance storms; crc-delta fetch)
Selectivity refinement: NDV-based equality selectivity, min/max range selectivity,
null-sentinel handling (numeric null default pollutes min — track per-column trust)
Broker-side min/max segment pruning for the single-stage engine (the per-column min/max in
the off-heap store removes the historical reason this lives only on servers — see the
existing TODO in ColumnValueSegmentPruner)
Stats observability: broker metrics (stats age, store size, fallback reasons), EXPLAIN
annotations showing the estimates used, admin purge endpoint
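The selectivity refinements in this phase follow textbook estimators: 1/NDV for an equality predicate and the overlapping fraction of [min, max] for a range predicate. The sketch below shows how per-column trust could gate them so that an untrusted statistic (for example a min polluted by a numeric null default) degrades to a caller-supplied fallback. All names and the fallback parameter are hypothetical.

```java
/** Hypothetical selectivity helpers gated by per-column trust. */
final class ColumnSelectivity {
    /** Equality selectivity: 1/NDV when the NDV is trusted, else the fallback. */
    static double equality(long ndv, boolean trusted, double fallback) {
        if (!trusted || ndv <= 0) return fallback;
        return 1.0 / ndv;
    }

    /**
     * Range selectivity for lo <= col <= hi, assuming values are uniform in [min, max].
     * Falls back when min/max are untrusted or degenerate.
     */
    static double range(double min, double max, boolean trusted,
                        double lo, double hi, double fallback) {
        if (!trusted || max <= min) return fallback;
        double overlap = Math.min(hi, max) - Math.max(lo, min);
        if (overlap <= 0) return 0.0;                 // predicate range misses the column range
        return Math.min(1.0, overlap / (max - min));
    }
}
```

Passing the fallback in explicitly keeps the decision about what "no information" means with the caller, which matches the stated goal of degrading to today's behavior.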
Phase 3 — cost-based physical optimization
The current physical optimizer applies a well-defined sequence of transformation rules and has
served the engine well; by construction it produces a single plan rather than comparing
alternatives by cost. Two tracks:
Short term: build-side normalization — use row counts to place the smaller input on the
hash-build side of a join. Benchmarks above show the logical reorderer minimizes
intermediate cardinality but cannot account for the engine's build-side convention (the
remaining 462 vs 412 ms gap in query 1); this is a small, well-contained improvement.
Longer term: a design proposal for a cost-based physical planning phase — a two-phase
search (bottom-up enumeration of server/distribution candidates per operator, top-down
worker assignment) where colocation, broadcast-vs-hash, aggregate placement and parallelism
become cost-driven decisions sharing the same statistics and cost model. This deserves its
own design document and discussion with the community before any code.
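The short-term track (build-side normalization) reduces to a small decision per join. The sketch below assumes the right input is the hash-build side and that the join is an inner join, so swapping inputs is safe; both points, and the names used, are assumptions for illustration. When either row count is unknown the join is left as written.

```java
import java.util.OptionalLong;

/** Hypothetical build-side normalization decision for an inner hash join. */
final class BuildSideNormalizer {
    /**
     * ASSUMPTION: the right input is the build side.
     * Swap only when both row counts are known and the left input is strictly smaller,
     * so the smaller input ends up on the build side.
     */
    static boolean shouldSwap(OptionalLong leftRows, OptionalLong rightRows) {
        if (!leftRows.isPresent() || !rightRows.isPresent()) {
            return false;  // no trusted estimate: keep the original order
        }
        return leftRows.getAsLong() < rightRows.getAsLong();
    }
}
```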
Cross-cutting
Vendor extensibility: statistics source, cost model and rule sets pluggable via the existing RuleSetCustomizer-style SPI pattern
Documentation (config reference, design notes, operations guide for the stats store)
Compatibility and safety
Everything is off by default (pinot.broker.stats.enabled=false; useJoinReorder query
option default false) and broker-local — no wire-format or mixed-version concerns.
Stats-store failures never fail a query: all read paths degrade to the no-stats behavior.
Note for reviewers: enabling stats collection changes the cardinality estimates visible to
all MSE planner rules, which can change plan choices but never query results; the join-reorder
phase is additionally gated by its own option.
New dependencies: org.xerial:sqlite-jdbc, org.flywaydb:flyway-core (broker only,
LICENSE-binary updated).
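The "stats-store failures never fail a query" guarantee can be pictured as a read wrapper that turns any failure into "no statistic". This is a minimal sketch with hypothetical names (`SafeStatsReader`, `rowCountOrEmpty`); the real read paths and exception types are not described in this issue.

```java
import java.util.OptionalLong;
import java.util.concurrent.Callable;

/** Hypothetical wrapper: a failing stats read yields "no stats" instead of an error. */
final class SafeStatsReader {
    static OptionalLong rowCountOrEmpty(Callable<Long> read) {
        try {
            Long rows = read.call();
            // Treat missing or nonsensical values as "no statistic available".
            return (rows == null || rows < 0) ? OptionalLong.empty() : OptionalLong.of(rows);
        } catch (Exception e) {
            // Store unavailable or corrupt: degrade to the no-stats planning path.
            return OptionalLong.empty();
        }
    }
}
```

A planner that receives an empty result would then behave exactly as it does with statistics disabled.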