Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 58 additions & 8 deletions rust/sleeper_core/src/datafusion/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use datafusion::{
DFSchema, plan_err,
tree_node::{Transformed, TreeNode, TreeNodeRecursion},
},
config::ExecutionOptions,
dataframe::DataFrame,
error::DataFusionError,
execution::{SessionStateBuilder, context::SessionContext},
Expand All @@ -45,13 +44,21 @@ use crate::datafusion::{cast_udf::CastUDF, metrics::RowCounts};
/// try to match this, but may not get there exactly.
pub const MAX_PART_COUNT: u64 = 5000;
/// The fraction of total file size to estimate for Parquet metadata. This errs
/// on the larger side. We will bound this between a minimum of [`MIN_METADATA_SIZE_HINT`] and maximum
/// of [`MAX_METADATA_SIZE_HINT`].
pub const META_DATA_SIZE_FRACTION: f64 = 0.0001;
// Compile-time guard: fail the build if the fraction ever leaves the range [0, 1].
const _: () = assert!(
    0f64 <= META_DATA_SIZE_FRACTION && META_DATA_SIZE_FRACTION <= 1f64,
    "META_DATA_SIZE_FRACTION out of range"
);
/// Minimum size (bytes) for the Parquet metadata size hint. This controls how much of the end of a Parquet file `DataFusion`
/// will fetch when trying to load metadata.
pub const MIN_METADATA_SIZE_HINT: u64 = 512 * 1024;
/// Maximum size (bytes) for the Parquet metadata size hint. This controls how much of the end of a Parquet file `DataFusion`
/// will fetch when trying to load metadata.
pub const MAX_METADATA_SIZE_HINT: u64 = 10 * 1024 * 1024;
/// Minimum upload size (bytes) for multipart PUT requests from `ObjectStore`.
pub const MIN_PUT_SIZE: usize = 32 * 1024 * 1024;

/// Write explanation of logical query plan to log output.
///
Expand All @@ -78,7 +85,7 @@ pub async fn explain_plan(frame: &DataFrame) -> Result<(), DataFusionError> {
/// too many small parts. We conseratively set the upload size so that fewer, larger uploads are created.
pub fn calculate_upload_size(total_input_size: u64) -> Result<usize, DataFusionError> {
let upload_size = std::cmp::max(
ExecutionOptions::default().objectstore_writer_buffer_size,
MIN_PUT_SIZE,
usize::try_from(total_input_size / MAX_PART_COUNT)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
);
Expand All @@ -90,14 +97,15 @@ pub fn calculate_upload_size(total_input_size: u64) -> Result<usize, DataFusionE
}

/// Calculate the metadata size hint to use based on the largest file size. This value will be clamped
/// between 512KiB and 10MiB.
/// between [`MIN_METADATA_SIZE_HINT`] and [`MAX_METADATA_SIZE_HINT`].
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
pub fn calculate_metadata_size_hint(largest_file: u64) -> u64 {
((largest_file as f64 * META_DATA_SIZE_FRACTION) as u64).clamp(512 * 1024, 10 * 1024 * 1024)
((largest_file as f64 * META_DATA_SIZE_FRACTION) as u64)
.clamp(MIN_METADATA_SIZE_HINT, MAX_METADATA_SIZE_HINT)
}

/// Checks if a physical plan contains a `SortExec` stage.
Expand Down Expand Up @@ -323,8 +331,8 @@ pub fn add_numeric_casts(
#[cfg(test)]
mod tests {
use crate::datafusion::util::{
add_numeric_casts, apply_full_sort_ordering, calculate_metadata_size_hint,
remove_coalesce_physical_stage,
MIN_PUT_SIZE, add_numeric_casts, apply_full_sort_ordering, calculate_metadata_size_hint,
calculate_upload_size, remove_coalesce_physical_stage,
};
use arrow::{
array::RecordBatch,
Expand Down Expand Up @@ -635,4 +643,46 @@ mod tests {
// Then
assert_eq!(metadata_size, 10 * 1024 * 1024);
}

#[test]
fn should_return_minimum_upload_size_with_zero() -> Result<(), Error> {
    // Given: no input data at all
    let total_input_size = 0;

    // When: computing the multipart upload part size
    let part_size = calculate_upload_size(total_input_size)?;

    // Then: the configured floor value is used
    assert_eq!(part_size, MIN_PUT_SIZE);

    Ok(())
}

#[test]
fn should_return_minimum_upload_size_with_10_g() -> Result<(), Error> {
    // Given: 10 GiB of input — small enough that the scaled part size
    // (10 GiB / MAX_PART_COUNT) stays below the minimum
    let total_input_size = 10 * 1024 * 1024 * 1024;

    // When: computing the multipart upload part size
    let part_size = calculate_upload_size(total_input_size)?;

    // Then: still clamped to the configured floor value
    assert_eq!(part_size, MIN_PUT_SIZE);

    Ok(())
}

#[test]
fn should_return_scaled_upload_size_with_300_g() -> Result<(), Error> {
    // Given: 300 GiB of input — large enough that the scaled part size
    // exceeds the minimum
    let total_input_size = 300 * 1024 * 1024 * 1024;

    // When: computing the multipart upload part size
    let part_size = calculate_upload_size(total_input_size)?;

    // Then: 300 GiB divided across the maximum part count (5000)
    assert_eq!(part_size, 64_424_509);

    Ok(())
}
}
Loading