Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read Avro. (#406)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Sep 17, 2021
1 parent d5cc0db commit 1655f53
Show file tree
Hide file tree
Showing 11 changed files with 727 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.4", optional = true, default_features = false, features = ["stream"] }

avro-rs = { version = "0.13", optional = true, default_features = false }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
multiversion = { version = "0.6.1", optional = true }
Expand All @@ -86,6 +88,7 @@ full = [
"io_print",
"io_parquet",
"io_parquet_compression",
"io_avro",
"regex",
"merge_sort",
"ahash",
Expand All @@ -107,6 +110,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "streaming-iterator", "serde_json"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down
11 changes: 11 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Read and write from and to Apache Avro
pub mod read;

use crate::error::ArrowError;

/// Enables `?` on `avro_rs` serialization errors by wrapping them into
/// [`ArrowError::External`].
impl From<avro_rs::SerError> for ArrowError {
    fn from(error: avro_rs::SerError) -> Self {
        Self::External(String::new(), Box::new(error))
    }
}
138 changes: 138 additions & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::convert::TryInto;
use std::sync::Arc;

use crate::array::*;
use crate::datatypes::*;
use crate::error::ArrowError;
use crate::error::Result;
use crate::record_batch::RecordBatch;

use super::util;

pub fn deserialize(mut block: &[u8], rows: usize, schema: Arc<Schema>) -> Result<RecordBatch> {
// create mutables, one per field
let mut arrays: Vec<Box<dyn MutableArray>> = schema
.fields()
.iter()
.map(|field| match field.data_type().to_physical_type() {
PhysicalType::Boolean => {
Ok(Box::new(MutableBooleanArray::with_capacity(rows)) as Box<dyn MutableArray>)
}
PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Ok(Box::new(MutablePrimitiveArray::<$T>::with_capacity(rows)) as Box<dyn MutableArray>)
}),
PhysicalType::Utf8 => {
Ok(Box::new(MutableUtf8Array::<i32>::with_capacity(rows)) as Box<dyn MutableArray>)
}
PhysicalType::Binary => {
Ok(Box::new(MutableBinaryArray::<i32>::with_capacity(rows))
as Box<dyn MutableArray>)
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing type {:?} is still not implemented",
other
)))
}
})
.collect::<Result<_>>()?;

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
if field.is_nullable() {
// variant 0 is always the null in a union array
if util::zigzag_i64(&mut block)? == 0 {
array.push_null();
continue;
}
}

match array.data_type().to_physical_type() {
PhysicalType::Boolean => {
let is_valid = block[0] == 1;
block = &block[1..];
let array = array
.as_mut_any()
.downcast_mut::<MutableBooleanArray>()
.unwrap();
array.push(Some(is_valid))
}
PhysicalType::Primitive(primitive) => {
use crate::datatypes::PrimitiveType::*;
match primitive {
Int32 => {
let value = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
array.push(Some(value))
}
Int64 => {
let value = util::zigzag_i64(&mut block)? as i64;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i64>>()
.unwrap();
array.push(Some(value))
}
Float32 => {
let value = f32::from_le_bytes(block[..4].try_into().unwrap());
block = &block[4..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f32>>()
.unwrap();
array.push(Some(value))
}
Float64 => {
let value = f64::from_le_bytes(block[..8].try_into().unwrap());
block = &block[8..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f64>>()
.unwrap();
array.push(Some(value))
}
_ => unreachable!(),
}
}
PhysicalType::Utf8 => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = std::str::from_utf8(&block[..len])?;
block = &block[len..];

let array = array
.as_mut_any()
.downcast_mut::<MutableUtf8Array<i32>>()
.unwrap();
array.push(Some(data))
}
PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = &block[..len];
block = &block[len..];

let array = array
.as_mut_any()
.downcast_mut::<MutableBinaryArray<i32>>()
.unwrap();
array.push(Some(data))
}
_ => todo!(),
};
}
}
let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();

RecordBatch::try_new(schema, columns)
}
172 changes: 172 additions & 0 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use std::io::Read;
use std::sync::Arc;

use avro_rs::Codec;
use streaming_iterator::StreamingIterator;

mod deserialize;
mod schema;
mod util;

use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// Reads the avro metadata from `reader`, returning the converted arrow
/// [`Schema`], the file's compression [`Codec`] and its 16-byte sync marker.
pub fn read_metadata<R: std::io::Read>(reader: &mut R) -> Result<(Schema, Codec, [u8; 16])> {
    let (avro_schema, codec, marker) = util::read_schema(reader)?;
    let schema = schema::convert_schema(&avro_schema)?;
    Ok((schema, codec, marker))
}

/// Reads a block's header, returning `(number of rows, compressed size in bytes)`.
/// Returns `(0, 0)` on a clean EOF, i.e. when the file ends between blocks.
///
/// # Errors
/// Errors on any other I/O failure or when a size is negative (corrupted file).
fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
    let rows = match util::zigzag_i64(reader) {
        Ok(a) => a,
        Err(ArrowError::Io(io_err)) => {
            if let std::io::ErrorKind::UnexpectedEof = io_err.kind() {
                // end of file between blocks: signal exhaustion, not an error
                return Ok((0, 0));
            } else {
                return Err(ArrowError::Io(io_err));
            }
        }
        Err(other) => return Err(other),
    };
    let bytes = util::zigzag_i64(reader)?;
    // reject negative sizes instead of letting `as usize` wrap them into huge
    // values (which would make `read_block` resize its buffer to ~2^64 bytes)
    if rows < 0 || bytes < 0 {
        return Err(ArrowError::ExternalFormat(
            "Avro block sizes must be non-negative".to_string(),
        ));
    }
    Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
let (rows, bytes) = read_size(reader)?;
if rows == 0 {
return Ok(0);
};

buf.resize(bytes, 0);
reader.read_exact(buf)?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker)?;

if marker != file_marker {
panic!();
}
Ok(rows)
}

fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
match codec {
Codec::Null => {
std::mem::swap(buf, decompress);
Ok(false)
}
Codec::Deflate => {
todo!()
}
}
}

/// [`StreamingIterator`] of blocks of avro data
/// [`StreamingIterator`] of blocks of avro data
pub struct BlockStreamIterator<'a, R: Read> {
    // (block bytes, number of rows in the block); 0 rows marks exhaustion
    buf: (Vec<u8>, usize),
    reader: &'a mut R,
    // the file's 16-byte sync marker, checked against every block
    file_marker: [u8; 16],
}

impl<'a, R: Read> BlockStreamIterator<'a, R> {
    /// Creates a new [`BlockStreamIterator`]. `file_marker` is the sync marker
    /// obtained from the file's metadata (see `read_metadata`).
    pub fn new(reader: &'a mut R, file_marker: [u8; 16]) -> Self {
        Self {
            reader,
            file_marker,
            buf: (vec![], 0),
        }
    }

    /// Mutable access to the internal block buffer; used by [`Decompressor`]
    /// to swap buffers without copying.
    pub fn buffer(&mut self) -> &mut Vec<u8> {
        &mut self.buf.0
    }
}

impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
    // (block bytes, number of rows in the block)
    type Item = (Vec<u8>, usize);

    fn advance(&mut self) {
        let (buf, rows) = &mut self.buf;
        // todo: surface this error
        *rows = read_block(self.reader, buf, self.file_marker).unwrap();
    }

    fn get(&self) -> Option<&Self::Item> {
        // `read_block` returns 0 rows iff the file is exhausted
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}

/// [`StreamingIterator`] of blocks of decompressed avro data
/// [`StreamingIterator`] of blocks of decompressed avro data
pub struct Decompressor<'a, R: Read> {
    blocks: BlockStreamIterator<'a, R>,
    codec: Codec,
    // (decompressed bytes, number of rows in the block)
    buf: (Vec<u8>, usize),
    // whether `decompress_block` swapped our buffer with the block iterator's;
    // if so, `advance` swaps them back before reading the next block
    was_swapped: bool,
}

impl<'a, R: Read> Decompressor<'a, R> {
    /// Creates a new [`Decompressor`] that decompresses `blocks` with `codec`.
    pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Codec) -> Self {
        Self {
            blocks,
            codec,
            buf: (vec![], 0),
            was_swapped: false,
        }
    }
}

impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
    // (decompressed bytes, number of rows in the block)
    type Item = (Vec<u8>, usize);

    fn advance(&mut self) {
        if self.was_swapped {
            // hand the previously-swapped buffer back to the block iterator
            // so its allocation can be reused for the next block
            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
        }
        self.blocks.advance();
        // todo: surface this error instead of unwrapping
        self.was_swapped =
            decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
        // mirror the block iterator's row count; 0 means exhausted
        self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
    }

    fn get(&self) -> Option<&Self::Item> {
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}

/// Single threaded, blocking reader of Avro files; [`Iterator`] of [`RecordBatch`]es.
/// Single threaded, blocking reader of Avro files; [`Iterator`] of [`RecordBatch`]es.
pub struct Reader<'a, R: Read> {
    // decompressed-block source
    iter: Decompressor<'a, R>,
    // arrow schema shared with every produced [`RecordBatch`]
    schema: Arc<Schema>,
}

impl<'a, R: Read> Reader<'a, R> {
    /// Creates a new [`Reader`] over decompressed blocks with the given schema.
    pub fn new(iter: Decompressor<'a, R>, schema: Arc<Schema>) -> Self {
        Self { iter, schema }
    }
}

impl<'a, R: Read> Iterator for Reader<'a, R> {
    type Item = Result<RecordBatch>;

    /// Deserializes the next decompressed block into a [`RecordBatch`];
    /// `None` once the underlying block iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let schema = self.schema.clone();
        self.iter
            .next()
            .map(|(data, rows)| deserialize::deserialize(data, *rows, schema))
    }
}
Loading

0 comments on commit 1655f53

Please sign in to comment.