Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
page_writer: Box<dyn PageWriter + 'a>,
) -> Self {
let codec = props.compression(descr.path());
let codec_options = CodecOptionsBuilder::default().build();
let codec_options = CodecOptionsBuilder::default()
.set_zstd_window_log_override(props.zstd_window_log_override(descr.path()))
.build();
let compressor = create_codec(codec, &codec_options).unwrap();
let encoder = E::try_new(&descr, props.as_ref()).unwrap();

Expand Down
50 changes: 47 additions & 3 deletions parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub trait Codec: Send {
pub struct CodecOptions {
/// Whether to fall back to older LZ4 implementations when LZ4_HADOOP reports an error.
backward_compatible_lz4: bool,
/// Optional override for the zstd window log size. When `None`, the value is
/// left to the zstd library, which derives it from the compression level
/// (e.g. 21 = 2MB at levels 3-8, 22 = 4MB at levels 9-16, up to 27 = 128MB
/// at level 22).
zstd_window_log_override: Option<u32>,
}

impl Default for CodecOptions {
Expand All @@ -90,12 +94,17 @@ impl Default for CodecOptions {
pub struct CodecOptionsBuilder {
    /// Whether to fall back to older LZ4 implementations when LZ4_HADOOP reports an error.
    backward_compatible_lz4: bool,
    /// Optional zstd window log override. `None` keeps the value that the zstd
    /// library derives from the compression level (e.g. 21 = 2MB at levels
    /// 3-8, 22 = 4MB at levels 9-16, up to 27 = 128MB at level 22).
    zstd_window_log_override: Option<u32>,
}

impl Default for CodecOptionsBuilder {
    /// Starts with the LZ4 backward-compatible fallback enabled and no zstd
    /// window log override.
    fn default() -> Self {
        let backward_compatible_lz4 = true;
        let zstd_window_log_override = None;
        CodecOptionsBuilder {
            backward_compatible_lz4,
            zstd_window_log_override,
        }
    }
}
Expand All @@ -114,9 +123,18 @@ impl CodecOptionsBuilder {
self
}

/// Overrides the zstd window log size that would normally be derived from
/// the compression level (e.g. 27 = 128MB window).
/// Pass `None` to use the zstd default for the given compression level.
pub fn set_zstd_window_log_override(mut self, value: Option<u32>) -> CodecOptionsBuilder {
self.zstd_window_log_override = value;
self
}

pub fn build(self) -> CodecOptions {
CodecOptions {
backward_compatible_lz4: self.backward_compatible_lz4,
zstd_window_log_override: self.zstd_window_log_override,
}
}
}
Expand Down Expand Up @@ -179,7 +197,10 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<
}
CodecType::ZSTD(level) => {
#[cfg(any(feature = "zstd", test))]
return Ok(Some(Box::new(ZSTDCodec::new(level))));
return Ok(Some(Box::new(ZSTDCodec::new(
level,
_options.zstd_window_log_override,
))));
Err(ParquetError::General(
"Disabled feature at compile time: zstd".into(),
))
Expand Down Expand Up @@ -511,12 +532,13 @@ mod zstd_codec {
/// Codec for the Zstandard compression algorithm.
pub struct ZSTDCodec {
/// Compression level; its inner value is handed to `zstd::Encoder::new` when compressing.
level: ZstdLevel,
/// Optional window log override; when `Some`, it is applied to the encoder
/// before any data is written, otherwise the encoder is left untouched.
window_log: Option<u32>,
}

impl ZSTDCodec {
/// Creates new Zstandard compression codec.
pub(crate) fn new(level: ZstdLevel) -> Self {
Self { level }
pub(crate) fn new(level: ZstdLevel, window_log: Option<u32>) -> Self {
Self { level, window_log }
}
}

Expand All @@ -536,6 +558,9 @@ mod zstd_codec {

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
if let Some(wl) = self.window_log {
encoder.window_log(wl)?;
}
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Expand Down Expand Up @@ -917,6 +942,25 @@ mod tests {
}
}

#[test]
fn test_codec_zstd_with_window_log() {
let level = ZstdLevel::try_new(3).unwrap();
let options = CodecOptionsBuilder::default()
.set_zstd_window_log_override(Some(27))
.build();
let mut codec = create_codec(CodecType::ZSTD(level), &options)
.unwrap()
.unwrap();
let data = b"hello world hello world hello world";
let mut compressed = Vec::new();
codec.compress(data, &mut compressed).unwrap();
let mut decompressed = Vec::new();
codec
.decompress(&compressed, &mut decompressed, Some(data.len()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test would pass on main even without your changes (assuming the API was added).

In other words, if we accidentally disconnected the option or didn't pass it through, this test would still pass, and thus it doesn't prevent regressions.

Maybe we can add a test that uses two different window log overrides and shows that the larger override results in smaller files?

.unwrap();
assert_eq!(&decompressed, data);
}

#[test]
fn test_codec_lz4_raw() {
test_codec_with_size(CodecType::LZ4_RAW);
Expand Down
37 changes: 37 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ impl WriterProperties {
.unwrap_or(DEFAULT_COMPRESSION)
}

/// Looks up the zstd window log override that applies to column `col`.
///
/// An override set specifically for `col` wins; otherwise the default column
/// properties are consulted. Returns `None` when neither carries an override.
pub(crate) fn zstd_window_log_override(&self, col: &ColumnPath) -> Option<u32> {
    let column_specific = self
        .column_properties
        .get(col)
        .and_then(|props| props.zstd_window_log_override());
    if column_specific.is_some() {
        return column_specific;
    }
    self.default_column_properties.zstd_window_log_override()
}

/// Returns `true` if dictionary encoding is enabled for a column.
///
/// For more details see [`WriterPropertiesBuilder::set_dictionary_enabled`]
Expand Down Expand Up @@ -742,6 +750,16 @@ impl WriterPropertiesBuilder {
self
}

/// Sets a zstd window log override that applies to all columns by default.
///
/// Without an override, the window log is derived from the compression level
/// by the zstd library (e.g. 21 = 2MB at levels 3-8, up to 27 = 128MB at
/// level 22). Only applies when using [`Compression::ZSTD`].
pub fn set_zstd_window_log_override(mut self, value: u32) -> Self {
    // NOTE(review): `value` is forwarded without any range check here. zstd
    // decoders appear to reject windows above 2^27 by default unless their
    // window limit is raised — confirm readers can handle larger values.
    let defaults = &mut self.default_column_properties;
    defaults.set_zstd_window_log_override(value);
    self
}

/// Sets default flag to enable/disable dictionary encoding for all columns (defaults to `true`
/// via [`DEFAULT_DICTIONARY_ENABLED`]).
///
Expand Down Expand Up @@ -873,6 +891,14 @@ impl WriterPropertiesBuilder {
self
}

/// Sets a zstd window log override for the single column identified by `col`.
///
/// Takes precedence over [`Self::set_zstd_window_log_override`].
pub fn set_column_zstd_window_log_override(mut self, col: ColumnPath, value: u32) -> Self {
    let column_props = self.get_mut_props(col);
    column_props.set_zstd_window_log_override(value);
    self
}

/// Sets compression codec for a specific column.
///
/// Takes precedence over [`Self::set_compression`].
Expand Down Expand Up @@ -1071,6 +1097,7 @@ struct ColumnProperties {
write_page_header_statistics: Option<bool>,
/// bloom filter related properties
bloom_filter_properties: Option<BloomFilterProperties>,
zstd_window_log_override: Option<u32>,
}

impl ColumnProperties {
Expand All @@ -1095,6 +1122,11 @@ impl ColumnProperties {
self.codec = Some(value);
}

/// Records `value` as this column's zstd window log override, replacing any
/// override stored earlier.
fn set_zstd_window_log_override(&mut self, value: u32) {
    self.zstd_window_log_override = Option::from(value);
}

/// Sets whether dictionary encoding is enabled for this column.
fn set_dictionary_enabled(&mut self, enabled: bool) {
self.dictionary_enabled = Some(enabled);
Expand Down Expand Up @@ -1191,6 +1223,11 @@ impl ColumnProperties {
fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
self.bloom_filter_properties.as_ref()
}

/// Returns the optional zstd window log override for this column.
fn zstd_window_log_override(&self) -> Option<u32> {
self.zstd_window_log_override
}
}

/// Reference counted reader properties.
Expand Down
Loading