44 changes: 42 additions & 2 deletions parquet/src/arrow/array_reader/builder.rs
@@ -21,7 +21,7 @@ use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::cached_array_reader::CacheRole;
use crate::arrow::array_reader::cached_array_reader::{CacheMode, CacheRole};
use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
@@ -65,6 +65,8 @@ impl<'a> CacheOptionsBuilder<'a> {
projection_mask: self.projection_mask,
cache: self.cache,
role: CacheRole::Producer,
mode: CacheMode::Raw,
exact_selected_column: None,
}
}

@@ -74,6 +76,30 @@ impl<'a> CacheOptionsBuilder<'a> {
projection_mask: self.projection_mask,
cache: self.cache,
role: CacheRole::Consumer,
mode: CacheMode::Raw,
exact_selected_column: None,
}
}

/// Return producer cache options for exact-selected single-column mode
pub fn producer_exact_selected(self, column_idx: usize) -> CacheOptions<'a> {
CacheOptions {
projection_mask: self.projection_mask,
cache: self.cache,
role: CacheRole::Producer,
mode: CacheMode::ExactSelected,
exact_selected_column: Some(column_idx),
}
}

/// Return consumer cache options for exact-selected single-column mode
pub fn consumer_exact_selected(self, column_idx: usize) -> CacheOptions<'a> {
CacheOptions {
projection_mask: self.projection_mask,
cache: self.cache,
role: CacheRole::Consumer,
mode: CacheMode::ExactSelected,
exact_selected_column: Some(column_idx),
}
}
}
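For orientation, the sketch below shows how a caller might construct these options when exactly one column is both filtered and projected. The `new(mask, cache)` constructor and the `predicate_mask` / `output_mask` / `row_group_cache` / `col_idx` values are assumptions for illustration only; just the `*_exact_selected` methods come from this diff.

    // Hypothetical call site (not part of this diff): the filter side acts as
    // the producer and the output side as the consumer for the same column.
    let producer_opts = CacheOptionsBuilder::new(&predicate_mask, &row_group_cache)
        .producer_exact_selected(col_idx);
    let consumer_opts = CacheOptionsBuilder::new(&output_mask, &row_group_cache)
        .consumer_exact_selected(col_idx);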
@@ -84,6 +110,9 @@ pub struct CacheOptions<'a> {
pub projection_mask: &'a ProjectionMask,
pub cache: &'a Arc<RwLock<RowGroupCache>>,
pub role: CacheRole,
pub mode: CacheMode,
/// Optional target column for exact-selected mode
pub exact_selected_column: Option<usize>,
}

/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
@@ -154,11 +183,22 @@ impl<'a> ArrayReaderBuilder<'a> {
};

if cache_options.projection_mask.leaf_included(col_idx) {
Ok(Some(Box::new(CachedArrayReader::new(
// ExactSelected mode only applies to the one projected column
// captured from predicate evaluation. Other projected columns
// keep regular Raw cache behavior.
let mode = if cache_options.mode == CacheMode::ExactSelected
&& cache_options.exact_selected_column == Some(col_idx)
{
CacheMode::ExactSelected
} else {
CacheMode::Raw
};
Ok(Some(Box::new(CachedArrayReader::new_with_mode(
reader,
Arc::clone(cache_options.cache),
col_idx,
cache_options.role,
mode,
self.metrics.clone(), // cheap clone
))))
} else {
135 changes: 127 additions & 8 deletions parquet/src/arrow/array_reader/cached_array_reader.rs
@@ -37,6 +37,15 @@ pub enum CacheRole {
Consumer,
}

/// Cache mode for cached readers
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheMode {
/// Existing behavior: cache raw decoded batches and filter in consumer
Raw,
/// Single-filter fast-path: cache selected rows and consume directly
ExactSelected,
}

/// A cached wrapper around an ArrayReader that avoids duplicate decoding
/// when the same column appears in both filter predicates and output projection.
///
@@ -82,9 +91,17 @@ pub struct CachedArrayReader {
selections: BooleanBufferBuilder,
/// Role of this reader (Producer or Consumer)
role: CacheRole,
/// Cache mode for this reader
mode: CacheMode,
/// Local cache to store batches between read_records and consume_batch calls
/// This ensures data is available even if the shared cache evicts items
local_cache: HashMap<BatchID, ArrayRef>,
/// Current selected cached chunk (ExactSelected consumer mode)
selected_chunk: Option<(ArrayRef, usize)>,
/// Staged selected arrays to return from consume_batch (ExactSelected consumer mode)
selected_staged: Vec<ArrayRef>,
/// Exclusive upper bound of the batch ids already removed from the shared cache
last_cleaned_batch_id: usize,
/// Statistics to report on the Cache behavior
metrics: ArrowReaderMetrics,
}
@@ -97,6 +114,25 @@ impl CachedArrayReader {
column_idx: usize,
role: CacheRole,
metrics: ArrowReaderMetrics,
) -> Self {
Self::new_with_mode(
inner,
cache,
column_idx,
role,
CacheMode::Raw,
metrics,
)
}

/// Creates a new cached array reader with an explicit cache mode
pub fn new_with_mode(
inner: Box<dyn ArrayReader>,
cache: Arc<RwLock<RowGroupCache>>,
column_idx: usize,
role: CacheRole,
mode: CacheMode,
metrics: ArrowReaderMetrics,
) -> Self {
let batch_size = cache.read().unwrap().batch_size();

@@ -109,11 +145,19 @@ impl CachedArrayReader {
batch_size,
selections: BooleanBufferBuilder::new(0),
role,
mode,
local_cache: HashMap::new(),
selected_chunk: None,
selected_staged: Vec::new(),
last_cleaned_batch_id: 0,
metrics,
}
}

fn is_exact_selected_consumer(&self) -> bool {
self.role == CacheRole::Consumer && self.mode == CacheMode::ExactSelected
}

fn get_batch_id_from_position(&self, row_id: usize) -> BatchID {
BatchID {
val: row_id / self.batch_size,
@@ -149,13 +193,15 @@ impl CachedArrayReader {
// Store in both shared cache and local cache
// The shared cache is used to reuse results between readers
// The local cache ensures data is available for our consume_batch call
let _cached =
self.shared_cache
.write()
.unwrap()
.insert(self.column_idx, batch_id, array.clone());
// Note: if the shared cache is full (_cached == false), we continue without caching
// The local cache will still store the data for this reader's use
if !(self.mode == CacheMode::ExactSelected && self.role == CacheRole::Producer) {
let _cached =
self.shared_cache
.write()
.unwrap()
.insert(self.column_idx, batch_id, array.clone());
// Note: if the shared cache is full (_cached == false), we continue without caching
// The local cache will still store the data for this reader's use
}

self.local_cache.insert(batch_id, array);

@@ -172,15 +218,22 @@ impl CachedArrayReader {
// This ensures we don't remove batches that might still be needed for the current batch
// We can safely remove batch_id if current_batch_id > batch_id + 1
if current_batch_id.val > 1 {
// Only clean newly completed ranges. This avoids repeated remove scans
// over already-cleaned batch ids.
let cleanup_end = current_batch_id.val - 1;
if cleanup_end <= self.last_cleaned_batch_id {
return;
}
let mut cache = self.shared_cache.write().unwrap();
for batch_id_to_remove in 0..(current_batch_id.val - 1) {
for batch_id_to_remove in self.last_cleaned_batch_id..cleanup_end {
cache.remove(
self.column_idx,
BatchID {
val: batch_id_to_remove,
},
);
}
self.last_cleaned_batch_id = cleanup_end;
}
}
}
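The `last_cleaned_batch_id` bookkeeping above can be illustrated in isolation. This standalone sketch (names are illustrative, not the crate's API) computes the same cleanup window: each call removes only the newly completed batch ids, so earlier ids are never rescanned.

    // Returns the half-open range of batch ids to remove from the shared cache,
    // given the exclusive bound of ids already cleaned and the current batch id.
    fn cleanup_range(last_cleaned: usize, current_batch_id: usize) -> std::ops::Range<usize> {
        if current_batch_id > 1 {
            let cleanup_end = current_batch_id - 1;
            if cleanup_end > last_cleaned {
                return last_cleaned..cleanup_end;
            }
        }
        0..0 // nothing new to clean
    }

    fn main() {
        assert_eq!(cleanup_range(0, 5), 0..4); // first pass: batches 0..=3
        assert_eq!(cleanup_range(4, 6), 4..5); // later pass: only batch 4
        assert_eq!(cleanup_range(5, 6), 0..0); // nothing new since last call
    }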
@@ -195,6 +248,9 @@ impl ArrayReader for CachedArrayReader {
}

fn read_records(&mut self, num_records: usize) -> Result<usize> {
if self.is_exact_selected_consumer() {
return self.read_records_exact_selected(num_records);
}
let mut read = 0;
while read < num_records {
let batch_id = self.get_batch_id_from_position(self.outer_position);
@@ -257,6 +313,9 @@ impl ArrayReader for CachedArrayReader {
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
if self.is_exact_selected_consumer() {
return Ok(num_records);
}
let mut skipped = 0;
while skipped < num_records {
let size = std::cmp::min(num_records - skipped, self.batch_size);
@@ -268,6 +327,9 @@ impl ArrayReader for CachedArrayReader {
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
if self.is_exact_selected_consumer() {
return self.consume_exact_selected_batch();
}
let row_count = self.selections.len();
if row_count == 0 {
return Ok(new_empty_array(self.inner.get_data_type()));
@@ -348,6 +410,63 @@ impl ArrayReader for CachedArrayReader {
}
}

impl CachedArrayReader {
fn read_records_exact_selected(&mut self, num_records: usize) -> Result<usize> {
// ExactSelected consumer mode receives predicate-selected arrays in FIFO
// order from the shared cache and streams them directly.
let mut read = 0;
while read < num_records {
let need_next = match self.selected_chunk.as_ref() {
None => true,
Some((array, offset)) => *offset >= array.len(),
};
if need_next {
self.selected_chunk = self
.shared_cache
.write()
.unwrap()
.pop_selected(self.column_idx)
.map(|array| (array, 0));
}

let Some((array, offset)) = self.selected_chunk.as_mut() else {
break;
};

let available = array.len().saturating_sub(*offset);
if available == 0 {
self.selected_chunk = None;
continue;
}

let take = std::cmp::min(num_records - read, available);
self.selected_staged.push(array.slice(*offset, take));
*offset += take;
read += take;
self.metrics.increment_cache_reads(take);
}
Ok(read)
}

fn consume_exact_selected_batch(&mut self) -> Result<ArrayRef> {
if self.selected_staged.is_empty() {
return Ok(new_empty_array(self.inner.get_data_type()));
}

if self.selected_staged.len() == 1 {
return Ok(self.selected_staged.pop().unwrap());
}

let arrays = self
.selected_staged
.drain(..)
.collect::<Vec<_>>();
Ok(arrow_select::concat::concat(
&arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>(),
)?)
}
}
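The consumer path above drains `RowGroupCache::pop_selected`, which is not part of this diff; from the call sites it behaves like a per-column FIFO of predicate-selected arrays. A minimal stand-in for that assumed behavior (type and method names here are illustrative only, not the crate's API):

    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use arrow_array::{Array, ArrayRef, Int32Array};

    /// Illustrative stand-in for the per-column FIFO the consumer drains.
    #[derive(Default)]
    struct SelectedQueues {
        queues: HashMap<usize, VecDeque<ArrayRef>>,
    }

    impl SelectedQueues {
        /// Producer side: push the predicate-selected rows for one column.
        fn push_selected(&mut self, column_idx: usize, array: ArrayRef) {
            self.queues.entry(column_idx).or_default().push_back(array);
        }

        /// Consumer side: pop the next selected chunk in FIFO order, if any.
        fn pop_selected(&mut self, column_idx: usize) -> Option<ArrayRef> {
            self.queues.get_mut(&column_idx)?.pop_front()
        }
    }

    fn main() {
        let mut queues = SelectedQueues::default();
        queues.push_selected(0, Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef);
        queues.push_selected(0, Arc::new(Int32Array::from(vec![7])) as ArrayRef);
        assert_eq!(queues.pop_selected(0).unwrap().len(), 3);
        assert_eq!(queues.pop_selected(0).unwrap().len(), 1);
        assert!(queues.pop_selected(0).is_none());
    }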

#[cfg(test)]
mod tests {
use super::*;
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/mod.rs
@@ -54,6 +54,7 @@ mod test_util;
// Note that this crate is public under the `experimental` feature flag.
use crate::file::metadata::RowGroupMetaData;
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use cached_array_reader::CacheMode;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)] // Only used for benchmarks