perf: optimize skipper for varint values used when projecting Avro record types#9397

Open
mzabaluev wants to merge 5 commits into apache:main from mzabaluev:faster-skippers

@mzabaluev mzabaluev commented Feb 11, 2026

Rationale for this change

The Skipper implementation, used to skip over unneeded fields when projecting an Avro record type to a reader schema, delegates to the read_vlq cursor method for variable-length integer types. Besides checking the validity of the encoding, the decoding method performs computations to obtain the value, which is discarded at the skipper call site.

What changes are included in this PR?

Provide a dedicated code path to skip over an encoded variable-length integer, and use it to implement Skipper for the types that use this encoding.
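As a rough illustration of the difference described above, here is a minimal sketch contrasting decoding with skipping. The names `read_varint_sketch` and `skip_varint_sketch` are hypothetical stand-ins and are not the crate's actual functions; the skip variant shown here does no overflow validation at all.

```rust
/// Decodes a little-endian base-128 varint and returns (value, bytes consumed).
/// Hypothetical stand-in for a decoding routine; not the crate's implementation.
fn read_varint_sketch(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (idx, &byte) in buf.iter().take(10).enumerate() {
        // The tenth byte may only contribute the one remaining bit of a u64.
        if idx == 9 && byte > 1 {
            return None;
        }
        // This shift-and-or work is what a skipper does not need.
        value |= u64::from(byte & 0x7f) << (7 * idx);
        if byte < 0x80 {
            return Some((value, idx + 1));
        }
    }
    // Ran out of input (or exceeded ten bytes) before a terminating byte.
    None
}

/// Finds the encoded length only: scans for the first byte without the
/// continuation bit. Assumption: no overflow validation is performed here.
fn skip_varint_sketch(buf: &[u8]) -> Option<usize> {
    for (idx, &byte) in buf.iter().take(10).enumerate() {
        if byte < 0x80 {
            return Some(idx + 1);
        }
    }
    None
}
```

For the two-byte encoding `[0xac, 0x02]`, the decoder returns the value 300 together with a length of 2, while the skipper returns only the length.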

Are these changes tested?

A benchmark is added to evaluate the performance improvement.
It shows about a 7% improvement in my testing on an 11th Gen Intel Core i5-1135G7.

Are there any user-facing changes?

No

@github-actions github-actions bot added the arrow and arrow-avro labels Feb 11, 2026
Handling the first byte specially does not demonstrate perf benefits.

    pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
        if let Some(array) = buf.get(..10) {
            return skip_varint_array(array.try_into().unwrap());
Contributor

What if get returns fewer than 10 elements?
buf.get(..10) can return fewer than that if the buf ends unexpectedly.

Contributor Author

No, then .get(..10) returns None and the rest of the buffer is handled in the slow branch below.

Contributor

My bad, misread the docs^
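For reference, the all-or-nothing behavior of slice `get` with a range can be checked with a small standalone sketch. The helper name `first_ten` is hypothetical and only illustrates the fast-path precondition; it is not code from this PR.

```rust
use std::convert::TryInto;

/// Returns the first ten bytes as a fixed-size array, or `None` when the
/// buffer holds fewer than ten bytes. Hypothetical helper for illustration.
fn first_ten(buf: &[u8]) -> Option<[u8; 10]> {
    // `get(..10)` never yields a shorter slice: it is `None` unless all
    // ten bytes are available.
    let head = buf.get(..10)?;
    // The slice is exactly ten bytes long here, so the conversion cannot fail.
    Some(head.try_into().expect("slice has length 10"))
}
```

A nine-byte buffer yields `None`, so a caller would fall through to a slower byte-by-byte path, as described in the reply above.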


    pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
        let (val, offset) =
            read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
Contributor

Can you revert this? I.e., keep this as vlq::read_varint.

Contributor Author

You mean, import both read_varint and skip_varint at the top and use them without the module path?

Contributor

@EmilyMatt EmilyMatt left a comment

LGTM! It's a nice and simple save. I wonder if we can get this bench to run in CI.

Contributor

@jecsand838 jecsand838 left a comment

@mzabaluev LGTM! Really love this performance enhancement. I have a few comments I'd like your thoughts on, but overall I love the direction.

Comment on lines +18 to +19
use apache_avro::types::Value;
use apache_avro::{Schema as ApacheSchema, to_avro_datum};
Contributor

@mzabaluev Is it possible to remove apache_avro? I had planned to remove it from the crate, but haven't gotten around to it yet. Refer to: #8079 (comment)

Comment on lines +107 to +116
    fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
        // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
        #[allow(clippy::needless_range_loop)]
        for idx in 0..10 {
            if buf[idx] < 0x80 {
                return Some(idx + 1);
            }
        }
        None
    }
Contributor

I think this introduces a regression due to skip_varint_array accepting overflowing 10-byte varints because it returns the first terminating byte without the overflow guard used by read_varint (count != 9 || byte < 2).

I added this regression test to arrow-avro/src/reader/mod.rs to verify:

    fn corrupt_first_block_payload_byte(
        mut bytes: Vec<u8>,
        field_offset: usize,
        expected_original: u8,
        replacement: u8,
    ) -> Vec<u8> {
        let mut header_decoder = HeaderDecoder::default();
        let header_len = header_decoder.decode(&bytes).expect("decode header");
        assert!(header_decoder.flush().is_some(), "decode complete header");

        let mut cursor = &bytes[header_len..];
        let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
        cursor = &cursor[count_len..];
        let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
        let data_start = header_len + count_len + size_len;
        let target = data_start + field_offset;

        assert!(
            target < bytes.len(),
            "target byte offset {target} out of bounds for input length {}",
            bytes.len()
        );
        assert_eq!(
            bytes[target], expected_original,
            "unexpected original byte at payload offset {field_offset}"
        );
        bytes[target] = replacement;
        bytes
    }

    #[test]
    fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
        // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is encoded as
        // 10-byte VLQ ending in 0x01. Flipping that terminator to 0x02 creates an overflow
        // varint that must fail.
        let writer_schema = Schema::new(vec![
            Field::new("bad_long", DataType::Int64, false),
            Field::new("keep", DataType::Int32, false),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(writer_schema.clone()),
            vec![
                Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
                Arc::new(Int32Array::from(vec![7])) as ArrayRef,
            ],
        )
        .expect("build writer batch");
        let bytes = write_ocf(&writer_schema, &[batch]);
        let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);

        let err = ReaderBuilder::new()
            .build(Cursor::new(mutated.clone()))
            .expect("build full reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("full decode should reject malformed varint");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("bad varint"));

        let err = ReaderBuilder::new()
            .with_projection(vec![1])
            .build(Cursor::new(mutated))
            .expect("build projected reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("projection must also reject malformed skipped varint");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("bad varint"));
    }
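One possible shape for the fix raised in the comment above is sketched here, assuming the only overflow condition for a 64-bit varint is a tenth byte that carries more than the single remaining bit. The name `skip_varint_array_checked` is hypothetical, and this is not necessarily how the PR resolves the issue.

```rust
/// Scans a ten-byte window for the terminating byte of a varint and rejects
/// encodings that would overflow a u64. Hypothetical sketch, not the PR's code.
fn skip_varint_array_checked(buf: [u8; 10]) -> Option<usize> {
    #[allow(clippy::needless_range_loop)]
    for idx in 0..10 {
        let byte = buf[idx];
        if byte < 0x80 {
            // Bytes 0..=8 cover bits 0..=62, so only the tenth byte (idx 9)
            // can overflow: it may hold bit 63 and nothing more.
            return if idx != 9 || byte < 2 {
                Some(idx + 1)
            } else {
                None
            };
        }
    }
    // All ten bytes have the continuation bit set.
    None
}
```

With nine `0xff` bytes followed by `0x01` (the encoding used for `i64::MIN` in the test above), the sketch reports a length of 10. Changing the last byte to `0x02` makes it return `None`. The extra comparison runs only on the terminating byte, so its cost should be small, though that would need to be confirmed with the benchmark.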

Comment on lines 2056 to 2059
      Self::Int32 => {
    -     buf.get_int()?;
    +     buf.skip_vlq()?;
          Ok(())
      }
Contributor

Switching skipped Int32 values from get_int() to skip_vlq() drops i32 range validation (the u64 -> u32 overflow check). That means projected int-encoded fields can silently accept out-of-range varints that full decode still rejects with varint overflow. I think it may be worth preserving the Int32 overflow validation in the skip path so projection behavior matches full decode. However, I'm 100% open to a better idea or any feedback in case I'm missing something.

I added a regression test to arrow-avro/src/reader/mod.rs for this as well:

    fn corrupt_first_block_payload_byte(
        mut bytes: Vec<u8>,
        field_offset: usize,
        expected_original: u8,
        replacement: u8,
    ) -> Vec<u8> {
        let mut header_decoder = HeaderDecoder::default();
        let header_len = header_decoder.decode(&bytes).expect("decode header");
        assert!(header_decoder.flush().is_some(), "decode complete header");

        let mut cursor = &bytes[header_len..];
        let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
        cursor = &cursor[count_len..];
        let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
        let data_start = header_len + count_len + size_len;
        let target = data_start + field_offset;

        assert!(
            target < bytes.len(),
            "target byte offset {target} out of bounds for input length {}",
            bytes.len()
        );
        assert_eq!(
            bytes[target], expected_original,
            "unexpected original byte at payload offset {field_offset}"
        );
        bytes[target] = replacement;
        bytes
    }

    #[test]
    fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
        // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field encodes to
        // ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically valid varint, but now
        // its value exceeds u32::MAX and must fail Int32 validation even when projected out.
        let writer_schema = Schema::new(vec![
            Field::new("bad_int", DataType::Int32, false),
            Field::new("keep", DataType::Int64, false),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(writer_schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
                Arc::new(Int64Array::from(vec![11])) as ArrayRef,
            ],
        )
        .expect("build writer batch");
        let bytes = write_ocf(&writer_schema, &[batch]);
        let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);

        let err = ReaderBuilder::new()
            .build(Cursor::new(mutated.clone()))
            .expect("build full reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("full decode should reject int overflow");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("varint overflow"));

        let err = ReaderBuilder::new()
            .with_projection(vec![1])
            .build(Cursor::new(mutated))
            .expect("build projected reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("projection must also reject skipped int overflow");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("varint overflow"));
    }
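A range-checking skip for int-encoded fields could look roughly like the sketch below. It assumes the decode path computes the full u64 value and then rejects anything above `u32::MAX`, so the skip path only needs to verify that no payload bit above bit 31 is set. The name `skip_varint_u32` is hypothetical, and the sketch does not distinguish between the "bad varint" and "varint overflow" error kinds.

```rust
/// Returns the encoded length of a varint whose value fits in a u32, or
/// `None` if the encoding is truncated, too long, or out of range.
/// Hypothetical sketch; not part of the PR.
fn skip_varint_u32(buf: &[u8]) -> Option<usize> {
    for (idx, &byte) in buf.iter().take(10).enumerate() {
        let payload = byte & 0x7f;
        // Bytes 0..=3 carry bits 0..=27 and are unrestricted. Byte 4 carries
        // bits 28..=34, of which only the low four fit in a u32. Any later
        // byte may only pad with zero payload bits.
        let max_payload = match idx {
            0..=3 => 0x7f,
            4 => 0x0f,
            _ => 0x00,
        };
        if payload > max_payload {
            return None;
        }
        if byte < 0x80 {
            return Some(idx + 1);
        }
    }
    None
}
```

For the bytes in the test above, `ff ff ff ff 0f` is accepted with a length of 5, while `ff ff ff ff 10` is rejected. The per-byte comparison adds work to the skip path, so its effect on the reported speedup would need to be measured.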
