Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for compressed Avro write #699

Merged
merged 1 commit into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ avro-schema = { version = "0.2", optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "1", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

Expand Down Expand Up @@ -142,6 +143,7 @@ io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "
io_avro_compression = [
"libflate",
"snap",
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
# io_json: its dependencies + error handling
Expand Down
9 changes: 2 additions & 7 deletions examples/avro_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ fn write_avro<W: std::io::Write>(
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].len(), vec![]);

write::serialize(&mut serializers, &mut block)?;
write::serialize(&mut serializers, &mut block);

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}
let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?;

write::write_metadata(file, avro_fields.clone(), compression)?;

Expand Down
19 changes: 15 additions & 4 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,24 @@ pub fn decompress_block(
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Snappy) => {
let len = snap::raw::decompress_len(&block[..block.len() - 4])
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
let crc = &block[block.len() - 4..];
let block = &block[..block.len() - 4];

let len = snap::raw::decompress_len(block)
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;
decompressed.clear();
decompressed.resize(len, 0);
snap::raw::Decoder::new()
.decompress(&block[..block.len() - 4], decompressed)
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
.decompress(block, decompressed)
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]);
let actual_crc = crc::crc32::checksum_ieee(decompressed);
if expected_crc != actual_crc {
return Err(ArrowError::ExternalFormat(
"The crc of snap-compressed block does not match".to_string(),
));
}
Ok(false)
}
#[cfg(not(feature = "io_avro_compression"))]
Expand Down
17 changes: 2 additions & 15 deletions src/io/avro/write/block.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};
use crate::error::Result;

use super::super::{Block, CompressedBlock};
use super::super::CompressedBlock;
use super::{util::zigzag_encode, SYNC_NUMBER};

/// Writes a [`CompressedBlock`] to `writer`
Expand All @@ -17,16 +17,3 @@ pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock)

Ok(())
}

/// Compresses an [`Block`] to a [`CompressedBlock`].
pub fn compress(
block: &Block,
compressed_block: &mut CompressedBlock,
compression: Compression,
) -> Result<()> {
compressed_block.number_of_rows = block.number_of_rows;
match compression {
Compression::Deflate => todo!(),
Compression::Snappy => todo!(),
}
}
60 changes: 60 additions & 0 deletions src/io/avro/write/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! APIs to read from Avro format to arrow.
use crate::error::Result;

use super::Compression;
use super::{Block, CompressedBlock};

/// Compresses a [`Block`] to a [`CompressedBlock`].
pub fn compress(
block: &mut Block,
compressed: &mut CompressedBlock,
compression: Option<Compression>,
) -> Result<bool> {
compressed.number_of_rows = block.number_of_rows;
let block = &mut block.data;
let compressed = &mut compressed.data;

match compression {
None => {
std::mem::swap(block, compressed);
Ok(true)
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Deflate) => {
use std::io::Write;
compressed.clear();
let mut encoder = libflate::deflate::Encoder::new(compressed);
encoder.write_all(block)?;
encoder.finish();
Ok(false)
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Snappy) => {
use snap::raw::{max_compress_len, Encoder};

compressed.clear();

let required_len = max_compress_len(block.len());
compressed.resize(required_len, 0);
let compressed_bytes = Encoder::new()
.compress(block, compressed)
.map_err(|e| crate::error::ArrowError::ExternalFormat(e.to_string()))?;
compressed.truncate(compressed_bytes);

let crc = crc::crc32::checksum_ieee(block);
compressed.extend(crc.to_be_bytes());
Ok(false)
}
#[cfg(not(feature = "io_avro_compression"))]
Some(Compression::Deflate) => Err(crate::error::ArrowError::InvalidArgumentError(
"Trying to compress Avro with deflate but feature 'io_avro_compression' is not active."
.to_string(),
)),
#[cfg(not(feature = "io_avro_compression"))]
Some(Compression::Snappy) => Err(crate::error::ArrowError::InvalidArgumentError(
"Trying to compress Avro with snappy but feature 'io_avro_compression' is not active."
.to_string(),
)),
}
}
9 changes: 4 additions & 5 deletions src/io/avro/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
//! APIs to write to Avro format.
use std::io::Write;

use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};

use crate::error::Result;
Expand All @@ -15,7 +13,9 @@ mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod compress;
mod util;
pub use compress::compress;

pub use super::{Block, CompressedBlock};

Expand Down Expand Up @@ -52,7 +52,7 @@ pub fn write_metadata<W: std::io::Write>(
/// # Panics
/// Panics iff the number of items in any of the serializers is not equal to the number of rows
/// declared in the `block`.
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) {
let Block {
data,
number_of_rows,
Expand All @@ -64,8 +64,7 @@ pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -
for _ in 0..*number_of_rows {
for serializer in &mut *serializers {
let item_data = serializer.next().unwrap();
data.write_all(item_data)?;
data.extend(item_data);
}
}
Ok(())
}
19 changes: 12 additions & 7 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,11 @@ fn write_avro<R: AsRef<dyn Array>>(
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].as_ref().len(), vec![]);

write::serialize(&mut serializers, &mut block)?;
write::serialize(&mut serializers, &mut block);

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}
write::compress(&mut block, &mut compressed_block, compression)?;

let mut file = vec![];

Expand Down Expand Up @@ -93,3 +88,13 @@ fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
fn no_compression() -> Result<()> {
roundtrip(None)
}

#[test]
fn snappy() -> Result<()> {
roundtrip(Some(write::Compression::Snappy))
}

#[test]
fn deflate() -> Result<()> {
roundtrip(Some(write::Compression::Deflate))
}