229 changes: 228 additions & 1 deletion parquet/tests/arrow_reader/row_filter/sync.rs
@@ -33,7 +33,10 @@ use parquet::{
},
},
errors::Result,
file::{metadata::PageIndexPolicy, properties::WriterProperties},
file::{
metadata::PageIndexPolicy,
properties::{WriterProperties, WriterVersion},
},
};

#[test]
@@ -194,3 +197,227 @@ fn test_row_filter_full_page_skip_is_handled() {
let result = concat_batches(&schema, &batches).unwrap();
assert_eq!(result.num_rows(), 2);
}

/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// When `skip_records` on a list column crosses v2 data page boundaries,
/// the partial record state (`has_partial`) in the repetition level
/// decoder must be flushed before the whole-page-skip shortcut can fire.
/// Without the fix, the list column over-skips by one record, causing
/// struct children to disagree on record counts.
#[test]
#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
fn test_row_selection_list_column_v2_page_boundary_skip() {
use arrow_array::builder::{Int32Builder, ListBuilder};

let num_rows = 10usize;

// Schema: { id: int32, values: list<int32> }
// Two top-level columns so that StructArrayReader can detect the
// desync between a simple column (id) and a list column (values).
let schema = Arc::new(Schema::new(vec![
Field::new("id", ArrowDataType::Int32, false),
Field::new(
"values",
ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int32, true))),
false,
),
]));

// Each row: id = i, values = [i*10, i*10+1]
let ids = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
let mut list_builder = ListBuilder::new(Int32Builder::new());
for i in 0..num_rows as i32 {
list_builder.values().append_value(i * 10);
list_builder.values().append_value(i * 10 + 1);
list_builder.append(true);
}
let values_array = list_builder.finish();

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(ids) as ArrayRef,
Arc::new(values_array) as ArrayRef,
],
)
.unwrap();

// Force v2 data pages with exactly 2 rows per page → 5 pages.
// The default reader (no offset index) puts SerializedPageReader
// in Values state where at_record_boundary() returns false for
// non-last pages, matching the production scenario.
let props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_write_batch_size(2)
.set_data_page_row_count_limit(2)
.build();

let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let data = Bytes::from(buffer);

// 1. Read without row selection — should always succeed
let reader = ParquetRecordBatchReaderBuilder::try_new(data.clone())
.unwrap()
.build()
.unwrap();
let batches: Vec<_> = reader.map(|r| r.unwrap()).collect();
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, num_rows, "full read should return all rows");

// 2. Read with row selection: select first and last row.
// This produces the sequence read(1) + skip(8) + read(1).
//
// The skip crosses v2 page boundaries. Without the fix, the
// list column's rep-level decoder has stale has_partial state
// after exhausting the first page's remaining levels, causing
// the whole-page-skip shortcut to over-count by one record.
//
// We must use RowSelectionPolicy::Selectors because the default
// Auto policy would choose the Mask strategy for this small
// selection, which reads all rows then filters (never calling
// skip_records, thereby hiding the bug).
let selection = RowSelection::from(vec![
RowSelector::select(1),
RowSelector::skip(num_rows - 2),
RowSelector::select(1),
]);

let reader = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_row_selection(selection)
.with_row_selection_policy(RowSelectionPolicy::Selectors)
.build()
.unwrap();
let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, 2, "selection should return exactly 2 rows");

// Verify data correctness: row 0 and row 9
let result = concat_batches(&schema, &batches).unwrap();
let id_col = result
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 0);
assert_eq!(id_col.value(1), (num_rows - 1) as i32);

let list_col = result.column(1).as_list::<i32>();
let first = list_col
.value(0)
.as_primitive::<arrow_array::types::Int32Type>()
.values()
.to_vec();
assert_eq!(first, vec![0, 1]);
let last = list_col
.value(1)
.as_primitive::<arrow_array::types::Int32Type>()
.values()
.to_vec();
let n = (num_rows - 1) as i32;
assert_eq!(last, vec![n * 10, n * 10 + 1]);
}
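
// ---------------------------------------------------------------------------
// A minimal, self-contained sketch of the accounting invariant exercised
// above. The names (`PageSkip`, `rows_in_page`, `has_partial`) are
// hypothetical and chosen for illustration -- this is not the parquet
// crate's actual repetition-level decoder, only the rule it must follow:
// partial-record state has to be flushed before the whole-page-skip
// shortcut counts a page's rows, or the record arithmetic goes off by one
// and sibling columns desynchronize.
// ---------------------------------------------------------------------------
struct PageSkip {
    /// Rows whose data appears in this page, including a row that began
    /// on the previous page when the page does not start on a record
    /// boundary.
    rows_in_page: usize,
    /// True when the first row in this page is the continuation of a
    /// record that a previous skip already counted.
    has_partial: bool,
}

impl PageSkip {
    /// Number of *new* records consumed by skipping this entire page.
    fn skip_whole_page(&mut self) -> usize {
        // Flush the partial-record state first: the continuation row was
        // already credited to the previous page's skip, so counting it
        // again here is exactly the off-by-one the tests in this file catch.
        let already_counted = if self.has_partial { 1 } else { 0 };
        self.has_partial = false;
        self.rows_in_page - already_counted
    }
}

#[test]
fn whole_page_skip_flushes_partial_record_state() {
    // Page of 2 rows whose first row continues a previously counted
    // record: only 1 new record is consumed.
    let mut page = PageSkip { rows_in_page: 2, has_partial: true };
    assert_eq!(page.skip_whole_page(), 1);

    // A page starting on a record boundary consumes all of its rows.
    let mut page = PageSkip { rows_in_page: 2, has_partial: false };
    assert_eq!(page.skip_whole_page(), 2);
}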

/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// When leaf columns inside a `List<Struct<…>>` have different page
/// boundaries (due to value-size differences), the `has_partial` state
/// bug causes one leaf to over-skip by one record while the other stays
/// correct.
#[test]
#[should_panic(expected = "Not all children array length are the same!")]
fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
use arrow_array::Array;
use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
use arrow_schema::Fields;

let num_rows = 14usize;
// Long string forces the string column to flush pages much sooner
// than the int32 column, creating different page boundaries.
let long_prefix = "x".repeat(500);

// Schema: { vals: List<Struct<x: Int32, y: Utf8>> }
let struct_fields = Fields::from(vec![
Field::new("x", ArrowDataType::Int32, false),
Field::new("y", ArrowDataType::Utf8, false),
]);

// Build data: even rows have 2 list elements, odd rows have 3.
// This ensures different physical value counts per record, so
// reading from the wrong position produces a different total.
let mut list_builder = ListBuilder::new(StructBuilder::from_fields(struct_fields, 0));
for i in 0..num_rows {
let num_elems = if i % 2 == 0 { 2 } else { 3 };
let sb = list_builder.values();
for j in 0..num_elems {
sb.field_builder::<Int32Builder>(0)
.unwrap()
.append_value(i as i32 * 10 + j);
sb.field_builder::<StringBuilder>(1)
.unwrap()
.append_value(format!("{long_prefix}_{i}_{j}"));
sb.append(true);
}
list_builder.append(true);
}
let vals_array = list_builder.finish();

// Derive schema from the actual array type to avoid field-name mismatches.
let schema = Arc::new(Schema::new(vec![Field::new(
"vals",
vals_array.data_type().clone(),
false,
)]));

let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(vals_array) as ArrayRef]).unwrap();

// V2 pages + small max_data_page_size.
// Column x (Int32): all 14 rows fit in one page (~140 bytes of values).
// Column y (Utf8, 500-byte strings): pages flush after every ~2 rows.
// This creates the page boundary asymmetry needed to trigger the bug.
let props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_write_batch_size(2)
.set_dictionary_enabled(false)
.set_data_page_size_limit(512)
.build();

let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let data = Bytes::from(buffer);

// 1. Read without row selection — should always succeed.
let reader = ParquetRecordBatchReaderBuilder::try_new(data.clone())
.unwrap()
.build()
.unwrap();
let batches: Vec<_> = reader.map(|r| r.unwrap()).collect();
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, num_rows, "full read should return all rows");

// 2. Read with selection: select(1) + skip(10) + select(1).
// Without the fix, the string column (y) over-skips by 1,
// reading a record with a different element count than the
// int column (x). The inner StructArrayReader sees arrays
// of different lengths.
let selection = RowSelection::from(vec![
RowSelector::select(1),
RowSelector::skip(10),
RowSelector::select(1),
]);

let reader = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_row_selection(selection)
.with_row_selection_policy(RowSelectionPolicy::Selectors)
.build()
.unwrap();
let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, 2, "selection should return exactly 2 rows");
}
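
// ---------------------------------------------------------------------------
// Why both tests pin `RowSelectionPolicy::Selectors`: selectors translate
// directly into alternating read/skip calls on the column readers, while
// the mask strategy reads every row and filters afterwards, so
// `skip_records` is never invoked and the bug stays hidden. The sketch
// below is a hypothetical illustration of that translation, not the
// parquet crate's actual dispatch.
// ---------------------------------------------------------------------------
#[derive(Debug, PartialEq)]
enum DecoderCall {
    Read(usize),
    Skip(usize),
}

/// Expand (select?, row_count) pairs into the calls a selector-driven
/// reader would issue.
fn selector_calls(selectors: &[(bool, usize)]) -> Vec<DecoderCall> {
    selectors
        .iter()
        .map(|&(select, n)| {
            if select {
                DecoderCall::Read(n)
            } else {
                DecoderCall::Skip(n)
            }
        })
        .collect()
}

#[test]
fn selectors_policy_issues_skip_calls() {
    // select(1) + skip(8) + select(1), as in the first regression test.
    let calls = selector_calls(&[(true, 1), (false, 8), (true, 1)]);
    assert_eq!(
        calls,
        vec![DecoderCall::Read(1), DecoderCall::Skip(8), DecoderCall::Read(1)]
    );
}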