This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
Improved stats and reading timestamp
jorgecarleitao committed Feb 4, 2022
1 parent 99736ea commit a97f635
Showing 9 changed files with 142 additions and 50 deletions.
2 changes: 2 additions & 0 deletions parquet_integration/write_parquet.py
@@ -29,6 +29,7 @@ def case_basic_nullable(size=1):
         pa.field("decimal_18", pa.decimal128(18, 0)),
         pa.field("decimal_26", pa.decimal128(26, 0)),
         pa.field("timestamp_us", pa.timestamp("us")),
+        pa.field("timestamp_s", pa.timestamp("s")),
     ]
     schema = pa.schema(fields)

@@ -45,6 +46,7 @@ def case_basic_nullable(size=1):
         "decimal_18": decimal * size,
         "decimal_26": decimal * size,
         "timestamp_us": int64 * size,
+        "timestamp_s": int64 * size,
     },
     schema,
     f"basic_nullable_{size*10}.parquet",
11 changes: 1 addition & 10 deletions src/io/parquet/read/mod.rs
@@ -322,6 +322,7 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>(
     chunk_size: usize,
 ) -> Result<ArrayIter<'a>> {
     use DataType::*;
+
     match data_type.to_logical_type() {
         Null => Ok(null::iter_to_arrays(pages, data_type, chunk_size)),
         Boolean => Ok(boolean::iter_to_arrays(pages, data_type, chunk_size)),
@@ -670,16 +671,6 @@ where
     })
 }

-// [Struct<Int, Utf8>, List<Int>, Bool]
-// => [Struct(Int), Struct(Utf8), List(Int), Bool]
-// [Struct<Struct<Int>, Utf8>, List<Int>, Bool]
-// => [Struct(Struct(Int)), Struct(Utf8), List(Int), Bool]
-// [List<Struct<Int, Bool>>]
-// => [List(Struct(Int)), List(Struct(Bool))]
-// [Struct<Struct<Int, Bool>, Utf8>]
-// => [Struct(Int), Struct(Bool)]
-// => [Struct(Struct(Int)), Struct(Struct(Bool)), Struct(Utf8)]
-
 fn field_to_init(field: &Field) -> Vec<InitNested> {
     use crate::datatypes::PhysicalType::*;
     match field.data_type.to_physical_type() {
7 changes: 6 additions & 1 deletion src/io/parquet/read/primitive/basic.rs
@@ -268,7 +268,12 @@ pub(super) fn finish<T: NativeType>(
     values: Vec<T>,
     validity: MutableBitmap,
 ) -> MutablePrimitiveArray<T> {
-    MutablePrimitiveArray::from_data(data_type.clone(), values, validity.into())
+    let validity = if validity.is_empty() {
+        None
+    } else {
+        Some(validity)
+    };
+    MutablePrimitiveArray::from_data(data_type.clone(), values, validity)
 }

 /// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays
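
The hunk above distinguishes "no validity recorded" from an actual bitmap. A minimal sketch of the same normalization, assuming only the arrow2 APIs visible in this diff (`arrow2::bitmap::MutableBitmap`):

use arrow2::bitmap::MutableBitmap;

// An empty MutableBitmap means no nulls were recorded while decoding;
// mapping it to None marks every value as valid, instead of producing a
// zero-length bitmap that would disagree with the values' length.
fn normalize_validity(validity: MutableBitmap) -> Option<MutableBitmap> {
    if validity.is_empty() {
        None
    } else {
        Some(validity)
    }
}

fn main() {
    assert!(normalize_validity(MutableBitmap::new()).is_none());
}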
9 changes: 4 additions & 5 deletions src/io/parquet/read/row_group.rs
@@ -87,16 +87,15 @@ impl Iterator for RowGroupDeserializer {
     }
 }

-/// Returns all the column metadata in `row_group` associated to `field_name`.
-fn get_field_columns<'a>(
+/// Returns all the parquet columns associated to `field_name`.
+/// For non-nested parquet types, this returns a single column.
+pub(super) fn get_field_columns<'a>(
     columns: &'a [ColumnChunkMetaData],
     field_name: &str,
 ) -> Vec<&'a ColumnChunkMetaData> {
     columns
         .iter()
-        .enumerate()
-        .filter(|x| x.1.descriptor().path_in_schema()[0] == field_name)
-        .map(|x| x.1)
+        .filter(|x| x.descriptor().path_in_schema()[0] == field_name)
         .collect()
 }
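
`get_field_columns` matches on the first element of each column's `path_in_schema`, so a nested field maps to several parquet leaf columns. A self-contained sketch of that matching rule, with a hypothetical `Column` type standing in for parquet2's `ColumnChunkMetaData`:

// Hypothetical stand-in for ColumnChunkMetaData, for illustration only.
struct Column {
    path_in_schema: Vec<String>,
}

fn get_field_columns<'a>(columns: &'a [Column], field_name: &str) -> Vec<&'a Column> {
    columns
        .iter()
        .filter(|c| c.path_in_schema[0] == field_name)
        .collect()
}

fn main() {
    let columns = vec![
        Column { path_in_schema: vec!["s".into(), "a".into()] },
        Column { path_in_schema: vec!["s".into(), "b".into()] },
        Column { path_in_schema: vec!["x".into()] },
    ];
    // The struct field "s" owns two parquet leaf columns; "x" owns one.
    assert_eq!(get_field_columns(&columns, "s").len(), 2);
}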
9 changes: 4 additions & 5 deletions src/io/parquet/read/statistics/binary.rs
@@ -2,13 +2,12 @@ use std::any::Any;
 use std::convert::TryFrom;

 use crate::datatypes::DataType;
-use parquet2::schema::types::ParquetType;
 use parquet2::statistics::BinaryStatistics as ParquetByteArrayStatistics;

-use super::super::schema;
 use super::Statistics;
 use crate::error::{ArrowError, Result};

+/// Represents a `Binary` or `LargeBinary`
 #[derive(Debug, Clone, PartialEq)]
 pub struct BinaryStatistics {
     pub null_count: Option<i64>,
@@ -87,14 +86,14 @@ impl TryFrom<&ParquetByteArrayStatistics> for Utf8Statistics {

 pub(super) fn statistics_from_byte_array(
     stats: &ParquetByteArrayStatistics,
-    type_: &ParquetType,
+    data_type: DataType,
 ) -> Result<Box<dyn Statistics>> {
-    let data_type = schema::to_data_type(type_)?.unwrap();
-
     use DataType::*;
     Ok(match data_type {
         Utf8 => Box::new(Utf8Statistics::try_from(stats)?),
         LargeUtf8 => Box::new(Utf8Statistics::try_from(stats)?),
         Binary => Box::new(BinaryStatistics::from(stats)),
         LargeBinary => Box::new(BinaryStatistics::from(stats)),
         other => {
             return Err(ArrowError::NotYetImplemented(format!(
                 "Can't read {:?} from parquet",
63 changes: 52 additions & 11 deletions src/io/parquet/read/statistics/mod.rs
@@ -1,11 +1,14 @@
 //! APIs exposing `parquet2`'s statistics as arrow's statistics.
-use crate::datatypes::DataType;
-use crate::error::ArrowError;
-use std::any::Any;

+use parquet2::metadata::ColumnChunkMetaData;
 use parquet2::schema::types::PhysicalType;
 use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics;
 use parquet2::statistics::Statistics as ParquetStatistics;
+use std::any::Any;
+
+use crate::datatypes::DataType;
+use crate::datatypes::Field;
+use crate::error::ArrowError;
 use crate::error::Result;

 mod primitive;
@@ -17,6 +20,8 @@ pub use boolean::*;
 mod fixlen;
 pub use fixlen::*;

+use super::get_field_columns;
+
 /// Trait representing a deserialized parquet statistics into arrow.
 pub trait Statistics: std::fmt::Debug {
     /// returns the [`DataType`] of the statistics.
@@ -41,19 +46,24 @@ impl PartialEq for Box<dyn Statistics> {
     }
 }

-pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result<Box<dyn Statistics>> {
+/// Deserializes [`ParquetStatistics`] into [`Statistics`] based on `data_type`.
+/// This takes into account the Arrow schema declared in Parquet's schema.
+fn _deserialize_statistics(
+    stats: &dyn ParquetStatistics,
+    data_type: DataType,
+) -> Result<Box<dyn Statistics>> {
     match stats.physical_type() {
         PhysicalType::Int32 => {
             let stats = stats.as_any().downcast_ref().unwrap();
-            primitive::statistics_from_i32(stats, stats.descriptor.type_())
+            primitive::statistics_from_i32(stats, data_type)
         }
         PhysicalType::Int64 => {
             let stats = stats.as_any().downcast_ref().unwrap();
-            primitive::statistics_from_i64(stats, stats.descriptor.type_())
+            primitive::statistics_from_i64(stats, data_type)
         }
         PhysicalType::ByteArray => {
             let stats = stats.as_any().downcast_ref().unwrap();
-            binary::statistics_from_byte_array(stats, stats.descriptor.type_())
+            binary::statistics_from_byte_array(stats, data_type)
         }
         PhysicalType::Boolean => {
             let stats = stats.as_any().downcast_ref().unwrap();
@@ -65,8 +75,7 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result<Box<dyn S
                 .downcast_ref::<ParquetPrimitiveStatistics<f32>>()
                 .unwrap();
             Ok(Box::new(PrimitiveStatistics::<f32>::from((
-                stats,
-                DataType::Float32,
+                stats, data_type,
             ))))
         }
         PhysicalType::Double => {
@@ -75,8 +84,7 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result<Box<dyn S
                 .downcast_ref::<ParquetPrimitiveStatistics<f64>>()
                 .unwrap();
             Ok(Box::new(PrimitiveStatistics::<f64>::from((
-                stats,
-                DataType::Float64,
+                stats, data_type,
             ))))
         }
         PhysicalType::FixedLenByteArray(_) => {
@@ -88,3 +96,36 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result<Box<dyn S
         )),
     }
 }
+
+fn get_fields(field: &Field) -> Vec<&Field> {
+    match field.data_type.to_logical_type() {
+        DataType::List(inner) => get_fields(inner),
+        DataType::LargeList(inner) => get_fields(inner),
+        DataType::Struct(fields) => fields.iter().map(get_fields).flatten().collect(),
+        _ => vec![field],
+    }
+}
+
+/// Deserializes [`ParquetStatistics`] into [`Statistics`] associated to `field`
+///
+/// For non-nested types, it returns a single column.
+/// For nested types, it returns one column per parquet primitive column.
+pub fn deserialize_statistics(
+    field: &Field,
+    columns: &[ColumnChunkMetaData],
+) -> Result<Vec<Option<Box<dyn Statistics>>>> {
+    let columns = get_field_columns(columns, field.name.as_ref());
+
+    let fields = get_fields(field);
+
+    columns
+        .into_iter()
+        .zip(fields.into_iter())
+        .map(|(column, field)| {
+            column
+                .statistics()
+                .map(|x| _deserialize_statistics(x?.as_ref(), field.data_type.clone()))
+                .transpose()
+        })
+        .collect()
+}
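
`get_fields` flattens a possibly nested arrow field into its primitive leaves, in schema order, so that `deserialize_statistics` can zip them one-to-one against the parquet columns returned by `get_field_columns`. A minimal, self-contained sketch of that recursion (the `Kind` enum and `leaves` function below are illustrative, not arrow2's `DataType`):

// Illustrative mirror of the List/Struct recursion in `get_fields`.
#[derive(Debug)]
enum Kind {
    Primitive(&'static str),
    List(Box<Kind>),
    Struct(Vec<Kind>),
}

fn leaves(kind: &Kind) -> Vec<&Kind> {
    match kind {
        Kind::List(inner) => leaves(inner),
        Kind::Struct(fields) => fields.iter().flat_map(leaves).collect(),
        Kind::Primitive(_) => vec![kind],
    }
}

fn main() {
    // Struct<a: Int, b: List<Utf8>> flattens to two leaves, in schema order.
    let field = Kind::Struct(vec![
        Kind::Primitive("a"),
        Kind::List(Box::new(Kind::Primitive("b"))),
    ]);
    println!("{:?}", leaves(&field)); // [Primitive("a"), Primitive("b")]
}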
57 changes: 49 additions & 8 deletions src/io/parquet/read/statistics/primitive.rs
@@ -1,10 +1,12 @@
+use crate::datatypes::TimeUnit;
 use crate::{datatypes::DataType, types::NativeType};
-use parquet2::schema::types::ParquetType;
+use parquet2::schema::types::{
+    LogicalType, ParquetType, TimeUnit as ParquetTimeUnit, TimestampType,
+};
 use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics;
 use parquet2::types::NativeType as ParquetNativeType;
 use std::any::Any;

-use super::super::schema;
 use super::Statistics;
 use crate::error::Result;
@@ -50,10 +52,8 @@ where

 pub(super) fn statistics_from_i32(
     stats: &ParquetPrimitiveStatistics<i32>,
-    type_: &ParquetType,
+    data_type: DataType,
 ) -> Result<Box<dyn Statistics>> {
-    let data_type = schema::to_data_type(type_)?.unwrap();
-
     use DataType::*;
     Ok(match data_type {
         UInt8 => {
@@ -68,17 +68,58 @@ pub(super) fn statistics_from_i32(
     })
 }

+fn timestamp(type_: &ParquetType, time_unit: TimeUnit, x: i64) -> i64 {
+    let logical_type = if let ParquetType::PrimitiveType { logical_type, .. } = type_ {
+        logical_type
+    } else {
+        unreachable!()
+    };
+
+    let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type {
+        unit
+    } else {
+        return x;
+    };
+
+    match (unit, time_unit) {
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => x / 1_000,
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => x / 1_000_000,
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => x / 1_000_000_000,
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => x,
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => x / 1_000,
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => x / 1_000_000,
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => x * 1_000,
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => x,
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => x / 1_000,
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => x * 1_000_000,
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => x * 1_000,
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => x,
+    }
+}
+
 pub(super) fn statistics_from_i64(
     stats: &ParquetPrimitiveStatistics<i64>,
-    type_: &ParquetType,
+    data_type: DataType,
 ) -> Result<Box<dyn Statistics>> {
-    let data_type = schema::to_data_type(type_)?.unwrap();
-
     use DataType::*;
     Ok(match data_type {
         UInt64 => {
             Box::new(PrimitiveStatistics::<u64>::from((stats, data_type))) as Box<dyn Statistics>
         }
+        Timestamp(time_unit, None) => Box::new(PrimitiveStatistics::<i64> {
+            data_type,
+            null_count: stats.null_count,
+            distinct_count: stats.distinct_count,
+            min_value: stats
+                .min_value
+                .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)),
+            max_value: stats
+                .max_value
+                .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)),
+        }),
         Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::from((stats, data_type))),
         _ => Box::new(PrimitiveStatistics::<i64>::from((stats, data_type))),
     })
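
The `timestamp` helper rescales a raw parquet statistic from the file's physical time unit to the arrow field's `TimeUnit` with plain integer arithmetic. A worked example of one cell of that conversion table (the function name is illustrative):

// MICROS -> TimeUnit::Second is x / 1_000_000, as in the match above.
fn micros_to_seconds(x: i64) -> i64 {
    x / 1_000_000
}

fn main() {
    assert_eq!(micros_to_seconds(1_500_000), 1); // sub-second precision is dropped
    assert_eq!(micros_to_seconds(-1_500_000), -1); // `/` truncates toward zero
}

Note that Rust's `/` truncates toward zero, so pre-epoch (negative) statistics are rounded toward zero rather than floored; for min/max bounds this is a harmless widening in the worst case.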
29 changes: 19 additions & 10 deletions tests/it/io/parquet/mod.rs
@@ -19,18 +19,17 @@ pub fn read_column<R: Read + Seek>(
     column: usize,
 ) -> Result<ArrayStats> {
     let metadata = read_metadata(&mut reader)?;
+    let schema = get_schema(&metadata)?;

     let mut reader = FileReader::try_new(reader, Some(&[column]), None, None, None)?;

-    let statistics = metadata.row_groups[row_group]
-        .column(column)
-        .statistics()
-        .map(|x| statistics::deserialize_statistics(x?.as_ref()))
-        .transpose()?;
+    let field = &schema.fields[column];
+
+    let mut statistics = deserialize_statistics(field, metadata.row_groups[row_group].columns())?;

     Ok((
         reader.next().unwrap()?.into_arrays().pop().unwrap(),
-        statistics,
+        statistics.pop().unwrap(),
     ))
 }

@@ -328,6 +327,9 @@ pub fn pyarrow_nullable(column: usize) -> Box<dyn Array> {
             PrimitiveArray::<i64>::from(i64_values)
                 .to(DataType::Timestamp(TimeUnit::Microsecond, None)),
         ),
+        11 => Box::new(
+            PrimitiveArray::<i64>::from(i64_values).to(DataType::Timestamp(TimeUnit::Second, None)),
+        ),
         _ => unreachable!(),
     }
 }
@@ -406,6 +408,13 @@ pub fn pyarrow_nullable_statistics(column: usize) -> Option<Box<dyn Statistics>>
             min_value: Some(0),
             max_value: Some(9),
         }),
+        11 => Box::new(PrimitiveStatistics::<i64> {
+            data_type: DataType::Timestamp(TimeUnit::Second, None),
+            distinct_count: None,
+            null_count: Some(3),
+            min_value: Some(0),
+            max_value: Some(9),
+        }),
         _ => unreachable!(),
     })
 }
@@ -607,11 +616,11 @@ pub fn pyarrow_struct(column: usize) -> Box<dyn Array> {

 pub fn pyarrow_struct_statistics(column: usize) -> Option<Box<dyn Statistics>> {
     match column {
-        0 => Some(Box::new(Utf8Statistics {
+        0 => Some(Box::new(BooleanStatistics {
             distinct_count: None,
-            null_count: Some(1),
-            min_value: Some("".to_string()),
-            max_value: Some("def".to_string()),
+            null_count: Some(4),
+            min_value: Some(false),
+            max_value: Some(true),
         })),
         1 => Some(Box::new(BooleanStatistics {
             distinct_count: None,
5 changes: 5 additions & 0 deletions tests/it/io/parquet/read.rs
@@ -345,6 +345,11 @@ fn v1_timestamp_us_nullable() -> Result<()> {
     test_pyarrow_integration(10, 1, "basic", false, false, None)
 }

+#[test]
+fn v1_timestamp_s_nullable() -> Result<()> {
+    test_pyarrow_integration(11, 1, "basic", false, false, None)
+}
+
 #[test]
 fn v2_decimal_26_required() -> Result<()> {
     test_pyarrow_integration(8, 2, "basic", false, true, None)
