Added support for legacy lz4 decompression #151

Merged
100 changes: 100 additions & 0 deletions src/compression.rs
@@ -188,6 +188,20 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [
            crate::error::Feature::Lz4,
            "decompress with lz4".to_string(),
        )),

        #[cfg(any(feature = "lz4_flex", feature = "lz4"))]
        Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {
            lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
        }),

        #[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]
        Compression::Lz4 => Err(Error::FeatureNotActive(
            crate::error::Feature::Lz4,
            "decompress with legacy lz4".to_string(),
        )),

        #[cfg(feature = "zstd")]
        Compression::Zstd => {
            use std::io::Read;
@@ -209,6 +223,92 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [
    }
}
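
For orientation, a hedged call-site sketch of the new behaviour follows: with either the `lz4` or the `lz4_flex` feature enabled, a page marked `Compression::Lz4` is first tried as Hadoop-framed data and, only if that fails, decompressed as a plain LZ4 block. The helper name, `page_bytes`, and `uncompressed_len` are hypothetical; only the `decompress` signature shown above is taken from the diff.

```rust
// Illustrative sketch, not part of this PR. `page_bytes` and `uncompressed_len`
// are assumed to come from a page header read elsewhere.
fn decompress_lz4_page(page_bytes: &[u8], uncompressed_len: usize) -> Result<Vec<u8>> {
    let mut decompressed = vec![0u8; uncompressed_len];
    // Tries the Hadoop framing first, then falls back to a raw LZ4 block.
    decompress(Compression::Lz4, page_bytes, &mut decompressed)?;
    Ok(decompressed)
}
```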

/// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.
/// Translated from the Apache Arrow C++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).
/// Returns an error if decompression fails.
#[cfg(any(feature = "lz4", feature = "lz4_flex"))]
fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
    // Parquet files written with the Hadoop Lz4Codec use their own framing.
    // The input buffer can contain an arbitrary number of "frames", each
    // with the following structure:
    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
    // - bytes 8...: frame compressed data
    //
    // The Hadoop Lz4Codec source code can be found here:
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc

    const SIZE_U32: usize = std::mem::size_of::<u32>();
    const PREFIX_LEN: usize = SIZE_U32 * 2;
    let mut input_len = input_buf.len();
    let mut input = input_buf;
    let mut output_len = output_buf.len();
    let mut output: &mut [u8] = output_buf;
    while input_len >= PREFIX_LEN {
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[0..4]);
        let expected_decompressed_size = u32::from_be_bytes(bytes);
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[4..8]);
        let expected_compressed_size = u32::from_be_bytes(bytes);
        input = &input[PREFIX_LEN..];
        input_len -= PREFIX_LEN;

        if input_len < expected_compressed_size as usize {
            return Err(general_err!("Not enough bytes for Hadoop frame".to_owned()));
        }

        if output_len < expected_decompressed_size as usize {
            return Err(general_err!(
                "Not enough bytes to hold advertised output".to_owned()
            ));
        }
        let decompressed_size = lz4_decompress_to_buffer(
            &input[..expected_compressed_size as usize],
            Some(output_len as i32),
            output,
        )?;
        if decompressed_size != expected_decompressed_size as usize {
            return Err(general_err!("unexpected decompressed size"));
        }
        input_len -= expected_compressed_size as usize;
        output_len -= expected_decompressed_size as usize;
        if input_len > expected_compressed_size as usize {
            input = &input[expected_compressed_size as usize..];
            output = &mut output[expected_decompressed_size as usize..];
        } else {
            break;
        }
    }
    if input_len == 0 {
        Ok(())
    } else {
        Err(general_err!("Not all input was consumed"))
    }
}
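
To make the Hadoop framing described in the comment above concrete, here is a minimal sketch (not part of this PR) that wraps an already LZ4-block-compressed payload in a single Hadoop-style frame; a buffer built like this is what `try_decompress_hadoop` accepts.

```rust
// Illustration only: build one Hadoop-style frame around an existing LZ4 block.
// `compressed_block` is assumed to be a valid LZ4 block that decompresses to
// exactly `decompressed_len` bytes.
fn wrap_in_hadoop_frame(compressed_block: &[u8], decompressed_len: u32) -> Vec<u8> {
    let mut framed = Vec::with_capacity(8 + compressed_block.len());
    framed.extend_from_slice(&decompressed_len.to_be_bytes()); // bytes 0..3 (big-endian)
    framed.extend_from_slice(&(compressed_block.len() as u32).to_be_bytes()); // bytes 4..7
    framed.extend_from_slice(compressed_block); // bytes 8..
    framed
}
```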

#[cfg(all(feature = "lz4", not(feature = "lz4_flex")))]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> Result<usize> {
    let size = lz4::block::decompress_to_buffer(src, uncompressed_size, buffer)?;
    Ok(size)
}

#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    _uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> Result<usize> {
    let size = lz4_flex::block::decompress_into(src, buffer)?;
    Ok(size)
}
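
Both shims expose the same signature, so the call sites above stay agnostic of which backend is compiled in; the choice is made by the `lz4` / `lz4_flex` cargo features (the cfg gates assume only one of them is enabled). A hedged usage sketch, where the input block and its decompressed size are assumed to be known:

```rust
// Illustration only: decompress a raw LZ4 block through the shim. `raw_block`
// is assumed to be a valid LZ4 block whose decompressed size is `n` bytes.
fn decompress_block(raw_block: &[u8], n: usize) -> Result<Vec<u8>> {
    let mut out = vec![0u8; n];
    let written = lz4_decompress_to_buffer(raw_block, Some(n as i32), &mut out)?;
    out.truncate(written);
    Ok(out)
}
```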

#[cfg(test)]
mod tests {
    use super::*;
67 changes: 67 additions & 0 deletions tests/it/read/lz4_legacy.rs
@@ -0,0 +1,67 @@
use crate::get_path;
use crate::read::get_column;
use crate::Array;
use parquet2::error::Result;

fn verify_column_data(column: &str) -> Array {
    match column {
        "c0" => {
            let expected = vec![1593604800, 1593604800, 1593604801, 1593604801];
            let expected = expected.into_iter().map(Some).collect::<Vec<_>>();
            Array::Int64(expected)
        }
        "c1" => {
            let expected = vec!["abc", "def", "abc", "def"];
            let expected = expected
                .into_iter()
                .map(|v| Some(v.as_bytes().to_vec()))
                .collect::<Vec<_>>();
            Array::Binary(expected)
        }
        "v11" => {
            let expected = vec![42_f64, 7.7, 42.125, 7.7];
            let expected = expected.into_iter().map(Some).collect::<Vec<_>>();
            Array::Float64(expected)
        }
        _ => unreachable!(),
    }
}

#[test]
fn test_lz4_inference() -> Result<()> {
    // - file "hadoop_lz4_compressed.parquet" is compressed using the hadoop Lz4Codec
    // - file "non_hadoop_lz4_compressed.parquet" is "the LZ4 block format without the custom Hadoop header"
    //   see https://github.com/apache/parquet-testing/pull/14
    //
    // Both files are marked as Lz4-compressed; the decompressor should be able
    // to distinguish one from the other.
    let files = ["hadoop_lz4_compressed.parquet", "non_hadoop_lz4_compressed.parquet"];
    let columns = ["c0", "c1", "v11"];
    for file in files {
        let mut path = get_path();
        path.push(file);
        let path = path.to_str().unwrap();
        for column in columns {
            let (result, _statistics) = get_column(path, column)?;
            assert_eq!(result, verify_column_data(column), "of file {}", file);
        }
    }
    Ok(())
}

#[test]
fn test_lz4_large_file() -> Result<()> {
    // File "hadoop_lz4_compressed_larger.parquet" is compressed using the hadoop
    // Lz4Codec and contains 10000 rows.
    let mut path = get_path();
    let file = "hadoop_lz4_compressed_larger.parquet";
    path.push(file);
    let path = path.to_str().unwrap();
    let (result, _statistics) = get_column(path, "a")?;
    assert_eq!(result.len(), 10000);
    Ok(())
}
3 changes: 3 additions & 0 deletions tests/it/read/mod.rs
@@ -11,6 +11,9 @@ mod primitive_nested;
mod struct_;
mod utils;

#[cfg(any(feature = "lz4", feature = "lz4_flex"))]
mod lz4_legacy;

use std::fs::File;

use futures::StreamExt;