Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added write of Struct
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 13, 2022
1 parent 8b504d9 commit 45dc02c
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/io/avro/read/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ impl DynMutableStructArray {

#[inline]
fn push_null(&mut self) {
self.values.iter_mut().for_each(|x| x.push_null());
match &mut self.validity {
Some(validity) => validity.push(false),
None => self.init_validity(),
Expand Down
8 changes: 2 additions & 6 deletions src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,15 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) ->
DataType::Union(fields, None, UnionMode::Dense)
}
}
AvroSchema::Record(Record { name, fields, .. }) => {
AvroSchema::Record(Record { fields, .. }) => {
let fields = fields
.iter()
.map(|field| {
let mut props = Metadata::new();
if let Some(doc) = &field.doc {
props.insert("avro::doc".to_string(), doc.clone());
}
schema_to_field(
&field.schema,
Some(&format!("{}.{}", name, field.name)),
props,
)
schema_to_field(&field.schema, Some(&field.name), props)
})
.collect::<Result<_>>()?;
DataType::Struct(fields)
Expand Down
9 changes: 8 additions & 1 deletion src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use avro_schema::{
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical,
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record,
Schema as AvroSchema,
};

Expand Down Expand Up @@ -39,6 +39,13 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
DataType::LargeList(inner) | DataType::List(inner) => AvroSchema::Array(Box::new(
type_to_schema(&inner.data_type, inner.is_nullable)?,
)),
DataType::Struct(fields) => AvroSchema::Record(Record::new(
"",
fields
.iter()
.map(|field| field_to_field(field))
.collect::<Result<Vec<_>>>()?,
)),
DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
Expand Down
63 changes: 62 additions & 1 deletion src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use avro_schema::Schema as AvroSchema;
use avro_schema::{Record, Schema as AvroSchema};

use crate::bitmap::utils::zip_validity;
use crate::datatypes::{IntervalUnit, PhysicalType, PrimitiveType};
Expand Down Expand Up @@ -116,6 +116,56 @@ fn list_optional<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) ->
))
}

/// Builds a serializer for a struct column whose rows are never null.
///
/// One child serializer is created per (child array, record field) pair, in
/// field order. Each emitted item is the concatenation of the next item of
/// every child serializer.
///
/// # Panics
/// The returned iterator panics if any child serializer is exhausted before
/// `array.len()` rows have been produced.
fn struct_required<'a>(array: &'a StructArray, schema: &Record) -> BoxSerializer<'a> {
    let mut children: Vec<_> = array
        .values()
        .iter()
        .zip(schema.fields.iter())
        .map(|(values, field)| new_serializer(values.as_ref(), &field.schema))
        .collect();

    Box::new(BufStreamingIterator::new(
        0..array.len(),
        move |_, buf| {
            // a row is its fields written back to back, in declaration order
            for child in children.iter_mut() {
                buf.extend_from_slice(child.next().unwrap());
            }
        },
        vec![],
    ))
}

/// Builds a serializer for a struct column whose rows may be null.
///
/// One child serializer is created per (child array, record field) pair, in
/// field order. Every emitted item starts with a zigzag-encoded `1` when the
/// row is valid or `0` when it is null; valid rows are then followed by the
/// concatenation of the next item of every child serializer.
///
/// # Panics
/// The returned iterator panics if any child serializer is exhausted before
/// `array.len()` rows have been produced, or if writing the zigzag prefix
/// into the buffer fails.
fn struct_optional<'a>(array: &'a StructArray, schema: &Record) -> BoxSerializer<'a> {
    let mut children: Vec<_> = array
        .values()
        .iter()
        .zip(schema.fields.iter())
        .map(|(values, field)| new_serializer(values.as_ref(), &field.schema))
        .collect();

    let rows = zip_validity(0..array.len(), array.validity().as_ref().map(|x| x.iter()));

    Box::new(BufStreamingIterator::new(
        rows,
        move |maybe, buf| {
            let is_valid = maybe.is_some();
            util::zigzag_encode(is_valid as i64, buf).unwrap();
            // Children must be advanced for every row, null or not, so they
            // stay aligned with the parent; only valid rows contribute bytes.
            for child in children.iter_mut() {
                let bytes = child.next().unwrap();
                if is_valid {
                    buf.extend_from_slice(bytes);
                }
            }
        },
        vec![],
    ))
}

/// Creates a [`StreamingIterator`] trait object that presents items from `array`
/// encoded according to `schema`.
/// # Panics
Expand Down Expand Up @@ -375,6 +425,17 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
};
list_optional::<i64>(array.as_any().downcast_ref().unwrap(), schema)
}
(PhysicalType::Struct, AvroSchema::Record(inner)) => {
struct_required(array.as_any().downcast_ref().unwrap(), inner)
}
(PhysicalType::Struct, AvroSchema::Union(inner)) => {
let inner = if let AvroSchema::Record(inner) = &inner[1] {
inner
} else {
unreachable!("The schema declaration does not match the deserialization")
};
struct_optional(array.as_any().downcast_ref().unwrap(), inner)
}
(a, b) => todo!("{:?} -> {:?} not supported", a, b),
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
),
Field::new(
"i",
DataType::Struct(vec![Field::new("bla.e", DataType::Float64, false)]),
DataType::Struct(vec![Field::new("e", DataType::Float64, false)]),
false,
),
Field::new(
Expand Down Expand Up @@ -103,7 +103,7 @@ pub(super) fn data() -> Chunk<Arc<dyn Array>> {
Arc::new(Utf8Array::<i32>::from([Some("foo"), None])),
array.into_arc(),
Arc::new(StructArray::from_data(
DataType::Struct(vec![Field::new("bla.e", DataType::Float64, false)]),
DataType::Struct(vec![Field::new("e", DataType::Float64, false)]),
vec![Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0]))],
None,
)),
Expand Down
66 changes: 66 additions & 0 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,69 @@ fn check_large_format() -> Result<()> {

Ok(())
}

/// Schema used by the struct round-trip test: a single nullable struct column
/// holding a required `item1` and a nullable `item2`, both `Int32`.
fn struct_schema() -> Schema {
    let struct_dt = DataType::Struct(vec![
        Field::new("item1", DataType::Int32, false),
        Field::new("item2", DataType::Int32, true),
    ]);

    // The non-nullable variant of the column is currently disabled:
    // Field::new("struct", struct_dt.clone(), false),
    let fields = vec![Field::new("struct nullable", struct_dt, true)];

    Schema::from(fields)
}

fn struct_data() -> Chunk<Box<dyn Array>> {
let struct_dt = DataType::Struct(vec![
Field::new("item1", DataType::Int32, false),
Field::new("item2", DataType::Int32, true),
]);

Chunk::new(vec![
/*Box::new(StructArray::new(
struct_dt.clone(),
vec![
Arc::new(PrimitiveArray::<i32>::from_slice([1, 2])),
Arc::new(PrimitiveArray::<i32>::from([None, Some(1)])),
],
None,
)),*/
Box::new(StructArray::new(
struct_dt,
vec![
Arc::new(PrimitiveArray::<i32>::from_slice([1, 2])),
Arc::new(PrimitiveArray::<i32>::from([None, Some(1)])),
],
Some([true, false].into()),
)),
])
}

/// Round-trips the struct fixture through the Avro writer and reader and
/// checks that both the schema and every column come back unchanged.
#[test]
fn struct_() -> Result<()> {
    let schema = struct_schema();
    let chunk = struct_data();

    let bytes = write_avro(&chunk, &schema, None)?;
    let (result, read_schema) = read_avro(&bytes, None)?;

    assert_eq!(read_schema, struct_schema());

    let expected = struct_data();
    result
        .columns()
        .iter()
        .zip(expected.columns().iter())
        .for_each(|(c1, c2)| assert_eq!(c1.as_ref(), c2.as_ref()));

    Ok(())
}

0 comments on commit 45dc02c

Please sign in to comment.