Skip to content
Merged
9 changes: 8 additions & 1 deletion parquet/src/column/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,14 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
/// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
/// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
fn at_record_boundary(&mut self) -> Result<bool> {
    // Determine whether the reader currently sits on a record boundary by
    // inspecting the metadata of the next page (if any).
    match self.peek_next_page()? {
        // No next page: this is the last page in the column chunk, which
        // always ends at a record boundary.
        None => Ok(true),
        // A V2 data page is required by the parquet spec to start at a
        // record boundary, so the current page must end at one. V2 pages
        // are identified by having `num_rows` set in their header metadata.
        Some(metadata) => Ok(metadata.num_rows.is_some()),
    }
}
}

Expand Down
131 changes: 131 additions & 0 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,4 +1361,135 @@ mod tests {
);
}
}

/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// Mirrors the production failure: every data page for a list column
/// (rep_level=1) is a DataPage v2 and no offset index is available, so
/// `at_record_boundary` reports false for all but the last page.
///
/// After an earlier operation (here `skip_records(1)`) loads a v2 page,
/// a later `skip_records` call that drains the page's remaining levels
/// leaves the rep level decoder with `has_partial=true`. Since
/// `has_record_delimiter` is false, level-based processing never flushes
/// that partial record. When the following v2 page (with `num_rows`
/// available) is then peeked, the whole-page-skip fast path must flush
/// the pending partial first. If it does not:
///
/// 1. The skip over-counts (skips N+1 records instead of N), and
/// 2. The stale `has_partial` makes a subsequent `read_records` emit a
///    "phantom" record containing 0 values.
#[test]
fn test_skip_records_v2_page_skip_accounts_for_partial() {
    use crate::encodings::levels::LevelEncoder;

    let max_rep_level: i16 = 1;
    let max_def_level: i16 = 1;

    // Descriptor for a list element column (rep=1, def=1).
    let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT32)
        .with_repetition(Repetition::REQUIRED)
        .build()
        .unwrap();
    let column_desc = Arc::new(ColumnDescriptor::new(
        Arc::new(element_type),
        max_def_level,
        max_rep_level,
        ColumnPath::new(vec!["list".to_string(), "element".to_string()]),
    ));

    // Builds a DataPage v2 for this list column from raw levels + values.
    let build_v2_page =
        |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
            let mut rep_encoder = LevelEncoder::v2(max_rep_level, rep_levels.len());
            rep_encoder.put(rep_levels);
            let encoded_rep = rep_encoder.consume();

            let mut def_encoder = LevelEncoder::v2(max_def_level, def_levels.len());
            def_encoder.put(def_levels);
            let encoded_def = def_encoder.consume();

            let encoded_values: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

            // v2 page layout: rep levels, then def levels, then plain values.
            let mut page_data =
                Vec::with_capacity(encoded_rep.len() + encoded_def.len() + encoded_values.len());
            page_data.extend_from_slice(&encoded_rep);
            page_data.extend_from_slice(&encoded_def);
            page_data.extend_from_slice(&encoded_values);

            Page::DataPageV2 {
                buf: Bytes::from(page_data),
                num_values: rep_levels.len() as u32,
                encoding: Encoding::PLAIN,
                num_nulls: 0,
                num_rows,
                def_levels_byte_len: encoded_def.len() as u32,
                rep_levels_byte_len: encoded_rep.len() as u32,
                is_compressed: false,
                statistics: None,
            }
        };

    // Every page is a DataPage v2, matching the production scenario where
    // parquet-rs writes only v2 data pages and no offset index is loaded,
    // so at_record_boundary() returns false for non-last pages.
    //
    // 5 records across 3 pages:
    //   page 1: [10,20], [30,40]
    //   page 2: [50,60], [70,80]
    //   page 3: [90,100]
    let pages = VecDeque::from(vec![
        build_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2),
        build_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2),
        build_v2_page(&[0, 1], &[1, 1], &[90, 100], 1),
    ]);
    let page_reader = InMemoryPageReader::new(pages);
    let untyped_reader: ColumnReader = get_column_reader(column_desc, Box::new(page_reader));
    let mut reader = get_typed_column_reader::<Int32Type>(untyped_reader);

    // Step 1 — skip 1 record. Peeking page 1 gives num_rows=2 > remaining=1,
    // so the page is LOADED (not whole-page-skipped). The level-based skip
    // consumes rep levels [0,1] for record [10,20], stopping at the 0 that
    // starts record [30,40].
    assert_eq!(reader.skip_records(1).unwrap(), 1);

    // Step 2 — skip 2 more records ([30,40] and [50,60]).
    // Page 1 still holds 2 levels [0,1] for [30,40]. skip_rep_levels(2, 2):
    // the leading 0 does NOT act as a record delimiter (has_partial=false,
    // idx==0), so count_records returns (true, 0, 2) — all levels consumed,
    // has_partial=true, 0 records counted.
    //
    // has_record_delimiter is false → no flush at the page boundary.
    // Page 1 exhausted → peek page 2 (v2, num_rows=2).
    //
    // With the fix: flush_partial drops remaining from 2 to 1, so page 2 is
    // NOT whole-page-skipped (rows=2 > remaining=1); it is loaded and one
    // record [50,60] is skipped from it.
    //
    // Without the fix: rows(2) <= remaining(2) → page 2 is skipped wholesale,
    // over-counting by 1, and has_partial stays stale from page 1.
    assert_eq!(reader.skip_records(2).unwrap(), 2);

    // Step 3 — read 1 record and verify it is [70, 80].
    let mut out_values = Vec::new();
    let mut out_def_levels = Vec::new();
    let mut out_rep_levels = Vec::new();

    let (num_records, num_values, num_levels) = reader
        .read_records(
            1,
            Some(&mut out_def_levels),
            Some(&mut out_rep_levels),
            &mut out_values,
        )
        .unwrap();

    // Without the fix this returns (1, 0, 0): the stale has_partial is
    // "completed" by the rep=0 on page 3, yielding a record with 0 values.
    assert_eq!(num_records, 1, "should read exactly 1 record");
    assert_eq!(num_levels, 2, "should read 2 levels for the record");
    assert_eq!(num_values, 2, "should read 2 non-null values");
    assert_eq!(out_values, vec![70, 80], "should contain 4th record's values");
    assert_eq!(out_rep_levels, vec![0, 1], "rep levels for a 2-element list");
    assert_eq!(out_def_levels, vec![1, 1], "def levels (all non-null)");
}
}
7 changes: 6 additions & 1 deletion parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,12 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

fn at_record_boundary(&mut self) -> Result<bool> {
    match &mut self.state {
        // Reading sequentially without an offset index: boundary status must
        // be inferred from the next page's metadata.
        SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
            // Last page of the column chunk always ends at a record boundary.
            None => Ok(true),
            // V2 data pages must start at record boundaries per the parquet
            // spec, so the current page ends at one. `num_rows` is only
            // populated for V2 page headers.
            Some(metadata) => Ok(metadata.num_rows.is_some()),
        },
        // With an offset index, pages are known to align to record boundaries.
        SerializedPageReaderState::Pages { .. } => Ok(true),
    }
}
Expand Down
2 changes: 0 additions & 2 deletions parquet/tests/arrow_reader/row_filter/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() {
/// Without the fix, the list column over-skips by one record, causing
/// struct children to disagree on record counts.
#[test]
#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
fn test_row_selection_list_column_v2_page_boundary_skip() {
use arrow_array::builder::{Int32Builder, ListBuilder};

Expand Down Expand Up @@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() {
/// bug causes one leaf to over-skip by one record while the other stays
/// correct.
#[test]
#[should_panic(expected = "Not all children array length are the same!")]
fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
use arrow_array::Array;
use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
Expand Down
Loading