From 6867eedb46ecbd95f34f63ac02d8fee3ebac4625 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 26 Jan 2026 10:42:43 -0700
Subject: [PATCH 1/6] test: add comprehensive JVM shuffle benchmarks

Add jvm_shuffle.rs benchmark that covers the full range of data types
processed by `process_sorted_row_partition()` in JVM shuffle:

- Primitive columns (100 Int64 columns)
- Struct (flat with 5/10/20 fields)
- Nested struct (2 levels deep)
- Deeply nested struct (3 levels deep)
- List
- Map

This replaces the old row_columnar.rs, which only tested primitive columns.

These benchmarks help measure the performance of the row-to-columnar
conversion used by CometColumnarShuffle when writing shuffle data.

Co-Authored-By: Claude Opus 4.5
---
 native/core/Cargo.toml              |   2 +-
 native/core/benches/jvm_shuffle.rs  | 841 ++++++++++++++++++++++++++++
 native/core/benches/row_columnar.rs | 113 ----
 3 files changed, 842 insertions(+), 114 deletions(-)
 create mode 100644 native/core/benches/jvm_shuffle.rs
 delete mode 100644 native/core/benches/row_columnar.rs

diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index b13d6d54fd..c3fce2daf3 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -122,7 +122,7 @@ name = "bit_util"
 harness = false
 
 [[bench]]
-name = "row_columnar"
+name = "jvm_shuffle"
 harness = false
 
 [[bench]]
diff --git a/native/core/benches/jvm_shuffle.rs b/native/core/benches/jvm_shuffle.rs
new file mode 100644
index 0000000000..f07151c36a
--- /dev/null
+++ b/native/core/benches/jvm_shuffle.rs
@@ -0,0 +1,841 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for JVM shuffle row-to-columnar conversion.
+//!
+//! This benchmark measures the performance of converting Spark UnsafeRow
+//! to Arrow arrays via `process_sorted_row_partition()`, which is called
+//! by JVM shuffle (CometColumnarShuffle) when writing shuffle data.
+//!
+//! Covers:
+//! - Primitive types (Int64)
+//! - Struct (flat, nested, deeply nested)
+//! - List
+//! - Map
+
+use arrow::datatypes::{DataType, Field, Fields};
+use comet::execution::shuffle::row::{
+    process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
+};
+use comet::execution::shuffle::CompressionCodec;
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use std::sync::Arc;
+use tempfile::Builder;
+
+const BATCH_SIZE: usize = 5000;
+
+/// Create a struct schema with the given number of int64 fields.
+fn make_struct_schema(num_fields: usize) -> DataType {
+    let fields: Vec<Field> = (0..num_fields)
+        .map(|i| Field::new(format!("f{}", i), DataType::Int64, true))
+        .collect();
+    DataType::Struct(Fields::from(fields))
+}
+
+/// Calculate the row size for a struct with the given number of fields.
+/// UnsafeRow layout: [null bits] [fixed-length values]
+/// For struct: the struct value is stored as offset+size (8 bytes) pointing to nested row
+fn get_row_size(num_struct_fields: usize) -> usize {
+    // Top-level row has 1 column (the struct)
+    let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    // Struct pointer (offset + size) is 8 bytes
+    let struct_pointer_size = 8;
+    // Nested struct row
+    let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields);
+    let nested_data_size = num_struct_fields * 8; // int64 values
+
+    top_level_bitset_width + struct_pointer_size + nested_bitset_width + nested_data_size
+}
+
+struct RowData {
+    data: Vec<u8>,
+}
+
+impl RowData {
+    fn new(num_struct_fields: usize) -> Self {
+        let row_size = get_row_size(num_struct_fields);
+        let mut data = vec![0u8; row_size];
+
+        // Top-level row layout:
+        // [null bits for 1 field] [struct pointer (offset, size)]
+        let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+
+        // Nested struct starts after top-level row header + pointer
+        let nested_offset = top_level_bitset_width + 8;
+        let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields);
+        let nested_size = nested_bitset_width + num_struct_fields * 8;
+
+        // Write struct pointer (offset in upper 32 bits, size in lower 32 bits)
+        let offset_and_size = ((nested_offset as i64) << 32) | (nested_size as i64);
+        data[top_level_bitset_width..top_level_bitset_width + 8]
+            .copy_from_slice(&offset_and_size.to_le_bytes());
+
+        // Fill nested struct with some data
+        for i in 0..num_struct_fields {
+            let value_offset = nested_offset + nested_bitset_width + i * 8;
+            let value = (i as i64) * 100;
+            data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
+        }
+
+        RowData { data }
+    }
+
+    fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
+        spark_row.point_to_slice(&self.data);
+    }
+}
+
+fn benchmark_struct_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("struct_conversion");
+
+    // Test with different struct sizes and row counts
+    for num_fields in [5, 10, 20] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_struct_schema(num_fields)];
+
+            // Create row data
+            let rows: Vec<RowData> = (0..num_rows).map(|_| RowData::new(num_fields)).collect();
+
+            let spark_rows: Vec<SparkUnsafeRow> = rows
+                .iter()
+                .map(|row_data| {
+                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
+                    row_data.to_spark_row(&mut spark_row);
+                    // Mark the struct column as not null
+                    spark_row.set_not_null_at(0);
+                    spark_row
+                })
+                .collect();
+
+            let mut row_addresses: Vec<i64> =
+                spark_rows.iter().map(|row| row.get_row_addr()).collect();
+            let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
+
+            let row_address_ptr = row_addresses.as_mut_ptr();
+            let row_size_ptr = row_sizes.as_mut_ptr();
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    format!("fields_{}", num_fields),
+                    format!("rows_{}", num_rows),
+                ),
+                &(num_rows, &schema),
+                |b, (num_rows, schema)| {
+                    b.iter(|| {
+                        let tempfile = Builder::new().tempfile().unwrap();
+
+                        process_sorted_row_partition(
+                            *num_rows,
+                            BATCH_SIZE,
+                            row_address_ptr,
+                            row_size_ptr,
+                            schema,
+                            tempfile.path().to_str().unwrap().to_string(),
+                            1.0,
+                            false,
+                            0,
+                            None,
+                            &CompressionCodec::Zstd(1),
+                        )
+                        .unwrap();
+                    });
+                },
+            );
+
+            // Keep spark_rows alive for the benchmark
+            std::mem::drop(spark_rows);
+        }
+    }
+
+    group.finish();
+}
+
+/// Create a schema with nested structs: Struct<Struct<Int64, ...>>
+fn make_nested_struct_schema(num_fields: usize) -> DataType {
+    let inner_fields: Vec<Field> = (0..num_fields)
+        .map(|i| Field::new(format!("inner_f{}", i), DataType::Int64, true))
+        .collect();
+    let inner_struct = DataType::Struct(Fields::from(inner_fields));
+    let outer_fields = vec![Field::new("nested", inner_struct, true)];
+    DataType::Struct(Fields::from(outer_fields))
+}
+
+/// Create a schema with deeply nested structs (3 levels): Struct<Struct<Struct<Int64, ...>>>
+fn make_deeply_nested_struct_schema(num_fields: usize) -> DataType {
+    let inner_fields: Vec<Field> = (0..num_fields)
+        .map(|i| Field::new(format!("inner_f{}", i), DataType::Int64, true))
+        .collect();
+    let inner_struct = DataType::Struct(Fields::from(inner_fields));
+    let middle_fields = vec![Field::new("level2", inner_struct, true)];
+    let middle_struct = DataType::Struct(Fields::from(middle_fields));
+    let outer_fields = vec![Field::new("level1", middle_struct, true)];
+    DataType::Struct(Fields::from(outer_fields))
+}
+
+/// Calculate row size for nested struct: Struct<Struct<Int64, ...>>
+fn get_nested_row_size(num_inner_fields: usize) -> usize {
+    // Top-level row has 1 column (the outer struct)
+    let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    let struct_pointer_size = 8;
+
+    // Outer struct has 1 field (the inner struct)
+    let outer_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    let outer_struct_size = outer_bitset_width + 8; // pointer to inner struct
+
+    // Inner struct has num_inner_fields int64 fields
+    let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields);
+    let inner_data_size = num_inner_fields * 8;
+    let inner_struct_size = inner_bitset_width + inner_data_size;
+
+    top_level_bitset_width + struct_pointer_size + outer_struct_size + inner_struct_size
+}
+
+/// Calculate row size for deeply nested struct: Struct<Struct<Struct<Int64, ...>>>
+fn get_deeply_nested_row_size(num_inner_fields: usize) -> usize {
+    // Top-level row has 1 column (the level1 struct)
+    let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    let struct_pointer_size = 8;
+
+    // Level 1 struct has 1 field (the level2 struct)
+    let level1_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    let level1_struct_size = level1_bitset_width + 8;
+
+    // Level 2 struct has 1 field (the inner struct)
+    let level2_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    let level2_struct_size = level2_bitset_width + 8;
+
+    // Inner struct has num_inner_fields int64 fields
+    let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields);
+    let inner_data_size = num_inner_fields * 8;
+    let inner_struct_size = inner_bitset_width + inner_data_size;
+
+    top_level_bitset_width
+        + struct_pointer_size
+        + level1_struct_size
+        + level2_struct_size
+        + inner_struct_size
+}
+
+struct NestedRowData {
+    data: Vec<u8>,
+}
+
+impl NestedRowData {
+    fn new(num_inner_fields: usize) -> Self {
+        let row_size = get_nested_row_size(num_inner_fields);
+        let mut data = vec![0u8; row_size];
+
+        let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+        let outer_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+        let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields);
+
+        // Calculate offsets
+        let outer_struct_start = top_level_bitset_width + 8;
+        let outer_struct_size = outer_bitset_width + 8;
+        let inner_struct_start = outer_struct_start + outer_struct_size;
+        let inner_struct_size = inner_bitset_width + num_inner_fields * 8;
+
+        // Write top-level struct pointer (points to outer struct)
+        let outer_offset_and_size
= + ((outer_struct_start as i64) << 32) | (outer_struct_size as i64); + data[top_level_bitset_width..top_level_bitset_width + 8] + .copy_from_slice(&outer_offset_and_size.to_le_bytes()); + + // Write outer struct pointer (points to inner struct) + // Offset is relative to outer struct start + let inner_relative_offset = inner_struct_start - outer_struct_start; + let inner_offset_and_size = + ((inner_relative_offset as i64) << 32) | (inner_struct_size as i64); + data[outer_struct_start + outer_bitset_width..outer_struct_start + outer_bitset_width + 8] + .copy_from_slice(&inner_offset_and_size.to_le_bytes()); + + // Fill inner struct with some data + for i in 0..num_inner_fields { + let value_offset = inner_struct_start + inner_bitset_width + i * 8; + let value = (i as i64) * 100; + data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); + } + + NestedRowData { data } + } + + fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { + spark_row.point_to_slice(&self.data); + } +} + +struct DeeplyNestedRowData { + data: Vec, +} + +impl DeeplyNestedRowData { + fn new(num_inner_fields: usize) -> Self { + let row_size = get_deeply_nested_row_size(num_inner_fields); + let mut data = vec![0u8; row_size]; + + let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + let level1_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + let level2_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields); + + // Calculate offsets + let level1_struct_start = top_level_bitset_width + 8; + let level1_struct_size = level1_bitset_width + 8; + let level2_struct_start = level1_struct_start + level1_struct_size; + let level2_struct_size = level2_bitset_width + 8; + let inner_struct_start = level2_struct_start + level2_struct_size; + let inner_struct_size = inner_bitset_width + num_inner_fields * 8; + + // Write top-level struct pointer (points to level1 struct) + let level1_offset_and_size = + ((level1_struct_start as i64) << 32) | (level1_struct_size as i64); + data[top_level_bitset_width..top_level_bitset_width + 8] + .copy_from_slice(&level1_offset_and_size.to_le_bytes()); + + // Write level1 struct pointer (points to level2 struct) + let level2_relative_offset = level2_struct_start - level1_struct_start; + let level2_offset_and_size = + ((level2_relative_offset as i64) << 32) | (level2_struct_size as i64); + data[level1_struct_start + level1_bitset_width + ..level1_struct_start + level1_bitset_width + 8] + .copy_from_slice(&level2_offset_and_size.to_le_bytes()); + + // Write level2 struct pointer (points to inner struct) + let inner_relative_offset = inner_struct_start - level2_struct_start; + let inner_offset_and_size = + ((inner_relative_offset as i64) << 32) | (inner_struct_size as i64); + data[level2_struct_start + level2_bitset_width + ..level2_struct_start + level2_bitset_width + 8] + .copy_from_slice(&inner_offset_and_size.to_le_bytes()); + + // Fill inner struct with some data + for i in 0..num_inner_fields { + let value_offset = inner_struct_start + inner_bitset_width + i * 8; + let value = (i as i64) * 100; + data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); + } + + DeeplyNestedRowData { data } + } + + fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { + spark_row.point_to_slice(&self.data); + } +} + +fn benchmark_nested_struct_conversion(c: &mut Criterion) { + let mut group = c.benchmark_group("nested_struct_conversion"); + + // Test nested structs with 
different inner field counts
+    for num_fields in [5, 10, 20] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_nested_struct_schema(num_fields)];
+
+            // Create row data
+            let rows: Vec<NestedRowData> = (0..num_rows)
+                .map(|_| NestedRowData::new(num_fields))
+                .collect();
+
+            let spark_rows: Vec<SparkUnsafeRow> = rows
+                .iter()
+                .map(|row_data| {
+                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
+                    row_data.to_spark_row(&mut spark_row);
+                    spark_row.set_not_null_at(0);
+                    spark_row
+                })
+                .collect();
+
+            let mut row_addresses: Vec<i64> =
+                spark_rows.iter().map(|row| row.get_row_addr()).collect();
+            let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
+
+            let row_address_ptr = row_addresses.as_mut_ptr();
+            let row_size_ptr = row_sizes.as_mut_ptr();
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    format!("inner_fields_{}", num_fields),
+                    format!("rows_{}", num_rows),
+                ),
+                &(num_rows, &schema),
+                |b, (num_rows, schema)| {
+                    b.iter(|| {
+                        let tempfile = Builder::new().tempfile().unwrap();
+
+                        process_sorted_row_partition(
+                            *num_rows,
+                            BATCH_SIZE,
+                            row_address_ptr,
+                            row_size_ptr,
+                            schema,
+                            tempfile.path().to_str().unwrap().to_string(),
+                            1.0,
+                            false,
+                            0,
+                            None,
+                            &CompressionCodec::Zstd(1),
+                        )
+                        .unwrap();
+                    });
+                },
+            );
+
+            std::mem::drop(spark_rows);
+        }
+    }
+
+    group.finish();
+}
+
+fn benchmark_deeply_nested_struct_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("deeply_nested_struct_conversion");
+
+    // Test deeply nested structs (3 levels) with different inner field counts
+    for num_fields in [5, 10, 20] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_deeply_nested_struct_schema(num_fields)];
+
+            // Create row data
+            let rows: Vec<DeeplyNestedRowData> = (0..num_rows)
+                .map(|_| DeeplyNestedRowData::new(num_fields))
+                .collect();
+
+            let spark_rows: Vec<SparkUnsafeRow> = rows
+                .iter()
+                .map(|row_data| {
+                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
+                    row_data.to_spark_row(&mut spark_row);
+                    spark_row.set_not_null_at(0);
+                    spark_row
+                })
+                .collect();
+
+            let mut row_addresses: Vec<i64> =
+                spark_rows.iter().map(|row| row.get_row_addr()).collect();
+            let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
+
+            let row_address_ptr = row_addresses.as_mut_ptr();
+            let row_size_ptr = row_sizes.as_mut_ptr();
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    format!("inner_fields_{}", num_fields),
+                    format!("rows_{}", num_rows),
+                ),
+                &(num_rows, &schema),
+                |b, (num_rows, schema)| {
+                    b.iter(|| {
+                        let tempfile = Builder::new().tempfile().unwrap();
+
+                        process_sorted_row_partition(
+                            *num_rows,
+                            BATCH_SIZE,
+                            row_address_ptr,
+                            row_size_ptr,
+                            schema,
+                            tempfile.path().to_str().unwrap().to_string(),
+                            1.0,
+                            false,
+                            0,
+                            None,
+                            &CompressionCodec::Zstd(1),
+                        )
+                        .unwrap();
+                    });
+                },
+            );
+
+            std::mem::drop(spark_rows);
+        }
+    }
+
+    group.finish();
+}
+
+/// Create a schema with a list column: List
+fn make_list_schema(element_type: DataType) -> DataType {
+    DataType::List(Arc::new(Field::new("item", element_type, true)))
+}
+
+/// Calculate row size for a list with the given number of elements.
+/// UnsafeRow layout for list: [null bits] [list pointer (offset, size)]
+/// List data: [num_elements (8 bytes)] [null bits] [element data]
+fn get_list_row_size(num_elements: usize, element_size: usize) -> usize {
+    // Top-level row has 1 column (the list)
+    let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+    let list_pointer_size = 8;
+
+    // List header: num_elements (8 bytes) + null bitset
+    let list_null_bitset = ((num_elements + 63) / 64) * 8;
+    let list_header = 8 + list_null_bitset;
+    let list_data_size = num_elements * element_size;
+
+    top_level_bitset_width + list_pointer_size + list_header + list_data_size
+}
+
+struct ListRowData {
+    data: Vec<u8>,
+}
+
+impl ListRowData {
+    fn new_int64_list(num_elements: usize) -> Self {
+        let row_size = get_list_row_size(num_elements, 8);
+        let mut data = vec![0u8; row_size];
+
+        let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
+        let list_null_bitset = ((num_elements + 63) / 64) * 8;
+
+        // List starts after top-level header + pointer
+        let list_offset = top_level_bitset_width + 8;
+        let list_size = 8 + list_null_bitset + num_elements * 8;
+
+        // Write list pointer (offset in upper 32 bits, size in lower 32 bits)
+        let offset_and_size = ((list_offset as i64) << 32) | (list_size as i64);
+        data[top_level_bitset_width..top_level_bitset_width + 8]
+            .copy_from_slice(&offset_and_size.to_le_bytes());
+
+        // Write number of elements at list start
+        data[list_offset..list_offset + 8].copy_from_slice(&(num_elements as i64).to_le_bytes());
+
+        // Fill list with data (after header)
+        let data_start = list_offset + 8 + list_null_bitset;
+        for i in 0..num_elements {
+            let value_offset = data_start + i * 8;
+            let value = (i as i64) * 100;
+            data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
+        }
+
+        ListRowData { data }
+    }
+
+    fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
+        spark_row.point_to_slice(&self.data);
+    }
+}
+
+fn benchmark_list_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("list_conversion");
+
+    // Test with different list sizes and row counts
+    for num_elements in [10, 100] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_list_schema(DataType::Int64)];
+
+            // Create row data - each row has a list with num_elements items
+            let rows: Vec<ListRowData> = (0..num_rows)
+                .map(|_| ListRowData::new_int64_list(num_elements))
+                .collect();
+
+            let spark_rows: Vec<SparkUnsafeRow> = rows
+                .iter()
+                .map(|row_data| {
+                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
+                    row_data.to_spark_row(&mut spark_row);
+                    spark_row.set_not_null_at(0);
+                    spark_row
+                })
+                .collect();
+
+            let mut row_addresses: Vec<i64> =
+                spark_rows.iter().map(|row| row.get_row_addr()).collect();
+            let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
+
+            let row_address_ptr = row_addresses.as_mut_ptr();
+            let row_size_ptr = row_sizes.as_mut_ptr();
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    format!("elements_{}", num_elements),
+                    format!("rows_{}", num_rows),
+                ),
+                &(num_rows, &schema),
+                |b, (num_rows, schema)| {
+                    b.iter(|| {
+                        let tempfile = Builder::new().tempfile().unwrap();
+
+                        process_sorted_row_partition(
+                            *num_rows,
+                            BATCH_SIZE,
+                            row_address_ptr,
+                            row_size_ptr,
+                            schema,
+                            tempfile.path().to_str().unwrap().to_string(),
+                            1.0,
+                            false,
+                            0,
+                            None,
+                            &CompressionCodec::Zstd(1),
+                        )
+                        .unwrap();
+                    });
+                },
+            );
+
+            std::mem::drop(spark_rows);
+        }
+    }
+
+    group.finish();
+}
+
+/// Create a schema with a map column: Map
+fn make_map_schema() ->
DataType { + // Map is represented as List> in Arrow + let key_field = Field::new("key", DataType::Int64, false); + let value_field = Field::new("value", DataType::Int64, true); + let entries_field = Field::new( + "entries", + DataType::Struct(Fields::from(vec![key_field, value_field])), + false, + ); + DataType::Map(Arc::new(entries_field), false) +} + +/// Calculate row size for a map with the given number of entries. +/// UnsafeRow layout for map: [null bits] [map pointer (offset, size)] +/// Map data: [key_array_size (8 bytes)] [key_array] [value_array] +/// Array format: [num_elements (8 bytes)] [null bits] [element data] +fn get_map_row_size(num_entries: usize) -> usize { + // Top-level row has 1 column (the map) + let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + let map_pointer_size = 8; + + // Key array: num_elements (8) + null bitset + data + let key_null_bitset = ((num_entries + 63) / 64) * 8; + let key_array_size = 8 + key_null_bitset + num_entries * 8; + + // Value array: num_elements (8) + null bitset + data + let value_null_bitset = ((num_entries + 63) / 64) * 8; + let value_array_size = 8 + value_null_bitset + num_entries * 8; + + // Map header (key array size) + key array + value array + let map_size = 8 + key_array_size + value_array_size; + + top_level_bitset_width + map_pointer_size + map_size +} + +struct MapRowData { + data: Vec, +} + +impl MapRowData { + fn new_int64_map(num_entries: usize) -> Self { + let row_size = get_map_row_size(num_entries); + let mut data = vec![0u8; row_size]; + + let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + let key_null_bitset = ((num_entries + 63) / 64) * 8; + let value_null_bitset = ((num_entries + 63) / 64) * 8; + + let key_array_size = 8 + key_null_bitset + num_entries * 8; + let value_array_size = 8 + value_null_bitset + num_entries * 8; + let map_size = 8 + key_array_size + value_array_size; + + // Map starts after top-level header + pointer + let map_offset = top_level_bitset_width + 8; + + // Write map pointer (offset in upper 32 bits, size in lower 32 bits) + let offset_and_size = ((map_offset as i64) << 32) | (map_size as i64); + data[top_level_bitset_width..top_level_bitset_width + 8] + .copy_from_slice(&offset_and_size.to_le_bytes()); + + // Write key array size at map start + data[map_offset..map_offset + 8].copy_from_slice(&(key_array_size as i64).to_le_bytes()); + + // Key array starts after map header + let key_array_offset = map_offset + 8; + // Write number of elements + data[key_array_offset..key_array_offset + 8] + .copy_from_slice(&(num_entries as i64).to_le_bytes()); + // Fill key data (after header) + let key_data_start = key_array_offset + 8 + key_null_bitset; + for i in 0..num_entries { + let value_offset = key_data_start + i * 8; + let value = i as i64; + data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); + } + + // Value array starts after key array + let value_array_offset = key_array_offset + key_array_size; + // Write number of elements + data[value_array_offset..value_array_offset + 8] + .copy_from_slice(&(num_entries as i64).to_le_bytes()); + // Fill value data (after header) + let value_data_start = value_array_offset + 8 + value_null_bitset; + for i in 0..num_entries { + let value_offset = value_data_start + i * 8; + let value = (i as i64) * 100; + data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); + } + + MapRowData { data } + } + + fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { + 
spark_row.point_to_slice(&self.data); + } +} + +fn benchmark_map_conversion(c: &mut Criterion) { + let mut group = c.benchmark_group("map_conversion"); + + // Test with different map sizes and row counts + for num_entries in [10, 100] { + for num_rows in [1000, 10000] { + let schema = vec![make_map_schema()]; + + // Create row data - each row has a map with num_entries items + let rows: Vec = (0..num_rows) + .map(|_| MapRowData::new_int64_map(num_entries)) + .collect(); + + let spark_rows: Vec = rows + .iter() + .map(|row_data| { + let mut spark_row = SparkUnsafeRow::new_with_num_fields(1); + row_data.to_spark_row(&mut spark_row); + spark_row.set_not_null_at(0); + spark_row + }) + .collect(); + + let mut row_addresses: Vec = + spark_rows.iter().map(|row| row.get_row_addr()).collect(); + let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); + + let row_address_ptr = row_addresses.as_mut_ptr(); + let row_size_ptr = row_sizes.as_mut_ptr(); + + group.bench_with_input( + BenchmarkId::new( + format!("entries_{}", num_entries), + format!("rows_{}", num_rows), + ), + &(num_rows, &schema), + |b, (num_rows, schema)| { + b.iter(|| { + let tempfile = Builder::new().tempfile().unwrap(); + + process_sorted_row_partition( + *num_rows, + BATCH_SIZE, + row_address_ptr, + row_size_ptr, + schema, + tempfile.path().to_str().unwrap().to_string(), + 1.0, + false, + 0, + None, + &CompressionCodec::Zstd(1), + ) + .unwrap(); + }); + }, + ); + + std::mem::drop(spark_rows); + } + } + + group.finish(); +} + +/// Benchmark for primitive type columns (many Int64 columns). +/// This tests the baseline performance without complex type overhead. +fn benchmark_primitive_columns(c: &mut Criterion) { + let mut group = c.benchmark_group("primitive_columns"); + + const NUM_COLS: usize = 100; + let row_size: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8; + + for num_rows in [1000, 10000] { + let schema = vec![DataType::Int64; NUM_COLS]; + + // Create row data + let row_data: Vec> = (0..num_rows) + .map(|_| { + let mut data = vec![0u8; row_size]; + // Fill with some data after the bitset + for i in SparkUnsafeRow::get_row_bitset_width(NUM_COLS)..row_size { + data[i] = i as u8; + } + data + }) + .collect(); + + let spark_rows: Vec = row_data + .iter() + .map(|data| { + let mut spark_row = SparkUnsafeRow::new_with_num_fields(NUM_COLS); + spark_row.point_to_slice(data); + for i in 0..NUM_COLS { + spark_row.set_not_null_at(i); + } + spark_row + }) + .collect(); + + let mut row_addresses: Vec = + spark_rows.iter().map(|row| row.get_row_addr()).collect(); + let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); + + let row_address_ptr = row_addresses.as_mut_ptr(); + let row_size_ptr = row_sizes.as_mut_ptr(); + + group.bench_with_input( + BenchmarkId::new("cols_100", format!("rows_{}", num_rows)), + &(num_rows, &schema), + |b, (num_rows, schema)| { + b.iter(|| { + let tempfile = Builder::new().tempfile().unwrap(); + + process_sorted_row_partition( + *num_rows, + BATCH_SIZE, + row_address_ptr, + row_size_ptr, + schema, + tempfile.path().to_str().unwrap().to_string(), + 1.0, + false, + 0, + None, + &CompressionCodec::Zstd(1), + ) + .unwrap(); + }); + }, + ); + + std::mem::drop(spark_rows); + } + + group.finish(); +} + +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! 
{ + name = benches; + config = config(); + targets = benchmark_primitive_columns, benchmark_struct_conversion, benchmark_nested_struct_conversion, benchmark_deeply_nested_struct_conversion, benchmark_list_conversion, benchmark_map_conversion +} +criterion_main!(benches); diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs deleted file mode 100644 index a62574111b..0000000000 --- a/native/core/benches/row_columnar.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::datatypes::DataType as ArrowDataType; -use comet::execution::shuffle::row::{ - process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, -}; -use comet::execution::shuffle::CompressionCodec; -use criterion::{criterion_group, criterion_main, Criterion}; -use tempfile::Builder; - -const NUM_ROWS: usize = 10000; -const BATCH_SIZE: usize = 5000; -const NUM_COLS: usize = 100; -const ROW_SIZE: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8; - -fn benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("row_array_conversion"); - - group.bench_function("row_to_array", |b| { - let spark_rows = (0..NUM_ROWS) - .map(|_| { - let mut spark_row = SparkUnsafeRow::new_with_num_fields(NUM_COLS); - let mut row = Row::new(); - - for i in SparkUnsafeRow::get_row_bitset_width(NUM_COLS)..ROW_SIZE { - row.data[i] = i as u8; - } - - row.to_spark_row(&mut spark_row); - - for i in 0..NUM_COLS { - spark_row.set_not_null_at(i); - } - - spark_row - }) - .collect::>(); - - let mut row_addresses = spark_rows - .iter() - .map(|row| row.get_row_addr()) - .collect::>(); - let mut row_sizes = spark_rows - .iter() - .map(|row| row.get_row_size()) - .collect::>(); - - let row_address_ptr = row_addresses.as_mut_ptr(); - let row_size_ptr = row_sizes.as_mut_ptr(); - let schema = vec![ArrowDataType::Int64; NUM_COLS]; - - b.iter(|| { - let tempfile = Builder::new().tempfile().unwrap(); - - process_sorted_row_partition( - NUM_ROWS, - BATCH_SIZE, - row_address_ptr, - row_size_ptr, - &schema, - tempfile.path().to_str().unwrap().to_string(), - 1.0, - false, - 0, - None, - &CompressionCodec::Zstd(1), - ) - .unwrap(); - }); - }); -} - -struct Row { - data: Box<[u8; ROW_SIZE]>, -} - -impl Row { - pub fn new() -> Self { - Row { - data: Box::new([0u8; ROW_SIZE]), - } - } - - pub fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { - spark_row.point_to_slice(self.data.as_ref()); - } -} - -fn config() -> Criterion { - Criterion::default() -} - -criterion_group! 
{ - name = benches; - config = config(); - targets = benchmark -} -criterion_main!(benches); From 69fb543943e46c7edb22744296f882de5a57ca27 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 10:45:51 -0700 Subject: [PATCH 2/6] lint --- native/core/benches/jvm_shuffle.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native/core/benches/jvm_shuffle.rs b/native/core/benches/jvm_shuffle.rs index f07151c36a..c8c13e9a52 100644 --- a/native/core/benches/jvm_shuffle.rs +++ b/native/core/benches/jvm_shuffle.rs @@ -791,8 +791,7 @@ fn benchmark_primitive_columns(c: &mut Criterion) { }) .collect(); - let mut row_addresses: Vec = - spark_rows.iter().map(|row| row.get_row_addr()).collect(); + let mut row_addresses: Vec = spark_rows.iter().map(|row| row.get_row_addr()).collect(); let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); let row_address_ptr = row_addresses.as_mut_ptr(); From a337b0f1df71aa613083d197adf07acd006e82dc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 12:25:59 -0700 Subject: [PATCH 3/6] fix: resolve clippy warnings in jvm_shuffle benchmark Use div_ceil() instead of manual ceiling division and replace needless range loop with iterator pattern. Co-Authored-By: Claude Opus 4.5 --- native/core/benches/jvm_shuffle.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/native/core/benches/jvm_shuffle.rs b/native/core/benches/jvm_shuffle.rs index c8c13e9a52..c7c4b7af3d 100644 --- a/native/core/benches/jvm_shuffle.rs +++ b/native/core/benches/jvm_shuffle.rs @@ -481,7 +481,7 @@ fn get_list_row_size(num_elements: usize, element_size: usize) -> usize { let list_pointer_size = 8; // List header: num_elements (8 bytes) + null bitset - let list_null_bitset = ((num_elements + 63) / 64) * 8; + let list_null_bitset = num_elements.div_ceil(64) * 8; let list_header = 8 + list_null_bitset; let list_data_size = num_elements * element_size; @@ -498,7 +498,7 @@ impl ListRowData { let mut data = vec![0u8; row_size]; let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let list_null_bitset = ((num_elements + 63) / 64) * 8; + let list_null_bitset = num_elements.div_ceil(64) * 8; // List starts after top-level header + pointer let list_offset = top_level_bitset_width + 8; @@ -616,11 +616,11 @@ fn get_map_row_size(num_entries: usize) -> usize { let map_pointer_size = 8; // Key array: num_elements (8) + null bitset + data - let key_null_bitset = ((num_entries + 63) / 64) * 8; + let key_null_bitset = num_entries.div_ceil(64) * 8; let key_array_size = 8 + key_null_bitset + num_entries * 8; // Value array: num_elements (8) + null bitset + data - let value_null_bitset = ((num_entries + 63) / 64) * 8; + let value_null_bitset = num_entries.div_ceil(64) * 8; let value_array_size = 8 + value_null_bitset + num_entries * 8; // Map header (key array size) + key array + value array @@ -639,8 +639,8 @@ impl MapRowData { let mut data = vec![0u8; row_size]; let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let key_null_bitset = ((num_entries + 63) / 64) * 8; - let value_null_bitset = ((num_entries + 63) / 64) * 8; + let key_null_bitset = num_entries.div_ceil(64) * 8; + let value_null_bitset = num_entries.div_ceil(64) * 8; let key_array_size = 8 + key_null_bitset + num_entries * 8; let value_array_size = 8 + value_null_bitset + num_entries * 8; @@ -772,8 +772,13 @@ fn benchmark_primitive_columns(c: &mut Criterion) { .map(|_| { let mut data = vec![0u8; row_size]; // 
Fill with some data after the bitset - for i in SparkUnsafeRow::get_row_bitset_width(NUM_COLS)..row_size { - data[i] = i as u8; + for (i, byte) in data + .iter_mut() + .enumerate() + .take(row_size) + .skip(SparkUnsafeRow::get_row_bitset_width(NUM_COLS)) + { + *byte = i as u8; } data }) From 5d6d9a5dbefc29cb93b13ef6d5fc295253a26177 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 12:32:15 -0700 Subject: [PATCH 4/6] rename --- native/core/Cargo.toml | 2 +- native/core/benches/{jvm_shuffle.rs => row_to_columnar.rs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename native/core/benches/{jvm_shuffle.rs => row_to_columnar.rs} (100%) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 499175fba2..83e5995c38 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -122,7 +122,7 @@ name = "bit_util" harness = false [[bench]] -name = "jvm_shuffle" +name = "row_to_columnar" harness = false [[bench]] diff --git a/native/core/benches/jvm_shuffle.rs b/native/core/benches/row_to_columnar.rs similarity index 100% rename from native/core/benches/jvm_shuffle.rs rename to native/core/benches/row_to_columnar.rs From 8d1302b03d22d56dff5c4f7d24e11dff6ef0f650 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 12:32:44 -0700 Subject: [PATCH 5/6] rename --- native/core/Cargo.toml | 2 +- native/core/benches/{row_to_columnar.rs => row_columnar.rs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename native/core/benches/{row_to_columnar.rs => row_columnar.rs} (100%) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 83e5995c38..07d4c6cc8f 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -122,7 +122,7 @@ name = "bit_util" harness = false [[bench]] -name = "row_to_columnar" +name = "row_columnar" harness = false [[bench]] diff --git a/native/core/benches/row_to_columnar.rs b/native/core/benches/row_columnar.rs similarity index 100% rename from native/core/benches/row_to_columnar.rs rename to native/core/benches/row_columnar.rs From c51c3ab69a5d50d5acf9d4ed0816cd85ee0f15dd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 6 Feb 2026 11:08:47 -0700 Subject: [PATCH 6/6] refactor: simplify row_columnar benchmark to address review feedback Replace magic numbers with named constants (INT64_SIZE, UNSAFE_ROW_POINTER_SIZE, ARRAY_HEADER_SIZE), unify three struct schema builders and three row-data structs into single parameterized functions, and extract duplicated benchmark runner into a shared helper. Reduces the file from ~845 to ~305 lines. Co-Authored-By: Claude Opus 4.6 --- native/core/benches/row_columnar.rs | 1012 ++++++++------------------- 1 file changed, 284 insertions(+), 728 deletions(-) diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs index c7c4b7af3d..5c39645ba6 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/core/benches/row_columnar.rs @@ -17,15 +17,9 @@ //! Benchmarks for JVM shuffle row-to-columnar conversion. //! -//! This benchmark measures the performance of converting Spark UnsafeRow -//! to Arrow arrays via `process_sorted_row_partition()`, which is called -//! by JVM shuffle (CometColumnarShuffle) when writing shuffle data. -//! -//! Covers: -//! - Primitive types (Int64) -//! - Struct (flat, nested, deeply nested) -//! - List -//! - Map +//! Measures `process_sorted_row_partition()` performance for converting Spark +//! UnsafeRow data to Arrow arrays, covering primitive, struct (flat/nested), +//! list, and map types. 
use arrow::datatypes::{DataType, Field, Fields}; use comet::execution::shuffle::row::{ @@ -38,801 +32,360 @@ use tempfile::Builder; const BATCH_SIZE: usize = 5000; -/// Create a struct schema with the given number of int64 fields. -fn make_struct_schema(num_fields: usize) -> DataType { - let fields: Vec = (0..num_fields) - .map(|i| Field::new(format!("f{}", i), DataType::Int64, true)) - .collect(); - DataType::Struct(Fields::from(fields)) -} - -/// Calculate the row size for a struct with the given number of fields. -/// UnsafeRow layout: [null bits] [fixed-length values] -/// For struct: the struct value is stored as offset+size (8 bytes) pointing to nested row -fn get_row_size(num_struct_fields: usize) -> usize { - // Top-level row has 1 column (the struct) - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - // Struct pointer (offset + size) is 8 bytes - let struct_pointer_size = 8; - // Nested struct row - let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields); - let nested_data_size = num_struct_fields * 8; // int64 values - - top_level_bitset_width + struct_pointer_size + nested_bitset_width + nested_data_size -} +/// Size of an Int64 value in bytes. +const INT64_SIZE: usize = 8; -struct RowData { - data: Vec, -} +/// Size of a pointer in Spark's UnsafeRow format. Encodes a 32-bit offset +/// (upper bits) and 32-bit size (lower bits) — always 8 bytes regardless of +/// hardware architecture. +const UNSAFE_ROW_POINTER_SIZE: usize = 8; -impl RowData { - fn new(num_struct_fields: usize) -> Self { - let row_size = get_row_size(num_struct_fields); - let mut data = vec![0u8; row_size]; - - // Top-level row layout: - // [null bits for 1 field] [struct pointer (offset, size)] - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - - // Nested struct starts after top-level row header + pointer - let nested_offset = top_level_bitset_width + 8; - let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields); - let nested_size = nested_bitset_width + num_struct_fields * 8; - - // Write struct pointer (offset in upper 32 bits, size in lower 32 bits) - let offset_and_size = ((nested_offset as i64) << 32) | (nested_size as i64); - data[top_level_bitset_width..top_level_bitset_width + 8] - .copy_from_slice(&offset_and_size.to_le_bytes()); - - // Fill nested struct with some data - for i in 0..num_struct_fields { - let value_offset = nested_offset + nested_bitset_width + i * 8; - let value = (i as i64) * 100; - data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); - } +/// Size of the element-count field in UnsafeRow array/map headers. +const ARRAY_HEADER_SIZE: usize = 8; - RowData { data } - } +// ─── UnsafeRow helpers ────────────────────────────────────────────────────── - fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { - spark_row.point_to_slice(&self.data); - } +/// Write an UnsafeRow offset+size pointer at `pos` in `data`. 
+fn write_pointer(data: &mut [u8], pos: usize, offset: usize, size: usize) { + let value = ((offset as i64) << 32) | (size as i64); + data[pos..pos + UNSAFE_ROW_POINTER_SIZE].copy_from_slice(&value.to_le_bytes()); } -fn benchmark_struct_conversion(c: &mut Criterion) { - let mut group = c.benchmark_group("struct_conversion"); - - // Test with different struct sizes and row counts - for num_fields in [5, 10, 20] { - for num_rows in [1000, 10000] { - let schema = vec![make_struct_schema(num_fields)]; - - // Create row data - let rows: Vec = (0..num_rows).map(|_| RowData::new(num_fields)).collect(); - - let spark_rows: Vec = rows - .iter() - .map(|row_data| { - let mut spark_row = SparkUnsafeRow::new_with_num_fields(1); - row_data.to_spark_row(&mut spark_row); - // Mark the struct column as not null - spark_row.set_not_null_at(0); - spark_row - }) - .collect(); - - let mut row_addresses: Vec = - spark_rows.iter().map(|row| row.get_row_addr()).collect(); - let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); - - let row_address_ptr = row_addresses.as_mut_ptr(); - let row_size_ptr = row_sizes.as_mut_ptr(); - - group.bench_with_input( - BenchmarkId::new( - format!("fields_{}", num_fields), - format!("rows_{}", num_rows), - ), - &(num_rows, &schema), - |b, (num_rows, schema)| { - b.iter(|| { - let tempfile = Builder::new().tempfile().unwrap(); - - process_sorted_row_partition( - *num_rows, - BATCH_SIZE, - row_address_ptr, - row_size_ptr, - schema, - tempfile.path().to_str().unwrap().to_string(), - 1.0, - false, - 0, - None, - &CompressionCodec::Zstd(1), - ) - .unwrap(); - }); - }, - ); - - // Keep spark_rows alive for the benchmark - std::mem::drop(spark_rows); - } - } - - group.finish(); -} - -/// Create a schema with nested structs: Struct> -fn make_nested_struct_schema(num_fields: usize) -> DataType { - let inner_fields: Vec = (0..num_fields) - .map(|i| Field::new(format!("inner_f{}", i), DataType::Int64, true)) - .collect(); - let inner_struct = DataType::Struct(Fields::from(inner_fields)); - let outer_fields = vec![Field::new("nested", inner_struct, true)]; - DataType::Struct(Fields::from(outer_fields)) +/// Byte size of a null-bitset for `n` elements (64-bit words, rounded up). +fn null_bitset_size(n: usize) -> usize { + n.div_ceil(64) * 8 } -/// Create a schema with deeply nested structs (3 levels): Struct>> -fn make_deeply_nested_struct_schema(num_fields: usize) -> DataType { - let inner_fields: Vec = (0..num_fields) - .map(|i| Field::new(format!("inner_f{}", i), DataType::Int64, true)) +// ─── Schema builders ──────────────────────────────────────────────────────── + +/// Create a struct schema with `depth` nesting levels and `num_leaf_fields` +/// Int64 leaf fields. 
+/// +/// - depth=1: `Struct` +/// - depth=2: `Struct>` +/// - depth=3: `Struct>>` +fn make_struct_schema(depth: usize, num_leaf_fields: usize) -> DataType { + let leaf_fields: Vec = (0..num_leaf_fields) + .map(|i| Field::new(format!("f{i}"), DataType::Int64, true)) .collect(); - let inner_struct = DataType::Struct(Fields::from(inner_fields)); - let middle_fields = vec![Field::new("level2", inner_struct, true)]; - let middle_struct = DataType::Struct(Fields::from(middle_fields)); - let outer_fields = vec![Field::new("level1", middle_struct, true)]; - DataType::Struct(Fields::from(outer_fields)) -} - -/// Calculate row size for nested struct: Struct> -fn get_nested_row_size(num_inner_fields: usize) -> usize { - // Top-level row has 1 column (the outer struct) - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let struct_pointer_size = 8; - - // Outer struct has 1 field (the inner struct) - let outer_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let outer_struct_size = outer_bitset_width + 8; // pointer to inner struct - - // Inner struct has num_inner_fields int64 fields - let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields); - let inner_data_size = num_inner_fields * 8; - let inner_struct_size = inner_bitset_width + inner_data_size; - - top_level_bitset_width + struct_pointer_size + outer_struct_size + inner_struct_size + let mut dt = DataType::Struct(Fields::from(leaf_fields)); + for _ in 0..depth - 1 { + dt = DataType::Struct(Fields::from(vec![Field::new("nested", dt, true)])); + } + dt } -/// Calculate row size for deeply nested struct: Struct>> -fn get_deeply_nested_row_size(num_inner_fields: usize) -> usize { - // Top-level row has 1 column (the level1 struct) - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let struct_pointer_size = 8; - - // Level 1 struct has 1 field (the level2 struct) - let level1_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let level1_struct_size = level1_bitset_width + 8; - - // Level 2 struct has 1 field (the inner struct) - let level2_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let level2_struct_size = level2_bitset_width + 8; - - // Inner struct has num_inner_fields int64 fields - let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields); - let inner_data_size = num_inner_fields * 8; - let inner_struct_size = inner_bitset_width + inner_data_size; - - top_level_bitset_width - + struct_pointer_size - + level1_struct_size - + level2_struct_size - + inner_struct_size +fn make_list_schema() -> DataType { + DataType::List(Arc::new(Field::new("item", DataType::Int64, true))) } -struct NestedRowData { - data: Vec, +fn make_map_schema() -> DataType { + let entries = Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Int64, false), + Field::new("value", DataType::Int64, true), + ])), + false, + ); + DataType::Map(Arc::new(entries), false) } -impl NestedRowData { - fn new(num_inner_fields: usize) -> Self { - let row_size = get_nested_row_size(num_inner_fields); - let mut data = vec![0u8; row_size]; - - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let outer_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields); - - // Calculate offsets - let outer_struct_start = top_level_bitset_width + 8; - let outer_struct_size = outer_bitset_width + 8; - let inner_struct_start = outer_struct_start + 
outer_struct_size; - let inner_struct_size = inner_bitset_width + num_inner_fields * 8; - - // Write top-level struct pointer (points to outer struct) - let outer_offset_and_size = - ((outer_struct_start as i64) << 32) | (outer_struct_size as i64); - data[top_level_bitset_width..top_level_bitset_width + 8] - .copy_from_slice(&outer_offset_and_size.to_le_bytes()); - - // Write outer struct pointer (points to inner struct) - // Offset is relative to outer struct start - let inner_relative_offset = inner_struct_start - outer_struct_start; - let inner_offset_and_size = - ((inner_relative_offset as i64) << 32) | (inner_struct_size as i64); - data[outer_struct_start + outer_bitset_width..outer_struct_start + outer_bitset_width + 8] - .copy_from_slice(&inner_offset_and_size.to_le_bytes()); - - // Fill inner struct with some data - for i in 0..num_inner_fields { - let value_offset = inner_struct_start + inner_bitset_width + i * 8; - let value = (i as i64) * 100; - data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); +// ─── Row data builders ────────────────────────────────────────────────────── + +/// Build a binary UnsafeRow containing a struct column with `depth` nesting +/// levels and `num_leaf_fields` Int64 fields at the innermost level. +fn build_struct_row(depth: usize, num_leaf_fields: usize) -> Vec { + let top_bitset = SparkUnsafeRow::get_row_bitset_width(1); + let inter_bitset = SparkUnsafeRow::get_row_bitset_width(1); + let leaf_bitset = SparkUnsafeRow::get_row_bitset_width(num_leaf_fields); + + let inter_level_size = inter_bitset + UNSAFE_ROW_POINTER_SIZE; + let leaf_level_size = leaf_bitset + num_leaf_fields * INT64_SIZE; + + let total = top_bitset + + UNSAFE_ROW_POINTER_SIZE + + (depth - 1) * inter_level_size + + leaf_level_size; + let mut data = vec![0u8; total]; + + // Absolute start position of each struct level in the buffer + let mut struct_starts = Vec::with_capacity(depth); + let mut pos = top_bitset + UNSAFE_ROW_POINTER_SIZE; + for level in 0..depth { + struct_starts.push(pos); + if level < depth - 1 { + pos += inter_level_size; } - - NestedRowData { data } } - fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { - spark_row.point_to_slice(&self.data); + // Top-level pointer → first struct (absolute offset from row start) + let first_size = if depth == 1 { + leaf_level_size + } else { + inter_level_size + }; + write_pointer(&mut data, top_bitset, struct_starts[0], first_size); + + // Intermediate struct pointers (offsets relative to their own struct start) + for level in 0..depth - 1 { + let next_size = if level + 1 == depth - 1 { + leaf_level_size + } else { + inter_level_size + }; + write_pointer( + &mut data, + struct_starts[level] + inter_bitset, + struct_starts[level + 1] - struct_starts[level], + next_size, + ); } -} -struct DeeplyNestedRowData { - data: Vec, -} - -impl DeeplyNestedRowData { - fn new(num_inner_fields: usize) -> Self { - let row_size = get_deeply_nested_row_size(num_inner_fields); - let mut data = vec![0u8; row_size]; - - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let level1_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let level2_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let inner_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_inner_fields); - - // Calculate offsets - let level1_struct_start = top_level_bitset_width + 8; - let level1_struct_size = level1_bitset_width + 8; - let level2_struct_start = level1_struct_start + level1_struct_size; - let 
level2_struct_size = level2_bitset_width + 8; - let inner_struct_start = level2_struct_start + level2_struct_size; - let inner_struct_size = inner_bitset_width + num_inner_fields * 8; - - // Write top-level struct pointer (points to level1 struct) - let level1_offset_and_size = - ((level1_struct_start as i64) << 32) | (level1_struct_size as i64); - data[top_level_bitset_width..top_level_bitset_width + 8] - .copy_from_slice(&level1_offset_and_size.to_le_bytes()); - - // Write level1 struct pointer (points to level2 struct) - let level2_relative_offset = level2_struct_start - level1_struct_start; - let level2_offset_and_size = - ((level2_relative_offset as i64) << 32) | (level2_struct_size as i64); - data[level1_struct_start + level1_bitset_width - ..level1_struct_start + level1_bitset_width + 8] - .copy_from_slice(&level2_offset_and_size.to_le_bytes()); - - // Write level2 struct pointer (points to inner struct) - let inner_relative_offset = inner_struct_start - level2_struct_start; - let inner_offset_and_size = - ((inner_relative_offset as i64) << 32) | (inner_struct_size as i64); - data[level2_struct_start + level2_bitset_width - ..level2_struct_start + level2_bitset_width + 8] - .copy_from_slice(&inner_offset_and_size.to_le_bytes()); - - // Fill inner struct with some data - for i in 0..num_inner_fields { - let value_offset = inner_struct_start + inner_bitset_width + i * 8; - let value = (i as i64) * 100; - data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); - } - - DeeplyNestedRowData { data } + // Fill leaf struct with sample data + let leaf_start = *struct_starts.last().unwrap(); + for i in 0..num_leaf_fields { + let off = leaf_start + leaf_bitset + i * INT64_SIZE; + data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes()); } - fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { - spark_row.point_to_slice(&self.data); - } + data } -fn benchmark_nested_struct_conversion(c: &mut Criterion) { - let mut group = c.benchmark_group("nested_struct_conversion"); - - // Test nested structs with different inner field counts - for num_fields in [5, 10, 20] { - for num_rows in [1000, 10000] { - let schema = vec![make_nested_struct_schema(num_fields)]; - - // Create row data - let rows: Vec = (0..num_rows) - .map(|_| NestedRowData::new(num_fields)) - .collect(); - - let spark_rows: Vec = rows - .iter() - .map(|row_data| { - let mut spark_row = SparkUnsafeRow::new_with_num_fields(1); - row_data.to_spark_row(&mut spark_row); - spark_row.set_not_null_at(0); - spark_row - }) - .collect(); - - let mut row_addresses: Vec = - spark_rows.iter().map(|row| row.get_row_addr()).collect(); - let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); - - let row_address_ptr = row_addresses.as_mut_ptr(); - let row_size_ptr = row_sizes.as_mut_ptr(); - - group.bench_with_input( - BenchmarkId::new( - format!("inner_fields_{}", num_fields), - format!("rows_{}", num_rows), - ), - &(num_rows, &schema), - |b, (num_rows, schema)| { - b.iter(|| { - let tempfile = Builder::new().tempfile().unwrap(); - - process_sorted_row_partition( - *num_rows, - BATCH_SIZE, - row_address_ptr, - row_size_ptr, - schema, - tempfile.path().to_str().unwrap().to_string(), - 1.0, - false, - 0, - None, - &CompressionCodec::Zstd(1), - ) - .unwrap(); - }); - }, - ); - - std::mem::drop(spark_rows); - } +/// Build a binary UnsafeRow containing a `List` column. 
+fn build_list_row(num_elements: usize) -> Vec { + let top_bitset = SparkUnsafeRow::get_row_bitset_width(1); + let elem_null_bitset = null_bitset_size(num_elements); + let list_size = ARRAY_HEADER_SIZE + elem_null_bitset + num_elements * INT64_SIZE; + let total = top_bitset + UNSAFE_ROW_POINTER_SIZE + list_size; + let mut data = vec![0u8; total]; + + let list_offset = top_bitset + UNSAFE_ROW_POINTER_SIZE; + write_pointer(&mut data, top_bitset, list_offset, list_size); + + // Element count + data[list_offset..list_offset + ARRAY_HEADER_SIZE] + .copy_from_slice(&(num_elements as i64).to_le_bytes()); + + // Element values + let data_start = list_offset + ARRAY_HEADER_SIZE + elem_null_bitset; + for i in 0..num_elements { + let off = data_start + i * INT64_SIZE; + data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes()); } - group.finish(); + data } -fn benchmark_deeply_nested_struct_conversion(c: &mut Criterion) { - let mut group = c.benchmark_group("deeply_nested_struct_conversion"); - - // Test deeply nested structs (3 levels) with different inner field counts - for num_fields in [5, 10, 20] { - for num_rows in [1000, 10000] { - let schema = vec![make_deeply_nested_struct_schema(num_fields)]; - - // Create row data - let rows: Vec = (0..num_rows) - .map(|_| DeeplyNestedRowData::new(num_fields)) - .collect(); - - let spark_rows: Vec = rows - .iter() - .map(|row_data| { - let mut spark_row = SparkUnsafeRow::new_with_num_fields(1); - row_data.to_spark_row(&mut spark_row); - spark_row.set_not_null_at(0); - spark_row - }) - .collect(); - - let mut row_addresses: Vec = - spark_rows.iter().map(|row| row.get_row_addr()).collect(); - let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); - - let row_address_ptr = row_addresses.as_mut_ptr(); - let row_size_ptr = row_sizes.as_mut_ptr(); - - group.bench_with_input( - BenchmarkId::new( - format!("inner_fields_{}", num_fields), - format!("rows_{}", num_rows), - ), - &(num_rows, &schema), - |b, (num_rows, schema)| { - b.iter(|| { - let tempfile = Builder::new().tempfile().unwrap(); - - process_sorted_row_partition( - *num_rows, - BATCH_SIZE, - row_address_ptr, - row_size_ptr, - schema, - tempfile.path().to_str().unwrap().to_string(), - 1.0, - false, - 0, - None, - &CompressionCodec::Zstd(1), - ) - .unwrap(); - }); - }, - ); - - std::mem::drop(spark_rows); - } +/// Build a binary UnsafeRow containing a `Map` column. 
+fn build_map_row(num_entries: usize) -> Vec { + let top_bitset = SparkUnsafeRow::get_row_bitset_width(1); + let entry_null_bitset = null_bitset_size(num_entries); + let array_size = ARRAY_HEADER_SIZE + entry_null_bitset + num_entries * INT64_SIZE; + // Map layout: [key_array_size header] [key_array] [value_array] + let map_size = ARRAY_HEADER_SIZE + 2 * array_size; + let total = top_bitset + UNSAFE_ROW_POINTER_SIZE + map_size; + let mut data = vec![0u8; total]; + + let map_offset = top_bitset + UNSAFE_ROW_POINTER_SIZE; + write_pointer(&mut data, top_bitset, map_offset, map_size); + + // Key array size header + data[map_offset..map_offset + ARRAY_HEADER_SIZE] + .copy_from_slice(&(array_size as i64).to_le_bytes()); + + // Key array: [element count] [null bitset] [data] + let key_offset = map_offset + ARRAY_HEADER_SIZE; + data[key_offset..key_offset + ARRAY_HEADER_SIZE] + .copy_from_slice(&(num_entries as i64).to_le_bytes()); + let key_data = key_offset + ARRAY_HEADER_SIZE + entry_null_bitset; + for i in 0..num_entries { + let off = key_data + i * INT64_SIZE; + data[off..off + INT64_SIZE].copy_from_slice(&(i as i64).to_le_bytes()); } - group.finish(); -} + // Value array: [element count] [null bitset] [data] + let val_offset = key_offset + array_size; + data[val_offset..val_offset + ARRAY_HEADER_SIZE] + .copy_from_slice(&(num_entries as i64).to_le_bytes()); + let val_data = val_offset + ARRAY_HEADER_SIZE + entry_null_bitset; + for i in 0..num_entries { + let off = val_data + i * INT64_SIZE; + data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes()); + } -/// Create a schema with a list column: List -fn make_list_schema(element_type: DataType) -> DataType { - DataType::List(Arc::new(Field::new("item", element_type, true))) + data } -/// Calculate row size for a list with the given number of elements. -/// UnsafeRow layout for list: [null bits] [list pointer (offset, size)] -/// List data: [num_elements (8 bytes)] [null bits] [element data] -fn get_list_row_size(num_elements: usize, element_size: usize) -> usize { - // Top-level row has 1 column (the list) - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let list_pointer_size = 8; +// ─── Benchmark runner ─────────────────────────────────────────────────────── + +/// Common benchmark harness: wraps raw row bytes in SparkUnsafeRow and runs +/// `process_sorted_row_partition` under Criterion. 
+// ─── Benchmark runner ───────────────────────────────────────────────────────
+
+/// Common benchmark harness: wraps raw row bytes in SparkUnsafeRow and runs
+/// `process_sorted_row_partition` under Criterion.
+fn run_benchmark(
+    group: &mut criterion::BenchmarkGroup<'_, criterion::measurement::WallTime>,
+    name: &str,
+    param: &str,
+    schema: &[DataType],
+    rows: &[Vec<u8>],
+    num_top_level_fields: usize,
+) {
+    let num_rows = rows.len();
+
+    let spark_rows: Vec<SparkUnsafeRow> = rows
+        .iter()
+        .map(|data| {
+            let mut row = SparkUnsafeRow::new_with_num_fields(num_top_level_fields);
+            row.point_to_slice(data);
+            for i in 0..num_top_level_fields {
+                row.set_not_null_at(i);
+            }
+            row
+        })
+        .collect();
-
-    // List header: num_elements (8 bytes) + null bitset
-    let list_null_bitset = num_elements.div_ceil(64) * 8;
-    let list_header = 8 + list_null_bitset;
-    let list_data_size = num_elements * element_size;
+    let mut addrs: Vec<i64> = spark_rows.iter().map(|r| r.get_row_addr()).collect();
+    let mut sizes: Vec<i32> = spark_rows.iter().map(|r| r.get_row_size()).collect();
+    let addr_ptr = addrs.as_mut_ptr();
+    let size_ptr = sizes.as_mut_ptr();
+
+    group.bench_with_input(
+        BenchmarkId::new(name, param),
+        &num_rows,
+        |b, &n| {
+            b.iter(|| {
+                let tmp = Builder::new().tempfile().unwrap();
+                process_sorted_row_partition(
+                    n,
+                    BATCH_SIZE,
+                    addr_ptr,
+                    size_ptr,
+                    schema,
+                    tmp.path().to_str().unwrap().to_string(),
+                    1.0,
+                    false,
+                    0,
+                    None,
+                    &CompressionCodec::Zstd(1),
+                )
+                .unwrap();
+            });
+        },
+    );
-
-    top_level_bitset_width + list_pointer_size + list_header + list_data_size
+    drop(spark_rows);
 }
 
-struct ListRowData {
-    data: Vec<u8>,
-}
+// ─── Benchmarks ─────────────────────────────────────────────────────────────
 
-impl ListRowData {
-    fn new_int64_list(num_elements: usize) -> Self {
-        let row_size = get_list_row_size(num_elements, 8);
-        let mut data = vec![0u8; row_size];
+/// 100 primitive Int64 columns — baseline without complex-type overhead.
+fn benchmark_primitive_columns(c: &mut Criterion) {
+    let mut group = c.benchmark_group("primitive_columns");
+    const NUM_COLS: usize = 100;
+    let bitset = SparkUnsafeRow::get_row_bitset_width(NUM_COLS);
+    let row_size = bitset + NUM_COLS * INT64_SIZE;
-        let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
-        let list_null_bitset = num_elements.div_ceil(64) * 8;
+    for num_rows in [1000, 10000] {
+        let schema = vec![DataType::Int64; NUM_COLS];
+        let rows: Vec<Vec<u8>> = (0..num_rows)
+            .map(|_| {
+                let mut data = vec![0u8; row_size];
+                for (i, byte) in data.iter_mut().enumerate().take(row_size).skip(bitset) {
+                    *byte = i as u8;
+                }
+                data
+            })
+            .collect();
-        // List starts after top-level header + pointer
-        let list_offset = top_level_bitset_width + 8;
-        let list_size = 8 + list_null_bitset + num_elements * 8;
+        run_benchmark(
+            &mut group,
+            "cols_100",
+            &format!("rows_{num_rows}"),
+            &schema,
+            &rows,
+            NUM_COLS,
+        );
+    }
-        // Write list pointer (offset in upper 32 bits, size in lower 32 bits)
-        let offset_and_size = ((list_offset as i64) << 32) | (list_size as i64);
-        data[top_level_bitset_width..top_level_bitset_width + 8]
-            .copy_from_slice(&offset_and_size.to_le_bytes());
+    group.finish();
+}
-        // Write number of elements at list start
-        data[list_offset..list_offset + 8].copy_from_slice(&(num_elements as i64).to_le_bytes());
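With the shared harness in place, adding another case is mostly a matter of building raw row bytes and picking labels. The sketch below is hypothetical (neither the function nor its group name exists in this patch, and it would still need to be registered in criterion_group!); it only shows the intended call pattern: one schema slice, one Vec<Vec<u8>> of row buffers, and the number of top-level fields.

    // Hypothetical extra benchmark wired through run_benchmark; illustrative only.
    fn benchmark_single_int64(c: &mut Criterion) {
        let mut group = c.benchmark_group("single_int64");
        let bitset = SparkUnsafeRow::get_row_bitset_width(1);
        let rows: Vec<Vec<u8>> = (0..1000)
            .map(|i| {
                // One Int64 column: [null bitset][8-byte little-endian value]
                let mut data = vec![0u8; bitset + 8];
                data[bitset..bitset + 8].copy_from_slice(&(i as i64).to_le_bytes());
                data
            })
            .collect();
        run_benchmark(
            &mut group,
            "cols_1",
            "rows_1000",
            &[DataType::Int64],
            &rows,
            1,
        );
        group.finish();
    }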
+/// Struct columns at varying nesting depths (1 = flat, 2 = nested, 3 = deeply nested).
+fn benchmark_struct_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("struct_conversion");
-        // Fill list with data (after header)
-        let data_start = list_offset + 8 + list_null_bitset;
-        for i in 0..num_elements {
-            let value_offset = data_start + i * 8;
-            let value = (i as i64) * 100;
-            data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
+    for (depth, label) in [(1, "flat"), (2, "nested"), (3, "deeply_nested")] {
+        for num_fields in [5, 10, 20] {
+            for num_rows in [1000, 10000] {
+                let schema = vec![make_struct_schema(depth, num_fields)];
+                let rows: Vec<Vec<u8>> = (0..num_rows)
+                    .map(|_| build_struct_row(depth, num_fields))
+                    .collect();
+
+                run_benchmark(
+                    &mut group,
+                    &format!("{label}_fields_{num_fields}"),
+                    &format!("rows_{num_rows}"),
+                    &schema,
+                    &rows,
+                    1,
+                );
+            }
         }
     }
-
-        ListRowData { data }
+    group.finish();
 }
-
-    fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
-        spark_row.point_to_slice(&self.data);
-    }
-}
 
+/// List columns with varying element counts.
 fn benchmark_list_conversion(c: &mut Criterion) {
     let mut group = c.benchmark_group("list_conversion");
 
-    // Test with different list sizes and row counts
     for num_elements in [10, 100] {
         for num_rows in [1000, 10000] {
-            let schema = vec![make_list_schema(DataType::Int64)];
-
-            // Create row data - each row has a list with num_elements items
-            let rows: Vec<ListRowData> = (0..num_rows)
-                .map(|_| ListRowData::new_int64_list(num_elements))
-                .collect();
-
-            let spark_rows: Vec<SparkUnsafeRow> = rows
-                .iter()
-                .map(|row_data| {
-                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
-                    row_data.to_spark_row(&mut spark_row);
-                    spark_row.set_not_null_at(0);
-                    spark_row
-                })
+            let schema = vec![make_list_schema()];
+            let rows: Vec<Vec<u8>> = (0..num_rows)
+                .map(|_| build_list_row(num_elements))
                 .collect();
 
-            let mut row_addresses: Vec<i64> =
-                spark_rows.iter().map(|row| row.get_row_addr()).collect();
-            let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
-
-            let row_address_ptr = row_addresses.as_mut_ptr();
-            let row_size_ptr = row_sizes.as_mut_ptr();
-
-            group.bench_with_input(
-                BenchmarkId::new(
-                    format!("elements_{}", num_elements),
-                    format!("rows_{}", num_rows),
-                ),
-                &(num_rows, &schema),
-                |b, (num_rows, schema)| {
-                    b.iter(|| {
-                        let tempfile = Builder::new().tempfile().unwrap();
-
-                        process_sorted_row_partition(
-                            *num_rows,
-                            BATCH_SIZE,
-                            row_address_ptr,
-                            row_size_ptr,
-                            schema,
-                            tempfile.path().to_str().unwrap().to_string(),
-                            1.0,
-                            false,
-                            0,
-                            None,
-                            &CompressionCodec::Zstd(1),
-                        )
-                        .unwrap();
-                    });
-                },
+            run_benchmark(
+                &mut group,
+                &format!("elements_{num_elements}"),
+                &format!("rows_{num_rows}"),
+                &schema,
+                &rows,
+                1,
             );
-
-            std::mem::drop(spark_rows);
         }
     }
 
     group.finish();
 }
 
-/// Create a schema with a map column: Map
-fn make_map_schema() -> DataType {
-    // Map is represented as List<Struct<key, value>> in Arrow
-    let key_field = Field::new("key", DataType::Int64, false);
-    let value_field = Field::new("value", DataType::Int64, true);
-    let entries_field = Field::new(
-        "entries",
-        DataType::Struct(Fields::from(vec![key_field, value_field])),
-        false,
-    );
-    DataType::Map(Arc::new(entries_field), false)
-}
-
-/// Calculate row size for a map with the given number of entries.
-/// UnsafeRow layout for map: [null bits] [map pointer (offset, size)] -/// Map data: [key_array_size (8 bytes)] [key_array] [value_array] -/// Array format: [num_elements (8 bytes)] [null bits] [element data] -fn get_map_row_size(num_entries: usize) -> usize { - // Top-level row has 1 column (the map) - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let map_pointer_size = 8; - - // Key array: num_elements (8) + null bitset + data - let key_null_bitset = num_entries.div_ceil(64) * 8; - let key_array_size = 8 + key_null_bitset + num_entries * 8; - - // Value array: num_elements (8) + null bitset + data - let value_null_bitset = num_entries.div_ceil(64) * 8; - let value_array_size = 8 + value_null_bitset + num_entries * 8; - - // Map header (key array size) + key array + value array - let map_size = 8 + key_array_size + value_array_size; - - top_level_bitset_width + map_pointer_size + map_size -} - -struct MapRowData { - data: Vec, -} - -impl MapRowData { - fn new_int64_map(num_entries: usize) -> Self { - let row_size = get_map_row_size(num_entries); - let mut data = vec![0u8; row_size]; - - let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); - let key_null_bitset = num_entries.div_ceil(64) * 8; - let value_null_bitset = num_entries.div_ceil(64) * 8; - - let key_array_size = 8 + key_null_bitset + num_entries * 8; - let value_array_size = 8 + value_null_bitset + num_entries * 8; - let map_size = 8 + key_array_size + value_array_size; - - // Map starts after top-level header + pointer - let map_offset = top_level_bitset_width + 8; - - // Write map pointer (offset in upper 32 bits, size in lower 32 bits) - let offset_and_size = ((map_offset as i64) << 32) | (map_size as i64); - data[top_level_bitset_width..top_level_bitset_width + 8] - .copy_from_slice(&offset_and_size.to_le_bytes()); - - // Write key array size at map start - data[map_offset..map_offset + 8].copy_from_slice(&(key_array_size as i64).to_le_bytes()); - - // Key array starts after map header - let key_array_offset = map_offset + 8; - // Write number of elements - data[key_array_offset..key_array_offset + 8] - .copy_from_slice(&(num_entries as i64).to_le_bytes()); - // Fill key data (after header) - let key_data_start = key_array_offset + 8 + key_null_bitset; - for i in 0..num_entries { - let value_offset = key_data_start + i * 8; - let value = i as i64; - data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); - } - - // Value array starts after key array - let value_array_offset = key_array_offset + key_array_size; - // Write number of elements - data[value_array_offset..value_array_offset + 8] - .copy_from_slice(&(num_entries as i64).to_le_bytes()); - // Fill value data (after header) - let value_data_start = value_array_offset + 8 + value_null_bitset; - for i in 0..num_entries { - let value_offset = value_data_start + i * 8; - let value = (i as i64) * 100; - data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); - } - - MapRowData { data } - } - - fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { - spark_row.point_to_slice(&self.data); - } -} - +/// Map columns with varying entry counts. 
 fn benchmark_map_conversion(c: &mut Criterion) {
     let mut group = c.benchmark_group("map_conversion");
 
-    // Test with different map sizes and row counts
     for num_entries in [10, 100] {
         for num_rows in [1000, 10000] {
             let schema = vec![make_map_schema()];
-
-            // Create row data - each row has a map with num_entries items
-            let rows: Vec<MapRowData> = (0..num_rows)
-                .map(|_| MapRowData::new_int64_map(num_entries))
+            let rows: Vec<Vec<u8>> = (0..num_rows)
+                .map(|_| build_map_row(num_entries))
                 .collect();
 
-            let spark_rows: Vec<SparkUnsafeRow> = rows
-                .iter()
-                .map(|row_data| {
-                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
-                    row_data.to_spark_row(&mut spark_row);
-                    spark_row.set_not_null_at(0);
-                    spark_row
-                })
-                .collect();
-
-            let mut row_addresses: Vec<i64> =
-                spark_rows.iter().map(|row| row.get_row_addr()).collect();
-            let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
-
-            let row_address_ptr = row_addresses.as_mut_ptr();
-            let row_size_ptr = row_sizes.as_mut_ptr();
-
-            group.bench_with_input(
-                BenchmarkId::new(
-                    format!("entries_{}", num_entries),
-                    format!("rows_{}", num_rows),
-                ),
-                &(num_rows, &schema),
-                |b, (num_rows, schema)| {
-                    b.iter(|| {
-                        let tempfile = Builder::new().tempfile().unwrap();
-
-                        process_sorted_row_partition(
-                            *num_rows,
-                            BATCH_SIZE,
-                            row_address_ptr,
-                            row_size_ptr,
-                            schema,
-                            tempfile.path().to_str().unwrap().to_string(),
-                            1.0,
-                            false,
-                            0,
-                            None,
-                            &CompressionCodec::Zstd(1),
-                        )
-                        .unwrap();
-                    });
-                },
+            run_benchmark(
+                &mut group,
+                &format!("entries_{num_entries}"),
+                &format!("rows_{num_rows}"),
+                &schema,
+                &rows,
+                1,
             );
-
-            std::mem::drop(spark_rows);
         }
     }
 
     group.finish();
 }
 
-/// Benchmark for primitive type columns (many Int64 columns).
-/// This tests the baseline performance without complex type overhead.
-fn benchmark_primitive_columns(c: &mut Criterion) { - let mut group = c.benchmark_group("primitive_columns"); - - const NUM_COLS: usize = 100; - let row_size: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8; - - for num_rows in [1000, 10000] { - let schema = vec![DataType::Int64; NUM_COLS]; - - // Create row data - let row_data: Vec> = (0..num_rows) - .map(|_| { - let mut data = vec![0u8; row_size]; - // Fill with some data after the bitset - for (i, byte) in data - .iter_mut() - .enumerate() - .take(row_size) - .skip(SparkUnsafeRow::get_row_bitset_width(NUM_COLS)) - { - *byte = i as u8; - } - data - }) - .collect(); - - let spark_rows: Vec = row_data - .iter() - .map(|data| { - let mut spark_row = SparkUnsafeRow::new_with_num_fields(NUM_COLS); - spark_row.point_to_slice(data); - for i in 0..NUM_COLS { - spark_row.set_not_null_at(i); - } - spark_row - }) - .collect(); - - let mut row_addresses: Vec = spark_rows.iter().map(|row| row.get_row_addr()).collect(); - let mut row_sizes: Vec = spark_rows.iter().map(|row| row.get_row_size()).collect(); - - let row_address_ptr = row_addresses.as_mut_ptr(); - let row_size_ptr = row_sizes.as_mut_ptr(); - - group.bench_with_input( - BenchmarkId::new("cols_100", format!("rows_{}", num_rows)), - &(num_rows, &schema), - |b, (num_rows, schema)| { - b.iter(|| { - let tempfile = Builder::new().tempfile().unwrap(); - - process_sorted_row_partition( - *num_rows, - BATCH_SIZE, - row_address_ptr, - row_size_ptr, - schema, - tempfile.path().to_str().unwrap().to_string(), - 1.0, - false, - 0, - None, - &CompressionCodec::Zstd(1), - ) - .unwrap(); - }); - }, - ); - - std::mem::drop(spark_rows); - } - - group.finish(); -} - fn config() -> Criterion { Criterion::default() } @@ -840,6 +393,9 @@ fn config() -> Criterion { criterion_group! { name = benches; config = config(); - targets = benchmark_primitive_columns, benchmark_struct_conversion, benchmark_nested_struct_conversion, benchmark_deeply_nested_struct_conversion, benchmark_list_conversion, benchmark_map_conversion + targets = benchmark_primitive_columns, + benchmark_struct_conversion, + benchmark_list_conversion, + benchmark_map_conversion } criterion_main!(benches);
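A closing usage note: `cargo bench -- struct_conversion` (or any of the other group names above, such as map_conversion) narrows a run to a single group, which is handy while iterating on `process_sorted_row_partition`. If the larger row-count cases prove noisy, Criterion's builder can trade sample count for measurement time; the variant below is a sketch of such a configuration, not what this patch ships.

    // Optional, stricter Criterion settings; an illustrative alternative to config().
    fn config() -> Criterion {
        Criterion::default()
            .sample_size(20)
            .measurement_time(std::time::Duration::from_secs(10))
    }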