
Commit

Added support to write ListArray to Avro
jorgecarleitao committed Mar 13, 2022
1 parent e237557 commit 4ff0b01
Showing 3 changed files with 128 additions and 5 deletions.
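The new list support surfaces through the existing `can_serialize` / `new_serializer` entry points of the Avro writer. A minimal sketch of probing for support, assuming these functions are re-exported at `arrow2::io::avro::write` and that the crate's Avro IO feature is enabled (both are assumptions; neither the path nor the feature name is confirmed by this diff):

// Hypothetical usage sketch; module path and feature flag are assumptions, not taken from this diff.
use arrow2::datatypes::{DataType, Field};
use arrow2::io::avro::write;

fn main() {
    // A list of nullable Int32 items, matching the fields added to the test schema below.
    let dt = DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
    // After this commit, list and large-list types are accepted whenever the item type is.
    assert!(write::can_serialize(&dt));
}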
7 changes: 3 additions & 4 deletions src/io/avro/write/schema.rs
@@ -36,10 +36,9 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
DataType::LargeBinary => AvroSchema::Bytes(None),
DataType::Utf8 => AvroSchema::String(None),
DataType::LargeUtf8 => AvroSchema::String(None),
DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(
&inner.data_type,
inner.is_nullable,
)?)),
DataType::LargeList(inner) | DataType::List(inner) => AvroSchema::Array(Box::new(
type_to_schema(&inner.data_type, inner.is_nullable)?,
)),
DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
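This arm replaces the previous List-only mapping: List and LargeList now share one arm, and the Avro item schema is derived from the inner field, including that field's nullability. A rough sketch of what the new arm yields for a list of non-nullable Int32 items (illustration only, not part of the diff; the union-with-null handling for nullable items is assumed to live inside `type_to_schema`):

use avro_schema::Schema as AvroSchema;

// DataType::List(Field::new("item", DataType::Int32, false)) is expected to map to an
// Avro array of plain ints; a nullable item field would instead yield an array whose
// item schema is a union of null and int (assumed behavior of `type_to_schema`).
fn list_of_ints_schema() -> AvroSchema {
    AvroSchema::Array(Box::new(AvroSchema::Int(None)))
}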
93 changes: 92 additions & 1 deletion src/io/avro/write/serialize.rs
@@ -1,5 +1,6 @@
use avro_schema::Schema as AvroSchema;

use crate::bitmap::utils::zip_validity;
use crate::datatypes::{IntervalUnit, PhysicalType, PrimitiveType};
use crate::types::months_days_ns;
use crate::{array::*, datatypes::DataType};
@@ -61,6 +62,60 @@ fn binary_optional<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer {
))
}

fn list_required<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) -> BoxSerializer<'a> {
let mut inner = new_serializer(array.values().as_ref(), schema);
let lengths = array
.offsets()
.windows(2)
.map(|w| (w[1] - w[0]).to_usize() as i64);

Box::new(BufStreamingIterator::new(
lengths,
move |length, buf| {
util::zigzag_encode(length, buf).unwrap();
let mut rows = 0;
while let Some(item) = inner.next() {
buf.extend_from_slice(item);
rows += 1;
if rows == length {
util::zigzag_encode(0, buf).unwrap();
break;
}
}
},
vec![],
))
}

fn list_optional<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) -> BoxSerializer<'a> {
let mut inner = new_serializer(array.values().as_ref(), schema);
let lengths = array
.offsets()
.windows(2)
.map(|w| (w[1] - w[0]).to_usize() as i64);
let lengths = zip_validity(lengths, array.validity().as_ref().map(|x| x.iter()));

Box::new(BufStreamingIterator::new(
lengths,
move |length, buf| {
util::zigzag_encode(length.is_some() as i64, buf).unwrap();
if let Some(length) = length {
util::zigzag_encode(length, buf).unwrap();
let mut rows = 0;
while let Some(item) = inner.next() {
buf.extend_from_slice(item);
rows += 1;
if rows == length {
util::zigzag_encode(0, buf).unwrap();
break;
}
}
}
},
vec![],
))
}

/// Creates a [`StreamingIterator`] trait object that presents items from `array`
/// encoded according to `schema`.
/// # Panic
@@ -297,16 +352,52 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {
vec![],
))
}

(PhysicalType::List, AvroSchema::Array(schema)) => {
list_required::<i32>(array.as_any().downcast_ref().unwrap(), schema.as_ref())
}
(PhysicalType::LargeList, AvroSchema::Array(schema)) => {
list_required::<i64>(array.as_any().downcast_ref().unwrap(), schema.as_ref())
}
(PhysicalType::List, AvroSchema::Union(inner)) => {
let schema = if let AvroSchema::Array(schema) = &inner[1] {
schema.as_ref()
} else {
unreachable!("The schema declaration does not match the deserialization")
};
list_optional::<i32>(array.as_any().downcast_ref().unwrap(), schema)
}
(PhysicalType::LargeList, AvroSchema::Union(inner)) => {
let schema = if let AvroSchema::Array(schema) = &inner[1] {
schema.as_ref()
} else {
unreachable!("The schema declaration does not match the deserialization")
};
list_optional::<i64>(array.as_any().downcast_ref().unwrap(), schema)
}
(a, b) => todo!("{:?} -> {:?} not supported", a, b),
}
}

/// Whether [`new_serializer`] supports `data_type`.
pub fn can_serialize(data_type: &DataType) -> bool {
use DataType::*;
match data_type.to_logical_type() {
List(inner) => return can_serialize(&inner.data_type),
LargeList(inner) => return can_serialize(&inner.data_type),
_ => {}
};

matches!(
data_type,
Boolean | Int32 | Int64 | Utf8 | Binary | Interval(IntervalUnit::MonthDayNano)
Boolean
| Int32
| Int64
| Utf8
| Binary
| LargeUtf8
| LargeBinary
| Interval(IntervalUnit::MonthDayNano)
)
}

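Both `list_required` and `list_optional` above follow the Avro array encoding: each row is written as a block count (a zigzag/varint-encoded long), then that many item payloads from the inner serializer, then a zero count that terminates the array; the nullable variant first writes the union branch index (0 for null, 1 for the array). A standalone sketch of that byte layout follows; arrow2's `util::zigzag_encode` is assumed to produce these same bytes, and the local encoder here is an illustration, not the crate's code.

// Illustration only: a local zigzag/varint encoder standing in for arrow2's
// `util::zigzag_encode`, used to show the array block layout emitted above.
fn zigzag_encode(n: i64, buf: &mut Vec<u8>) {
    // Zigzag maps small magnitudes (positive or negative) to small unsigned values,
    // which the varint loop then writes 7 bits at a time.
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    while z >= 0x80 {
        buf.push((z as u8 & 0x7f) | 0x80); // continuation bit set
        z >>= 7;
    }
    buf.push(z as u8);
}

fn main() {
    // One row of a required list with three already-serialized items
    // (0x02, 0x04, 0x06 are the Avro encodings of the ints 1, 2, 3).
    let items: [&[u8]; 3] = [&[0x02], &[0x04], &[0x06]];
    let mut buf = vec![];
    zigzag_encode(3, &mut buf); // block of three items
    for item in items {
        buf.extend_from_slice(item);
    }
    zigzag_encode(0, &mut buf); // empty block terminates the array
    assert_eq!(buf, vec![0x06, 0x02, 0x04, 0x06, 0x00]);

    // A null row of a nullable list is just the union branch index 0.
    let mut null_row = vec![];
    zigzag_encode(0, &mut null_row);
    assert_eq!(null_row, vec![0x00]);
}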
33 changes: 33 additions & 0 deletions tests/it/io/avro/write.rs
@@ -1,3 +1,5 @@
use std::sync::Arc;

use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
@@ -35,10 +37,23 @@ pub(super) fn schema() -> Schema {
DataType::Interval(IntervalUnit::MonthDayNano),
true,
),
Field::new(
"list",
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
false,
),
Field::new(
"list nullable",
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
),
])
}

pub(super) fn data() -> Chunk<Box<dyn Array>> {
let list_dt = DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
let list_dt1 = DataType::List(Box::new(Field::new("item", DataType::Int32, true)));

let columns = vec![
Box::new(Int64Array::from_slice([27, 47])) as Box<dyn Array>,
Box::new(Int64Array::from([Some(27), None])),
@@ -64,6 +79,24 @@ pub(super) fn data() -> Chunk<Box<dyn Array>> {
Some(months_days_ns::new(1, 1, 10 * 1_000_000)), // 10 millis
None,
])),
Box::new(ListArray::<i32>::new(
list_dt,
vec![0, 2, 5].into(),
Arc::new(PrimitiveArray::<i32>::from([
None,
Some(1),
None,
Some(3),
Some(4),
])),
None,
)),
Box::new(ListArray::<i32>::new(
list_dt1,
vec![0, 2, 2].into(),
Arc::new(PrimitiveArray::<i32>::from([None, Some(1)])),
Some([true, false].into()),
)),
];

Chunk::new(columns)
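For reference, the two new list columns decode to these logical rows (derived from the offsets, values, and validity above): the "list" column holds [None, Some(1)] and [None, Some(3), Some(4)] (offsets [0, 2, 5]), and the "list nullable" column holds [None, Some(1)] followed by a null row (offsets [0, 2, 2], validity [true, false]).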
