Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed error in reading nullable from Avro. #631

Merged
merged 1 commit into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/avro_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::avro::read;

/// Reads the Avro file whose path is given as the first command-line argument,
/// prints its Avro schema, and iterates over every record batch in the file.
///
/// Prints a usage message and exits with status 1 when no path is supplied,
/// instead of panicking on an out-of-bounds argument index.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, its metadata cannot be read,
/// or any batch fails to decode.
fn main() -> Result<()> {
    use std::env;

    let mut args = env::args();
    // The first argument is the program name; fall back to a fixed name if the
    // platform does not provide one.
    let program = args.next().unwrap_or_else(|| "avro_read".to_string());
    let path = match args.next() {
        Some(path) => path,
        None => {
            eprintln!("usage: {} <path-to-avro-file>", program);
            std::process::exit(1);
        }
    };

    let file = &mut BufReader::new(File::open(&path)?);

    let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;

    println!("{:#?}", avro_schema);

    let reader = read::Reader::new(
        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
        avro_schema,
        Arc::new(schema),
    );

    for batch in reader {
        let batch = batch?;
        // The example expects every decoded batch to carry at least one row.
        assert!(batch.num_rows() > 0);
    }
    Ok(())
}
2 changes: 0 additions & 2 deletions examples/csv_read_async.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use futures::io::Cursor;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::array::*;
use arrow2::error::Result;
use arrow2::io::csv::read_async::*;

Expand Down
2 changes: 1 addition & 1 deletion examples/growable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow2::array::growable::{Growable, GrowablePrimitive};
use arrow2::array::{Array, PrimitiveArray};
use arrow2::array::PrimitiveArray;

fn main() {
// say we have two sorted arrays
Expand Down
5 changes: 2 additions & 3 deletions examples/parquet_read_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crossbeam_channel::unbounded;
use parquet2::metadata::ColumnChunkMetaData;

use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

use crossbeam_channel::unbounded;

use arrow2::{
array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
Expand Down
38 changes: 32 additions & 6 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,45 @@ fn make_mutable(
})
}

/// Returns whether the first variant of the union `avro_field` is `AvroSchema::Null`.
///
/// A union without any variants yields `false` rather than panicking on an
/// out-of-bounds index.
///
/// # Panics
///
/// Panics if `avro_field` is not an `AvroSchema::Union`.
#[inline]
fn is_union_null_first(avro_field: &AvroSchema) -> bool {
    match avro_field {
        AvroSchema::Union(schemas) => {
            // `first()` avoids the panic that `[0]` would raise on an empty union.
            matches!(schemas.variants().first(), Some(AvroSchema::Null))
        }
        _ => unreachable!("is_union_null_first called with a non-union schema"),
    }
}

/// Reads one item from `block` into `array` and returns the unread remainder of `block`.
///
/// When `is_nullable` is true, a union variant index is first read with
/// `util::zigzag_i64` and compared against the null position reported by
/// `is_union_null_first(avro_field)`; if the index selects the null variant, a null
/// is pushed onto `array` and the function returns early. In every other case the
/// value is decoded by `deserialize_value`.
///
/// NOTE(review): this span appears to be diff output without +/- markers, so the
/// replaced `== 0` check and its comment are shown next to the new check — confirm
/// which lines belong to the final file.
fn deserialize_item<'a>(
array: &mut dyn MutableArray,
is_nullable: bool,
avro_field: &AvroSchema,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
if is_nullable {
// variant 0 is always the null in a union array
if util::zigzag_i64(&mut block)? == 0 {
let variant = util::zigzag_i64(&mut block)?;
let is_null_first = is_union_null_first(avro_field);
// Null when the index is 0 and null is listed first, or when the index is
// non-zero and null is not listed first.
if is_null_first && variant == 0 || !is_null_first && variant != 0 {
array.push_null();
return Ok(block);
}
}
deserialize_value(array, avro_field, block)
}

fn deserialize_value<'a>(
array: &mut dyn MutableArray,
avro_field: &AvroSchema,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
let data_type = array.data_type();
match data_type {
DataType::List(inner) => {
let avro_inner = if let AvroSchema::Array(inner) = avro_field {
inner.as_ref()
} else {
unreachable!()
};

let is_nullable = inner.is_nullable();
let array = array
.as_mut_any()
Expand All @@ -93,7 +115,7 @@ fn deserialize_item<'a>(

let values = array.mut_values();
for _ in 0..len {
block = deserialize_item(values, is_nullable, block)?;
block = deserialize_item(values, is_nullable, avro_inner, block)?;
}
array.try_push_valid()?;
}
Expand Down Expand Up @@ -238,8 +260,12 @@ pub fn deserialize(

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
block = deserialize_item(array.as_mut(), field.is_nullable(), block)?
for ((array, field), avro_field) in arrays
.iter_mut()
.zip(schema.fields().iter())
.zip(avro_schemas.iter())
{
block = deserialize_item(array.as_mut(), field.is_nullable(), avro_field, block)?
}
}
let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();
Expand Down