Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in reading nullable from nulls. (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Nov 24, 2021
1 parent bc3da78 commit 7f2c5bd
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 12 deletions.
31 changes: 31 additions & 0 deletions examples/avro_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::avro::read;

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let path = &args[1];

let file = &mut BufReader::new(File::open(path)?);

let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;

println!("{:#?}", avro_schema);

let reader = read::Reader::new(
read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
avro_schema,
Arc::new(schema),
);

for batch in reader {
let batch = batch?;
assert!(batch.num_rows() > 0);
}
Ok(())
}
2 changes: 0 additions & 2 deletions examples/csv_read_async.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use futures::io::Cursor;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::array::*;
use arrow2::error::Result;
use arrow2::io::csv::read_async::*;

Expand Down
2 changes: 1 addition & 1 deletion examples/growable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow2::array::growable::{Growable, GrowablePrimitive};
use arrow2::array::{Array, PrimitiveArray};
use arrow2::array::PrimitiveArray;

fn main() {
// say we have two sorted arrays
Expand Down
5 changes: 2 additions & 3 deletions examples/parquet_read_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crossbeam_channel::unbounded;
use parquet2::metadata::ColumnChunkMetaData;

use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

use crossbeam_channel::unbounded;

use arrow2::{
array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
Expand Down
38 changes: 32 additions & 6 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,45 @@ fn make_mutable(
})
}

#[inline]
fn is_union_null_first(avro_field: &AvroSchema) -> bool {
if let AvroSchema::Union(schemas) = avro_field {
schemas.variants()[0] == AvroSchema::Null
} else {
unreachable!()
}
}

fn deserialize_item<'a>(
array: &mut dyn MutableArray,
is_nullable: bool,
avro_field: &AvroSchema,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
if is_nullable {
// variant 0 is always the null in a union array
if util::zigzag_i64(&mut block)? == 0 {
let variant = util::zigzag_i64(&mut block)?;
let is_null_first = is_union_null_first(avro_field);
if is_null_first && variant == 0 || !is_null_first && variant != 0 {
array.push_null();
return Ok(block);
}
}
deserialize_value(array, avro_field, block)
}

fn deserialize_value<'a>(
array: &mut dyn MutableArray,
avro_field: &AvroSchema,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
let data_type = array.data_type();
match data_type {
DataType::List(inner) => {
let avro_inner = if let AvroSchema::Array(inner) = avro_field {
inner.as_ref()
} else {
unreachable!()
};

let is_nullable = inner.is_nullable();
let array = array
.as_mut_any()
Expand All @@ -93,7 +115,7 @@ fn deserialize_item<'a>(

let values = array.mut_values();
for _ in 0..len {
block = deserialize_item(values, is_nullable, block)?;
block = deserialize_item(values, is_nullable, avro_inner, block)?;
}
array.try_push_valid()?;
}
Expand Down Expand Up @@ -238,8 +260,12 @@ pub fn deserialize(

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) {
block = deserialize_item(array.as_mut(), field.is_nullable(), block)?
for ((array, field), avro_field) in arrays
.iter_mut()
.zip(schema.fields().iter())
.zip(avro_schemas.iter())
{
block = deserialize_item(array.as_mut(), field.is_nullable(), avro_field, block)?
}
}
let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();
Expand Down

0 comments on commit 7f2c5bd

Please sign in to comment.