Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to deserialize MapArray from parquet #1045

Merged
merged 3 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,32 @@ def case_nested_edge():
)


def case_map() -> Tuple[dict, pa.Schema, str]:
    """Build the parquet-integration test case for Arrow MapArray columns.

    Produces one map column whose values are all present and one whose
    values include a null, so both code paths are exercised on read.

    Returns:
        Tuple of (column name -> pyarrow array, pyarrow schema, output file name).
    """
    keys = ["a1", "a2"]
    values = ["b1", "b2"]
    schema = pa.schema(
        [
            pa.field(
                "map",
                pa.map_(pa.string(), pa.string()),
            ),
            pa.field(
                "map_nullable",
                pa.map_(pa.string(), pa.string()),
            ),
        ]
    )

    return (
        {
            # Offsets [0, 2]: a single map entry spanning both key/value pairs.
            "map": pa.MapArray.from_arrays([0, 2], keys, values),
            # Same keys, but one null value to exercise nullable map values.
            "map_nullable": pa.MapArray.from_arrays([0, 2], keys, ["b1", None]),
        },
        schema,
        # Fixed: the original used an f-string with no placeholders (f"...").
        "map_required_10.parquet",
    )


def write_pyarrow(
case,
page_version: int,
Expand Down Expand Up @@ -299,7 +325,7 @@ def write_pyarrow(
)


for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge]:
for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge, case_map]:
for version in [1, 2]:
for use_dict in [True, False]:
for compression in ["lz4", None, "snappy"]:
Expand Down
12 changes: 12 additions & 0 deletions src/array/equal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ impl PartialEq<&dyn Array> for UnionArray {
}
}

/// Logical equality between two [`MapArray`]s, delegating to the
/// specialized `map::equal` kernel.
impl PartialEq<MapArray> for MapArray {
    fn eq(&self, other: &Self) -> bool {
        map::equal(self, other)
    }
}

/// Logical equality between a [`MapArray`] and a type-erased [`Array`],
/// routed through the dynamic `equal` dispatcher (which also compares
/// data types before comparing values).
impl PartialEq<&dyn Array> for MapArray {
    fn eq(&self, other: &&dyn Array) -> bool {
        equal(self, *other)
    }
}

/// Logically compares two [`Array`]s.
/// Two arrays are logically equal if and only if:
/// * their data types are equal
Expand Down
24 changes: 24 additions & 0 deletions src/array/map/fmt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::fmt::{Debug, Formatter, Result, Write};

use super::super::fmt::{get_display, write_vec};
use super::MapArray;

/// Writes a textual representation of the map entry at `index` to `f`,
/// rendering each inner value via its `Display`-style formatter and
/// substituting `null` for missing values.
pub fn write_value<W: Write>(
    array: &MapArray,
    index: usize,
    null: &'static str,
    f: &mut W,
) -> Result {
    let entries = array.value(index);
    let item_writer = |buf: &mut W, i| get_display(entries.as_ref(), null)(buf, i);
    // No validity bitmap here: per-item nullability is handled by `get_display`.
    write_vec(f, item_writer, None, entries.len(), null, false)
}

impl Debug for MapArray {
    /// Debug-formats the array as `MapArray[...]`, honoring the validity
    /// bitmap and printing `None` for null slots.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        write!(f, "MapArray")?;
        let item = |f: &mut Formatter, i| write_value(self, i, "None", f);
        write_vec(f, item, self.validity.as_ref(), self.len(), "None", false)
    }
}
3 changes: 2 additions & 1 deletion src/array/map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use crate::{
use super::{new_empty_array, specification::try_check_offsets, Array};

mod ffi;
mod fmt;
mod iterator;
pub use iterator::*;

/// An array representing a (key, value), both of arbitrary logical types.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct MapArray {
data_type: DataType,
// invariant: field.len() == offsets.len() - 1
Expand Down
2 changes: 1 addition & 1 deletion src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl std::fmt::Debug for dyn Array + '_ {
fmt_dyn!(self, DictionaryArray::<$T>, f)
})
}
Map => todo!(),
Map => fmt_dyn!(self, MapArray, f),
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/ffi/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ unsafe fn to_data_type(schema: &ArrowSchema) -> Result<DataType> {
let size = size_raw
.parse::<usize>()
.map_err(|_| Error::OutOfSpec("size is not a valid integer".to_string()))?;
println!("schema: {}", size);
DataType::FixedSizeBinary(size)
}
["+w", size_raw] => {
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ pub(super) struct Required<'a> {

impl<'a> Required<'a> {
pub fn new(page: &'a DataPage) -> Self {
let values = SizedBinaryIter::new(page.buffer(), page.num_values());
let (_, _, values) = utils::split_buffer(page);
let values = SizedBinaryIter::new(values, page.num_values());

Self { values }
}
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ struct FilteredRequired<'a> {

impl<'a> FilteredRequired<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values) = utils::split_buffer(page);
// todo: replace this by an iterator over slices, for faster deserialization
let values = BitmapIter::new(page.buffer(), 0, page.num_values());
let values = BitmapIter::new(values, 0, page.num_values());

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ struct Required<'a> {

impl<'a> Required<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values) = utils::split_buffer(page);
Self {
values: page.buffer(),
values,
offset: 0,
length: page.num_values(),
}
Expand Down
32 changes: 29 additions & 3 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ mod struct_;
mod utils;

use crate::{
array::{Array, BinaryArray, FixedSizeListArray, ListArray, Utf8Array},
array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
datatypes::{DataType, Field},
error::Result,
error::{Error, Result},
};

use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
Expand Down Expand Up @@ -300,7 +300,33 @@ where
let columns = columns.into_iter().rev().collect();
Box::new(struct_::StructIterator::new(columns, fields.clone()))
}
other => todo!("{other:?}"),
Map(inner, _) => {
println!("{:?}", init);
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
chunk_size,
)?;
Box::new(iter.map(move |x| {
let (nested, inner) = x?;
println!("{:?}", inner);
let array = MapArray::new(
field.data_type().clone(),
vec![0, inner.len() as i32].into(),
inner,
None,
);
Ok((nested, array.arced()))
}))
}
other => {
return Err(Error::nyi(format!(
"Deserializing type {other:?} from parquet"
)))
}
})
}

Expand Down
11 changes: 11 additions & 0 deletions src/io/parquet/read/schema/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ fn non_repeated_group(
match (logical_type, converted_type) {
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name),
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name),
(None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
to_map(fields)
}
_ => to_struct(fields),
}
}
Expand All @@ -234,6 +238,13 @@ fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
}
}

/// Converts a parquet Map (or MapKeyValue) group type to an arrow
/// [`DataType::Map`].
/// Returns [`None`] if the group has no fields or its single inner
/// (key_value) field cannot be converted.
fn to_map(fields: &[ParquetType]) -> Option<DataType> {
    // `first()?` avoids the panic the previous `fields[0]` indexing would
    // have caused on an empty (malformed) group.
    let inner = to_field(fields.first()?)?;
    Some(DataType::Map(Box::new(inner), false))
}

/// Entry point for converting parquet group type.
///
/// This function takes care of logical type and repetition.
Expand Down
64 changes: 64 additions & 0 deletions src/io/parquet/read/statistics/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::{
array::{Array, MapArray, MutableArray},
datatypes::DataType,
error::Error,
};

use super::make_mutable;

/// A mutable adapter used while deserializing parquet statistics for
/// [`DataType::Map`] columns: wraps a single mutable array that
/// accumulates the map's inner (entries) values.
#[derive(Debug)]
pub struct DynMutableMapArray {
    // The arrow Map data type this array will be finalized as.
    data_type: DataType,
    // Mutable array for the map's inner field; public so callers
    // (e.g. the statistics `push` routine) can write into it directly.
    pub inner: Box<dyn MutableArray>,
}

impl DynMutableMapArray {
    /// Creates a new [`DynMutableMapArray`] whose inner entries array is
    /// pre-allocated for `capacity` items.
    ///
    /// # Panics
    /// Panics if `data_type`'s logical type is not [`DataType::Map`];
    /// callers are expected to have routed only Map types here.
    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self, Error> {
        let entries_field = match data_type.to_logical_type() {
            DataType::Map(field, _) => field,
            _ => unreachable!(),
        };
        let inner = make_mutable(entries_field.data_type(), capacity)?;
        Ok(Self { data_type, inner })
    }
}

impl MutableArray for DynMutableMapArray {
    fn data_type(&self) -> &DataType {
        &self.data_type
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Statistics arrays do not carry their own validity bitmap.
    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
        None
    }

    /// Finalizes into an immutable [`MapArray`] with a single map entry
    /// spanning every accumulated inner value (offsets `[0, len]`).
    fn as_box(&mut self) -> Box<dyn Array> {
        Box::new(MapArray::new(
            self.data_type.clone(),
            vec![0, self.inner.len() as i32].into(),
            self.inner.as_arc(),
            None,
        ))
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn push_null(&mut self) {
        // Not needed for deserializing statistics: only `make_mutable`,
        // direct writes through `inner`, and `as_box` are exercised.
        todo!()
    }

    /// Delegates to the inner array instead of panicking: the previous
    /// `todo!()` would abort any caller that tried to shrink this array.
    fn shrink_to_fit(&mut self) {
        self.inner.shrink_to_fit()
    }
}
55 changes: 55 additions & 0 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod boolean;
mod dictionary;
mod fixlen;
mod list;
mod map;
mod primitive;
mod struct_;
mod utf8;
Expand All @@ -37,6 +38,8 @@ pub enum Count {
Single(UInt64Array),
/// struct arrays have a count as a struct of UInt64
Struct(StructArray),
/// map arrays have a count as a map of UInt64
Map(MapArray),
}

/// Arrow-deserialized parquet Statistics of a file
Expand Down Expand Up @@ -76,6 +79,15 @@ impl From<MutableStatistics> for Statistics {
.unwrap()
.clone();
Count::Struct(a)
} else if let PhysicalType::Map = s.null_count.data_type().to_physical_type() {
let a = s
.null_count
.as_box()
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.clone();
Count::Map(a)
} else {
let a = s
.null_count
Expand All @@ -96,6 +108,15 @@ impl From<MutableStatistics> for Statistics {
.unwrap()
.clone();
Count::Struct(a)
} else if let PhysicalType::Map = s.null_count.data_type().to_physical_type() {
let a = s
.null_count
.as_box()
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.clone();
Count::Map(a)
} else {
let a = s
.distinct_count
Expand Down Expand Up @@ -151,6 +172,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result<Box<dyn Mutable
data_type.clone(),
capacity,
)?),
PhysicalType::Map => Box::new(map::DynMutableMapArray::try_with_capacity(
data_type.clone(),
capacity,
)?),
other => {
return Err(Error::NotYetImplemented(format!(
"Deserializing parquet stats from {:?} is still not implemented",
Expand All @@ -168,6 +193,11 @@ fn create_dt(data_type: &DataType) -> DataType {
.map(|f| Field::new(&f.name, create_dt(&f.data_type), f.is_nullable))
.collect(),
)
} else if let DataType::Map(f, ordered) = data_type.to_logical_type() {
DataType::Map(
Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
*ordered,
)
} else {
DataType::UInt64
}
Expand Down Expand Up @@ -329,6 +359,31 @@ fn push(
)
});
}
Map(_, _) => {
let min = min
.as_mut_any()
.downcast_mut::<map::DynMutableMapArray>()
.unwrap();
let max = max
.as_mut_any()
.downcast_mut::<map::DynMutableMapArray>()
.unwrap();
let distinct_count = distinct_count
.as_mut_any()
.downcast_mut::<map::DynMutableMapArray>()
.unwrap();
let null_count = null_count
.as_mut_any()
.downcast_mut::<map::DynMutableMapArray>()
.unwrap();
return push(
stats,
min.inner.as_mut(),
max.inner.as_mut(),
distinct_count.inner.as_mut(),
null_count.inner.as_mut(),
);
}
_ => {}
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/statistics/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl MutableArray for DynMutableStructArray {
}

fn len(&self) -> usize {
self.inner.len()
self.inner[0].len()
}

fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
Expand Down
Loading