Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for MapArray read and write to parquet #1419

Merged
merged 18 commits into from
Feb 28, 2023
29 changes: 28 additions & 1 deletion src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use parquet2::read::get_page_iterator as _get_page_iterator;
use parquet2::schema::types::PrimitiveType;

use crate::{
array::{Array, DictionaryKey, FixedSizeListArray, ListArray},
array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray},
datatypes::{DataType, Field, IntervalUnit},
error::Result,
offset::Offsets,
Expand Down Expand Up @@ -87,6 +87,33 @@ pub fn create_list(
}
}

/// Creates a new [`MapArray`].
pub fn create_map(
data_type: DataType,
nested: &mut NestedState,
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.nested.pop().unwrap().inner();
match data_type.to_logical_type() {
DataType::Map(_, _) => {
offsets.push(values.len() as i64);
let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();

let offsets: Offsets<i32> = offsets
.try_into()
.expect("i64 offsets do not fit in i32 offsets");

Box::new(MapArray::new(
data_type,
offsets.into(),
values,
validity.and_then(|x| x.into()),
))
}
_ => unreachable!(),
}
}

fn is_primitive(data_type: &DataType) -> bool {
matches!(
data_type.to_physical_type(),
Expand Down
17 changes: 6 additions & 11 deletions src/io/parquet/read/deserialize/nested.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use parquet2::schema::types::PrimitiveType;

use crate::{
array::MapArray,
datatypes::{DataType, Field},
error::{Error, Result},
};
Expand Down Expand Up @@ -283,16 +282,12 @@ where
num_rows,
chunk_size,
)?;
Box::new(iter.map(move |x| {
let (nested, inner) = x?;
let array = MapArray::new(
field.data_type().clone(),
vec![0, inner.len() as i32].try_into().unwrap(),
inner,
None,
);
Ok((nested, array.boxed()))
}))
let iter = iter.map(move |x| {
let (mut nested, array) = x?;
let array = create_map(field.data_type().clone(), &mut nested, array);
Ok((nested, array))
});
Box::new(iter) as _
}
other => {
return Err(Error::nyi(format!(
Expand Down
14 changes: 13 additions & 1 deletion src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,20 @@ fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
unreachable!()
}
}
Map => {
if let DataType::Map(field, _) = data_type.to_logical_type() {
if let DataType::Struct(fields) = field.data_type.to_logical_type() {
for field in fields {
transverse_recursive(&field.data_type, map.clone(), encodings)
}
} else {
unreachable!()
}
} else {
unreachable!()
}
}
Union => todo!(),
Map => todo!(),
}
}

Expand Down
127 changes: 126 additions & 1 deletion src/io/parquet/write/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType
use parquet2::{page::Page, write::DynIter};
use std::fmt::Debug;

use crate::array::{ListArray, StructArray};
use crate::array::{ListArray, MapArray, StructArray};
use crate::bitmap::Bitmap;
use crate::datatypes::PhysicalType;
use crate::io::parquet::read::schema::is_nullable;
Expand Down Expand Up @@ -141,6 +141,29 @@ fn to_nested_recursive(
)));
to_nested_recursive(array.values().as_ref(), type_, nested, parents)?;
}
Map => {
let array = array.as_any().downcast_ref::<MapArray>().unwrap();
let type_ = if let ParquetType::GroupType { fields, .. } = type_ {
if let ParquetType::GroupType { fields, .. } = &fields[0] {
&fields[0]
} else {
return Err(Error::InvalidArgumentError(
"Parquet type must be a group for a map array".to_string(),
));
}
} else {
return Err(Error::InvalidArgumentError(
"Parquet type must be a group for a map array".to_string(),
));
};

parents.push(Nested::List(ListNested::new(
array.offsets().clone(),
array.validity().cloned(),
is_optional,
)));
to_nested_recursive(array.field().as_ref(), type_, nested, parents)?;
}
_ => {
parents.push(Nested::Primitive(
array.validity().cloned(),
Expand Down Expand Up @@ -178,6 +201,10 @@ fn to_leaves_recursive<'a>(array: &'a dyn Array, leaves: &mut Vec<&'a dyn Array>
let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
to_leaves_recursive(array.values().as_ref(), leaves);
}
Map => {
let array = array.as_any().downcast_ref::<MapArray>().unwrap();
to_leaves_recursive(array.field().as_ref(), leaves);
}
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
| LargeUtf8 | Dictionary(_) => leaves.push(array),
other => todo!("Writing {:?} to parquet not yet implemented", other),
Expand Down Expand Up @@ -231,6 +258,7 @@ pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(

#[cfg(test)]
mod tests {
use parquet2::schema::types::{GroupLogicalType, PrimitiveConvertedType, PrimitiveLogicalType};
use parquet2::schema::Repetition;

use super::*;
Expand Down Expand Up @@ -509,4 +537,101 @@ mod tests {
]
);
}

#[test]
fn test_map() {
let kv_type = DataType::Struct(vec![
Field::new("k", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]);
let kv_field = Field::new("kv", kv_type.clone(), false);
let map_type = DataType::Map(Box::new(kv_field), false);

let key_array = Utf8Array::<i32>::from_slice(["k1", "k2", "k3", "k4", "k5", "k6"]).boxed();
let val_array = Int32Array::from_slice([42, 28, 19, 31, 21, 17]).boxed();
let kv_array = StructArray::try_new(kv_type, vec![key_array, val_array], None)
.unwrap()
.boxed();
let offsets = OffsetsBuffer::try_from(vec![0, 2, 3, 4, 6]).unwrap();

let array = MapArray::try_new(map_type, offsets, kv_array, None).unwrap();

let type_ = ParquetType::GroupType {
field_info: FieldInfo {
name: "kv".to_string(),
repetition: Repetition::Optional,
id: None,
},
logical_type: None,
converted_type: None,
fields: vec![
ParquetType::PrimitiveType(ParquetPrimitiveType {
field_info: FieldInfo {
name: "k".to_string(),
repetition: Repetition::Required,
id: None,
},
logical_type: Some(PrimitiveLogicalType::String),
converted_type: Some(PrimitiveConvertedType::Utf8),
physical_type: ParquetPhysicalType::ByteArray,
}),
ParquetType::PrimitiveType(ParquetPrimitiveType {
field_info: FieldInfo {
name: "v".to_string(),
repetition: Repetition::Required,
id: None,
},
logical_type: None,
converted_type: None,
physical_type: ParquetPhysicalType::Int32,
}),
],
};

let type_ = ParquetType::GroupType {
field_info: FieldInfo {
name: "m".to_string(),
repetition: Repetition::Required,
id: None,
},
logical_type: Some(GroupLogicalType::Map),
converted_type: None,
fields: vec![ParquetType::GroupType {
field_info: FieldInfo {
name: "map".to_string(),
repetition: Repetition::Repeated,
id: None,
},
logical_type: None,
converted_type: None,
fields: vec![type_],
}],
};

let a = to_nested(&array, &type_).unwrap();

assert_eq!(
a,
vec![
vec![
Nested::List(ListNested::<i32> {
is_optional: false,
offsets: vec![0, 2, 3, 4, 6].try_into().unwrap(),
validity: None,
}),
Nested::Struct(None, true, 6),
Nested::Primitive(None, false, 6),
],
vec![
Nested::List(ListNested::<i32> {
is_optional: false,
offsets: vec![0, 2, 3, 4, 6].try_into().unwrap(),
validity: None,
}),
Nested::Struct(None, true, 6),
Nested::Primitive(None, false, 6),
],
]
);
}
}
15 changes: 15 additions & 0 deletions src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,21 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
None,
))
}
DataType::Map(f, _) => Ok(ParquetType::from_group(
name,
repetition,
None,
Some(GroupLogicalType::Map),
vec![ParquetType::from_group(
"map".to_string(),
Repetition::Repeated,
None,
None,
vec![to_parquet_type(f)?],
None,
)],
None,
)),
other => Err(Error::NotYetImplemented(format!(
"Writing the data type {other:?} is not yet implemented"
))),
Expand Down