Changes from all commits (24 commits)
e32dd52
perf: optimize shuffle array element iteration with slice-based append
andygrove Jan 20, 2026
2ea5631
chore: format code
andygrove Jan 20, 2026
a224e22
refactor: remove unnecessary #[inline] from large functions
andygrove Jan 20, 2026
fe54548
fix: address clippy warnings in benchmark
andygrove Jan 20, 2026
471fb2a
perf: optimize struct field processing with field-major order
andygrove Jan 20, 2026
bbe2da7
test: add benchmark for struct column processing
andygrove Jan 20, 2026
f8fe7ba
fix: remove unused import and mut in benchmark
andygrove Jan 20, 2026
b87de09
merge
andygrove Jan 26, 2026
f3da0dc
perf: extend field-major processing to nested struct fields
andygrove Jan 26, 2026
3e8f5b5
perf: batch processing for List and Map columns
andygrove Jan 26, 2026
11a7bca
test: add benchmark for map column processing
andygrove Jan 26, 2026
9b91cfd
refactor: rename struct_conversion benchmark to complex_type_conversion
andygrove Jan 26, 2026
05d72a4
refactor: consolidate and rename shuffle benchmarks
andygrove Jan 26, 2026
0deac66
lint
andygrove Jan 26, 2026
a6123c0
refactor: add safety comments and reduce boilerplate in shuffle code
andygrove Jan 26, 2026
245c647
refactor: improve error handling for builder downcasts
andygrove Jan 26, 2026
ac8c182
lint
andygrove Jan 26, 2026
83d1d30
refactor: add safety comments to remaining unsafe blocks
andygrove Jan 26, 2026
11fe509
refactor: introduce read_row_at! macro to reduce boilerplate
andygrove Jan 26, 2026
ca702ee
refactor: replace unwrap with expect for builder downcasts
andygrove Jan 26, 2026
970e30a
smaller difff
andygrove Jan 26, 2026
1b76054
fix: resolve clippy errors in jni_api.rs
andygrove Jan 26, 2026
738c46e
Merge remote-tracking branch 'apache/main' into shuffle-complex-type-…
andygrove Jan 26, 2026
4c5eb0b
revert: restore row_columnar.rs benchmark and remove jvm_shuffle.rs
andygrove Jan 26, 2026
4 changes: 4 additions & 0 deletions native/core/Cargo.toml
@@ -132,3 +132,7 @@ harness = false
 [[bench]]
 name = "parquet_decode"
 harness = false
+
+[[bench]]
+name = "array_element_append"
+harness = false
272 changes: 272 additions & 0 deletions native/core/benches/array_element_append.rs
@@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Micro-benchmarks for SparkUnsafeArray element iteration.
//!
//! These benchmarks exercise the low-level `append_to_builder` function, which
//! appends SparkUnsafeArray elements to Arrow array builders. This is the
//! inner loop used when processing List/Array columns in JVM shuffle.

use arrow::array::builder::{
    Date32Builder, Float64Builder, Int32Builder, Int64Builder, TimestampMicrosecondBuilder,
};
use arrow::datatypes::{DataType, TimeUnit};
use comet::execution::shuffle::list::{append_to_builder, SparkUnsafeArray};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

const NUM_ELEMENTS: usize = 10000;

/// Create a SparkUnsafeArray in memory with i32 elements.
/// Layout:
/// - 8 bytes: num_elements (i64)
/// - null bitset: 8 bytes per 64 elements
/// - element data: 4 bytes per element (i32)
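/// For example, with NUM_ELEMENTS = 10000 the null bitset needs
/// ceil(10000 / 64) = 157 words, so the header is 8 + 157 * 8 = 1264 bytes,
/// followed by 10000 * 4 = 40000 bytes of element data (41264 bytes total).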
fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    // Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset)
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let data_size = num_elements * 4; // i32 = 4 bytes
    let total_size = header_size + data_size;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset (set every 10th element as null if with_nulls)
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_idx = i / 64;
            let bit_idx = i % 64;
            let word_offset = 8 + word_idx * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << bit_idx);
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * 4;
        buffer[offset..offset + 4].copy_from_slice(&(i as i32).to_le_bytes());
    }

    buffer
}

/// Create a SparkUnsafeArray in memory with i64 elements.
fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let data_size = num_elements * 8; // i64 = 8 bytes
    let total_size = header_size + data_size;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_idx = i / 64;
            let bit_idx = i % 64;
            let word_offset = 8 + word_idx * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << bit_idx);
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * 8;
        buffer[offset..offset + 8].copy_from_slice(&(i as i64).to_le_bytes());
    }

    buffer
}

/// Create a SparkUnsafeArray in memory with f64 elements.
fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let data_size = num_elements * 8; // f64 = 8 bytes
    let total_size = header_size + data_size;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_idx = i / 64;
            let bit_idx = i % 64;
            let word_offset = 8 + word_idx * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << bit_idx);
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * 8;
        buffer[offset..offset + 8].copy_from_slice(&(i as f64).to_le_bytes());
    }

    buffer
}

fn benchmark_array_conversion(c: &mut Criterion) {
    let mut group = c.benchmark_group("spark_unsafe_array_to_arrow");

    // Benchmark i32 array conversion
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("i32", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS);
                    // The const generic monomorphizes the element loop for the
                    // nullable and non-nullable cases separately.
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Int32, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Int32, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark i64 array conversion
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("i64", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Int64, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Int64, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark f64 array conversion
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("f64", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Float64, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Float64, &mut builder, array)
                            .unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark date32 array conversion (same memory layout as i32)
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("date32", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Date32, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Date32, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark timestamp array conversion (same memory layout as i64)
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("timestamp", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS);
                    let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
                    if with_nulls {
                        append_to_builder::<true>(&dt, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&dt, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    group.finish();
}

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
    name = benches;
    config = config();
    targets = benchmark_array_conversion
}
criterion_main!(benches);
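
With the [[bench]] entry added to Cargo.toml above, this benchmark runs with `cargo bench --bench array_element_append` from the native/core directory.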
2 changes: 1 addition & 1 deletion native/core/src/execution/jni_api.rs
@@ -491,7 +491,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     let physical_plan_time = start.elapsed();

     exec_context.plan_creation_time += physical_plan_time;
-    exec_context.root_op = Some(Arc::clone(&root_op));
     exec_context.scans = scans;

     if exec_context.explain_native {
@@ -505,6 +504,7 @@
         // so we should always execute partition 0.
         let stream = root_op.native_plan.execute(0, task_ctx)?;
         exec_context.stream = Some(stream);
+        exec_context.root_op = Some(root_op);
     } else {
         // Pull input batches
         pull_input_batches(exec_context)?;
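
The jni_api.rs change moves the root_op assignment until after execute() has been called, which lets root_op be moved into the context rather than Arc-cloned; the behavior is otherwise unchanged.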