From d8717fcefc7f9ae012323912088da58bc40c63af Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 8 Feb 2022 19:49:47 +0100 Subject: [PATCH] Added support for large* write to Avro (#825) --- src/io/avro/write/serialize.rs | 108 ++++++++++++++++++++------------- 1 file changed, 66 insertions(+), 42 deletions(-) diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index e2c0953ee6c..02659a3018b 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -11,6 +11,56 @@ use super::util; /// (i.e. a column -> row transposition of types known at run-time) pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>; +fn utf8_required<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.values_iter(), + |x, buf| { + util::zigzag_encode(x.len() as i64, buf).unwrap(); + buf.extend_from_slice(x.as_bytes()); + }, + vec![], + )) +} + +fn utf8_optional<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + util::zigzag_encode(x.len() as i64, buf).unwrap(); + buf.extend_from_slice(x.as_bytes()); + } + }, + vec![], + )) +} + +fn binary_required<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.values_iter(), + |x, buf| { + util::zigzag_encode(x.len() as i64, buf).unwrap(); + buf.extend_from_slice(x); + }, + vec![], + )) +} + +fn binary_optional<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer { + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + util::zigzag_encode(x.len() as i64, buf).unwrap(); + buf.extend_from_slice(x); + } + }, + vec![], + )) +} + /// Creates a [`StreamingIterator`] trait object that presents items from `array` /// encoded according to `schema`. 
/// # Panic @@ -46,54 +96,28 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria )) } (PhysicalType::Utf8, AvroSchema::Union(_)) => { - let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap(); - Box::new(BufStreamingIterator::new( - values.iter(), - |x, buf| { - util::zigzag_encode(x.is_some() as i64, buf).unwrap(); - if let Some(x) = x { - util::zigzag_encode(x.len() as i64, buf).unwrap(); - buf.extend_from_slice(x.as_bytes()); - } - }, - vec![], - )) + utf8_optional::<i32>(array.as_any().downcast_ref().unwrap()) + } + (PhysicalType::LargeUtf8, AvroSchema::Union(_)) => { + utf8_optional::<i64>(array.as_any().downcast_ref().unwrap()) } (PhysicalType::Utf8, AvroSchema::String(_)) => { - let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap(); - Box::new(BufStreamingIterator::new( - values.values_iter(), - |x, buf| { - util::zigzag_encode(x.len() as i64, buf).unwrap(); - buf.extend_from_slice(x.as_bytes()); - }, - vec![], - )) + utf8_required::<i32>(array.as_any().downcast_ref().unwrap()) + } + (PhysicalType::LargeUtf8, AvroSchema::String(_)) => { + utf8_required::<i64>(array.as_any().downcast_ref().unwrap()) } (PhysicalType::Binary, AvroSchema::Union(_)) => { - let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(); - Box::new(BufStreamingIterator::new( - values.iter(), - |x, buf| { - util::zigzag_encode(x.is_some() as i64, buf).unwrap(); - if let Some(x) = x { - util::zigzag_encode(x.len() as i64, buf).unwrap(); - buf.extend_from_slice(x); - } - }, - vec![], - )) + binary_optional::<i32>(array.as_any().downcast_ref().unwrap()) + } + (PhysicalType::LargeBinary, AvroSchema::Union(_)) => { + binary_optional::<i64>(array.as_any().downcast_ref().unwrap()) } (PhysicalType::Binary, AvroSchema::Bytes(_)) => { - let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap(); - Box::new(BufStreamingIterator::new( - values.values_iter(), - |x, buf| { - util::zigzag_encode(x.len() as i64, buf).unwrap(); - buf.extend_from_slice(x); - }, - vec![], - )) + 
binary_required::<i32>(array.as_any().downcast_ref().unwrap()) + } + (PhysicalType::LargeBinary, AvroSchema::Bytes(_)) => { + binary_required::<i64>(array.as_any().downcast_ref().unwrap()) } (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {