Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read Avro logical types, List,Enum, Duration a…
Browse files Browse the repository at this point in the history
…nd `Fixed`. (#493)

* Added support for Avro List,Enum,Fixed.

* Added support for Duration.

* Added test showing support for logical types.
  • Loading branch information
jorgecarleitao authored Oct 6, 2021
1 parent 2324a8f commit 81edcdd
Show file tree
Hide file tree
Showing 6 changed files with 554 additions and 128 deletions.
6 changes: 6 additions & 0 deletions src/array/fixed_size_binary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ impl MutableFixedSizeBinaryArray {
Ok(primitive)
}

/// returns the (fixed) size of the [`MutableFixedSizeBinaryArray`].
#[inline]
pub fn size(&self) -> usize {
self.size
}

fn init_validity(&mut self) {
let mut validity = MutableBitmap::new();
validity.extend_constant(self.len(), true);
Expand Down
302 changes: 206 additions & 96 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,135 +1,245 @@
use std::convert::TryInto;
use std::sync::Arc;

use avro_rs::Schema as AvroSchema;

use crate::array::*;
use crate::datatypes::*;
use crate::error::ArrowError;
use crate::error::Result;
use crate::record_batch::RecordBatch;
use crate::types::months_days_ns;

use super::nested::*;
use super::util;

pub fn deserialize(mut block: &[u8], rows: usize, schema: Arc<Schema>) -> Result<RecordBatch> {
// create mutables, one per field
let mut arrays: Vec<Box<dyn MutableArray>> = schema
.fields()
.iter()
.map(|field| match field.data_type().to_physical_type() {
PhysicalType::Boolean => {
Ok(Box::new(MutableBooleanArray::with_capacity(rows)) as Box<dyn MutableArray>)
}
PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Ok(Box::new(MutablePrimitiveArray::<$T>::with_capacity(rows)) as Box<dyn MutableArray>)
}),
PhysicalType::Utf8 => {
Ok(Box::new(MutableUtf8Array::<i32>::with_capacity(rows)) as Box<dyn MutableArray>)
fn make_mutable(
data_type: &DataType,
avro_schema: Option<&AvroSchema>,
capacity: usize,
) -> Result<Box<dyn MutableArray>> {
Ok(match data_type.to_physical_type() {
PhysicalType::Boolean => {
Box::new(MutableBooleanArray::with_capacity(capacity)) as Box<dyn MutableArray>
}
PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Box::new(MutablePrimitiveArray::<$T>::with_capacity(capacity).to(data_type.clone()))
as Box<dyn MutableArray>
}),
PhysicalType::Binary => {
Box::new(MutableBinaryArray::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
}
PhysicalType::Utf8 => {
Box::new(MutableUtf8Array::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
}
PhysicalType::Dictionary(_) => {
if let Some(AvroSchema::Enum { symbols, .. }) = avro_schema {
let values = Utf8Array::<i32>::from_slice(symbols);
Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity))
as Box<dyn MutableArray>
} else {
unreachable!()
}
PhysicalType::Binary => {
Ok(Box::new(MutableBinaryArray::<i32>::with_capacity(rows))
as Box<dyn MutableArray>)
}
_ => match data_type {
DataType::List(inner) => {
let values = make_mutable(inner.data_type(), None, 0)?;
Box::new(DynMutableListArray::<i32>::new_with_capacity(
values, capacity,
)) as Box<dyn MutableArray>
}
DataType::FixedSizeBinary(size) => Box::new(MutableFixedSizeBinaryArray::with_capacity(
*size as usize,
capacity,
)) as Box<dyn MutableArray>,
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing type {:?} is still not implemented",
other
)))
}
})
.collect::<Result<_>>()?;
},
})
}

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
if field.is_nullable() {
// variant 0 is always the null in a union array
if util::zigzag_i64(&mut block)? == 0 {
array.push_null();
continue;
#[inline]
fn deserialize_item<'a>(
array: &mut dyn MutableArray,
is_nullable: bool,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
if is_nullable {
// variant 0 is always the null in a union array
if util::zigzag_i64(&mut block)? == 0 {
array.push_null();
return Ok(block);
}
}

let data_type = array.data_type();
match data_type {
DataType::List(inner) => {
let is_nullable = inner.is_nullable();
let array = array
.as_mut_any()
.downcast_mut::<DynMutableListArray<i32>>()
.unwrap();
loop {
let len = util::zigzag_i64(&mut block)? as usize;

if len == 0 {
break;
}

let values = array.mut_values();
for _ in 0..len {
block = deserialize_item(values, is_nullable, block)?;
}
array.try_push_valid()?;
}
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
// https://avro.apache.org/docs/current/spec.html#Duration
// 12 bytes, months, days, millis in LE
let data = &block[..12];
block = &block[12..];

let value = months_days_ns::new(
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64 * 1_000_000,
);

match array.data_type().to_physical_type() {
PhysicalType::Boolean => {
let is_valid = block[0] == 1;
block = &block[1..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
.unwrap();
array.push(Some(value))
}
_ => match data_type.to_physical_type() {
PhysicalType::Boolean => {
let is_valid = block[0] == 1;
block = &block[1..];
let array = array
.as_mut_any()
.downcast_mut::<MutableBooleanArray>()
.unwrap();
array.push(Some(is_valid))
}
PhysicalType::Primitive(primitive) => match primitive {
PrimitiveType::Int32 => {
let value = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<MutableBooleanArray>()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
array.push(Some(is_valid))
array.push(Some(value))
}
PhysicalType::Primitive(primitive) => {
use crate::datatypes::PrimitiveType::*;
match primitive {
Int32 => {
let value = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i32>>()
.unwrap();
array.push(Some(value))
}
Int64 => {
let value = util::zigzag_i64(&mut block)? as i64;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i64>>()
.unwrap();
array.push(Some(value))
}
Float32 => {
let value = f32::from_le_bytes(block[..4].try_into().unwrap());
block = &block[4..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f32>>()
.unwrap();
array.push(Some(value))
}
Float64 => {
let value = f64::from_le_bytes(block[..8].try_into().unwrap());
block = &block[8..];
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<f64>>()
.unwrap();
array.push(Some(value))
}
_ => unreachable!(),
}
PrimitiveType::Int64 => {
let value = util::zigzag_i64(&mut block)? as i64;
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i64>>()
.unwrap();
array.push(Some(value))
}
PhysicalType::Utf8 => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = simdutf8::basic::from_utf8(&block[..len])?;
block = &block[len..];

PrimitiveType::Float32 => {
let value =
f32::from_le_bytes(block[..std::mem::size_of::<f32>()].try_into().unwrap());
block = &block[std::mem::size_of::<f32>()..];
let array = array
.as_mut_any()
.downcast_mut::<MutableUtf8Array<i32>>()
.downcast_mut::<MutablePrimitiveArray<f32>>()
.unwrap();
array.push(Some(data))
array.push(Some(value))
}
PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = &block[..len];
block = &block[len..];

PrimitiveType::Float64 => {
let value =
f64::from_le_bytes(block[..std::mem::size_of::<f64>()].try_into().unwrap());
block = &block[std::mem::size_of::<f64>()..];
let array = array
.as_mut_any()
.downcast_mut::<MutableBinaryArray<i32>>()
.downcast_mut::<MutablePrimitiveArray<f64>>()
.unwrap();
array.push(Some(data))
array.push(Some(value))
}
_ => todo!(),
};
_ => unreachable!(),
},
PhysicalType::Utf8 => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = simdutf8::basic::from_utf8(&block[..len])?;
block = &block[len..];

let array = array
.as_mut_any()
.downcast_mut::<MutableUtf8Array<i32>>()
.unwrap();
array.push(Some(data))
}
PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?;
let data = &block[..len];
block = &block[len..];

let array = array
.as_mut_any()
.downcast_mut::<MutableBinaryArray<i32>>()
.unwrap();
array.push(Some(data));
}
PhysicalType::FixedSizeBinary => {
let array = array
.as_mut_any()
.downcast_mut::<MutableFixedSizeBinaryArray>()
.unwrap();
let len = array.size();
let data = &block[..len];
block = &block[len..];
array.push(Some(data));
}
PhysicalType::Dictionary(_) => {
let index = util::zigzag_i64(&mut block)? as i32;
let array = array
.as_mut_any()
.downcast_mut::<FixedItemsUtf8Dictionary>()
.unwrap();
array.push_valid(index);
}
_ => todo!(),
},
};
Ok(block)
}

pub fn deserialize(
mut block: &[u8],
rows: usize,
schema: Arc<Schema>,
avro_schemas: &[AvroSchema],
) -> Result<RecordBatch> {
// create mutables, one per field
let mut arrays: Vec<Box<dyn MutableArray>> = schema
.fields()
.iter()
.zip(avro_schemas.iter())
.map(|(field, avro_schema)| {
let data_type = field.data_type().to_logical_type();
make_mutable(data_type, Some(avro_schema), rows)
})
.collect::<Result<_>>()?;

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
block = deserialize_item(array.as_mut(), field.is_nullable(), block)?
}
}
let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();
Expand Down
Loading

0 comments on commit 81edcdd

Please sign in to comment.