Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to write compressed Arrow IPC (feather v2) #566

Merged
merged 1 commit into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,48 @@ pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> Result<()>
use crate::error::ArrowError;
Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC.".to_string()))
}

#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
/// Compresses `input_buf` into `output_buf` using the LZ4 frame format.
///
/// # Errors
/// Returns an error if building the encoder or writing/finishing the
/// compressed stream fails.
pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
    use std::io::Write;
    // Propagate builder errors instead of panicking via `unwrap`.
    let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
    encoder.write_all(input_buf)?;
    // `finish` must be called explicitly: dropping the encoder without it
    // leaves the LZ4 frame without its end mark, producing an invalid stream.
    let (_, result) = encoder.finish();
    result.map_err(|e| e.into())
}

#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
/// Compresses `input_buf` into `output_buf` using ZSTD at compression level 17.
///
/// # Errors
/// Returns an error if the encoder cannot be created or the write fails.
pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
    use std::io::Write;
    // `auto_finish` completes the zstd frame when the writer is dropped.
    let mut writer = zstd::Encoder::new(output_buf, 17)?.auto_finish();
    writer.write_all(input_buf)?;
    Ok(())
}

#[cfg(not(feature = "io_ipc_compression"))]
/// Stub used when the crate is built without `io_ipc_compression`: always errors.
pub fn compress_lz4(_input_buf: &[u8], _output_buf: &mut Vec<u8>) -> Result<()> {
    use crate::error::ArrowError;
    let msg = "The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC.";
    Err(ArrowError::Ipc(msg.to_string()))
}

#[cfg(not(feature = "io_ipc_compression"))]
/// Stub used when the crate is built without `io_ipc_compression`: always errors.
pub fn compress_zstd(_input_buf: &[u8], _output_buf: &mut Vec<u8>) -> Result<()> {
    use crate::error::ArrowError;
    let msg = "The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC.";
    Err(ArrowError::Ipc(msg.to_string()))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips data through the ZSTD compressor/decompressor.
    #[cfg(feature = "io_ipc_compression")]
    #[test]
    fn round_trip_zstd() {
        let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
        let mut buffer = vec![];
        compress_zstd(&data, &mut buffer).unwrap();

        let mut result = vec![0; 200];
        decompress_zstd(&buffer, &mut result).unwrap();
        assert_eq!(data, result);
    }

    /// Round-trips data through the LZ4 compressor/decompressor; this
    /// covers the write path that ZSTD alone does not exercise.
    #[cfg(feature = "io_ipc_compression")]
    #[test]
    fn round_trip_lz4() {
        let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
        let mut buffer = vec![];
        compress_lz4(&data, &mut buffer).unwrap();

        let mut result = vec![0; 200];
        decompress_lz4(&buffer, &mut result).unwrap();
        assert_eq!(data, result);
    }
}
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().unwrap().0;

Expand All @@ -27,7 +28,7 @@ pub fn read_boolean<R: Read + Seek>(
reader,
block_offset,
is_little_endian,
None,
compression,
)?;

let values = read_bitmap(
Expand All @@ -36,7 +37,7 @@ pub fn read_boolean<R: Read + Seek>(
reader,
block_offset,
is_little_endian,
None,
compression,
)?;
Ok(BooleanArray::from_data(data_type, values, validity))
}
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
block_offset: u64,
compression: Option<ipc::Message::BodyCompression>,
is_little_endian: bool,
) -> Result<DictionaryArray<T>>
where
Expand All @@ -29,7 +30,7 @@ where
reader,
block_offset,
is_little_endian,
None,
compression,
)?;

Ok(DictionaryArray::<T>::from_data(keys, values.clone()))
Expand Down
2 changes: 2 additions & 0 deletions src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn read<R: Read + Seek>(
reader,
block_offset,
is_little_endian,
compression,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Expand Down Expand Up @@ -169,6 +170,7 @@ pub fn read<R: Read + Seek>(
buffers,
reader,
block_offset,
compression,
is_little_endian,
)
.map(|x| Arc::new(x) as Arc<dyn Array>)
Expand Down
49 changes: 18 additions & 31 deletions src/io/ipc/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,33 +107,26 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
let mut slice = vec![0u8; buffer_length];
reader.read_exact(&mut slice)?;

// Safety:
// This is safe because T is NativeType, which by definition can be transmuted to u8
let out_slice = unsafe {
std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
length * std::mem::size_of::<T>(),
)
};

match compression.codec() {
CompressionType::LZ4_FRAME => {
// fast case where we can just copy the contents as is
unsafe {
// transmute T to bytes.
let out_slice = std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
length * std::mem::size_of::<T>(),
);
compression::decompress_lz4(&slice[8..], out_slice)?
}
compression::decompress_lz4(&slice[8..], out_slice)?;
Ok(buffer)
}
CompressionType::ZSTD => {
// fast case where we can just copy the contents as is
unsafe {
// transmute T to bytes.
let out_slice = std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
length * std::mem::size_of::<T>(),
);
compression::decompress_zstd(&slice[8..], out_slice)?
}
compression::decompress_zstd(&slice[8..], out_slice)?;
Ok(buffer)
}
_ => Err(ArrowError::NotYetImplemented(
"Non LZ4 compressed IPC".to_string(),
"Compression format".to_string(),
)),
}
}
Expand Down Expand Up @@ -184,25 +177,19 @@ fn read_compressed_bitmap<R: Read + Seek>(
reader: &mut R,
) -> Result<MutableBuffer<u8>> {
let mut buffer = MutableBuffer::<u8>::from_len_zeroed((length + 7) / 8);

// read all first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;

match compression.codec() {
CompressionType::LZ4_FRAME => {
// decompress first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;

compression::decompress_lz4(&slice[8..], &mut buffer)?;

Ok(buffer)
}
CompressionType::ZSTD => {
// decompress first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;

compression::decompress_zstd(&slice[8..], &mut buffer)?;

Ok(buffer)
}
_ => Err(ArrowError::NotYetImplemented(
Expand Down
60 changes: 54 additions & 6 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{collections::HashMap, sync::Arc};

use arrow_format::ipc;
use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
use arrow_format::ipc::Message::CompressionType;

use crate::array::Array;
use crate::error::{ArrowError, Result};
Expand All @@ -31,8 +32,17 @@ use crate::{array::DictionaryArray, datatypes::*};
use super::super::CONTINUATION_MARKER;
use super::{write, write_dictionary};

/// Compression codec applied to IPC body buffers when writing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Compression {
    /// LZ4 frame format (written as the Arrow `CompressionType::LZ4_FRAME` codec)
    LZ4,
    /// ZSTD (written as the Arrow `CompressionType::ZSTD` codec)
    ZSTD,
}

/// IPC write options used to control the behaviour of the writer
#[derive(Debug)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IpcWriteOptions {
/// Write padding after memory buffers to this multiple of bytes.
/// Generally 8 or 64, defaults to 8
Expand All @@ -48,6 +58,8 @@ pub struct IpcWriteOptions {
/// version 2.0.0: V4, with legacy format enabled
/// version 4.0.0: V5
metadata_version: ipc::Schema::MetadataVersion,
/// Whether the buffers should be compressed
compression: Option<Compression>,
}

impl IpcWriteOptions {
Expand All @@ -56,6 +68,7 @@ impl IpcWriteOptions {
alignment: usize,
write_legacy_ipc_format: bool,
metadata_version: ipc::Schema::MetadataVersion,
compression: Option<Compression>,
) -> Result<Self> {
if alignment == 0 || alignment % 8 != 0 {
return Err(ArrowError::InvalidArgumentError(
Expand All @@ -72,6 +85,7 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
compression,
}),
ipc::Schema::MetadataVersion::V5 => {
if write_legacy_ipc_format {
Expand All @@ -83,6 +97,7 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
compression,
})
}
}
Expand All @@ -101,6 +116,7 @@ impl Default for IpcWriteOptions {
alignment: 8,
write_legacy_ipc_format: false,
metadata_version: ipc::Schema::MetadataVersion::V5,
compression: None,
}
}
}
Expand Down Expand Up @@ -157,18 +173,34 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) -
&mut nodes,
&mut offset,
is_native_little_endian(),
write_options.compression,
)
}

// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);

let compression = if let Some(compression) = write_options.compression {
let compression = match compression {
Compression::LZ4 => CompressionType::LZ4_FRAME,
Compression::ZSTD => CompressionType::ZSTD,
};
let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
compression_builder.add_codec(compression);
Some(compression_builder.finish())
} else {
None
};

let root = {
let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(batch.num_rows() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
if let Some(compression) = compression {
batch_builder.add_compression(compression)
}
let b = batch_builder.finish();
b.as_union_value()
};
Expand Down Expand Up @@ -209,18 +241,34 @@ fn dictionary_batch_to_bytes(
&mut nodes,
&mut 0,
is_little_endian,
write_options.compression,
false,
);

// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);

let compression = if let Some(compression) = write_options.compression {
let compression = match compression {
Compression::LZ4 => CompressionType::LZ4_FRAME,
Compression::ZSTD => CompressionType::ZSTD,
};
let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
compression_builder.add_codec(compression);
Some(compression_builder.finish())
} else {
None
};

let root = {
let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(length as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
if let Some(compression) = compression {
batch_builder.add_compression(compression)
}
batch_builder.finish()
};

Expand Down Expand Up @@ -358,18 +406,18 @@ pub fn write_message<W: Write>(
}

/// Writes `data` to `writer` followed by zero-padding up to the next
/// 8-byte boundary, then flushes.
///
/// Returns the total number of bytes written (payload plus padding).
fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
    let len = data.len();
    let pad_len = pad_to_8(len);
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data)?;
    if pad_len > 0 {
        // pad with zeros so the next message starts 8-byte aligned
        writer.write_all(&vec![0u8; pad_len][..])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write a record batch to the writer, writing the message size before the message
Expand Down Expand Up @@ -411,6 +459,6 @@ pub fn write_continuation<W: Write>(

/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
#[inline]
pub(crate) fn pad_to_8(len: usize) -> usize {
    // round `len` up to the next multiple of 8, then take the difference;
    // the result is already `usize`, so no cast is needed
    ((len + 7) & !7) - len
}
2 changes: 1 addition & 1 deletion src/io/ipc/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod stream;
mod writer;

pub use arrow_format::ipc::Schema::MetadataVersion;
pub use common::IpcWriteOptions;
pub use common::{Compression, IpcWriteOptions};
pub use schema::schema_to_bytes;
pub use serialize::{write, write_dictionary};
pub use stream::StreamWriter;
Expand Down
Loading