diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 818e06e8b81f..fdaeaf1df783 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -21,7 +21,7 @@ use arrow_schema::{DataType, Fields, SchemaBuilder};
 use crate::arrow::ProjectionMask;
 use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
-use crate::arrow::array_reader::cached_array_reader::CacheRole;
+use crate::arrow::array_reader::cached_array_reader::{CacheMode, CacheRole};
 use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
@@ -65,6 +65,8 @@ impl<'a> CacheOptionsBuilder<'a> {
             projection_mask: self.projection_mask,
             cache: self.cache,
             role: CacheRole::Producer,
+            mode: CacheMode::Raw,
+            exact_selected_column: None,
         }
     }
 
@@ -74,6 +76,30 @@ impl<'a> CacheOptionsBuilder<'a> {
             projection_mask: self.projection_mask,
             cache: self.cache,
             role: CacheRole::Consumer,
+            mode: CacheMode::Raw,
+            exact_selected_column: None,
+        }
+    }
+
+    /// Return producer cache options for exact-selected single-column mode
+    pub fn producer_exact_selected(self, column_idx: usize) -> CacheOptions<'a> {
+        CacheOptions {
+            projection_mask: self.projection_mask,
+            cache: self.cache,
+            role: CacheRole::Producer,
+            mode: CacheMode::ExactSelected,
+            exact_selected_column: Some(column_idx),
+        }
+    }
+
+    /// Return consumer cache options for exact-selected single-column mode
+    pub fn consumer_exact_selected(self, column_idx: usize) -> CacheOptions<'a> {
+        CacheOptions {
+            projection_mask: self.projection_mask,
+            cache: self.cache,
+            role: CacheRole::Consumer,
+            mode: CacheMode::ExactSelected,
+            exact_selected_column: Some(column_idx),
         }
     }
 }
@@ -84,6 +110,9 @@ pub struct CacheOptions<'a> {
     pub projection_mask: &'a ProjectionMask,
     pub cache: &'a Arc<RwLock<RowGroupCache>>,
     pub role: CacheRole,
+    pub mode: CacheMode,
+    /// Optional target column for exact-selected mode
+    pub exact_selected_column: Option<usize>,
 }
 
 /// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
@@ -154,11 +183,22 @@ impl<'a> ArrayReaderBuilder<'a> {
         };
 
         if cache_options.projection_mask.leaf_included(col_idx) {
-            Ok(Some(Box::new(CachedArrayReader::new(
+            // ExactSelected mode only applies to the one projected column
+            // captured from predicate evaluation. Other projected columns
+            // keep regular Raw cache behavior.
+            let mode = if cache_options.mode == CacheMode::ExactSelected
+                && cache_options.exact_selected_column == Some(col_idx)
+            {
+                CacheMode::ExactSelected
+            } else {
+                CacheMode::Raw
+            };
+            Ok(Some(Box::new(CachedArrayReader::new_with_mode(
                 reader,
                 Arc::clone(cache_options.cache),
                 col_idx,
                 cache_options.role,
+                mode,
                 self.metrics.clone(), // cheap clone
             ))))
         } else {
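Taken together with the existing `producer()`/`consumer()` constructors, the builder now has a role-by-mode surface. A sketch of how a caller is expected to pick a matched producer/consumer pair for one row group; the helper `options_for` is illustrative only, and the import paths assume the crate-internal types surfaced under the `experimental` feature:

```rust
use std::sync::{Arc, RwLock};
use parquet::arrow::ProjectionMask;
use parquet::arrow::array_reader::{CacheOptions, CacheOptionsBuilder, RowGroupCache};

/// Hypothetical helper: build matching producer/consumer options.
/// `exact_col` is Some(col) only on the single-filter fast-path.
fn options_for<'a>(
    mask: &'a ProjectionMask,
    cache: &'a Arc<RwLock<RowGroupCache>>,
    exact_col: Option<usize>,
) -> (CacheOptions<'a>, CacheOptions<'a>) {
    match exact_col {
        Some(col) => (
            CacheOptionsBuilder::new(mask, cache).producer_exact_selected(col),
            CacheOptionsBuilder::new(mask, cache).consumer_exact_selected(col),
        ),
        None => (
            CacheOptionsBuilder::new(mask, cache).producer(),
            CacheOptionsBuilder::new(mask, cache).consumer(),
        ),
    }
}
```

The important invariant (enforced in `CacheInfo` later in this diff) is that both sides agree on the mode and target column; a Raw producer paired with an ExactSelected consumer would pop from an empty queue.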
diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs
index 21f0c2afa410..2596fa6e3194 100644
--- a/parquet/src/arrow/array_reader/cached_array_reader.rs
+++ b/parquet/src/arrow/array_reader/cached_array_reader.rs
@@ -37,6 +37,15 @@ pub enum CacheRole {
     Consumer,
 }
 
+/// Cache mode for cached readers
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CacheMode {
+    /// Existing behavior: cache raw decoded batches and filter in consumer
+    Raw,
+    /// Single-filter fast-path: cache selected rows and consume directly
+    ExactSelected,
+}
+
 /// A cached wrapper around an ArrayReader that avoids duplicate decoding
 /// when the same column appears in both filter predicates and output projection.
 ///
@@ -82,9 +91,17 @@ pub struct CachedArrayReader {
     selections: BooleanBufferBuilder,
     /// Role of this reader (Producer or Consumer)
     role: CacheRole,
+    /// Cache mode for this reader
+    mode: CacheMode,
     /// Local cache to store batches between read_records and consume_batch calls
     /// This ensures data is available even if the shared cache evicts items
     local_cache: HashMap<BatchID, ArrayRef>,
+    /// Current selected cached chunk (ExactSelected consumer mode)
+    selected_chunk: Option<(ArrayRef, usize)>,
+    /// Staged selected arrays to return from consume_batch (ExactSelected consumer mode)
+    selected_staged: Vec<ArrayRef>,
+    /// Last batch id (exclusive) already cleaned from shared cache
+    last_cleaned_batch_id: usize,
     /// Statistics to report on the Cache behavior
     metrics: ArrowReaderMetrics,
 }
@@ -97,6 +114,25 @@ impl CachedArrayReader {
         column_idx: usize,
         role: CacheRole,
         metrics: ArrowReaderMetrics,
+    ) -> Self {
+        Self::new_with_mode(
+            inner,
+            cache,
+            column_idx,
+            role,
+            CacheMode::Raw,
+            metrics,
+        )
+    }
+
+    /// Creates a new cached array reader with an explicit cache mode
+    pub fn new_with_mode(
+        inner: Box<dyn ArrayReader>,
+        cache: Arc<RwLock<RowGroupCache>>,
+        column_idx: usize,
+        role: CacheRole,
+        mode: CacheMode,
+        metrics: ArrowReaderMetrics,
     ) -> Self {
         let batch_size = cache.read().unwrap().batch_size();
 
@@ -109,11 +145,19 @@ impl CachedArrayReader {
             batch_size,
             selections: BooleanBufferBuilder::new(0),
             role,
+            mode,
             local_cache: HashMap::new(),
+            selected_chunk: None,
+            selected_staged: Vec::new(),
+            last_cleaned_batch_id: 0,
             metrics,
         }
     }
 
+    fn is_exact_selected_consumer(&self) -> bool {
+        self.role == CacheRole::Consumer && self.mode == CacheMode::ExactSelected
+    }
+
     fn get_batch_id_from_position(&self, row_id: usize) -> BatchID {
         BatchID {
             val: row_id / self.batch_size,
@@ -149,13 +193,15 @@ impl CachedArrayReader {
         // Store in both shared cache and local cache
         // The shared cache is used to reuse results between readers
         // The local cache ensures data is available for our consume_batch call
-        let _cached =
-            self.shared_cache
-                .write()
-                .unwrap()
-                .insert(self.column_idx, batch_id, array.clone());
-        // Note: if the shared cache is full (_cached == false), we continue without caching
-        // The local cache will still store the data for this reader's use
+        if !(self.mode == CacheMode::ExactSelected && self.role == CacheRole::Producer) {
+            let _cached =
+                self.shared_cache
+                    .write()
+                    .unwrap()
+                    .insert(self.column_idx, batch_id, array.clone());
+            // Note: if the shared cache is full (_cached == false), we continue without caching
+            // The local cache will still store the data for this reader's use
+        }
 
         self.local_cache.insert(batch_id, array);
 
@@ -172,8 +218,14 @@ impl CachedArrayReader {
         // This ensures we don't remove batches that might still be needed for the current batch
        // We can safely remove batch_id if current_batch_id > batch_id + 1
         if current_batch_id.val > 1 {
+            // Only clean newly completed ranges. This avoids repeated remove scans
+            // over already-cleaned batch ids.
+            let cleanup_end = current_batch_id.val - 1;
+            if cleanup_end <= self.last_cleaned_batch_id {
+                return;
+            }
             let mut cache = self.shared_cache.write().unwrap();
-            for batch_id_to_remove in 0..(current_batch_id.val - 1) {
+            for batch_id_to_remove in self.last_cleaned_batch_id..cleanup_end {
                 cache.remove(
                     self.column_idx,
                     BatchID {
@@ -181,6 +233,7 @@ impl CachedArrayReader {
                     },
                 );
             }
+            self.last_cleaned_batch_id = cleanup_end;
         }
     }
 }
@@ -195,6 +248,9 @@ impl ArrayReader for CachedArrayReader {
     }
 
     fn read_records(&mut self, num_records: usize) -> Result<usize> {
+        if self.is_exact_selected_consumer() {
+            return self.read_records_exact_selected(num_records);
+        }
         let mut read = 0;
         while read < num_records {
             let batch_id = self.get_batch_id_from_position(self.outer_position);
@@ -257,6 +313,9 @@ impl ArrayReader for CachedArrayReader {
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        if self.is_exact_selected_consumer() {
+            return Ok(num_records);
+        }
         let mut skipped = 0;
         while skipped < num_records {
             let size = std::cmp::min(num_records - skipped, self.batch_size);
@@ -268,6 +327,9 @@ impl ArrayReader for CachedArrayReader {
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
+        if self.is_exact_selected_consumer() {
+            return self.consume_exact_selected_batch();
+        }
         let row_count = self.selections.len();
         if row_count == 0 {
             return Ok(new_empty_array(self.inner.get_data_type()));
@@ -348,6 +410,63 @@ impl ArrayReader for CachedArrayReader {
     }
 }
 
+impl CachedArrayReader {
+    fn read_records_exact_selected(&mut self, num_records: usize) -> Result<usize> {
+        // ExactSelected consumer mode receives predicate-selected arrays in FIFO
+        // order from the shared cache and streams them directly.
+        let mut read = 0;
+        while read < num_records {
+            let need_next = match self.selected_chunk.as_ref() {
+                None => true,
+                Some((array, offset)) => *offset >= array.len(),
+            };
+            if need_next {
+                self.selected_chunk = self
+                    .shared_cache
+                    .write()
+                    .unwrap()
+                    .pop_selected(self.column_idx)
+                    .map(|array| (array, 0));
+            }
+
+            let Some((array, offset)) = self.selected_chunk.as_mut() else {
+                break;
+            };
+
+            let available = array.len().saturating_sub(*offset);
+            if available == 0 {
+                self.selected_chunk = None;
+                continue;
+            }
+
+            let take = std::cmp::min(num_records - read, available);
+            self.selected_staged.push(array.slice(*offset, take));
+            *offset += take;
+            read += take;
+            self.metrics.increment_cache_reads(take);
+        }
+        Ok(read)
+    }
+
+    fn consume_exact_selected_batch(&mut self) -> Result<ArrayRef> {
+        if self.selected_staged.is_empty() {
+            return Ok(new_empty_array(self.inner.get_data_type()));
+        }
+
+        if self.selected_staged.len() == 1 {
+            return Ok(self.selected_staged.pop().unwrap());
+        }
+
+        let arrays = self
+            .selected_staged
+            .drain(..)
+            .collect::<Vec<_>>();
+        Ok(arrow_select::concat::concat(
+            &arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>(),
+        )?)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
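The consumer-side loop in `read_records_exact_selected` is easiest to see in isolation. Below is a self-contained model of the same chunk-slicing logic over a FIFO of already-selected arrays, runnable against `arrow-array` alone; `take_rows` is a hypothetical stand-in for the reader's cursor plus staged output, not crate code:

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, Int32Array};

/// Serve up to `num_records` rows from a FIFO of selected arrays,
/// carrying a (chunk, offset) cursor across calls.
fn take_rows(
    queue: &mut VecDeque<ArrayRef>,
    cursor: &mut Option<(ArrayRef, usize)>,
    num_records: usize,
) -> Vec<ArrayRef> {
    let mut out = Vec::new();
    let mut read = 0;
    while read < num_records {
        // Refill the cursor when the current chunk is exhausted.
        if cursor.as_ref().map_or(true, |(a, o)| *o >= a.len()) {
            *cursor = queue.pop_front().map(|a| (a, 0));
        }
        let Some((array, offset)) = cursor.as_mut() else { break };
        let take = (num_records - read).min(array.len() - *offset);
        out.push(array.slice(*offset, take)); // zero-copy slice
        *offset += take;
        read += take;
    }
    out
}

fn main() {
    let mut queue: VecDeque<ArrayRef> = VecDeque::from(vec![
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
    ]);
    let mut cursor = None;
    // A request of 4 spans the chunk boundary: yields slices [1,2,3] and [4].
    let first = take_rows(&mut queue, &mut cursor, 4);
    assert_eq!(first.iter().map(|a| a.len()).sum::<usize>(), 4);
}
```

Note that `skip_records` in ExactSelected consumer mode is a no-op by design: the queue only ever contains rows that survived the predicate, so there is nothing to skip past.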
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 726eae1f51c5..c44b9e8ef21f 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -54,6 +54,7 @@ mod test_util;
 // Note that this crate is public under the `experimental` feature flag.
 use crate::file::metadata::RowGroupMetaData;
 pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
+pub use cached_array_reader::CacheMode;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 #[allow(unused_imports)] // Only used for benchmarks
diff --git a/parquet/src/arrow/array_reader/row_group_cache.rs b/parquet/src/arrow/array_reader/row_group_cache.rs
index ef726e16495f..a6515b2ef81c 100644
--- a/parquet/src/arrow/array_reader/row_group_cache.rs
+++ b/parquet/src/arrow/array_reader/row_group_cache.rs
@@ -17,7 +17,7 @@
 
 use arrow_array::{Array, ArrayRef};
 use arrow_schema::DataType;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 
 /// Starting row ID for this batch
 ///
@@ -60,10 +60,21 @@ fn get_array_memory_size_for_cache(array: &ArrayRef) -> usize {
 
 /// This cache is designed to avoid duplicate decoding when the same column
 /// appears in both filter predicates and output projection.
+#[derive(Debug)]
+struct CacheEntry {
+    array: ArrayRef,
+    size: usize,
+}
+
 #[derive(Debug)]
 pub struct RowGroupCache {
     /// Cache storage mapping (column_idx, row_id) -> ArrayRef
-    cache: HashMap<CacheKey, ArrayRef>,
+    cache: HashMap<CacheKey, CacheEntry>,
+    /// Exact-selected cache stream per column (single-filter fast-path).
+    ///
+    /// Entries are queued in producer order and consumed FIFO by the matching
+    /// projection reader, so no additional row filtering is required.
+    selected_cache: HashMap<usize, VecDeque<CacheEntry>>,
     /// Cache granularity
     batch_size: usize,
     /// Maximum cache size in bytes
@@ -77,6 +88,7 @@ impl RowGroupCache {
     pub fn new(batch_size: usize, max_cache_bytes: usize) -> Self {
         Self {
             cache: HashMap::new(),
+            selected_cache: HashMap::new(),
             batch_size,
             max_cache_bytes,
             current_cache_size: 0,
@@ -98,7 +110,13 @@ impl RowGroupCache {
             batch_id,
         };
 
-        let existing = self.cache.insert(key, array);
+        let existing = self.cache.insert(
+            key,
+            CacheEntry {
+                array,
+                size: array_size,
+            },
+        );
         assert!(existing.is_none());
         self.current_cache_size += array_size;
         true
@@ -111,7 +129,7 @@ impl RowGroupCache {
             column_idx,
             batch_id,
         };
-        self.cache.get(&key).cloned()
+        self.cache.get(&key).map(|entry| entry.array.clone())
     }
 
     /// Gets the batch size for this cache
@@ -126,13 +144,41 @@ impl RowGroupCache {
             column_idx,
             batch_id,
         };
-        if let Some(array) = self.cache.remove(&key) {
-            self.current_cache_size -= get_array_memory_size_for_cache(&array);
+        if let Some(entry) = self.cache.remove(&key) {
+            self.current_cache_size = self.current_cache_size.saturating_sub(entry.size);
             true
         } else {
             false
         }
     }
+
+    /// Inserts selected rows for a cached column (ExactSelected mode)
+    pub fn insert_selected(&mut self, column_idx: usize, array: ArrayRef) -> bool {
+        let array_size = get_array_memory_size_for_cache(&array);
+        if self.current_cache_size + array_size > self.max_cache_bytes {
+            return false;
+        }
+        self.selected_cache
+            .entry(column_idx)
+            .or_default()
+            .push_back(CacheEntry {
+                array,
+                size: array_size,
+            });
+        self.current_cache_size += array_size;
+        true
+    }
+
+    /// Pops the next selected batch for a column (ExactSelected mode)
+    pub fn pop_selected(&mut self, column_idx: usize) -> Option<ArrayRef> {
+        let queue = self.selected_cache.get_mut(&column_idx)?;
+        let entry = queue.pop_front()?;
+        self.current_cache_size = self.current_cache_size.saturating_sub(entry.size);
+        if queue.is_empty() {
+            self.selected_cache.remove(&column_idx);
+        }
+        Some(entry.array)
+    }
 }
 
 #[cfg(test)]
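Caching the entry size at insert time (rather than recomputing it on removal, as the old `remove` did) is what makes the accounting symmetric. A standalone model of the new queue's byte accounting, where a plain `Vec<i32>` stands in for `ArrayRef` (this is not the crate's actual type):

```rust
use std::collections::{HashMap, VecDeque};

struct SelectedQueue {
    queues: HashMap<usize, VecDeque<(Vec<i32>, usize)>>, // (rows, charged bytes)
    current: usize,
    max: usize,
}

impl SelectedQueue {
    fn insert_selected(&mut self, col: usize, rows: Vec<i32>) -> bool {
        let size = rows.len() * std::mem::size_of::<i32>();
        if self.current + size > self.max {
            return false; // over budget: caller continues without caching
        }
        self.queues.entry(col).or_default().push_back((rows, size));
        self.current += size; // charge on insert...
        true
    }

    fn pop_selected(&mut self, col: usize) -> Option<Vec<i32>> {
        let q = self.queues.get_mut(&col)?;
        let (rows, size) = q.pop_front()?;
        self.current = self.current.saturating_sub(size); // ...credit on pop
        if q.is_empty() {
            self.queues.remove(&col);
        }
        Some(rows)
    }
}

fn main() {
    let mut q = SelectedQueue { queues: HashMap::new(), current: 0, max: 64 };
    assert!(q.insert_selected(0, vec![1, 2, 3])); // 12 bytes charged
    assert_eq!(q.pop_selected(0), Some(vec![1, 2, 3]));
    assert_eq!(q.current, 0); // fully credited back
}
```

Because the consumer drains the queue as it streams, peak occupancy is bounded by how far the producer runs ahead, not by the total selected row count.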
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 670f9d80c5a3..c8ebe9e8c5eb 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -49,7 +49,7 @@ use crate::schema::types::SchemaDescriptor;
 
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 // Exposed so integration tests and benchmarks can temporarily override the threshold.
-pub use read_plan::{ReadPlan, ReadPlanBuilder};
+pub use read_plan::{PredicateBatchObserver, ReadPlan, ReadPlanBuilder};
 
 mod filter;
 pub mod metrics;
@@ -57,6 +57,18 @@
 mod read_plan;
 pub(crate) mod selection;
 pub mod statistics;
 
+/// How pushed-down predicates are evaluated during async / push decoding.
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub enum PushdownFilterEvalMode {
+    /// Existing behavior: evaluate predicates on predicate-specific projections
+    /// and materialize the final output using row selections.
+    #[default]
+    RowSelection,
+    /// Decode the full output projection during predicate evaluation, then apply
+    /// predicates to that decoded data (FilterExec-like behavior).
+    DecodeProjectionThenFilter,
+}
+
 /// Builder for constructing Parquet readers that decode into [Apache Arrow]
 /// arrays.
 ///
@@ -139,6 +151,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) metrics: ArrowReaderMetrics,
 
     pub(crate) max_predicate_cache_size: usize,
+
+    pub(crate) pushdown_filter_eval_mode: PushdownFilterEvalMode,
 }
 
 impl<T: Debug> Debug for ArrowReaderBuilder<T> {
@@ -157,6 +171,7 @@
             .field("limit", &self.limit)
             .field("offset", &self.offset)
             .field("metrics", &self.metrics)
+            .field("pushdown_filter_eval_mode", &self.pushdown_filter_eval_mode)
             .finish()
     }
 }
@@ -178,6 +193,7 @@ impl<T> ArrowReaderBuilder<T> {
             offset: None,
             metrics: ArrowReaderMetrics::Disabled,
             max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
+            pushdown_filter_eval_mode: PushdownFilterEvalMode::default(),
         }
     }
 
@@ -430,6 +446,17 @@
             ..self
         }
     }
+
+    /// Configure how pushed-down predicates are evaluated during async / push decoding.
+    pub fn with_pushdown_filter_eval_mode(
+        self,
+        pushdown_filter_eval_mode: PushdownFilterEvalMode,
+    ) -> Self {
+        Self {
+            pushdown_filter_eval_mode,
+            ..self
+        }
+    }
 }
 
 /// Options that control how [`ParquetMetaData`] is read when constructing
@@ -1188,6 +1215,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
             metrics,
             // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
             max_predicate_cache_size: _,
+            // Not used for the sync reader
+            pushdown_filter_eval_mode: _,
         } = self;
 
         // Try to avoid allocate large buffer
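On the user-facing side, opting in to the new mode is a single builder call. A minimal sketch assuming the `async` (tokio) feature and the exports added by this diff; the file handle is a placeholder:

```rust
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::PushdownFilterEvalMode;

async fn open(file: tokio::fs::File) -> parquet::errors::Result<()> {
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        // Force the FilterExec-like single-pass evaluation instead of the
        // default (conservative, auto-enabled) RowSelection behavior.
        .with_pushdown_filter_eval_mode(PushdownFilterEvalMode::DecodeProjectionThenFilter)
        .build()?;
    let _ = stream; // then poll batches as usual
    Ok(())
}
```

As the destructuring in `ParquetRecordBatchReaderBuilder` above makes explicit, the sync reader accepts but ignores this option; only the async and push decoders act on it.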
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs
index 7c9eb36befe3..67051fe9471f 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -25,10 +25,19 @@ use crate::arrow::arrow_reader::{
     ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
 };
 use crate::errors::{ParquetError, Result};
-use arrow_array::Array;
+use arrow_array::{Array, BooleanArray, RecordBatch};
 use arrow_select::filter::prep_null_mask_filter;
 use std::collections::VecDeque;
 
+/// Optional observer hook for predicate evaluation batches
+pub trait PredicateBatchObserver {
+    /// Observe each predicate evaluation batch and its normalized boolean filter.
+    ///
+    /// This is used by specialized fast-paths to capture predicate-selected
+    /// values during filtering, so a later projection pass can be avoided.
+    fn observe(&mut self, batch: &RecordBatch, filter: &BooleanArray) -> Result<()>;
+}
+
 /// A builder for [`ReadPlan`]
 #[derive(Clone, Debug)]
 pub struct ReadPlanBuilder {
@@ -144,16 +153,26 @@ impl ReadPlanBuilder {
     /// or if the [`ParquetRecordBatchReader`] specified an explicit
     /// [`RowSelection`] in addition to one or more predicates.
     pub fn with_predicate(
+        self,
+        array_reader: Box<dyn ArrayReader>,
+        predicate: &mut dyn ArrowPredicate,
+    ) -> Result<Self> {
+        self.with_predicate_with_observer(array_reader, predicate, None)
+    }
+
+    /// Same as [`Self::with_predicate`] but allows observing each predicate batch.
+    pub fn with_predicate_with_observer(
         mut self,
         array_reader: Box<dyn ArrayReader>,
         predicate: &mut dyn ArrowPredicate,
+        mut observer: Option<&mut dyn PredicateBatchObserver>,
     ) -> Result<Self> {
         let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
 
         let mut filters = vec![];
         for maybe_batch in reader {
             let maybe_batch = maybe_batch?;
             let input_rows = maybe_batch.num_rows();
-            let filter = predicate.evaluate(maybe_batch)?;
+            let filter = predicate.evaluate(maybe_batch.clone())?;
             // Since user supplied predicate, check error here to catch bugs quickly
             if filter.len() != input_rows {
                 return Err(arrow_err!(
@@ -161,10 +180,14 @@ impl ReadPlanBuilder {
                     filter.len()
                 ));
             }
-            match filter.null_count() {
-                0 => filters.push(filter),
-                _ => filters.push(prep_null_mask_filter(&filter)),
+            let filter = match filter.null_count() {
+                0 => filter,
+                _ => prep_null_mask_filter(&filter),
             };
+            if let Some(observer) = observer.as_deref_mut() {
+                observer.observe(&maybe_batch, &filter)?;
+            }
+            filters.push(filter);
         }
 
         let raw = RowSelection::from_filters(&filters);
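A minimal third-party implementation of the new trait, assuming the re-exports added above; it only tallies surviving rows, whereas the real observer in this diff (`ExactSelectedObserver`, later) stores the filtered values in the shared cache:

```rust
use arrow_array::{BooleanArray, RecordBatch};
use parquet::arrow::arrow_reader::PredicateBatchObserver;
use parquet::errors::Result;

/// Counts rows each predicate batch keeps.
struct SelectedRowCounter {
    selected: usize,
}

impl PredicateBatchObserver for SelectedRowCounter {
    fn observe(&mut self, _batch: &RecordBatch, filter: &BooleanArray) -> Result<()> {
        // The caller has already run prep_null_mask_filter, so nulls read as false
        // and true_count() is exactly the number of selected rows.
        self.selected += filter.true_count();
        Ok(())
    }
}
```

The key contract, visible in the rewritten loop above, is that `observe` sees the filter *after* null normalization and *before* it is folded into the `RowSelection`, so observers and the selection always agree on which rows survived.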
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9e45a0c3168c..d4169f9d031f 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -497,6 +497,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset,
             metrics,
             max_predicate_cache_size,
+            pushdown_filter_eval_mode,
         } = self;
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -522,6 +523,7 @@
             offset,
             metrics,
             max_predicate_cache_size,
+            pushdown_filter_eval_mode,
         }
         .build()?;
diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index cdb0715edb55..807fe8ef7455 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder {
             metrics,
             row_selection_policy,
             max_predicate_cache_size,
+            pushdown_filter_eval_mode,
         } = self;
 
         // If no row groups were specified, read all of them
@@ -197,6 +198,7 @@
             max_predicate_cache_size,
             buffers,
             row_selection_policy,
+            pushdown_filter_eval_mode,
         );
 
         // Initialize the decoder with the configured options
diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
index 380211cca66e..2f50f8c83c5d 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/filter.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
@@ -18,7 +18,7 @@
 //! [`FilterInfo`] state machine for evaluating row filters
 
 use crate::arrow::ProjectionMask;
-use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::array_reader::{CacheMode, CacheOptions, CacheOptionsBuilder, RowGroupCache};
 use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use std::num::NonZeroUsize;
 use std::sync::{Arc, RwLock};
@@ -52,22 +52,64 @@ pub(super) struct CacheInfo {
     /// if we have a filter like `(a + 10 > 5) AND (a + b = 0)` we cache `a` to avoid re-reading it between evaluating `a + 10 > 5` and `a + b = 0`.
     cache_projection: ProjectionMask,
     row_group_cache: Arc<RwLock<RowGroupCache>>,
+    /// Whether the cache stores raw decoded batches or exact-selected rows.
+    mode: CacheMode,
+    /// Target column index for ExactSelected mode.
+    exact_selected_column: Option<usize>,
+    /// Whether this filter uses projection-driven predicate evaluation.
+    use_projection_filter_eval: bool,
 }
 
 impl CacheInfo {
     pub(super) fn new(
         cache_projection: ProjectionMask,
         row_group_cache: Arc<RwLock<RowGroupCache>>,
+        mode: CacheMode,
+        exact_selected_column: Option<usize>,
+        use_projection_filter_eval: bool,
     ) -> Self {
         Self {
             cache_projection,
             row_group_cache,
+            mode,
+            exact_selected_column,
+            use_projection_filter_eval,
         }
     }
 
     pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
         CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
     }
+
+    pub(super) fn producer_options(&self) -> CacheOptions<'_> {
+        match (self.mode, self.exact_selected_column) {
+            (CacheMode::ExactSelected, Some(column_idx)) => {
+                self.builder().producer_exact_selected(column_idx)
+            }
+            _ => self.builder().producer(),
+        }
+    }
+
+    pub(super) fn consumer_options(&self) -> CacheOptions<'_> {
+        match (self.mode, self.exact_selected_column) {
+            (CacheMode::ExactSelected, Some(column_idx)) => {
+                self.builder().consumer_exact_selected(column_idx)
+            }
+            _ => self.builder().consumer(),
+        }
+    }
+
+    pub(super) fn exact_selected_column(&self) -> Option<usize> {
+        if self.mode == CacheMode::ExactSelected {
+            self.exact_selected_column
+        } else {
+            None
+        }
+    }
+
+    pub(super) fn use_projection_filter_eval(&self) -> bool {
+        self.use_projection_filter_eval
+    }
 }
 
 pub(super) enum AdvanceResult {
@@ -131,9 +173,16 @@ impl FilterInfo {
         &self.cache_info.cache_projection
     }
 
-    /// Return a cache builder to save the results of predicate evaluation
-    pub(super) fn cache_builder(&self) -> CacheOptionsBuilder<'_> {
-        self.cache_info.builder()
+    pub(super) fn producer_cache_options(&self) -> CacheOptions<'_> {
+        self.cache_info.producer_options()
+    }
+
+    pub(super) fn exact_selected_column(&self) -> Option<usize> {
+        self.cache_info.exact_selected_column()
+    }
+
+    pub(super) fn use_projection_filter_eval(&self) -> bool {
+        self.cache_info.use_projection_filter_eval()
     }
 
     /// Returns the inner filter, consuming this FilterInfo
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 97658f4946ec..cf2fdfa82ac7 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -20,24 +20,31 @@ mod filter;
 
 use crate::DecodeResult;
 use crate::arrow::ProjectionMask;
-use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder, CacheMode, RowGroupCache};
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
 use crate::arrow::arrow_reader::{
-    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
+    ArrowPredicate, ParquetRecordBatchReader, PredicateBatchObserver, PushdownFilterEvalMode,
+    ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
 };
 use crate::arrow::in_memory_row_group::ColumnChunkData;
 use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
 use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
-use crate::arrow::schema::ParquetField;
+use crate::arrow::schema::{ParquetField, ParquetFieldType};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::util::push_buffers::PushBuffers;
+use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, StructArray, new_empty_array};
+use arrow_schema::DataType as ArrowType;
+use arrow_select::filter::{filter, filter_record_batch, prep_null_mask_filter};
 use bytes::Bytes;
 use data::DataRequest;
 use filter::AdvanceResult;
 use filter::FilterInfo;
+use std::any::Any;
+use std::collections::HashMap;
+use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::{Arc, RwLock};
@@ -49,6 +56,31 @@ struct RowGroupInfo {
     plan_builder: ReadPlanBuilder,
 }
 
+/// Observer that stores exact selected values for one cached column.
+struct ExactSelectedObserver {
+    cache: Arc<RwLock<RowGroupCache>>,
+    column_idx: usize,
+}
+
+impl PredicateBatchObserver for ExactSelectedObserver {
+    fn observe(
+        &mut self,
+        batch: &RecordBatch,
+        filter_values: &BooleanArray,
+    ) -> crate::errors::Result<()> {
+        if batch.num_columns() != 1 {
+            return Ok(());
+        }
+        let selected = filter(batch.column(0).as_ref(), filter_values)?;
+        let _inserted = self
+            .cache
+            .write()
+            .unwrap()
+            .insert_selected(self.column_idx, selected);
+        Ok(())
+    }
+}
+
 /// This is the inner state machine for reading a single row group.
 #[derive(Debug)]
 enum RowGroupDecoderState {
@@ -160,6 +192,9 @@ pub(crate) struct RowGroupReaderBuilder {
     /// Strategy for materialising row selections
     row_selection_policy: RowSelectionPolicy,
 
+    /// How pushed-down predicates are evaluated during filtering.
+    pushdown_filter_eval_mode: PushdownFilterEvalMode,
+
     /// Current state of the decoder.
     ///
     /// It is taken when processing, and must be put back before returning
@@ -185,6 +220,7 @@ impl RowGroupReaderBuilder {
         max_predicate_cache_size: usize,
         buffers: PushBuffers,
         row_selection_policy: RowSelectionPolicy,
+        pushdown_filter_eval_mode: PushdownFilterEvalMode,
     ) -> Self {
         Self {
             batch_size,
@@ -197,6 +233,7 @@ impl RowGroupReaderBuilder {
             metrics,
             max_predicate_cache_size,
             row_selection_policy,
+            pushdown_filter_eval_mode,
             state: Some(RowGroupDecoderState::Finished),
             buffers,
         }
@@ -323,9 +360,32 @@ impl RowGroupReaderBuilder {
             }));
         };
 
-        // we have predicates to evaluate
-        let cache_projection =
-            self.compute_cache_projection(row_group_info.row_group_idx, &filter);
+        // We have predicates to evaluate. Decide whether to keep the
+        // existing "predicate->selection->second decode" flow, or use a
+        // projection-driven single-pass flow that can avoid the second decode.
+        let use_projection_filter_eval =
+            self.use_projection_filter_eval(row_group_info.row_group_idx, &filter);
+        let cache_projection = if use_projection_filter_eval {
+            self.compute_projection_filter_eval_cache_projection(
+                row_group_info.row_group_idx,
+            )
+        } else {
+            self.compute_cache_projection(row_group_info.row_group_idx, &filter)
+        };
+        let exact_selected_column = if use_projection_filter_eval {
+            None
+        } else {
+            self.compute_exact_selected_column(
+                row_group_info.row_group_idx,
+                &filter,
+                &cache_projection,
+            )
+        };
+        let cache_mode = if exact_selected_column.is_some() {
+            CacheMode::ExactSelected
+        } else {
+            CacheMode::Raw
+        };
 
         let cache_info = CacheInfo::new(
             cache_projection,
@@ -333,6 +393,9 @@ impl RowGroupReaderBuilder {
                 self.batch_size,
                 self.max_predicate_cache_size,
             ))),
+            cache_mode,
+            exact_selected_column,
+            use_projection_filter_eval,
         );
 
         let filter_info = FilterInfo::new(filter, cache_info);
@@ -366,6 +429,11 @@ impl RowGroupReaderBuilder {
 
         // Make a request for the data needed to evaluate the current predicate
         let predicate = filter_info.current();
+        let predicate_input_projection = if filter_info.use_projection_filter_eval() {
+            self.projection.clone()
+        } else {
+            predicate.projection().clone()
+        };
 
         // need to fetch pages the column needs for decoding, figure
         // that out based on the current selection and projection
@@ -374,10 +442,10 @@ impl RowGroupReaderBuilder {
             row_count,
             self.batch_size,
             &self.metadata,
-            predicate.projection(), // use the predicate's projection
+            &predicate_input_projection,
         )
         .with_selection(plan_builder.selection())
-        // Fetch predicate columns; expand selection only for cached predicate columns
+        // Expand the fetched selection for cache columns when caching is enabled.
        .with_cache_projection(Some(filter_info.cache_projection()))
         .with_column_chunks(column_chunks)
         .build();
@@ -421,34 +489,144 @@ impl RowGroupReaderBuilder {
         } = row_group_info;
 
         let predicate = filter_info.current();
+        let predicate_input_projection = if filter_info.use_projection_filter_eval() {
+            self.projection.clone()
+        } else {
+            predicate.projection().clone()
+        };
         let row_group = data_request.try_into_in_memory_row_group(
             row_group_idx,
             row_count,
             &self.metadata,
-            predicate.projection(),
+            &predicate_input_projection,
             &mut self.buffers,
         )?;
 
-        let cache_options = filter_info.cache_builder().producer();
+        if filter_info.use_projection_filter_eval() {
+            // Single-pass path:
+            // 1) decode the output projection once
+            // 2) evaluate predicates against that decoded batch
+            // 3) keep only selected rows and return directly
+            //
+            // Rationale: for overlap-heavy shapes (for example, filtering and
+            // projecting the same column), this avoids the second decode pass and
+            // can remove substantial cache/selection orchestration overhead.
+            let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
+                .with_parquet_metadata(&self.metadata)
+                .build_array_reader(self.fields.as_deref(), &self.projection)?;
+            let predicate_rows_reader =
+                ParquetRecordBatchReader::new(array_reader, plan_builder.clone().build());
+            let mut filter = filter_info.into_filter();
+            let predicate_column_indices: Vec<Option<Vec<usize>>> = filter
+                .predicates
+                .iter()
+                .map(|predicate| {
+                    self.projection_indices_for_predicate(
+                        &self.projection,
+                        predicate.projection(),
+                    )
+                })
+                .collect::<Result<_, _>>()?;
+
+            let mut selected_batches = Vec::new();
+            for maybe_batch in predicate_rows_reader {
+                let mut batch = maybe_batch?;
+                for (predicate, projection_indices) in filter
+                    .predicates
+                    .iter_mut()
+                    .zip(predicate_column_indices.iter())
+                {
+                    if batch.num_rows() == 0 {
+                        break;
+                    }
+                    let predicate_batch = match projection_indices.as_ref() {
+                        Some(indices) => {
+                            self.project_batch_for_predicate(&batch, indices)?
+                        }
+                        None => batch.clone(),
+                    };
+                    let mut mask = predicate.evaluate(predicate_batch)?;
+                    if mask.len() != batch.num_rows() {
+                        return Err(ParquetError::General(format!(
+                            "ArrowPredicate returned {} rows, expected {}",
+                            mask.len(),
+                            batch.num_rows()
+                        )));
+                    }
+                    if mask.null_count() > 0 {
+                        mask = prep_null_mask_filter(&mask);
+                    }
+                    if mask.true_count() == 0 {
+                        batch = batch.slice(0, 0);
+                        break;
+                    }
+                    if mask.true_count() != batch.num_rows() {
+                        batch = filter_record_batch(&batch, &mask)?;
+                    }
+                }
+                if batch.num_rows() > 0 {
+                    selected_batches.push(batch);
+                }
+            }
+
+            self.filter = Some(filter);
+            // Apply global offset/limit here because this path bypasses StartData,
+            // where those are normally enforced.
+            let selected_batches =
+                self.apply_offset_limit_to_selected_batches(selected_batches);
+            if selected_batches.is_empty() {
+                return Ok(NextState::result(
+                    RowGroupDecoderState::Finished,
+                    DecodeResult::Finished,
+                ));
+            }
+
+            let array_reader =
+                Box::new(PrecomputedStructArrayReader::try_new(selected_batches)?);
+            let reader = ParquetRecordBatchReader::new(
+                array_reader,
+                ReadPlanBuilder::new(self.batch_size).build(),
+            );
+            return Ok(NextState::result(
+                RowGroupDecoderState::Finished,
+                DecodeResult::Data(reader),
+            ));
+        }
+
+        let cache_options = filter_info.producer_cache_options();
 
         let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
             .with_cache_options(Some(&cache_options))
             .with_parquet_metadata(&self.metadata)
-            .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+            .build_array_reader(self.fields.as_deref(), &predicate_input_projection)?;
 
-        // Prepare to evaluate the filter.
-        // Note: first update the selection strategy to properly handle any pages
-        // pruned during fetch
         plan_builder = override_selector_strategy_if_needed(
             plan_builder,
-            predicate.projection(),
+            &predicate_input_projection,
             self.row_group_offset_index(row_group_idx),
         );
 
-        // `with_predicate` actually evaluates the filter
-        plan_builder =
-            plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
+        plan_builder = if filter_info.use_projection_filter_eval() {
+            self.with_predicate_from_projection_reader(
+                plan_builder,
+                array_reader,
+                filter_info.current_mut(),
+                &predicate_input_projection,
+            )?
+        } else if let Some(column_idx) = filter_info.exact_selected_column() {
+            let mut observer = ExactSelectedObserver {
+                cache: cache_options.cache.clone(),
+                column_idx,
+            };
+            plan_builder.with_predicate_with_observer(
+                array_reader,
+                filter_info.current_mut(),
+                Some(&mut observer),
+            )?
+        } else {
+            plan_builder.with_predicate(array_reader, filter_info.current_mut())?
+        };
 
         let row_group_info = RowGroupInfo {
             row_group_idx,
@@ -544,7 +722,16 @@ impl RowGroupReaderBuilder {
             // so don't call with_cache_projection here
             .build();
 
-        plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
+        plan_builder = if cache_info
+            .as_ref()
+            .and_then(|c| c.exact_selected_column())
+            .is_some()
+        {
+            // ExactSelected cache stream is selected-row-only and requires selectors.
+            plan_builder.with_row_selection_policy(RowSelectionPolicy::Selectors)
+        } else {
+            plan_builder.with_row_selection_policy(self.row_selection_policy)
+        };
 
         plan_builder = override_selector_strategy_if_needed(
             plan_builder,
@@ -604,7 +791,7 @@ impl RowGroupReaderBuilder {
         let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
             .with_parquet_metadata(&self.metadata);
         let array_reader = if let Some(cache_info) = cache_info.as_ref() {
-            let cache_options = cache_info.builder().consumer();
+            let cache_options = cache_info.consumer_options();
             array_reader_builder
                 .with_cache_options(Some(&cache_options))
                 .build_array_reader(self.fields.as_deref(), &self.projection)
@@ -624,6 +811,222 @@ impl RowGroupReaderBuilder {
         Ok(result)
     }
 
+    fn with_predicate_from_projection_reader(
+        &self,
+        mut plan_builder: ReadPlanBuilder,
+        array_reader: Box<dyn ArrayReader>,
+        predicate: &mut dyn ArrowPredicate,
+        reader_projection: &ProjectionMask,
+    ) -> Result<ReadPlanBuilder, ParquetError> {
+        let projection_indices =
+            self.projection_indices_for_predicate(reader_projection, predicate.projection())?;
+        let reader = ParquetRecordBatchReader::new(array_reader, plan_builder.clone().build());
+        let mut filters = Vec::new();
+        for maybe_batch in reader {
+            let batch = maybe_batch?;
+            let input_rows = batch.num_rows();
+            let predicate_batch = match projection_indices.as_ref() {
+                Some(indices) => self.project_batch_for_predicate(&batch, indices)?,
+                None => batch.clone(),
+            };
+            let filter = predicate.evaluate(predicate_batch)?;
+            if filter.len() != input_rows {
+                return Err(ParquetError::General(format!(
+                    "ArrowPredicate returned {} rows, expected {input_rows}",
+                    filter.len()
+                )));
+            }
+            let filter = match filter.null_count() {
+                0 => filter,
+                _ => prep_null_mask_filter(&filter),
+            };
+            filters.push(filter);
+        }
+
+        let raw = RowSelection::from_filters(&filters);
+        let selection = match plan_builder.selection().cloned() {
+            Some(selection) => selection.and_then(&raw),
+            None => raw,
+        };
+        plan_builder = plan_builder.with_selection(Some(selection));
+        Ok(plan_builder)
+    }
+
+    fn projection_indices_for_predicate(
+        &self,
+        reader_projection: &ProjectionMask,
+        predicate_projection: &ProjectionMask,
+    ) -> Result<Option<Vec<usize>>, ParquetError> {
+        if reader_projection == predicate_projection {
+            return Ok(None);
+        }
+
+        let num_leaves = self.metadata.file_metadata().schema_descr().num_columns();
+        let reader_leaves = Self::leaf_indices(reader_projection, num_leaves);
+        let predicate_leaves = Self::leaf_indices(predicate_projection, num_leaves);
+
+        let mut index_by_leaf = HashMap::with_capacity(reader_leaves.len());
+        for (idx, leaf_idx) in reader_leaves.iter().enumerate() {
+            index_by_leaf.insert(*leaf_idx, idx);
+        }
+
+        let mut projection_indices = Vec::with_capacity(predicate_leaves.len());
+        for leaf_idx in predicate_leaves {
+            let Some(idx) = index_by_leaf.get(&leaf_idx).copied() else {
+                return Err(ParquetError::General(format!(
+                    "Predicate leaf {leaf_idx} not present in reader projection"
+                )));
+            };
+            projection_indices.push(idx);
+        }
+
+        Ok(Some(projection_indices))
+    }
+
+    fn project_batch_for_predicate(
+        &self,
+        batch: &RecordBatch,
+        projection_indices: &[usize],
+    ) -> Result<RecordBatch, ParquetError> {
+        let projected_schema = Arc::new(batch.schema().project(projection_indices)?);
+        let projected_columns = projection_indices
+            .iter()
+            .map(|idx| batch.column(*idx).clone())
+            .collect();
+        Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
+    }
+
+    fn leaf_indices(mask: &ProjectionMask, num_leaves: usize) -> Vec<usize> {
+        (0..num_leaves)
+            .filter(|idx| mask.leaf_included(*idx))
+            .collect()
+    }
+
+    fn use_projection_filter_eval(&self, row_group_idx: usize, filter: &RowFilter) -> bool {
+        let default_auto_mode = matches!(
+            self.pushdown_filter_eval_mode,
+            PushdownFilterEvalMode::RowSelection
+        );
+        let mode_enabled = match self.pushdown_filter_eval_mode {
+            PushdownFilterEvalMode::DecodeProjectionThenFilter => true,
+            // In default row-selection mode, auto-enable only for single-predicate
+            // queries. This keeps behavior conservative while still unlocking the
+            // no-second-decode optimization for common regression shapes.
+            PushdownFilterEvalMode::RowSelection => filter.predicates.len() == 1,
+        };
+        if !mode_enabled {
+            return false;
+        }
+        // Virtual output columns (for example `row_number`) rely on synthetic
+        // readers and currently require the existing row-selection flow.
+        if self.has_virtual_output_columns() {
+            return false;
+        }
+        if !self.is_non_nested_projection(&self.projection) {
+            return false;
+        }
+
+        let num_leaves = self.metadata.row_group(row_group_idx).columns().len();
+        let output_leaves: HashMap<usize, ()> = Self::leaf_indices(&self.projection, num_leaves)
+            .into_iter()
+            .map(|idx| (idx, ()))
+            .collect();
+
+        filter.predicates.iter().all(|predicate| {
+            let projection = predicate.projection();
+            let predicate_leaves = Self::leaf_indices(projection, num_leaves);
+            self.is_non_nested_projection(projection)
+                && predicate_leaves
+                    .iter()
+                    .all(|idx| output_leaves.contains_key(idx))
+                // Default auto-mode should remain conservative: only enable
+                // when predicate/output leaves are exactly the same shape.
+                && (!default_auto_mode
+                    || (predicate_leaves.len() == output_leaves.len()
+                        && output_leaves
+                            .keys()
+                            .all(|idx| predicate_leaves.contains(idx))))
+        })
+    }
+
+    fn has_virtual_output_columns(&self) -> bool {
+        fn contains_virtual(field: &ParquetField) -> bool {
+            match &field.field_type {
+                ParquetFieldType::Virtual(_) => true,
+                ParquetFieldType::Group { children } => children.iter().any(contains_virtual),
+                ParquetFieldType::Primitive { .. } => false,
+            }
+        }
+
+        self.fields.as_deref().is_some_and(contains_virtual)
+    }
+
+    fn compute_projection_filter_eval_cache_projection(
+        &self,
+        row_group_idx: usize,
+    ) -> ProjectionMask {
+        let meta = self.metadata.row_group(row_group_idx);
+        self.exclude_nested_columns_from_cache(&self.projection)
+            .unwrap_or_else(|| ProjectionMask::none(meta.columns().len()))
+    }
+
+    fn is_non_nested_projection(&self, mask: &ProjectionMask) -> bool {
+        self.exclude_nested_columns_from_cache(mask)
+            .as_ref()
+            .is_some_and(|non_nested| non_nested == mask)
+    }
+
+    fn apply_offset_limit_to_selected_batches(
+        &mut self,
+        batches: Vec<RecordBatch>,
+    ) -> Vec<RecordBatch> {
+        // This helper mirrors the row-group level offset/limit behavior used by
+        // ReadPlanBuilder, but operates on already materialized filtered batches.
+        let mut batches = batches;
+        if let Some(offset_remaining) = self.offset.as_mut() {
+            if *offset_remaining > 0 {
+                let mut to_skip = *offset_remaining;
+                let mut after_offset = Vec::new();
+                for batch in batches {
+                    if to_skip == 0 {
+                        after_offset.push(batch);
+                        continue;
+                    }
+                    if to_skip >= batch.num_rows() {
+                        to_skip -= batch.num_rows();
+                    } else {
+                        let sliced = batch.slice(to_skip, batch.num_rows() - to_skip);
+                        after_offset.push(sliced);
+                        to_skip = 0;
+                    }
+                }
+                *offset_remaining = to_skip;
+                batches = after_offset;
+            }
+        }
+
+        if let Some(limit_remaining) = self.limit.as_mut() {
+            let mut keep = *limit_remaining;
+            let mut limited = Vec::new();
+            for batch in batches {
+                if keep == 0 {
+                    break;
+                }
+                if batch.num_rows() <= keep {
+                    keep -= batch.num_rows();
+                    limited.push(batch);
+                } else {
+                    limited.push(batch.slice(0, keep));
+                    keep = 0;
+                    break;
+                }
+            }
+            *limit_remaining = keep;
+            batches = limited;
+        }
+
+        batches
+    }
+
     /// Which columns should be cached?
     ///
     /// Returns the columns that are used by the filters *and* then used in the
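The slicing in `apply_offset_limit_to_selected_batches` reduces to skip-then-take over batch lengths. A runnable distillation, with plain lengths standing in for `RecordBatch`es:

```rust
/// Given batch lengths, a global offset, and a limit, return the
/// per-batch row counts that survive (empty batches are dropped).
fn apply(mut lens: Vec<usize>, mut offset: usize, mut limit: usize) -> Vec<usize> {
    let mut out = Vec::new();
    for len in lens.drain(..) {
        let skip = offset.min(len); // consume offset first
        offset -= skip;
        let avail = len - skip;
        if avail == 0 || limit == 0 {
            continue;
        }
        let take = avail.min(limit); // then consume limit
        limit -= take;
        out.push(take);
    }
    out
}

fn main() {
    // Batches of sizes [5, 5, 5] with offset=7, limit=6:
    // batch 1 is skipped entirely (offset 7 > 5), batch 2 loses its first
    // 2 rows and keeps 3, batch 3 contributes the remaining 3 of the limit.
    assert_eq!(apply(vec![5, 5, 5], 7, 6), vec![3, 3]);
}
```

Because the helper mutates `self.offset`/`self.limit` in place, leftover offset and limit carry across row groups, matching how `ReadPlanBuilder` normally threads them through `StartData`.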
@@ -636,6 +1039,43 @@ impl RowGroupReaderBuilder {
         }
     }
 
+    /// Returns the cached column index for ExactSelected mode, if applicable.
+    ///
+    /// This prototype is intentionally conservative:
+    /// - exactly one predicate
+    /// - exactly one cached leaf
+    /// - predicate projection contains exactly that same leaf
+    fn compute_exact_selected_column(
+        &self,
+        row_group_idx: usize,
+        filter: &RowFilter,
+        cache_projection: &ProjectionMask,
+    ) -> Option<usize> {
+        // ExactSelected requires all projected outputs to stay synchronized by
+        // row position; virtual columns (for example `row_number`) are produced
+        // by synthetic readers and are not part of this cache stream.
+        if self.has_virtual_output_columns()
+            || self.max_predicate_cache_size == 0
+            || filter.predicates.len() != 1
+        {
+            return None;
+        }
+        let num_cols = self.metadata.row_group(row_group_idx).columns().len();
+        let cached_cols: Vec<usize> = (0..num_cols)
+            .filter(|idx| cache_projection.leaf_included(*idx))
+            .collect();
+        if cached_cols.len() != 1 {
+            return None;
+        }
+        let predicate_cols: Vec<usize> = (0..num_cols)
+            .filter(|idx| filter.predicates[0].projection().leaf_included(*idx))
+            .collect();
+        if predicate_cols.len() != 1 || predicate_cols[0] != cached_cols[0] {
+            return None;
+        }
+        Some(cached_cols[0])
+    }
+
     fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
         // Do not compute the projection mask if the predicate cache is disabled
         if self.max_predicate_cache_size == 0 {
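Returning to `projection_indices_for_predicate` in the earlier hunk: the remapping is pure leaf-to-position translation between the reader's decoded column order and the predicate's projection. A worked, runnable example:

```rust
use std::collections::HashMap;

fn main() {
    // The reader decodes leaves [1, 3, 4] (in leaf order), so its batch has
    // columns 0, 1, 2 for those leaves. A predicate over leaves [3, 4] must
    // therefore read columns 1 and 2 of the decoded batch.
    let reader_leaves = [1usize, 3, 4];
    let predicate_leaves = [3usize, 4];

    let index_by_leaf: HashMap<usize, usize> = reader_leaves
        .iter()
        .enumerate()
        .map(|(idx, leaf)| (*leaf, idx))
        .collect();

    let projection_indices: Vec<usize> = predicate_leaves
        .iter()
        .map(|leaf| index_by_leaf[leaf])
        .collect();

    assert_eq!(projection_indices, vec![1, 2]);
}
```

A predicate leaf absent from the reader projection is an error in the real helper; `use_projection_filter_eval` rules that shape out up front by requiring every predicate leaf to be an output leaf.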
@@ -664,6 +1104,135 @@ impl RowGroupReaderBuilder {
     }
 }
 
+/// ArrayReader backed by precomputed filtered batches.
+///
+/// This is used by the single-pass filter path to return already-selected rows
+/// through the same `ParquetRecordBatchReader` interface without re-decoding.
+#[derive(Debug)]
+struct PrecomputedStructArrayReader {
+    data_type: ArrowType,
+    remaining: VecDeque<ArrayRef>,
+    current: Option<(ArrayRef, usize)>,
+    staged: Vec<ArrayRef>,
+}
+
+impl PrecomputedStructArrayReader {
+    fn try_new(batches: Vec<RecordBatch>) -> Result<Self, ParquetError> {
+        if batches.is_empty() {
+            return Err(ParquetError::General(
+                "Precomputed reader requires at least one batch".to_string(),
+            ));
+        }
+
+        let schema = batches[0].schema();
+        for batch in batches.iter().skip(1) {
+            if batch.schema() != schema {
+                return Err(ParquetError::General(
+                    "Precomputed reader requires consistent batch schemas".to_string(),
+                ));
+            }
+        }
+
+        let remaining = batches
+            .into_iter()
+            .map(|batch| Arc::new(StructArray::from(batch)) as ArrayRef)
+            .collect();
+
+        Ok(Self {
+            data_type: ArrowType::Struct(schema.fields().clone()),
+            remaining,
+            current: None,
+            staged: Vec::new(),
+        })
+    }
+}
+
+impl ArrayReader for PrecomputedStructArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, num_records: usize) -> Result<usize, ParquetError> {
+        let mut read = 0;
+        while read < num_records {
+            if self.current.is_none() {
+                self.current = self.remaining.pop_front().map(|array| (array, 0));
+            }
+            let Some((array, offset)) = self.current.as_mut() else {
+                break;
+            };
+
+            let available = array.len().saturating_sub(*offset);
+            if available == 0 {
+                self.current = None;
+                continue;
+            }
+
+            let take = (num_records - read).min(available);
+            self.staged.push(array.slice(*offset, take));
+            *offset += take;
+            read += take;
+
+            if *offset >= array.len() {
+                self.current = None;
+            }
+        }
+        Ok(read)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize, ParquetError> {
+        let mut skipped = 0;
+        while skipped < num_records {
+            if self.current.is_none() {
+                self.current = self.remaining.pop_front().map(|array| (array, 0));
+            }
+            let Some((array, offset)) = self.current.as_mut() else {
+                break;
+            };
+
+            let available = array.len().saturating_sub(*offset);
+            if available == 0 {
+                self.current = None;
+                continue;
+            }
+
+            let take = (num_records - skipped).min(available);
+            *offset += take;
+            skipped += take;
+
+            if *offset >= array.len() {
+                self.current = None;
+            }
+        }
+        Ok(skipped)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef, ParquetError> {
+        if self.staged.is_empty() {
+            return Ok(new_empty_array(&self.data_type));
+        }
+        if self.staged.len() == 1 {
+            return Ok(self.staged.pop().expect("len checked"));
+        }
+        let arrays = self.staged.drain(..).collect::<Vec<_>>();
+        Ok(arrow_select::concat::concat(
+            &arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>(),
+        )?)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
+
 /// Override the selection strategy if needed.
 ///
 /// Some pages can be skipped during row-group construction if they are not read
@@ -722,6 +1291,6 @@ mod tests {
     #[test]
     // Verify that the size of RowGroupDecoderState does not grow too large
     fn test_structure_size() {
-        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
+        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 224);
     }
 }
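`PrecomputedStructArrayReader::try_new` leans on the lossless `RecordBatch` to `StructArray` conversion, which keeps all columns row-aligned under slicing. A runnable illustration using only `arrow-array`:

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch, StructArray};
use arrow_schema::ArrowError;

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("a", col)])?;

    // RecordBatch -> StructArray is zero-copy and preserves the schema
    // (this is why the reader reports ArrowType::Struct as its data type).
    let strukt = StructArray::from(batch.clone());
    assert_eq!(strukt.len(), batch.num_rows());

    // Slicing the struct slices every child in lockstep, which is what lets
    // read_records hand out arbitrary row ranges without realigning columns.
    let sliced = strukt.slice(1, 2);
    assert_eq!(sliced.len(), 2);
    Ok(())
}
```

This also explains the strict schema check in `try_new`: `concat` in `consume_batch` can only merge staged slices whose struct fields agree exactly.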