Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read Avro files' metadata asynchronously (#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Nov 19, 2021
1 parent 9d4107c commit 3f12bd6
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 243 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ full = [
"io_parquet_compression",
"io_avro",
"io_avro_compression",
"io_avro_async",
"regex",
"merge_sort",
"compute",
Expand Down Expand Up @@ -142,6 +143,7 @@ io_avro_compression = [
"libflate",
"snap",
]
io_avro_async = ["io_avro", "futures"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down Expand Up @@ -171,6 +173,7 @@ skip_feature_sets = [
["io_csv_async"],
["io_csv_read_async"],
["io_avro"],
["io_avro_async"],
["io_avro_compression"],
["io_json"],
["io_flight"],
Expand Down
78 changes: 78 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
//! Read and write from and to Apache Avro
pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;

use crate::error::ArrowError;

Expand All @@ -10,3 +13,78 @@ impl From<avro_rs::Error> for ArrowError {
ArrowError::External("".to_string(), Box::new(error))
}
}

// macros that can operate in sync and async code.
// Decodes a variable-length (zigzag/VLQ-style) unsigned integer from `$reader`,
// one byte at a time, accumulating 7-bit groups little-endian first.
//
// The trailing `$($_await:tt)*` capture lets the same body expand for both
// sync and async callers: whatever tokens follow the reader identifier at the
// call site (e.g. `.await`) are pasted verbatim after each `read_exact` call.
macro_rules! avro_decode {
    ($reader:ident $($_await:tt)*) => {
        {
            let mut i = 0u64;       // accumulated result
            let mut buf = [0u8; 1]; // single-byte read buffer

            let mut j = 0;          // index of the current 7-bit group
            loop {
                if j > 9 {
                    // if j * 7 > 64
                    // more than 10 groups cannot fit in a u64 — the stream is corrupt
                    return Err(ArrowError::ExternalFormat(
                        "zigzag decoding failed - corrupt avro file".to_string(),
                    ));
                }
                $reader.read_exact(&mut buf[..])$($_await)*?;
                // low 7 bits carry payload; shift into position j*7
                i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
                // high bit clear marks the final byte of the integer
                if (buf[0] >> 7) == 0 {
                    break;
                } else {
                    j += 1;
                }
            }

            Ok(i)
        }
    }
}

// Reads the Avro header's key/value metadata map from `$reader`: a sequence of
// length-prefixed batches of (string key, bytes value) pairs, terminated by a
// zero-length batch. Evaluates to `Result<HashMap<String, Vec<u8>>, _>`.
//
// `$($_await:tt)*` pastes the call site's trailing tokens (e.g. `.await`)
// after each fallible read, so one body serves sync and async code.
macro_rules! read_header {
    ($reader:ident $($_await:tt)*) => {{
        let mut items = HashMap::new();

        loop {
            let len = zigzag_i64($reader)$($_await)*? as usize;
            if len == 0 {
                // a zero count terminates the metadata map
                break Ok(items);
            }

            items.reserve(len);
            for _ in 0..len {
                let key = _read_binary($reader)$($_await)*?;
                // keys must be valid UTF-8 strings
                let key = String::from_utf8(key)
                    .map_err(|_| ArrowError::ExternalFormat("Invalid Avro header".to_string()))?;
                let value = _read_binary($reader)$($_await)*?;
                items.insert(key, value);
            }
        }
    }};
}

// Reads and validates the Avro file header from `$reader`, evaluating to
// `Result<(schema, compression, marker), _>`.
//
// Like the macros above, `$($_await:tt)*` pastes the call site's trailing
// tokens (e.g. `.await`) after each fallible read for sync/async reuse.
macro_rules! read_metadata {
    ($reader:ident $($_await:tt)*) => {{
        let mut magic_number = [0u8; 4];
        $reader.read_exact(&mut magic_number)$($_await)*?;

        // see https://avro.apache.org/docs/current/spec.html#Object+Container+Files
        if magic_number != [b'O', b'b', b'j', 1u8] {
            return Err(ArrowError::ExternalFormat(
                "Avro header does not contain a valid magic number".to_string(),
            ));
        }

        // key/value metadata map (schema and, optionally, the codec)
        let header = read_header($reader)$($_await)*?;

        let (schema, compression) = deserialize_header(header)?;

        // the sync marker that follows the header (per the spec linked above)
        let marker = read_file_marker($reader)$($_await)*?;

        Ok((schema, compression, marker))
    }};
}

pub(crate) use {avro_decode, read_header, read_metadata};
91 changes: 91 additions & 0 deletions src/io/avro/read/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! APIs to read from Avro format to arrow.
use std::io::Read;

use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::util;

/// Reads the `(rows, bytes)` size prefix of the next block.
///
/// A clean `UnexpectedEof` before the first varint means the file has no more
/// blocks; this is reported as `Ok((0, 0))` rather than an error.
fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
    let num_rows = match util::zigzag_i64(reader) {
        Ok(rows) => rows,
        Err(ArrowError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
            // end of file reached between blocks
            return Ok((0, 0));
        }
        Err(other) => return Err(other),
    };
    let num_bytes = util::zigzag_i64(reader)?;
    Ok((num_rows as usize, num_bytes as usize))
}

/// Reads a block from the file into `buf`, returning the number of rows in it.
/// Returns `Ok(0)` (with `buf` untouched) once the file is exhausted.
/// # Panic
/// Panics iff the block's trailing sync marker does not equal the file's marker,
/// which indicates a corrupt file.
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
    let (rows, bytes) = read_size(reader)?;
    if rows == 0 {
        return Ok(0);
    };

    // read the (possibly compressed) block payload
    buf.resize(bytes, 0);
    reader.read_exact(buf)?;

    // every block is followed by the 16-byte sync marker from the file header
    let mut marker = [0u8; 16];
    reader.read_exact(&mut marker)?;

    // `assert_eq!` instead of the double-negated `assert!(!(marker != file_marker))`:
    // clearer intent and a failure message that prints both values
    assert_eq!(marker, file_marker);
    Ok(rows)
}

/// [`FallibleStreamingIterator`] of compressed avro blocks
pub struct BlockStreamIterator<R: Read> {
    buf: (Vec<u8>, usize),
    reader: R,
    file_marker: [u8; 16],
}

impl<R: Read> BlockStreamIterator<R> {
    /// Creates a new [`BlockStreamIterator`] over `reader`, validating blocks
    /// against `file_marker`.
    pub fn new(reader: R, file_marker: [u8; 16]) -> Self {
        Self {
            reader,
            file_marker,
            buf: (Vec::new(), 0),
        }
    }

    /// Mutable access to the internal block buffer of [`BlockStreamIterator`].
    pub fn buffer(&mut self) -> &mut Vec<u8> {
        let (bytes, _rows) = &mut self.buf;
        bytes
    }

    /// Deconstructs itself into its reader and internal buffer
    pub fn into_inner(self) -> (R, Vec<u8>) {
        let Self { reader, buf, .. } = self;
        (reader, buf.0)
    }
}

impl<R: Read> FallibleStreamingIterator for BlockStreamIterator<R> {
    type Error = ArrowError;
    type Item = (Vec<u8>, usize);

    /// Reads the next block into the internal buffer, recording its row count.
    fn advance(&mut self) -> Result<()> {
        let rows = read_block(&mut self.reader, &mut self.buf.0, self.file_marker)?;
        self.buf.1 = rows;
        Ok(())
    }

    /// Returns the current `(buffer, rows)` pair, or `None` after a zero-row
    /// block (end of file).
    fn get(&self) -> Option<&Self::Item> {
        match self.buf.1 {
            0 => None,
            _ => Some(&self.buf),
        }
    }
}
100 changes: 100 additions & 0 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! APIs to read from Avro format to arrow.
use std::io::Read;

use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::BlockStreamIterator;
use super::Compression;

/// Decompresses an avro block from `block` into `decompress`.
/// Returns whether the two buffers were swapped (the uncompressed path swaps
/// instead of copying), so the caller can restore them before the next block.
fn decompress_block(
    block: &mut Vec<u8>,
    decompress: &mut Vec<u8>,
    compression: Option<Compression>,
) -> Result<bool> {
    match compression {
        None => {
            // no codec: avoid a copy by swapping the input and output buffers
            std::mem::swap(block, decompress);
            Ok(true)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Deflate) => {
            decompress.clear();
            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
            decoder.read_to_end(decompress)?;
            Ok(false)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Snappy) => {
            // the trailing 4 bytes are excluded from the snappy frame
            // (per the Avro spec they hold a CRC of the uncompressed data)
            let len = snap::raw::decompress_len(&block[..block.len() - 4])
                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
            decompress.clear();
            // pre-size the output to the decoded length snappy reports
            decompress.resize(len, 0);
            snap::raw::Decoder::new()
                .decompress(&block[..block.len() - 4], decompress)
                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
            Ok(false)
        }
        // without the feature, surface a clear error instead of failing to parse
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Deflate) => Err(ArrowError::Other(
            "The avro file is deflate-encoded but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Snappy) => Err(ArrowError::Other(
            "The avro file is snappy-encoded but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
    }
}

/// [`FallibleStreamingIterator`] of decompressed Avro blocks
pub struct Decompressor<R: Read> {
    blocks: BlockStreamIterator<R>,
    codec: Option<Compression>,
    // (decompressed bytes, number of rows) of the current block
    buf: (Vec<u8>, usize),
    // whether the last `decompress_block` swapped `buf.0` with the block buffer
    was_swapped: bool,
}

impl<R: Read> Decompressor<R> {
    /// Creates a new [`Decompressor`] over `blocks` using the optional `codec`.
    pub fn new(blocks: BlockStreamIterator<R>, codec: Option<Compression>) -> Self {
        Self {
            blocks,
            codec,
            buf: (Vec::new(), 0),
            was_swapped: false,
        }
    }

    /// Deconstructs itself into its internal reader
    pub fn into_inner(self) -> R {
        let (reader, _buffer) = self.blocks.into_inner();
        reader
    }
}

// note: removed the stray `'a` lifetime parameter — it was never used by the
// impl trait, the self type, or any predicate, so it was dead noise.
impl<R: Read> FallibleStreamingIterator for Decompressor<R> {
    type Error = ArrowError;
    type Item = (Vec<u8>, usize);

    /// Advances to the next block and decompresses it into the internal buffer.
    fn advance(&mut self) -> Result<()> {
        if self.was_swapped {
            // the previous (uncompressed) block swapped the buffers; swap them
            // back so both can be reused for the next read
            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
        }
        self.blocks.advance()?;
        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
        // carry over the row count of the current block (0 at end of file)
        self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
        Ok(())
    }

    /// Returns the current `(buffer, rows)` pair, or `None` at end of file.
    fn get(&self) -> Option<&Self::Item> {
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}
29 changes: 29 additions & 0 deletions src/io/avro/read/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::collections::HashMap;

use avro_rs::{Error, Schema};
use serde_json;

use crate::error::Result;

use super::Compression;

/// Deserializes the Avro header metadata map into an Avro [`Schema`] and
/// optional [`Compression`].
pub(crate) fn deserialize_header(
    header: HashMap<String, Vec<u8>>,
) -> Result<(Schema, Option<Compression>)> {
    // "avro.schema" holds the JSON-encoded schema and is mandatory
    let json = header
        .get("avro.schema")
        .and_then(|bytes| serde_json::from_slice(bytes.as_ref()).ok())
        .ok_or(Error::GetAvroSchemaFromMap)?;
    let schema = Schema::parse(&json)?;

    // "avro.codec" is optional; unrecognized codecs map to `None`
    let compression = header
        .get("avro.codec")
        .and_then(|bytes| match bytes.as_slice() {
            b"snappy" => Some(Compression::Snappy),
            b"deflate" => Some(Compression::Deflate),
            _ => None,
        });
    Ok((schema, compression))
}
Loading

0 comments on commit 3f12bd6

Please sign in to comment.