Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Made Avro read API use Block and CompressedBlock #698

Merged
merged 1 commit into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/avro_read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ async fn main() -> Result<()> {
let blocks = block_stream(&mut reader, marker).await;

pin_mut!(blocks);
while let Some((mut block, rows)) = blocks.next().await.transpose()? {
// the content here is blocking. In general this should run on spawn_blocking
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
// the content here is CPU-bounded. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = vec![];
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, rows, schema, &avro_schemas)
deserialize(&decompressed, schema, &avro_schemas)
});
let batch = handle.await.unwrap()?;
assert!(batch.num_rows() > 0);
Expand Down
38 changes: 38 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,41 @@ macro_rules! read_metadata {
}

pub(crate) use {avro_decode, read_header, read_metadata};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CompressedBlock {
/// The number of rows
pub number_of_rows: usize,
/// The compressed data
pub data: Vec<u8>,
}

impl CompressedBlock {
/// Creates a new CompressedBlock
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Block {
/// The number of rows
pub number_of_rows: usize,
/// The uncompressed data
pub data: Vec<u8>,
}

impl Block {
/// Creates a new Block
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}
47 changes: 28 additions & 19 deletions src/io/avro/read/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::super::CompressedBlock;
use super::util;

fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
Expand All @@ -24,29 +25,38 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
Ok((rows as usize, bytes as usize))
}

/// Reads a block from the `reader` into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
/// Reads a [`CompressedBlock`] from the `reader`.
/// # Error
/// This function errors iff either the block cannot be read or the sync marker does not match
fn read_block<R: Read>(
reader: &mut R,
block: &mut CompressedBlock,
file_marker: [u8; 16],
) -> Result<()> {
let (rows, bytes) = read_size(reader)?;
block.number_of_rows = rows;
if rows == 0 {
return Ok(0);
return Ok(());
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf)?;
block.data.clear();
block.data.resize(bytes, 0);
reader.read_exact(&mut block.data)?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker)?;

assert!(!(marker != file_marker));
Ok(rows)
if marker != file_marker {
return Err(ArrowError::ExternalFormat(
"Avro: the sync marker in the block does not correspond to the file marker".to_string(),
));
}
Ok(())
}

/// [`FallibleStreamingIterator`] of compressed avro blocks
pub struct BlockStreamIterator<R: Read> {
buf: (Vec<u8>, usize),
buf: CompressedBlock,
reader: R,
file_marker: [u8; 16],
}
Expand All @@ -57,33 +67,32 @@ impl<R: Read> BlockStreamIterator<R> {
Self {
reader,
file_marker,
buf: (vec![], 0),
buf: CompressedBlock::new(0, vec![]),
}
}

/// The buffer of [`BlockStreamIterator`].
pub fn buffer(&mut self) -> &mut Vec<u8> {
&mut self.buf.0
pub fn buffer(&mut self) -> &mut CompressedBlock {
&mut self.buf
}

/// Deconstructs itself
pub fn into_inner(self) -> (R, Vec<u8>) {
(self.reader, self.buf.0)
(self.reader, self.buf.data)
}
}

impl<R: Read> FallibleStreamingIterator for BlockStreamIterator<R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);
type Item = CompressedBlock;

fn advance(&mut self) -> Result<()> {
let (buf, rows) = &mut self.buf;
*rows = read_block(&mut self.reader, buf, self.file_marker)?;
read_block(&mut self.reader, &mut self.buf, self.file_marker)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
if self.buf.1 > 0 {
if self.buf.number_of_rows > 0 {
Some(&self.buf)
} else {
None
Expand Down
22 changes: 13 additions & 9 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::super::{Block, CompressedBlock};
use super::BlockStreamIterator;
use super::Compression;

/// Decompresses an Avro block.
/// Returns whether the buffers where swapped.
pub fn decompress_block(
block: &mut Vec<u8>,
decompressed: &mut Vec<u8>,
block: &mut CompressedBlock,
decompressed: &mut Block,
compression: Option<Compression>,
) -> Result<bool> {
decompressed.number_of_rows = block.number_of_rows;
let block = &mut block.data;
let decompressed = &mut decompressed.data;

match compression {
None => {
std::mem::swap(block, decompressed);
Expand Down Expand Up @@ -55,7 +60,7 @@ pub fn decompress_block(
pub struct Decompressor<R: Read> {
blocks: BlockStreamIterator<R>,
codec: Option<Compression>,
buf: (Vec<u8>, usize),
buf: Block,
was_swapped: bool,
}

Expand All @@ -65,7 +70,7 @@ impl<R: Read> Decompressor<R> {
Self {
blocks,
codec,
buf: (vec![], 0),
buf: Block::new(0, vec![]),
was_swapped: false,
}
}
Expand All @@ -78,20 +83,19 @@ impl<R: Read> Decompressor<R> {

impl<'a, R: Read> FallibleStreamingIterator for Decompressor<R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);
type Item = Block;

fn advance(&mut self) -> Result<()> {
if self.was_swapped {
std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data);
}
self.blocks.advance()?;
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.codec)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
if self.buf.1 > 0 {
if self.buf.number_of_rows > 0 {
Some(&self.buf)
} else {
None
Expand Down
9 changes: 6 additions & 3 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::error::Result;
use crate::record_batch::RecordBatch;
use crate::types::months_days_ns;

use super::super::Block;
use super::nested::*;
use super::util;

Expand Down Expand Up @@ -241,13 +242,15 @@ fn deserialize_value<'a>(
Ok(block)
}

/// Deserializes an Avro block into a [`RecordBatch`].
/// Deserializes a [`Block`] into a [`RecordBatch`].
pub fn deserialize(
mut block: &[u8],
rows: usize,
block: &Block,
schema: Arc<Schema>,
avro_schemas: &[AvroSchema],
) -> Result<RecordBatch> {
let rows = block.number_of_rows;
let mut block = block.data.as_ref();

// create mutables, one per field
let mut arrays: Vec<Box<dyn MutableArray>> = schema
.fields()
Expand Down
9 changes: 4 additions & 5 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![deny(missing_docs)]
//! APIs to read from Avro format to arrow.
use std::io::Read;
use std::sync::Arc;
Expand Down Expand Up @@ -73,9 +72,9 @@ impl<R: Read> Iterator for Reader<R> {
let schema = self.schema.clone();
let avro_schemas = &self.avro_schemas;

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize(data, *rows, schema, avro_schemas)
})
self.iter
.next()
.transpose()
.map(|maybe_block| deserialize(maybe_block?, schema, avro_schemas))
}
}
39 changes: 23 additions & 16 deletions src/io/avro/read_async/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use futures::Stream;

use crate::error::{ArrowError, Result};

use super::CompressedBlock;

use super::utils::zigzag_i64;

async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize, usize)> {
Expand All @@ -25,43 +27,48 @@ async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize
Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
/// Reads a [`CompressedBlock`] from the `reader`.
/// # Error
/// This function errors iff either the block cannot be read or the sync marker does not match
async fn read_block<R: AsyncRead + Unpin + Send>(
reader: &mut R,
buf: &mut Vec<u8>,
block: &mut CompressedBlock,
file_marker: [u8; 16],
) -> Result<usize> {
) -> Result<()> {
let (rows, bytes) = read_size(reader).await?;
block.number_of_rows = rows;
if rows == 0 {
return Ok(0);
return Ok(());
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf).await?;
block.data.clear();
block.data.resize(bytes, 0);
reader.read_exact(&mut block.data).await?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;

assert!(!(marker != file_marker));
Ok(rows)
if marker != file_marker {
return Err(ArrowError::ExternalFormat(
"Avro: the sync marker in the block does not correspond to the file marker".to_string(),
));
}
Ok(())
}

/// Returns a fallible [`Stream`] of Avro blocks bound to `reader`
pub async fn block_stream<R: AsyncRead + Unpin + Send>(
reader: &mut R,
file_marker: [u8; 16],
) -> impl Stream<Item = Result<(Vec<u8>, usize)>> + '_ {
) -> impl Stream<Item = Result<CompressedBlock>> + '_ {
try_stream! {
loop {
let mut buffer = vec![];
let rows = read_block(reader, &mut buffer, file_marker).await?;
if rows == 0 {
let mut block = CompressedBlock::new(0, vec![]);
read_block(reader, &mut block, file_marker).await?;
if block.number_of_rows == 0 {
break
}
yield (buffer, rows)
yield block
}
}
}
3 changes: 2 additions & 1 deletion src/io/avro/read_async/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Async Avro
//! Async read Avro

mod block;
mod metadata;
pub(self) mod utils;

pub use super::{Block, CompressedBlock};
pub use block::block_stream;
pub use metadata::read_metadata;
Loading