From 4c2726eeafe17cc5c23094324a530c5ca7bac1c7 Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Sun, 8 Feb 2026 09:41:27 +0100 Subject: [PATCH 1/2] Implement min, max, sum for run-end-encoded arrays. Efficient implementations: * min & max work directly on the values child array. * sum folds over run lengths & values, without decompressing the array. In particular, those implementations takes care of the logical offset & len of the run-end-encoded arrays. This is non-trivial: * We get the physical start & end indices in O(log(#runs)), but those are incorrect for empty arrays. * Slicing can happen in the middle of a run. For sum, we need to track the logical start & end and reduce the run length accordingly. Finally, one caveat: the aggregation functions only work when the child values array is a primitive array. That's fine ~always, but some client might store the values in an unexpected type. They'll either get None or an Error, depending on the aggregation function used. --- arrow-arith/src/aggregate.rs | 351 ++++++++++++++++++++++++++++++++++- 1 file changed, 347 insertions(+), 4 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index a043259694c1..7c7a9832a17f 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -540,7 +540,7 @@ pub fn min_string_view(array: &StringViewArray) -> Option<&str> { /// Returns the sum of values in the array. /// /// This doesn't detect overflow. Once overflowing, the result will wrap around. -/// For an overflow-checking variant, use `sum_array_checked` instead. +/// For an overflow-checking variant, use [`sum_array_checked`] instead. pub fn sum_array>(array: A) -> Option where T: ArrowNumericType, @@ -567,6 +567,12 @@ where Some(sum) } + DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() { + DataType::Int16 => ree::sum_wrapping::(&array), + DataType::Int32 => ree::sum_wrapping::(&array), + DataType::Int64 => ree::sum_wrapping::(&array), + _ => None, + }, _ => sum::(as_primitive_array(&array)), } } @@ -574,7 +580,9 @@ where /// Returns the sum of values in the array. /// /// This detects overflow and returns an `Err` for that. For an non-overflow-checking variant, -/// use `sum_array` instead. +/// use [`sum_array`] instead. +/// Additionally returns an `Err` on run-end-encoded arrays with a provided +/// values type parameter that is incorrect. pub fn sum_array_checked>( array: A, ) -> Result, ArrowError> @@ -603,10 +611,137 @@ where Ok(Some(sum)) } + DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() { + DataType::Int16 => ree::sum_checked::(&array), + DataType::Int32 => ree::sum_checked::(&array), + DataType::Int64 => ree::sum_checked::(&array), + _ => Ok(None), + }, _ => sum_checked::(as_primitive_array(&array)), } } +// Logic for summing run-end-encoded arrays. +mod ree { + use std::convert::Infallible; + + use arrow_array::cast::AsArray; + use arrow_array::types::RunEndIndexType; + use arrow_array::{Array, ArrowNativeTypeOp, ArrowNumericType, PrimitiveArray, TypedRunArray}; + use arrow_buffer::ArrowNativeType; + use arrow_schema::ArrowError; + + /// Downcasts an array to a TypedRunArray. + // Once specialization gets stabilized, this method can be templated over the source + // array type and can directly pick up the type of the child values array. + fn downcast<'a, I: RunEndIndexType, V: ArrowNumericType>( + array: &'a dyn Array, + ) -> Option>> { + let array = array.as_run_opt::()?; + // This fails if the values child array is not the PrimitiveArray. That is okay as: + // * BooleanArray & StringArray are not primitive, but their Item is not + // ArrowNumericType, so they are not in scope for this function. + // * Having the values child array be either dict-encoded, or run-end-encoded is unlikely. + // Note however that the Arrow specification does not forbid using an exotic type as the + // values child array. + array.downcast::>() + } + + /// Computes the sum (wrapping) of the array values. + pub(super) fn sum_wrapping( + array: &dyn Array, + ) -> Option { + if array.null_count() == array.len() { + return None; + } + + let ree = downcast::(array)?; + + let sum = fold(ree, |acc, val, len| -> Result { + Ok(acc.add_wrapping(val.mul_wrapping(V::Native::usize_as(len)))) + }) + // Safety: error type is Infallible. + .unwrap(); + + Some(sum) + } + + /// Computes the sum (erroring on overflow) of the array values. + pub(super) fn sum_checked( + array: &dyn Array, + ) -> Result, ArrowError> { + if array.null_count() == array.len() { + return Ok(None); + } + + let Some(ree) = downcast::(array) else { + return Err(ArrowError::InvalidArgumentError( + "Input array is not a TypedRunArray<'_, _, PrimitiveArray".to_string(), + )); + }; + + let sum = fold(ree, |acc, val, len| -> Result { + let Some(len) = V::Native::from_usize(len) else { + return Err(ArrowError::ArithmeticOverflow(format!( + "Cannot convert a run-end index ({:?}) to the value type ({})", + len, + std::any::type_name::() + ))); + }; + acc.add_checked(val.mul_checked(len)?) + }); + + sum.map(Some) + } + + /// Folds over the values in a run-end-encoded array. + fn fold<'a, I: RunEndIndexType, V: ArrowNumericType, F, E>( + array: TypedRunArray<'a, I, PrimitiveArray>, + mut f: F, + ) -> Result + where + F: FnMut(V::Native, V::Native, usize) -> Result, + { + let run_ends = array.run_ends(); + + let logical_start = run_ends.offset(); + let logical_end = run_ends.offset() + run_ends.len(); + + // Beware: the computed physical range is incorrect for empty arrays. Bail out if that + // happens. We don't need to, as the calling functions already handle that case. But + // ignoring it here is a recipe for disaster in the future. + if array.is_empty() { + return Ok(V::Native::ZERO); + } + + let physical_range = + run_ends.get_start_physical_index()..run_ends.get_end_physical_index() + 1; + let physical_run_ends = &run_ends.values()[physical_range.clone()]; + let physical_values = { + let values_array = array.values(); + let values_data = values_array.values(); + &values_data[physical_range] + }; + + let mut prev_end = logical_start; + let mut acc = V::Native::ZERO; + + for (run_end, val) in physical_run_ends.iter().zip(physical_values) { + let current_run_end = run_end.as_usize().max(logical_start).min(logical_end); + let run_length = current_run_end - prev_end; + + acc = f(acc, *val, run_length)?; + + prev_end = current_run_end; + if current_run_end == logical_end { + break; + } + } + + Ok(acc) + } +} + /// Returns the min of values in the array of `ArrowNumericType` type, or dictionary /// array with value of `ArrowNumericType` type. pub fn min_array>(array: A) -> Option @@ -639,6 +774,48 @@ where { match array.data_type() { DataType::Dictionary(_, _) => min_max_helper::(array, cmp), + DataType::RunEndEncoded(run_ends, _) => { + // We can directly perform min/max on the values child array, as any + // run must have non-zero length. We just need take care of the logical offset & len. + fn values_and_boundaries( + array: &dyn Array, + ) -> Option<(&ArrayRef, Option>)> { + let array = array.as_run_opt::()?; + // If the array is empty, start & end physical indices will be 0. Which is + // incorrect: array[0] does not exist. + let range = if array.len() == 0 { + None + } else { + Some(array.get_start_physical_index()..array.get_end_physical_index() + 1) + }; + Some((array.values(), range)) + } + let (values, range) = match run_ends.data_type() { + DataType::Int16 => values_and_boundaries::(&array)?, + DataType::Int32 => values_and_boundaries::(&array)?, + DataType::Int64 => values_and_boundaries::(&array)?, + _ => return None, + }; + + // We will fail here if the values child array is not the PrimitiveArray. That + // is okay as: + // * BooleanArray & StringArray are not primitive, but their Item is not + // ArrowNumericType, so they are not in scope for this function. + // * Having the values child array be either dict-encoded, or run-end-encoded does not + // make sense. Nor does using a custom array type. + // Note however that the Apache specification does not forbid using an exotic type as + // the values child array. + // The type parameter `A` is a TypedRunArray<'_, RunEndIndexType, ValuesArrayType>. + // Once specialization gets stabilized, this implementation can be changed to + // directly pick up `ValuesArrayType`. + let values = values.as_any().downcast_ref::>()?; + + if let Some(std::ops::Range { start, end }) = range { + m(&values.slice(start, end - start)) + } else { + m(values) + } + } _ => m(as_primitive_array(&array)), } } @@ -751,7 +928,7 @@ pub fn bool_or(array: &BooleanArray) -> Option { /// Returns `Ok(None)` if the array is empty or only contains null values. /// /// This detects overflow and returns an `Err` for that. For an non-overflow-checking variant, -/// use `sum` instead. +/// use [`sum`] instead. pub fn sum_checked(array: &PrimitiveArray) -> Result, ArrowError> where T: ArrowNumericType, @@ -799,7 +976,7 @@ where /// Returns `None` if the array is empty or only contains null values. /// /// This doesn't detect overflow in release mode by default. Once overflowing, the result will -/// wrap around. For an overflow-checking variant, use `sum_checked` instead. +/// wrap around. For an overflow-checking variant, use [`sum_checked`] instead. pub fn sum(array: &PrimitiveArray) -> Option where T::Native: ArrowNativeTypeOp, @@ -1750,4 +1927,170 @@ mod tests { sum_checked(&a).expect_err("overflow should be detected"); sum_array_checked::(&a).expect_err("overflow should be detected"); } + + /// Helper for building a RunArray. + fn make_run_array<'a, I: RunEndIndexType, V: ArrowNumericType, ItemType>( + values: impl IntoIterator, + ) -> RunArray + where + ItemType: Clone + Into> + 'static, + { + let mut builder = arrow_array::builder::PrimitiveRunBuilder::::new(); + for v in values.into_iter() { + builder.append_option((*v).clone().into()); + } + builder.finish() + } + + #[test] + fn test_ree_sum_array_basic() { + let run_array = make_run_array::(&[10, 10, 20, 30, 30, 30]); + let typed_array = run_array.downcast::().unwrap(); + + let result = sum_array::(typed_array); + assert_eq!(result, Some(130)); + + let result = sum_array_checked::(typed_array).unwrap(); + assert_eq!(result, Some(130)); + } + + #[test] + fn test_ree_sum_array_empty() { + let run_array = make_run_array::(&[]); + let typed_array = run_array.downcast::().unwrap(); + + let result = sum_array::(typed_array); + assert_eq!(result, None); + + let result = sum_array_checked::(typed_array).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_ree_sum_array_with_nulls() { + let run_array = + make_run_array::(&[Some(10), None, Some(20), None, Some(30)]); + let typed_array = run_array.downcast::().unwrap(); + + let result = sum_array::(typed_array); + assert_eq!(result, Some(60)); + + let result = sum_array_checked::(typed_array).unwrap(); + assert_eq!(result, Some(60)); + } + + #[test] + fn test_ree_sum_array_with_only_nulls() { + let run_array = make_run_array::(&[None, None, None, None, None]); + let typed_array = run_array.downcast::().unwrap(); + + let result = sum_array::(typed_array); + assert_eq!(result, Some(0)); + + let result = sum_array_checked::(typed_array).unwrap(); + assert_eq!(result, Some(0)); + } + + #[test] + fn test_ree_sum_array_overflow() { + let run_array = make_run_array::(&[126, 2]); + let typed_array = run_array.downcast::().unwrap(); + + // i8 range is -128..=127. 126+2 overflows to -128. + let result = sum_array::(typed_array); + assert_eq!(result, Some(-128)); + + let result = sum_array_checked::(typed_array); + assert!(result.is_err()); + } + + #[test] + fn test_ree_sum_array_sliced() { + let run_array = make_run_array::(&[10, 10, 10, 20, 30, 30, 30]); + // Skip 1 value at the start and 1 at the end. + let sliced = run_array.slice(1, 5); + let typed_array = sliced.downcast::().unwrap(); + + let result = sum_array::(typed_array); + assert_eq!(result, Some(100)); + + let result = sum_array_checked::(typed_array).unwrap(); + assert_eq!(result, Some(100)); + } + + #[test] + fn test_ree_min_max_array_basic() { + let run_array = make_run_array::(&[30, 30, 10, 20, 20]); + let typed_array = run_array.downcast::().unwrap(); + + let result = min_array::(typed_array); + assert_eq!(result, Some(10)); + + let result = max_array::(typed_array); + assert_eq!(result, Some(30)); + } + + #[test] + fn test_ree_min_max_array_empty() { + let run_array = make_run_array::(&[]); + let typed_array = run_array.downcast::().unwrap(); + + let result = min_array::(typed_array); + assert_eq!(result, None); + + let result = max_array::(typed_array); + assert_eq!(result, None); + } + + #[test] + fn test_ree_min_max_array_float() { + let run_array = make_run_array::(&[5.5, 5.5, 2.1, 8.9, 8.9]); + let typed_array = run_array.downcast::().unwrap(); + + let result = min_array::(typed_array); + assert_eq!(result, Some(2.1)); + + let result = max_array::(typed_array); + assert_eq!(result, Some(8.9)); + } + + #[test] + fn test_ree_min_max_array_with_nulls() { + let run_array = make_run_array::(&[None, Some(10)]); + let typed_array = run_array.downcast::().unwrap(); + + let result = min_array::(typed_array); + assert_eq!(result, Some(10)); + + let result = max_array::(typed_array); + assert_eq!(result, Some(10)); + } + + #[test] + fn test_ree_min_max_array_sliced() { + let run_array = make_run_array::(&[0, 30, 30, 10, 20, 20, 100]); + // Skip 1 value at the start and 1 at the end. + let sliced = run_array.slice(1, 5); + let typed_array = sliced.downcast::().unwrap(); + + let result = min_array::(typed_array); + assert_eq!(result, Some(10)); + + let result = max_array::(typed_array); + assert_eq!(result, Some(30)); + } + + #[test] + fn test_ree_min_max_array_sliced_mid_run() { + let run_array = make_run_array::(&[0, 0, 30, 10, 20, 100, 100]); + // Skip 1 value at the start and 1 at the end. + let sliced = run_array.slice(1, 5); + let typed_array = sliced.downcast::().unwrap(); + + let result = min_array::(typed_array); + assert_eq!(result, Some(0)); + + let result = max_array::(typed_array); + assert_eq!(result, Some(100)); + } } From 4fcf374d0a3ed08257032785dc1e88bfa7252d3c Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Sat, 14 Feb 2026 15:38:40 +0100 Subject: [PATCH 2/2] Clean up, address comments --- arrow-arith/src/aggregate.rs | 128 ++++++++++------------------------- 1 file changed, 37 insertions(+), 91 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 7c7a9832a17f..aee5163389c1 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -632,18 +632,11 @@ mod ree { use arrow_schema::ArrowError; /// Downcasts an array to a TypedRunArray. - // Once specialization gets stabilized, this method can be templated over the source - // array type and can directly pick up the type of the child values array. fn downcast<'a, I: RunEndIndexType, V: ArrowNumericType>( array: &'a dyn Array, ) -> Option>> { let array = array.as_run_opt::()?; - // This fails if the values child array is not the PrimitiveArray. That is okay as: - // * BooleanArray & StringArray are not primitive, but their Item is not - // ArrowNumericType, so they are not in scope for this function. - // * Having the values child array be either dict-encoded, or run-end-encoded is unlikely. - // Note however that the Arrow specification does not forbid using an exotic type as the - // values child array. + // We only support RunArray wrapping primitive types. array.downcast::>() } @@ -651,36 +644,24 @@ mod ree { pub(super) fn sum_wrapping( array: &dyn Array, ) -> Option { - if array.null_count() == array.len() { - return None; - } - let ree = downcast::(array)?; - - let sum = fold(ree, |acc, val, len| -> Result { + let Ok(sum) = fold(ree, |acc, val, len| -> Result { + println!("Adding {:?}x{} to {:?}", val, len, acc); Ok(acc.add_wrapping(val.mul_wrapping(V::Native::usize_as(len)))) - }) - // Safety: error type is Infallible. - .unwrap(); - - Some(sum) + }); + sum } /// Computes the sum (erroring on overflow) of the array values. pub(super) fn sum_checked( array: &dyn Array, ) -> Result, ArrowError> { - if array.null_count() == array.len() { - return Ok(None); - } - let Some(ree) = downcast::(array) else { return Err(ArrowError::InvalidArgumentError( "Input array is not a TypedRunArray<'_, _, PrimitiveArray".to_string(), )); }; - - let sum = fold(ree, |acc, val, len| -> Result { + fold(ree, |acc, val, len| -> Result { let Some(len) = V::Native::from_usize(len) else { return Err(ArrowError::ArithmeticOverflow(format!( "Cannot convert a run-end index ({:?}) to the value type ({})", @@ -689,48 +670,41 @@ mod ree { ))); }; acc.add_checked(val.mul_checked(len)?) - }); - - sum.map(Some) + }) } /// Folds over the values in a run-end-encoded array. fn fold<'a, I: RunEndIndexType, V: ArrowNumericType, F, E>( array: TypedRunArray<'a, I, PrimitiveArray>, mut f: F, - ) -> Result + ) -> Result, E> where F: FnMut(V::Native, V::Native, usize) -> Result, { let run_ends = array.run_ends(); - let logical_start = run_ends.offset(); let logical_end = run_ends.offset() + run_ends.len(); + let run_ends = run_ends.sliced_values(); - // Beware: the computed physical range is incorrect for empty arrays. Bail out if that - // happens. We don't need to, as the calling functions already handle that case. But - // ignoring it here is a recipe for disaster in the future. - if array.is_empty() { - return Ok(V::Native::ZERO); - } - - let physical_range = - run_ends.get_start_physical_index()..run_ends.get_end_physical_index() + 1; - let physical_run_ends = &run_ends.values()[physical_range.clone()]; - let physical_values = { - let values_array = array.values(); - let values_data = values_array.values(); - &values_data[physical_range] - }; + let values_slice = array.run_array().values_slice(); + let values = values_slice + .as_any() + .downcast_ref::>() + // Safety: we know the values array is PrimitiveArray. + .unwrap(); - let mut prev_end = logical_start; + let mut prev_end = 0; let mut acc = V::Native::ZERO; + let mut has_non_null_value = false; - for (run_end, val) in physical_run_ends.iter().zip(physical_values) { - let current_run_end = run_end.as_usize().max(logical_start).min(logical_end); + for (run_end, value) in run_ends.zip(values) { + let current_run_end = run_end.as_usize().clamp(logical_start, logical_end); let run_length = current_run_end - prev_end; - acc = f(acc, *val, run_length)?; + if let Some(value) = value { + has_non_null_value = true; + acc = f(acc, value, run_length)?; + } prev_end = current_run_end; if current_run_end == logical_end { @@ -738,7 +712,7 @@ mod ree { } } - Ok(acc) + Ok(if has_non_null_value { Some(acc) } else { None }) } } @@ -776,45 +750,17 @@ where DataType::Dictionary(_, _) => min_max_helper::(array, cmp), DataType::RunEndEncoded(run_ends, _) => { // We can directly perform min/max on the values child array, as any - // run must have non-zero length. We just need take care of the logical offset & len. - fn values_and_boundaries( - array: &dyn Array, - ) -> Option<(&ArrayRef, Option>)> { - let array = array.as_run_opt::()?; - // If the array is empty, start & end physical indices will be 0. Which is - // incorrect: array[0] does not exist. - let range = if array.len() == 0 { - None - } else { - Some(array.get_start_physical_index()..array.get_end_physical_index() + 1) - }; - Some((array.values(), range)) - } - let (values, range) = match run_ends.data_type() { - DataType::Int16 => values_and_boundaries::(&array)?, - DataType::Int32 => values_and_boundaries::(&array)?, - DataType::Int64 => values_and_boundaries::(&array)?, + // run must have non-zero length. + let array: &dyn Array = &array; + let values = match run_ends.data_type() { + DataType::Int16 => array.as_run_opt::()?.values_slice(), + DataType::Int32 => array.as_run_opt::()?.values_slice(), + DataType::Int64 => array.as_run_opt::()?.values_slice(), _ => return None, }; - - // We will fail here if the values child array is not the PrimitiveArray. That - // is okay as: - // * BooleanArray & StringArray are not primitive, but their Item is not - // ArrowNumericType, so they are not in scope for this function. - // * Having the values child array be either dict-encoded, or run-end-encoded does not - // make sense. Nor does using a custom array type. - // Note however that the Apache specification does not forbid using an exotic type as - // the values child array. - // The type parameter `A` is a TypedRunArray<'_, RunEndIndexType, ValuesArrayType>. - // Once specialization gets stabilized, this implementation can be changed to - // directly pick up `ValuesArrayType`. + // We only support RunArray wrapping primitive types. let values = values.as_any().downcast_ref::>()?; - - if let Some(std::ops::Range { start, end }) = range { - m(&values.slice(start, end - start)) - } else { - m(values) - } + m(values) } _ => m(as_primitive_array(&array)), } @@ -1985,10 +1931,10 @@ mod tests { let typed_array = run_array.downcast::().unwrap(); let result = sum_array::(typed_array); - assert_eq!(result, Some(0)); + assert_eq!(result, None); let result = sum_array_checked::(typed_array).unwrap(); - assert_eq!(result, Some(0)); + assert_eq!(result, None); } #[test] @@ -2006,9 +1952,9 @@ mod tests { #[test] fn test_ree_sum_array_sliced() { - let run_array = make_run_array::(&[10, 10, 10, 20, 30, 30, 30]); - // Skip 1 value at the start and 1 at the end. - let sliced = run_array.slice(1, 5); + let run_array = make_run_array::(&[0, 10, 10, 10, 20, 30, 30, 30]); + // Skip 2 values at the start and 1 at the end. + let sliced = run_array.slice(2, 5); let typed_array = sliced.downcast::().unwrap(); let result = sum_array::(typed_array);