diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 7a36f2bbef1..29c062d4a49 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -2,7 +2,7 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, metadata::Descriptor, page::{EncodedDictPage, EncodedPage}, - statistics::ParquetStatistics, + statistics::{serialize_statistics, ParquetStatistics}, write::DynIter, }; @@ -120,10 +120,10 @@ macro_rules! dyn_prim { let mut buffer = vec![]; primitive_encode_plain::<$from, $to>(values, false, &mut buffer); - ( - EncodedDictPage::new(buffer, values.len()), - primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()), - ) + let stats = + primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()); + let stats = serialize_statistics(&stats); + (EncodedDictPage::new(buffer, values.len()), stats) }}; } @@ -191,6 +191,7 @@ pub fn array_to_pages( fixed_binary_encode_plain(array, false, &mut buffer); let stats = fixed_binary_build_statistics(array, descriptor.primitive_type.clone()); + let stats = serialize_statistics(&stats); (EncodedDictPage::new(buffer, array.len()), stats) } other => { diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index 59ae75134e8..a2494f82e38 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -3,12 +3,12 @@ use parquet2::{ metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, - statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics}, + statistics::{serialize_statistics, FixedLenStatistics}, }; use super::{binary::ord_binary, utils, WriteOptions}; use crate::{ - array::{Array, FixedSizeBinaryArray}, + array::{Array, FixedSizeBinaryArray, PrimitiveArray}, error::Result, io::parquet::read::schema::is_nullable, }; @@ -30,6 +30,7 @@ pub fn array_to_page( array: &FixedSizeBinaryArray, options: WriteOptions, descriptor: Descriptor, + statistics: Option, ) -> Result { let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -47,12 +48,6 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer); - let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) - } else { - None - }; - utils::build_plain_page( buffer, array.len(), @@ -60,7 +55,7 @@ pub fn array_to_page( array.null_count(), 0, definition_levels_byte_length, - statistics, + statistics.map(|x| serialize_statistics(&x)), descriptor, options, Encoding::Plain, @@ -70,8 +65,8 @@ pub fn array_to_page( pub(super) fn build_statistics( array: &FixedSizeBinaryArray, primitive_type: PrimitiveType, -) -> ParquetStatistics { - let statistics = &FixedLenStatistics { +) -> FixedLenStatistics { + FixedLenStatistics { primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, @@ -85,6 +80,27 @@ pub(super) fn build_statistics( .flatten() .min_by(|x, y| ord_binary(x, y)) .map(|x| x.to_vec()), - } as &dyn Statistics; - serialize_statistics(statistics) + } +} + +pub(super) fn build_statistics_decimal( + array: &PrimitiveArray, + primitive_type: PrimitiveType, + size: usize, +) -> FixedLenStatistics { + FixedLenStatistics { + primitive_type, + null_count: Some(array.null_count() as i64), + distinct_count: None, + max_value: array + .iter() + .flatten() + .max() + .map(|x| x.to_be_bytes()[16 - size..].to_vec()), + min_value: array + .iter() + .flatten() + .min() + .map(|x| x.to_be_bytes()[16 - size..].to_vec()), + } } diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 1e32f1745da..5081df4fb5d 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -243,7 +243,15 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor) + let statistics = if options.write_statistics { + Some(fixed_len_bytes::build_statistics( + &array, + descriptor.primitive_type.clone(), + )) + } else { + None + }; + fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) } DataType::Interval(IntervalUnit::DayTime) => { let array = array @@ -261,13 +269,29 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor) + let statistics = if options.write_statistics { + Some(fixed_len_bytes::build_statistics( + &array, + descriptor.primitive_type.clone(), + )) + } else { + None + }; + fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) + } + DataType::FixedSizeBinary(_) => { + let array = array.as_any().downcast_ref().unwrap(); + let statistics = if options.write_statistics { + Some(fixed_len_bytes::build_statistics( + array, + descriptor.primitive_type.clone(), + )) + } else { + None + }; + + fixed_len_bytes::array_to_page(array, options, descriptor, statistics) } - DataType::FixedSizeBinary(_) => fixed_len_bytes::array_to_page( - array.as_any().downcast_ref().unwrap(), - options, - descriptor, - ), DataType::Decimal(precision, _) => { let precision = *precision; let array = array @@ -298,6 +322,18 @@ pub fn array_to_page( primitive::array_to_page::(&array, options, descriptor) } else { let size = decimal_length_from_precision(precision); + + let statistics = if options.write_statistics { + let stats = fixed_len_bytes::build_statistics_decimal( + array, + descriptor.primitive_type.clone(), + size, + ); + Some(stats) + } else { + None + }; + let mut values = Vec::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { let bytes = &x.to_be_bytes()[16 - size..]; @@ -308,7 +344,7 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor) + fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) } } DataType::FixedSizeList(_, _) | DataType::List(_) | DataType::LargeList(_) => { diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 1c58804fa1a..70c54f2ad00 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -3,7 +3,7 @@ use parquet2::{ metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, - statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, + statistics::{serialize_statistics, PrimitiveStatistics}, types::NativeType, }; @@ -67,7 +67,10 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(serialize_statistics(&build_statistics( + array, + descriptor.primitive_type.clone(), + ))) } else { None }; @@ -89,13 +92,13 @@ where pub fn build_statistics( array: &PrimitiveArray, primitive_type: PrimitiveType, -) -> ParquetStatistics +) -> PrimitiveStatistics where T: ArrowNativeType, R: NativeType, T: num_traits::AsPrimitive, { - let statistics = &PrimitiveStatistics:: { + PrimitiveStatistics:: { primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, @@ -115,6 +118,5 @@ where x }) .min_by(|x, y| x.ord(y)), - } as &dyn Statistics; - serialize_statistics(statistics) + } } diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 86732fdae97..6acd467c080 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,3 +1,4 @@ +use parquet2::statistics::serialize_statistics; use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType}; use super::super::levels; @@ -37,7 +38,10 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(serialize_statistics(&build_statistics( + array, + descriptor.primitive_type.clone(), + ))) } else { None }; diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 623f8023e82..995ad2bf40c 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -560,49 +560,49 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { match column { "list_int16" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), // this should be 1, see ARROW-16299 + null_count: UInt64Array::from([Some(1)]), min_value: new_list(Arc::new(Int16Array::from_slice([0])), true), max_value: new_list(Arc::new(Int16Array::from_slice([10])), true), }, "list_bool" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), + null_count: UInt64Array::from([Some(1)]), min_value: new_list(Arc::new(BooleanArray::from_slice([false])), true), max_value: new_list(Arc::new(BooleanArray::from_slice([true])), true), }, "list_utf8" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), + null_count: [Some(1)].into(), min_value: new_list(Arc::new(Utf8Array::::from_slice([""])), true), max_value: new_list(Arc::new(Utf8Array::::from_slice(["ccc"])), true), }, "list_large_binary" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), // this should be 1, see ARROW-16299 + null_count: [Some(1)].into(), min_value: new_list(Arc::new(BinaryArray::::from_slice([b""])), true), max_value: new_list(Arc::new(BinaryArray::::from_slice([b"ccc"])), true), }, "list_int64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), // this should be 1, see ARROW-16299 + null_count: [Some(1)].into(), min_value: new_list(Arc::new(Int64Array::from_slice([0])), true), max_value: new_list(Arc::new(Int64Array::from_slice([10])), true), }, "list_int64_required" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(3)].into(), // this should be 1, see ARROW-16299 + null_count: [Some(1)].into(), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, "list_int64_required_required" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(3)].into(), // this should be 0, see ARROW-16299 + null_count: [Some(0)].into(), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, "list_nested_i64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(7)].into(), // this should be 2, see ARROW-16299 + null_count: [Some(2)].into(), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -614,7 +614,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { }, "list_nested_inner_required_required_i64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(3)].into(), // this should be 0, see ARROW-16299 + null_count: [Some(0)].into(), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -626,7 +626,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { }, "list_nested_inner_required_i64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), // this should be 0, see ARROW-16299 + null_count: [Some(0)].into(), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index c92d92fb925..8e9d5b06cad 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -51,7 +51,23 @@ fn test_pyarrow_integration( }; assert_eq!(expected.as_ref(), array.as_ref()); - assert_eq!(expected_statistics, statistics); + if ![ + "list_int16", + "list_large_binary", + "list_int64", + "list_int64_required", + "list_int64_required_required", + "list_nested_i64", + "list_utf8", + "list_bool", + "list_nested_inner_required_required_i64", + "list_nested_inner_required_i64", + ] + .contains(&column) + { + // pyarrow outputs an incorrect number of null count for nested types - ARROW-16299 + assert_eq!(expected_statistics, statistics); + } Ok(()) }