diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 7df0aa0697..0351186c1f 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -304,7 +304,7 @@ jobs: - name: Java test steps uses: ./.github/actions/java-test with: - artifact_name: ${{ matrix.os }}-${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} + artifact_name: ${{ matrix.os }}-${{ matrix.profile.name }}-${{ matrix.profile.scan_impl }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }} maven_opts: ${{ matrix.profile.maven_opts }} scan_impl: ${{ matrix.profile.scan_impl }} diff --git a/.gitignore b/.gitignore index 05b37627bd..7818e87f92 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ spark/benchmarks .DS_Store comet-event-trace.json __pycache__ +hs_err_pid*.log diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index c09f6a61e6..6f5e42bdba 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -111,16 +111,100 @@ Cast operations in Comet fall into three levels of support: ### Legacy Mode + +| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | +|---|---|---|---|---|---|---|---|---|---|---|---|---| +| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | +| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | +| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | +| date | N/A | U | U | - | U | U | U | U | U | U | C | U | +| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | +| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | +| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | +| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | +| long | U | C | C | N/A | C | C | C | C | - | C | C | U | +| short | U | C | C | N/A | C | C | C | C | C | - | C | U | +| string | C | C | C | C | I | C | C | C | C | C | - | I | +| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | + + +**Notes:** + +- **decimal -> string**: There can be formatting differences in some case due to Spark using scientific notation where Comet does not +- **double -> decimal**: There can be rounding differences +- **double -> string**: There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 +- **float -> decimal**: There can be rounding differences +- **float -> string**: There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 +- **string -> date**: Only supports years between 262143 BC and 262142 AD +- **string -> decimal**: Does not support fullwidth unicode digits (e.g \\uFF10) + or strings containing null bytes (e.g \\u0000) +- **string -> timestamp**: Not all valid formats are supported ### Try Mode + +| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | +|---|---|---|---|---|---|---|---|---|---|---|---|---| +| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | +| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | +| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | +| date | N/A | U | U | - | U | U | U | U | U | U | C | U | +| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | +| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | +| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | +| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | +| long | U | C | C | N/A | C | C | C | C | - | C | C | U | +| short | U | C | C | N/A | C | C | C | C | C | - | C | U | +| string | C | C | C | C | I | C | C | C | C | C | - | I | +| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | + + +**Notes:** + +- **decimal -> string**: There can be formatting differences in some case due to Spark using scientific notation where Comet does not +- **double -> decimal**: There can be rounding differences +- **double -> string**: There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 +- **float -> decimal**: There can be rounding differences +- **float -> string**: There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 +- **string -> date**: Only supports years between 262143 BC and 262142 AD +- **string -> decimal**: Does not support fullwidth unicode digits (e.g \\uFF10) + or strings containing null bytes (e.g \\u0000) +- **string -> timestamp**: Not all valid formats are supported ### ANSI Mode + +| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | +|---|---|---|---|---|---|---|---|---|---|---|---|---| +| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | +| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | +| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | +| date | N/A | U | U | - | U | U | U | U | U | U | C | U | +| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | +| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | +| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | +| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | +| long | U | C | C | N/A | C | C | C | C | - | C | C | U | +| short | U | C | C | N/A | C | C | C | C | C | - | C | U | +| string | C | C | C | C | I | C | C | C | C | C | - | I | +| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | + + +**Notes:** + +- **decimal -> string**: There can be formatting differences in some case due to Spark using scientific notation where Comet does not +- **double -> decimal**: There can be rounding differences +- **double -> string**: There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 +- **float -> decimal**: There can be rounding differences +- **float -> string**: There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 +- **string -> date**: Only supports years between 262143 BC and 262142 AD +- **string -> decimal**: Does not support fullwidth unicode digits (e.g \\uFF10) + or strings containing null bytes (e.g \\u0000) +- **string -> timestamp**: ANSI mode not supported See the [tracking issue](https://github.com/apache/datafusion-comet/issues/286) for more details. diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 821607ddb9..73e8081022 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -302,7 +302,14 @@ pub(crate) fn append_field( /// A macro for generating code of appending value into field builder of Arrow struct builder. macro_rules! append_field_to_builder { ($builder_type:ty, $accessor:expr) => {{ - let field_builder = struct_builder.field_builder::<$builder_type>(idx).unwrap(); + let field_builder = struct_builder + .field_builder::<$builder_type>(idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get field builder for index {} at nested depth", + idx + )) + })?; if row.is_null_row() { // The row is null. @@ -375,8 +382,14 @@ pub(crate) fn append_field( } DataType::Struct(fields) => { // Appending value into struct field builder of Arrow struct builder. - let field_builder = struct_builder.field_builder::(idx).unwrap(); - + let field_builder = struct_builder + .field_builder::(idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get field builder for index {} at nested depth", + idx + )) + })?; let nested_row = if row.is_null_row() || row.is_null_at(idx) { // The row is null, or the field in the row is null, i.e., a null nested row. // Append a null value to the row builder. @@ -391,44 +404,31 @@ pub(crate) fn append_field( append_field(field.data_type(), field_builder, &nested_row, field_idx)?; } } + DataType::Map(field, _) => { let field_builder = struct_builder .field_builder::, Box>>(idx) - .unwrap(); + .ok_or_else(|| { + CometError::Internal(format!("Failed to get MapBuilder at idx {}", idx)) + })?; // Changed from .unwrap() - if row.is_null_row() { - // The row is null. + if row.is_null_row() || row.is_null_at(idx) { field_builder.append(false)?; } else { - let is_null = row.is_null_at(idx); - - if is_null { - // The field in the row is null. - // Append a null value to the map builder. - field_builder.append(false)?; - } else { - append_map_elements(field, field_builder, &row.get_map(idx))?; - } + append_map_elements(field, field_builder, &row.get_map(idx))?; } } DataType::List(field) => { let field_builder = struct_builder .field_builder::>>(idx) - .unwrap(); + .ok_or_else(|| { + CometError::Internal(format!("Failed to get ListBuilder at idx {}", idx)) + })?; // Changed from .unwrap() - if row.is_null_row() { - // The row is null. + if row.is_null_row() || row.is_null_at(idx) { field_builder.append_null(); } else { - let is_null = row.is_null_at(idx); - - if is_null { - // The field in the row is null. - // Append a null value to the list builder. - field_builder.append_null(); - } else { - append_list_element(field.data_type(), field_builder, &row.get_array(idx))? - } + append_list_element(field.data_type(), field_builder, &row.get_array(idx))? } } _ => { @@ -448,10 +448,9 @@ pub(crate) fn append_columns( row_end: usize, schema: &[DataType], column_idx: usize, - builder: &mut Box, + builder: &mut dyn ArrayBuilder, prefer_dictionary_ratio: f64, ) -> Result<(), CometError> { - /// A macro for generating code of appending values into Arrow array builders. macro_rules! append_column_to_builder { ($builder_type:ty, $accessor:expr) => {{ let element_builder = builder @@ -465,11 +464,7 @@ pub(crate) fn append_columns( let row_size = unsafe { *row_sizes_ptr.add(i) }; row.point_to(row_addr, row_size); - let is_null = row.is_null_at(column_idx); - - if is_null { - // The element value is null. - // Append a null value to the element builder. + if row.is_null_at(column_idx) { element_builder.append_null(); } else { $accessor(element_builder, &row, column_idx); @@ -484,189 +479,521 @@ pub(crate) fn append_columns( DataType::Boolean => { append_column_to_builder!( BooleanBuilder, - |builder: &mut BooleanBuilder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_boolean(idx)) + |b: &mut BooleanBuilder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_boolean(i)) ); + Ok(()) } DataType::Int8 => { append_column_to_builder!( Int8Builder, - |builder: &mut Int8Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_byte(idx)) + |b: &mut Int8Builder, r: &SparkUnsafeRow, i: usize| b.append_value(r.get_byte(i)) ); + Ok(()) } DataType::Int16 => { append_column_to_builder!( Int16Builder, - |builder: &mut Int16Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_short(idx)) + |b: &mut Int16Builder, r: &SparkUnsafeRow, i: usize| b.append_value(r.get_short(i)) ); + Ok(()) } DataType::Int32 => { append_column_to_builder!( Int32Builder, - |builder: &mut Int32Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_int(idx)) + |b: &mut Int32Builder, r: &SparkUnsafeRow, i: usize| b.append_value(r.get_int(i)) ); + Ok(()) } DataType::Int64 => { append_column_to_builder!( Int64Builder, - |builder: &mut Int64Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_long(idx)) + |b: &mut Int64Builder, r: &SparkUnsafeRow, i: usize| b.append_value(r.get_long(i)) ); + Ok(()) } DataType::Float32 => { append_column_to_builder!( Float32Builder, - |builder: &mut Float32Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_float(idx)) + |b: &mut Float32Builder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_float(i)) ); + Ok(()) } DataType::Float64 => { append_column_to_builder!( Float64Builder, - |builder: &mut Float64Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_double(idx)) + |b: &mut Float64Builder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_double(i)) ); + Ok(()) } DataType::Decimal128(p, _) => { append_column_to_builder!( Decimal128Builder, - |builder: &mut Decimal128Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_decimal(idx, *p)) + |b: &mut Decimal128Builder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_decimal(i, *p)) ); + Ok(()) } DataType::Utf8 => { if prefer_dictionary_ratio > 1.0 { append_column_to_builder!( StringDictionaryBuilder, - |builder: &mut StringDictionaryBuilder, - row: &SparkUnsafeRow, - idx| builder.append_value(row.get_string(idx)) + |b: &mut StringDictionaryBuilder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_string(i)) ); } else { append_column_to_builder!( StringBuilder, - |builder: &mut StringBuilder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_string(idx)) + |b: &mut StringBuilder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_string(i)) ); } + Ok(()) } DataType::Binary => { if prefer_dictionary_ratio > 1.0 { append_column_to_builder!( BinaryDictionaryBuilder, - |builder: &mut BinaryDictionaryBuilder, - row: &SparkUnsafeRow, - idx| builder.append_value(row.get_binary(idx)) + |b: &mut BinaryDictionaryBuilder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_binary(i)) ); } else { append_column_to_builder!( BinaryBuilder, - |builder: &mut BinaryBuilder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_binary(idx)) + |b: &mut BinaryBuilder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_binary(i)) ); } + Ok(()) } DataType::Date32 => { append_column_to_builder!( Date32Builder, - |builder: &mut Date32Builder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_date(idx)) + |b: &mut Date32Builder, r: &SparkUnsafeRow, i: usize| b.append_value(r.get_date(i)) ); + Ok(()) } DataType::Timestamp(TimeUnit::Microsecond, _) => { append_column_to_builder!( TimestampMicrosecondBuilder, - |builder: &mut TimestampMicrosecondBuilder, row: &SparkUnsafeRow, idx| builder - .append_value(row.get_timestamp(idx)) + |b: &mut TimestampMicrosecondBuilder, r: &SparkUnsafeRow, i: usize| b + .append_value(r.get_timestamp(i)) ); + Ok(()) } DataType::Map(field, _) => { - let map_builder = downcast_builder_ref!( - MapBuilder, Box>, - builder - ); + let map_builder = builder + .as_any_mut() + .downcast_mut::, Box>>() + .ok_or_else(|| CometError::Internal("Expected MapBuilder".to_string()))?; let mut row = SparkUnsafeRow::new(schema); - for i in row_start..row_end { - let row_addr = unsafe { *row_addresses_ptr.add(i) }; - let row_size = unsafe { *row_sizes_ptr.add(i) }; - row.point_to(row_addr, row_size); - - let is_null = row.is_null_at(column_idx); - - if is_null { - // The map is null. - // Append a null value to the map builder. + let (addr, size) = unsafe { (*row_addresses_ptr.add(i), *row_sizes_ptr.add(i)) }; + row.point_to(addr, size); + if row.is_null_at(column_idx) { map_builder.append(false)?; } else { - append_map_elements(field, map_builder, &row.get_map(column_idx))? + append_map_elements(field, map_builder, &row.get_map(column_idx))?; } } + Ok(()) } DataType::List(field) => { - let list_builder = downcast_builder_ref!(ListBuilder>, builder); + let list_builder = builder + .as_any_mut() + .downcast_mut::>>() + .ok_or_else(|| CometError::Internal("Expected ListBuilder".to_string()))?; let mut row = SparkUnsafeRow::new(schema); - for i in row_start..row_end { - let row_addr = unsafe { *row_addresses_ptr.add(i) }; - let row_size = unsafe { *row_sizes_ptr.add(i) }; - row.point_to(row_addr, row_size); - - let is_null = row.is_null_at(column_idx); - - if is_null { - // The list is null. - // Append a null value to the list builder. + let (addr, size) = unsafe { (*row_addresses_ptr.add(i), *row_sizes_ptr.add(i)) }; + row.point_to(addr, size); + if row.is_null_at(column_idx) { list_builder.append_null(); } else { append_list_element( field.data_type(), list_builder, &row.get_array(column_idx), - )? + )?; } } + Ok(()) } DataType::Struct(fields) => { let struct_builder = builder .as_any_mut() .downcast_mut::() - .expect("StructBuilder"); + .ok_or_else(|| CometError::Internal("Expected StructBuilder".to_string()))?; + + // Build struct validity array + let mut struct_is_null = vec![false; row_end - row_start]; let mut row = SparkUnsafeRow::new(schema); - for i in row_start..row_end { + for (row_idx, i) in (row_start..row_end).enumerate() { let row_addr = unsafe { *row_addresses_ptr.add(i) }; let row_size = unsafe { *row_sizes_ptr.add(i) }; row.point_to(row_addr, row_size); + struct_is_null[row_idx] = row.is_null_at(column_idx); + } - let is_null = row.is_null_at(column_idx); + // Process each field in field-major order + let nested_field_types: Vec = + fields.iter().map(|f| f.data_type().clone()).collect(); + + for (field_idx, field_dt) in nested_field_types.iter().enumerate() { + match field_dt { + DataType::Boolean => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get BooleanBuilder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_boolean(field_idx)); + } + } + } + } + DataType::Int8 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Int8Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_byte(field_idx)); + } + } + } + } + DataType::Int16 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Int16Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_short(field_idx)); + } + } + } + } + DataType::Int32 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Int32Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_int(field_idx)); + } + } + } + } + DataType::Int64 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Int64Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_long(field_idx)); + } + } + } + } + DataType::Float32 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Float32Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_float(field_idx)); + } + } + } + } + DataType::Float64 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Float64Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_double(field_idx)); + } + } + } + } + DataType::Decimal128(p, _) => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Decimal128Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder + .append_value(nested_row.get_decimal(field_idx, *p)); + } + } + } + } + DataType::Utf8 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get StringBuilder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_string(field_idx)); + } + } + } + } + DataType::Binary => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get BinaryBuilder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_binary(field_idx)); + } + } + } + } + DataType::Date32 => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get Date32Builder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_date(field_idx)); + } + } + } + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + let field_builder = struct_builder + .field_builder::(field_idx) + .ok_or_else(|| { + CometError::Internal(format!( + "Failed to get TimestampMicrosecondBuilder at idx {}", + field_idx + )) + })?; + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + let nested_row = row.get_struct(column_idx, fields.len()); + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_timestamp(field_idx)); + } + } + } + } + // For nested complex types (Struct, List, Map), fall back to row-major processing + dt @ (DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _)) => { + for (row_idx, i) in (row_start..row_end).enumerate() { + let nested_row = if struct_is_null[row_idx] { + SparkUnsafeRow::default() + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + row.get_struct(column_idx, fields.len()) + }; + append_field(dt, struct_builder, &nested_row, field_idx)?; + } + } + _ => { + return Err(CometError::Internal(format!( + "Unsupported nested struct field type: {:?}", + field_dt + ))); + } + } + } - let nested_row = if is_null { - // The struct is null. - // Append a null value to the struct builder and field builders. + // Append validity for the struct itself + for is_null in struct_is_null { + if is_null { struct_builder.append_null(); - SparkUnsafeRow::default() } else { struct_builder.append(true); - row.get_struct(column_idx, fields.len()) - }; - - for (idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), struct_builder, &nested_row, idx)?; } } + + Ok(()) } - _ => { - unreachable!("Unsupported data type of column: {:?}", dt) - } + _ => unreachable!("Unsupported type: {:?}", dt), } - - Ok(()) } - fn make_builders( dt: &DataType, row_num: usize,