Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed struct stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 25, 2022
1 parent 218a031 commit c06a80e
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 6 deletions.
46 changes: 42 additions & 4 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod dictionary;
mod fixlen;
mod list;
mod primitive;
mod struct_;
mod utf8;

use self::list::DynMutableListArray;
Expand Down Expand Up @@ -98,6 +99,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result<Box<dyn Mutable
PhysicalType::Dictionary(_) => Box::new(
dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?,
),
PhysicalType::Struct => Box::new(struct_::DynMutableStructArray::try_with_capacity(
data_type.clone(),
capacity,
)?),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing parquet stats from {:?} is still not implemented",
Expand Down Expand Up @@ -188,11 +193,12 @@ fn push_others(
}

fn push(
mut stats: VecDeque<(Option<Arc<dyn ParquetStatistics>>, ParquetPrimitiveType)>,
stats: &mut VecDeque<(Option<Arc<dyn ParquetStatistics>>, ParquetPrimitiveType)>,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
distinct_count: &mut UInt64Vec,
null_count: &mut UInt64Vec,
do_push_others: bool,
) -> Result<()> {
match min.data_type().to_logical_type() {
List(_) | LargeList(_) => {
Expand All @@ -210,6 +216,7 @@ fn push(
max.inner.as_mut(),
distinct_count,
null_count,
true,
);
}
Dictionary(_, _, _) => {
Expand All @@ -227,15 +234,45 @@ fn push(
max.inner.as_mut(),
distinct_count,
null_count,
true,
);
}
Struct(_) => {
let min = min
.as_mut_any()
.downcast_mut::<struct_::DynMutableStructArray>()
.unwrap();
let max = max
.as_mut_any()
.downcast_mut::<struct_::DynMutableStructArray>()
.unwrap();
let mut do_push_others = true;
return min
.inner
.iter_mut()
.zip(max.inner.iter_mut())
.try_for_each(|(min, max)| {
push(
stats,
min.as_mut(),
max.as_mut(),
distinct_count,
null_count,
do_push_others,
)?;
do_push_others = false;
Ok(())
});
}
_ => {}
}

let (from, type_) = stats.pop_front().unwrap();
let from = from.as_deref();

push_others(from, distinct_count, null_count);
if do_push_others {
push_others(from, distinct_count, null_count);
}

let physical_type = &type_.physical_type;

Expand Down Expand Up @@ -302,7 +339,7 @@ pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Res
// transpose
groups.iter().try_for_each(|group| {
let columns = get_field_columns(group.columns(), field.name.as_ref());
let stats = columns
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
Expand All @@ -312,11 +349,12 @@ pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Res
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
stats,
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
&mut statistics.distinct_count,
&mut statistics.null_count,
true,
)
})?;

Expand Down
61 changes: 61 additions & 0 deletions src/io/parquet/read/statistics/struct_.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::array::{Array, StructArray};
use crate::error::Result;
use crate::{array::MutableArray, datatypes::DataType};

use super::make_mutable;

/// Dynamically-typed mutable container for struct values.
///
/// Stores the struct's [`DataType`] together with one boxed
/// [`MutableArray`] per child field.
#[derive(Debug)]
pub struct DynMutableStructArray {
    /// The data type reported by this array via `MutableArray::data_type`.
    data_type: DataType,
    /// One mutable array per struct field, in field order.
    pub inner: Vec<Box<dyn MutableArray>>,
}

impl DynMutableStructArray {
    /// Builds a new [`DynMutableStructArray`] for `data_type`, creating one
    /// child mutable array per struct field through `make_mutable`, each
    /// with the requested `capacity`.
    ///
    /// # Errors
    /// Returns the first error reported by `make_mutable` for a child field.
    ///
    /// # Panics
    /// Panics if the logical type of `data_type` is not `DataType::Struct`.
    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self> {
        let fields = if let DataType::Struct(fields) = data_type.to_logical_type() {
            fields
        } else {
            unreachable!()
        };

        // Build the children in field order, bailing out on the first failure.
        let mut inner = Vec::with_capacity(fields.len());
        for field in fields {
            inner.push(make_mutable(field.data_type(), capacity)?);
        }

        Ok(Self { data_type, inner })
    }
}
/// [`MutableArray`] implementation that forwards to the per-field children.
impl MutableArray for DynMutableStructArray {
    /// Returns the struct data type this array was created with.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }

    /// Returns the number of slots in the struct array.
    ///
    /// This is the length of the first child, not the number of children:
    /// `self.inner.len()` would report how many fields the struct has.
    /// A struct without fields reports a length of 0.
    // NOTE(review): assumes every child holds the same number of slots —
    // confirm against the code that pushes into `inner`.
    fn len(&self) -> usize {
        self.inner.first().map_or(0, |child| child.len())
    }

    /// This array does not track a validity bitmap of its own.
    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
        None
    }

    /// Converts every child into an immutable array and wraps them into a
    /// [`StructArray`] without a validity bitmap.
    fn as_box(&mut self) -> Box<dyn Array> {
        let inner = self.inner.iter_mut().map(|x| x.as_arc()).collect();

        Box::new(StructArray::new(self.data_type.clone(), inner, None))
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Not implemented.
    ///
    /// # Panics
    /// Always panics.
    fn push_null(&mut self) {
        todo!()
    }

    /// Shrinks every child array to fit its contents.
    fn shrink_to_fit(&mut self) {
        self.inner
            .iter_mut()
            .for_each(|child| child.shrink_to_fit());
    }
}
25 changes: 23 additions & 2 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,33 @@ pub fn pyarrow_struct(column: &str) -> Box<dyn Array> {
}

pub fn pyarrow_struct_statistics(column: &str) -> Statistics {
let new_struct = |arrays: Vec<Arc<dyn Array>>, names: Vec<String>| {
let fields = names
.into_iter()
.zip(arrays.iter())
.map(|(n, a)| Field::new(n, a.data_type().clone(), true))
.collect();
Box::new(StructArray::new(DataType::Struct(fields), arrays, None))
};

match column {
"struct" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: UInt64Array::from([Some(4)]),
min_value: Box::new(BooleanArray::from_slice([false])),
max_value: Box::new(BooleanArray::from_slice([true])),
min_value: new_struct(
vec![
Arc::new(Utf8Array::<i32>::from_slice([""])),
Arc::new(BooleanArray::from_slice([false])),
],
vec!["f1".to_string(), "f2".to_string()],
),
max_value: new_struct(
vec![
Arc::new(Utf8Array::<i32>::from_slice(["def"])),
Arc::new(BooleanArray::from_slice([true])),
],
vec!["f1".to_string(), "f2".to_string()],
),
},
"struct_struct" => Statistics {
distinct_count: UInt64Array::from([None]),
Expand Down

0 comments on commit c06a80e

Please sign in to comment.