131 changes: 95 additions & 36 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -397,17 +397,29 @@ impl ByteArrayDecoderPlain {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

-            output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;
+            match output.try_push(&buf[start_offset..end_offset], self.validate_utf8) {
+                Ok(_) => {
+                    self.offset = end_offset;
+                    read += 1;
+                }
+                Err(e) => {
+                    if e.to_string().contains("index overflow") {
+                        break;
+                    } else {
+                        // Update state before returning error
+                        self.max_remaining_values -= read;
+                        return Err(e);
+                    }
+                }
+            }

-            self.offset = end_offset;
-            read += 1;
        }
-        self.max_remaining_values -= to_read;
+        self.max_remaining_values -= read;

if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
}
-        Ok(to_read)
+        Ok(read)
}
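
The net effect of this hunk: offset exhaustion (the "index overflow decoding byte array" error produced by OffsetBuffer::try_push, see the offset_buffer.rs hunk below, and keyed off here via e.to_string()) now ends the loop early instead of failing the whole read, and the function reports the partial count. A minimal, self-contained sketch of that contract; MockBuffer and read_values are illustrative stand-ins, not types from this crate:

// Sketch: decode values until the offset type is exhausted, then stop and
// report how many values were decoded. `MockBuffer` stands in for the
// crate-private `OffsetBuffer<i32>`; it is not the real parquet type.
struct MockBuffer {
    offsets: Vec<i32>, // i32 offsets are exhausted past ~2 GiB of values
    values: Vec<u8>,
}

impl MockBuffer {
    fn try_push(&mut self, data: &[u8]) -> Result<(), String> {
        let new_len = self.values.len() + data.len();
        // Check representability first; mutate only on success
        let end = i32::try_from(new_len)
            .map_err(|_| "index overflow decoding byte array".to_string())?;
        self.values.extend_from_slice(data);
        self.offsets.push(end);
        Ok(())
    }
}

fn read_values(output: &mut MockBuffer, input: &[&[u8]]) -> Result<usize, String> {
    let mut read = 0;
    for value in input {
        match output.try_push(value) {
            Ok(_) => read += 1,
            // Offset space exhausted: stop and return the partial count
            Err(e) if e.contains("index overflow") => break,
            Err(e) => return Err(e),
        }
    }
    Ok(read) // may be fewer than input.len()
}

fn main() {
    let mut out = MockBuffer { offsets: vec![0], values: Vec::new() };
    // With realistic data the break only triggers past i32::MAX total bytes;
    // here it simply decodes everything and reports the full count.
    assert_eq!(read_values(&mut out, &[b"ab".as_slice(), b"cd".as_slice()]), Ok(2));
}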

pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
@@ -474,39 +486,58 @@ impl ByteArrayDecoderDeltaLength {
}

fn read<I: OffsetSizeTrait>(
        &mut self,
        output: &mut OffsetBuffer<I>,
        len: usize,
    ) -> Result<usize> {
        let initial_values_length = output.values.len();

        let to_read = len.min(self.lengths.len() - self.length_offset);
        output.offsets.reserve(to_read);

-        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+        let src_lengths =
+            &self.lengths[self.length_offset..self.length_offset + to_read];

        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
        output.values.reserve(total_bytes);

        let mut current_offset = self.data_offset;
+        let mut read = 0;

-        for length in src_lengths {
-            let end_offset = current_offset + *length as usize;
-            output.try_push(
-                &self.data.as_ref()[current_offset..end_offset],
-                self.validate_utf8,
-            )?;
-            current_offset = end_offset;
-        }
+        for length in src_lengths {
+            let end_offset = current_offset + *length as usize;
+
+            match output.try_push(
+                &self.data.as_ref()[current_offset..end_offset],
+                self.validate_utf8,
+            ) {
+                Ok(_) => {
+                    current_offset = end_offset;
+                    read += 1;
+                }
+                Err(e) => {
+                    if e.to_string().contains("index overflow") {
+                        break;
+                    } else {
+                        // Update state before returning error
+                        self.data_offset = current_offset;
+                        self.length_offset += read;
+                        return Err(e);
+                    }
+                }
+            }
+        }

        self.data_offset = current_offset;
-        self.length_offset += to_read;
+        self.length_offset += read;

        if self.validate_utf8 {
            output.check_valid_utf8(initial_values_length)?;
        }
-        Ok(to_read)
+
+        Ok(read)
}
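
Note how both exit paths leave length_offset and data_offset pointing at the first undecoded value, so a later call (typically into a fresh output buffer) resumes where this one stopped. A small sketch of that resumable-state idea, with a byte capacity standing in for the offset type's range; DeltaLengthState is a hypothetical stand-in, not the crate's decoder:

// Hypothetical sketch: `lengths`/`length_offset`/`data_offset` mirror the
// decoder fields. After a partial read the cursors point at the first
// undecoded value, so the next call picks up exactly there.
struct DeltaLengthState {
    lengths: Vec<usize>,
    length_offset: usize,
    data_offset: usize,
}

impl DeltaLengthState {
    /// Consumes values until `capacity` output bytes would be exceeded,
    /// returning how many values were consumed.
    fn read(&mut self, capacity: usize) -> usize {
        let mut current_offset = self.data_offset;
        let mut read = 0;
        for &len in &self.lengths[self.length_offset..] {
            if current_offset + len > capacity {
                break; // analogous to the "index overflow" break above
            }
            current_offset += len;
            read += 1;
        }
        self.data_offset = current_offset;
        self.length_offset += read;
        read
    }
}

fn main() {
    let mut state = DeltaLengthState {
        lengths: vec![40, 40, 40],
        length_offset: 0,
        data_offset: 0,
    };
    assert_eq!(state.read(100), 2); // the third value would cross the limit
    assert_eq!(state.read(usize::MAX), 1); // a fresh call resumes at value 3
}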

fn skip(&mut self, to_skip: usize) -> Result<usize> {
let remain_values = self.lengths.len() - self.length_offset;
let to_skip = remain_values.min(to_skip);
@@ -542,13 +573,41 @@ impl ByteArrayDecoderDelta {
let initial_values_length = output.values.len();
output.offsets.reserve(len.min(self.decoder.remaining()));

-        let read = self
-            .decoder
-            .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;
-
-        if self.validate_utf8 {
-            output.check_valid_utf8(initial_values_length)?;
-        }
+        let mut read = 0;
+        let mut hit_overflow = false;
+
+        let result = self.decoder.read(len, |bytes| {
+            match output.try_push(bytes, self.validate_utf8) {
+                Ok(_) => {
+                    read += 1;
+                    Ok(())
+                }
+                Err(e) if e.to_string().contains("index overflow") => {
+                    hit_overflow = true;
+                    // Return error to stop decoder, but we'll handle it as overflow outside
+                    Err(e)
+                }
+                Err(e) => Err(e),
+            }
+        });
+
+        match result {
+            Ok(_) => {
+                // All values read successfully
+                if self.validate_utf8 {
+                    output.check_valid_utf8(initial_values_length)?;
+                }
+            }
+            Err(_) if hit_overflow => {
+                // Index overflow - this is expected, just stop reading
+                // Don't validate UTF-8 as buffer may be in intermediate state
+            }
+            Err(e) => {
+                // Other error - propagate it
+                return Err(e);
+            }
+        }

Ok(read)
}
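
Because the inner delta decoder drives a callback and can only stop early when that callback returns an error, the expected overflow case has to be carried out through a side flag and then reclassified by the caller. A compact sketch of that pattern; run_decoder and read_partial are invented stand-ins for the decoder and this read method:

// Sketch of the flag pattern: the callback signals "stop" with an error,
// and the caller uses `hit_overflow` to tell the expected early stop apart
// from a genuine failure.
fn run_decoder<F>(items: &[u32], mut emit: F) -> Result<(), String>
where
    F: FnMut(u32) -> Result<(), String>,
{
    for &item in items {
        emit(item)?; // the only way to stop early is an Err
    }
    Ok(())
}

fn read_partial(items: &[u32], limit: u32) -> Result<usize, String> {
    let mut read = 0;
    let mut hit_overflow = false;
    let result = run_decoder(items, |item| {
        if item > limit {
            hit_overflow = true;
            return Err("index overflow".to_string()); // stop the decoder
        }
        read += 1;
        Ok(())
    });
    match result {
        Ok(_) => Ok(read),
        Err(_) if hit_overflow => Ok(read), // expected: keep the partial count
        Err(e) => Err(e),                   // real error: propagate
    }
}

fn main() {
    assert_eq!(read_partial(&[1, 2, 99, 3], 10), Ok(2));
}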

8 changes: 6 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -1404,6 +1404,8 @@ impl ParquetRecordBatchReader {
mask_chunk.chunk_rows
));
}

+        // Keep strict behavior — do NOT allow partial reads here
if read != mask_chunk.chunk_rows {
return Err(general_err!(
"insufficient rows read from array reader - expected {}, got {}",
@@ -1413,8 +1415,9 @@
}

let array = self.array_reader.consume_batch()?;
-        // The column reader exposes the projection as a struct array; convert this
-        // into a record batch before applying the boolean filter mask.
+
+        // The column reader exposes the projection as a struct array;
+        // convert this into a record batch before applying the boolean filter mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct array".to_string(),
@@ -1439,6 +1442,7 @@
return Ok(Some(filtered_batch));
}
}

RowSelectionCursor::Selectors(selectors_cursor) => {
while read_records < batch_size && !selectors_cursor.is_empty() {
let front = selectors_cursor.next_selector();
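
For readers unfamiliar with the mask path above: the projected columns come back as one struct array, get wrapped into a RecordBatch, and are then filtered row-wise with the boolean mask. A standalone illustration of that last step using arrow's public filter_record_batch kernel; the tiny single-column batch is a made-up stand-in for the reader's projection:

use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // A made-up single-column batch standing in for the projected struct columns
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    // Keep rows 0 and 2, drop rows 1 and 3
    let mask = BooleanArray::from(vec![true, false, true, false]);
    let filtered = filter_record_batch(&batch, &mask)?;
    assert_eq!(filtered.num_rows(), 2);
    Ok(())
}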
13 changes: 8 additions & 5 deletions parquet/src/arrow/buffer/offset_buffer.rs
@@ -60,10 +60,9 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
/// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
/// all data has been written
pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
+        // Keep UTF8 validation
if validate_utf8 {
if let Some(&b) = data.first() {
-                // A valid code-point iff it does not start with 0b10xxxxxx
-                // Bit-magic taken from `std::str::is_char_boundary`
if (b as i8) < -0x40 {
return Err(ParquetError::General(
"encountered non UTF-8 data".to_string(),
@@ -72,15 +71,19 @@
}
}

-        self.values.extend_from_slice(data);
+        let current_len = self.values.len();
+        let new_len = current_len + data.len();

-        let index_offset = I::from_usize(self.values.len())
+        // Proactively check if offset type can represent this
+        let index_offset = I::from_usize(new_len)
            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;

+        // Only mutate state AFTER successful conversion
+        self.values.extend_from_slice(data);
self.offsets.push(index_offset);

Ok(())
}
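
This reordering is the heart of the fix: the conversion is checked before either vector is touched, so a failed push leaves the buffer exactly as it was and the decoders above can safely keep their partial results. A runnable demonstration of that invariant, shrunk to i8 offsets so the overflow is cheap to hit; the real buffer is generic over i32/i64 via OffsetSizeTrait, and this free function is a sketch, not the crate's code:

// Sketch: the same check-then-mutate ordering with i8 offsets, where
// "index overflow" happens after 127 bytes instead of ~2 GiB.
fn try_push(offsets: &mut Vec<i8>, values: &mut Vec<u8>, data: &[u8]) -> Result<(), String> {
    let new_len = values.len() + data.len();
    // Convert BEFORE mutating anything
    let end = i8::try_from(new_len).map_err(|_| "index overflow".to_string())?;
    values.extend_from_slice(data);
    offsets.push(end);
    Ok(())
}

fn main() {
    let (mut offsets, mut values) = (vec![0i8], Vec::new());
    assert!(try_push(&mut offsets, &mut values, &[b'x'; 100]).is_ok());
    // The next value would need offset 200 > i8::MAX, so the push fails...
    assert!(try_push(&mut offsets, &mut values, &[b'x'; 100]).is_err());
    // ...and the buffer still holds exactly the first, complete value
    assert_eq!((offsets.len(), values.len()), (2, 100));
}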

/// Extends this buffer with a list of keys
///
/// For each value `key` in `keys` this will insert
17 changes: 10 additions & 7 deletions parquet/tests/arrow_reader/checksum.rs
@@ -25,13 +25,16 @@ use parquet::arrow::arrow_reader::ArrowReaderBuilder;
#[test]
fn test_datapage_v1_corrupt_checksum() {
let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet");
-    assert_eq!(errors, [
-        Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
-        Ok(()),
-        Ok(()),
-        Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
-        Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string())
-    ]);
+    assert_eq!(
+        errors,
+        [
+            Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
+            Ok(()),
+            Ok(()),
+            Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
+            Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string()),
+        ]
+    );
}

#[test]