From 4ff0b0138a4b4c946d74e8869917aed66446d512 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 13 Mar 2022 10:22:01 +0000 Subject: [PATCH] Added support to write ListArray to Avro --- src/io/avro/write/schema.rs | 7 ++- src/io/avro/write/serialize.rs | 93 +++++++++++++++++++++++++++++++++- tests/it/io/avro/write.rs | 33 ++++++++++++ 3 files changed, 128 insertions(+), 5 deletions(-) diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs index 23e0949145a..6db94b84a04 100644 --- a/src/io/avro/write/schema.rs +++ b/src/io/avro/write/schema.rs @@ -36,10 +36,9 @@ fn _type_to_schema(data_type: &DataType) -> Result { DataType::LargeBinary => AvroSchema::Bytes(None), DataType::Utf8 => AvroSchema::String(None), DataType::LargeUtf8 => AvroSchema::String(None), - DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema( - &inner.data_type, - inner.is_nullable, - )?)), + DataType::LargeList(inner) | DataType::List(inner) => AvroSchema::Array(Box::new( + type_to_schema(&inner.data_type, inner.is_nullable)?, + )), DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)), DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)), DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)), diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index 1cce28cc578..71ae79bfe4e 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -1,5 +1,6 @@ use avro_schema::Schema as AvroSchema; +use crate::bitmap::utils::zip_validity; use crate::datatypes::{IntervalUnit, PhysicalType, PrimitiveType}; use crate::types::months_days_ns; use crate::{array::*, datatypes::DataType}; @@ -61,6 +62,60 @@ fn binary_optional(array: &BinaryArray) -> BoxSerializer { )) } +fn list_required<'a, O: Offset>(array: &'a ListArray, schema: &AvroSchema) -> BoxSerializer<'a> { + let mut inner = new_serializer(array.values().as_ref(), schema); + let lengths = array + .offsets() + .windows(2) + .map(|w| (w[1] - w[0]).to_usize() as i64); + + Box::new(BufStreamingIterator::new( + lengths, + move |length, buf| { + util::zigzag_encode(length, buf).unwrap(); + let mut rows = 0; + while let Some(item) = inner.next() { + buf.extend_from_slice(item); + rows += 1; + if rows == length { + util::zigzag_encode(0, buf).unwrap(); + break; + } + } + }, + vec![], + )) +} + +fn list_optional<'a, O: Offset>(array: &'a ListArray, schema: &AvroSchema) -> BoxSerializer<'a> { + let mut inner = new_serializer(array.values().as_ref(), schema); + let lengths = array + .offsets() + .windows(2) + .map(|w| (w[1] - w[0]).to_usize() as i64); + let lengths = zip_validity(lengths, array.validity().as_ref().map(|x| x.iter())); + + Box::new(BufStreamingIterator::new( + lengths, + move |length, buf| { + util::zigzag_encode(length.is_some() as i64, buf).unwrap(); + if let Some(length) = length { + util::zigzag_encode(length, buf).unwrap(); + let mut rows = 0; + while let Some(item) = inner.next() { + buf.extend_from_slice(item); + rows += 1; + if rows == length { + util::zigzag_encode(0, buf).unwrap(); + break; + } + } + } + }, + vec![], + )) +} + /// Creates a [`StreamingIterator`] trait object that presents items from `array` /// encoded according to `schema`. /// # Panic @@ -297,6 +352,29 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria vec![], )) } + + (PhysicalType::List, AvroSchema::Array(schema)) => { + list_required::(array.as_any().downcast_ref().unwrap(), schema.as_ref()) + } + (PhysicalType::LargeList, AvroSchema::Array(schema)) => { + list_required::(array.as_any().downcast_ref().unwrap(), schema.as_ref()) + } + (PhysicalType::List, AvroSchema::Union(inner)) => { + let schema = if let AvroSchema::Array(schema) = &inner[1] { + schema.as_ref() + } else { + unreachable!("The schema declaration does not match the deserialization") + }; + list_optional::(array.as_any().downcast_ref().unwrap(), schema) + } + (PhysicalType::LargeList, AvroSchema::Union(inner)) => { + let schema = if let AvroSchema::Array(schema) = &inner[1] { + schema.as_ref() + } else { + unreachable!("The schema declaration does not match the deserialization") + }; + list_optional::(array.as_any().downcast_ref().unwrap(), schema) + } (a, b) => todo!("{:?} -> {:?} not supported", a, b), } } @@ -304,9 +382,22 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria /// Whether [`new_serializer`] supports `data_type`. pub fn can_serialize(data_type: &DataType) -> bool { use DataType::*; + match data_type.to_logical_type() { + List(inner) => return can_serialize(&inner.data_type), + LargeList(inner) => return can_serialize(&inner.data_type), + _ => {} + }; + matches!( data_type, - Boolean | Int32 | Int64 | Utf8 | Binary | Interval(IntervalUnit::MonthDayNano) + Boolean + | Int32 + | Int64 + | Utf8 + | Binary + | LargeUtf8 + | LargeBinary + | Interval(IntervalUnit::MonthDayNano) ) } diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index 4856237abf7..833dfc5ef59 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use arrow2::array::*; use arrow2::chunk::Chunk; use arrow2::datatypes::*; @@ -35,10 +37,23 @@ pub(super) fn schema() -> Schema { DataType::Interval(IntervalUnit::MonthDayNano), true, ), + Field::new( + "list", + DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + false, + ), + Field::new( + "list nullable", + DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + true, + ), ]) } pub(super) fn data() -> Chunk> { + let list_dt = DataType::List(Box::new(Field::new("item", DataType::Int32, true))); + let list_dt1 = DataType::List(Box::new(Field::new("item", DataType::Int32, true))); + let columns = vec![ Box::new(Int64Array::from_slice([27, 47])) as Box, Box::new(Int64Array::from([Some(27), None])), @@ -64,6 +79,24 @@ pub(super) fn data() -> Chunk> { Some(months_days_ns::new(1, 1, 10 * 1_000_000)), // 10 millis None, ])), + Box::new(ListArray::::new( + list_dt, + vec![0, 2, 5].into(), + Arc::new(PrimitiveArray::::from([ + None, + Some(1), + None, + Some(3), + Some(4), + ])), + None, + )), + Box::new(ListArray::::new( + list_dt1, + vec![0, 2, 2].into(), + Arc::new(PrimitiveArray::::from([None, Some(1)])), + Some([true, false].into()), + )), ]; Chunk::new(columns)