From 81edcdd452a11a4e5507bd49bbec60c051aeea8c Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Wed, 6 Oct 2021 23:31:16 +0200 Subject: [PATCH] Added support to read Avro logical types, `List`,`Enum`, `Duration` and `Fixed`. (#493) * Added support for Avro List,Enum,Fixed. * Added support for Duration. * Added test showing support for logical types. --- src/array/fixed_size_binary/mutable.rs | 6 + src/io/avro/read/deserialize.rs | 302 +++++++++++++++++-------- src/io/avro/read/mod.rs | 39 +++- src/io/avro/read/nested.rs | 201 ++++++++++++++++ src/io/avro/read/schema.rs | 29 +-- tests/it/io/avro/read/mod.rs | 105 ++++++++- 6 files changed, 554 insertions(+), 128 deletions(-) create mode 100644 src/io/avro/read/nested.rs diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index 2e5b728035f..2e0d218015e 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -129,6 +129,12 @@ impl MutableFixedSizeBinaryArray { Ok(primitive) } + /// returns the (fixed) size of the [`MutableFixedSizeBinaryArray`]. + #[inline] + pub fn size(&self) -> usize { + self.size + } + fn init_validity(&mut self) { let mut validity = MutableBitmap::new(); validity.extend_constant(self.len(), true); diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 514de185a59..2ef82a324c7 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -1,135 +1,245 @@ use std::convert::TryInto; use std::sync::Arc; +use avro_rs::Schema as AvroSchema; + use crate::array::*; use crate::datatypes::*; use crate::error::ArrowError; use crate::error::Result; use crate::record_batch::RecordBatch; +use crate::types::months_days_ns; +use super::nested::*; use super::util; -pub fn deserialize(mut block: &[u8], rows: usize, schema: Arc) -> Result { - // create mutables, one per field - let mut arrays: Vec> = schema - .fields() - .iter() - .map(|field| match field.data_type().to_physical_type() { - PhysicalType::Boolean => { - Ok(Box::new(MutableBooleanArray::with_capacity(rows)) as Box) - } - PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { - Ok(Box::new(MutablePrimitiveArray::<$T>::with_capacity(rows)) as Box) - }), - PhysicalType::Utf8 => { - Ok(Box::new(MutableUtf8Array::::with_capacity(rows)) as Box) +fn make_mutable( + data_type: &DataType, + avro_schema: Option<&AvroSchema>, + capacity: usize, +) -> Result> { + Ok(match data_type.to_physical_type() { + PhysicalType::Boolean => { + Box::new(MutableBooleanArray::with_capacity(capacity)) as Box + } + PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { + Box::new(MutablePrimitiveArray::<$T>::with_capacity(capacity).to(data_type.clone())) + as Box + }), + PhysicalType::Binary => { + Box::new(MutableBinaryArray::::with_capacity(capacity)) as Box + } + PhysicalType::Utf8 => { + Box::new(MutableUtf8Array::::with_capacity(capacity)) as Box + } + PhysicalType::Dictionary(_) => { + if let Some(AvroSchema::Enum { symbols, .. }) = avro_schema { + let values = Utf8Array::::from_slice(symbols); + Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity)) + as Box + } else { + unreachable!() } - PhysicalType::Binary => { - Ok(Box::new(MutableBinaryArray::::with_capacity(rows)) - as Box) + } + _ => match data_type { + DataType::List(inner) => { + let values = make_mutable(inner.data_type(), None, 0)?; + Box::new(DynMutableListArray::::new_with_capacity( + values, capacity, + )) as Box } + DataType::FixedSizeBinary(size) => Box::new(MutableFixedSizeBinaryArray::with_capacity( + *size as usize, + capacity, + )) as Box, other => { return Err(ArrowError::NotYetImplemented(format!( "Deserializing type {:?} is still not implemented", other ))) } - }) - .collect::>()?; + }, + }) +} - // this is _the_ expensive transpose (rows -> columns) - for _ in 0..rows { - for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) { - if field.is_nullable() { - // variant 0 is always the null in a union array - if util::zigzag_i64(&mut block)? == 0 { - array.push_null(); - continue; +#[inline] +fn deserialize_item<'a>( + array: &mut dyn MutableArray, + is_nullable: bool, + mut block: &'a [u8], +) -> Result<&'a [u8]> { + if is_nullable { + // variant 0 is always the null in a union array + if util::zigzag_i64(&mut block)? == 0 { + array.push_null(); + return Ok(block); + } + } + + let data_type = array.data_type(); + match data_type { + DataType::List(inner) => { + let is_nullable = inner.is_nullable(); + let array = array + .as_mut_any() + .downcast_mut::>() + .unwrap(); + loop { + let len = util::zigzag_i64(&mut block)? as usize; + + if len == 0 { + break; + } + + let values = array.mut_values(); + for _ in 0..len { + block = deserialize_item(values, is_nullable, block)?; } + array.try_push_valid()?; } + } + DataType::Interval(IntervalUnit::MonthDayNano) => { + // https://avro.apache.org/docs/current/spec.html#Duration + // 12 bytes, months, days, millis in LE + let data = &block[..12]; + block = &block[12..]; + + let value = months_days_ns::new( + i32::from_le_bytes([data[0], data[1], data[2], data[3]]), + i32::from_le_bytes([data[4], data[5], data[6], data[7]]), + i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64 * 1_000_000, + ); - match array.data_type().to_physical_type() { - PhysicalType::Boolean => { - let is_valid = block[0] == 1; - block = &block[1..]; + let array = array + .as_mut_any() + .downcast_mut::>() + .unwrap(); + array.push(Some(value)) + } + _ => match data_type.to_physical_type() { + PhysicalType::Boolean => { + let is_valid = block[0] == 1; + block = &block[1..]; + let array = array + .as_mut_any() + .downcast_mut::() + .unwrap(); + array.push(Some(is_valid)) + } + PhysicalType::Primitive(primitive) => match primitive { + PrimitiveType::Int32 => { + let value = util::zigzag_i64(&mut block)? as i32; let array = array .as_mut_any() - .downcast_mut::() + .downcast_mut::>() .unwrap(); - array.push(Some(is_valid)) + array.push(Some(value)) } - PhysicalType::Primitive(primitive) => { - use crate::datatypes::PrimitiveType::*; - match primitive { - Int32 => { - let value = util::zigzag_i64(&mut block)? as i32; - let array = array - .as_mut_any() - .downcast_mut::>() - .unwrap(); - array.push(Some(value)) - } - Int64 => { - let value = util::zigzag_i64(&mut block)? as i64; - let array = array - .as_mut_any() - .downcast_mut::>() - .unwrap(); - array.push(Some(value)) - } - Float32 => { - let value = f32::from_le_bytes(block[..4].try_into().unwrap()); - block = &block[4..]; - let array = array - .as_mut_any() - .downcast_mut::>() - .unwrap(); - array.push(Some(value)) - } - Float64 => { - let value = f64::from_le_bytes(block[..8].try_into().unwrap()); - block = &block[8..]; - let array = array - .as_mut_any() - .downcast_mut::>() - .unwrap(); - array.push(Some(value)) - } - _ => unreachable!(), - } + PrimitiveType::Int64 => { + let value = util::zigzag_i64(&mut block)? as i64; + let array = array + .as_mut_any() + .downcast_mut::>() + .unwrap(); + array.push(Some(value)) } - PhysicalType::Utf8 => { - let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| { - ArrowError::ExternalFormat( - "Avro format contains a non-usize number of bytes".to_string(), - ) - })?; - let data = simdutf8::basic::from_utf8(&block[..len])?; - block = &block[len..]; - + PrimitiveType::Float32 => { + let value = + f32::from_le_bytes(block[..std::mem::size_of::()].try_into().unwrap()); + block = &block[std::mem::size_of::()..]; let array = array .as_mut_any() - .downcast_mut::>() + .downcast_mut::>() .unwrap(); - array.push(Some(data)) + array.push(Some(value)) } - PhysicalType::Binary => { - let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| { - ArrowError::ExternalFormat( - "Avro format contains a non-usize number of bytes".to_string(), - ) - })?; - let data = &block[..len]; - block = &block[len..]; - + PrimitiveType::Float64 => { + let value = + f64::from_le_bytes(block[..std::mem::size_of::()].try_into().unwrap()); + block = &block[std::mem::size_of::()..]; let array = array .as_mut_any() - .downcast_mut::>() + .downcast_mut::>() .unwrap(); - array.push(Some(data)) + array.push(Some(value)) } - _ => todo!(), - }; + _ => unreachable!(), + }, + PhysicalType::Utf8 => { + let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| { + ArrowError::ExternalFormat( + "Avro format contains a non-usize number of bytes".to_string(), + ) + })?; + let data = simdutf8::basic::from_utf8(&block[..len])?; + block = &block[len..]; + + let array = array + .as_mut_any() + .downcast_mut::>() + .unwrap(); + array.push(Some(data)) + } + PhysicalType::Binary => { + let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| { + ArrowError::ExternalFormat( + "Avro format contains a non-usize number of bytes".to_string(), + ) + })?; + let data = &block[..len]; + block = &block[len..]; + + let array = array + .as_mut_any() + .downcast_mut::>() + .unwrap(); + array.push(Some(data)); + } + PhysicalType::FixedSizeBinary => { + let array = array + .as_mut_any() + .downcast_mut::() + .unwrap(); + let len = array.size(); + let data = &block[..len]; + block = &block[len..]; + array.push(Some(data)); + } + PhysicalType::Dictionary(_) => { + let index = util::zigzag_i64(&mut block)? as i32; + let array = array + .as_mut_any() + .downcast_mut::() + .unwrap(); + array.push_valid(index); + } + _ => todo!(), + }, + }; + Ok(block) +} + +pub fn deserialize( + mut block: &[u8], + rows: usize, + schema: Arc, + avro_schemas: &[AvroSchema], +) -> Result { + // create mutables, one per field + let mut arrays: Vec> = schema + .fields() + .iter() + .zip(avro_schemas.iter()) + .map(|(field, avro_schema)| { + let data_type = field.data_type().to_logical_type(); + make_mutable(data_type, Some(avro_schema), rows) + }) + .collect::>()?; + + // this is _the_ expensive transpose (rows -> columns) + for _ in 0..rows { + for (array, field) in arrays.iter_mut().zip(schema.fields().iter()) { + block = deserialize_item(array.as_mut(), field.is_nullable(), block)? } } let columns = arrays.iter_mut().map(|array| array.as_arc()).collect(); diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index 721b0e66d1f..02aa6a7dd4b 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -3,10 +3,11 @@ use std::io::Read; use std::sync::Arc; -use avro_rs::Codec; +use avro_rs::{Codec, Schema as AvroSchema}; use streaming_iterator::StreamingIterator; mod deserialize; +mod nested; mod schema; mod util; @@ -15,9 +16,19 @@ use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; /// Reads the avro metadata from `reader` into a [`Schema`], [`Codec`] and magic marker. -pub fn read_metadata(reader: &mut R) -> Result<(Schema, Codec, [u8; 16])> { - let (schema, codec, marker) = util::read_schema(reader)?; - Ok((schema::convert_schema(&schema)?, codec, marker)) +pub fn read_metadata( + reader: &mut R, +) -> Result<(Vec, Schema, Codec, [u8; 16])> { + let (avro_schema, codec, marker) = util::read_schema(reader)?; + let schema = schema::convert_schema(&avro_schema)?; + + let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema { + fields.into_iter().map(|x| x.schema).collect() + } else { + panic!() + }; + + Ok((avro_schema, schema, codec, marker)) } fn read_size(reader: &mut R) -> Result<(usize, usize)> { @@ -157,12 +168,21 @@ impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> { pub struct Reader<'a, R: Read> { iter: Decompressor<'a, R>, schema: Arc, + avro_schemas: Vec, } impl<'a, R: Read> Reader<'a, R> { /// Creates a new [`Reader`]. - pub fn new(iter: Decompressor<'a, R>, schema: Arc) -> Self { - Self { iter, schema } + pub fn new( + iter: Decompressor<'a, R>, + avro_schemas: Vec, + schema: Arc, + ) -> Self { + Self { + iter, + avro_schemas, + schema, + } } } @@ -171,7 +191,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> { fn next(&mut self) -> Option { if let Some((data, rows)) = self.iter.next() { - Some(deserialize::deserialize(data, *rows, self.schema.clone())) + Some(deserialize::deserialize( + data, + *rows, + self.schema.clone(), + &self.avro_schemas, + )) } else { None } diff --git a/src/io/avro/read/nested.rs b/src/io/avro/read/nested.rs new file mode 100644 index 00000000000..3dc89a7c2d4 --- /dev/null +++ b/src/io/avro/read/nested.rs @@ -0,0 +1,201 @@ +use std::sync::Arc; + +use crate::array::*; +use crate::bitmap::*; +use crate::buffer::*; +use crate::datatypes::*; +use crate::error::*; + +/// Auxiliary struct +#[derive(Debug)] +pub struct DynMutableListArray { + data_type: DataType, + offsets: MutableBuffer, + values: Box, + validity: Option, +} + +impl DynMutableListArray { + pub fn new_from(values: Box, data_type: DataType, capacity: usize) -> Self { + let mut offsets = MutableBuffer::::with_capacity(capacity + 1); + offsets.push(O::default()); + assert_eq!(values.len(), 0); + ListArray::::get_child_field(&data_type); + Self { + data_type, + offsets, + values, + validity: None, + } + } + + /// Creates a new [`MutableListArray`] from a [`MutableArray`] and capacity. + pub fn new_with_capacity(values: Box, capacity: usize) -> Self { + let data_type = ListArray::::default_datatype(values.data_type().clone()); + Self::new_from(values, data_type, capacity) + } + + /// The values + pub fn mut_values(&mut self) -> &mut dyn MutableArray { + self.values.as_mut() + } + + #[inline] + pub fn try_push_valid(&mut self) -> Result<()> { + let size = self.values.len(); + let size = O::from_usize(size).ok_or(ArrowError::KeyOverflowError)?; // todo: make this error + assert!(size >= *self.offsets.last().unwrap()); + + self.offsets.push(size); + if let Some(validity) = &mut self.validity { + validity.push(true) + } + Ok(()) + } + + #[inline] + fn push_null(&mut self) { + self.offsets.push(self.last_offset()); + match &mut self.validity { + Some(validity) => validity.push(false), + None => self.init_validity(), + } + } + + #[inline] + fn last_offset(&self) -> O { + *self.offsets.last().unwrap() + } + + fn init_validity(&mut self) { + let len = self.offsets.len() - 1; + + let mut validity = MutableBitmap::new(); + validity.extend_constant(len, true); + validity.set(len - 1, false); + self.validity = Some(validity) + } +} + +impl MutableArray for DynMutableListArray { + fn len(&self) -> usize { + self.offsets.len() - 1 + } + + fn validity(&self) -> Option<&MutableBitmap> { + self.validity.as_ref() + } + + fn as_box(&mut self) -> Box { + Box::new(ListArray::from_data( + self.data_type.clone(), + std::mem::take(&mut self.offsets).into(), + self.values.as_arc(), + std::mem::take(&mut self.validity).map(|x| x.into()), + )) + } + + fn as_arc(&mut self) -> Arc { + Arc::new(ListArray::from_data( + self.data_type.clone(), + std::mem::take(&mut self.offsets).into(), + self.values.as_arc(), + std::mem::take(&mut self.validity).map(|x| x.into()), + )) + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + #[inline] + fn push_null(&mut self) { + self.push_null() + } + + fn shrink_to_fit(&mut self) { + todo!(); + } +} + +#[derive(Debug)] +pub struct FixedItemsUtf8Dictionary { + data_type: DataType, + keys: MutablePrimitiveArray, + values: Utf8Array, +} + +impl FixedItemsUtf8Dictionary { + pub fn with_capacity(values: Utf8Array, capacity: usize) -> Self { + Self { + data_type: DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(values.data_type().clone()), + ), + keys: MutablePrimitiveArray::::with_capacity(capacity), + values, + } + } + + pub fn push_valid(&mut self, key: i32) { + self.keys.push(Some(key)) + } + + /// pushes a null value + pub fn push_null(&mut self) { + self.keys.push(None) + } +} + +impl MutableArray for FixedItemsUtf8Dictionary { + fn len(&self) -> usize { + self.keys.len() + } + + fn validity(&self) -> Option<&MutableBitmap> { + self.keys.validity() + } + + fn as_box(&mut self) -> Box { + Box::new(DictionaryArray::from_data( + std::mem::take(&mut self.keys).into(), + Arc::new(self.values.clone()), + )) + } + + fn as_arc(&mut self) -> Arc { + Arc::new(DictionaryArray::from_data( + std::mem::take(&mut self.keys).into(), + Arc::new(self.values.clone()), + )) + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + #[inline] + fn push_null(&mut self) { + self.push_null() + } + + fn shrink_to_fit(&mut self) { + todo!(); + } +} diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index 6d4554e9185..8b9d648cf5f 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -111,16 +111,13 @@ fn schema_to_field( AvroSchema::Double => DataType::Float64, AvroSchema::Bytes => DataType::Binary, AvroSchema::String => DataType::Utf8, - AvroSchema::Array(item_schema) => { - DataType::List(Box::new(schema_to_field(item_schema, None, false, None)?)) - } - AvroSchema::Map(value_schema) => { - let value_field = schema_to_field(value_schema, Some("value"), false, None)?; - DataType::Dictionary( - Box::new(DataType::Utf8), - Box::new(value_field.data_type().clone()), - ) - } + AvroSchema::Array(item_schema) => DataType::List(Box::new(schema_to_field( + item_schema, + Some("item"), // default name for list items + false, + None, + )?)), + AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"), AvroSchema::Union(us) => { // If there are only two variants and one of them is null, set the other type as the field data type let has_nullable = us.find_schema(&Value::Null).is_some(); @@ -169,12 +166,10 @@ fn schema_to_field( .collect(); DataType::Struct(fields?) } - AvroSchema::Enum { name, .. } => { - return Ok(Field::new_dict( - &name.fullname(None), - DataType::Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)), - false, - 0, + AvroSchema::Enum { .. } => { + return Ok(Field::new( + name.unwrap_or_default(), + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false, )) } @@ -188,7 +183,7 @@ fn schema_to_field( AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond), AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None), AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None), - AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond), + AvroSchema::Duration => DataType::Interval(IntervalUnit::MonthDayNano), }; let name = name.unwrap_or_default(); diff --git a/tests/it/io/avro/read/mod.rs b/tests/it/io/avro/read/mod.rs index d7b4d3db837..c3b33ebd391 100644 --- a/tests/it/io/avro/read/mod.rs +++ b/tests/it/io/avro/read/mod.rs @@ -1,8 +1,9 @@ use std::sync::Arc; -use avro_rs::types::Record; -use avro_rs::Schema as AvroSchema; +use arrow2::types::months_days_ns; +use avro_rs::types::{Record, Value}; use avro_rs::Writer; +use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema}; use arrow2::array::*; use arrow2::datatypes::*; @@ -19,10 +20,35 @@ fn schema() -> (AvroSchema, Schema) { {"name": "a", "type": "long"}, {"name": "b", "type": "string"}, {"name": "c", "type": "int"}, + { + "name": "date", + "type": "int", + "logicalType": "date" + }, {"name": "d", "type": "bytes"}, {"name": "e", "type": "double"}, {"name": "f", "type": "boolean"}, - {"name": "h", "type": ["null", "string"], "default": null} + {"name": "g", "type": ["null", "string"], "default": null}, + {"name": "h", "type": { + "type": "array", + "items": { + "name": "item", + "type": ["null", "int"], + "default": null + } + }}, + {"name": "enum", "type": { + "type": "enum", + "name": "", + "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + }}, + {"name": "duration", + "logicalType": "duration", + "type": { + "name": "duration", + "type": "fixed", + "size": 12 + }} ] } "#; @@ -31,10 +57,26 @@ fn schema() -> (AvroSchema, Schema) { Field::new("a", DataType::Int64, false), Field::new("b", DataType::Utf8, false), Field::new("c", DataType::Int32, false), + Field::new("date", DataType::Date32, false), Field::new("d", DataType::Binary, false), Field::new("e", DataType::Float64, false), Field::new("f", DataType::Boolean, false), - Field::new("h", DataType::Utf8, true), + Field::new("g", DataType::Utf8, true), + Field::new( + "h", + DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + false, + ), + Field::new( + "enum", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new( + "duration", + DataType::Interval(IntervalUnit::MonthDayNano), + false, + ), ]); (AvroSchema::parse_str(raw_schema).unwrap(), schema) @@ -50,30 +92,76 @@ fn write() -> Result<(Vec, RecordBatch)> { record.put("a", 27i64); record.put("b", "foo"); record.put("c", 1i32); + record.put("date", 1i32); record.put("d", b"foo".as_ref()); record.put("e", 1.0f64); record.put("f", true); - record.put("h", Some("foo")); + record.put("g", Some("foo")); + record.put( + "h", + Value::Array(vec![ + Value::Union(Box::new(Value::Int(1))), + Value::Union(Box::new(Value::Null)), + Value::Union(Box::new(Value::Int(3))), + ]), + ); + record.put("enum", Value::Enum(1, "HEARTS".to_string())); + record.put( + "duration", + Value::Duration(Duration::new(Months::new(1), Days::new(1), Millis::new(1))), + ); writer.append(record)?; let mut record = Record::new(writer.schema()).unwrap(); record.put("b", "bar"); record.put("a", 47i64); record.put("c", 1i32); + record.put("date", 2i32); record.put("d", b"bar".as_ref()); record.put("e", 2.0f64); record.put("f", false); - record.put("h", None::<&str>); + record.put("g", None::<&str>); + record.put( + "h", + Value::Array(vec![ + Value::Union(Box::new(Value::Int(1))), + Value::Union(Box::new(Value::Null)), + Value::Union(Box::new(Value::Int(3))), + ]), + ); + record.put("enum", Value::Enum(0, "SPADES".to_string())); + record.put( + "duration", + Value::Duration(Duration::new(Months::new(1), Days::new(2), Millis::new(1))), + ); writer.append(record)?; + let data = vec![ + Some(vec![Some(1i32), None, Some(3)]), + Some(vec![Some(1i32), None, Some(3)]), + ]; + + let mut array = MutableListArray::>::new(); + array.try_extend(data).unwrap(); + let columns = vec![ Arc::new(Int64Array::from_slice([27, 47])) as Arc, Arc::new(Utf8Array::::from_slice(["foo", "bar"])) as Arc, Arc::new(Int32Array::from_slice([1, 1])) as Arc, + Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc, Arc::new(BinaryArray::::from_slice([b"foo", b"bar"])) as Arc, Arc::new(PrimitiveArray::::from_slice([1.0, 2.0])) as Arc, Arc::new(BooleanArray::from_slice([true, false])) as Arc, Arc::new(Utf8Array::::from([Some("foo"), None])) as Arc, + array.into_arc(), + Arc::new(DictionaryArray::::from_data( + Int32Array::from_slice([1, 0]), + Arc::new(Utf8Array::::from_slice(["SPADES", "HEARTS"])), + )) as Arc, + Arc::new(MonthsDaysNsArray::from_slice([ + months_days_ns::new(1, 1, 1_000_000), + months_days_ns::new(1, 2, 1_000_000), + ])) as Arc, ]; let expected = RecordBatch::try_new(Arc::new(schema), columns).unwrap(); @@ -83,14 +171,15 @@ fn write() -> Result<(Vec, RecordBatch)> { #[test] fn read() -> Result<()> { - let (data, expected) = write()?; + let (data, expected) = write().unwrap(); let file = &mut &data[..]; - let (schema, codec, file_marker) = read::read_metadata(file)?; + let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?; let mut reader = read::Reader::new( read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec), + avro_schema, Arc::new(schema), );