Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read Avro files' metadata asynchronously #614

Merged
merged 5 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ full = [
"io_parquet_compression",
"io_avro",
"io_avro_compression",
"io_avro_async",
"regex",
"merge_sort",
"compute",
Expand Down Expand Up @@ -142,6 +143,7 @@ io_avro_compression = [
"libflate",
"snap",
]
io_avro_async = ["io_avro", "futures"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down Expand Up @@ -171,6 +173,7 @@ skip_feature_sets = [
["io_csv_async"],
["io_csv_read_async"],
["io_avro"],
["io_avro_async"],
["io_avro_compression"],
["io_json"],
["io_flight"],
Expand Down
78 changes: 78 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
//! Read and write from and to Apache Avro

pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;

use crate::error::ArrowError;

Expand All @@ -10,3 +13,78 @@ impl From<avro_rs::Error> for ArrowError {
ArrowError::External("".to_string(), Box::new(error))
}
}

// macros that can operate in sync and async code.
// The `$($_await:tt)*` capture is the trick that makes them dual-use: sync
// callers pass no extra tokens, async callers pass the tokens `. await`,
// which the macro splices after every fallible read.

// Decodes a variable-length (varint-style) u64 from `$reader`: each byte
// carries 7 payload bits, the high bit signals continuation. The zigzag
// sign decoding presumably happens in the caller (e.g. `zigzag_i64`) —
// this macro only assembles the raw unsigned value.
macro_rules! avro_decode {
    ($reader:ident $($_await:tt)*) => {
        {
            // accumulator for the decoded value
            let mut i = 0u64;
            let mut buf = [0u8; 1];

            // number of 7-bit groups consumed so far
            let mut j = 0;
            loop {
                if j > 9 {
                    // if j * 7 > 64
                    // more than 10 groups cannot fit in a u64 - corrupt input
                    return Err(ArrowError::ExternalFormat(
                        "zigzag decoding failed - corrupt avro file".to_string(),
                    ));
                }
                $reader.read_exact(&mut buf[..])$($_await)*?;
                // low 7 bits are payload, shifted into position
                i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
                // high bit clear => this was the last byte
                if (buf[0] >> 7) == 0 {
                    break;
                } else {
                    j += 1;
                }
            }

            Ok(i)
        }
    }
}

// Reads the Avro file-header metadata map from `$reader` into a
// `HashMap<String, Vec<u8>>`. Dual sync/async use via the `$($_await)*`
// convention (see `avro_decode`).
macro_rules! read_header {
    ($reader:ident $($_await:tt)*) => {{
        let mut items = HashMap::new();

        loop {
            // each metadata block is prefixed by its entry count;
            // a count of 0 terminates the map
            let len = zigzag_i64($reader)$($_await)*? as usize;
            if len == 0 {
                break Ok(items);
            }

            items.reserve(len);
            for _ in 0..len {
                // keys must be valid UTF-8; values stay as raw bytes
                let key = _read_binary($reader)$($_await)*?;
                let key = String::from_utf8(key)
                    .map_err(|_| ArrowError::ExternalFormat("Invalid Avro header".to_string()))?;
                let value = _read_binary($reader)$($_await)*?;
                items.insert(key, value);
            }
        }
    }};
}

// Reads and validates an Avro Object Container File header from `$reader`,
// returning the `(schema, compression, sync marker)` triple. Dual sync/async
// use via the `$($_await)*` convention (see `avro_decode`).
macro_rules! read_metadata {
    ($reader:ident $($_await:tt)*) => {{
        let mut magic_number = [0u8; 4];
        $reader.read_exact(&mut magic_number)$($_await)*?;

        // see https://avro.apache.org/docs/current/spec.html#Object+Container+Files
        if magic_number != [b'O', b'b', b'j', 1u8] {
            return Err(ArrowError::ExternalFormat(
                "Avro header does not contain a valid magic number".to_string(),
            ));
        }

        // key/value metadata map (e.g. "avro.schema", "avro.codec" entries)
        let header = read_header($reader)$($_await)*?;

        let (schema, compression) = deserialize_header(header)?;

        // 16-byte sync marker that terminates every data block in the file
        let marker = read_file_marker($reader)$($_await)*?;

        Ok((schema, compression, marker))
    }};
}

pub(crate) use {avro_decode, read_header, read_metadata};
91 changes: 91 additions & 0 deletions src/io/avro/read/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! APIs to read from Avro format to arrow.
use std::io::Read;

use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::util;

/// Reads the `(row count, byte length)` prefix of the next block from `reader`.
///
/// A clean `UnexpectedEof` before the first varint means there are no more
/// blocks; that case is reported as `Ok((0, 0))` rather than an error.
fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
    let rows = match util::zigzag_i64(reader) {
        Ok(count) => count,
        // end of file reached between blocks: signal "no more rows"
        Err(ArrowError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
            return Ok((0, 0));
        }
        Err(e) => return Err(e),
    };
    let bytes = util::zigzag_i64(reader)?;
    Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf` and returns its number of rows
/// (0 signals end of file).
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
    let (rows, bytes) = read_size(reader)?;
    if rows == 0 {
        return Ok(0);
    };

    // size `buf` to exactly the block's payload and fill it
    buf.resize(bytes, 0);
    reader.read_exact(buf)?;

    // every block is terminated by the 16-byte sync marker from the header
    let mut marker = [0u8; 16];
    reader.read_exact(&mut marker)?;

    // `assert_eq!` replaces the double-negated `assert!(!(a != b))` and
    // reports both values on failure instead of an opaque panic
    assert_eq!(
        marker, file_marker,
        "the block marker does not equal the file marker - corrupt avro file"
    );
    Ok(rows)
}

/// [`FallibleStreamingIterator`] of compressed avro blocks
pub struct BlockStreamIterator<R: Read> {
    // (block bytes, number of rows in the block) of the current item;
    // rows == 0 means "no current item" (exhausted or not yet advanced)
    buf: (Vec<u8>, usize),
    reader: R,
    // 16-byte sync marker from the file header; every block must end with it
    file_marker: [u8; 16],
}

impl<R: Read> BlockStreamIterator<R> {
    /// Creates a new [`BlockStreamIterator`] reading blocks delimited by `file_marker`.
    pub fn new(reader: R, file_marker: [u8; 16]) -> Self {
        // start with an empty buffer and zero rows: no current item yet
        let buf = (Vec::new(), 0);
        Self {
            reader,
            file_marker,
            buf,
        }
    }

    /// Mutable access to the internal block buffer of [`BlockStreamIterator`].
    pub fn buffer(&mut self) -> &mut Vec<u8> {
        let (bytes, _rows) = &mut self.buf;
        bytes
    }

    /// Deconstructs itself into its reader and internal buffer.
    pub fn into_inner(self) -> (R, Vec<u8>) {
        let Self { reader, buf, .. } = self;
        (reader, buf.0)
    }
}

impl<R: Read> FallibleStreamingIterator for BlockStreamIterator<R> {
    type Error = ArrowError;
    type Item = (Vec<u8>, usize);

    /// Reads the next block into the internal buffer, recording its row count.
    fn advance(&mut self) -> Result<()> {
        let rows = read_block(&mut self.reader, &mut self.buf.0, self.file_marker)?;
        self.buf.1 = rows;
        Ok(())
    }

    /// Returns the current `(bytes, rows)` pair, or `None` once exhausted.
    fn get(&self) -> Option<&Self::Item> {
        match self.buf.1 {
            0 => None,
            _ => Some(&self.buf),
        }
    }
}
100 changes: 100 additions & 0 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! APIs to read from Avro format to arrow.
use std::io::Read;

use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::error::{ArrowError, Result};

use super::BlockStreamIterator;
use super::Compression;

/// Decompresses an avro block.
/// Returns whether the buffers where swapped.
fn decompress_block(
    block: &mut Vec<u8>,
    decompress: &mut Vec<u8>,
    compression: Option<Compression>,
) -> Result<bool> {
    match compression {
        None => {
            // uncompressed: avoid a copy by swapping the buffers;
            // the caller must swap back before reusing `block` (hence `true`)
            std::mem::swap(block, decompress);
            Ok(true)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Deflate) => {
            decompress.clear();
            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
            decoder.read_to_end(decompress)?;
            Ok(false)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Snappy) => {
            // the last 4 bytes of the block are excluded from decompression —
            // presumably the snappy CRC32 the Avro spec appends; TODO confirm
            // whether it should also be verified rather than skipped
            let len = snap::raw::decompress_len(&block[..block.len() - 4])
                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
            decompress.clear();
            decompress.resize(len, 0);
            snap::raw::Decoder::new()
                .decompress(&block[..block.len() - 4], decompress)
                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
            Ok(false)
        }
        // when compression support is compiled out, surface a clear error
        // instead of failing to parse the block
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Deflate) => Err(ArrowError::Other(
            "The avro file is deflate-encoded but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Snappy) => Err(ArrowError::Other(
            "The avro file is snappy-encoded but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
    }
}

/// [`FallibleStreamingIterator`] of decompressed Avro blocks
pub struct Decompressor<R: Read> {
    // source of (possibly compressed) raw blocks
    blocks: BlockStreamIterator<R>,
    // compression codec declared in the file header, if any
    codec: Option<Compression>,
    // (decompressed bytes, number of rows) of the current block
    buf: (Vec<u8>, usize),
    // whether the last `advance` swapped `buf.0` with the block iterator's
    // buffer (the uncompressed fast path) and must swap back before reading
    was_swapped: bool,
}

impl<R: Read> Decompressor<R> {
    /// Creates a new [`Decompressor`] over `blocks`, decoding with `codec`.
    pub fn new(blocks: BlockStreamIterator<R>, codec: Option<Compression>) -> Self {
        Self {
            blocks,
            codec,
            buf: (Vec::new(), 0),
            was_swapped: false,
        }
    }

    /// Deconstructs itself into its internal reader, discarding the buffer.
    pub fn into_inner(self) -> R {
        let (reader, _buffer) = self.blocks.into_inner();
        reader
    }
}

// the lifetime parameter `'a` was declared but never used by the trait,
// the self type, or any predicate — removed.
impl<R: Read> FallibleStreamingIterator for Decompressor<R> {
    type Error = ArrowError;
    type Item = (Vec<u8>, usize);

    /// Fetches the next block and decompresses it into the internal buffer.
    fn advance(&mut self) -> Result<()> {
        // if the previous advance swapped buffers (uncompressed fast path),
        // give the block iterator its own buffer back before reading again
        if self.was_swapped {
            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
        }
        self.blocks.advance()?;
        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
        // carry the row count over; `None` (exhausted) maps to 0 rows
        self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
        Ok(())
    }

    /// Returns the current decompressed `(bytes, rows)`, or `None` once exhausted.
    fn get(&self) -> Option<&Self::Item> {
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}
29 changes: 29 additions & 0 deletions src/io/avro/read/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::collections::HashMap;

use avro_rs::{Error, Schema};
use serde_json;

use crate::error::Result;

use super::Compression;

/// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`].
pub(crate) fn deserialize_header(
    header: HashMap<String, Vec<u8>>,
) -> Result<(Schema, Option<Compression>)> {
    // the "avro.schema" entry holds the JSON schema; a missing or
    // unparseable entry is reported through the avro_rs error variant
    let json = match header.get("avro.schema") {
        Some(bytes) => serde_json::from_slice(bytes.as_ref()).ok(),
        None => None,
    }
    .ok_or(Error::GetAvroSchemaFromMap)?;
    let schema = Schema::parse(&json)?;

    // "avro.codec" selects the block compression; unknown codecs fall
    // through to `None` (treated as uncompressed), as before
    let compression = header.get("avro.codec").and_then(|bytes| {
        match bytes.as_slice() {
            b"snappy" => Some(Compression::Snappy),
            b"deflate" => Some(Compression::Deflate),
            _ => None,
        }
    });
    Ok((schema, compression))
}
Loading