From 5b9a17f93c305839f2646a1edfca5583028623ae Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 6 Jan 2022 20:00:16 +0000 Subject: [PATCH] Simplified avro and added tests --- src/io/avro/read/schema.rs | 35 ----------------- src/io/avro/write/serialize.rs | 32 +++++++++++++++ tests/it/io/avro/write.rs | 71 +++++++++++++++++++++++----------- 3 files changed, 81 insertions(+), 57 deletions(-) diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index fd125feb9c7..c9a19823841 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -3,20 +3,6 @@ use avro_schema::{Enum, Fixed, Record, Schema as AvroSchema}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -/// Returns the fully qualified name for a field -fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String { - if name.contains('.') { - name.to_string() - } else { - let namespace = namespace.as_ref().copied().or(default_namespace); - - match namespace { - Some(ref namespace) => format!("{}.{}", namespace, name), - None => name.to_string(), - } - } -} - fn external_props(schema: &AvroSchema) -> Metadata { let mut props = Metadata::new(); match &schema { @@ -30,27 +16,6 @@ fn external_props(schema: &AvroSchema) -> Metadata { } _ => {} } - match &schema { - AvroSchema::Record(Record { - aliases, namespace, .. - }) - | AvroSchema::Enum(Enum { - aliases, namespace, .. - }) - | AvroSchema::Fixed(Fixed { - aliases, namespace, .. - }) => { - let aliases: Vec = aliases - .iter() - .map(|alias| aliased(alias, namespace.as_deref(), None)) - .collect(); - props.insert( - "avro::aliases".to_string(), - format!("[{}]", aliases.join(",")), - ); - } - _ => {} - } props } diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index fc01d040ae6..e2c0953ee6c 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -154,6 +154,22 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria vec![], )) } + (PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Union(_)) => { + let values = array + .as_any() + .downcast_ref::>() + .unwrap(); + Box::new(BufStreamingIterator::new( + values.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + buf.extend(x.to_le_bytes()) + } + }, + vec![], + )) + } (PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Float) => { let values = array .as_any() @@ -167,6 +183,22 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria vec![], )) } + (PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Union(_)) => { + let values = array + .as_any() + .downcast_ref::>() + .unwrap(); + Box::new(BufStreamingIterator::new( + values.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + buf.extend(x.to_le_bytes()) + } + }, + vec![], + )) + } (PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Double) => { let values = array .as_any() diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index 5321f72d207..b605dc245d3 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use arrow2::array::*; use arrow2::chunk::Chunk; use arrow2::datatypes::*; @@ -11,35 +9,64 @@ use super::read::read_avro; pub(super) fn schema() -> Schema { Schema::from(vec![ - Field::new("a", DataType::Int64, false), - Field::new("b", DataType::Utf8, false), - Field::new("c", DataType::Int32, false), + Field::new("int64", DataType::Int64, false), + Field::new("int64 nullable", DataType::Int64, true), + Field::new("utf8", DataType::Utf8, false), + Field::new("utf8 nullable", DataType::Utf8, true), + Field::new("int32", DataType::Int32, false), + Field::new("int32 nullable", DataType::Int32, true), Field::new("date", DataType::Date32, false), - Field::new("d", DataType::Binary, false), - Field::new("e", DataType::Float64, false), - Field::new("f", DataType::Boolean, false), - Field::new("g", DataType::Utf8, true), - Field::new("h", DataType::Interval(IntervalUnit::MonthDayNano), true), + Field::new("date nullable", DataType::Date32, true), + Field::new("binary", DataType::Binary, false), + Field::new("binary nullable", DataType::Binary, true), + Field::new("float32", DataType::Float32, false), + Field::new("float32 nullable", DataType::Float32, true), + Field::new("float64", DataType::Float64, false), + Field::new("float64 nullable", DataType::Float64, true), + Field::new("boolean", DataType::Boolean, false), + Field::new("boolean nullable", DataType::Boolean, true), + Field::new( + "interval", + DataType::Interval(IntervalUnit::MonthDayNano), + false, + ), + Field::new( + "interval nullable", + DataType::Interval(IntervalUnit::MonthDayNano), + true, + ), ]) } -pub(super) fn data() -> Chunk> { +pub(super) fn data() -> Chunk> { let columns = vec![ - Arc::new(Int64Array::from_slice([27, 47])) as Arc, - Arc::new(Utf8Array::::from_slice(["foo", "bar"])) as Arc, - Arc::new(Int32Array::from_slice([1, 1])) as Arc, - Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc, - Arc::new(BinaryArray::::from_slice([b"foo", b"bar"])) as Arc, - Arc::new(PrimitiveArray::::from_slice([1.0, 2.0])) as Arc, - Arc::new(BooleanArray::from_slice([true, false])) as Arc, - Arc::new(Utf8Array::::from([Some("foo"), None])) as Arc, - Arc::new(PrimitiveArray::::from([ + Box::new(Int64Array::from_slice([27, 47])) as Box, + Box::new(Int64Array::from([Some(27), None])), + Box::new(Utf8Array::::from_slice(["foo", "bar"])), + Box::new(Utf8Array::::from([Some("foo"), None])), + Box::new(Int32Array::from_slice([1, 1])), + Box::new(Int32Array::from([Some(1), None])), + Box::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)), + Box::new(Int32Array::from([Some(1), None]).to(DataType::Date32)), + Box::new(BinaryArray::::from_slice([b"foo", b"bar"])), + Box::new(BinaryArray::::from([Some(b"foo"), None])), + Box::new(PrimitiveArray::::from_slice([1.0, 2.0])), + Box::new(PrimitiveArray::::from([Some(1.0), None])), + Box::new(PrimitiveArray::::from_slice([1.0, 2.0])), + Box::new(PrimitiveArray::::from([Some(1.0), None])), + Box::new(BooleanArray::from_slice([true, false])), + Box::new(BooleanArray::from([Some(true), None])), + Box::new(PrimitiveArray::::from_slice([ + months_days_ns::new(1, 1, 10 * 1_000_000), // 10 millis + months_days_ns::new(2, 2, 20 * 1_000_000), // 20 millis + ])), + Box::new(PrimitiveArray::::from([ Some(months_days_ns::new(1, 1, 10 * 1_000_000)), // 10 millis None, - ])) as Arc, + ])), ]; - Chunk::try_new(columns).unwrap() + Chunk::new(columns) } pub(super) fn serialize_to_block>(