diff --git a/Cargo.toml b/Cargo.toml index 1adfafda747..a232d5e4fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ full = [ "io_parquet_compression", "io_avro", "io_avro_compression", + "io_avro_async", "regex", "merge_sort", "compute", @@ -142,6 +143,7 @@ io_avro_compression = [ "libflate", "snap", ] +io_avro_async = ["io_avro", "futures"] # io_json: its dependencies + error handling # serde_derive: there is some derive around io_json_integration = ["io_json", "serde_derive", "hex"] @@ -171,6 +173,7 @@ skip_feature_sets = [ ["io_csv_async"], ["io_csv_read_async"], ["io_avro"], + ["io_avro_async"], ["io_avro_compression"], ["io_json"], ["io_flight"], diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs index 75ce3db6984..2cf3c216d1b 100644 --- a/src/io/avro/mod.rs +++ b/src/io/avro/mod.rs @@ -2,6 +2,9 @@ //! Read and write from and to Apache Avro pub mod read; +#[cfg(feature = "io_avro_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))] +pub mod read_async; use crate::error::ArrowError; @@ -10,3 +13,78 @@ impl From for ArrowError { ArrowError::External("".to_string(), Box::new(error)) } } + +// macros that can operate in sync and async code. +macro_rules! avro_decode { + ($reader:ident $($_await:tt)*) => { + { + let mut i = 0u64; + let mut buf = [0u8; 1]; + + let mut j = 0; + loop { + if j > 9 { + // if j * 7 > 64 + return Err(ArrowError::ExternalFormat( + "zigzag decoding failed - corrupt avro file".to_string(), + )); + } + $reader.read_exact(&mut buf[..])$($_await)*?; + i |= (u64::from(buf[0] & 0x7F)) << (j * 7); + if (buf[0] >> 7) == 0 { + break; + } else { + j += 1; + } + } + + Ok(i) + } + } +} + +macro_rules! read_header { + ($reader:ident $($_await:tt)*) => {{ + let mut items = HashMap::new(); + + loop { + let len = zigzag_i64($reader)$($_await)*? as usize; + if len == 0 { + break Ok(items); + } + + items.reserve(len); + for _ in 0..len { + let key = _read_binary($reader)$($_await)*?; + let key = String::from_utf8(key) + .map_err(|_| ArrowError::ExternalFormat("Invalid Avro header".to_string()))?; + let value = _read_binary($reader)$($_await)*?; + items.insert(key, value); + } + } + }}; +} + +macro_rules! read_metadata { + ($reader:ident $($_await:tt)*) => {{ + let mut magic_number = [0u8; 4]; + $reader.read_exact(&mut magic_number)$($_await)*?; + + // see https://avro.apache.org/docs/current/spec.html#Object+Container+Files + if magic_number != [b'O', b'b', b'j', 1u8] { + return Err(ArrowError::ExternalFormat( + "Avro header does not contain a valid magic number".to_string(), + )); + } + + let header = read_header($reader)$($_await)*?; + + let (schema, compression) = deserialize_header(header)?; + + let marker = read_file_marker($reader)$($_await)*?; + + Ok((schema, compression, marker)) + }}; +} + +pub(crate) use {avro_decode, read_header, read_metadata}; diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs new file mode 100644 index 00000000000..f061d1e8fe4 --- /dev/null +++ b/src/io/avro/read/block.rs @@ -0,0 +1,91 @@ +//! APIs to read from Avro format to arrow. +use std::io::Read; + +use fallible_streaming_iterator::FallibleStreamingIterator; + +use crate::error::{ArrowError, Result}; + +use super::util; + +fn read_size(reader: &mut R) -> Result<(usize, usize)> { + let rows = match util::zigzag_i64(reader) { + Ok(a) => a, + Err(ArrowError::Io(io_err)) => { + if let std::io::ErrorKind::UnexpectedEof = io_err.kind() { + // end + return Ok((0, 0)); + } else { + return Err(ArrowError::Io(io_err)); + } + } + Err(other) => return Err(other), + }; + let bytes = util::zigzag_i64(reader)?; + Ok((rows as usize, bytes as usize)) +} + +/// Reads a block from the file into `buf`. +/// # Panic +/// Panics iff the block marker does not equal to the file's marker +fn read_block(reader: &mut R, buf: &mut Vec, file_marker: [u8; 16]) -> Result { + let (rows, bytes) = read_size(reader)?; + if rows == 0 { + return Ok(0); + }; + + buf.resize(bytes, 0); + reader.read_exact(buf)?; + + let mut marker = [0u8; 16]; + reader.read_exact(&mut marker)?; + + assert!(!(marker != file_marker)); + Ok(rows) +} + +/// [`FallibleStreamingIterator`] of compressed avro blocks +pub struct BlockStreamIterator { + buf: (Vec, usize), + reader: R, + file_marker: [u8; 16], +} + +impl BlockStreamIterator { + /// Creates a new [`BlockStreamIterator`]. + pub fn new(reader: R, file_marker: [u8; 16]) -> Self { + Self { + reader, + file_marker, + buf: (vec![], 0), + } + } + + /// The buffer of [`BlockStreamIterator`]. + pub fn buffer(&mut self) -> &mut Vec { + &mut self.buf.0 + } + + /// Deconstructs itself + pub fn into_inner(self) -> (R, Vec) { + (self.reader, self.buf.0) + } +} + +impl FallibleStreamingIterator for BlockStreamIterator { + type Error = ArrowError; + type Item = (Vec, usize); + + fn advance(&mut self) -> Result<()> { + let (buf, rows) = &mut self.buf; + *rows = read_block(&mut self.reader, buf, self.file_marker)?; + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + if self.buf.1 > 0 { + Some(&self.buf) + } else { + None + } + } +} diff --git a/src/io/avro/read/decompress.rs b/src/io/avro/read/decompress.rs new file mode 100644 index 00000000000..2fdeb613d40 --- /dev/null +++ b/src/io/avro/read/decompress.rs @@ -0,0 +1,100 @@ +//! APIs to read from Avro format to arrow. +use std::io::Read; + +use fallible_streaming_iterator::FallibleStreamingIterator; + +use crate::error::{ArrowError, Result}; + +use super::BlockStreamIterator; +use super::Compression; + +/// Decompresses an avro block. +/// Returns whether the buffers where swapped. +fn decompress_block( + block: &mut Vec, + decompress: &mut Vec, + compression: Option, +) -> Result { + match compression { + None => { + std::mem::swap(block, decompress); + Ok(true) + } + #[cfg(feature = "io_avro_compression")] + Some(Compression::Deflate) => { + decompress.clear(); + let mut decoder = libflate::deflate::Decoder::new(&block[..]); + decoder.read_to_end(decompress)?; + Ok(false) + } + #[cfg(feature = "io_avro_compression")] + Some(Compression::Snappy) => { + let len = snap::raw::decompress_len(&block[..block.len() - 4]) + .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?; + decompress.clear(); + decompress.resize(len, 0); + snap::raw::Decoder::new() + .decompress(&block[..block.len() - 4], decompress) + .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?; + Ok(false) + } + #[cfg(not(feature = "io_avro_compression"))] + Some(Compression::Deflate) => Err(ArrowError::Other( + "The avro file is deflate-encoded but feature 'io_avro_compression' is not active." + .to_string(), + )), + #[cfg(not(feature = "io_avro_compression"))] + Some(Compression::Snappy) => Err(ArrowError::Other( + "The avro file is snappy-encoded but feature 'io_avro_compression' is not active." + .to_string(), + )), + } +} + +/// [`FallibleStreamingIterator`] of decompressed Avro blocks +pub struct Decompressor { + blocks: BlockStreamIterator, + codec: Option, + buf: (Vec, usize), + was_swapped: bool, +} + +impl Decompressor { + /// Creates a new [`Decompressor`]. + pub fn new(blocks: BlockStreamIterator, codec: Option) -> Self { + Self { + blocks, + codec, + buf: (vec![], 0), + was_swapped: false, + } + } + + /// Deconstructs itself into its internal reader + pub fn into_inner(self) -> R { + self.blocks.into_inner().0 + } +} + +impl<'a, R: Read> FallibleStreamingIterator for Decompressor { + type Error = ArrowError; + type Item = (Vec, usize); + + fn advance(&mut self) -> Result<()> { + if self.was_swapped { + std::mem::swap(self.blocks.buffer(), &mut self.buf.0); + } + self.blocks.advance()?; + self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?; + self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default(); + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + if self.buf.1 > 0 { + Some(&self.buf) + } else { + None + } + } +} diff --git a/src/io/avro/read/header.rs b/src/io/avro/read/header.rs new file mode 100644 index 00000000000..e15aebf626e --- /dev/null +++ b/src/io/avro/read/header.rs @@ -0,0 +1,29 @@ +use std::collections::HashMap; + +use avro_rs::{Error, Schema}; +use serde_json; + +use crate::error::Result; + +use super::Compression; + +/// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`]. +pub(crate) fn deserialize_header( + header: HashMap>, +) -> Result<(Schema, Option)> { + let json = header + .get("avro.schema") + .and_then(|bytes| serde_json::from_slice(bytes.as_ref()).ok()) + .ok_or(Error::GetAvroSchemaFromMap)?; + let schema = Schema::parse(&json)?; + + let compression = header.get("avro.codec").and_then(|bytes| { + let bytes: &[u8] = bytes.as_ref(); + match bytes { + b"snappy" => Some(Compression::Snappy), + b"deflate" => Some(Compression::Deflate), + _ => None, + } + }); + Ok((schema, compression)) +} diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index de3ec6c36a0..ca5f56a6e22 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -6,13 +6,20 @@ use std::sync::Arc; use avro_rs::Schema as AvroSchema; use fallible_streaming_iterator::FallibleStreamingIterator; +mod block; +mod decompress; +pub use block::BlockStreamIterator; +pub use decompress::Decompressor; mod deserialize; +mod header; mod nested; mod schema; mod util; +pub(super) use header::deserialize_header; + use crate::datatypes::Schema; -use crate::error::{ArrowError, Result}; +use crate::error::Result; use crate::record_batch::RecordBatch; /// Valid compressions @@ -41,193 +48,30 @@ pub fn read_metadata( Ok((avro_schema, schema, codec, marker)) } -fn read_size(reader: &mut R) -> Result<(usize, usize)> { - let rows = match util::zigzag_i64(reader) { - Ok(a) => a, - Err(ArrowError::Io(io_err)) => { - if let std::io::ErrorKind::UnexpectedEof = io_err.kind() { - // end - return Ok((0, 0)); - } else { - return Err(ArrowError::Io(io_err)); - } - } - Err(other) => return Err(other), - }; - let bytes = util::zigzag_i64(reader)?; - Ok((rows as usize, bytes as usize)) -} - -/// Reads a block from the file into `buf`. -/// # Panic -/// Panics iff the block marker does not equal to the file's marker -fn read_block(reader: &mut R, buf: &mut Vec, file_marker: [u8; 16]) -> Result { - let (rows, bytes) = read_size(reader)?; - if rows == 0 { - return Ok(0); - }; - - buf.resize(bytes, 0); - reader.read_exact(buf)?; - - let mut marker = [0u8; 16]; - reader.read_exact(&mut marker)?; - - assert!(!(marker != file_marker)); - Ok(rows) -} - -/// Decompresses an avro block. -/// Returns whether the buffers where swapped. -fn decompress_block( - block: &mut Vec, - decompress: &mut Vec, - codec: Option, -) -> Result { - match codec { - None => { - std::mem::swap(block, decompress); - Ok(true) - } - #[cfg(feature = "io_avro_compression")] - Some(Compression::Deflate) => { - decompress.clear(); - let mut decoder = libflate::deflate::Decoder::new(&block[..]); - decoder.read_to_end(decompress)?; - Ok(false) - } - #[cfg(feature = "io_avro_compression")] - Some(Compression::Snappy) => { - let len = snap::raw::decompress_len(&block[..block.len() - 4]) - .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?; - decompress.clear(); - decompress.resize(len, 0); - snap::raw::Decoder::new() - .decompress(&block[..block.len() - 4], decompress) - .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?; - Ok(false) - } - #[cfg(not(feature = "io_avro_compression"))] - Some(Compression::Deflate) => Err(ArrowError::Other( - "The avro file is deflate-encoded but feature 'io_avro_compression' is not active." - .to_string(), - )), - #[cfg(not(feature = "io_avro_compression"))] - Some(Compression::Snappy) => Err(ArrowError::Other( - "The avro file is snappy-encoded but feature 'io_avro_compression' is not active." - .to_string(), - )), - } -} - -/// [`StreamingIterator`] of blocks of avro data -pub struct BlockStreamIterator<'a, R: Read> { - buf: (Vec, usize), - reader: &'a mut R, - file_marker: [u8; 16], -} - -impl<'a, R: Read> BlockStreamIterator<'a, R> { - /// Creates a new [`BlockStreamIterator`]. - pub fn new(reader: &'a mut R, file_marker: [u8; 16]) -> Self { - Self { - reader, - file_marker, - buf: (vec![], 0), - } - } - - /// The buffer of [`BlockStreamIterator`]. - pub fn buffer(&mut self) -> &mut Vec { - &mut self.buf.0 - } -} - -impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> { - type Error = ArrowError; - type Item = (Vec, usize); - - fn advance(&mut self) -> Result<()> { - let (buf, rows) = &mut self.buf; - *rows = read_block(self.reader, buf, self.file_marker)?; - Ok(()) - } - - fn get(&self) -> Option<&Self::Item> { - if self.buf.1 > 0 { - Some(&self.buf) - } else { - None - } - } -} - -/// [`StreamingIterator`] of blocks of decompressed avro data -pub struct Decompressor<'a, R: Read> { - blocks: BlockStreamIterator<'a, R>, - codec: Option, - buf: (Vec, usize), - was_swapped: bool, -} - -impl<'a, R: Read> Decompressor<'a, R> { - /// Creates a new [`Decompressor`]. - pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Option) -> Self { - Self { - blocks, - codec, - buf: (vec![], 0), - was_swapped: false, - } - } -} - -impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> { - type Error = ArrowError; - type Item = (Vec, usize); - - fn advance(&mut self) -> Result<()> { - if self.was_swapped { - std::mem::swap(self.blocks.buffer(), &mut self.buf.0); - } - self.blocks.advance()?; - self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?; - self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default(); - Ok(()) - } - - fn get(&self) -> Option<&Self::Item> { - if self.buf.1 > 0 { - Some(&self.buf) - } else { - None - } - } -} - -/// Single threaded, blocking reader of Avro files; [`Iterator`] of [`RecordBatch`]es. -pub struct Reader<'a, R: Read> { - iter: Decompressor<'a, R>, +/// Single threaded, blocking reader of Avro; [`Iterator`] of [`RecordBatch`]es. +pub struct Reader { + iter: Decompressor, schema: Arc, avro_schemas: Vec, } -impl<'a, R: Read> Reader<'a, R> { +impl Reader { /// Creates a new [`Reader`]. - pub fn new( - iter: Decompressor<'a, R>, - avro_schemas: Vec, - schema: Arc, - ) -> Self { + pub fn new(iter: Decompressor, avro_schemas: Vec, schema: Arc) -> Self { Self { iter, avro_schemas, schema, } } + + /// Deconstructs itself into its internal reader + pub fn into_inner(self) -> R { + self.iter.into_inner() + } } -impl<'a, R: Read> Iterator for Reader<'a, R> { +impl Iterator for Reader { type Item = Result; fn next(&mut self) -> Option { diff --git a/src/io/avro/read/util.rs b/src/io/avro/read/util.rs index a47cb6617bc..01a9db713a5 100644 --- a/src/io/avro/read/util.rs +++ b/src/io/avro/read/util.rs @@ -1,11 +1,12 @@ +use std::collections::HashMap; use std::io::Read; -use avro_rs::{from_avro_datum, types::Value, AvroResult, Error, Schema}; -use serde_json::from_slice; +use avro_rs::Schema; -use crate::error::Result; +use crate::error::{ArrowError, Result}; -use super::Compression; +use super::super::{avro_decode, read_header, read_metadata}; +use super::{deserialize_header, Compression}; pub fn zigzag_i64(reader: &mut R) -> Result { let z = decode_variable(reader)?; @@ -17,77 +18,29 @@ pub fn zigzag_i64(reader: &mut R) -> Result { } fn decode_variable(reader: &mut R) -> Result { - let mut i = 0u64; - let mut buf = [0u8; 1]; + avro_decode!(reader) +} - let mut j = 0; - loop { - if j > 9 { - // if j * 7 > 64 - panic!() - } - reader.read_exact(&mut buf[..])?; - i |= (u64::from(buf[0] & 0x7F)) << (j * 7); - if (buf[0] >> 7) == 0 { - break; - } else { - j += 1; - } - } +fn _read_binary(reader: &mut R) -> Result> { + let len: usize = zigzag_i64(reader)? as usize; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf)?; + Ok(buf) +} - Ok(i) +fn read_header(reader: &mut R) -> Result>> { + read_header!(reader) } -fn read_file_marker(reader: &mut R) -> AvroResult<[u8; 16]> { +fn read_file_marker(reader: &mut R) -> Result<[u8; 16]> { let mut marker = [0u8; 16]; - reader.read_exact(&mut marker).map_err(Error::ReadMarker)?; + reader.read_exact(&mut marker)?; Ok(marker) } /// Reads the schema from `reader`, returning the file's [`Schema`] and [`Compression`]. /// # Error /// This function errors iff the header is not a valid avro file header. -pub fn read_schema(reader: &mut R) -> AvroResult<(Schema, Option, [u8; 16])> { - let meta_schema = Schema::Map(Box::new(Schema::Bytes)); - - let mut buf = [0u8; 4]; - reader.read_exact(&mut buf).map_err(Error::ReadHeader)?; - - if buf != [b'O', b'b', b'j', 1u8] { - return Err(Error::HeaderMagic); - } - - if let Value::Map(meta) = from_avro_datum(&meta_schema, reader, None)? { - // TODO: surface original parse schema errors instead of coalescing them here - let json = meta - .get("avro.schema") - .and_then(|bytes| { - if let Value::Bytes(ref bytes) = *bytes { - from_slice(bytes.as_ref()).ok() - } else { - None - } - }) - .ok_or(Error::GetAvroSchemaFromMap)?; - let schema = Schema::parse(&json)?; - - let codec = meta.get("avro.codec").and_then(|codec| { - if let Value::Bytes(bytes) = codec { - let bytes: &[u8] = bytes.as_ref(); - match bytes { - b"snappy" => Some(Compression::Snappy), - b"deflate" => Some(Compression::Deflate), - _ => None, - } - } else { - None - } - }); - - let marker = read_file_marker(reader)?; - - Ok((schema, codec, marker)) - } else { - Err(Error::GetHeaderMetadata) - } +pub fn read_schema(reader: &mut R) -> Result<(Schema, Option, [u8; 16])> { + read_metadata!(reader) } diff --git a/src/io/avro/read_async/header.rs b/src/io/avro/read_async/header.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/io/avro/read_async/mod.rs b/src/io/avro/read_async/mod.rs new file mode 100644 index 00000000000..c58fff2f617 --- /dev/null +++ b/src/io/avro/read_async/mod.rs @@ -0,0 +1,53 @@ +//! Async Avro +use std::collections::HashMap; + +use avro_rs::Schema; +use futures::AsyncRead; +use futures::AsyncReadExt; + +use crate::error::{ArrowError, Result}; + +use super::read::deserialize_header; +use super::read::Compression; +use super::{avro_decode, read_header, read_metadata}; + +/// Reads Avro's metadata from `reader` into a [`Schema`], [`Compression`] and magic marker. +#[allow(clippy::type_complexity)] +pub async fn read_metadata_async( + reader: &mut R, +) -> Result<(Schema, Option, [u8; 16])> { + read_metadata!(reader.await) +} + +/// Reads the file marker asynchronously +async fn read_file_marker(reader: &mut R) -> Result<[u8; 16]> { + let mut marker = [0u8; 16]; + reader.read_exact(&mut marker).await?; + Ok(marker) +} + +async fn zigzag_i64(reader: &mut R) -> Result { + let z = decode_variable(reader).await?; + Ok(if z & 0x1 == 0 { + (z >> 1) as i64 + } else { + !(z >> 1) as i64 + }) +} + +async fn decode_variable(reader: &mut R) -> Result { + avro_decode!(reader.await) +} + +async fn _read_binary(reader: &mut R) -> Result> { + let len: usize = zigzag_i64(reader).await? as usize; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf).await?; + Ok(buf) +} + +async fn read_header( + reader: &mut R, +) -> Result>> { + read_header!(reader.await) +} diff --git a/tests/it/io/avro/mod.rs b/tests/it/io/avro/mod.rs index 918582b61ee..ee0459c04a4 100644 --- a/tests/it/io/avro/mod.rs +++ b/tests/it/io/avro/mod.rs @@ -1,3 +1,5 @@ //! Read and write from and to Apache Avro mod read; +#[cfg(feature = "io_avro_async")] +mod read_async; diff --git a/tests/it/io/avro/read/mod.rs b/tests/it/io/avro/read.rs similarity index 98% rename from tests/it/io/avro/read/mod.rs rename to tests/it/io/avro/read.rs index a6387b27be6..24059343412 100644 --- a/tests/it/io/avro/read/mod.rs +++ b/tests/it/io/avro/read.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use arrow2::types::months_days_ns; use avro_rs::types::{Record, Value}; use avro_rs::{Codec, Writer}; use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema}; @@ -10,6 +9,7 @@ use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::avro::read; use arrow2::record_batch::RecordBatch; +use arrow2::types::months_days_ns; fn schema() -> (AvroSchema, Schema) { let raw_schema = r#" @@ -82,7 +82,7 @@ fn schema() -> (AvroSchema, Schema) { (AvroSchema::parse_str(raw_schema).unwrap(), schema) } -fn write(codec: Codec) -> Result<(Vec, RecordBatch)> { +pub(super) fn write(codec: Codec) -> Result<(Vec, RecordBatch)> { let (avro, schema) = schema(); // a writer needs a schema and something to write to let mut writer = Writer::with_codec(&avro, Vec::new(), codec); diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs new file mode 100644 index 00000000000..dc4f873a527 --- /dev/null +++ b/tests/it/io/avro/read_async.rs @@ -0,0 +1,23 @@ +use avro_rs::Codec; + +use arrow2::error::Result; +use arrow2::io::avro::read; + +use super::read::write; + +async fn _test_metadata(codec: Codec) -> Result<()> { + let (data, expected) = write(codec).unwrap(); + + let file = &mut &data[..]; + + let (_, schema, _, _) = read::read_metadata(file)?; + + assert_eq!(&schema, expected.schema().as_ref()); + + Ok(()) +} + +#[tokio::test] +async fn read_metadata() -> Result<()> { + _test_metadata(Codec::Null).await +}