diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 2cd0833882c..b6f48abc6a5 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -29,6 +29,7 @@ def case_basic_nullable(size=1): pa.field("decimal_18", pa.decimal128(18, 0)), pa.field("decimal_26", pa.decimal128(26, 0)), pa.field("timestamp_us", pa.timestamp("us")), + pa.field("timestamp_s", pa.timestamp("s")), ] schema = pa.schema(fields) @@ -45,6 +46,7 @@ def case_basic_nullable(size=1): "decimal_18": decimal * size, "decimal_26": decimal * size, "timestamp_us": int64 * size, + "timestamp_s": int64 * size, }, schema, f"basic_nullable_{size*10}.parquet", diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 4945d82be90..d00976bc989 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -322,6 +322,7 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( chunk_size: usize, ) -> Result> { use DataType::*; + match data_type.to_logical_type() { Null => Ok(null::iter_to_arrays(pages, data_type, chunk_size)), Boolean => Ok(boolean::iter_to_arrays(pages, data_type, chunk_size)), @@ -670,16 +671,6 @@ where }) } -// [Struct, List, Bool] -// => [Struct(Int), Struct(Utf8), List(Int), Bool] -// [Struct, Utf8>, List, Bool] -// => [Struct(Struct(Int)), Struct(Utf8), List(Int), Bool] -// [List>] -// => [List(Struct(Int)), List(Struct(Bool))] -// [Struct, Utf8>] -// => [Struct(Int), Struct(Bool)] -// => [Struct(Struct(Int)), Struct(Struct(Bool)), Struct(Utf8)] - fn field_to_init(field: &Field) -> Vec { use crate::datatypes::PhysicalType::*; match field.data_type.to_physical_type() { diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index 70614573566..4d2376663a7 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -268,7 +268,12 @@ pub(super) fn finish( values: Vec, validity: MutableBitmap, ) -> MutablePrimitiveArray { - MutablePrimitiveArray::from_data(data_type.clone(), values, validity.into()) + let validity = if validity.is_empty() { + None + } else { + Some(validity) + }; + MutablePrimitiveArray::from_data(data_type.clone(), values, validity) } /// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 3205d069043..23ac8215a39 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -87,16 +87,15 @@ impl Iterator for RowGroupDeserializer { } } -/// Returns all the column metadata in `row_group` associated to `field_name`. -fn get_field_columns<'a>( +/// Returns all the parquet columns associated to `field_name`. +/// For non-nested parquet types, this returns a single column +pub(super) fn get_field_columns<'a>( columns: &'a [ColumnChunkMetaData], field_name: &str, ) -> Vec<&'a ColumnChunkMetaData> { columns .iter() - .enumerate() - .filter(|x| x.1.descriptor().path_in_schema()[0] == field_name) - .map(|x| x.1) + .filter(|x| x.descriptor().path_in_schema()[0] == field_name) .collect() } diff --git a/src/io/parquet/read/statistics/binary.rs b/src/io/parquet/read/statistics/binary.rs index e8f272cc24a..fc6d887a68d 100644 --- a/src/io/parquet/read/statistics/binary.rs +++ b/src/io/parquet/read/statistics/binary.rs @@ -2,13 +2,12 @@ use std::any::Any; use std::convert::TryFrom; use crate::datatypes::DataType; -use parquet2::schema::types::ParquetType; use parquet2::statistics::BinaryStatistics as ParquetByteArrayStatistics; -use super::super::schema; use super::Statistics; use crate::error::{ArrowError, Result}; +/// Represents a `Binary` or `LargeBinary` #[derive(Debug, Clone, PartialEq)] pub struct BinaryStatistics { pub null_count: Option, @@ -87,14 +86,14 @@ impl TryFrom<&ParquetByteArrayStatistics> for Utf8Statistics { pub(super) fn statistics_from_byte_array( stats: &ParquetByteArrayStatistics, - type_: &ParquetType, + data_type: DataType, ) -> Result> { - let data_type = schema::to_data_type(type_)?.unwrap(); - use DataType::*; Ok(match data_type { Utf8 => Box::new(Utf8Statistics::try_from(stats)?), + LargeUtf8 => Box::new(Utf8Statistics::try_from(stats)?), Binary => Box::new(BinaryStatistics::from(stats)), + LargeBinary => Box::new(BinaryStatistics::from(stats)), other => { return Err(ArrowError::NotYetImplemented(format!( "Can't read {:?} from parquet", diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 0e720097443..9ad29c19b8a 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -1,11 +1,14 @@ //! APIs exposing `parquet2`'s statistics as arrow's statistics. -use crate::datatypes::DataType; -use crate::error::ArrowError; +use std::any::Any; + +use parquet2::metadata::ColumnChunkMetaData; use parquet2::schema::types::PhysicalType; use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; use parquet2::statistics::Statistics as ParquetStatistics; -use std::any::Any; +use crate::datatypes::DataType; +use crate::datatypes::Field; +use crate::error::ArrowError; use crate::error::Result; mod primitive; @@ -17,6 +20,8 @@ pub use boolean::*; mod fixlen; pub use fixlen::*; +use super::get_field_columns; + /// Trait representing a deserialized parquet statistics into arrow. pub trait Statistics: std::fmt::Debug { /// returns the [`DataType`] of the statistics. @@ -41,19 +46,24 @@ impl PartialEq for Box { } } -pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result> { +/// Deserializes [`ParquetStatistics`] into [`Statistics`] based on `data_type`. +/// This takes into account the Arrow schema declared in Parquet's schema +fn _deserialize_statistics( + stats: &dyn ParquetStatistics, + data_type: DataType, +) -> Result> { match stats.physical_type() { PhysicalType::Int32 => { let stats = stats.as_any().downcast_ref().unwrap(); - primitive::statistics_from_i32(stats, stats.descriptor.type_()) + primitive::statistics_from_i32(stats, data_type) } PhysicalType::Int64 => { let stats = stats.as_any().downcast_ref().unwrap(); - primitive::statistics_from_i64(stats, stats.descriptor.type_()) + primitive::statistics_from_i64(stats, data_type) } PhysicalType::ByteArray => { let stats = stats.as_any().downcast_ref().unwrap(); - binary::statistics_from_byte_array(stats, stats.descriptor.type_()) + binary::statistics_from_byte_array(stats, data_type) } PhysicalType::Boolean => { let stats = stats.as_any().downcast_ref().unwrap(); @@ -65,8 +75,7 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result>() .unwrap(); Ok(Box::new(PrimitiveStatistics::::from(( - stats, - DataType::Float32, + stats, data_type, )))) } PhysicalType::Double => { @@ -75,8 +84,7 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result>() .unwrap(); Ok(Box::new(PrimitiveStatistics::::from(( - stats, - DataType::Float64, + stats, data_type, )))) } PhysicalType::FixedLenByteArray(_) => { @@ -88,3 +96,36 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result Vec<&Field> { + match field.data_type.to_logical_type() { + DataType::List(inner) => get_fields(inner), + DataType::LargeList(inner) => get_fields(inner), + DataType::Struct(fields) => fields.iter().map(get_fields).flatten().collect(), + _ => vec![field], + } +} + +/// Deserializes [`ParquetStatistics`] into [`Statistics`] associated to `field` +/// +/// For non-nested types, it returns a single column. +/// For nested types, it returns one column per parquet primitive column. +pub fn deserialize_statistics( + field: &Field, + columns: &[ColumnChunkMetaData], +) -> Result>>> { + let columns = get_field_columns(columns, field.name.as_ref()); + + let fields = get_fields(field); + + columns + .into_iter() + .zip(fields.into_iter()) + .map(|(column, field)| { + column + .statistics() + .map(|x| _deserialize_statistics(x?.as_ref(), field.data_type.clone())) + .transpose() + }) + .collect() +} diff --git a/src/io/parquet/read/statistics/primitive.rs b/src/io/parquet/read/statistics/primitive.rs index 14c90901bc1..0693c6ac9ed 100644 --- a/src/io/parquet/read/statistics/primitive.rs +++ b/src/io/parquet/read/statistics/primitive.rs @@ -1,10 +1,12 @@ +use crate::datatypes::TimeUnit; use crate::{datatypes::DataType, types::NativeType}; -use parquet2::schema::types::ParquetType; +use parquet2::schema::types::{ + LogicalType, ParquetType, TimeUnit as ParquetTimeUnit, TimestampType, +}; use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; use parquet2::types::NativeType as ParquetNativeType; use std::any::Any; -use super::super::schema; use super::Statistics; use crate::error::Result; @@ -50,10 +52,8 @@ where pub(super) fn statistics_from_i32( stats: &ParquetPrimitiveStatistics, - type_: &ParquetType, + data_type: DataType, ) -> Result> { - let data_type = schema::to_data_type(type_)?.unwrap(); - use DataType::*; Ok(match data_type { UInt8 => { @@ -68,17 +68,58 @@ pub(super) fn statistics_from_i32( }) } +fn timestamp(type_: &ParquetType, time_unit: TimeUnit, x: i64) -> i64 { + let logical_type = if let ParquetType::PrimitiveType { logical_type, .. } = type_ { + logical_type + } else { + unreachable!() + }; + + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { + unit + } else { + return x; + }; + + match (unit, time_unit) { + (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => x / 1_000, + (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => x / 1_000_000, + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => x * 1_000_000_000, + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => x, + (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => x / 1_000, + (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => x / 1_000_000, + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => x * 1_000, + (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => x, + (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => x / 1_000, + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => x * 1_000_000, + (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => x * 1_000, + (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => x, + } +} + pub(super) fn statistics_from_i64( stats: &ParquetPrimitiveStatistics, - type_: &ParquetType, + data_type: DataType, ) -> Result> { - let data_type = schema::to_data_type(type_)?.unwrap(); - use DataType::*; Ok(match data_type { UInt64 => { Box::new(PrimitiveStatistics::::from((stats, data_type))) as Box } + Timestamp(time_unit, None) => Box::new(PrimitiveStatistics:: { + data_type, + null_count: stats.null_count, + distinct_count: stats.distinct_count, + min_value: stats + .min_value + .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)), + max_value: stats + .max_value + .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)), + }), Decimal(_, _) => Box::new(PrimitiveStatistics::::from((stats, data_type))), _ => Box::new(PrimitiveStatistics::::from((stats, data_type))), }) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index fe8fe8d4878..515d1710747 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -19,18 +19,17 @@ pub fn read_column( column: usize, ) -> Result { let metadata = read_metadata(&mut reader)?; + let schema = get_schema(&metadata)?; let mut reader = FileReader::try_new(reader, Some(&[column]), None, None, None)?; - let statistics = metadata.row_groups[row_group] - .column(column) - .statistics() - .map(|x| statistics::deserialize_statistics(x?.as_ref())) - .transpose()?; + let field = &schema.fields[column]; + + let mut statistics = deserialize_statistics(field, metadata.row_groups[row_group].columns())?; Ok(( reader.next().unwrap()?.into_arrays().pop().unwrap(), - statistics, + statistics.pop().unwrap(), )) } @@ -328,6 +327,9 @@ pub fn pyarrow_nullable(column: usize) -> Box { PrimitiveArray::::from(i64_values) .to(DataType::Timestamp(TimeUnit::Microsecond, None)), ), + 11 => Box::new( + PrimitiveArray::::from(i64_values).to(DataType::Timestamp(TimeUnit::Second, None)), + ), _ => unreachable!(), } } @@ -406,6 +408,13 @@ pub fn pyarrow_nullable_statistics(column: usize) -> Option> min_value: Some(0), max_value: Some(9), }), + 11 => Box::new(PrimitiveStatistics:: { + data_type: DataType::Timestamp(TimeUnit::Second, None), + distinct_count: None, + null_count: Some(3), + min_value: Some(0), + max_value: Some(9), + }), _ => unreachable!(), }) } @@ -607,11 +616,11 @@ pub fn pyarrow_struct(column: usize) -> Box { pub fn pyarrow_struct_statistics(column: usize) -> Option> { match column { - 0 => Some(Box::new(Utf8Statistics { + 0 => Some(Box::new(BooleanStatistics { distinct_count: None, - null_count: Some(1), - min_value: Some("".to_string()), - max_value: Some("def".to_string()), + null_count: Some(4), + min_value: Some(false), + max_value: Some(true), })), 1 => Some(Box::new(BooleanStatistics { distinct_count: None, diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 470fe1f782b..794b9b24160 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -345,6 +345,11 @@ fn v1_timestamp_us_nullable() -> Result<()> { test_pyarrow_integration(10, 1, "basic", false, false, None) } +#[test] +fn v1_timestamp_s_nullable() -> Result<()> { + test_pyarrow_integration(11, 1, "basic", false, false, None) +} + #[test] fn v2_decimal_26_required() -> Result<()> { test_pyarrow_integration(8, 2, "basic", false, true, None)