diff --git a/src/compression.rs b/src/compression.rs index 0bbceca42..402f5660f 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -1,5 +1,7 @@ //! Functionality to compress and decompress data according to the parquet specification -pub use super::parquet_bridge::{Compression, CompressionOptions, ZstdLevel}; +pub use super::parquet_bridge::{ + BrotliLevel, Compression, CompressionLevel, CompressionOptions, GzipLevel, ZstdLevel, +}; use crate::error::{Error, Result}; @@ -14,35 +16,36 @@ pub fn compress( ) -> Result<()> { match compression { #[cfg(feature = "brotli")] - CompressionOptions::Brotli => { + CompressionOptions::Brotli(level) => { use std::io::Write; const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; - const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9 const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22 + let q = level.unwrap_or_default(); let mut encoder = brotli::CompressorWriter::new( output_buf, BROTLI_DEFAULT_BUFFER_SIZE, - BROTLI_DEFAULT_COMPRESSION_QUALITY, + q.compression_level(), BROTLI_DEFAULT_LG_WINDOW_SIZE, ); encoder.write_all(input_buf)?; encoder.flush().map_err(|e| e.into()) } #[cfg(not(feature = "brotli"))] - CompressionOptions::Brotli => Err(Error::FeatureNotActive( + CompressionOptions::Brotli(_) => Err(Error::FeatureNotActive( crate::error::Feature::Brotli, "compress to brotli".to_string(), )), #[cfg(feature = "gzip")] - CompressionOptions::Gzip => { + CompressionOptions::Gzip(level) => { use std::io::Write; - let mut encoder = flate2::write::GzEncoder::new(output_buf, Default::default()); + let level = level.unwrap_or_default(); + let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into()); encoder.write_all(input_buf)?; encoder.try_finish().map_err(|e| e.into()) } #[cfg(not(feature = "gzip"))] - CompressionOptions::Gzip => Err(Error::FeatureNotActive( + CompressionOptions::Gzip(_) => Err(Error::FeatureNotActive( crate::error::Feature::Gzip, "compress to gzip".to_string(), )), @@ -95,9 +98,7 @@ pub fn compress( #[cfg(feature = "zstd")] CompressionOptions::Zstd(level) => { use std::io::Write; - let level = level - .map(|v| v.compression_level()) - .unwrap_or(zstd::DEFAULT_COMPRESSION_LEVEL); + let level = level.map(|v| v.compression_level()).unwrap_or_default(); let mut encoder = zstd::Encoder::new(output_buf, level)?; encoder.write_all(input_buf)?; @@ -235,13 +236,41 @@ mod tests { } #[test] - fn test_codec_gzip() { - test_codec(CompressionOptions::Gzip); + fn test_codec_gzip_default() { + test_codec(CompressionOptions::Gzip(None)); } #[test] - fn test_codec_brotli() { - test_codec(CompressionOptions::Brotli); + fn test_codec_gzip_low_compression() { + test_codec(CompressionOptions::Gzip(Some( + GzipLevel::try_new(1).unwrap(), + ))); + } + + #[test] + fn test_codec_gzip_high_compression() { + test_codec(CompressionOptions::Gzip(Some( + GzipLevel::try_new(10).unwrap(), + ))); + } + + #[test] + fn test_codec_brotli_default() { + test_codec(CompressionOptions::Brotli(None)); + } + + #[test] + fn test_codec_brotli_low_compression() { + test_codec(CompressionOptions::Brotli(Some( + BrotliLevel::try_new(1).unwrap(), + ))); + } + + #[test] + fn test_codec_brotli_high_compression() { + test_codec(CompressionOptions::Brotli(Some( + BrotliLevel::try_new(11).unwrap(), + ))); } #[test] diff --git a/src/parquet_bridge.rs b/src/parquet_bridge.rs index c2d7baefe..61b14acb3 100644 --- a/src/parquet_bridge.rs +++ b/src/parquet_bridge.rs @@ -97,13 +97,15 @@ impl From for CompressionCodec { } /// Defines the compression settings for writing a parquet file. +/// +/// If None is provided as a compression setting, then the default compression level is used. #[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] pub enum CompressionOptions { Uncompressed, Snappy, - Gzip, + Gzip(Option), Lzo, - Brotli, + Brotli(Option), Lz4, Zstd(Option), Lz4Raw, @@ -114,9 +116,9 @@ impl From for Compression { match value { CompressionOptions::Uncompressed => Compression::Uncompressed, CompressionOptions::Snappy => Compression::Snappy, - CompressionOptions::Gzip => Compression::Gzip, + CompressionOptions::Gzip(_) => Compression::Gzip, CompressionOptions::Lzo => Compression::Lzo, - CompressionOptions::Brotli => Compression::Brotli, + CompressionOptions::Brotli(_) => Compression::Brotli, CompressionOptions::Lz4 => Compression::Lz4, CompressionOptions::Zstd(_) => Compression::Zstd, CompressionOptions::Lz4Raw => Compression::Lz4Raw, @@ -129,9 +131,9 @@ impl From for CompressionCodec { match codec { CompressionOptions::Uncompressed => CompressionCodec::UNCOMPRESSED, CompressionOptions::Snappy => CompressionCodec::SNAPPY, - CompressionOptions::Gzip => CompressionCodec::GZIP, + CompressionOptions::Gzip(_) => CompressionCodec::GZIP, CompressionOptions::Lzo => CompressionCodec::LZO, - CompressionOptions::Brotli => CompressionCodec::BROTLI, + CompressionOptions::Brotli(_) => CompressionCodec::BROTLI, CompressionOptions::Lz4 => CompressionCodec::LZ4, CompressionOptions::Zstd(_) => CompressionCodec::ZSTD, CompressionOptions::Lz4Raw => CompressionCodec::LZ4_RAW, @@ -139,19 +141,16 @@ impl From for CompressionCodec { } } -/// Represents a valid zstd compression level. -#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] -pub struct ZstdLevel(i32); +/// Defines valid compression levels. +pub trait CompressionLevel { + const MINIMUM_LEVEL: T; + const MAXIMUM_LEVEL: T; -impl ZstdLevel { - /// Attempts to create a zstd compression level from a given compression level. - /// - /// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]) - #[cfg(feature = "zstd")] - pub fn try_new(level: i32) -> Result { - let compression_range = zstd::compression_level_range(); + /// Tests if the provided compression level is valid. + fn is_valid_level(level: T) -> Result<(), Error> { + let compression_range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL; if compression_range.contains(&level) { - Ok(Self(level)) + Ok(()) } else { Err(Error::General(format!( "valid compression range {}..={} exceeded.", @@ -160,8 +159,102 @@ impl ZstdLevel { ))) } } +} + +/// Represents a valid brotli compression level. +#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] +pub struct BrotliLevel(u32); + +impl Default for BrotliLevel { + fn default() -> Self { + Self(1) + } +} + +impl CompressionLevel for BrotliLevel { + const MINIMUM_LEVEL: u32 = 0; + const MAXIMUM_LEVEL: u32 = 11; +} + +impl BrotliLevel { + /// Attempts to create a brotli compression level. + /// + /// Compression levels must be valid. + pub fn try_new(level: u32) -> Result { + Self::is_valid_level(level).map(|_| Self(level)) + } + + /// Returns the compression level. + pub fn compression_level(&self) -> u32 { + self.0 + } +} + +#[cfg(feature = "brotli")] +impl From for flate2::Compression { + fn from(level: BrotliLevel) -> Self { + Self::new(level.compression_level() as u32) + } +} + +/// Represents a valid gzip compression level. +#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] +pub struct GzipLevel(u8); + +impl Default for GzipLevel { + fn default() -> Self { + // The default as of miniz_oxide 0.5.1 is 6 for compression level + // (miniz_oxide::deflate::CompressionLevel::DefaultLevel) + Self(6) + } +} + +impl CompressionLevel for GzipLevel { + const MINIMUM_LEVEL: u8 = 0; + const MAXIMUM_LEVEL: u8 = 10; +} + +impl GzipLevel { + /// Attempts to create a gzip compression level. + /// + /// Compression levels must be valid (i.e. be acceptable for [`flate2::Compression`]). + pub fn try_new(level: u8) -> Result { + Self::is_valid_level(level).map(|_| Self(level)) + } + + /// Returns the compression level. + pub fn compression_level(&self) -> u8 { + self.0 + } +} + +#[cfg(feature = "gzip")] +impl From for flate2::Compression { + fn from(level: GzipLevel) -> Self { + Self::new(level.compression_level() as u32) + } +} + +/// Represents a valid zstd compression level. +#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] +pub struct ZstdLevel(i32); + +impl CompressionLevel for ZstdLevel { + // zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the + // underlying C library. + const MINIMUM_LEVEL: i32 = 1; + const MAXIMUM_LEVEL: i32 = 22; +} + +impl ZstdLevel { + /// Attempts to create a zstd compression level from a given compression level. + /// + /// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]). + pub fn try_new(level: i32) -> Result { + Self::is_valid_level(level).map(|_| Self(level)) + } - /// Returns the zstd compression level. + /// Returns the compression level. pub fn compression_level(&self) -> i32 { self.0 } diff --git a/tests/it/write/mod.rs b/tests/it/write/mod.rs index eece0c94b..1364c44bc 100644 --- a/tests/it/write/mod.rs +++ b/tests/it/write/mod.rs @@ -5,7 +5,7 @@ mod primitive; use std::io::{Cursor, Read, Seek}; use std::sync::Arc; -use parquet2::compression::CompressionOptions; +use parquet2::compression::{BrotliLevel, CompressionOptions}; use parquet2::error::Result; use parquet2::metadata::SchemaDescriptor; use parquet2::read::read_metadata; @@ -128,7 +128,10 @@ fn int32_lz4_short_i32_array() -> Result<()> { #[test] fn int32_brotli() -> Result<()> { - test_column("id", CompressionOptions::Brotli) + test_column( + "id", + CompressionOptions::Brotli(Some(BrotliLevel::default())), + ) } #[test]