diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 2cd0833882c..b6f48abc6a5 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -29,6 +29,7 @@ def case_basic_nullable(size=1): pa.field("decimal_18", pa.decimal128(18, 0)), pa.field("decimal_26", pa.decimal128(26, 0)), pa.field("timestamp_us", pa.timestamp("us")), + pa.field("timestamp_s", pa.timestamp("s")), ] schema = pa.schema(fields) @@ -45,6 +46,7 @@ def case_basic_nullable(size=1): "decimal_18": decimal * size, "decimal_26": decimal * size, "timestamp_us": int64 * size, + "timestamp_s": int64 * size, }, schema, f"basic_nullable_{size*10}.parquet", diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 26f6e8bd4ab..d00976bc989 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -27,10 +27,14 @@ pub use parquet2::{ }; use crate::{ - array::{Array, BinaryArray, DictionaryKey, ListArray, PrimitiveArray, StructArray, Utf8Array}, + array::{ + Array, BinaryArray, DictionaryKey, ListArray, MutablePrimitiveArray, PrimitiveArray, + StructArray, Utf8Array, + }, datatypes::{DataType, Field, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, io::parquet::read::primitive::read_item, + types::NativeType, }; mod binary; @@ -62,6 +66,41 @@ impl + Send { } +/// Converts an iterator of arrays to a trait object returning trait objects +#[inline] +fn dyn_iter<'a, A, I>(iter: I) -> ArrayIter<'a> +where + A: Array + 'static, + I: Iterator> + Send + Sync + 'a, +{ + Box::new(iter.map(|x| x.map(|x| Arc::new(x) as Arc))) +} + +/// Converts an iterator of [MutablePrimitiveArray] into an iterator of [PrimitiveArray] +#[inline] +fn iden(iter: I) -> impl Iterator>> +where + T: NativeType, + I: Iterator>>, +{ + iter.map(|x| x.map(|x| x.into())) +} + +#[inline] +fn op(iter: I, op: F) -> impl Iterator>> +where + T: NativeType, + I: Iterator>>, + F: Fn(T) -> T + Copy, +{ + iter.map(move |x| { + x.map(move |mut x| { + x.values_mut_slice().iter_mut().for_each(|x| *x = op(*x)); + x.into() + }) + }) +} + /// Creates a new iterator of compressed pages. pub fn get_page_iterator( column_metadata: &ColumnChunkMetaData, @@ -210,125 +249,148 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( }) } +fn timestamp<'a, I: 'a + DataPages>( + pages: I, + type_: &ParquetType, + data_type: DataType, + chunk_size: usize, + time_unit: TimeUnit, +) -> Result> { + let (physical_type, logical_type) = if let ParquetType::PrimitiveType { + physical_type, + logical_type, + .. + } = type_ + { + (physical_type, logical_type) + } else { + unreachable!() + }; + + if physical_type == &PhysicalType::Int96 { + if time_unit == TimeUnit::Nanosecond { + return Ok(dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + int96_to_i64_ns, + )))); + } else { + return Err(ArrowError::nyi( + "Can't decode int96 to timestamp other than ns", + )); + } + }; + if physical_type != &PhysicalType::Int64 { + return Err(ArrowError::nyi( + "Can't decode a timestamp from a non-int64 parquet type", + )); + } + + let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x); + + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { + unit + } else { + return Ok(dyn_iter(iden(iter))); + }; + + Ok(match (unit, time_unit) { + (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x * 1_000_000_000)), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000_000)), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => dyn_iter(iden(iter)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000_000)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => dyn_iter(iden(iter)), + }) +} + fn page_iter_to_arrays<'a, I: 'a + DataPages>( pages: I, type_: &ParquetType, - field: Field, + data_type: DataType, chunk_size: usize, ) -> Result> { use DataType::*; - match field.data_type.to_logical_type() { - Null => Ok(null::iter_to_arrays(pages, field.data_type, chunk_size)), - Boolean => Ok(boolean::iter_to_arrays(pages, field.data_type, chunk_size)), - UInt8 => Ok(primitive::iter_to_arrays( + + match data_type.to_logical_type() { + Null => Ok(null::iter_to_arrays(pages, data_type, chunk_size)), + Boolean => Ok(boolean::iter_to_arrays(pages, data_type, chunk_size)), + UInt8 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i32| x as u8, - )), - UInt16 => Ok(primitive::iter_to_arrays( + )))), + UInt16 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i32| x as u16, - )), - UInt32 => Ok(primitive::iter_to_arrays( + )))), + UInt32 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i32| x as u32, - )), - Int8 => Ok(primitive::iter_to_arrays( + )))), + Int8 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i32| x as i8, - )), - Int16 => Ok(primitive::iter_to_arrays( + )))), + Int16 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i32| x as i16, - )), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok( - primitive::iter_to_arrays(pages, field.data_type, chunk_size, read_item, |x: i32| { - x as i32 - }), - ), + )))), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok(dyn_iter(iden( + primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32), + ))), - Timestamp(TimeUnit::Nanosecond, None) => match type_ { - ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } => match (physical_type, logical_type) { - (PhysicalType::Int96, _) => Ok(primitive::iter_to_arrays( - pages, - DataType::Timestamp(TimeUnit::Nanosecond, None), - chunk_size, - read_item, - int96_to_i64_ns, - )), - (_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => Ok(match unit { - ParquetTimeUnit::MILLIS(_) => primitive::iter_to_arrays( - pages, - field.data_type, - chunk_size, - read_item, - |x: i64| x * 1_000_000, - ), - ParquetTimeUnit::MICROS(_) => primitive::iter_to_arrays( - pages, - field.data_type, - chunk_size, - read_item, - |x: i64| x * 1_000, - ), - ParquetTimeUnit::NANOS(_) => primitive::iter_to_arrays( - pages, - field.data_type, - chunk_size, - read_item, - |x: i64| x, - ), - }), - _ => Ok(primitive::iter_to_arrays( - pages, - field.data_type, - chunk_size, - read_item, - |x: i64| x, - )), - }, - _ => unreachable!(), - }, + Timestamp(time_unit, None) => { + let time_unit = *time_unit; + timestamp(pages, type_, data_type, chunk_size, time_unit) + } FixedSizeBinary(_) => Ok(Box::new( - fixed_size_binary::BinaryArrayIterator::new(pages, field.data_type, chunk_size) + fixed_size_binary::BinaryArrayIterator::new(pages, data_type, chunk_size) .map(|x| x.map(|x| Arc::new(x) as _)), )), Decimal(_, _) => match type_ { ParquetType::PrimitiveType { physical_type, .. } => Ok(match physical_type { - PhysicalType::Int32 => primitive::iter_to_arrays( + PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i32| x as i128, - ), - PhysicalType::Int64 => primitive::iter_to_arrays( + ))), + PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i64| x as i128, - ), + ))), &PhysicalType::FixedLenByteArray(n) if n > 16 => { return Err(ArrowError::NotYetImplemented(format!( "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", @@ -362,7 +424,7 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( let validity = array.validity().cloned(); Ok(PrimitiveArray::::from_data( - field.data_type.clone(), + data_type.clone(), values.into(), validity, )) @@ -378,63 +440,49 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( }, // INT64 - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok( - primitive::iter_to_arrays(pages, field.data_type, chunk_size, read_item, |x: i64| { - x as i64 - }), - ), - UInt64 => Ok(primitive::iter_to_arrays( + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok(dyn_iter(iden( + primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64), + ))), + UInt64 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: i64| x as u64, - )), + )))), - Float32 => Ok(primitive::iter_to_arrays( + Float32 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: f32| x, - )), - Float64 => Ok(primitive::iter_to_arrays( + )))), + Float64 => Ok(dyn_iter(iden(primitive::Iter::new( pages, - field.data_type, + data_type, chunk_size, read_item, |x: f64| x, - )), + )))), Binary => Ok(binary::iter_to_arrays::, _>( - pages, - field.data_type, - chunk_size, + pages, data_type, chunk_size, )), LargeBinary => Ok(binary::iter_to_arrays::, _>( - pages, - field.data_type, - chunk_size, + pages, data_type, chunk_size, )), Utf8 => Ok(binary::iter_to_arrays::, _>( - pages, - field.data_type, - chunk_size, + pages, data_type, chunk_size, )), LargeUtf8 => Ok(binary::iter_to_arrays::, _>( - pages, - field.data_type, - chunk_size, + pages, data_type, chunk_size, )), Dictionary(key_type, _, _) => match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(pages, type_, field.data_type, chunk_size) + dict_read::<$K, _>(pages, type_, data_type, chunk_size) }), - /*LargeList(inner) | List(inner) => { - let data_type = inner.data_type.clone(); - page_iter_to_arrays_nested(pages, type_, field, data_type, chunk_size) - }*/ other => Err(ArrowError::NotYetImplemented(format!( "Reading {:?} from parquet still not implemented", other @@ -535,7 +583,7 @@ where page_iter_to_arrays( columns.pop().unwrap(), types.pop().unwrap(), - field, + field.data_type, chunk_size, )? .map(|x| Ok((NestedState::new(vec![]), x?))), @@ -623,16 +671,6 @@ where }) } -// [Struct, List, Bool] -// => [Struct(Int), Struct(Utf8), List(Int), Bool] -// [Struct, Utf8>, List, Bool] -// => [Struct(Struct(Int)), Struct(Utf8), List(Int), Bool] -// [List>] -// => [List(Struct(Int)), List(Struct(Bool))] -// [Struct, Utf8>] -// => [Struct(Int), Struct(Bool)] -// => [Struct(Struct(Int)), Struct(Struct(Bool)), Struct(Utf8)] - fn field_to_init(field: &Field) -> Vec { use crate::datatypes::PhysicalType::*; match field.data_type.to_physical_type() { diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index 0755d399667..4d2376663a7 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -8,7 +8,7 @@ use parquet2::{ }; use crate::{ - array::PrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result, + array::MutablePrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result, types::NativeType, }; @@ -267,13 +267,18 @@ pub(super) fn finish( data_type: &DataType, values: Vec, validity: MutableBitmap, -) -> PrimitiveArray { - PrimitiveArray::from_data(data_type.clone(), values.into(), validity.into()) +) -> MutablePrimitiveArray { + let validity = if validity.is_empty() { + None + } else { + Some(validity) + }; + MutablePrimitiveArray::from_data(data_type.clone(), values, validity) } -/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +/// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays #[derive(Debug)] -pub struct PrimitiveArrayIterator +pub struct Iter where I: DataPages, T: NativeType, @@ -290,7 +295,7 @@ where phantom: std::marker::PhantomData

, } -impl PrimitiveArrayIterator +impl Iter where I: DataPages, T: NativeType, @@ -312,7 +317,7 @@ where } } -impl Iterator for PrimitiveArrayIterator +impl Iterator for Iter where I: DataPages, T: NativeType, @@ -320,7 +325,7 @@ where G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { - type Item = Result>; + type Item = Result>; fn next(&mut self) -> Option { let maybe_state = utils::next( diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index e0ca42144fc..29e9f4bb27a 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -10,33 +10,11 @@ use std::sync::Arc; use crate::{array::Array, datatypes::DataType}; -use super::ArrayIter; use super::{nested_utils::*, DataPages}; -use basic::PrimitiveArrayIterator; +pub use basic::Iter; use nested::ArrayIterator; -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, I, T, P, G, F>( - iter: I, - data_type: DataType, - chunk_size: usize, - op1: G, - op2: F, -) -> ArrayIter<'a> -where - I: 'a + DataPages, - T: crate::types::NativeType, - P: parquet2::types::NativeType, - G: 'a + Copy + Send + Sync + for<'b> Fn(&'b [u8]) -> P, - F: 'a + Copy + Send + Sync + Fn(P) -> T, -{ - Box::new( - PrimitiveArrayIterator::::new(iter, data_type, chunk_size, op1, op2) - .map(|x| x.map(|x| Arc::new(x) as Arc)), - ) -} - /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I, T, P, G, F>( iter: I, diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 3205d069043..23ac8215a39 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -87,16 +87,15 @@ impl Iterator for RowGroupDeserializer { } } -/// Returns all the column metadata in `row_group` associated to `field_name`. -fn get_field_columns<'a>( +/// Returns all the parquet columns associated to `field_name`. +/// For non-nested parquet types, this returns a single column +pub(super) fn get_field_columns<'a>( columns: &'a [ColumnChunkMetaData], field_name: &str, ) -> Vec<&'a ColumnChunkMetaData> { columns .iter() - .enumerate() - .filter(|x| x.1.descriptor().path_in_schema()[0] == field_name) - .map(|x| x.1) + .filter(|x| x.descriptor().path_in_schema()[0] == field_name) .collect() } diff --git a/src/io/parquet/read/statistics/binary.rs b/src/io/parquet/read/statistics/binary.rs index e8f272cc24a..fc6d887a68d 100644 --- a/src/io/parquet/read/statistics/binary.rs +++ b/src/io/parquet/read/statistics/binary.rs @@ -2,13 +2,12 @@ use std::any::Any; use std::convert::TryFrom; use crate::datatypes::DataType; -use parquet2::schema::types::ParquetType; use parquet2::statistics::BinaryStatistics as ParquetByteArrayStatistics; -use super::super::schema; use super::Statistics; use crate::error::{ArrowError, Result}; +/// Represents a `Binary` or `LargeBinary` #[derive(Debug, Clone, PartialEq)] pub struct BinaryStatistics { pub null_count: Option, @@ -87,14 +86,14 @@ impl TryFrom<&ParquetByteArrayStatistics> for Utf8Statistics { pub(super) fn statistics_from_byte_array( stats: &ParquetByteArrayStatistics, - type_: &ParquetType, + data_type: DataType, ) -> Result> { - let data_type = schema::to_data_type(type_)?.unwrap(); - use DataType::*; Ok(match data_type { Utf8 => Box::new(Utf8Statistics::try_from(stats)?), + LargeUtf8 => Box::new(Utf8Statistics::try_from(stats)?), Binary => Box::new(BinaryStatistics::from(stats)), + LargeBinary => Box::new(BinaryStatistics::from(stats)), other => { return Err(ArrowError::NotYetImplemented(format!( "Can't read {:?} from parquet", diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 0e720097443..9ad29c19b8a 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -1,11 +1,14 @@ //! APIs exposing `parquet2`'s statistics as arrow's statistics. -use crate::datatypes::DataType; -use crate::error::ArrowError; +use std::any::Any; + +use parquet2::metadata::ColumnChunkMetaData; use parquet2::schema::types::PhysicalType; use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; use parquet2::statistics::Statistics as ParquetStatistics; -use std::any::Any; +use crate::datatypes::DataType; +use crate::datatypes::Field; +use crate::error::ArrowError; use crate::error::Result; mod primitive; @@ -17,6 +20,8 @@ pub use boolean::*; mod fixlen; pub use fixlen::*; +use super::get_field_columns; + /// Trait representing a deserialized parquet statistics into arrow. pub trait Statistics: std::fmt::Debug { /// returns the [`DataType`] of the statistics. @@ -41,19 +46,24 @@ impl PartialEq for Box { } } -pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result> { +/// Deserializes [`ParquetStatistics`] into [`Statistics`] based on `data_type`. +/// This takes into account the Arrow schema declared in Parquet's schema +fn _deserialize_statistics( + stats: &dyn ParquetStatistics, + data_type: DataType, +) -> Result> { match stats.physical_type() { PhysicalType::Int32 => { let stats = stats.as_any().downcast_ref().unwrap(); - primitive::statistics_from_i32(stats, stats.descriptor.type_()) + primitive::statistics_from_i32(stats, data_type) } PhysicalType::Int64 => { let stats = stats.as_any().downcast_ref().unwrap(); - primitive::statistics_from_i64(stats, stats.descriptor.type_()) + primitive::statistics_from_i64(stats, data_type) } PhysicalType::ByteArray => { let stats = stats.as_any().downcast_ref().unwrap(); - binary::statistics_from_byte_array(stats, stats.descriptor.type_()) + binary::statistics_from_byte_array(stats, data_type) } PhysicalType::Boolean => { let stats = stats.as_any().downcast_ref().unwrap(); @@ -65,8 +75,7 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result>() .unwrap(); Ok(Box::new(PrimitiveStatistics::::from(( - stats, - DataType::Float32, + stats, data_type, )))) } PhysicalType::Double => { @@ -75,8 +84,7 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result>() .unwrap(); Ok(Box::new(PrimitiveStatistics::::from(( - stats, - DataType::Float64, + stats, data_type, )))) } PhysicalType::FixedLenByteArray(_) => { @@ -88,3 +96,36 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result Vec<&Field> { + match field.data_type.to_logical_type() { + DataType::List(inner) => get_fields(inner), + DataType::LargeList(inner) => get_fields(inner), + DataType::Struct(fields) => fields.iter().map(get_fields).flatten().collect(), + _ => vec![field], + } +} + +/// Deserializes [`ParquetStatistics`] into [`Statistics`] associated to `field` +/// +/// For non-nested types, it returns a single column. +/// For nested types, it returns one column per parquet primitive column. +pub fn deserialize_statistics( + field: &Field, + columns: &[ColumnChunkMetaData], +) -> Result>>> { + let columns = get_field_columns(columns, field.name.as_ref()); + + let fields = get_fields(field); + + columns + .into_iter() + .zip(fields.into_iter()) + .map(|(column, field)| { + column + .statistics() + .map(|x| _deserialize_statistics(x?.as_ref(), field.data_type.clone())) + .transpose() + }) + .collect() +} diff --git a/src/io/parquet/read/statistics/primitive.rs b/src/io/parquet/read/statistics/primitive.rs index 14c90901bc1..0693c6ac9ed 100644 --- a/src/io/parquet/read/statistics/primitive.rs +++ b/src/io/parquet/read/statistics/primitive.rs @@ -1,10 +1,12 @@ +use crate::datatypes::TimeUnit; use crate::{datatypes::DataType, types::NativeType}; -use parquet2::schema::types::ParquetType; +use parquet2::schema::types::{ + LogicalType, ParquetType, TimeUnit as ParquetTimeUnit, TimestampType, +}; use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; use parquet2::types::NativeType as ParquetNativeType; use std::any::Any; -use super::super::schema; use super::Statistics; use crate::error::Result; @@ -50,10 +52,8 @@ where pub(super) fn statistics_from_i32( stats: &ParquetPrimitiveStatistics, - type_: &ParquetType, + data_type: DataType, ) -> Result> { - let data_type = schema::to_data_type(type_)?.unwrap(); - use DataType::*; Ok(match data_type { UInt8 => { @@ -68,17 +68,58 @@ pub(super) fn statistics_from_i32( }) } +fn timestamp(type_: &ParquetType, time_unit: TimeUnit, x: i64) -> i64 { + let logical_type = if let ParquetType::PrimitiveType { logical_type, .. } = type_ { + logical_type + } else { + unreachable!() + }; + + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { + unit + } else { + return x; + }; + + match (unit, time_unit) { + (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => x / 1_000, + (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => x / 1_000_000, + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => x * 1_000_000_000, + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => x, + (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => x / 1_000, + (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => x / 1_000_000, + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => x * 1_000, + (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => x, + (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => x / 1_000, + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => x * 1_000_000, + (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => x * 1_000, + (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => x, + } +} + pub(super) fn statistics_from_i64( stats: &ParquetPrimitiveStatistics, - type_: &ParquetType, + data_type: DataType, ) -> Result> { - let data_type = schema::to_data_type(type_)?.unwrap(); - use DataType::*; Ok(match data_type { UInt64 => { Box::new(PrimitiveStatistics::::from((stats, data_type))) as Box } + Timestamp(time_unit, None) => Box::new(PrimitiveStatistics:: { + data_type, + null_count: stats.null_count, + distinct_count: stats.distinct_count, + min_value: stats + .min_value + .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)), + max_value: stats + .max_value + .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)), + }), Decimal(_, _) => Box::new(PrimitiveStatistics::::from((stats, data_type))), _ => Box::new(PrimitiveStatistics::::from((stats, data_type))), }) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index fe8fe8d4878..515d1710747 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -19,18 +19,17 @@ pub fn read_column( column: usize, ) -> Result { let metadata = read_metadata(&mut reader)?; + let schema = get_schema(&metadata)?; let mut reader = FileReader::try_new(reader, Some(&[column]), None, None, None)?; - let statistics = metadata.row_groups[row_group] - .column(column) - .statistics() - .map(|x| statistics::deserialize_statistics(x?.as_ref())) - .transpose()?; + let field = &schema.fields[column]; + + let mut statistics = deserialize_statistics(field, metadata.row_groups[row_group].columns())?; Ok(( reader.next().unwrap()?.into_arrays().pop().unwrap(), - statistics, + statistics.pop().unwrap(), )) } @@ -328,6 +327,9 @@ pub fn pyarrow_nullable(column: usize) -> Box { PrimitiveArray::::from(i64_values) .to(DataType::Timestamp(TimeUnit::Microsecond, None)), ), + 11 => Box::new( + PrimitiveArray::::from(i64_values).to(DataType::Timestamp(TimeUnit::Second, None)), + ), _ => unreachable!(), } } @@ -406,6 +408,13 @@ pub fn pyarrow_nullable_statistics(column: usize) -> Option> min_value: Some(0), max_value: Some(9), }), + 11 => Box::new(PrimitiveStatistics:: { + data_type: DataType::Timestamp(TimeUnit::Second, None), + distinct_count: None, + null_count: Some(3), + min_value: Some(0), + max_value: Some(9), + }), _ => unreachable!(), }) } @@ -607,11 +616,11 @@ pub fn pyarrow_struct(column: usize) -> Box { pub fn pyarrow_struct_statistics(column: usize) -> Option> { match column { - 0 => Some(Box::new(Utf8Statistics { + 0 => Some(Box::new(BooleanStatistics { distinct_count: None, - null_count: Some(1), - min_value: Some("".to_string()), - max_value: Some("def".to_string()), + null_count: Some(4), + min_value: Some(false), + max_value: Some(true), })), 1 => Some(Box::new(BooleanStatistics { distinct_count: None, diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 470fe1f782b..794b9b24160 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -345,6 +345,11 @@ fn v1_timestamp_us_nullable() -> Result<()> { test_pyarrow_integration(10, 1, "basic", false, false, None) } +#[test] +fn v1_timestamp_s_nullable() -> Result<()> { + test_pyarrow_integration(11, 1, "basic", false, false, None) +} + #[test] fn v2_decimal_26_required() -> Result<()> { test_pyarrow_integration(8, 2, "basic", false, true, None)