diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 21f0c2afa41..a1d7b38f264 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -107,7 +107,7 @@ impl CachedArrayReader { outer_position: 0, inner_position: 0, batch_size, - selections: BooleanBufferBuilder::new(0), + selections: BooleanBufferBuilder::new(batch_size), role, local_cache: HashMap::new(), metrics, @@ -226,7 +226,6 @@ impl ArrayReader for CachedArrayReader { read += select_cnt; self.metrics.increment_cache_reads(select_cnt); self.outer_position += select_cnt; - self.selections.append_n(select_cnt, true); } else { // this is last batch and we have used all records from it break; @@ -245,7 +244,6 @@ impl ArrayReader for CachedArrayReader { ); read += select_from_this_batch; self.outer_position += select_from_this_batch; - self.selections.append_n(select_from_this_batch, true); if read_from_inner < self.batch_size { // this is last batch from inner reader break; @@ -253,6 +251,7 @@ impl ArrayReader for CachedArrayReader { } } } + self.selections.append_n(read, true); Ok(read) } @@ -261,9 +260,9 @@ impl ArrayReader for CachedArrayReader { while skipped < num_records { let size = std::cmp::min(num_records - skipped, self.batch_size); skipped += size; - self.selections.append_n(size, false); self.outer_position += size; } + self.selections.append_n(skipped, false); Ok(num_records) } @@ -313,7 +312,7 @@ impl ArrayReader for CachedArrayReader { selected_arrays.push(filtered); } - self.selections = BooleanBufferBuilder::new(0); + self.selections = BooleanBufferBuilder::new(self.batch_size); // Only remove batches from local buffer that are completely behind current position // Keep the current batch and any future batches as they might still be needed