Skip to content

Commit

Permalink
Added compression options/levels for GZIP and BROTLI codecs. (#132)
Browse files Browse the repository at this point in the history
* Sets the default GZIP compression == 6, as this is the default within miniz_oxide.
* brotli quality is kept the same, as no default was listed with the brotli rust crate.

This is a breaking API change, however it is an extension of the previous breaking change to zstd levels.

Co-authored-by: Ryan Jennings <[email protected]>
  • Loading branch information
TurnOfACard and Ryan Jennings authored Apr 28, 2022
1 parent 95826dd commit 3256d28
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 35 deletions.
59 changes: 44 additions & 15 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Functionality to compress and decompress data according to the parquet specification
pub use super::parquet_bridge::{Compression, CompressionOptions, ZstdLevel};
pub use super::parquet_bridge::{
BrotliLevel, Compression, CompressionLevel, CompressionOptions, GzipLevel, ZstdLevel,
};

use crate::error::{Error, Result};

Expand All @@ -14,35 +16,36 @@ pub fn compress(
) -> Result<()> {
match compression {
#[cfg(feature = "brotli")]
CompressionOptions::Brotli => {
CompressionOptions::Brotli(level) => {
use std::io::Write;
const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

let q = level.unwrap_or_default();
let mut encoder = brotli::CompressorWriter::new(
output_buf,
BROTLI_DEFAULT_BUFFER_SIZE,
BROTLI_DEFAULT_COMPRESSION_QUALITY,
q.compression_level(),
BROTLI_DEFAULT_LG_WINDOW_SIZE,
);
encoder.write_all(input_buf)?;
encoder.flush().map_err(|e| e.into())
}
#[cfg(not(feature = "brotli"))]
CompressionOptions::Brotli => Err(Error::FeatureNotActive(
CompressionOptions::Brotli(_) => Err(Error::FeatureNotActive(
crate::error::Feature::Brotli,
"compress to brotli".to_string(),
)),
#[cfg(feature = "gzip")]
CompressionOptions::Gzip => {
CompressionOptions::Gzip(level) => {
use std::io::Write;
let mut encoder = flate2::write::GzEncoder::new(output_buf, Default::default());
let level = level.unwrap_or_default();
let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into());
encoder.write_all(input_buf)?;
encoder.try_finish().map_err(|e| e.into())
}
#[cfg(not(feature = "gzip"))]
CompressionOptions::Gzip => Err(Error::FeatureNotActive(
CompressionOptions::Gzip(_) => Err(Error::FeatureNotActive(
crate::error::Feature::Gzip,
"compress to gzip".to_string(),
)),
Expand Down Expand Up @@ -95,9 +98,7 @@ pub fn compress(
#[cfg(feature = "zstd")]
CompressionOptions::Zstd(level) => {
use std::io::Write;
let level = level
.map(|v| v.compression_level())
.unwrap_or(zstd::DEFAULT_COMPRESSION_LEVEL);
let level = level.map(|v| v.compression_level()).unwrap_or_default();

let mut encoder = zstd::Encoder::new(output_buf, level)?;
encoder.write_all(input_buf)?;
Expand Down Expand Up @@ -235,13 +236,41 @@ mod tests {
}

#[test]
fn test_codec_gzip() {
test_codec(CompressionOptions::Gzip);
fn test_codec_gzip_default() {
test_codec(CompressionOptions::Gzip(None));
}

#[test]
fn test_codec_brotli() {
test_codec(CompressionOptions::Brotli);
fn test_codec_gzip_low_compression() {
test_codec(CompressionOptions::Gzip(Some(
GzipLevel::try_new(1).unwrap(),
)));
}

#[test]
fn test_codec_gzip_high_compression() {
test_codec(CompressionOptions::Gzip(Some(
GzipLevel::try_new(10).unwrap(),
)));
}

#[test]
fn test_codec_brotli_default() {
test_codec(CompressionOptions::Brotli(None));
}

#[test]
fn test_codec_brotli_low_compression() {
test_codec(CompressionOptions::Brotli(Some(
BrotliLevel::try_new(1).unwrap(),
)));
}

#[test]
fn test_codec_brotli_high_compression() {
test_codec(CompressionOptions::Brotli(Some(
BrotliLevel::try_new(11).unwrap(),
)));
}

#[test]
Expand Down
129 changes: 111 additions & 18 deletions src/parquet_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ impl From<Compression> for CompressionCodec {
}

/// Defines the compression settings for writing a parquet file.
///
/// If None is provided as a compression setting, then the default compression level is used.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum CompressionOptions {
Uncompressed,
Snappy,
Gzip,
Gzip(Option<GzipLevel>),
Lzo,
Brotli,
Brotli(Option<BrotliLevel>),
Lz4,
Zstd(Option<ZstdLevel>),
Lz4Raw,
Expand All @@ -114,9 +116,9 @@ impl From<CompressionOptions> for Compression {
match value {
CompressionOptions::Uncompressed => Compression::Uncompressed,
CompressionOptions::Snappy => Compression::Snappy,
CompressionOptions::Gzip => Compression::Gzip,
CompressionOptions::Gzip(_) => Compression::Gzip,
CompressionOptions::Lzo => Compression::Lzo,
CompressionOptions::Brotli => Compression::Brotli,
CompressionOptions::Brotli(_) => Compression::Brotli,
CompressionOptions::Lz4 => Compression::Lz4,
CompressionOptions::Zstd(_) => Compression::Zstd,
CompressionOptions::Lz4Raw => Compression::Lz4Raw,
Expand All @@ -129,29 +131,26 @@ impl From<CompressionOptions> for CompressionCodec {
match codec {
CompressionOptions::Uncompressed => CompressionCodec::UNCOMPRESSED,
CompressionOptions::Snappy => CompressionCodec::SNAPPY,
CompressionOptions::Gzip => CompressionCodec::GZIP,
CompressionOptions::Gzip(_) => CompressionCodec::GZIP,
CompressionOptions::Lzo => CompressionCodec::LZO,
CompressionOptions::Brotli => CompressionCodec::BROTLI,
CompressionOptions::Brotli(_) => CompressionCodec::BROTLI,
CompressionOptions::Lz4 => CompressionCodec::LZ4,
CompressionOptions::Zstd(_) => CompressionCodec::ZSTD,
CompressionOptions::Lz4Raw => CompressionCodec::LZ4_RAW,
}
}
}

/// Represents a valid zstd compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);
/// Defines valid compression levels.
pub trait CompressionLevel<T: std::fmt::Display + std::cmp::PartialOrd> {
const MINIMUM_LEVEL: T;
const MAXIMUM_LEVEL: T;

impl ZstdLevel {
/// Attempts to create a zstd compression level from a given compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`])
#[cfg(feature = "zstd")]
pub fn try_new(level: i32) -> Result<Self, Error> {
let compression_range = zstd::compression_level_range();
/// Tests if the provided compression level is valid.
fn is_valid_level(level: T) -> Result<(), Error> {
let compression_range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL;
if compression_range.contains(&level) {
Ok(Self(level))
Ok(())
} else {
Err(Error::General(format!(
"valid compression range {}..={} exceeded.",
Expand All @@ -160,8 +159,102 @@ impl ZstdLevel {
)))
}
}
}

/// Represents a valid brotli compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct BrotliLevel(u32);

impl Default for BrotliLevel {
fn default() -> Self {
Self(1)
}
}

impl CompressionLevel<u32> for BrotliLevel {
const MINIMUM_LEVEL: u32 = 0;
const MAXIMUM_LEVEL: u32 = 11;
}

impl BrotliLevel {
/// Attempts to create a brotli compression level.
///
/// Compression levels must be valid.
pub fn try_new(level: u32) -> Result<Self, Error> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the compression level.
pub fn compression_level(&self) -> u32 {
self.0
}
}

#[cfg(feature = "brotli")]
impl From<BrotliLevel> for flate2::Compression {
fn from(level: BrotliLevel) -> Self {
Self::new(level.compression_level() as u32)
}
}

/// Represents a valid gzip compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct GzipLevel(u8);

impl Default for GzipLevel {
fn default() -> Self {
// The default as of miniz_oxide 0.5.1 is 6 for compression level
// (miniz_oxide::deflate::CompressionLevel::DefaultLevel)
Self(6)
}
}

impl CompressionLevel<u8> for GzipLevel {
const MINIMUM_LEVEL: u8 = 0;
const MAXIMUM_LEVEL: u8 = 10;
}

impl GzipLevel {
/// Attempts to create a gzip compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`flate2::Compression`]).
pub fn try_new(level: u8) -> Result<Self, Error> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the compression level.
pub fn compression_level(&self) -> u8 {
self.0
}
}

#[cfg(feature = "gzip")]
impl From<GzipLevel> for flate2::Compression {
fn from(level: GzipLevel) -> Self {
Self::new(level.compression_level() as u32)
}
}

/// Represents a valid zstd compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);

impl CompressionLevel<i32> for ZstdLevel {
// zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the
// underlying C library.
const MINIMUM_LEVEL: i32 = 1;
const MAXIMUM_LEVEL: i32 = 22;
}

impl ZstdLevel {
/// Attempts to create a zstd compression level from a given compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]).
pub fn try_new(level: i32) -> Result<Self, Error> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the zstd compression level.
/// Returns the compression level.
pub fn compression_level(&self) -> i32 {
self.0
}
Expand Down
7 changes: 5 additions & 2 deletions tests/it/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod primitive;
use std::io::{Cursor, Read, Seek};
use std::sync::Arc;

use parquet2::compression::CompressionOptions;
use parquet2::compression::{BrotliLevel, CompressionOptions};
use parquet2::error::Result;
use parquet2::metadata::SchemaDescriptor;
use parquet2::read::read_metadata;
Expand Down Expand Up @@ -128,7 +128,10 @@ fn int32_lz4_short_i32_array() -> Result<()> {

#[test]
fn int32_brotli() -> Result<()> {
test_column("id", CompressionOptions::Brotli)
test_column(
"id",
CompressionOptions::Brotli(Some(BrotliLevel::default())),
)
}

#[test]
Expand Down

0 comments on commit 3256d28

Please sign in to comment.