From 19fc1614082e3332043b25cda4ef06a0b960fc92 Mon Sep 17 00:00:00 2001
From: Jonas Dedden
Date: Thu, 12 Feb 2026 13:26:03 +0100
Subject: [PATCH 1/2] Reproduce the issue of #9370 in a minimal, end-to-end way

---
 parquet/tests/arrow_reader/row_filter/sync.rs | 229 +++++++++++++++++-
 1 file changed, 228 insertions(+), 1 deletion(-)

diff --git a/parquet/tests/arrow_reader/row_filter/sync.rs b/parquet/tests/arrow_reader/row_filter/sync.rs
index 78ba7569a465..bb9064a13827 100644
--- a/parquet/tests/arrow_reader/row_filter/sync.rs
+++ b/parquet/tests/arrow_reader/row_filter/sync.rs
@@ -33,7 +33,10 @@ use parquet::{
         },
     },
     errors::Result,
-    file::{metadata::PageIndexPolicy, properties::WriterProperties},
+    file::{
+        metadata::PageIndexPolicy,
+        properties::{WriterProperties, WriterVersion},
+    },
 };
 
 #[test]
@@ -194,3 +197,227 @@ fn test_row_filter_full_page_skip_is_handled() {
     let result = concat_batches(&schema, &batches).unwrap();
     assert_eq!(result.num_rows(), 2);
 }
+
+/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
+///
+/// When `skip_records` on a list column crosses v2 data page boundaries,
+/// the partial record state (`has_partial`) in the repetition level
+/// decoder must be flushed before the whole-page-skip shortcut can fire.
+/// Without the fix, the list column over-skips by one record, causing
+/// struct children to disagree on record counts.
+#[test]
+#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
+fn test_row_selection_list_column_v2_page_boundary_skip() {
+    use arrow_array::builder::{Int32Builder, ListBuilder};
+
+    let num_rows = 10usize;
+
+    // Schema: { id: int32, values: list<int32> }
+    // Two top-level columns so that StructArrayReader can detect the
+    // desync between a simple column (id) and a list column (values).
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", ArrowDataType::Int32, false),
+        Field::new(
+            "values",
+            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int32, true))),
+            false,
+        ),
+    ]));
+
+    // Each row: id = i, values = [i*10, i*10+1]
+    let ids = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    for i in 0..num_rows as i32 {
+        list_builder.values().append_value(i * 10);
+        list_builder.values().append_value(i * 10 + 1);
+        list_builder.append(true);
+    }
+    let values_array = list_builder.finish();
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(ids) as ArrayRef,
+            Arc::new(values_array) as ArrayRef,
+        ],
+    )
+    .unwrap();
+
+    // Force v2 data pages with exactly 2 rows per page → 5 pages.
+    // The default reader (no offset index) puts SerializedPageReader
+    // in Values state, where at_record_boundary() returns false for
+    // non-last pages, matching the production scenario.
+    let props = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_write_batch_size(2)
+        .set_data_page_row_count_limit(2)
+        .build();
+
+    let mut buffer = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+    let data = Bytes::from(buffer);
+
+    // 1. Read without row selection — should always succeed.
+    let reader = ParquetRecordBatchReaderBuilder::try_new(data.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+    let batches: Vec<_> = reader.map(|r| r.unwrap()).collect();
+    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, num_rows, "full read should return all rows");
+
+    // 2. Read with row selection: select first and last row.
+    // This produces the sequence read(1) + skip(8) + read(1).
+    //
+    // The skip crosses v2 page boundaries. Without the fix, the
+    // list column's rep-level decoder has stale has_partial state
+    // after exhausting the first page's remaining levels, causing
+    // the whole-page-skip shortcut to over-count by one record.
+    //
+    // We must use RowSelectionPolicy::Selectors because the default
+    // Auto policy would choose the Mask strategy for this small
+    // selection, which reads all rows then filters (never calling
+    // skip_records, thereby hiding the bug).
+    let selection = RowSelection::from(vec![
+        RowSelector::select(1),
+        RowSelector::skip(num_rows - 2),
+        RowSelector::select(1),
+    ]);
+
+    let reader = ParquetRecordBatchReaderBuilder::try_new(data)
+        .unwrap()
+        .with_row_selection(selection)
+        .with_row_selection_policy(RowSelectionPolicy::Selectors)
+        .build()
+        .unwrap();
+    let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
+    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, 2, "selection should return exactly 2 rows");
+
+    // Verify data correctness: row 0 and row 9
+    let result = concat_batches(&schema, &batches).unwrap();
+    let id_col = result
+        .column(0)
+        .as_primitive::<Int32Type>();
+    assert_eq!(id_col.value(0), 0);
+    assert_eq!(id_col.value(1), (num_rows - 1) as i32);
+
+    let list_col = result.column(1).as_list::<i32>();
+    let first = list_col
+        .value(0)
+        .as_primitive::<Int32Type>()
+        .values()
+        .to_vec();
+    assert_eq!(first, vec![0, 1]);
+    let last = list_col
+        .value(1)
+        .as_primitive::<Int32Type>()
+        .values()
+        .to_vec();
+    let n = (num_rows - 1) as i32;
+    assert_eq!(last, vec![n * 10, n * 10 + 1]);
+}
+
+/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
+///
+/// When leaf columns inside a `List<Struct<...>>` have different page
+/// boundaries (due to value-size differences), the `has_partial` state
+/// bug causes one leaf to over-skip by one record while the other stays
+/// correct.
+#[test]
+#[should_panic(expected = "Not all children array length are the same!")]
+fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
+    use arrow_array::Array;
+    use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
+    use arrow_schema::Fields;
+
+    let num_rows = 14usize;
+    // Long string forces the string column to flush pages much sooner
+    // than the int32 column, creating different page boundaries.
+    let long_prefix = "x".repeat(500);
+
+    // Schema: { vals: List<Struct<x: Int32, y: Utf8>> }
+    let struct_fields = Fields::from(vec![
+        Field::new("x", ArrowDataType::Int32, false),
+        Field::new("y", ArrowDataType::Utf8, false),
+    ]);
+
+    // Build data: even rows have 2 list elements, odd rows have 3.
+    // This ensures different physical value counts per record, so
+    // reading from the wrong position produces a different total.
+    let mut list_builder = ListBuilder::new(StructBuilder::from_fields(struct_fields, 0));
+    for i in 0..num_rows {
+        let num_elems = if i % 2 == 0 { 2 } else { 3 };
+        let sb = list_builder.values();
+        for j in 0..num_elems {
+            sb.field_builder::<Int32Builder>(0)
+                .unwrap()
+                .append_value(i as i32 * 10 + j as i32);
+            sb.field_builder::<StringBuilder>(1)
+                .unwrap()
+                .append_value(format!("{long_prefix}_{i}_{j}"));
+            sb.append(true);
+        }
+        list_builder.append(true);
+    }
+    let vals_array = list_builder.finish();
+
+    // Derive schema from the actual array type to avoid field-name mismatches.
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "vals",
+        vals_array.data_type().clone(),
+        false,
+    )]));
+
+    let batch =
+        RecordBatch::try_new(schema.clone(), vec![Arc::new(vals_array) as ArrayRef]).unwrap();
+
+    // V2 pages + a small data page size limit.
+    // Column x (Int32): all 14 rows fit in one page (~140 bytes of values).
+    // Column y (Utf8, 500-byte strings): pages flush after every ~2 rows.
+    // This creates the page boundary asymmetry needed to trigger the bug.
+    let props = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_write_batch_size(2)
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(512)
+        .build();
+
+    let mut buffer = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+    let data = Bytes::from(buffer);
+
+    // 1. Read without row selection — should always succeed.
+    let reader = ParquetRecordBatchReaderBuilder::try_new(data.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+    let batches: Vec<_> = reader.map(|r| r.unwrap()).collect();
+    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, num_rows, "full read should return all rows");
+
+    // 2. Read with selection: select(1) + skip(10) + select(1).
+    // Without the fix, the string column (y) over-skips by 1,
+    // reading a record with a different element count than the
+    // int column (x). The inner StructArrayReader sees arrays
+    // of different lengths.
+    let selection = RowSelection::from(vec![
+        RowSelector::select(1),
+        RowSelector::skip(10),
+        RowSelector::select(1),
+    ]);
+
+    let reader = ParquetRecordBatchReaderBuilder::try_new(data)
+        .unwrap()
+        .with_row_selection(selection)
+        .with_row_selection_policy(RowSelectionPolicy::Selectors)
+        .build()
+        .unwrap();
+    let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
+    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, 2, "selection should return exactly 2 rows");
+}

From 72ad098130107cd9bbe21869adae87bb7df5c8db Mon Sep 17 00:00:00 2001
From: Jonas Dedden
Date: Thu, 12 Feb 2026 13:32:14 +0100
Subject: [PATCH 2/2] CQ fix

---
 parquet/tests/arrow_reader/row_filter/sync.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/tests/arrow_reader/row_filter/sync.rs b/parquet/tests/arrow_reader/row_filter/sync.rs
index bb9064a13827..e59fa392cfd4 100644
--- a/parquet/tests/arrow_reader/row_filter/sync.rs
+++ b/parquet/tests/arrow_reader/row_filter/sync.rs
@@ -354,7 +354,7 @@ fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
         for j in 0..num_elems {
             sb.field_builder::<Int32Builder>(0)
                 .unwrap()
-                .append_value(i as i32 * 10 + j as i32);
+                .append_value(i as i32 * 10 + j);
             sb.field_builder::<StringBuilder>(1)
                 .unwrap()
                 .append_value(format!("{long_prefix}_{i}_{j}"));
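
For reviewers: both tests above drive only the public reader/writer API. If it
helps to poke at the behaviour outside the test harness, below is a minimal
standalone sketch assembled from the same calls the patch itself uses
(WriterProperties with PARQUET_2_0, RowSelection/RowSelector, and
RowSelectionPolicy::Selectors). The main() wrapper, the in-memory buffer, and
the row counts are illustrative only and not part of the patch; on a reader
without the fix, this should hit the same out-of-sync panic the first test
expects, and with the fix it returns the two selected rows.

    use std::sync::Arc;

    use arrow_array::builder::{Int32Builder, ListBuilder};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use bytes::Bytes;
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_reader::{
        ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector,
    };
    use parquet::file::properties::{WriterProperties, WriterVersion};

    fn main() {
        // Same shape as the first test: { id: Int32, values: List<Int32> },
        // ten rows, two list elements per row.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "values",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                false,
            ),
        ]));
        let ids = Int32Array::from((0..10).collect::<Vec<i32>>());
        let mut lists = ListBuilder::new(Int32Builder::new());
        for i in 0..10 {
            lists.values().append_value(i * 10);
            lists.values().append_value(i * 10 + 1);
            lists.append(true);
        }
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(ids) as ArrayRef, Arc::new(lists.finish()) as ArrayRef],
        )
        .unwrap();

        // v2 data pages with two rows per page, as in the tests.
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_write_batch_size(2)
            .set_data_page_row_count_limit(2)
            .build();
        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // read(1) + skip(8) + read(1); the Selectors policy forces the
        // skip_records path that the regression tests exercise.
        let selection = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(8),
            RowSelector::select(1),
        ]);
        let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection)
            .with_row_selection_policy(RowSelectionPolicy::Selectors)
            .build()
            .unwrap();
        let total: usize = reader.map(|b| b.unwrap().num_rows()).sum();
        assert_eq!(total, 2);
    }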