Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read Avro. #406

Merged
merged 3 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.4", optional = true, default_features = false, features = ["stream"] }

avro-rs = { version = "0.13", optional = true, default_features = false }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
multiversion = { version = "0.6.1", optional = true }
Expand All @@ -86,6 +88,7 @@ full = [
"io_print",
"io_parquet",
"io_parquet_compression",
"io_avro",
"regex",
"merge_sort",
"ahash",
Expand All @@ -105,6 +108,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "streaming-iterator", "serde_json"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down
11 changes: 11 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Read and write from and to Apache Avro

pub mod read;

use crate::error::ArrowError;

impl From<avro_rs::SerError> for ArrowError {
fn from(error: avro_rs::SerError) -> Self {
ArrowError::External("".to_string(), Box::new(error))
}
}
138 changes: 138 additions & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::convert::TryInto;
use std::sync::Arc;

use crate::array::*;
use crate::datatypes::*;
use crate::error::ArrowError;
use crate::error::Result;
use crate::record_batch::RecordBatch;

use super::util;

pub fn deserialize(mut block: &[u8], rows: usize, schema: Arc<Schema>) -> Result<RecordBatch> {
// create mutables, one per field
let mut arrays: Vec<Box<dyn MutableArray>> = schema
.fields()
.iter()
.map(|field| match field.data_type().to_physical_type() {
PhysicalType::Boolean => {
Ok(Box::new(MutableBooleanArray::with_capacity(rows)) as Box<dyn MutableArray>)
}
PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Ok(Box::new(MutablePrimitiveArray::<$T>::with_capacity(rows)) as Box<dyn MutableArray>)
}),
PhysicalType::Utf8 => {
Ok(Box::new(MutableUtf8Array::<i32>::with_capacity(rows)) as Box<dyn MutableArray>)
}
PhysicalType::Binary => {
Ok(Box::new(MutableBinaryArray::<i32>::with_capacity(rows))
as Box<dyn MutableArray>)
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing type {:?} is still not implemented",
other
)))
}
})
.collect::<Result<_>>()?;

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
if field.is_nullable() {
// variant 0 is always the null in a union array
if util::zigzag_i64(&mut block)? == 0 {
array.push_null();
continue;
}
}

match array.data_type().to_physical_type() {
PhysicalType::Boolean => {
let is_valid = block[0] == 1;
block = &block[1..];
let array = array
.as_mut_any()
.downcast_mut::<MutableBooleanArray>()
.unwrap();
array.push(Some(is_valid))
}
PhysicalType::Primitive(primitive) => {
use crate::datatypes::PrimitiveType::*;
match primitive {
Int32 => {
let value = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
array.push(Some(value))
}
Int64 => {
let value = util::zigzag_i64(&mut block)? as i64;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i64>>()
.unwrap();
array.push(Some(value))
}
Float32 => {
let value = f32::from_le_bytes(block[..4].try_into().unwrap());
block = &block[4..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f32>>()
.unwrap();
array.push(Some(value))
}
Float64 => {
let value = f64::from_le_bytes(block[..8].try_into().unwrap());
block = &block[8..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f64>>()
.unwrap();
array.push(Some(value))
}
_ => unreachable!(),
}
}
PhysicalType::Utf8 => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = std::str::from_utf8(&block[..len])?;
block = &block[len..];

let array = array
.as_mut_any()
.downcast_mut::<MutableUtf8Array<i32>>()
.unwrap();
array.push(Some(data))
}
PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = &block[..len];
block = &block[len..];

let array = array
.as_mut_any()
.downcast_mut::<MutableBinaryArray<i32>>()
.unwrap();
array.push(Some(data))
}
_ => todo!(),
};
}
}
let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();

RecordBatch::try_new(schema, columns)
}
172 changes: 172 additions & 0 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use std::io::Read;
use std::sync::Arc;

use avro_rs::Codec;
use streaming_iterator::StreamingIterator;

mod deserialize;
mod schema;
mod util;

use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// Reads the metadata of an avro file: its arrow [`Schema`], its compression
/// [`Codec`] and its 16-byte sync marker.
pub fn read_metadata<R: std::io::Read>(reader: &mut R) -> Result<(Schema, Codec, [u8; 16])> {
    let (avro_schema, codec, marker) = util::read_schema(reader)?;
    let schema = schema::convert_schema(&avro_schema)?;
    Ok((schema, codec, marker))
}

/// Reads the header of an avro block: its number of rows and its size in bytes.
/// Returns `(0, 0)` when `reader` reaches EOF before the first varint,
/// signalling that there are no more blocks.
/// # Errors
/// Errors on any I/O failure other than EOF, and when the file declares a
/// negative row count or byte size.
fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
    let rows = match util::zigzag_i64(reader) {
        Ok(a) => a,
        Err(ArrowError::Io(io_err)) => {
            if let std::io::ErrorKind::UnexpectedEof = io_err.kind() {
                // end of the file: no more blocks
                return Ok((0, 0));
            } else {
                return Err(ArrowError::Io(io_err));
            }
        }
        Err(other) => return Err(other),
    };
    let bytes = util::zigzag_i64(reader)?;
    // a negative value would wrap around to a huge `usize`, which the caller
    // would then use to resize its buffer
    if rows < 0 || bytes < 0 {
        return Err(ArrowError::ExternalFormat(
            "Avro block sizes must be non-negative".to_string(),
        ));
    }
    Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
let (rows, bytes) = read_size(reader)?;
if rows == 0 {
return Ok(0);
};

buf.resize(bytes, 0);
reader.read_exact(buf)?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker)?;

if marker != file_marker {
panic!();
}
Ok(rows)
}

/// Moves the avro block in `buf` into `decompress`, decompressing it
/// according to `codec`.
/// Returns `false` when the two buffers were merely swapped (the uncompressed
/// `Codec::Null` case); the deflate path is not implemented yet, so the
/// `true` return is presumably reserved for real decompression — verify once
/// `Codec::Deflate` lands.
fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
    match codec {
        Codec::Null => {
            // no compression: hand the block over by swapping buffers (no copy)
            std::mem::swap(buf, decompress);
            Ok(false)
        }
        Codec::Deflate => {
            // NOTE(review): deflate decompression is not implemented yet
            todo!()
        }
    }
}

/// [`StreamingIterator`] of blocks of avro data
pub struct BlockStreamIterator<'a, R: Read> {
    // (block bytes, number of rows in the block); `rows == 0` marks exhaustion
    buf: (Vec<u8>, usize),
    reader: &'a mut R,
    // the file's sync marker, used to validate each block
    file_marker: [u8; 16],
}

impl<'a, R: Read> BlockStreamIterator<'a, R> {
    /// Creates a new [`BlockStreamIterator`] reading blocks from `reader`,
    /// validating them against `file_marker`.
    pub fn new(reader: &'a mut R, file_marker: [u8; 16]) -> Self {
        Self {
            reader,
            file_marker,
            buf: (vec![], 0),
        }
    }

    /// Mutable access to the internal block buffer (used by [`Decompressor`]
    /// to swap buffers without copying).
    pub fn buffer(&mut self) -> &mut Vec<u8> {
        &mut self.buf.0
    }
}

impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
    type Item = (Vec<u8>, usize);

    /// Reads the next block into the internal buffer and records its row count.
    fn advance(&mut self) {
        // todo: surface this error
        let rows = read_block(self.reader, &mut self.buf.0, self.file_marker).unwrap();
        self.buf.1 = rows;
    }

    fn get(&self) -> Option<&Self::Item> {
        // a row count of 0 signals that the reader was exhausted
        match self.buf.1 {
            0 => None,
            _ => Some(&self.buf),
        }
    }
}

/// [`StreamingIterator`] of blocks of decompressed avro data
pub struct Decompressor<'a, R: Read> {
    // the underlying (possibly compressed) block iterator
    blocks: BlockStreamIterator<'a, R>,
    // the compression codec declared in the file's metadata
    codec: Codec,
    // (decompressed bytes, number of rows); `rows == 0` marks exhaustion
    buf: (Vec<u8>, usize),
    // return value of the last `decompress_block` call; when `true`, the two
    // buffers are swapped back before the next read (see `advance`).
    // NOTE(review): `Codec::Null` swaps buffers yet returns `false` — verify
    // the intended semantics once `Codec::Deflate` is implemented
    was_swapped: bool,
}

impl<'a, R: Read> Decompressor<'a, R> {
    /// Creates a new [`Decompressor`] that decodes `blocks` according to `codec`.
    pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Codec) -> Self {
        Self {
            blocks,
            codec,
            buf: (vec![], 0),
            was_swapped: false,
        }
    }
}

impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
    type Item = (Vec<u8>, usize);

    fn advance(&mut self) {
        // if the previous call left the buffers swapped, swap them back so
        // each side keeps reusing its own allocation
        if self.was_swapped {
            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
        }
        // read the next (possibly compressed) block
        self.blocks.advance();
        // todo: surface this error instead of unwrapping
        self.was_swapped =
            decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
        // carry the upstream row count over; 0 (exhausted upstream) ends this
        // iterator too
        self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
    }

    fn get(&self) -> Option<&Self::Item> {
        // a row count of 0 signals exhaustion
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}

/// Single threaded, blocking reader of Avro files; [`Iterator`] of [`RecordBatch`]es.
pub struct Reader<'a, R: Read> {
    // iterator of decompressed avro blocks
    iter: Decompressor<'a, R>,
    // the arrow schema used to deserialize every block
    schema: Arc<Schema>,
}

impl<'a, R: Read> Reader<'a, R> {
    /// Creates a new [`Reader`] from an iterator of decompressed blocks and the
    /// arrow [`Schema`] obtained from [`read_metadata`].
    pub fn new(iter: Decompressor<'a, R>, schema: Arc<Schema>) -> Self {
        Self { iter, schema }
    }
}

impl<'a, R: Read> Iterator for Reader<'a, R> {
    type Item = Result<RecordBatch>;

    /// Deserializes the next decompressed block into a [`RecordBatch`];
    /// `None` once the underlying block iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        self.iter
            .next()
            .map(|(data, rows)| deserialize::deserialize(data, *rows, self.schema.clone()))
    }
}
Loading