From 81ce186fde587778b8be333aaf785d42d6083a18 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 16:33:06 -0700 Subject: [PATCH] Add options to infer_schema --- src/io/parquet/read/schema/convert.rs | 101 +++++++++++++++++++------- src/io/parquet/read/schema/mod.rs | 29 +++++++- 2 files changed, 102 insertions(+), 28 deletions(-) diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs index 821d5107649..31b9f3af762 100644 --- a/src/io/parquet/read/schema/convert.rs +++ b/src/io/parquet/read/schema/convert.rs @@ -1,4 +1,4 @@ -//! This module has a single entry point, [`parquet_to_arrow_schema`]. +//! This module has entry points, [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`]. use parquet2::schema::{ types::{ FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType, @@ -8,11 +8,17 @@ use parquet2::schema::{ }; use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; +use crate::io::parquet::read::schema::SchemaInferenceOptions; /// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain /// any physical column. pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec { - fields.iter().filter_map(to_field).collect::>() + parquet_to_arrow_schema_with_options(fields, &None) +} + +/// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference +pub fn parquet_to_arrow_schema_with_options(fields: &[ParquetType], options: &Option) -> Vec { + fields.iter().filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default()))).collect::>() } fn from_int32( @@ -169,7 +175,7 @@ fn from_fixed_len_byte_array( } /// Maps a [`PhysicalType`] with optional metadata to a [`DataType`] -fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType { +fn to_primitive_type_inner(primitive_type: &PrimitiveType, options: &SchemaInferenceOptions) -> DataType { match primitive_type.physical_type { PhysicalType::Boolean => DataType::Boolean, PhysicalType::Int32 => { @@ -178,7 +184,7 @@ fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType { PhysicalType::Int64 => { from_int64(primitive_type.logical_type, primitive_type.converted_type) } - PhysicalType::Int96 => DataType::Timestamp(TimeUnit::Nanosecond, None), + PhysicalType::Int96 => DataType::Timestamp(options.int96_coerce_to_timeunit, None), PhysicalType::Float => DataType::Float32, PhysicalType::Double => DataType::Float64, PhysicalType::ByteArray => { @@ -195,8 +201,8 @@ fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType { /// Entry point for converting parquet primitive type to arrow type. /// /// This function takes care of repetition. -fn to_primitive_type(primitive_type: &PrimitiveType) -> DataType { - let base_type = to_primitive_type_inner(primitive_type); +fn to_primitive_type(primitive_type: &PrimitiveType, options: &SchemaInferenceOptions) -> DataType { + let base_type = to_primitive_type_inner(primitive_type, options); if primitive_type.field_info.repetition == Repetition::Repeated { DataType::List(Box::new(Field::new( @@ -214,23 +220,24 @@ fn non_repeated_group( converted_type: &Option, fields: &[ParquetType], parent_name: &str, + options: &SchemaInferenceOptions, ) -> Option { debug_assert!(!fields.is_empty()); match (logical_type, converted_type) { - (Some(GroupLogicalType::List), _) => to_list(fields, parent_name), - (None, Some(GroupConvertedType::List)) => to_list(fields, parent_name), - (Some(GroupLogicalType::Map), _) => to_list(fields, parent_name), + (Some(GroupLogicalType::List), _) => to_list(fields, parent_name, options), + (None, Some(GroupConvertedType::List)) => to_list(fields, parent_name, options), + (Some(GroupLogicalType::Map), _) => to_list(fields, parent_name, options), (None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => { - to_map(fields) + to_map(fields, options) } - _ => to_struct(fields), + _ => to_struct(fields, options), } } /// Converts a parquet group type to an arrow [`DataType::Struct`]. /// Returns [`None`] if all its fields are empty -fn to_struct(fields: &[ParquetType]) -> Option { - let fields = fields.iter().filter_map(to_field).collect::>(); +fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option { + let fields = fields.iter().filter_map(|f| to_field(f, options)).collect::>(); if fields.is_empty() { None } else { @@ -240,8 +247,8 @@ fn to_struct(fields: &[ParquetType]) -> Option { /// Converts a parquet group type to an arrow [`DataType::Struct`]. /// Returns [`None`] if all its fields are empty -fn to_map(fields: &[ParquetType]) -> Option { - let inner = to_field(&fields[0])?; +fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option { + let inner = to_field(&fields[0], options)?; Some(DataType::Map(Box::new(inner), false)) } @@ -254,16 +261,17 @@ fn to_group_type( converted_type: &Option, fields: &[ParquetType], parent_name: &str, + options: &SchemaInferenceOptions, ) -> Option { debug_assert!(!fields.is_empty()); if field_info.repetition == Repetition::Repeated { Some(DataType::List(Box::new(Field::new( &field_info.name, - to_struct(fields)?, + to_struct(fields, options)?, is_nullable(field_info), )))) } else { - non_repeated_group(logical_type, converted_type, fields, parent_name) + non_repeated_group(logical_type, converted_type, fields, parent_name, options) } } @@ -279,10 +287,10 @@ pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool { /// Converts parquet schema to arrow field. /// Returns `None` iff the parquet type has no associated primitive types, /// i.e. if it is a column-less group type. -fn to_field(type_: &ParquetType) -> Option { +fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option { Some(Field::new( &type_.get_field_info().name, - to_data_type(type_)?, + to_data_type(type_, options)?, is_nullable(type_.get_field_info()), )) } @@ -291,11 +299,11 @@ fn to_field(type_: &ParquetType) -> Option { /// /// To fully understand this algorithm, please refer to /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). -fn to_list(fields: &[ParquetType], parent_name: &str) -> Option { +fn to_list(fields: &[ParquetType], parent_name: &str, options: &SchemaInferenceOptions) -> Option { let item = fields.first().unwrap(); let item_type = match item { - ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive)), + ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)), ParquetType::GroupType { fields, .. } => { if fields.len() == 1 && item.name() != "array" @@ -303,9 +311,9 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option { { // extract the repetition field let nested_item = fields.first().unwrap(); - to_data_type(nested_item) + to_data_type(nested_item, options) } else { - to_struct(fields) + to_struct(fields, options) } } }?; @@ -346,9 +354,9 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option { /// /// If this schema is a group type and none of its children is reserved in the /// conversion, the result is Ok(None). -pub(crate) fn to_data_type(type_: &ParquetType) -> Option { +pub(crate) fn to_data_type(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option { match type_ { - ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive)), + ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)), ParquetType::GroupType { field_info, logical_type, @@ -364,6 +372,7 @@ pub(crate) fn to_data_type(type_: &ParquetType) -> Option { converted_type, fields, &field_info.name, + options, ) } } @@ -973,4 +982,46 @@ mod tests { assert_eq!(arrow_fields, fields); Ok(()) } + + #[test] + fn test_int96_options() -> Result<()> { + for tu in [TimeUnit::Second, TimeUnit::Microsecond, TimeUnit::Millisecond, TimeUnit::Nanosecond] { + let message_type = " + message arrow_schema { + REQUIRED INT96 int96_field; + OPTIONAL GROUP int96_list (LIST) { + REPEATED GROUP list { + OPTIONAL INT96 element; + } + } + REQUIRED GROUP int96_struct { + REQUIRED INT96 int96_field; + } + } + "; + let coerced_to = DataType::Timestamp(tu, None); + let arrow_fields = vec![ + Field::new("int96_field", coerced_to.clone(), false), + Field::new( + "int96_list", + DataType::List(Box::new(Field::new("element", coerced_to.clone(), true))), + true, + ), + Field::new( + "int96_struct", + DataType::Struct(vec![ + Field::new("int96_field", coerced_to.clone(), false), + ]), + false, + ), + ]; + + let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; + let fields = parquet_to_arrow_schema_with_options(parquet_schema.fields(), &Some(SchemaInferenceOptions{ + int96_coerce_to_timeunit: tu, + })); + assert_eq!(arrow_fields, fields); + } + Ok(()) + } } diff --git a/src/io/parquet/read/schema/mod.rs b/src/io/parquet/read/schema/mod.rs index d47055ef6aa..4c042b033f9 100644 --- a/src/io/parquet/read/schema/mod.rs +++ b/src/io/parquet/read/schema/mod.rs @@ -1,11 +1,11 @@ //! APIs to handle Parquet <-> Arrow schemas. -use crate::datatypes::Schema; +use crate::datatypes::{Schema, TimeUnit}; use crate::error::Result; mod convert; mod metadata; -pub use convert::parquet_to_arrow_schema; +pub use convert::parquet_to_arrow_schema_with_options; pub use metadata::read_schema_from_metadata; pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor}; pub use parquet2::schema::types::ParquetType; @@ -14,6 +14,24 @@ pub(crate) use convert::*; use self::metadata::parse_key_value_metadata; +/// Options when inferring schemas from Parquet +pub struct SchemaInferenceOptions { + /// When inferring schemas from the Parquet INT96 timestamp type, this is the corresponding TimeUnit + /// in the inferred Arrow Timestamp type. + /// + /// This defaults to `TimeUnit::Nanosecond`, but INT96 timestamps outside of the range of years 1678-2262, + /// will overflow when parsed as `Timestamp(TimeUnit::Nanosecond)`. Setting this to a lower resolution + /// (e.g. TimeUnit::Milliseconds) will result in loss of precision, but support a larger range of dates + /// without overflowing when parsing the data. + pub int96_coerce_to_timeunit: TimeUnit, +} + +impl Default for SchemaInferenceOptions { + fn default() -> Self { + SchemaInferenceOptions { int96_coerce_to_timeunit: TimeUnit::Nanosecond } + } +} + /// Infers a [`Schema`] from parquet's [`FileMetaData`]. This first looks for the metadata key /// `"ARROW:schema"`; if it does not exist, it converts the parquet types declared in the /// file's parquet schema to Arrow's equivalent. @@ -21,11 +39,16 @@ use self::metadata::parse_key_value_metadata; /// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded, /// indicating that that the file's arrow metadata was incorrectly written. pub fn infer_schema(file_metadata: &FileMetaData) -> Result { + infer_schema_with_options(file_metadata, &None) +} + +/// Like [`infer_schema`] but with configurable options which affects the behavior of inference +pub fn infer_schema_with_options(file_metadata: &FileMetaData, options: &Option) -> Result { let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata()); let schema = read_schema_from_metadata(&mut metadata)?; Ok(schema.unwrap_or_else(|| { - let fields = parquet_to_arrow_schema(file_metadata.schema().fields()); + let fields = parquet_to_arrow_schema_with_options(file_metadata.schema().fields(), options); Schema { fields, metadata } })) }