Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Made Avro read API use Block and CompressedBlock (#698)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 22, 2021
1 parent 45e72e9 commit f583531
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 97 deletions.
8 changes: 4 additions & 4 deletions examples/avro_read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ async fn main() -> Result<()> {
let blocks = block_stream(&mut reader, marker).await;

pin_mut!(blocks);
while let Some((mut block, rows)) = blocks.next().await.transpose()? {
// the content here is blocking. In general this should run on spawn_blocking
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
// the content here is CPU-bounded. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = vec![];
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, rows, schema, &avro_schemas)
deserialize(&decompressed, schema, &avro_schemas)
});
let batch = handle.await.unwrap()?;
assert!(batch.num_rows() > 0);
Expand Down
38 changes: 38 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,41 @@ macro_rules! read_metadata {
}

pub(crate) use {avro_decode, read_header, read_metadata};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CompressedBlock {
/// The number of rows
pub number_of_rows: usize,
/// The compressed data
pub data: Vec<u8>,
}

impl CompressedBlock {
/// Creates a new CompressedBlock
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Block {
/// The number of rows
pub number_of_rows: usize,
/// The uncompressed data
pub data: Vec<u8>,
}

impl Block {
/// Creates a new Block
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}
47 changes: 28 additions & 19 deletions src/io/avro/read/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::super::CompressedBlock;
use super::util;

fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
Expand All @@ -24,29 +25,38 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
Ok((rows as usize, bytes as usize))
}

/// Reads a block from the `reader` into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
/// Reads a [`CompressedBlock`] from the `reader`.
/// # Error
/// This function errors iff either the block cannot be read or the sync marker does not match
fn read_block<R: Read>(
reader: &mut R,
block: &mut CompressedBlock,
file_marker: [u8; 16],
) -> Result<()> {
let (rows, bytes) = read_size(reader)?;
block.number_of_rows = rows;
if rows == 0 {
return Ok(0);
return Ok(());
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf)?;
block.data.clear();
block.data.resize(bytes, 0);
reader.read_exact(&mut block.data)?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker)?;

assert!(!(marker != file_marker));
Ok(rows)
if marker != file_marker {
return Err(ArrowError::ExternalFormat(
"Avro: the sync marker in the block does not correspond to the file marker".to_string(),
));
}
Ok(())
}

/// [`FallibleStreamingIterator`] of compressed avro blocks
pub struct BlockStreamIterator<R: Read> {
buf: (Vec<u8>, usize),
buf: CompressedBlock,
reader: R,
file_marker: [u8; 16],
}
Expand All @@ -57,33 +67,32 @@ impl<R: Read> BlockStreamIterator<R> {
Self {
reader,
file_marker,
buf: (vec![], 0),
buf: CompressedBlock::new(0, vec![]),
}
}

/// The buffer of [`BlockStreamIterator`].
pub fn buffer(&mut self) -> &mut Vec<u8> {
&mut self.buf.0
pub fn buffer(&mut self) -> &mut CompressedBlock {
&mut self.buf
}

/// Deconstructs itself
pub fn into_inner(self) -> (R, Vec<u8>) {
(self.reader, self.buf.0)
(self.reader, self.buf.data)
}
}

impl<R: Read> FallibleStreamingIterator for BlockStreamIterator<R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);
type Item = CompressedBlock;

fn advance(&mut self) -> Result<()> {
let (buf, rows) = &mut self.buf;
*rows = read_block(&mut self.reader, buf, self.file_marker)?;
read_block(&mut self.reader, &mut self.buf, self.file_marker)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
if self.buf.1 > 0 {
if self.buf.number_of_rows > 0 {
Some(&self.buf)
} else {
None
Expand Down
22 changes: 13 additions & 9 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::super::{Block, CompressedBlock};
use super::BlockStreamIterator;
use super::Compression;

/// Decompresses an Avro block.
/// Returns whether the buffers where swapped.
pub fn decompress_block(
block: &mut Vec<u8>,
decompressed: &mut Vec<u8>,
block: &mut CompressedBlock,
decompressed: &mut Block,
compression: Option<Compression>,
) -> Result<bool> {
decompressed.number_of_rows = block.number_of_rows;
let block = &mut block.data;
let decompressed = &mut decompressed.data;

match compression {
None => {
std::mem::swap(block, decompressed);
Expand Down Expand Up @@ -55,7 +60,7 @@ pub fn decompress_block(
pub struct Decompressor<R: Read> {
blocks: BlockStreamIterator<R>,
codec: Option<Compression>,
buf: (Vec<u8>, usize),
buf: Block,
was_swapped: bool,
}

Expand All @@ -65,7 +70,7 @@ impl<R: Read> Decompressor<R> {
Self {
blocks,
codec,
buf: (vec![], 0),
buf: Block::new(0, vec![]),
was_swapped: false,
}
}
Expand All @@ -78,20 +83,19 @@ impl<R: Read> Decompressor<R> {

impl<'a, R: Read> FallibleStreamingIterator for Decompressor<R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);
type Item = Block;

fn advance(&mut self) -> Result<()> {
if self.was_swapped {
std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data);
}
self.blocks.advance()?;
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.codec)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
if self.buf.1 > 0 {
if self.buf.number_of_rows > 0 {
Some(&self.buf)
} else {
None
Expand Down
9 changes: 6 additions & 3 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::error::Result;
use crate::record_batch::RecordBatch;
use crate::types::months_days_ns;

use super::super::Block;
use super::nested::*;
use super::util;

Expand Down Expand Up @@ -241,13 +242,15 @@ fn deserialize_value<'a>(
Ok(block)
}

/// Deserializes an Avro block into a [`RecordBatch`].
/// Deserializes a [`Block`] into a [`RecordBatch`].
pub fn deserialize(
mut block: &[u8],
rows: usize,
block: &Block,
schema: Arc<Schema>,
avro_schemas: &[AvroSchema],
) -> Result<RecordBatch> {
let rows = block.number_of_rows;
let mut block = block.data.as_ref();

// create mutables, one per field
let mut arrays: Vec<Box<dyn MutableArray>> = schema
.fields()
Expand Down
9 changes: 4 additions & 5 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![deny(missing_docs)]
//! APIs to read from Avro format to arrow.
use std::io::Read;
use std::sync::Arc;
Expand Down Expand Up @@ -73,9 +72,9 @@ impl<R: Read> Iterator for Reader<R> {
let schema = self.schema.clone();
let avro_schemas = &self.avro_schemas;

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize(data, *rows, schema, avro_schemas)
})
self.iter
.next()
.transpose()
.map(|maybe_block| deserialize(maybe_block?, schema, avro_schemas))
}
}
39 changes: 23 additions & 16 deletions src/io/avro/read_async/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use futures::Stream;

use crate::error::{ArrowError, Result};

use super::CompressedBlock;

use super::utils::zigzag_i64;

async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize, usize)> {
Expand All @@ -25,43 +27,48 @@ async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize
Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
/// Reads a [`CompressedBlock`] from the `reader`.
/// # Error
/// This function errors iff either the block cannot be read or the sync marker does not match
async fn read_block<R: AsyncRead + Unpin + Send>(
reader: &mut R,
buf: &mut Vec<u8>,
block: &mut CompressedBlock,
file_marker: [u8; 16],
) -> Result<usize> {
) -> Result<()> {
let (rows, bytes) = read_size(reader).await?;
block.number_of_rows = rows;
if rows == 0 {
return Ok(0);
return Ok(());
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf).await?;
block.data.clear();
block.data.resize(bytes, 0);
reader.read_exact(&mut block.data).await?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;

assert!(!(marker != file_marker));
Ok(rows)
if marker != file_marker {
return Err(ArrowError::ExternalFormat(
"Avro: the sync marker in the block does not correspond to the file marker".to_string(),
));
}
Ok(())
}

/// Returns a fallible [`Stream`] of Avro blocks bound to `reader`
pub async fn block_stream<R: AsyncRead + Unpin + Send>(
reader: &mut R,
file_marker: [u8; 16],
) -> impl Stream<Item = Result<(Vec<u8>, usize)>> + '_ {
) -> impl Stream<Item = Result<CompressedBlock>> + '_ {
try_stream! {
loop {
let mut buffer = vec![];
let rows = read_block(reader, &mut buffer, file_marker).await?;
if rows == 0 {
let mut block = CompressedBlock::new(0, vec![]);
read_block(reader, &mut block, file_marker).await?;
if block.number_of_rows == 0 {
break
}
yield (buffer, rows)
yield block
}
}
}
3 changes: 2 additions & 1 deletion src/io/avro/read_async/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Async Avro
//! Async read Avro
mod block;
mod metadata;
pub(self) mod utils;

pub use super::{Block, CompressedBlock};
pub use block::block_stream;
pub use metadata::read_metadata;
Loading

0 comments on commit f583531

Please sign in to comment.