Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added compression options/levels for GZIP and BROTLI codecs. #132

Merged
merged 1 commit into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Functionality to compress and decompress data according to the parquet specification
pub use super::parquet_bridge::{Compression, CompressionOptions, ZstdLevel};
pub use super::parquet_bridge::{
BrotliLevel, Compression, CompressionLevel, CompressionOptions, GzipLevel, ZstdLevel,
};

use crate::error::{Error, Result};

Expand All @@ -14,35 +16,36 @@ pub fn compress(
) -> Result<()> {
match compression {
#[cfg(feature = "brotli")]
CompressionOptions::Brotli => {
CompressionOptions::Brotli(level) => {
use std::io::Write;
const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

let q = level.unwrap_or_default();
let mut encoder = brotli::CompressorWriter::new(
output_buf,
BROTLI_DEFAULT_BUFFER_SIZE,
BROTLI_DEFAULT_COMPRESSION_QUALITY,
q.compression_level(),
BROTLI_DEFAULT_LG_WINDOW_SIZE,
);
encoder.write_all(input_buf)?;
encoder.flush().map_err(|e| e.into())
}
#[cfg(not(feature = "brotli"))]
CompressionOptions::Brotli => Err(Error::FeatureNotActive(
CompressionOptions::Brotli(_) => Err(Error::FeatureNotActive(
crate::error::Feature::Brotli,
"compress to brotli".to_string(),
)),
#[cfg(feature = "gzip")]
CompressionOptions::Gzip => {
CompressionOptions::Gzip(level) => {
use std::io::Write;
let mut encoder = flate2::write::GzEncoder::new(output_buf, Default::default());
let level = level.unwrap_or_default();
let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into());
encoder.write_all(input_buf)?;
encoder.try_finish().map_err(|e| e.into())
}
#[cfg(not(feature = "gzip"))]
CompressionOptions::Gzip => Err(Error::FeatureNotActive(
CompressionOptions::Gzip(_) => Err(Error::FeatureNotActive(
crate::error::Feature::Gzip,
"compress to gzip".to_string(),
)),
Expand Down Expand Up @@ -95,9 +98,7 @@ pub fn compress(
#[cfg(feature = "zstd")]
CompressionOptions::Zstd(level) => {
use std::io::Write;
let level = level
.map(|v| v.compression_level())
.unwrap_or(zstd::DEFAULT_COMPRESSION_LEVEL);
let level = level.map(|v| v.compression_level()).unwrap_or_default();

let mut encoder = zstd::Encoder::new(output_buf, level)?;
encoder.write_all(input_buf)?;
Expand Down Expand Up @@ -235,13 +236,41 @@ mod tests {
}

#[test]
fn test_codec_gzip() {
test_codec(CompressionOptions::Gzip);
fn test_codec_gzip_default() {
test_codec(CompressionOptions::Gzip(None));
}

#[test]
fn test_codec_brotli() {
test_codec(CompressionOptions::Brotli);
fn test_codec_gzip_low_compression() {
test_codec(CompressionOptions::Gzip(Some(
GzipLevel::try_new(1).unwrap(),
)));
}

#[test]
fn test_codec_gzip_high_compression() {
test_codec(CompressionOptions::Gzip(Some(
GzipLevel::try_new(10).unwrap(),
)));
}

#[test]
fn test_codec_brotli_default() {
test_codec(CompressionOptions::Brotli(None));
}

#[test]
fn test_codec_brotli_low_compression() {
test_codec(CompressionOptions::Brotli(Some(
BrotliLevel::try_new(1).unwrap(),
)));
}

#[test]
fn test_codec_brotli_high_compression() {
test_codec(CompressionOptions::Brotli(Some(
BrotliLevel::try_new(11).unwrap(),
)));
}

#[test]
Expand Down
129 changes: 111 additions & 18 deletions src/parquet_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ impl From<Compression> for CompressionCodec {
}

/// Defines the compression settings for writing a parquet file.
///
/// If None is provided as a compression setting, then the default compression level is used.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum CompressionOptions {
Uncompressed,
Snappy,
Gzip,
Gzip(Option<GzipLevel>),
Lzo,
Brotli,
Brotli(Option<BrotliLevel>),
Lz4,
Zstd(Option<ZstdLevel>),
Lz4Raw,
Expand All @@ -114,9 +116,9 @@ impl From<CompressionOptions> for Compression {
match value {
CompressionOptions::Uncompressed => Compression::Uncompressed,
CompressionOptions::Snappy => Compression::Snappy,
CompressionOptions::Gzip => Compression::Gzip,
CompressionOptions::Gzip(_) => Compression::Gzip,
CompressionOptions::Lzo => Compression::Lzo,
CompressionOptions::Brotli => Compression::Brotli,
CompressionOptions::Brotli(_) => Compression::Brotli,
CompressionOptions::Lz4 => Compression::Lz4,
CompressionOptions::Zstd(_) => Compression::Zstd,
CompressionOptions::Lz4Raw => Compression::Lz4Raw,
Expand All @@ -129,29 +131,26 @@ impl From<CompressionOptions> for CompressionCodec {
match codec {
CompressionOptions::Uncompressed => CompressionCodec::UNCOMPRESSED,
CompressionOptions::Snappy => CompressionCodec::SNAPPY,
CompressionOptions::Gzip => CompressionCodec::GZIP,
CompressionOptions::Gzip(_) => CompressionCodec::GZIP,
CompressionOptions::Lzo => CompressionCodec::LZO,
CompressionOptions::Brotli => CompressionCodec::BROTLI,
CompressionOptions::Brotli(_) => CompressionCodec::BROTLI,
CompressionOptions::Lz4 => CompressionCodec::LZ4,
CompressionOptions::Zstd(_) => CompressionCodec::ZSTD,
CompressionOptions::Lz4Raw => CompressionCodec::LZ4_RAW,
}
}
}

/// Represents a valid zstd compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);
/// Defines valid compression levels.
pub trait CompressionLevel<T: std::fmt::Display + std::cmp::PartialOrd> {
const MINIMUM_LEVEL: T;
const MAXIMUM_LEVEL: T;

impl ZstdLevel {
/// Attempts to create a zstd compression level from a given compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`])
#[cfg(feature = "zstd")]
pub fn try_new(level: i32) -> Result<Self, Error> {
let compression_range = zstd::compression_level_range();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this call so the trait could be used across the different compression types

/// Tests if the provided compression level is valid.
fn is_valid_level(level: T) -> Result<(), Error> {
let compression_range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL;
if compression_range.contains(&level) {
Ok(Self(level))
Ok(())
} else {
Err(Error::General(format!(
"valid compression range {}..={} exceeded.",
Expand All @@ -160,8 +159,102 @@ impl ZstdLevel {
)))
}
}
}

/// Represents a valid brotli compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct BrotliLevel(u32);

impl Default for BrotliLevel {
fn default() -> Self {
Self(1)
}
}

impl CompressionLevel<u32> for BrotliLevel {
const MINIMUM_LEVEL: u32 = 0;
const MAXIMUM_LEVEL: u32 = 11;
}

impl BrotliLevel {
/// Attempts to create a brotli compression level.
///
/// Compression levels must be valid.
pub fn try_new(level: u32) -> Result<Self, Error> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the compression level.
pub fn compression_level(&self) -> u32 {
self.0
}
}

#[cfg(feature = "brotli")]
impl From<BrotliLevel> for flate2::Compression {
fn from(level: BrotliLevel) -> Self {
Self::new(level.compression_level() as u32)
}
}

/// Represents a valid gzip compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct GzipLevel(u8);

impl Default for GzipLevel {
fn default() -> Self {
// The default as of miniz_oxide 0.5.1 is 6 for compression level
// (miniz_oxide::deflate::CompressionLevel::DefaultLevel)
Self(6)
}
}

impl CompressionLevel<u8> for GzipLevel {
const MINIMUM_LEVEL: u8 = 0;
const MAXIMUM_LEVEL: u8 = 10;
}

impl GzipLevel {
/// Attempts to create a gzip compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`flate2::Compression`]).
pub fn try_new(level: u8) -> Result<Self, Error> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the compression level.
pub fn compression_level(&self) -> u8 {
self.0
}
}

#[cfg(feature = "gzip")]
impl From<GzipLevel> for flate2::Compression {
fn from(level: GzipLevel) -> Self {
Self::new(level.compression_level() as u32)
}
}

/// Represents a valid zstd compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);

impl CompressionLevel<i32> for ZstdLevel {
// zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the
// underlying C library.
const MINIMUM_LEVEL: i32 = 1;
const MAXIMUM_LEVEL: i32 = 22;
}

impl ZstdLevel {
/// Attempts to create a zstd compression level from a given compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]).
pub fn try_new(level: i32) -> Result<Self, Error> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the zstd compression level.
/// Returns the compression level.
pub fn compression_level(&self) -> i32 {
self.0
}
Expand Down
7 changes: 5 additions & 2 deletions tests/it/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod primitive;
use std::io::{Cursor, Read, Seek};
use std::sync::Arc;

use parquet2::compression::CompressionOptions;
use parquet2::compression::{BrotliLevel, CompressionOptions};
use parquet2::error::Result;
use parquet2::metadata::SchemaDescriptor;
use parquet2::read::read_metadata;
Expand Down Expand Up @@ -128,7 +128,10 @@ fn int32_lz4_short_i32_array() -> Result<()> {

#[test]
fn int32_brotli() -> Result<()> {
test_column("id", CompressionOptions::Brotli)
test_column(
"id",
CompressionOptions::Brotli(Some(BrotliLevel::default())),
)
}

#[test]
Expand Down