From c06a80e7a65e3c20d0dc829d2f2ebeb73f22b4aa Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 25 Apr 2022 06:31:40 +0000 Subject: [PATCH] Fixed struct stats --- src/io/parquet/read/statistics/mod.rs | 46 +++++++++++++++-- src/io/parquet/read/statistics/struct_.rs | 61 +++++++++++++++++++++++ tests/it/io/parquet/mod.rs | 25 +++++++++- 3 files changed, 126 insertions(+), 6 deletions(-) create mode 100644 src/io/parquet/read/statistics/struct_.rs diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 4e6e41fbdf4..817490d67d1 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -23,6 +23,7 @@ mod dictionary; mod fixlen; mod list; mod primitive; +mod struct_; mod utf8; use self::list::DynMutableListArray; @@ -98,6 +99,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result Box::new( dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?, ), + PhysicalType::Struct => Box::new(struct_::DynMutableStructArray::try_with_capacity( + data_type.clone(), + capacity, + )?), other => { return Err(ArrowError::NotYetImplemented(format!( "Deserializing parquet stats from {:?} is still not implemented", @@ -188,11 +193,12 @@ fn push_others( } fn push( - mut stats: VecDeque<(Option>, ParquetPrimitiveType)>, + stats: &mut VecDeque<(Option>, ParquetPrimitiveType)>, min: &mut dyn MutableArray, max: &mut dyn MutableArray, distinct_count: &mut UInt64Vec, null_count: &mut UInt64Vec, + do_push_others: bool, ) -> Result<()> { match min.data_type().to_logical_type() { List(_) | LargeList(_) => { @@ -210,6 +216,7 @@ fn push( max.inner.as_mut(), distinct_count, null_count, + true, ); } Dictionary(_, _, _) => { @@ -227,15 +234,45 @@ fn push( max.inner.as_mut(), distinct_count, null_count, + true, ); } + Struct(_) => { + let min = min + .as_mut_any() + .downcast_mut::() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::() + .unwrap(); + let mut do_push_others = true; + return min + .inner + .iter_mut() + .zip(max.inner.iter_mut()) + .try_for_each(|(min, max)| { + push( + stats, + min.as_mut(), + max.as_mut(), + distinct_count, + null_count, + do_push_others, + )?; + do_push_others = false; + Ok(()) + }); + } _ => {} } let (from, type_) = stats.pop_front().unwrap(); let from = from.as_deref(); - push_others(from, distinct_count, null_count); + if do_push_others { + push_others(from, distinct_count, null_count); + } let physical_type = &type_.physical_type; @@ -302,7 +339,7 @@ pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Res // transpose groups.iter().try_for_each(|group| { let columns = get_field_columns(group.columns(), field.name.as_ref()); - let stats = columns + let mut stats = columns .into_iter() .map(|column| { Ok(( @@ -312,11 +349,12 @@ pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Res }) .collect::, ParquetPrimitiveType)>>>()?; push( - stats, + &mut stats, statistics.min_value.as_mut(), statistics.max_value.as_mut(), &mut statistics.distinct_count, &mut statistics.null_count, + true, ) })?; diff --git a/src/io/parquet/read/statistics/struct_.rs b/src/io/parquet/read/statistics/struct_.rs new file mode 100644 index 00000000000..085737d6241 --- /dev/null +++ b/src/io/parquet/read/statistics/struct_.rs @@ -0,0 +1,61 @@ +use crate::array::{Array, StructArray}; +use crate::error::Result; +use crate::{array::MutableArray, datatypes::DataType}; + +use super::make_mutable; + +#[derive(Debug)] +pub struct DynMutableStructArray { + data_type: DataType, + pub inner: Vec>, +} + +impl DynMutableStructArray { + pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result { + let inners = match data_type.to_logical_type() { + DataType::Struct(inner) => inner, + _ => unreachable!(), + }; + let inner = inners + .iter() + .map(|f| make_mutable(f.data_type(), capacity)) + .collect::>>()?; + + Ok(Self { data_type, inner }) + } +} +impl MutableArray for DynMutableStructArray { + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> { + None + } + + fn as_box(&mut self) -> Box { + let inner = self.inner.iter_mut().map(|x| x.as_arc()).collect(); + + Box::new(StructArray::new(self.data_type.clone(), inner, None)) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + fn push_null(&mut self) { + todo!() + } + + fn shrink_to_fit(&mut self) { + todo!() + } +} diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 995ad2bf40c..97d1afec009 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -730,12 +730,33 @@ pub fn pyarrow_struct(column: &str) -> Box { } pub fn pyarrow_struct_statistics(column: &str) -> Statistics { + let new_struct = |arrays: Vec>, names: Vec| { + let fields = names + .into_iter() + .zip(arrays.iter()) + .map(|(n, a)| Field::new(n, a.data_type().clone(), true)) + .collect(); + Box::new(StructArray::new(DataType::Struct(fields), arrays, None)) + }; + match column { "struct" => Statistics { distinct_count: UInt64Array::from([None]), null_count: UInt64Array::from([Some(4)]), - min_value: Box::new(BooleanArray::from_slice([false])), - max_value: Box::new(BooleanArray::from_slice([true])), + min_value: new_struct( + vec![ + Arc::new(Utf8Array::::from_slice([""])), + Arc::new(BooleanArray::from_slice([false])), + ], + vec!["f1".to_string(), "f2".to_string()], + ), + max_value: new_struct( + vec![ + Arc::new(Utf8Array::::from_slice(["def"])), + Arc::new(BooleanArray::from_slice([true])), + ], + vec!["f1".to_string(), "f2".to_string()], + ), }, "struct_struct" => Statistics { distinct_count: UInt64Array::from([None]),