diff --git a/examples/avro_read.rs b/examples/avro_read.rs
new file mode 100644
index 00000000000..33a1e717b47
--- /dev/null
+++ b/examples/avro_read.rs
@@ -0,0 +1,31 @@
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::Arc;
+
+use arrow2::error::Result;
+use arrow2::io::avro::read;
+
+fn main() -> Result<()> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let path = &args[1];
+
+    let file = &mut BufReader::new(File::open(path)?);
+
+    let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
+
+    println!("{:#?}", avro_schema);
+
+    let reader = read::Reader::new(
+        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
+        avro_schema,
+        Arc::new(schema),
+    );
+
+    for batch in reader {
+        let batch = batch?;
+        assert!(batch.num_rows() > 0);
+    }
+    Ok(())
+}
diff --git a/examples/csv_read_async.rs b/examples/csv_read_async.rs
index fa0e9481fc5..c893f59d809 100644
--- a/examples/csv_read_async.rs
+++ b/examples/csv_read_async.rs
@@ -1,10 +1,8 @@
 use std::sync::Arc;
 
-use futures::io::Cursor;
 use tokio::fs::File;
 use tokio_util::compat::*;
 
-use arrow2::array::*;
 use arrow2::error::Result;
 use arrow2::io::csv::read_async::*;
 
diff --git a/examples/growable.rs b/examples/growable.rs
index cc8b7573e49..c79d9cd0cbb 100644
--- a/examples/growable.rs
+++ b/examples/growable.rs
@@ -1,5 +1,5 @@
 use arrow2::array::growable::{Growable, GrowablePrimitive};
-use arrow2::array::{Array, PrimitiveArray};
+use arrow2::array::PrimitiveArray;
 
 fn main() {
     // say we have two sorted arrays
diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs
index 19d58d130e6..38f180d4739 100644
--- a/examples/parquet_read_parallel.rs
+++ b/examples/parquet_read_parallel.rs
@@ -1,11 +1,10 @@
-use crossbeam_channel::unbounded;
-use parquet2::metadata::ColumnChunkMetaData;
-
 use std::fs::File;
 use std::sync::Arc;
 use std::thread;
 use std::time::SystemTime;
 
+use crossbeam_channel::unbounded;
+
 use arrow2::{
     array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
     record_batch::RecordBatch,
diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs
index 2ef82a324c7..a4a87620314 100644
--- a/src/io/avro/read/deserialize.rs
+++ b/src/io/avro/read/deserialize.rs
@@ -62,23 +62,45 @@ fn make_mutable(
     })
 }
 
-#[inline]
+fn is_union_null_first(avro_field: &AvroSchema) -> bool {
+    if let AvroSchema::Union(schemas) = avro_field {
+        schemas.variants()[0] == AvroSchema::Null
+    } else {
+        unreachable!()
+    }
+}
+
 fn deserialize_item<'a>(
     array: &mut dyn MutableArray,
     is_nullable: bool,
+    avro_field: &AvroSchema,
     mut block: &'a [u8],
 ) -> Result<&'a [u8]> {
     if is_nullable {
-        // variant 0 is always the null in a union array
-        if util::zigzag_i64(&mut block)? == 0 {
+        let variant = util::zigzag_i64(&mut block)?;
+        let is_null_first = is_union_null_first(avro_field);
+        if is_null_first && variant == 0 || !is_null_first && variant != 0 {
             array.push_null();
             return Ok(block);
         }
     }
+    deserialize_value(array, avro_field, block)
+}
 
+fn deserialize_value<'a>(
+    array: &mut dyn MutableArray,
+    avro_field: &AvroSchema,
+    mut block: &'a [u8],
+) -> Result<&'a [u8]> {
     let data_type = array.data_type();
     match data_type {
         DataType::List(inner) => {
+            let avro_inner = if let AvroSchema::Array(inner) = avro_field {
+                inner.as_ref()
+            } else {
+                unreachable!()
+            };
+
             let is_nullable = inner.is_nullable();
             let array = array
                 .as_mut_any()
@@ -93,7 +115,7 @@ fn deserialize_item<'a>(
 
                 let values = array.mut_values();
                 for _ in 0..len {
-                    block = deserialize_item(values, is_nullable, block)?;
+                    block = deserialize_item(values, is_nullable, avro_inner, block)?;
                 }
                 array.try_push_valid()?;
             }
@@ -238,8 +260,12 @@ pub fn deserialize(
 
     // this is _the_ expensive transpose (rows -> columns)
     for _ in 0..rows {
-        for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
-            block = deserialize_item(array.as_mut(), field.is_nullable(), block)?
+        for ((array, field), avro_field) in arrays
+            .iter_mut()
+            .zip(schema.fields().iter())
+            .zip(avro_schemas.iter())
+        {
+            block = deserialize_item(array.as_mut(), field.is_nullable(), avro_field, block)?
         }
     }
     let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();