perf: optimize skipper for varint values used when projecting Avro record types #9397
mzabaluev wants to merge 5 commits into apache:main
Handling the first byte specially does not demonstrate perf benefits.
pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
    if let Some(array) = buf.get(..10) {
        return skip_varint_array(array.try_into().unwrap());
What if get returns fewer than 10 elements?
buf.get(..10) can return fewer than that if buf ends unexpectedly.
No, then .get(..10) returns None and the rest of the buffer is handled in the slow branch below.
My bad, I misread the docs.
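For readers following this thread, here is a minimal sketch of the all-or-nothing behavior of `get(..10)` discussed above. The fast path mirrors the snippet under review; the slow path (`skip_varint_slow`) is a hypothetical stand-in, since the PR's actual slow branch is not shown here. Like the reviewed snippet, this sketch applies no overflow guard.

```rust
use std::convert::TryInto; // already in the prelude on edition 2021

/// Fast path: the input has a fixed length of 10, so the loop bound is a constant.
fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
    #[allow(clippy::needless_range_loop)]
    for idx in 0..10 {
        if buf[idx] < 0x80 {
            return Some(idx + 1);
        }
    }
    None
}

/// Hypothetical slow path for inputs shorter than 10 bytes (an assumption,
/// not the PR's code): find the first byte without the continuation bit.
fn skip_varint_slow(buf: &[u8]) -> Option<usize> {
    buf.iter().position(|&b| b < 0x80).map(|idx| idx + 1)
}

fn skip_varint(buf: &[u8]) -> Option<usize> {
    // `get(..10)` never yields a shorter slice: it is `Some(&buf[..10])` when
    // at least 10 bytes are available and `None` otherwise.
    if let Some(array) = buf.get(..10) {
        return skip_varint_array(array.try_into().unwrap());
    }
    skip_varint_slow(buf)
}
```

Because the slice handed to `try_into()` always has exactly 10 elements, the `unwrap()` cannot fail.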
pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
    let (val, offset) =
        read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
Can you revert this? I.e., keep this as vlq::read_varint.
You mean, import both read_varint and skip_varint at the top and use without the module path?
EmilyMatt left a comment
LGTM! It's a nice and simple save. I wonder if we can get this bench to run in CI.
jecsand838 left a comment
@mzabaluev LGTM! Really love this performance enhancement. I have a few comments I'd like your thoughts on, but overall I love the direction.
use apache_avro::types::Value;
use apache_avro::{Schema as ApacheSchema, to_avro_datum};
@mzabaluev Is it possible to remove apache_avro? I had planned to remove it from the crate, but haven't gotten around to it yet. Refer to: #8079 (comment)
fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
    // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
    #[allow(clippy::needless_range_loop)]
    for idx in 0..10 {
        if buf[idx] < 0x80 {
            return Some(idx + 1);
        }
    }
    None
}
I think this introduces a regression: skip_varint_array accepts overflowing 10-byte varints, because it returns at the first terminating byte without applying the overflow guard that read_varint uses (count != 9 || byte < 2).
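To make the guard quoted above concrete, here is a minimal sketch of a skip routine that applies it. The name `skip_varint_checked` is hypothetical and this is not the PR's code; it only illustrates why the tenth byte of a `u64` varint must be 0x00 or 0x01.

```rust
/// Hypothetical checked skip: returns the encoded length of a varint that
/// fits in a u64, or None if the input is truncated or overflows.
fn skip_varint_checked(buf: &[u8]) -> Option<usize> {
    for (idx, &byte) in buf.iter().take(10).enumerate() {
        if byte < 0x80 {
            // Bytes 0..=8 carry 9 * 7 = 63 bits, so the tenth byte (idx == 9)
            // may contribute only the single remaining bit.
            return if idx != 9 || byte < 2 {
                Some(idx + 1)
            } else {
                None
            };
        }
    }
    // No terminating byte within the first 10 bytes (or the buffer ended).
    None
}
```

With this guard, nine 0xff bytes followed by 0x01 skip as 10 bytes, while the same prefix followed by 0x02 is rejected.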
I added this regression test to arrow-avro/src/reader/mod.rs to verify:
fn corrupt_first_block_payload_byte(
    mut bytes: Vec<u8>,
    field_offset: usize,
    expected_original: u8,
    replacement: u8,
) -> Vec<u8> {
    let mut header_decoder = HeaderDecoder::default();
    let header_len = header_decoder.decode(&bytes).expect("decode header");
    assert!(header_decoder.flush().is_some(), "decode complete header");
    let mut cursor = &bytes[header_len..];
    let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
    cursor = &cursor[count_len..];
    let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
    let data_start = header_len + count_len + size_len;
    let target = data_start + field_offset;
    assert!(
        target < bytes.len(),
        "target byte offset {target} out of bounds for input length {}",
        bytes.len()
    );
    assert_eq!(
        bytes[target], expected_original,
        "unexpected original byte at payload offset {field_offset}"
    );
    bytes[target] = replacement;
    bytes
}

#[test]
fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
    // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is encoded as
    // 10-byte VLQ ending in 0x01. Flipping that terminator to 0x02 creates an overflow
    // varint that must fail.
    let writer_schema = Schema::new(vec![
        Field::new("bad_long", DataType::Int64, false),
        Field::new("keep", DataType::Int32, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(writer_schema.clone()),
        vec![
            Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
            Arc::new(Int32Array::from(vec![7])) as ArrayRef,
        ],
    )
    .expect("build writer batch");
    let bytes = write_ocf(&writer_schema, &[batch]);
    let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);
    let err = ReaderBuilder::new()
        .build(Cursor::new(mutated.clone()))
        .expect("build full reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("full decode should reject malformed varint");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("bad varint"));
    let err = ReaderBuilder::new()
        .with_projection(vec![1])
        .build(Cursor::new(mutated))
        .expect("build projected reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("projection must also reject malformed skipped varint");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("bad varint"));
}

 Self::Int32 => {
-    buf.get_int()?;
+    buf.skip_vlq()?;
     Ok(())
 }
Switching skipped Int32 values from get_int() to skip_vlq() drops the i32 range validation (the u64 -> u32 overflow check). That means projected int-encoded fields can silently accept out-of-range varints that full decode still rejects with "varint overflow". I think it may be worth preserving the Int32 overflow validation in the skip path so that projection behavior matches full decode. However, I'm 100% open to a better idea or any feedback in case I'm missing something.
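One way the range check could be kept without computing the value is sketched below. It is an illustration only: the name `skip_int32_checked` is hypothetical, and the sketch assumes that "matching full decode" means accepting exactly those varints whose decoded value fits in a u32, including over-long encodings padded with zero payload bytes.

```rust
/// Hypothetical skip for int-encoded fields: returns the encoded length if
/// the varint's value fits in a u32, without computing the value itself.
fn skip_int32_checked(buf: &[u8]) -> Option<usize> {
    for (idx, &byte) in buf.iter().take(10).enumerate() {
        let payload = byte & 0x7f;
        // Bytes 0..=3 carry bits 0..=27. Byte 4 may add only bits 28..=31,
        // and any payload bit in a later byte would land above bit 31.
        let fits = match idx {
            0..=3 => true,
            4 => payload < 0x10,
            _ => payload == 0,
        };
        if !fits {
            return None;
        }
        if byte < 0x80 {
            return Some(idx + 1);
        }
    }
    // Truncated input or no terminating byte within 10 bytes.
    None
}
```

Under these assumptions, `ff ff ff ff 0f` skips as 5 bytes, while `ff ff ff ff 10` is rejected because its fifth byte sets bit 32.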
I wrote a regression test for this in arrow-avro/src/reader/mod.rs as well:
fn corrupt_first_block_payload_byte(
    mut bytes: Vec<u8>,
    field_offset: usize,
    expected_original: u8,
    replacement: u8,
) -> Vec<u8> {
    let mut header_decoder = HeaderDecoder::default();
    let header_len = header_decoder.decode(&bytes).expect("decode header");
    assert!(header_decoder.flush().is_some(), "decode complete header");
    let mut cursor = &bytes[header_len..];
    let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
    cursor = &cursor[count_len..];
    let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
    let data_start = header_len + count_len + size_len;
    let target = data_start + field_offset;
    assert!(
        target < bytes.len(),
        "target byte offset {target} out of bounds for input length {}",
        bytes.len()
    );
    assert_eq!(
        bytes[target], expected_original,
        "unexpected original byte at payload offset {field_offset}"
    );
    bytes[target] = replacement;
    bytes
}

#[test]
fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
    // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field encodes to
    // ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically valid varint, but now
    // its value exceeds u32::MAX and must fail Int32 validation even when projected out.
    let writer_schema = Schema::new(vec![
        Field::new("bad_int", DataType::Int32, false),
        Field::new("keep", DataType::Int64, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(writer_schema.clone()),
        vec![
            Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
            Arc::new(Int64Array::from(vec![11])) as ArrayRef,
        ],
    )
    .expect("build writer batch");
    let bytes = write_ocf(&writer_schema, &[batch]);
    let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);
    let err = ReaderBuilder::new()
        .build(Cursor::new(mutated.clone()))
        .expect("build full reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("full decode should reject int overflow");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("varint overflow"));
    let err = ReaderBuilder::new()
        .with_projection(vec![1])
        .build(Cursor::new(mutated))
        .expect("build projected reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("projection must also reject skipped int overflow");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("varint overflow"));
}
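The byte patterns that the two regression tests rely on can be reproduced with a short standalone sketch of Avro's zigzag plus variable-length encoding. The helper names (`zigzag`, `encode_varint`, `encode_long`) are hypothetical and are not part of arrow-avro.

```rust
/// Zigzag-map a signed value so that small magnitudes get small codes.
fn zigzag(value: i64) -> u64 {
    ((value << 1) ^ (value >> 63)) as u64
}

/// Emit 7 payload bits per byte, least significant group first, setting the
/// high bit on every byte except the last.
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    while value >= 0x80 {
        out.push(((value & 0x7f) as u8) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
    out
}

/// Avro encodes both int and long as zigzag followed by a varint.
fn encode_long(value: i64) -> Vec<u8> {
    encode_varint(zigzag(value))
}
```

Here `encode_long(i64::MIN)` yields nine 0xff bytes followed by 0x01, which is why the first test corrupts payload offset 9, and `encode_long(i32::MIN as i64)` yields `ff ff ff ff 0f`, which is why the second test corrupts payload offset 4.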
Rationale for this change
The Skipper implementation, used to skip over unneeded fields when projecting an Avro record type to a reader schema, delegates to the read_vlq cursor method for variable-length integer types. Besides checking the validity of the encoding, the decoding method performs computations to obtain the value, which is discarded at the skipper call site.
What changes are included in this PR?
Provide a dedicated code path to skip over an encoded variable-length integer, and use it to implement Skipper for the types that use this encoding.
Are these changes tested?
A benchmark is added to evaluate the performance improvement.
It shows about 7% improvement in my testing on 11th Gen Intel Core i5-1135G7.
Are there any user-facing changes?
No