diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 417c0112759a..b49e7a899d51 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -366,7 +366,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         page_writer: Box<dyn PageWriter + 'a>,
     ) -> Self {
         let codec = props.compression(descr.path());
-        let codec_options = CodecOptionsBuilder::default().build();
+        let codec_options = CodecOptionsBuilder::default()
+            .set_zstd_window_log_override(props.zstd_window_log_override(descr.path()))
+            .build();
         let compressor = create_codec(codec, &codec_options).unwrap();
         let encoder = E::try_new(&descr, props.as_ref()).unwrap();
 
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 530838955c0f..0e1cc42eb8b0 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -79,6 +79,10 @@ pub trait Codec: Send {
 pub struct CodecOptions {
     /// Whether or not to fallback to other LZ4 older implementations on error in LZ4_HADOOP.
     backward_compatible_lz4: bool,
+    /// Optional override for the zstd window log size, which is normally derived
+    /// from the compression level by the zstd library (e.g. 21 = 2MB at levels
+    /// 3-8, 22 = 4MB at levels 9-16, up to 27 = 128MB at level 22).
+    zstd_window_log_override: Option<u32>,
 }
 
 impl Default for CodecOptions {
@@ -90,12 +94,17 @@ impl Default for CodecOptions {
 pub struct CodecOptionsBuilder {
     /// Whether or not to fallback to other LZ4 older implementations on error in LZ4_HADOOP.
     backward_compatible_lz4: bool,
+    /// Optional override for the zstd window log size, which is normally derived
+    /// from the compression level by the zstd library (e.g. 21 = 2MB at levels
+    /// 3-8, 22 = 4MB at levels 9-16, up to 27 = 128MB at level 22).
+    zstd_window_log_override: Option<u32>,
 }
 
 impl Default for CodecOptionsBuilder {
     fn default() -> Self {
         Self {
             backward_compatible_lz4: true,
+            zstd_window_log_override: None,
         }
     }
 }
@@ -114,9 +123,18 @@ impl CodecOptionsBuilder {
         self
     }
 
+    /// Overrides the zstd window log size that would normally be derived from
+    /// the compression level (e.g. 27 = 128MB window).
+    /// Pass `None` to use the zstd default for the given compression level.
+    pub fn set_zstd_window_log_override(mut self, value: Option<u32>) -> CodecOptionsBuilder {
+        self.zstd_window_log_override = value;
+        self
+    }
+
     pub fn build(self) -> CodecOptions {
         CodecOptions {
             backward_compatible_lz4: self.backward_compatible_lz4,
+            zstd_window_log_override: self.zstd_window_log_override,
         }
     }
 }
@@ -179,7 +197,10 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<
         }
         CodecType::ZSTD(level) => {
             #[cfg(any(feature = "zstd", test))]
-            return Ok(Some(Box::new(ZSTDCodec::new(level))));
+            return Ok(Some(Box::new(ZSTDCodec::new(
+                level,
+                _options.zstd_window_log_override,
+            ))));
             Err(ParquetError::General(
                 "Disabled feature at compile time: zstd".into(),
             ))
@@ -511,12 +532,13 @@ mod zstd_codec {
     /// Codec for Zstandard compression algorithm.
     pub struct ZSTDCodec {
         level: ZstdLevel,
+        window_log: Option<u32>,
     }
 
     impl ZSTDCodec {
         /// Creates new Zstandard compression codec.
-        pub(crate) fn new(level: ZstdLevel) -> Self {
-            Self { level }
+        pub(crate) fn new(level: ZstdLevel, window_log: Option<u32>) -> Self {
+            Self { level, window_log }
         }
     }
 
@@ -536,6 +558,9 @@ mod zstd_codec {
 
         fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
             let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
+            if let Some(wl) = self.window_log {
+                encoder.window_log(wl)?;
+            }
             encoder.write_all(input_buf)?;
             match encoder.finish() {
                 Ok(_) => Ok(()),
@@ -917,6 +942,25 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_codec_zstd_with_window_log() {
+        let level = ZstdLevel::try_new(3).unwrap();
+        let options = CodecOptionsBuilder::default()
+            .set_zstd_window_log_override(Some(27))
+            .build();
+        let mut codec = create_codec(CodecType::ZSTD(level), &options)
+            .unwrap()
+            .unwrap();
+        let data = b"hello world hello world hello world";
+        let mut compressed = Vec::new();
+        codec.compress(data, &mut compressed).unwrap();
+        let mut decompressed = Vec::new();
+        codec
+            .decompress(&compressed, &mut decompressed, Some(data.len()))
+            .unwrap();
+        assert_eq!(&decompressed, data);
+    }
+
     #[test]
     fn test_codec_lz4_raw() {
         test_codec_with_size(CodecType::LZ4_RAW);
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 38a5a804c0b7..a9227c7e89dd 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -378,6 +378,14 @@ impl WriterProperties {
             .unwrap_or(DEFAULT_COMPRESSION)
     }
 
+    /// Returns the optional zstd window log override for a column.
+    pub(crate) fn zstd_window_log_override(&self, col: &ColumnPath) -> Option<u32> {
+        self.column_properties
+            .get(col)
+            .and_then(|c| c.zstd_window_log_override())
+            .or_else(|| self.default_column_properties.zstd_window_log_override())
+    }
+
     /// Returns `true` if dictionary encoding is enabled for a column.
     ///
     /// For more details see [`WriterPropertiesBuilder::set_dictionary_enabled`]
@@ -742,6 +750,16 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Overrides the zstd window log size for all columns. The window log is
+    /// normally derived from the compression level by the zstd library
+    /// (e.g. 21 = 2MB at levels 3-8, up to 27 = 128MB at level 22).
+    /// Only applies when using [`Compression::ZSTD`].
+    pub fn set_zstd_window_log_override(mut self, value: u32) -> Self {
+        self.default_column_properties
+            .set_zstd_window_log_override(value);
+        self
+    }
+
     /// Sets default flag to enable/disable dictionary encoding for all columns (defaults to `true`
     /// via [`DEFAULT_DICTIONARY_ENABLED`]).
     ///
@@ -873,6 +891,14 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Overrides the zstd window log size for a specific column.
+    ///
+    /// Takes precedence over [`Self::set_zstd_window_log_override`].
+    pub fn set_column_zstd_window_log_override(mut self, col: ColumnPath, value: u32) -> Self {
+        self.get_mut_props(col).set_zstd_window_log_override(value);
+        self
+    }
+
     /// Sets compression codec for a specific column.
     ///
     /// Takes precedence over [`Self::set_compression`].
@@ -1071,6 +1097,7 @@ struct ColumnProperties {
     write_page_header_statistics: Option<bool>,
     /// bloom filter related properties
     bloom_filter_properties: Option<BloomFilterProperties>,
+    zstd_window_log_override: Option<u32>,
 }
 
 impl ColumnProperties {
@@ -1095,6 +1122,11 @@ impl ColumnProperties {
         self.codec = Some(value);
     }
 
+    /// Sets the zstd window log override for this column.
+    fn set_zstd_window_log_override(&mut self, value: u32) {
+        self.zstd_window_log_override = Some(value);
+    }
+
     /// Sets whether dictionary encoding is enabled for this column.
     fn set_dictionary_enabled(&mut self, enabled: bool) {
         self.dictionary_enabled = Some(enabled);
@@ -1191,6 +1223,11 @@ impl ColumnProperties {
     fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
         self.bloom_filter_properties.as_ref()
     }
+
+    /// Returns the optional zstd window log override for this column.
+    fn zstd_window_log_override(&self) -> Option<u32> {
+        self.zstd_window_log_override
+    }
 }
 
 /// Reference counted reader properties.