Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added compression to Avro write
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 22, 2021
1 parent f583531 commit 4fca046
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ avro-schema = { version = "0.2", optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "1", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

Expand Down Expand Up @@ -142,6 +143,7 @@ io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "
io_avro_compression = [
"libflate",
"snap",
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
# io_json: its dependencies + error handling
Expand Down
7 changes: 1 addition & 6 deletions examples/avro_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ fn write_avro<W: std::io::Write>(

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}
let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?;

write::write_metadata(file, avro_fields.clone(), compression)?;

Expand Down
18 changes: 14 additions & 4 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,23 @@ pub fn decompress_block(
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Snappy) => {
let len = snap::raw::decompress_len(&block[..block.len() - 4])
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
let (crc, block) = block.split_at(block.len() - 4);

let len = snap::raw::decompress_len(block)
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;
decompressed.clear();
decompressed.resize(len, 0);
snap::raw::Decoder::new()
.decompress(&block[..block.len() - 4], decompressed)
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
.decompress(block, decompressed)
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]);
let actual_crc = crc::crc32::checksum_ieee(decompressed);
if expected_crc != actual_crc {
return Err(ArrowError::ExternalFormat(
"The crc of snap-compressed block does not match".to_string(),
));
}
Ok(false)
}
#[cfg(not(feature = "io_avro_compression"))]
Expand Down
17 changes: 2 additions & 15 deletions src/io/avro/write/block.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};
use crate::error::Result;

use super::super::{Block, CompressedBlock};
use super::super::CompressedBlock;
use super::{util::zigzag_encode, SYNC_NUMBER};

/// Writes a [`CompressedBlock`] to `writer`
Expand All @@ -17,16 +17,3 @@ pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock)

Ok(())
}

/// Compresses a [`Block`] into a [`CompressedBlock`].
///
/// The row count is copied from `block` to `compressed_block` before the
/// codec is selected. Neither codec is implemented yet: every variant of
/// `compression` currently ends in `todo!()`.
pub fn compress(
    block: &Block,
    compressed_block: &mut CompressedBlock,
    compression: Compression,
) -> Result<()> {
    let rows = block.number_of_rows;
    compressed_block.number_of_rows = rows;

    match compression {
        Compression::Deflate | Compression::Snappy => todo!(),
    }
}
60 changes: 60 additions & 0 deletions src/io/avro/write/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! APIs to compress Avro blocks before they are written.
use crate::error::Result;

use super::Compression;
use super::{Block, CompressedBlock};

/// Compresses a [`Block`] to a [`CompressedBlock`].
///
/// `compressed.number_of_rows` is always set to `block.number_of_rows`.
///
/// Returns whether the data buffers of `block` and `compressed` were swapped:
/// * `Ok(true)`: `compression` is `None`; the two buffers were swapped, so
///   `compressed` now holds the original bytes and `block` holds whatever
///   `compressed` held before the call.
/// * `Ok(false)`: the block was compressed into `compressed`; `block` is left
///   untouched.
///
/// # Errors
/// * If the deflate encoder fails to write or to finish the stream.
/// * If the snappy encoder reports an error.
/// * If a codec is requested but the feature `io_avro_compression` is not active.
pub fn compress(
    block: &mut Block,
    compressed: &mut CompressedBlock,
    compression: Option<Compression>,
) -> Result<bool> {
    compressed.number_of_rows = block.number_of_rows;
    let block = &mut block.data;
    let compressed = &mut compressed.data;

    match compression {
        None => {
            // nothing to encode: hand the bytes over without copying them
            std::mem::swap(block, compressed);
            Ok(true)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Deflate) => {
            use std::io::Write;
            compressed.clear();
            let mut encoder = libflate::deflate::Encoder::new(compressed);
            encoder.write_all(block)?;
            // `finish` flushes the remaining deflate data; surface its error
            // instead of dropping it so a truncated stream is never reported as success.
            encoder.finish().into_result()?;
            Ok(false)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Snappy) => {
            use snap::raw::{max_compress_len, Encoder};

            compressed.clear();

            // the raw encoder writes into a pre-sized buffer: reserve the
            // worst case and shrink to what was actually written
            let required_len = max_compress_len(block.len());
            compressed.resize(required_len, 0);
            let compressed_bytes = Encoder::new()
                .compress(block, compressed)
                .map_err(|e| crate::error::ArrowError::ExternalFormat(e.to_string()))?;
            compressed.truncate(compressed_bytes);

            // the big-endian CRC32 of the *uncompressed* bytes is appended
            // after the snappy payload
            let crc = crc::crc32::checksum_ieee(block);
            compressed.extend_from_slice(&crc.to_be_bytes());
            Ok(false)
        }
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Deflate) => Err(crate::error::ArrowError::InvalidArgumentError(
            "Trying to compress Avro with deflate but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Snappy) => Err(crate::error::ArrowError::InvalidArgumentError(
            "Trying to compress Avro with snappy but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
    }
}
2 changes: 2 additions & 0 deletions src/io/avro/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod compress;
mod util;
pub use compress::compress;

pub use super::{Block, CompressedBlock};

Expand Down
17 changes: 11 additions & 6 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ fn write_avro<R: AsRef<dyn Array>>(

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}
write::compress(&mut block, &mut compressed_block, compression)?;

let mut file = vec![];

Expand Down Expand Up @@ -93,3 +88,13 @@ fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
fn no_compression() -> Result<()> {
roundtrip(None)
}

/// Runs `roundtrip` with Snappy compression requested.
#[test]
fn snappy() -> Result<()> {
    let codec = write::Compression::Snappy;
    roundtrip(Some(codec))
}

/// Runs `roundtrip` with Deflate compression requested.
#[test]
fn deflate() -> Result<()> {
    let codec = write::Compression::Deflate;
    roundtrip(Some(codec))
}

0 comments on commit 4fca046

Please sign in to comment.