# perf: optimize skipper for varint values used when projecting Avro record types (#9397)
Base: `main`. Changes from all commits: `bd616ff`, `96d122d`, `af79e60`, `a553c3b`, `26c28d1`.
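For context, Avro encodes `int` and `long` values as zig-zag varints: a value occupies one to ten bytes, and it ends at the first byte whose high (continuation) bit is clear. Skipping such a field during projection therefore only requires locating that terminator byte, not decoding the value — which is the saving this PR targets, since the old skip path decoded each value via `get_int`/`get_long` and threw the result away. A minimal sketch of the idea (standalone code with a hypothetical name, not the crate's API):

```rust
/// Length in bytes of the varint at the start of `buf`, without decoding it.
/// A varint terminates at the first byte whose high bit is clear; a valid
/// u64 varint is at most 10 bytes long.
fn varint_len(buf: &[u8]) -> Option<usize> {
    buf.iter().take(10).position(|&b| b < 0x80).map(|i| i + 1)
}

fn main() {
    // 300 zig-zag encodes to 600, whose varint bytes are [0xd8, 0x04].
    let encoded = [0xd8u8, 0x04, 0x2a]; // a varint followed by unrelated data
    assert_eq!(varint_len(&encoded), Some(2));
}
```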
The first changed file registers the new benchmark (a `Cargo.toml`; the exact path is not shown in the capture):

```diff
@@ -112,3 +112,7 @@ harness = false
 [[bench]]
 name = "encoder"
 harness = false
+
+[[bench]]
+name = "project_record"
+harness = false
```
The second file is entirely new (`@@ -0,0 +1,239 @@`) — the Criterion benchmark itself, presumably `benches/project_record.rs`:

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use apache_avro::types::Value;
use apache_avro::{Schema as ApacheSchema, to_avro_datum};
```
> **Review comment** on lines +18 to +19 (Contributor): @mzabaluev Is it possible to remove …
```rust
use arrow_avro::reader::{Decoder, ReaderBuilder};
use arrow_avro::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, SchemaStore,
};
use criterion::{BatchSize, BenchmarkId, Criterion, Throughput};
use criterion::{criterion_group, criterion_main};
use rand::Rng;
use rand::rngs::ThreadRng;
use std::hint::black_box;

const BATCH_SIZE: usize = 8192;

const NUM_ROWS: usize = 10_000;

fn make_prefix(fp: Fingerprint) -> Vec<u8> {
    match fp {
        Fingerprint::Rabin(val) => {
            let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of::<u64>());
            buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
            buf.extend_from_slice(&val.to_le_bytes()); // little-endian
            buf
        }
        other => panic!("Unexpected fingerprint {other:?}"),
    }
}

fn encode_records_with_prefix(
    schema: &ApacheSchema,
    prefix: &[u8],
    rows: impl Iterator<Item = Value>,
) -> Vec<u8> {
    let mut out = Vec::new();
    for v in rows {
        out.extend_from_slice(prefix);
        out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
    }
    out
}

fn gen_avro_data_with<F>(schema_json: &str, num_rows: usize, gen_fn: F) -> Vec<u8>
where
    F: FnOnce(ThreadRng, &ApacheSchema, usize, &[u8]) -> Vec<u8>,
{
    let schema = ApacheSchema::parse_str(schema_json).expect("invalid schema for generator");
    let arrow_schema = AvroSchema::new(schema_json.to_owned());
    let fingerprint = arrow_schema
        .fingerprint(FingerprintAlgorithm::Rabin)
        .expect("fingerprint failed");
    let prefix = make_prefix(fingerprint);
    gen_fn(rand::rng(), &schema, num_rows, &prefix)
}

fn gen_int(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
    encode_records_with_prefix(
        sc,
        prefix,
        (0..n).map(|i| {
            Value::Record(vec![
                ("id".into(), Value::Int(i as i32)),
                ("field1".into(), Value::Int(rng.random())),
            ])
        }),
    )
}

fn gen_long(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
    encode_records_with_prefix(
        sc,
        prefix,
        (0..n).map(|i| {
            Value::Record(vec![
                ("id".into(), Value::Int(i as i32)),
                ("field1".into(), Value::Long(rng.random())),
            ])
        }),
    )
}

fn gen_float(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
    encode_records_with_prefix(
        sc,
        prefix,
        (0..n).map(|i| {
            Value::Record(vec![
                ("id".into(), Value::Int(i as i32)),
                ("field1".into(), Value::Float(rng.random())),
            ])
        }),
    )
}

fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
    encode_records_with_prefix(
        sc,
        prefix,
        (0..n).map(|i| {
            Value::Record(vec![
                ("id".into(), Value::Int(i as i32)),
                ("field1".into(), Value::Double(rng.random())),
            ])
        }),
    )
}

const READER_SCHEMA: &str = r#"
{
    "type":"record",
    "name":"table",
    "fields": [
        { "name": "id", "type": "int" }
    ]
}
"#;

const INT_SCHEMA: &str = r#"
{
    "type":"record",
    "name":"table",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "field1", "type": "int" }
    ]
}
"#;

const LONG_SCHEMA: &str = r#"
{
    "type":"record",
    "name":"table",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "field1", "type": "long" }
    ]
}
"#;

const FLOAT_SCHEMA: &str = r#"
{
    "type":"record",
    "name":"table",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "field1", "type": "float" }
    ]
}
"#;

const DOUBLE_SCHEMA: &str = r#"
{
    "type":"record",
    "name":"table",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "field1", "type": "double" }
    ]
}
"#;

fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
    let schema = AvroSchema::new(schema_json.to_owned());
    let mut store = SchemaStore::new();
    store.register(schema).unwrap();
    let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
    ReaderBuilder::new()
        .with_writer_schema_store(store)
        .with_batch_size(batch_size)
        .with_reader_schema(reader_schema)
        .build_decoder()
        .expect("failed to build decoder")
}

fn bench_with_decoder<F>(
    c: &mut Criterion,
    name: &str,
    data: &[u8],
    num_rows: usize,
    mut new_decoder: F,
) where
    F: FnMut() -> Decoder,
{
    let mut group = c.benchmark_group(name);
    group.throughput(Throughput::Bytes(data.len() as u64));
    group.bench_function(BenchmarkId::from_parameter(num_rows), |b| {
        b.iter_batched_ref(
            &mut new_decoder,
            |decoder| {
                black_box(decoder.decode(data).unwrap());
                black_box(decoder.flush().unwrap().unwrap());
            },
            BatchSize::SmallInput,
        )
    });
    group.finish();
}

fn criterion_benches(c: &mut Criterion) {
    let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
    bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
        new_decoder(INT_SCHEMA, BATCH_SIZE)
    });
    let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
    bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
        new_decoder(LONG_SCHEMA, BATCH_SIZE)
    });
    let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
    bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
        new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
    });
    let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
    bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
        new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
    });
}

criterion_group! {
    name = avro_project_record;
    config = Criterion::default().configure_from_args();
    targets = criterion_benches
}
criterion_main!(avro_project_record);
```
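With the `[[bench]]` entry registered above, this benchmark should be runnable via `cargo bench --bench project_record` from the crate root.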
The third file updates the byte cursor (the diff context shows `impl<'a> AvroCursor<'a>`):

```diff
@@ -16,7 +16,7 @@
 // under the License.

 use crate::errors::AvroError;
-use crate::reader::vlq::read_varint;
+use crate::reader::vlq;

 /// A wrapper around a byte slice, providing low-level decoding for Avro
 ///
@@ -59,12 +59,19 @@ impl<'a> AvroCursor<'a> {
     }

     pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
-        let (val, offset) =
-            read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
```
> **Reviewer** (Contributor): Can you revert this? I.e., keep this `vlq::read_varint`
>
> **Author** (Contributor): You mean, import both …
The hunk continues with the re-pathed import usage and the new `skip_vlq`:

```diff
+        let (val, offset) = vlq::read_varint(self.buf)
+            .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
         self.buf = &self.buf[offset..];
         Ok(val)
     }

+    pub(crate) fn skip_vlq(&mut self) -> Result<(), AvroError> {
+        let offset = vlq::skip_varint(self.buf)
+            .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+        self.buf = &self.buf[offset..];
+        Ok(())
+    }

     #[inline]
     pub(crate) fn get_int(&mut self) -> Result<i32, AvroError> {
         let varint = self.read_vlq()?;
```
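For contrast, the read path must also zig-zag-decode the raw varint and range-check it — work that `skip_vlq` avoids entirely. A rough sketch of that decode step (standalone code with a hypothetical name, not the crate's actual `get_int`):

```rust
/// Zig-zag decode a raw varint payload into i32, rejecting values that
/// need more than 32 bits (the check a pure skip no longer runs).
fn zigzag_to_i32(raw: u64) -> Option<i32> {
    let decoded = ((raw >> 1) as i64) ^ -((raw & 1) as i64);
    i32::try_from(decoded).ok()
}

fn main() {
    assert_eq!(zigzag_to_i32(1), Some(-1)); // odd raw values map to negatives
    assert_eq!(zigzag_to_i32(2), Some(1));
    assert_eq!(zigzag_to_i32(u32::MAX as u64), Some(i32::MIN));
    assert!(zigzag_to_i32(u32::MAX as u64 + 1).is_none()); // 33-bit value: rejected
}
```

Dropping this validation on the skip path is exactly what the review comments below probe.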
The fourth file switches `Skipper` from decoding to skipping varint fields:

```diff
@@ -2054,15 +2054,15 @@ impl Skipper {
             Ok(())
         }
         Self::Int32 => {
-            buf.get_int()?;
+            buf.skip_vlq()?;
             Ok(())
         }
```
> **Review comment** on lines 2056 to 2059 (Contributor): Switching skipped [`Int32` fields from `get_int()` to `skip_vlq()` drops the `i32` overflow validation: a varint can be well-formed yet too large for 32 bits, and that used to fail even for projected-out fields]. I created a [regression test]:

```rust
fn corrupt_first_block_payload_byte(
    mut bytes: Vec<u8>,
    field_offset: usize,
    expected_original: u8,
    replacement: u8,
) -> Vec<u8> {
    let mut header_decoder = HeaderDecoder::default();
    let header_len = header_decoder.decode(&bytes).expect("decode header");
    assert!(header_decoder.flush().is_some(), "decode complete header");
    let mut cursor = &bytes[header_len..];
    let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
    cursor = &cursor[count_len..];
    let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
    let data_start = header_len + count_len + size_len;
    let target = data_start + field_offset;
    assert!(
        target < bytes.len(),
        "target byte offset {target} out of bounds for input length {}",
        bytes.len()
    );
    assert_eq!(
        bytes[target], expected_original,
        "unexpected original byte at payload offset {field_offset}"
    );
    bytes[target] = replacement;
    bytes
}

#[test]
fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
    // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field encodes to
    // ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically valid varint, but now
    // its value exceeds u32::MAX and must fail Int32 validation even when projected out.
    let writer_schema = Schema::new(vec![
        Field::new("bad_int", DataType::Int32, false),
        Field::new("keep", DataType::Int64, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(writer_schema.clone()),
        vec![
            Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
            Arc::new(Int64Array::from(vec![11])) as ArrayRef,
        ],
    )
    .expect("build writer batch");
    let bytes = write_ocf(&writer_schema, &[batch]);
    let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);
    let err = ReaderBuilder::new()
        .build(Cursor::new(mutated.clone()))
        .expect("build full reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("full decode should reject int overflow");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("varint overflow"));
    let err = ReaderBuilder::new()
        .with_projection(vec![1])
        .build(Cursor::new(mutated))
        .expect("build projected reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("projection must also reject skipped int overflow");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("varint overflow"));
}
```
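To see why the mutation works, it helps to spell out the arithmetic: zig-zag maps `i32::MIN` to `u32::MAX`, whose varint encoding is `ff ff ff ff 0f`, and raising the terminator's payload from `0x0f` to `0x10` pushes the decoded value past 32 bits. A self-contained check of that claim (not part of the PR):

```rust
fn main() {
    // Reassemble the value from the five 7-bit groups, little-endian first.
    let raw: u64 = (0x0f << 28) | (0x7f << 21) | (0x7f << 14) | (0x7f << 7) | 0x7f;
    assert_eq!(raw, u32::MAX as u64); // ff ff ff ff 0f == zig-zag(i32::MIN)
    let flipped: u64 = (0x10 << 28) | (0x7f << 21) | (0x7f << 14) | (0x7f << 7) | 0x7f;
    assert!(flipped > u32::MAX as u64); // no longer fits in 32 bits -> must error
}
```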
The remaining `Skipper` hunks apply the same change to the other varint-encoded arms:

```diff
         Self::Int64
         | Self::TimeMicros
         | Self::TimestampMillis
         | Self::TimestampMicros
         | Self::TimestampNanos => {
-            buf.get_long()?;
+            buf.skip_vlq()?;
             Ok(())
         }
         Self::Float32 => {
@@ -2090,7 +2090,7 @@ impl Skipper {
             Ok(())
         }
         Self::Enum => {
-            buf.get_int()?;
+            buf.skip_vlq()?;
             Ok(())
         }
         Self::DurationFixed12 => {
```
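Only the varint-encoded arms (`Int32`, `Int64` and its time/timestamp aliases, `Enum`) switch to `skip_vlq`; fixed-width arms such as `Self::Float32` and `Self::DurationFixed12`, visible as unchanged context above, presumably already advance the cursor by a constant byte count and have nothing to gain from this change.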
The fifth file adds the skip helpers to the VLQ module:

```diff
@@ -97,6 +97,34 @@ fn read_varint_slow(buf: &[u8]) -> Option<(u64, usize)> {
     None
 }

+pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
+    if let Some(array) = buf.get(..10) {
+        return skip_varint_array(array.try_into().unwrap());
```
> **Reviewer** (Contributor): What if `get` returns less than 10 elements?
>
> **Author** (Contributor): No, then [`buf.get(..10)` returns `None` when the buffer is shorter than 10 bytes, and execution falls through to `skip_varint_slow`].
>
> **Reviewer** (Contributor): My bad, misread the docs^
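A quick demonstration of the slice semantics the author is relying on (plain standard-library behavior):

```rust
fn main() {
    let short = [0u8; 4];
    // Fewer than 10 bytes: get(..10) is None, so skip_varint takes the slow path.
    assert!(short.get(..10).is_none());
    let long = [0u8; 12];
    // At least 10 bytes: a 10-byte prefix is returned for the fast array path.
    assert_eq!(long.get(..10).map(<[u8]>::len), Some(10));
}
```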
The hunk continues:

```diff
+    }
+    skip_varint_slow(buf)
+}
+
+fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
+    // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
+    #[allow(clippy::needless_range_loop)]
+    for idx in 0..10 {
+        if buf[idx] < 0x80 {
+            return Some(idx + 1);
+        }
+    }
+    None
+}
```
> **Review comment** on lines +107 to +116 (Contributor): I think this introduces a regression due to [`skip_varint` accepting any terminating byte, where `read_varint` rejects a 10-byte varint whose final byte overflows 64 bits]. I added this regression test to [the reader tests], reusing the same `corrupt_first_block_payload_byte` helper shown above:

```rust
#[test]
fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
    // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is encoded as a
    // 10-byte VLQ ending in 0x01. Flipping that terminator to 0x02 creates an overflow
    // varint that must fail.
    let writer_schema = Schema::new(vec![
        Field::new("bad_long", DataType::Int64, false),
        Field::new("keep", DataType::Int32, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(writer_schema.clone()),
        vec![
            Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
            Arc::new(Int32Array::from(vec![7])) as ArrayRef,
        ],
    )
    .expect("build writer batch");
    let bytes = write_ocf(&writer_schema, &[batch]);
    let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);
    let err = ReaderBuilder::new()
        .build(Cursor::new(mutated.clone()))
        .expect("build full reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("full decode should reject malformed varint");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("bad varint"));
    let err = ReaderBuilder::new()
        .with_projection(vec![1])
        .build(Cursor::new(mutated))
        .expect("build projected reader")
        .collect::<Result<Vec<_>, _>>()
        .expect_err("projection must also reject malformed skipped varint");
    assert!(matches!(err, ArrowError::AvroError(_)));
    assert!(err.to_string().contains("bad varint"));
}
```
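The corrupted byte here exploits the same gap as before: zig-zag maps `i64::MIN` to `u64::MAX`, which encodes as nine `0xff` bytes plus a `0x01` terminator (9 × 7 + 1 = 64 bits). A `0x02` terminator would contribute a 65th bit, which `read_varint` rejects but a terminator-only scan happily accepts. The bit arithmetic, spelled out (not part of the PR):

```rust
fn main() {
    // The tenth byte of a u64 varint carries bits 63..=69; for u64::MAX only
    // bit 63 is set, so the terminator byte is 0x01.
    assert_eq!(u64::MAX >> 63, 0x01);
    // A 0x02 terminator would place a bit at position 64 -- beyond u64:
    assert!((2u128 << 63) > u64::MAX as u128);
}
```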
The addition concludes with the `#[cold]` slow path for buffers shorter than 10 bytes, ahead of the existing test module:

```diff
+
+#[cold]
+fn skip_varint_slow(buf: &[u8]) -> Option<usize> {
+    for (idx, &byte) in buf.iter().take(10).enumerate() {
+        if byte < 0x80 {
+            return Some(idx + 1);
+        }
+    }
+    None
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```