From ff2fc8b19f124f2f1d143620676a22e6cb8a2f1c Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Mon, 1 Aug 2022 23:00:54 +0200
Subject: [PATCH] Migrated code to here (#1)

* Added more content
* Added toolkit to read files
* Added examples
* Added tests, docs and coverage
---
 .github/workflows/coverage.yml   |  21 ++++++
 .github/workflows/test.yml       |   2 +-
 Cargo.toml                       |  22 +++++-
 README.md                        |   5 ++
 src/error.rs                     |  22 ++++++
 src/file.rs                      |  60 +++++++++++++++
 src/lib.md                       |  81 ++++++++++++++++++++
 src/lib.rs                       |  18 ++++-
 src/read/block.rs                |  96 ++++++++++++++++++++++++
 src/read/decode.rs               |  80 ++++++++++++++++++++
 src/read/decompress.rs           | 123 +++++++++++++++++++++++++++++++
 src/read/mod.rs                  | 104 ++++++++++++++++++++++++++
 src/read_async/block.rs          |  74 +++++++++++++++++++
 src/read_async/decode.rs         |  44 +++++++++++
 src/read_async/mod.rs            |  32 ++++++++
 src/{ => schema}/de.rs           |   0
 src/{schema.rs => schema/mod.rs} |  58 +++++++++++++++
 src/{ => schema}/se.rs           |   0
 src/write/block.rs               |  20 +++++
 src/write/compression.rs         |  55 ++++++++++++++
 src/write/encode.rs              |  28 +++++++
 src/write/file.rs                |  67 +++++++++++++++++
 src/write/mod.rs                 |   8 ++
 src/write_async.rs               |  55 ++++++++++++++
 tests/it/file.rs                 |  91 +++++++++++++++++++++++
 tests/it/main.rs                 |  36 ++++-----
 26 files changed, 1176 insertions(+), 26 deletions(-)
 create mode 100644 .github/workflows/coverage.yml
 create mode 100644 src/error.rs
 create mode 100644 src/file.rs
 create mode 100644 src/read/block.rs
 create mode 100644 src/read/decode.rs
 create mode 100644 src/read/decompress.rs
 create mode 100644 src/read/mod.rs
 create mode 100644 src/read_async/block.rs
 create mode 100644 src/read_async/decode.rs
 create mode 100644 src/read_async/mod.rs
 rename src/{ => schema}/de.rs (100%)
 rename src/{schema.rs => schema/mod.rs} (66%)
 rename src/{ => schema}/se.rs (100%)
 create mode 100644 src/write/block.rs
 create mode 100644 src/write/compression.rs
 create mode 100644 src/write/encode.rs
 create mode 100644 src/write/file.rs
 create mode 100644 src/write/mod.rs
 create mode 100644 src/write_async.rs
 create mode 100644 tests/it/file.rs

diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
new file mode 100644
index 0000000..effc75b
--- /dev/null
+++ b/.github/workflows/coverage.yml
@@ -0,0 +1,21 @@
+name: Coverage
+
+on: [pull_request, push]
+
+jobs:
+  coverage:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Install Rust
+        run: rustup toolchain install stable --component llvm-tools-preview
+      - name: Install cargo-llvm-cov
+        uses: taiki-e/install-action@cargo-llvm-cov
+      - uses: Swatinem/rust-cache@v1
+      - name: Generate code coverage
+        run: cargo llvm-cov --features full --lcov --output-path lcov.info
+      - name: Upload coverage to Codecov
+        uses: codecov/codecov-action@v1
+        with:
+          files: lcov.info
+          fail_ci_if_error: true
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index ac8fd8d..407844b 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -12,4 +12,4 @@ jobs:
         run: rustup update stable
       - uses: Swatinem/rust-cache@v1
       - name: Run
-        run: cargo test
+        run: cargo test --features full
diff --git a/Cargo.toml b/Cargo.toml
index 25ecb43..3f3886e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 name = "avro-schema"
 version = "0.2.2"
 license = "Apache-2.0"
-description = "Implementation of Apache Avro spec"
+description = "Apache Avro specification"
 homepage = "https://github.com/DataEngineeringLabs/avro-schema"
 repository = "https://github.com/DataEngineeringLabs/avro-schema"
 authors = ["Jorge C. Leitao"]
@@ -12,3 +12,23 @@ edition = "2018"
 [dependencies]
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 serde = { version = "1.0", default-features = false }
+
+fallible-streaming-iterator = { version = "0.1" }
+
+libflate = { version = "1.1.1", optional = true }
+snap = { version = "1", optional = true }
+crc = { version = "2", optional = true }
+
+# for async
+futures = { version = "0.3", optional = true }
+async-stream = { version = "0.3.2", optional = true }
+
+[features]
+default = []
+full = ["compression", "async"]
+compression = [
+    "libflate",
+    "snap",
+    "crc",
+]
+async = ["futures", "async-stream"]
diff --git a/README.md b/README.md
index 8b22d7e..2ef6a2a 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,14 @@
 # Avro-schema - Avro schema in Rust
 
+[![test](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml/badge.svg)](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml)
+[![codecov](https://codecov.io/gh/DataEngineeringLabs/avro-schema/branch/main/graph/badge.svg)](https://codecov.io/gh/DataEngineeringLabs/avro-schema)
+
 This crate contains the complete implementation of the schemas of
 the [Avro specification](https://avro.apache.org/docs/current/spec.html)
 in native Rust.
 
+See the API documentation for examples on how to read and write.
+
 ## License
 
 Licensed under either of
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..490ebdb
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,22 @@
+//! Contains [`Error`]
+
+/// Error from this crate
+#[derive(Debug, Clone, Copy)]
+pub enum Error {
+    /// Generic error when the file is out of spec
+    OutOfSpec,
+    /// When reading or writing with compression but the feature flag "compression" is not active.
+    RequiresCompression,
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl From<std::io::Error> for Error {
+    fn from(_: std::io::Error) -> Self {
+        Error::OutOfSpec
+    }
+}
diff --git a/src/file.rs b/src/file.rs
new file mode 100644
index 0000000..0412ce5
--- /dev/null
+++ b/src/file.rs
@@ -0,0 +1,60 @@
+//! Contains structs found in Avro files
+use crate::schema::Record;
+
+/// An Avro file's metadata
+#[derive(Debug, Clone, PartialEq, Hash)]
+pub struct FileMetadata {
+    /// The Record represented in the file's Schema
+    pub record: Record,
+    /// The file's compression
+    pub compression: Option<Compression>,
+    /// The file's marker, present in every block
+    pub marker: [u8; 16],
+}
+
+/// A compressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct CompressedBlock {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The compressed data
+    pub data: Vec<u8>,
+}
+
+impl CompressedBlock {
+    /// Creates a new CompressedBlock
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// An uncompressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct Block {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The uncompressed data
+    pub data: Vec<u8>,
+}
+
+impl Block {
+    /// Creates a new Block
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
diff --git a/src/lib.md b/src/lib.md
index f92b2d2..4865b2f 100644
--- a/src/lib.md
+++ b/src/lib.md
@@ -4,3 +4,84 @@
 This is a library containing declarations of the
 [Avro specification](https://avro.apache.org/docs/current/spec.html) in Rust's struct and enums
 together with serialization and deserialization implementations based on `serde_json`.
+
+It also contains basic functionality to read and deserialize an Avro file's metadata
+and blocks.
+
+Example of reading a file:
+
+```rust
+use std::convert::TryInto;
+use std::fs::File;
+use std::io::BufReader;
+
+use avro_schema::error::Error;
+use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
+
+fn read_avro(path: &str) -> Result<(), Error> {
+    let file = &mut BufReader::new(File::open(path)?);
+
+    let metadata = avro_schema::read::read_metadata(file)?;
+
+    println!("{:#?}", metadata);
+
+    let mut blocks =
+        avro_schema::read::BlockStreamingIterator::new(file, metadata.compression, metadata.marker);
+
+    while let Some(block) = blocks.next()? {
+        let _fields = &metadata.record.fields;
+        let length = block.number_of_rows;
+        let mut block: &[u8] = block.data.as_ref();
+        // at this point you can deserialize the block based on `_fields` according
+        // to avro's specification. Note that `Block` is already decompressed.
+        // for example, if there was a single field with f32, we would use
+        for _ in 0..length {
+            let (item, remaining) = block.split_at(4);
+            block = remaining;
+            let _value = f32::from_le_bytes(item.try_into().unwrap());
+            // if there were more fields, we would need to consume (or skip) the remaining
+            // bytes here. You can use `avro_schema::read::decode::zigzag_i64` for integers :D
+        }
+    }
+
+    Ok(())
+}
+```
+
+Example of writing a file:
+
+```rust
+use std::fs::File;
+
+use avro_schema::error::Error;
+use avro_schema::file::Block;
+use avro_schema::schema::{Field, Record, Schema};
+
+fn write_avro(compression: Option<avro_schema::file::Compression>) -> Result<(), Error> {
+    let mut file = File::create("test.avro")?;
+
+    let record = Record::new("", vec![Field::new("value", Schema::Float)]);
+
+    avro_schema::write::write_metadata(&mut file, record, compression)?;
+
+    // given some data:
+    let array = vec![1.0f32, 2.0];
+
+    // we need to create a `Block`
+    let mut data: Vec<u8> = vec![];
+    for item in array.iter() {
+        let bytes = item.to_le_bytes();
+        data.extend(bytes);
+    }
+    let mut block = Block::new(array.len(), data);
+
+    // once completed, we compress it
+    let mut compressed_block = avro_schema::file::CompressedBlock::default();
+    let _ = avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;
+
+    // and finally write it to the file
+    avro_schema::write::write_block(&mut file, &compressed_block)?;
+
+    Ok(())
+}
+```
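Aside: the read example above defers per-field decoding to the caller and points at `zigzag_i64` for integers. As a reference for what that entails, here is a minimal, self-contained sketch of Avro's zigzag varint decoding (illustrative only; not part of this patch):

```rust
/// Decodes one zigzag-encoded variable-length integer from the front of `bytes`,
/// returning the value and the remaining bytes. Illustrative sketch only.
fn zigzag_i64(bytes: &[u8]) -> Option<(i64, &[u8])> {
    let mut z: u64 = 0;
    for (j, &byte) in bytes.iter().enumerate() {
        if j > 9 {
            return None; // more than 64 bits: out of spec
        }
        z |= u64::from(byte & 0x7F) << (j * 7);
        if byte & 0x80 == 0 {
            // last byte of the varint; undo the zigzag mapping
            let value = if z & 0x1 == 0 {
                (z >> 1) as i64
            } else {
                !(z >> 1) as i64
            };
            return Some((value, &bytes[j + 1..]));
        }
    }
    None // input ended mid-varint
}

fn main() {
    // 0, -1, 1, -2 zigzag-encode to 0, 1, 2, 3 respectively
    assert_eq!(zigzag_i64(&[0x00]), Some((0, &[][..])));
    assert_eq!(zigzag_i64(&[0x03]), Some((-2, &[][..])));
    // 128 zigzag-encodes to 256, which is the varint [0x80, 0x02]
    assert_eq!(zigzag_i64(&[0x80, 0x02]).unwrap().0, 128);
}
```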
diff --git a/src/lib.rs b/src/lib.rs
index e3fef27..c9ba6a2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,17 @@
 #![doc = include_str!("lib.md")]
 #![forbid(unsafe_code)]
+#![forbid(missing_docs)]
 
-mod de;
-mod schema;
-mod se;
-pub use schema::*;
+pub mod error;
+pub mod file;
+pub mod schema;
+
+pub mod read;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub mod read_async;
+
+pub mod write;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub mod write_async;
diff --git a/src/read/block.rs b/src/read/block.rs
new file mode 100644
index 0000000..918d277
--- /dev/null
+++ b/src/read/block.rs
@@ -0,0 +1,96 @@
+//! APIs to read Avro's blocks.
+use std::io::Read;
+
+use fallible_streaming_iterator::FallibleStreamingIterator;
+
+use crate::{error::Error, file::CompressedBlock};
+
+use super::decode;
+
+fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize), Error> {
+    let rows = match decode::internal_zigzag_i64(reader) {
+        Ok(a) => a,
+        Err(error) => match error {
+            decode::DecodeError::EndOfFile => return Ok((0, 0)),
+            decode::DecodeError::OutOfSpec => return Err(Error::OutOfSpec),
+        },
+    };
+    let bytes = decode::zigzag_i64(reader)?;
+    Ok((rows as usize, bytes as usize))
+}
+
+/// Reads a [`CompressedBlock`] from the `reader`.
+/// # Error
+/// This function errors iff either the block cannot be read or the sync marker does not match.
+fn read_block<R: Read>(
+    reader: &mut R,
+    block: &mut CompressedBlock,
+    marker: [u8; 16],
+) -> Result<(), Error> {
+    let (rows, bytes) = read_size(reader)?;
+    block.number_of_rows = rows;
+    if rows == 0 {
+        return Ok(());
+    };
+
+    block.data.clear();
+    block
+        .data
+        .try_reserve(bytes)
+        .map_err(|_| Error::OutOfSpec)?;
+    reader.take(bytes as u64).read_to_end(&mut block.data)?;
+
+    let mut block_marker = [0u8; 16];
+    reader.read_exact(&mut block_marker)?;
+
+    if block_marker != marker {
+        return Err(Error::OutOfSpec);
+    }
+    Ok(())
+}
+
+/// [`FallibleStreamingIterator`] of [`CompressedBlock`].
+pub struct CompressedBlockStreamingIterator<R: Read> {
+    buf: CompressedBlock,
+    reader: R,
+    marker: [u8; 16],
+}
+
+impl<R: Read> CompressedBlockStreamingIterator<R> {
+    /// Creates a new [`CompressedBlockStreamingIterator`].
+    pub fn new(reader: R, marker: [u8; 16], scratch: Vec<u8>) -> Self {
+        Self {
+            reader,
+            marker,
+            buf: CompressedBlock::new(0, scratch),
+        }
+    }
+
+    /// The buffer of [`CompressedBlockStreamingIterator`].
+    pub fn buffer(&mut self) -> &mut CompressedBlock {
+        &mut self.buf
+    }
+
+    /// Deconstructs itself
+    pub fn into_inner(self) -> (R, Vec<u8>) {
+        (self.reader, self.buf.data)
+    }
+}
+
+impl<R: Read> FallibleStreamingIterator for CompressedBlockStreamingIterator<R> {
+    type Error = Error;
+    type Item = CompressedBlock;
+
+    fn advance(&mut self) -> Result<(), Error> {
+        read_block(&mut self.reader, &mut self.buf, self.marker)?;
+        Ok(())
+    }
+
+    fn get(&self) -> Option<&Self::Item> {
+        if self.buf.number_of_rows > 0 {
+            Some(&self.buf)
+        } else {
+            None
+        }
+    }
+}
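Aside: a sketch of the consumption pattern the streaming iterator enables — items are borrowed from one internal buffer that is reused across blocks, so iteration allocates nothing per block. This assumes the crate layout introduced by this patch; the trait is re-exported at `avro_schema::read::fallible_streaming_iterator`:

```rust
use avro_schema::file::CompressedBlock;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;

// consumes any fallible streaming iterator of compressed blocks; `next()`
// yields a borrow of the internal buffer rather than an owned value
fn count_rows<I>(mut blocks: I) -> Result<usize, I::Error>
where
    I: FallibleStreamingIterator<Item = CompressedBlock>,
{
    let mut rows = 0;
    while let Some(block) = blocks.next()? {
        rows += block.number_of_rows;
    }
    Ok(rows)
}
```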
diff --git a/src/read/decode.rs b/src/read/decode.rs
new file mode 100644
index 0000000..603cefd
--- /dev/null
+++ b/src/read/decode.rs
@@ -0,0 +1,80 @@
+use std::collections::HashMap;
+use std::io::Read;
+
+use crate::error::Error;
+use crate::file::Compression;
+use crate::schema::Schema;
+
+use super::{avro_decode, read_header};
+
+pub enum DecodeError {
+    OutOfSpec,
+    EndOfFile,
+}
+
+impl From<DecodeError> for Error {
+    fn from(_: DecodeError) -> Self {
+        Error::OutOfSpec
+    }
+}
+
+pub fn internal_zigzag_i64<R: Read>(reader: &mut R) -> Result<i64, DecodeError> {
+    let z = decode_variable(reader)?;
+    Ok(if z & 0x1 == 0 {
+        (z >> 1) as i64
+    } else {
+        !(z >> 1) as i64
+    })
+}
+
+pub fn zigzag_i64<R: Read>(reader: &mut R) -> Result<i64, Error> {
+    let z = decode_variable(reader)?;
+    Ok(if z & 0x1 == 0 {
+        (z >> 1) as i64
+    } else {
+        !(z >> 1) as i64
+    })
+}
+
+#[inline]
+fn decode_variable<R: Read>(reader: &mut R) -> Result<u64, DecodeError> {
+    avro_decode!(reader)
+}
+
+fn _read_binary<R: Read>(reader: &mut R) -> Result<Vec<u8>, Error> {
+    let len: usize = zigzag_i64(reader)? as usize;
+    let mut buf = vec![];
+    buf.try_reserve(len).map_err(|_| Error::OutOfSpec)?;
+    reader.take(len as u64).read_to_end(&mut buf)?;
+    Ok(buf)
+}
+
+pub fn read_header<R: Read>(reader: &mut R) -> Result<HashMap<String, Vec<u8>>, Error> {
+    read_header!(reader)
+}
+
+pub(crate) fn read_file_marker<R: Read>(reader: &mut R) -> Result<[u8; 16], Error> {
+    let mut marker = [0u8; 16];
+    reader.read_exact(&mut marker)?;
+    Ok(marker)
+}
+
+/// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`].
+pub(crate) fn deserialize_header(
+    header: HashMap<String, Vec<u8>>,
+) -> Result<(Schema, Option<Compression>), Error> {
+    let schema = header
+        .get("avro.schema")
+        .ok_or(Error::OutOfSpec)
+        .and_then(|bytes| serde_json::from_slice(bytes.as_ref()).map_err(|_| Error::OutOfSpec))?;
+
+    let compression = header.get("avro.codec").and_then(|bytes| {
+        let bytes: &[u8] = bytes.as_ref();
+        match bytes {
+            b"snappy" => Some(Compression::Snappy),
+            b"deflate" => Some(Compression::Deflate),
+            _ => None,
+        }
+    });
+    Ok((schema, compression))
+}
diff --git a/src/read/decompress.rs b/src/read/decompress.rs
new file mode 100644
index 0000000..c087daa
--- /dev/null
+++ b/src/read/decompress.rs
@@ -0,0 +1,123 @@
+//! APIs to decompress Avro's blocks.
+use std::io::Read;
+
+use fallible_streaming_iterator::FallibleStreamingIterator;
+
+use crate::error::Error;
+
+use crate::file::Compression;
+use crate::file::{Block, CompressedBlock};
+
+use super::block::CompressedBlockStreamingIterator;
+
+#[cfg(feature = "compression")]
+const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
+
+/// Decompresses a [`CompressedBlock`] into [`Block`].
+/// Returns whether the buffers were swapped.
+pub fn decompress_block(
+    block: &mut CompressedBlock,
+    decompressed: &mut Block,
+    compression: Option<Compression>,
+) -> Result<bool, Error> {
+    decompressed.number_of_rows = block.number_of_rows;
+    let block = &mut block.data;
+    let decompressed = &mut decompressed.data;
+
+    match compression {
+        None => {
+            std::mem::swap(block, decompressed);
+            Ok(true)
+        }
+        #[cfg(feature = "compression")]
+        Some(Compression::Deflate) => {
+            decompressed.clear();
+            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
+            decoder.read_to_end(decompressed)?;
+            Ok(false)
+        }
+        #[cfg(feature = "compression")]
+        Some(Compression::Snappy) => {
+            let crc = &block[block.len() - 4..];
+            let block = &block[..block.len() - 4];
+
+            let len = snap::raw::decompress_len(block).map_err(|_| Error::OutOfSpec)?;
+            decompressed.clear();
+            decompressed.resize(len, 0);
+            snap::raw::Decoder::new()
+                .decompress(block, decompressed)
+                .map_err(|_| Error::OutOfSpec)?;
+
+            let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]);
+
+            let actual_crc = CRC_TABLE.checksum(decompressed);
+            if expected_crc != actual_crc {
+                return Err(Error::OutOfSpec);
+            }
+            Ok(false)
+        }
+        #[cfg(not(feature = "compression"))]
+        Some(Compression::Deflate) => Err(Error::RequiresCompression),
+        #[cfg(not(feature = "compression"))]
+        Some(Compression::Snappy) => Err(Error::RequiresCompression),
+    }
+}
+
+/// [`FallibleStreamingIterator`] of decompressed [`Block`]
+pub struct BlockStreamingIterator<R: Read> {
+    blocks: CompressedBlockStreamingIterator<R>,
+    compression: Option<Compression>,
+    buf: Block,
+    was_swapped: bool,
+}
+
+/// Returns a [`FallibleStreamingIterator`] of [`Block`].
+pub fn block_iterator<R: Read>(
+    reader: R,
+    compression: Option<Compression>,
+    marker: [u8; 16],
+) -> BlockStreamingIterator<R> {
+    BlockStreamingIterator::<R>::new(reader, compression, marker)
+}
+
+impl<R: Read> BlockStreamingIterator<R> {
+    /// Returns a new [`BlockStreamingIterator`].
+    pub fn new(reader: R, compression: Option<Compression>, marker: [u8; 16]) -> Self {
+        Self {
+            blocks: CompressedBlockStreamingIterator::new(reader, marker, vec![]),
+            compression,
+            buf: Block::new(0, vec![]),
+            was_swapped: false,
+        }
+    }
+
+    /// Deconstructs itself into its internal reader
+    #[inline]
+    pub fn into_inner(self) -> R {
+        self.blocks.into_inner().0
+    }
+}
+
+impl<R: Read> FallibleStreamingIterator for BlockStreamingIterator<R> {
+    type Error = Error;
+    type Item = Block;
+
+    #[inline]
+    fn advance(&mut self) -> Result<(), Error> {
+        if self.was_swapped {
+            std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data);
+        }
+        self.blocks.advance()?;
+        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.compression)?;
+        Ok(())
+    }
+
+    #[inline]
+    fn get(&self) -> Option<&Self::Item> {
+        if self.buf.number_of_rows > 0 {
+            Some(&self.buf)
+        } else {
+            None
+        }
+    }
+}
avro_decode { + ($reader:ident $($_await:tt)*) => { + { + let mut i = 0u64; + let mut buf = [0u8; 1]; + let mut j = 0; + loop { + if j > 9 { + // if j * 7 > 64 + return Err(DecodeError::OutOfSpec); + } + $reader.read_exact(&mut buf[..])$($_await)*.map_err(|_| DecodeError::EndOfFile)?; + i |= (u64::from(buf[0] & 0x7F)) << (j * 7); + if (buf[0] >> 7) == 0 { + break; + } else { + j += 1; + } + } + + Ok(i) + } + } +} + +macro_rules! read_header { + ($reader:ident $($_await:tt)*) => {{ + let mut items = HashMap::new(); + + loop { + let len = zigzag_i64($reader)$($_await)*.map_err(|_| Error::OutOfSpec)? as usize; + if len == 0 { + break Ok(items); + } + + items.reserve(len); + for _ in 0..len { + let key = _read_binary($reader)$($_await)*?; + let key = String::from_utf8(key) + .map_err(|_| Error::OutOfSpec)?; + let value = _read_binary($reader)$($_await)*?; + items.insert(key, value); + } + } + }}; +} + +macro_rules! read_metadata_macro { + ($reader:ident $($_await:tt)*) => {{ + let mut magic_number = [0u8; 4]; + $reader.read_exact(&mut magic_number)$($_await)*.map_err(|_| Error::OutOfSpec)?; + + // see https://avro.apache.org/docs/current/spec.html#Object+Container+Files + if magic_number != [b'O', b'b', b'j', 1u8] { + return Err(Error::OutOfSpec); + } + + let header = decode::read_header($reader)$($_await)*?; + + let (schema, compression) = deserialize_header(header)?; + + let marker = decode::read_file_marker($reader)$($_await)*?; + + let record = if let Schema::Record(record) = schema { + record + } else { + return Err(Error::OutOfSpec) + }; + + Ok(FileMetadata { + record, + compression, + marker, + }) + }}; +} + +#[allow(unused_imports)] +pub(crate) use { + avro_decode, decode::deserialize_header, decode::DecodeError, read_header, read_metadata_macro, +}; + +/// Reads the metadata from `reader` into [`FileMetadata`]. +/// # Error +/// This function errors iff the header is not a valid avro file header. +pub fn read_metadata(reader: &mut R) -> Result { + read_metadata_macro!(reader) +} + +pub use decompress::{block_iterator, BlockStreamingIterator}; diff --git a/src/read_async/block.rs b/src/read_async/block.rs new file mode 100644 index 0000000..fe9161d --- /dev/null +++ b/src/read_async/block.rs @@ -0,0 +1,74 @@ +//! APIs to read from Avro format to arrow. +use async_stream::try_stream; +use futures::AsyncRead; +use futures::AsyncReadExt; +use futures::Stream; + +use crate::error::Error; +use crate::file::CompressedBlock; +use crate::read::DecodeError; + +use super::decode::zigzag_i64; + +async fn read_size(reader: &mut R) -> Result<(usize, usize), Error> { + let rows = match zigzag_i64(reader).await { + Ok(a) => a, + Err(error) => match error { + DecodeError::EndOfFile => return Ok((0, 0)), + DecodeError::OutOfSpec => return Err(Error::OutOfSpec), + }, + }; + + let bytes = zigzag_i64(reader).await?; + Ok((rows as usize, bytes as usize)) +} + +/// Reads a [`CompressedBlock`] from the `reader`. 
diff --git a/src/read_async/block.rs b/src/read_async/block.rs
new file mode 100644
index 0000000..fe9161d
--- /dev/null
+++ b/src/read_async/block.rs
@@ -0,0 +1,74 @@
+//! APIs to asynchronously read Avro's blocks.
+use async_stream::try_stream;
+use futures::AsyncRead;
+use futures::AsyncReadExt;
+use futures::Stream;
+
+use crate::error::Error;
+use crate::file::CompressedBlock;
+use crate::read::DecodeError;
+
+use super::decode::zigzag_i64;
+
+async fn read_size<R: AsyncRead + Unpin>(reader: &mut R) -> Result<(usize, usize), Error> {
+    let rows = match zigzag_i64(reader).await {
+        Ok(a) => a,
+        Err(error) => match error {
+            DecodeError::EndOfFile => return Ok((0, 0)),
+            DecodeError::OutOfSpec => return Err(Error::OutOfSpec),
+        },
+    };
+
+    let bytes = zigzag_i64(reader).await?;
+    Ok((rows as usize, bytes as usize))
+}
+
+/// Reads a [`CompressedBlock`] from the `reader`.
+/// # Error
+/// This function errors iff either the block cannot be read or the sync marker does not match.
+async fn read_block<R: AsyncRead + Unpin>(
+    reader: &mut R,
+    block: &mut CompressedBlock,
+    marker: [u8; 16],
+) -> Result<(), Error> {
+    let (rows, bytes) = read_size(reader).await?;
+    block.number_of_rows = rows;
+    if rows == 0 {
+        return Ok(());
+    };
+
+    block.data.clear();
+    block
+        .data
+        .try_reserve(bytes)
+        .map_err(|_| Error::OutOfSpec)?;
+    reader
+        .take(bytes as u64)
+        .read_to_end(&mut block.data)
+        .await?;
+
+    let mut block_marker = [0u8; 16];
+    reader.read_exact(&mut block_marker).await?;
+
+    if block_marker != marker {
+        return Err(Error::OutOfSpec);
+    }
+    Ok(())
+}
+
+/// Returns a fallible [`Stream`] of Avro blocks bound to `reader`
+pub async fn block_stream<R: AsyncRead + Unpin>(
+    reader: &mut R,
+    marker: [u8; 16],
+) -> impl Stream<Item = Result<CompressedBlock, Error>> + '_ {
+    try_stream! {
+        loop {
+            let mut block = CompressedBlock::new(0, vec![]);
+            read_block(reader, &mut block, marker).await?;
+            if block.number_of_rows == 0 {
+                break
+            }
+            yield block
+        }
+    }
+}
diff --git a/src/read_async/decode.rs b/src/read_async/decode.rs
new file mode 100644
index 0000000..af4fa56
--- /dev/null
+++ b/src/read_async/decode.rs
@@ -0,0 +1,44 @@
+use std::collections::HashMap;
+
+use futures::AsyncRead;
+use futures::AsyncReadExt;
+
+use crate::error::Error;
+use crate::read::DecodeError;
+use crate::read::{avro_decode, read_header};
+
+pub async fn zigzag_i64<R: AsyncRead + Unpin>(reader: &mut R) -> Result<i64, DecodeError> {
+    let z = decode_variable(reader).await?;
+    Ok(if z & 0x1 == 0 {
+        (z >> 1) as i64
+    } else {
+        !(z >> 1) as i64
+    })
+}
+
+async fn decode_variable<R: AsyncRead + Unpin>(reader: &mut R) -> Result<u64, DecodeError> {
+    avro_decode!(reader.await)
+}
+
+/// Reads the file marker asynchronously
+pub(crate) async fn read_file_marker<R: AsyncRead + Unpin>(
+    reader: &mut R,
+) -> Result<[u8; 16], Error> {
+    let mut marker = [0u8; 16];
+    reader.read_exact(&mut marker).await?;
+    Ok(marker)
+}
+
+async fn _read_binary<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>, Error> {
+    let len: usize = zigzag_i64(reader).await? as usize;
+    let mut buf = vec![];
+    buf.try_reserve(len).map_err(|_| Error::OutOfSpec)?;
+    reader.take(len as u64).read_to_end(&mut buf).await?;
+    Ok(buf)
+}
+
+pub(crate) async fn read_header<R: AsyncRead + Unpin>(
+    reader: &mut R,
+) -> Result<HashMap<String, Vec<u8>>, Error> {
+    read_header!(reader.await)
+}
diff --git a/src/read_async/mod.rs b/src/read_async/mod.rs
new file mode 100644
index 0000000..68e73f0
--- /dev/null
+++ b/src/read_async/mod.rs
@@ -0,0 +1,32 @@
+//! Async Avro
+use futures::AsyncRead;
+use futures::AsyncReadExt;
+
+use crate::error::Error;
+use crate::file::FileMetadata;
+
+use crate::read::read_metadata_macro;
+use crate::schema::Schema;
+
+mod block;
+mod decode;
+use crate::read::deserialize_header;
+use decode::*;
+
+/// Reads the Avro metadata from `reader` into [`FileMetadata`] (schema, compression and magic marker).
+pub async fn read_metadata<R: AsyncRead + Unpin>(
+    reader: &mut R,
+) -> Result<FileMetadata, Error> {
+    read_metadata_macro!(reader.await)
+}
+
+async fn _read_binary<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>, Error> {
+    let len: usize = zigzag_i64(reader).await? as usize;
+    let mut buf = vec![];
+    buf.try_reserve(len).map_err(|_| Error::OutOfSpec)?;
+    reader.take(len as u64).read_to_end(&mut buf).await?;
+    Ok(buf)
+}
+
+pub use super::read::decompress::decompress_block;
+pub use block::block_stream;
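Aside: a hedged sketch of driving the async read path end-to-end; `block_stream` borrows the reader and the resulting stream must be pinned before iteration (assumes the `async` feature is enabled):

```rust
use avro_schema::error::Error;
use futures::{pin_mut, AsyncRead, StreamExt};

// reads the metadata and then counts rows across all blocks
async fn count_rows_async<R: AsyncRead + Unpin>(mut reader: R) -> Result<usize, Error> {
    let metadata = avro_schema::read_async::read_metadata(&mut reader).await?;
    let blocks = avro_schema::read_async::block_stream(&mut reader, metadata.marker).await;
    pin_mut!(blocks);

    let mut rows = 0;
    while let Some(block) = blocks.next().await.transpose()? {
        // `block` is a `CompressedBlock`; decompress it with
        // `avro_schema::read_async::decompress_block` as needed
        rows += block.number_of_rows;
    }
    Ok(rows)
}
```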
diff --git a/src/de.rs b/src/schema/de.rs
similarity index 100%
rename from src/de.rs
rename to src/schema/de.rs
diff --git a/src/schema.rs b/src/schema/mod.rs
similarity index 66%
rename from src/schema.rs
rename to src/schema/mod.rs
index b7dee79..c7c4a3f 100644
--- a/src/schema.rs
+++ b/src/schema/mod.rs
@@ -1,28 +1,49 @@
+//! Contains structs defining Avro's logical types
+mod de;
+mod se;
+
 /// An Avro Schema. It describes all _physical_ and _logical_ types.
 /// See [the spec](https://avro.apache.org/docs/current/spec.html) for details.
 #[derive(Debug, Clone, PartialEq, Hash)]
 pub enum Schema {
+    /// A null type
     Null,
+    /// Boolean (physically represented as a single byte)
     Boolean,
+    /// 32 bit signed integer (physically represented as a zigzag encoded variable number of bytes)
     Int(Option<IntLogical>),
+    /// 64 bit signed integer (physically represented as a zigzag encoded variable number of bytes)
     Long(Option<LongLogical>),
+    /// 32 bit float (physically represented as 4 bytes in little endian)
     Float,
+    /// 64 bit float (physically represented as 8 bytes in little endian)
     Double,
+    /// variable length bytes (physically represented by a zigzag encoded positive integer followed by its number of bytes)
     Bytes(Option<BytesLogical>),
+    /// variable length utf8 (physically represented by a zigzag encoded positive integer followed by its number of bytes)
     String(Option<StringLogical>),
+    /// Record
     Record(Record),
+    /// Enum with a known number of variants
     Enum(Enum),
+    /// Array of a uniform type with N entries
     Array(Box<Schema>),
+    /// A map String -> type
     Map(Box<Schema>),
+    /// A union of a heterogeneous number of types
     Union(Vec<Schema>),
+    /// Fixed-length binary
     Fixed(Fixed),
 }
 
 /// Order of a [`Field`].
 #[derive(Debug, Clone, Copy, PartialEq, Hash)]
 pub enum Order {
+    /// Ascending order
     Ascending,
+    /// Descending order
     Descending,
+    /// Order is to be ignored
     Ignore,
 }
 
@@ -30,15 +51,22 @@ pub enum Order {
 /// See [the spec](https://avro.apache.org/docs/current/spec.html) for details.
 #[derive(Debug, Clone, PartialEq, Hash)]
 pub struct Field {
+    /// Its name
     pub name: String,
+    /// Its optional documentation
     pub doc: Option<String>,
+    /// Its Schema
     pub schema: Schema,
+    /// Its default value
     pub default: Option<String>,
+    /// Its optional order
     pub order: Option<Order>,
+    /// Its aliases
     pub aliases: Vec<String>,
 }
 
 impl Field {
+    /// Returns a new [`Field`] without a doc, default, order or aliases
     pub fn new<I: Into<String>>(name: I, schema: Schema) -> Self {
         Self {
             name: name.into(),
@@ -54,14 +82,20 @@ impl Field {
 /// Struct to hold data from a [`Schema::Record`].
 #[derive(Debug, Clone, PartialEq, Hash)]
 pub struct Record {
+    /// Its name
     pub name: String,
+    /// Its optional namespace
     pub namespace: Option<String>,
+    /// Its optional documentation
     pub doc: Option<String>,
+    /// Its aliases
     pub aliases: Vec<String>,
+    /// Its children fields
     pub fields: Vec<Field>,
 }
 
 impl Record {
+    /// Returns a new [`Record`] without a namespace, doc or aliases
     pub fn new<I: Into<String>>(name: I, fields: Vec<Field>) -> Self {
         Self {
             name: name.into(),
@@ -76,15 +110,22 @@ impl Record {
 /// Struct to hold data from a [`Schema::Fixed`].
 #[derive(Debug, Clone, PartialEq, Hash)]
 pub struct Fixed {
+    /// Its name
     pub name: String,
+    /// Its optional namespace
     pub namespace: Option<String>,
+    /// Its optional documentation
     pub doc: Option<String>,
+    /// Its aliases
     pub aliases: Vec<String>,
+    /// Its size
     pub size: usize,
+    /// Its optional logical type
     pub logical: Option<FixedLogical>,
 }
 
 impl Fixed {
+    /// Returns a new [`Fixed`] without a namespace, doc or aliases
     pub fn new<I: Into<String>>(name: I, size: usize) -> Self {
         Self {
             name: name.into(),
@@ -100,11 +141,17 @@ impl Fixed {
 /// Struct to hold data from a [`Schema::Enum`].
 #[derive(Debug, Clone, PartialEq, Hash)]
 pub struct Enum {
+    /// Its name
     pub name: String,
+    /// Its optional namespace
     pub namespace: Option<String>,
+    /// Its aliases
     pub aliases: Vec<String>,
+    /// Its optional documentation
     pub doc: Option<String>,
+    /// Its set of symbols
     pub symbols: Vec<String>,
+    /// Its default symbol
     pub default: Option<String>,
 }
 
@@ -143,35 +190,46 @@ impl From<Fixed> for Schema {
 /// Enum of all logical types of [`Schema::Int`]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum IntLogical {
+    /// A date
     Date,
+    /// A time
     Time,
 }
 
 /// Enum of all logical types of [`Schema::Long`]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum LongLogical {
+    /// A time
     Time,
+    /// A timestamp
     TimestampMillis,
+    /// A timestamp
     TimestampMicros,
+    /// A timestamp without timezone
     LocalTimestampMillis,
+    /// A timestamp without timezone
     LocalTimestampMicros,
 }
 
 /// Enum of all logical types of [`Schema::String`]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum StringLogical {
+    /// A UUID
     Uuid,
 }
 
 /// Enum of all logical types of [`Schema::Fixed`]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum FixedLogical {
+    /// A decimal
     Decimal(usize, usize),
+    /// A duration
     Duration,
 }
 
 /// Enum of all logical types of [`Schema::Bytes`]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum BytesLogical {
+    /// A decimal
     Decimal(usize, usize),
 }
diff --git a/src/se.rs b/src/schema/se.rs
similarity index 100%
rename from src/se.rs
rename to src/schema/se.rs
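Aside: with `de.rs`/`se.rs` moved under `src/schema/`, schemas are still (de)serialized via `serde_json`. A sketch of parsing an Avro JSON schema into these structs, mirroring the pattern `tests/it/main.rs` exercises below (the exact JSON shorthand accepted is assumed from those tests):

```rust
use avro_schema::schema::{Field, Record, Schema};

// builds the expected value for a small Avro JSON schema and checks that the
// serde-based deserializer agrees with it
fn parse_schema() -> serde_json::Result<()> {
    let json = r#"{"type": "record", "name": "test", "fields": [{"name": "value", "type": "float"}]}"#;
    let expected = Schema::Record(Record::new("test", vec![Field::new("value", Schema::Float)]));

    let parsed: Schema = serde_json::from_str(json)?;
    assert_eq!(parsed, expected);
    Ok(())
}
```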
diff --git a/src/write/block.rs b/src/write/block.rs
new file mode 100644
index 0000000..46997c2
--- /dev/null
+++ b/src/write/block.rs
@@ -0,0 +1,20 @@
+use std::io::Write;
+
+use crate::error::Error;
+
+use crate::file::CompressedBlock;
+
+use super::{encode::zigzag_encode, file::SYNC_NUMBER};
+
+/// Writes a [`CompressedBlock`] to `writer`
+pub fn write_block<W: Write>(writer: &mut W, block: &CompressedBlock) -> Result<(), Error> {
+    // write size and rows
+    zigzag_encode(block.number_of_rows as i64, writer)?;
+    zigzag_encode(block.data.len() as i64, writer)?;
+
+    writer.write_all(&block.data)?;
+
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
diff --git a/src/write/compression.rs b/src/write/compression.rs
new file mode 100644
index 0000000..afeb1a3
--- /dev/null
+++ b/src/write/compression.rs
@@ -0,0 +1,55 @@
+//! APIs to compress Avro's blocks.
+
+use crate::error::Error;
+
+use crate::file::{Block, CompressedBlock, Compression};
+
+#[cfg(feature = "compression")]
+const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
+
+/// Compresses a [`Block`] to a [`CompressedBlock`].
+pub fn compress(
+    block: &mut Block,
+    compressed: &mut CompressedBlock,
+    compression: Option<Compression>,
+) -> Result<bool, Error> {
+    compressed.number_of_rows = block.number_of_rows;
+    let block = &mut block.data;
+    let compressed = &mut compressed.data;
+
+    match compression {
+        None => {
+            std::mem::swap(block, compressed);
+            Ok(true)
+        }
+        #[cfg(feature = "compression")]
+        Some(Compression::Deflate) => {
+            use std::io::Write;
+            compressed.clear();
+            let mut encoder = libflate::deflate::Encoder::new(compressed);
+            encoder.write_all(block)?;
+            encoder.finish();
+            Ok(false)
+        }
+        #[cfg(feature = "compression")]
+        Some(Compression::Snappy) => {
+            use snap::raw::{max_compress_len, Encoder};
+
+            compressed.clear();
+
+            let required_len = max_compress_len(block.len());
+            compressed.resize(required_len, 0);
+            let compressed_bytes = Encoder::new()
+                .compress(block, compressed)
+                .map_err(|_| Error::OutOfSpec)?;
+            compressed.truncate(compressed_bytes);
+
+            compressed.extend(CRC_TABLE.checksum(block).to_be_bytes());
+            Ok(false)
+        }
+        #[cfg(not(feature = "compression"))]
+        Some(Compression::Deflate) => Err(Error::RequiresCompression),
+        #[cfg(not(feature = "compression"))]
+        Some(Compression::Snappy) => Err(Error::RequiresCompression),
+    }
+}
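Aside: one spec detail worth highlighting from `compress` and `decompress_block`: a snappy-compressed Avro block is followed by a big-endian CRC-32 (ISO-HDLC polynomial) of the *uncompressed* payload. A sketch of just that trailer computation, using the same `crc` crate as the patch:

```rust
// the 4-byte trailer that follows a snappy-compressed Avro block;
// the checksum is computed over the uncompressed payload, stored big-endian
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);

fn snappy_crc_trailer(uncompressed: &[u8]) -> [u8; 4] {
    CRC_TABLE.checksum(uncompressed).to_be_bytes()
}
```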
diff --git a/src/write/encode.rs b/src/write/encode.rs
new file mode 100644
index 0000000..4746a77
--- /dev/null
+++ b/src/write/encode.rs
@@ -0,0 +1,28 @@
+//! Functions used to encode Avro physical types
+use crate::error::Error;
+
+/// Zigzag encoding of a signed integer.
+#[inline]
+pub fn zigzag_encode<W: std::io::Write>(n: i64, writer: &mut W) -> Result<(), Error> {
+    _zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer)
+}
+
+#[inline]
+fn _zigzag_encode<W: std::io::Write>(mut z: u64, writer: &mut W) -> Result<(), Error> {
+    loop {
+        if z <= 0x7F {
+            writer.write_all(&[(z & 0x7F) as u8])?;
+            break;
+        } else {
+            writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
+            z >>= 7;
+        }
+    }
+    Ok(())
+}
+
+pub(crate) fn write_binary<W: std::io::Write>(bytes: &[u8], writer: &mut W) -> Result<(), Error> {
+    zigzag_encode(bytes.len() as i64, writer)?;
+    writer.write_all(bytes)?;
+    Ok(())
+}
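Aside: to make the shift/xor in `zigzag_encode` concrete, a few worked values (a standalone sketch, independent of the crate):

```rust
// zigzag interleaves signed integers so that small magnitudes become small
// unsigned values: 0, -1, 1, -2, 2 map to 0, 1, 2, 3, 4
fn zigzag(n: i64) -> u64 {
    ((n << 1) ^ (n >> 63)) as u64
}

fn main() {
    assert_eq!(zigzag(0), 0);
    assert_eq!(zigzag(-1), 1);
    assert_eq!(zigzag(1), 2);
    assert_eq!(zigzag(-2), 3);
    assert_eq!(zigzag(2), 4);
    // the result is then written 7 bits at a time, least-significant group
    // first, with the high bit of each byte marking continuation
}
```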
diff --git a/src/write/file.rs b/src/write/file.rs
new file mode 100644
index 0000000..3da91b0
--- /dev/null
+++ b/src/write/file.rs
@@ -0,0 +1,67 @@
+use std::collections::HashMap;
+
+use crate::error::Error;
+use crate::file::Compression;
+use crate::schema::{Record, Schema};
+
+use super::encode;
+
+pub(crate) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+// * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
+pub(crate) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8];
+
+/// Serializes a [`Schema`] and optional [`Compression`] into an Avro header.
+fn serialize_header(
+    schema: &Schema,
+    compression: Option<Compression>,
+) -> Result<HashMap<String, Vec<u8>>, Error> {
+    let schema = serde_json::to_string(schema).map_err(|_| Error::OutOfSpec)?;
+
+    let mut header = HashMap::<String, Vec<u8>>::default();
+
+    header.insert("avro.schema".to_string(), schema.into_bytes());
+    if let Some(compression) = compression {
+        let value = match compression {
+            Compression::Snappy => b"snappy".to_vec(),
+            Compression::Deflate => b"deflate".to_vec(),
+        };
+        header.insert("avro.codec".to_string(), value);
+    };
+
+    Ok(header)
+}
+
+/// Writes Avro's metadata to `writer`.
+pub fn write_metadata<W: std::io::Write>(
+    writer: &mut W,
+    record: Record,
+    compression: Option<Compression>,
+) -> Result<(), Error> {
+    writer.write_all(&AVRO_MAGIC)?;
+
+    // * file metadata, including the schema.
+    let schema = Schema::Record(record);
+
+    write_schema(writer, &schema, compression)?;
+
+    // The 16-byte, randomly-generated sync marker for this file.
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
+pub(crate) fn write_schema<W: std::io::Write>(
+    writer: &mut W,
+    schema: &Schema,
+    compression: Option<Compression>,
+) -> Result<(), Error> {
+    let header = serialize_header(schema, compression)?;
+
+    encode::zigzag_encode(header.len() as i64, writer)?;
+    for (name, item) in header {
+        encode::write_binary(name.as_bytes(), writer)?;
+        encode::write_binary(&item, writer)?;
+    }
+    writer.write_all(&[0])?;
+    Ok(())
+}
diff --git a/src/write/mod.rs b/src/write/mod.rs
new file mode 100644
index 0000000..03f26cf
--- /dev/null
+++ b/src/write/mod.rs
@@ -0,0 +1,8 @@
+//! Functions to compress and write files' metadata and blocks
+mod compression;
+pub use compression::compress;
+mod block;
+pub mod encode;
+pub(crate) mod file;
+pub use block::write_block;
+pub use file::write_metadata;
diff --git a/src/write_async.rs b/src/write_async.rs
new file mode 100644
index 0000000..96b2bbd
--- /dev/null
+++ b/src/write_async.rs
@@ -0,0 +1,55 @@
+//! Functions to asynchronously write files' metadata and blocks
+use futures::{AsyncWrite, AsyncWriteExt};
+
+use crate::{
+    error::Error,
+    file::{CompressedBlock, Compression},
+    schema::{Record, Schema},
+    write::encode::zigzag_encode,
+    write::file::{write_schema, AVRO_MAGIC, SYNC_NUMBER},
+};
+
+/// Writes Avro's metadata to `writer`.
+pub async fn write_metadata<W>(
+    writer: &mut W,
+    record: Record,
+    compression: Option<Compression>,
+) -> Result<(), Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    writer.write_all(&AVRO_MAGIC).await?;
+
+    // * file metadata, including the schema.
+    let schema = Schema::Record(record);
+
+    let mut scratch = vec![];
+    write_schema(&mut scratch, &schema, compression)?;
+
+    writer.write_all(&scratch).await?;
+
+    // The 16-byte, randomly-generated sync marker for this file.
+    writer.write_all(&SYNC_NUMBER).await?;
+
+    Ok(())
+}
+
+/// Writes a [`CompressedBlock`] to `writer`
+pub async fn write_block<W>(writer: &mut W, block: &CompressedBlock) -> Result<(), Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    // write size and rows
+    let mut scratch = Vec::with_capacity(10);
+    zigzag_encode(block.number_of_rows as i64, &mut scratch)?;
+    writer.write_all(&scratch).await?;
+    scratch.clear();
+    zigzag_encode(block.data.len() as i64, &mut scratch)?;
+    writer.write_all(&scratch).await?;
+
+    writer.write_all(&block.data).await?;
+
+    writer.write_all(&SYNC_NUMBER).await?;
+
+    Ok(())
+}
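Aside: a sketch of the async write path, mirroring the sync example in `lib.md` — the block would come from `avro_schema::write::compress`, and any executor works since the functions only require `AsyncWrite + Unpin` (assumes the `async` feature):

```rust
use avro_schema::error::Error;
use avro_schema::file::CompressedBlock;
use avro_schema::schema::{Field, Record, Schema};
use futures::AsyncWrite;

// writes a header and one pre-compressed block for a single-float schema
async fn write_file<W: AsyncWrite + Unpin>(
    writer: &mut W,
    block: &CompressedBlock,
) -> Result<(), Error> {
    let record = Record::new("", vec![Field::new("value", Schema::Float)]);
    avro_schema::write_async::write_metadata(writer, record, None).await?;
    avro_schema::write_async::write_block(writer, block).await?;
    Ok(())
}
```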
diff --git a/tests/it/file.rs b/tests/it/file.rs
new file mode 100644
index 0000000..913ce7b
--- /dev/null
+++ b/tests/it/file.rs
@@ -0,0 +1,91 @@
+use std::convert::TryInto;
+
+use avro_schema::error::Error;
+use avro_schema::file::{Block, Compression};
+use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
+use avro_schema::schema::{Field, Record, Schema};
+
+fn read_avro(mut data: &[u8]) -> Result<Vec<f32>, Error> {
+    let metadata = avro_schema::read::read_metadata(&mut data)?;
+
+    let mut blocks = avro_schema::read::BlockStreamingIterator::new(
+        &mut data,
+        metadata.compression,
+        metadata.marker,
+    );
+
+    let mut values = vec![];
+    while let Some(block) = blocks.next()? {
+        let _fields = &metadata.record.fields;
+        let length = block.number_of_rows;
+        let mut block: &[u8] = block.data.as_ref();
+        // at this point you can deserialize the block based on `_fields` according
+        // to avro's specification. Note that `Block` is already decompressed.
+        // for example, if there was a single field with f32, we would use
+        for _ in 0..length {
+            let (item, remaining) = block.split_at(4);
+            block = remaining;
+            let value = f32::from_le_bytes(item.try_into().unwrap());
+            values.push(value)
+            // if there were more fields, we would need to consume (or skip) the remaining
+            // bytes here. You can use `avro_schema::read::decode::zigzag_i64` for integers :D
+        }
+    }
+
+    Ok(values)
+}
+
+fn write_avro(
+    compression: Option<Compression>,
+    array: &[f32],
+) -> Result<Vec<u8>, Error> {
+    let mut file = vec![];
+
+    let record = Record::new("", vec![Field::new("value", Schema::Float)]);
+
+    avro_schema::write::write_metadata(&mut file, record, compression)?;
+
+    // we need to create a `Block`
+    let mut data: Vec<u8> = vec![];
+    for item in array.iter() {
+        let bytes = item.to_le_bytes();
+        data.extend(bytes);
+    }
+    let mut block = Block::new(array.len(), data);
+
+    // once completed, we compress it
+    let mut compressed_block = avro_schema::file::CompressedBlock::default();
+    let _ = avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;
+
+    // and finally write it to the file
+    avro_schema::write::write_block(&mut file, &compressed_block)?;
+
+    Ok(file)
+}
+
+#[test]
+fn round_trip() -> Result<(), Error> {
+    let original = vec![0.1, 0.2];
+    let file = write_avro(None, &original)?;
+    let read = read_avro(&file)?;
+    assert_eq!(read, original);
+    Ok(())
+}
+
+#[test]
+fn round_trip_deflate() -> Result<(), Error> {
+    let original = vec![0.1, 0.2];
+    let file = write_avro(Some(Compression::Deflate), &original)?;
+    let read = read_avro(&file)?;
+    assert_eq!(read, original);
+    Ok(())
+}
+
+#[test]
+fn round_trip_snappy() -> Result<(), Error> {
+    let original = vec![0.1, 0.2];
+    let file = write_avro(Some(Compression::Snappy), &original)?;
+    let read = read_avro(&file)?;
+    assert_eq!(read, original);
+    Ok(())
+}
"CLIENT".to_string(), "NONE".to_string()], ) @@ -121,18 +124,21 @@ fn cases() -> Vec<(&'static str, Schema)> { Field::new("serverProtocol", Union(vec![Null, String(None)])), Field::new( "serverHash", - Union(vec![Null, avro_schema::Fixed::new("MD5", 16).into()]), + Union(vec![ + Null, + avro_schema::schema::Fixed::new("MD5", 16).into(), + ]), ), Field::new("meta", Union(vec![Null, Map(Box::new(Bytes(None)))])), Field::new( "duration", - avro_schema::Fixed { + avro_schema::schema::Fixed { name: "duration".to_string(), size: 12, namespace: None, doc: None, aliases: vec![], - logical: Some(FixedLogical::Duration), + logical: Some(avro_schema::schema::FixedLogical::Duration), } .into(), ), @@ -145,18 +151,8 @@ fn cases() -> Vec<(&'static str, Schema)> { #[test] fn test_deserialize() -> Result<()> { for (data, expected) in cases() { - let v: avro_schema::Schema = serde_json::from_str(data)?; + let v: avro_schema::schema::Schema = serde_json::from_str(data)?; assert_eq!(v, expected); } Ok(()) } - -#[test] -fn test_round_trip() -> Result<()> { - for (_, expected) in cases() { - let serialized = serde_json::to_string(&expected)?; - let v: avro_schema::Schema = serde_json::from_str(&serialized)?; - assert_eq!(expected, v); - } - Ok(()) -}