From 25565740fdd88a3820cf2ddee7b33539d3ec8277 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 22 Feb 2023 10:13:11 +0100 Subject: [PATCH 1/2] fix(parquet): respect statistics and correct null_count --- src/io/parquet/write/dictionary.rs | 165 +++++++++++++++++------------ 1 file changed, 98 insertions(+), 67 deletions(-) diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index cd03b8a51ea..9669797589e 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -104,7 +104,7 @@ fn serialize_keys( array: &DictionaryArray, type_: PrimitiveType, nested: &[Nested], - statistics: ParquetStatistics, + statistics: Option, options: WriteOptions, ) -> Result { let mut buffer = vec![]; @@ -146,7 +146,7 @@ fn serialize_keys( array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, - Some(statistics), + statistics, type_, options, Encoding::RleDictionary, @@ -159,8 +159,15 @@ macro_rules! dyn_prim { let values = $array.values().as_any().downcast_ref().unwrap(); let buffer = primitive_encode_plain::<$from, $to>(values, false, vec![]); - let stats = primitive_build_statistics::<$from, $to>(values, $type_.clone()); - let stats = serialize_statistics(&stats); + + let stats: Option = if $options.write_statistics { + let mut stats = primitive_build_statistics::<$from, $to>(values, $type_.clone()); + stats.null_count = Some($array.null_count() as i64); + let stats = serialize_statistics(&stats); + Some(stats) + } else { + None + }; (DictPage::new(buffer, values.len(), false), stats) }}; } @@ -175,69 +182,93 @@ pub fn array_to_pages( match encoding { Encoding::PlainDictionary | Encoding::RleDictionary => { // write DictPage - let (dict_page, statistics) = match array.values().data_type().to_logical_type() { - DataType::Int8 => dyn_prim!(i8, i32, array, options, type_), - DataType::Int16 => dyn_prim!(i16, i32, array, options, type_), - DataType::Int32 | DataType::Date32 | 
DataType::Time32(_) => { - dyn_prim!(i32, i32, array, options, type_) - } - DataType::Int64 - | DataType::Date64 - | DataType::Time64(_) - | DataType::Timestamp(_, _) - | DataType::Duration(_) => dyn_prim!(i64, i64, array, options, type_), - DataType::UInt8 => dyn_prim!(u8, i32, array, options, type_), - DataType::UInt16 => dyn_prim!(u16, i32, array, options, type_), - DataType::UInt32 => dyn_prim!(u32, i32, array, options, type_), - DataType::UInt64 => dyn_prim!(u64, i64, array, options, type_), - DataType::Float32 => dyn_prim!(f32, f32, array, options, type_), - DataType::Float64 => dyn_prim!(f64, f64, array, options, type_), - DataType::Utf8 => { - let array = array.values().as_any().downcast_ref().unwrap(); - - let mut buffer = vec![]; - utf8_encode_plain::(array, false, &mut buffer); - let stats = utf8_build_statistics(array, type_.clone()); - (DictPage::new(buffer, array.len(), false), stats) - } - DataType::LargeUtf8 => { - let array = array.values().as_any().downcast_ref().unwrap(); - - let mut buffer = vec![]; - utf8_encode_plain::(array, false, &mut buffer); - let stats = utf8_build_statistics(array, type_.clone()); - (DictPage::new(buffer, array.len(), false), stats) - } - DataType::Binary => { - let array = array.values().as_any().downcast_ref().unwrap(); - - let mut buffer = vec![]; - binary_encode_plain::(array, false, &mut buffer); - let stats = binary_build_statistics(array, type_.clone()); - (DictPage::new(buffer, array.len(), false), stats) - } - DataType::LargeBinary => { - let array = array.values().as_any().downcast_ref().unwrap(); - - let mut buffer = vec![]; - binary_encode_plain::(array, false, &mut buffer); - let stats = binary_build_statistics(array, type_.clone()); - (DictPage::new(buffer, array.len(), false), stats) - } - DataType::FixedSizeBinary(_) => { - let mut buffer = vec![]; - let array = array.values().as_any().downcast_ref().unwrap(); - fixed_binary_encode_plain(array, false, &mut buffer); - let stats = 
fixed_binary_build_statistics(array, type_.clone()); - let stats = serialize_statistics(&stats); - (DictPage::new(buffer, array.len(), false), stats) - } - other => { - return Err(Error::NotYetImplemented(format!( - "Writing dictionary arrays to parquet only support data type {other:?}" - ))) - } - }; + let (dict_page, statistics): (_, Option) = + match array.values().data_type().to_logical_type() { + DataType::Int8 => dyn_prim!(i8, i32, array, options, type_), + DataType::Int16 => dyn_prim!(i16, i32, array, options, type_), + DataType::Int32 | DataType::Date32 | DataType::Time32(_) => { + dyn_prim!(i32, i32, array, options, type_) + } + DataType::Int64 + | DataType::Date64 + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Duration(_) => dyn_prim!(i64, i64, array, options, type_), + DataType::UInt8 => dyn_prim!(u8, i32, array, options, type_), + DataType::UInt16 => dyn_prim!(u16, i32, array, options, type_), + DataType::UInt32 => dyn_prim!(u32, i32, array, options, type_), + DataType::UInt64 => dyn_prim!(u64, i64, array, options, type_), + DataType::Float32 => dyn_prim!(f32, f32, array, options, type_), + DataType::Float64 => dyn_prim!(f64, f64, array, options, type_), + DataType::Utf8 => { + let array = array.values().as_any().downcast_ref().unwrap(); + + let mut buffer = vec![]; + utf8_encode_plain::(array, false, &mut buffer); + let stats = if options.write_statistics { + Some(utf8_build_statistics(array, type_.clone())) + } else { + None + }; + (DictPage::new(buffer, array.len(), false), stats) + } + DataType::LargeUtf8 => { + let array = array.values().as_any().downcast_ref().unwrap(); + + let mut buffer = vec![]; + utf8_encode_plain::(array, false, &mut buffer); + let stats = if options.write_statistics { + Some(utf8_build_statistics(array, type_.clone())) + } else { + None + }; + (DictPage::new(buffer, array.len(), false), stats) + } + DataType::Binary => { + let array = array.values().as_any().downcast_ref().unwrap(); + + let mut buffer = 
vec![]; + binary_encode_plain::(array, false, &mut buffer); + let stats = if options.write_statistics { + Some(binary_build_statistics(array, type_.clone())) + } else { + None + }; + (DictPage::new(buffer, array.len(), false), stats) + } + DataType::LargeBinary => { + let values = array.values().as_any().downcast_ref().unwrap(); + + let mut buffer = vec![]; + binary_encode_plain::(values, false, &mut buffer); + let stats = if options.write_statistics { + let mut stats = binary_build_statistics(values, type_.clone()); + stats.null_count = Some(array.null_count() as i64); + Some(stats) + } else { + None + }; + (DictPage::new(buffer, values.len(), false), stats) + } + DataType::FixedSizeBinary(_) => { + let mut buffer = vec![]; + let array = array.values().as_any().downcast_ref().unwrap(); + fixed_binary_encode_plain(array, false, &mut buffer); + let stats = if options.write_statistics { + let mut stats = fixed_binary_build_statistics(array, type_.clone()); + stats.null_count = Some(array.null_count() as i64); + Some(serialize_statistics(&stats)) + } else { + None + }; + (DictPage::new(buffer, array.len(), false), stats) + } + other => { + return Err(Error::NotYetImplemented(format!( + "Writing dictionary arrays to parquet only support data type {other:?}" + ))) + } + }; let dict_page = Page::Dict(dict_page); // write DataPage pointing to DictPage From bbeb28ac4384930d342ee8b3152de93fc89dca31 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 22 Feb 2023 11:03:52 +0100 Subject: [PATCH 2/2] fix invalid test --- tests/it/io/parquet/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 905d7c6dc04..7bb6cf01a9e 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -591,7 +591,7 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { Statistics { distinct_count: UInt64Array::from([None]).boxed(), - null_count: UInt64Array::from([Some(0)]).boxed(), + 
null_count: UInt64Array::from([Some(1)]).boxed(), min_value: new_dict(Box::new(Int32Array::from_slice([10]))), max_value: new_dict(Box::new(Int32Array::from_slice([200]))), }