4 changes: 4 additions & 0 deletions arrow-avro/Cargo.toml
@@ -112,3 +112,7 @@ harness = false
[[bench]]
name = "encoder"
harness = false

[[bench]]
name = "project_record"
harness = false
239 changes: 239 additions & 0 deletions arrow-avro/benches/project_record.rs
@@ -0,0 +1,239 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use apache_avro::types::Value;
use apache_avro::{Schema as ApacheSchema, to_avro_datum};
Comment on lines +18 to +19
Contributor

@mzabaluev Is it possible to remove apache_avro? I had planned to remove it from the crate, but haven't gotten around to it yet. Refer to: #8079 (comment)
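
One possible direction (a minimal sketch, not part of this PR; the helper names write_zigzag_varint and encode_int_row are hypothetical): since the bench rows hold only int/long/float/double fields, the datums could be hand-encoded with zig-zag varints, dropping apache_avro entirely. Floats and doubles would just be raw little-endian IEEE 754 bytes.

    // Avro ints and longs are zig-zag encoded, then written as a varint.
    fn write_zigzag_varint(out: &mut Vec<u8>, v: i64) {
        let mut z = ((v << 1) ^ (v >> 63)) as u64; // zig-zag encode
        loop {
            let byte = (z & 0x7f) as u8;
            z >>= 7;
            if z == 0 {
                out.push(byte);
                break;
            }
            out.push(byte | 0x80); // set the continuation bit
        }
    }

    // An Avro record body is just its field encodings concatenated.
    fn encode_int_row(out: &mut Vec<u8>, prefix: &[u8], id: i32, field1: i32) {
        out.extend_from_slice(prefix); // single-object magic + fingerprint
        write_zigzag_varint(out, id as i64); // field "id"
        write_zigzag_varint(out, field1 as i64); // field "field1"
    }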

use arrow_avro::reader::{Decoder, ReaderBuilder};
use arrow_avro::schema::{
AvroSchema, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, SchemaStore,
};
use criterion::{BatchSize, BenchmarkId, Criterion, Throughput};
use criterion::{criterion_group, criterion_main};
use rand::Rng;
use rand::rngs::ThreadRng;
use std::hint::black_box;

const BATCH_SIZE: usize = 8192;

const NUM_ROWS: usize = 10_000;

fn make_prefix(fp: Fingerprint) -> Vec<u8> {
match fp {
Fingerprint::Rabin(val) => {
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of::<u64>());
buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
buf.extend_from_slice(&val.to_le_bytes()); // little-endian
buf
}
other => panic!("Unexpected fingerprint {other:?}"),
}
}

fn encode_records_with_prefix(
schema: &ApacheSchema,
prefix: &[u8],
rows: impl Iterator<Item = Value>,
) -> Vec<u8> {
let mut out = Vec::new();
for v in rows {
out.extend_from_slice(prefix);
out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
}
out
}

fn gen_avro_data_with<F>(schema_json: &str, num_rows: usize, gen_fn: F) -> Vec<u8>
where
F: FnOnce(ThreadRng, &ApacheSchema, usize, &[u8]) -> Vec<u8>,
{
let schema = ApacheSchema::parse_str(schema_json).expect("invalid schema for generator");
let arrow_schema = AvroSchema::new(schema_json.to_owned());
let fingerprint = arrow_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.expect("fingerprint failed");
let prefix = make_prefix(fingerprint);
gen_fn(rand::rng(), &schema, num_rows, &prefix)
}

fn gen_int(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![
("id".into(), Value::Int(i as i32)),
("field1".into(), Value::Int(rng.random())),
])
}),
)
}

fn gen_long(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![
("id".into(), Value::Int(i as i32)),
("field1".into(), Value::Long(rng.random())),
])
}),
)
}

fn gen_float(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![
("id".into(), Value::Int(i as i32)),
("field1".into(), Value::Float(rng.random())),
])
}),
)
}

fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![
("id".into(), Value::Int(i as i32)),
("field1".into(), Value::Double(rng.random())),
])
}),
)
}

const READER_SCHEMA: &str = r#"
{
"type":"record",
"name":"table",
"fields": [
{ "name": "id", "type": "int" }
]
}
"#;

const INT_SCHEMA: &str = r#"
{
"type":"record",
"name":"table",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "field1", "type": "int" }
]
}
"#;

const LONG_SCHEMA: &str = r#"
{
"type":"record",
"name":"table",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "field1", "type": "long" }
]
}
"#;

const FLOAT_SCHEMA: &str = r#"
{
"type":"record",
"name":"table",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "field1", "type": "float" }
]
}
"#;

const DOUBLE_SCHEMA: &str = r#"
{
"type":"record",
"name":"table",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "field1", "type": "double" }
]
}
"#;

fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
let schema = AvroSchema::new(schema_json.to_owned());
let mut store = SchemaStore::new();
store.register(schema).unwrap();
let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
ReaderBuilder::new()
.with_writer_schema_store(store)
.with_batch_size(batch_size)
.with_reader_schema(reader_schema)
.build_decoder()
.expect("failed to build decoder")
}

fn bench_with_decoder<F>(
c: &mut Criterion,
name: &str,
data: &[u8],
num_rows: usize,
mut new_decoder: F,
) where
F: FnMut() -> Decoder,
{
let mut group = c.benchmark_group(name);
group.throughput(Throughput::Bytes(data.len() as u64));
group.bench_function(BenchmarkId::from_parameter(num_rows), |b| {
b.iter_batched_ref(
&mut new_decoder,
|decoder| {
black_box(decoder.decode(data).unwrap());
black_box(decoder.flush().unwrap().unwrap());
},
BatchSize::SmallInput,
)
});
group.finish();
}

fn criterion_benches(c: &mut Criterion) {
let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
new_decoder(INT_SCHEMA, BATCH_SIZE)
});
let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
new_decoder(LONG_SCHEMA, BATCH_SIZE)
});
let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
});
let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
});
}

criterion_group! {
name = avro_project_record;
config = Criterion::default().configure_from_args();
targets = criterion_benches
}
criterion_main!(avro_project_record);
13 changes: 10 additions & 3 deletions arrow-avro/src/reader/cursor.rs
@@ -16,7 +16,7 @@
// under the License.

use crate::errors::AvroError;
use crate::reader::vlq::read_varint;
use crate::reader::vlq;

/// A wrapper around a byte slice, providing low-level decoding for Avro
///
@@ -59,12 +59,19 @@ impl<'a> AvroCursor<'a> {
}

pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
let (val, offset) =
read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
Contributor

Can you revert this? I.e., keep using read_varint without the vlq:: module path

Contributor Author

You mean, import both read_varint and skip_varint at the top and use them without the module path?

let (val, offset) = vlq::read_varint(self.buf)
.ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
self.buf = &self.buf[offset..];
Ok(val)
}

pub(crate) fn skip_vlq(&mut self) -> Result<(), AvroError> {
let offset = vlq::skip_varint(self.buf)
.ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
self.buf = &self.buf[offset..];
Ok(())
}

#[inline]
pub(crate) fn get_int(&mut self) -> Result<i32, AvroError> {
let varint = self.read_vlq()?;
6 changes: 3 additions & 3 deletions arrow-avro/src/reader/record.rs
@@ -2054,15 +2054,15 @@ impl Skipper {
Ok(())
}
Self::Int32 => {
buf.get_int()?;
buf.skip_vlq()?;
Ok(())
}
Comment on lines 2056 to 2059
Contributor

Switching skipped Int32 values from get_int() to skip_vlq() drops the i32 range validation (the u64 -> u32 overflow check). That means projected-out int-encoded fields can silently accept out-of-range varints that a full decode still rejects with a varint overflow error. I think it's worth preserving the Int32 overflow validation in the skip path so projection behavior matches full decode. That said, I'm 100% open to a better idea or any feedback in case I'm missing something.

I added a regression test for this in arrow-avro/src/reader/mod.rs as well:

    fn corrupt_first_block_payload_byte(
        mut bytes: Vec<u8>,
        field_offset: usize,
        expected_original: u8,
        replacement: u8,
    ) -> Vec<u8> {
        let mut header_decoder = HeaderDecoder::default();
        let header_len = header_decoder.decode(&bytes).expect("decode header");
        assert!(header_decoder.flush().is_some(), "decode complete header");

        let mut cursor = &bytes[header_len..];
        let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
        cursor = &cursor[count_len..];
        let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
        let data_start = header_len + count_len + size_len;
        let target = data_start + field_offset;

        assert!(
            target < bytes.len(),
            "target byte offset {target} out of bounds for input length {}",
            bytes.len()
        );
        assert_eq!(
            bytes[target], expected_original,
            "unexpected original byte at payload offset {field_offset}"
        );
        bytes[target] = replacement;
        bytes
    }

    #[test]
    fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
        // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field encodes to
        // ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically valid varint, but now
        // its value exceeds u32::MAX and must fail Int32 validation even when projected out.
        let writer_schema = Schema::new(vec![
            Field::new("bad_int", DataType::Int32, false),
            Field::new("keep", DataType::Int64, false),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(writer_schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
                Arc::new(Int64Array::from(vec![11])) as ArrayRef,
            ],
        )
        .expect("build writer batch");
        let bytes = write_ocf(&writer_schema, &[batch]);
        let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);

        let err = ReaderBuilder::new()
            .build(Cursor::new(mutated.clone()))
            .expect("build full reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("full decode should reject int overflow");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("varint overflow"));

        let err = ReaderBuilder::new()
            .with_projection(vec![1])
            .build(Cursor::new(mutated))
            .expect("build projected reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("projection must also reject skipped int overflow");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("varint overflow"));
    }
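
For illustration, a minimal sketch of what a validated skip could look like (a hypothetical skip_int method on AvroCursor that keeps get_int()'s u64 -> u32 range check; not part of this diff):

    pub(crate) fn skip_int(&mut self) -> Result<(), AvroError> {
        let varint = self.read_vlq()?;
        // Apply the same u64 -> u32 guard as get_int(), without zig-zag
        // decoding the value, so projected-out ints still reject overflow.
        u32::try_from(varint)
            .map_err(|_| AvroError::ParseError("varint overflow".to_string()))?;
        Ok(())
    }

The Skipper::Int32 arm could then call buf.skip_int()? instead of buf.skip_vlq()?.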

Self::Int64
| Self::TimeMicros
| Self::TimestampMillis
| Self::TimestampMicros
| Self::TimestampNanos => {
buf.get_long()?;
buf.skip_vlq()?;
Ok(())
}
Self::Float32 => {
@@ -2090,7 +2090,7 @@ impl Skipper {
Ok(())
}
Self::Enum => {
buf.get_int()?;
buf.skip_vlq()?;
Ok(())
}
Self::DurationFixed12 => {
28 changes: 28 additions & 0 deletions arrow-avro/src/reader/vlq.rs
@@ -97,6 +97,34 @@ fn read_varint_slow(buf: &[u8]) -> Option<(u64, usize)> {
None
}

pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
if let Some(array) = buf.get(..10) {
return skip_varint_array(array.try_into().unwrap());
Contributor

What if get returns fewer than 10 elements?
buf.get(..10) can return fewer than that if the buf ends unexpectedly

Contributor Author

No, then .get(..10) returns None and the rest of the buffer is handled in the slow branch below.

Contributor

My bad, I misread the docs.
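
A small test sketch illustrating the resolved behavior (hypothetical, could sit in vlq.rs's tests module):

    #[test]
    fn short_buffers_fall_back_to_slow_path() {
        // slice::get(..10) yields None when fewer than 10 bytes remain,
        // so the fast path is bypassed and skip_varint_slow takes over.
        let short: &[u8] = &[0x80, 0x01]; // a complete 2-byte varint
        assert!(short.get(..10).is_none());
        assert_eq!(skip_varint(short), Some(2));
    }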

}
skip_varint_slow(buf)
}

fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
// Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
#[allow(clippy::needless_range_loop)]
for idx in 0..10 {
if buf[idx] < 0x80 {
return Some(idx + 1);
}
}
None
}
Comment on lines +107 to +116
Contributor

I think this introduces a regression: skip_varint_array accepts overflowing 10-byte varints, because it returns at the first terminating byte without the overflow guard read_varint uses (count != 9 || byte < 2).

I added this regression test to arrow-avro/src/reader/mod.rs to verify (it reuses the corrupt_first_block_payload_byte helper from the earlier comment):

    #[test]
    fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
        // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is encoded as
        // 10-byte VLQ ending in 0x01. Flipping that terminator to 0x02 creates an overflow
        // varint that must fail.
        let writer_schema = Schema::new(vec![
            Field::new("bad_long", DataType::Int64, false),
            Field::new("keep", DataType::Int32, false),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(writer_schema.clone()),
            vec![
                Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
                Arc::new(Int32Array::from(vec![7])) as ArrayRef,
            ],
        )
        .expect("build writer batch");
        let bytes = write_ocf(&writer_schema, &[batch]);
        let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);

        let err = ReaderBuilder::new()
            .build(Cursor::new(mutated.clone()))
            .expect("build full reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("full decode should reject malformed varint");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("bad varint"));

        let err = ReaderBuilder::new()
            .with_projection(vec![1])
            .build(Cursor::new(mutated))
            .expect("build projected reader")
            .collect::<Result<Vec<_>, _>>()
            .expect_err("projection must also reject malformed skipped varint");
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("bad varint"));
    }
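
For reference, a minimal sketch of carrying that guard into the fast skip path (mirroring read_varint's check; not part of this diff):

    fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
        #[allow(clippy::needless_range_loop)]
        for idx in 0..10 {
            if buf[idx] < 0x80 {
                // The 10th byte of a 64-bit varint can only hold the final
                // bit, so reject terminators >= 2 in the last position,
                // matching read_varint's `count != 9 || byte < 2` guard.
                if idx == 9 && buf[idx] >= 2 {
                    return None;
                }
                return Some(idx + 1);
            }
        }
        None
    }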


#[cold]
fn skip_varint_slow(buf: &[u8]) -> Option<usize> {
for (idx, &byte) in buf.iter().take(10).enumerate() {
if byte < 0x80 {
return Some(idx + 1);
}
}
None
}

#[cfg(test)]
mod tests {
use super::*;