From 116cb44f2a122eb6de8b2e263ef3b3712480c6cb Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 28 Jun 2022 11:16:13 +0000 Subject: [PATCH] Added support for FixedSizeBinary to avro --- src/array/fixed_size_binary/mutable.rs | 2 +- src/io/avro/write/serialize.rs | 34 +++++++++++ .../parquet/read/deserialize/binary/nested.rs | 1 - tests/it/io/avro/read.rs | 58 ++++++++++--------- tests/it/io/avro/write.rs | 6 ++ 5 files changed, 73 insertions(+), 28 deletions(-) diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index 4155baa32fe..528689ae4bc 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -235,7 +235,7 @@ impl MutableArray for MutableFixedSizeBinaryArray { } fn push_null(&mut self) { - self.values.resize(self.values.len() + self.size, 0); + self.push::<&[u8]>(None); } fn shrink_to_fit(&mut self) { diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index e6c18fbaf8e..b41633ac950 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -62,6 +62,29 @@ fn binary_optional(array: &BinaryArray) -> BoxSerializer { )) } +fn fixed_size_binary_required(array: &FixedSizeBinaryArray) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.values_iter(), + |x, buf| { + buf.extend_from_slice(x); + }, + vec![], + )) +} + +fn fixed_size_binary_optional(array: &FixedSizeBinaryArray) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + buf.extend_from_slice(x); + } + }, + vec![], + )) +} + fn list_required<'a, O: Offset>(array: &'a ListArray, schema: &AvroSchema) -> BoxSerializer<'a> { let mut inner = new_serializer(array.values().as_ref(), schema); let lengths = array @@ -218,12 +241,18 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria (PhysicalType::LargeBinary, AvroSchema::Union(_)) => { binary_optional::(array.as_any().downcast_ref().unwrap()) } + (PhysicalType::FixedSizeBinary, AvroSchema::Union(_)) => { + fixed_size_binary_optional(array.as_any().downcast_ref().unwrap()) + } (PhysicalType::Binary, AvroSchema::Bytes(_)) => { binary_required::(array.as_any().downcast_ref().unwrap()) } (PhysicalType::LargeBinary, AvroSchema::Bytes(_)) => { binary_required::(array.as_any().downcast_ref().unwrap()) } + (PhysicalType::FixedSizeBinary, AvroSchema::Fixed(_)) => { + fixed_size_binary_required(array.as_any().downcast_ref().unwrap()) + } (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => { let values = array @@ -446,6 +475,7 @@ pub fn can_serialize(data_type: &DataType) -> bool { match data_type.to_logical_type() { List(inner) => return can_serialize(&inner.data_type), LargeList(inner) => return can_serialize(&inner.data_type), + Struct(inner) => return inner.iter().all(|inner| can_serialize(&inner.data_type)), _ => {} }; @@ -454,8 +484,12 @@ pub fn can_serialize(data_type: &DataType) -> bool { Boolean | Int32 | Int64 + | Float32 + | Float64 + | Decimal(_, _) | Utf8 | Binary + | FixedSizeBinary(_) | LargeUtf8 | LargeBinary | Interval(IntervalUnit::MonthDayNano) diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 8dbf1ba22d0..0d43c4a1cee 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -126,7 +126,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } } State::OptionalDictionary(page_validity, page_values) => { - println!("optional_dict"); let dict_values = page_values.dict.values(); let dict_offsets = page_values.dict.offsets(); diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 939c179732f..bef9dcb252a 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -236,45 +236,51 @@ fn read_without_codec() -> Result<()> { test(Codec::Null) } +#[cfg(feature = "io_avro_compression")] #[test] fn read_deflate() -> Result<()> { test(Codec::Deflate) } +#[cfg(feature = "io_avro_compression")] #[test] fn read_snappy() -> Result<()> { test(Codec::Snappy) } -fn test_projected(projection: Vec) -> Result<()> { - let avro = write_avro(Codec::Null).unwrap(); +#[test] +fn test_projected() -> Result<()> { let expected = data(); - let expected = expected - .into_arrays() - .into_iter() - .zip(projection.iter()) - .filter_map(|x| if *x.1 { Some(x.0) } else { None }) - .collect(); - let expected = Chunk::new(expected); let (_, expected_schema) = schema(); - let expected_fields = expected_schema - .fields - .into_iter() - .zip(projection.iter()) - .filter_map(|x| if *x.1 { Some(x.0) } else { None }) - .collect::>(); - let expected_schema = Schema::from(expected_fields); - let (result, schema) = read_avro(&avro, Some(projection))?; + let avro = write_avro(Codec::Null).unwrap(); - assert_eq!(schema, expected_schema); - assert_eq!(result, expected); - Ok(()) -} + for i in 0..expected_schema.fields.len() { + let mut projection = vec![false; expected_schema.fields.len()]; + projection[i] = true; -#[test] -fn read_projected() -> Result<()> { - test_projected(vec![ - true, false, false, false, false, false, false, false, false, false, false, false, - ]) + let expected = expected + .clone() + .into_arrays() + .into_iter() + .zip(projection.iter()) + .filter_map(|x| if *x.1 { Some(x.0) } else { None }) + .collect(); + let expected = Chunk::new(expected); + + let expected_fields = expected_schema + .clone() + .fields + .into_iter() + .zip(projection.iter()) + .filter_map(|x| if *x.1 { Some(x.0) } else { None }) + .collect::>(); + let expected_schema = Schema::from(expected_fields); + + let (result, schema) = read_avro(&avro, Some(projection))?; + + assert_eq!(schema, expected_schema); + assert_eq!(result, expected); + } + Ok(()) } diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index 06e8d72f112..d4b7e410c13 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -19,6 +19,8 @@ pub(super) fn schema() -> Schema { Field::new("date nullable", DataType::Date32, true), Field::new("binary", DataType::Binary, false), Field::new("binary nullable", DataType::Binary, true), + Field::new("fs binary", DataType::FixedSizeBinary(3), false), + Field::new("fs binary nullable", DataType::FixedSizeBinary(3), true), Field::new("float32", DataType::Float32, false), Field::new("float32 nullable", DataType::Float32, true), Field::new("float64", DataType::Float64, false), @@ -63,6 +65,8 @@ pub(super) fn data() -> Chunk> { Box::new(Int32Array::from([Some(1), None]).to(DataType::Date32)), Box::new(BinaryArray::::from_slice([b"foo", b"bar"])), Box::new(BinaryArray::::from([Some(b"foo"), None])), + Box::new(FixedSizeBinaryArray::from_slice([[1, 2, 3], [1, 2, 3]])), + Box::new(FixedSizeBinaryArray::from([Some([1, 2, 3]), None])), Box::new(PrimitiveArray::::from_slice([1.0, 2.0])), Box::new(PrimitiveArray::::from([Some(1.0), None])), Box::new(PrimitiveArray::::from_slice([1.0, 2.0])), @@ -162,11 +166,13 @@ fn no_compression() -> Result<()> { roundtrip(None) } +#[cfg(feature = "io_avro_compression")] #[test] fn snappy() -> Result<()> { roundtrip(Some(write::Compression::Snappy)) } +#[cfg(feature = "io_avro_compression")] #[test] fn deflate() -> Result<()> { roundtrip(Some(write::Compression::Deflate))