diff --git a/examples/avro_read_async.rs b/examples/avro_read_async.rs
index 7d0dd8d808e..adf67855e2d 100644
--- a/examples/avro_read_async.rs
+++ b/examples/avro_read_async.rs
@@ -25,14 +25,14 @@ async fn main() -> Result<()> {
     let blocks = block_stream(&mut reader, marker).await;
 
     pin_mut!(blocks);
-    while let Some((mut block, rows)) = blocks.next().await.transpose()? {
-        // the content here is blocking. In general this should run on spawn_blocking
+    while let Some(mut block) = blocks.next().await.transpose()? {
         let schema = schema.clone();
         let avro_schemas = avro_schemas.clone();
+        // the content here is CPU-bounded. It should run on a dedicated thread pool
         let handle = tokio::task::spawn_blocking(move || {
-            let mut decompressed = vec![];
+            let mut decompressed = Block::new(0, vec![]);
             decompress_block(&mut block, &mut decompressed, compression)?;
-            deserialize(&decompressed, rows, schema, &avro_schemas)
+            deserialize(&decompressed, schema, &avro_schemas)
         });
         let batch = handle.await.unwrap()?;
         assert!(batch.num_rows() > 0);
diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs
index 0dc4b8909e6..cd32ec2c880 100644
--- a/src/io/avro/mod.rs
+++ b/src/io/avro/mod.rs
@@ -90,3 +90,41 @@ macro_rules! read_metadata {
 }
 
 pub(crate) use {avro_decode, read_header, read_metadata};
+
+/// A compressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct CompressedBlock {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The compressed data
+    pub data: Vec<u8>,
+}
+
+impl CompressedBlock {
+    /// Creates a new CompressedBlock
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// An uncompressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct Block {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The uncompressed data
+    pub data: Vec<u8>,
+}
+
+impl Block {
+    /// Creates a new Block
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs
index 40f30ac3d06..0df7556276c 100644
--- a/src/io/avro/read/block.rs
+++ b/src/io/avro/read/block.rs
@@ -5,6 +5,7 @@ use fallible_streaming_iterator::FallibleStreamingIterator;
 
 use crate::error::{ArrowError, Result};
 
+use super::super::CompressedBlock;
 use super::util;
 
 fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
@@ -24,29 +25,38 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the `reader` into `buf`.
-/// # Panic
-/// Panics iff the block marker does not equal to the file's marker
-fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
+/// Reads a [`CompressedBlock`] from the `reader`.
+/// # Error
+/// This function errors iff either the block cannot be read or the sync marker does not match
+fn read_block<R: Read>(
+    reader: &mut R,
+    block: &mut CompressedBlock,
+    file_marker: [u8; 16],
+) -> Result<()> {
     let (rows, bytes) = read_size(reader)?;
+    block.number_of_rows = rows;
     if rows == 0 {
-        return Ok(0);
+        return Ok(());
     };
-    buf.clear();
-    buf.resize(bytes, 0);
-    reader.read_exact(buf)?;
+    block.data.clear();
+    block.data.resize(bytes, 0);
+    reader.read_exact(&mut block.data)?;
 
     let mut marker = [0u8; 16];
     reader.read_exact(&mut marker)?;
 
-    assert!(!(marker != file_marker));
-    Ok(rows)
+    if marker != file_marker {
+        return Err(ArrowError::ExternalFormat(
+            "Avro: the sync marker in the block does not correspond to the file marker".to_string(),
+        ));
+    }
+    Ok(())
 }
 
 /// [`FallibleStreamingIterator`] of compressed avro blocks
 pub struct BlockStreamIterator<R> {
-    buf: (Vec<u8>, usize),
+    buf: CompressedBlock,
     reader: R,
     file_marker: [u8; 16],
 }
 
@@ -57,33 +67,32 @@ impl<R: Read> BlockStreamIterator<R> {
         Self {
             reader,
             file_marker,
-            buf: (vec![], 0),
+            buf: CompressedBlock::new(0, vec![]),
         }
     }
 
     /// The buffer of [`BlockStreamIterator`].
-    pub fn buffer(&mut self) -> &mut Vec<u8> {
-        &mut self.buf.0
+    pub fn buffer(&mut self) -> &mut CompressedBlock {
+        &mut self.buf
     }
 
     /// Deconstructs itself
     pub fn into_inner(self) -> (R, Vec<u8>) {
-        (self.reader, self.buf.0)
+        (self.reader, self.buf.data)
    }
 }
 
 impl<R: Read> FallibleStreamingIterator for BlockStreamIterator<R> {
     type Error = ArrowError;
-    type Item = (Vec<u8>, usize);
+    type Item = CompressedBlock;
 
     fn advance(&mut self) -> Result<()> {
-        let (buf, rows) = &mut self.buf;
-        *rows = read_block(&mut self.reader, buf, self.file_marker)?;
+        read_block(&mut self.reader, &mut self.buf, self.file_marker)?;
         Ok(())
     }
 
     fn get(&self) -> Option<&Self::Item> {
-        if self.buf.1 > 0 {
+        if self.buf.number_of_rows > 0 {
             Some(&self.buf)
         } else {
             None
diff --git a/src/io/avro/read/decompress.rs b/src/io/avro/read/decompress.rs
index 9fd64ddbf3b..243f815221f 100644
--- a/src/io/avro/read/decompress.rs
+++ b/src/io/avro/read/decompress.rs
@@ -5,16 +5,21 @@ use fallible_streaming_iterator::FallibleStreamingIterator;
 
 use crate::error::{ArrowError, Result};
 
+use super::super::{Block, CompressedBlock};
 use super::BlockStreamIterator;
 use super::Compression;
 
 /// Decompresses an Avro block.
 /// Returns whether the buffers where swapped.
 pub fn decompress_block(
-    block: &mut Vec<u8>,
-    decompressed: &mut Vec<u8>,
+    block: &mut CompressedBlock,
+    decompressed: &mut Block,
     compression: Option<Compression>,
 ) -> Result<bool> {
+    decompressed.number_of_rows = block.number_of_rows;
+    let block = &mut block.data;
+    let decompressed = &mut decompressed.data;
+
     match compression {
         None => {
             std::mem::swap(block, decompressed);
@@ -55,7 +60,7 @@ pub fn decompress_block(
 pub struct Decompressor<R: Read> {
     blocks: BlockStreamIterator<R>,
     codec: Option<Compression>,
-    buf: (Vec<u8>, usize),
+    buf: Block,
     was_swapped: bool,
 }
 
@@ -65,7 +70,7 @@ impl<R: Read> Decompressor<R> {
         Self {
             blocks,
             codec,
-            buf: (vec![], 0),
+            buf: Block::new(0, vec![]),
             was_swapped: false,
         }
     }
@@ -78,20 +83,19 @@ impl<R: Read> Decompressor<R> {
 
 impl<'a, R: Read> FallibleStreamingIterator for Decompressor<R> {
     type Error = ArrowError;
-    type Item = (Vec<u8>, usize);
+    type Item = Block;
 
     fn advance(&mut self) -> Result<()> {
         if self.was_swapped {
-            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
+            std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data);
         }
         self.blocks.advance()?;
-        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
-        self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
+        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.codec)?;
         Ok(())
     }
 
     fn get(&self) -> Option<&Self::Item> {
-        if self.buf.1 > 0 {
+        if self.buf.number_of_rows > 0 {
             Some(&self.buf)
         } else {
             None
diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs
index 33bd8d888ae..dda3fb4cb7e 100644
--- a/src/io/avro/read/deserialize.rs
+++ b/src/io/avro/read/deserialize.rs
@@ -10,6 +10,7 @@ use crate::error::Result;
 use crate::record_batch::RecordBatch;
 use crate::types::months_days_ns;
 
+use super::super::Block;
 use super::nested::*;
 use super::util;
 
@@ -241,13 +242,15 @@ fn deserialize_value<'a>(
     Ok(block)
 }
 
-/// Deserializes an Avro block into a [`RecordBatch`].
+/// Deserializes a [`Block`] into a [`RecordBatch`].
 pub fn deserialize(
-    mut block: &[u8],
-    rows: usize,
+    block: &Block,
     schema: Arc<Schema>,
     avro_schemas: &[AvroSchema],
 ) -> Result<RecordBatch> {
+    let rows = block.number_of_rows;
+    let mut block = block.data.as_ref();
+
     // create mutables, one per field
     let mut arrays: Vec<Box<dyn MutableArray>> = schema
         .fields()
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index 1cb32186fa4..b203884de18 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -1,4 +1,3 @@
-#![deny(missing_docs)]
 //! APIs to read from Avro format to arrow.
 use std::io::Read;
 use std::sync::Arc;
@@ -73,9 +72,9 @@ impl<R: Read> Iterator for Reader<R> {
         let schema = self.schema.clone();
         let avro_schemas = &self.avro_schemas;
 
-        self.iter.next().transpose().map(|x| {
-            let (data, rows) = x?;
-            deserialize(data, *rows, schema, avro_schemas)
-        })
+        self.iter
+            .next()
+            .transpose()
+            .map(|maybe_block| deserialize(maybe_block?, schema, avro_schemas))
     }
 }
diff --git a/src/io/avro/read_async/block.rs b/src/io/avro/read_async/block.rs
index 47cf71aea93..fe31d554434 100644
--- a/src/io/avro/read_async/block.rs
+++ b/src/io/avro/read_async/block.rs
@@ -6,6 +6,8 @@ use futures::Stream;
 
 use crate::error::{ArrowError, Result};
 
+use super::CompressedBlock;
+
 use super::utils::zigzag_i64;
 
 async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize, usize)> {
@@ -25,43 +27,48 @@ async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the file into `buf`.
-/// # Panic
-/// Panics iff the block marker does not equal to the file's marker
+/// Reads a [`CompressedBlock`] from the `reader`.
+/// # Error
+/// This function errors iff either the block cannot be read or the sync marker does not match
 async fn read_block<R: AsyncRead + Unpin + Send>(
     reader: &mut R,
-    buf: &mut Vec<u8>,
+    block: &mut CompressedBlock,
     file_marker: [u8; 16],
-) -> Result<usize> {
+) -> Result<()> {
     let (rows, bytes) = read_size(reader).await?;
+    block.number_of_rows = rows;
     if rows == 0 {
-        return Ok(0);
+        return Ok(());
     };
-    buf.clear();
-    buf.resize(bytes, 0);
-    reader.read_exact(buf).await?;
+    block.data.clear();
+    block.data.resize(bytes, 0);
+    reader.read_exact(&mut block.data).await?;
 
     let mut marker = [0u8; 16];
     reader.read_exact(&mut marker).await?;
 
-    assert!(!(marker != file_marker));
-    Ok(rows)
+    if marker != file_marker {
+        return Err(ArrowError::ExternalFormat(
+            "Avro: the sync marker in the block does not correspond to the file marker".to_string(),
+        ));
+    }
+    Ok(())
 }
 
 /// Returns a fallible [`Stream`] of Avro blocks bound to `reader`
 pub async fn block_stream<R: AsyncRead + Unpin + Send>(
     reader: &mut R,
     file_marker: [u8; 16],
-) -> impl Stream<Item = Result<(Vec<u8>, usize)>> + '_ {
+) -> impl Stream<Item = Result<CompressedBlock>> + '_ {
     try_stream! {
         loop {
-            let mut buffer = vec![];
-            let rows = read_block(reader, &mut buffer, file_marker).await?;
-            if rows == 0 {
+            let mut block = CompressedBlock::new(0, vec![]);
+            read_block(reader, &mut block, file_marker).await?;
+            if block.number_of_rows == 0 {
                 break
             }
-            yield (buffer, rows)
+            yield block
         }
     }
 }
 
diff --git a/src/io/avro/read_async/mod.rs b/src/io/avro/read_async/mod.rs
index d5d18500856..dc3d7c276ba 100644
--- a/src/io/avro/read_async/mod.rs
+++ b/src/io/avro/read_async/mod.rs
@@ -1,8 +1,9 @@
-//! Async Avro
+//! Async read Avro
 mod block;
 mod metadata;
 pub(self) mod utils;
 
+pub use super::{Block, CompressedBlock};
 pub use block::block_stream;
 pub use metadata::read_metadata;
 
diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs
index 562019f6c56..5729097d7b3 100644
--- a/src/io/avro/write/block.rs
+++ b/src/io/avro/write/block.rs
@@ -2,46 +2,9 @@ use std::io::Write;
 
 use crate::{error::Result, io::avro::Compression};
 
+use super::super::{Block, CompressedBlock};
 use super::{util::zigzag_encode, SYNC_NUMBER};
 
-/// A compressed Avro block.
-#[derive(Debug, Clone, Default, PartialEq)]
-pub struct CompressedBlock {
-    /// The number of rows
-    pub number_of_rows: usize,
-    /// The compressed data
-    pub data: Vec<u8>,
-}
-
-impl CompressedBlock {
-    /// Creates a new CompressedBlock
-    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
-        Self {
-            number_of_rows,
-            data,
-        }
-    }
-}
-
-/// An uncompressed Avro block.
-#[derive(Debug, Clone, Default, PartialEq)]
-pub struct Block {
-    /// The number of rows
-    pub number_of_rows: usize,
-    /// The uncompressed data
-    pub data: Vec<u8>,
-}
-
-impl Block {
-    /// Creates a new Block
-    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
-        Self {
-            number_of_rows,
-            data,
-        }
-    }
-}
-
 /// Writes a [`CompressedBlock`] to `writer`
 pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
     // write size and rows
diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs
index 3be4fc53103..c080498cfea 100644
--- a/src/io/avro/write/mod.rs
+++ b/src/io/avro/write/mod.rs
@@ -17,6 +17,8 @@ mod block;
 pub use block::*;
 mod util;
 
+pub use super::{Block, CompressedBlock};
+
 const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
 
 /// Writes Avro's metadata to `writer`.
diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs
index 7b6b019b45b..5d0c688b7c7 100644
--- a/tests/it/io/avro/read_async.rs
+++ b/tests/it/io/avro/read_async.rs
@@ -24,8 +24,8 @@ async fn test(codec: Codec) -> Result<()> {
     let blocks = block_stream(&mut reader, marker).await;
 
     pin_mut!(blocks);
-    while let Some((block, rows)) = blocks.next().await.transpose()? {
-        assert!(rows > 0 || block.is_empty())
+    while let Some(block) = blocks.next().await.transpose()? {
+        assert!(block.number_of_rows > 0 || block.data.is_empty())
     }
     Ok(())
 }