From 8fa68a61d0c46b5fd050f06d0e69ffa43384dae2 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 28 Jun 2022 11:16:13 +0000 Subject: [PATCH] Added support for FixedSizeBinary to avro --- src/array/fixed_size_binary/mutable.rs | 2 +- src/io/avro/write/serialize.rs | 34 +++++++++++++++++++ .../parquet/read/deserialize/binary/nested.rs | 1 - tests/it/io/avro/read.rs | 13 +++++-- tests/it/io/avro/write.rs | 6 ++++ 5 files changed, 51 insertions(+), 5 deletions(-) diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index 4155baa32fe..528689ae4bc 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -235,7 +235,7 @@ impl MutableArray for MutableFixedSizeBinaryArray { } fn push_null(&mut self) { - self.values.resize(self.values.len() + self.size, 0); + self.push::<&[u8]>(None); } fn shrink_to_fit(&mut self) { diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index e6c18fbaf8e..b41633ac950 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -62,6 +62,29 @@ fn binary_optional(array: &BinaryArray) -> BoxSerializer { )) } +fn fixed_size_binary_required(array: &FixedSizeBinaryArray) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.values_iter(), + |x, buf| { + buf.extend_from_slice(x); + }, + vec![], + )) +} + +fn fixed_size_binary_optional(array: &FixedSizeBinaryArray) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + buf.extend_from_slice(x); + } + }, + vec![], + )) +} + fn list_required<'a, O: Offset>(array: &'a ListArray, schema: &AvroSchema) -> BoxSerializer<'a> { let mut inner = new_serializer(array.values().as_ref(), schema); let lengths = array @@ -218,12 +241,18 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria (PhysicalType::LargeBinary, AvroSchema::Union(_)) => { binary_optional::(array.as_any().downcast_ref().unwrap()) } + (PhysicalType::FixedSizeBinary, AvroSchema::Union(_)) => { + fixed_size_binary_optional(array.as_any().downcast_ref().unwrap()) + } (PhysicalType::Binary, AvroSchema::Bytes(_)) => { binary_required::(array.as_any().downcast_ref().unwrap()) } (PhysicalType::LargeBinary, AvroSchema::Bytes(_)) => { binary_required::(array.as_any().downcast_ref().unwrap()) } + (PhysicalType::FixedSizeBinary, AvroSchema::Fixed(_)) => { + fixed_size_binary_required(array.as_any().downcast_ref().unwrap()) + } (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => { let values = array @@ -446,6 +475,7 @@ pub fn can_serialize(data_type: &DataType) -> bool { match data_type.to_logical_type() { List(inner) => return can_serialize(&inner.data_type), LargeList(inner) => return can_serialize(&inner.data_type), + Struct(inner) => return inner.iter().all(|inner| can_serialize(&inner.data_type)), _ => {} }; @@ -454,8 +484,12 @@ pub fn can_serialize(data_type: &DataType) -> bool { Boolean | Int32 | Int64 + | Float32 + | Float64 + | Decimal(_, _) | Utf8 | Binary + | FixedSizeBinary(_) | LargeUtf8 | LargeBinary | Interval(IntervalUnit::MonthDayNano) diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 8dbf1ba22d0..0d43c4a1cee 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -126,7 +126,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } } State::OptionalDictionary(page_validity, page_values) => { - println!("optional_dict"); let dict_values = page_values.dict.values(); let dict_offsets = page_values.dict.offsets(); diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 939c179732f..70034c71834 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -236,11 +236,13 @@ fn read_without_codec() -> Result<()> { test(Codec::Null) } +#[cfg(feature = "io_avro_compression")] #[test] fn read_deflate() -> Result<()> { test(Codec::Deflate) } +#[cfg(feature = "io_avro_compression")] #[test] fn read_snappy() -> Result<()> { test(Codec::Snappy) @@ -274,7 +276,12 @@ fn test_projected(projection: Vec) -> Result<()> { #[test] fn read_projected() -> Result<()> { - test_projected(vec![ - true, false, false, false, false, false, false, false, false, false, false, false, - ]) + for i in 0..12 { + let mut projection = vec![ + false, false, false, false, false, false, false, false, false, false, false, false, + ]; + projection[i] = true; + test_projected(projection)?; + } + Ok(()) } diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index 06e8d72f112..d4b7e410c13 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -19,6 +19,8 @@ pub(super) fn schema() -> Schema { Field::new("date nullable", DataType::Date32, true), Field::new("binary", DataType::Binary, false), Field::new("binary nullable", DataType::Binary, true), + Field::new("fs binary", DataType::FixedSizeBinary(3), false), + Field::new("fs binary nullable", DataType::FixedSizeBinary(3), true), Field::new("float32", DataType::Float32, false), Field::new("float32 nullable", DataType::Float32, true), Field::new("float64", DataType::Float64, false), @@ -63,6 +65,8 @@ pub(super) fn data() -> Chunk> { Box::new(Int32Array::from([Some(1), None]).to(DataType::Date32)), Box::new(BinaryArray::::from_slice([b"foo", b"bar"])), Box::new(BinaryArray::::from([Some(b"foo"), None])), + Box::new(FixedSizeBinaryArray::from_slice([[1, 2, 3], [1, 2, 3]])), + Box::new(FixedSizeBinaryArray::from([Some([1, 2, 3]), None])), Box::new(PrimitiveArray::::from_slice([1.0, 2.0])), Box::new(PrimitiveArray::::from([Some(1.0), None])), Box::new(PrimitiveArray::::from_slice([1.0, 2.0])), @@ -162,11 +166,13 @@ fn no_compression() -> Result<()> { roundtrip(None) } +#[cfg(feature = "io_avro_compression")] #[test] fn snappy() -> Result<()> { roundtrip(Some(write::Compression::Snappy)) } +#[cfg(feature = "io_avro_compression")] #[test] fn deflate() -> Result<()> { roundtrip(Some(write::Compression::Deflate))