diff --git a/examples/parquet_read_async.rs b/examples/parquet_read_async.rs
index 3e155065882..86f064da20d 100644
--- a/examples/parquet_read_async.rs
+++ b/examples/parquet_read_async.rs
@@ -25,7 +25,7 @@ async fn main() -> Result<()> {
     // this operation is usually done before reading the data, during planning.
     // This is a mix of IO and CPU-bounded tasks but both of them are O(1)
     let metadata = read::read_metadata_async(&mut reader).await?;
-    let schema = read::get_schema(&metadata)?;
+    let schema = read::infer_schema(&metadata)?;
 
     // This factory yields one file descriptor per column and is used to read columns concurrently.
     // They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
diff --git a/examples/parquet_read_parallel/src/main.rs b/examples/parquet_read_parallel/src/main.rs
index 81736c22eec..4e1a60b401f 100644
--- a/examples/parquet_read_parallel/src/main.rs
+++ b/examples/parquet_read_parallel/src/main.rs
@@ -11,7 +11,7 @@ use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};
 fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
     let mut file = BufReader::new(File::open(path)?);
     let metadata = read::read_metadata(&mut file)?;
-    let schema = read::get_schema(&metadata)?;
+    let schema = read::infer_schema(&metadata)?;
 
     // read (IO-bounded) all columns into memory (use a subset of the fields to project)
     let columns = read::read_columns(
diff --git a/guide/src/io/parquet_read.md b/guide/src/io/parquet_read.md
index 7ced401c849..597d5ddc239 100644
--- a/guide/src/io/parquet_read.md
+++ b/guide/src/io/parquet_read.md
@@ -44,3 +44,51 @@ by delegating all CPU-intensive tasks to separate threads.
 This can of course be reversed; in configurations where IO is bounded (e.g. when
 a network is involved), we can use multiple producers of pages, potentially divided
 in file readers, and a single consumer that performs all CPU-intensive work.
+
+## Apache Arrow <-> Apache Parquet
+
+Arrow and Parquet are two different formats that declare different physical and logical types.
+When reading Parquet, we must _infer_ which types the data should be read into.
+This inference is based on Parquet's physical, logical and converted types.
+
+When a logical type is defined, we use it as follows:
+
+| `Parquet`         | `Parquet logical` | `DataType`    |
+| ----------------- | ----------------- | ------------- |
+| Int32             | Int8              | Int8          |
+| Int32             | Int16             | Int16         |
+| Int32             | Int32             | Int32         |
+| Int32             | UInt8             | UInt8         |
+| Int32             | UInt16            | UInt16        |
+| Int32             | UInt32            | UInt32        |
+| Int32             | Decimal           | Decimal       |
+| Int32             | Date              | Date32        |
+| Int32             | Time(ms)          | Time32(ms)    |
+| Int64             | Int64             | Int64         |
+| Int64             | UInt64            | UInt64        |
+| Int64             | Time(us)          | Time64(us)    |
+| Int64             | Time(ns)          | Time64(ns)    |
+| Int64             | Timestamp(\_)     | Timestamp(\_) |
+| Int64             | Decimal           | Decimal       |
+| ByteArray         | Utf8              | Utf8          |
+| ByteArray         | JSON              | Binary        |
+| ByteArray         | BSON              | Binary        |
+| ByteArray         | ENUM              | Binary        |
+| ByteArray         | Decimal           | Decimal       |
+| FixedLenByteArray | Decimal           | Decimal       |
+
+When a logical type is not defined but a converted type is defined, we use
+the equivalent conversion as above, mutatis mutandis.
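+
+For example, a column declared as Parquet physical type `Int32` with logical type
+`Date` is read as Arrow's `Date32`. The sketch below (the file name is hypothetical)
+shows where this inference happens when reading a file:
+
+```rust
+use arrow2::io::parquet::read;
+
+// inside a function returning `arrow2::error::Result`
+let mut reader = std::fs::File::open("example.parquet")?;
+let metadata = read::read_metadata(&mut reader)?;
+// infers an Arrow schema from the parquet types according to the tables above,
+// unless the file embeds the "ARROW:schema" metadata key, which takes precedence
+let schema = read::infer_schema(&metadata)?;
+```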
+
+When neither is defined, we fall back to the physical representation:
+
+| `Parquet`         | `DataType`      |
+| ----------------- | --------------- |
+| Boolean           | Boolean         |
+| Int32             | Int32           |
+| Int64             | Int64           |
+| Int96             | Timestamp(ns)   |
+| Float             | Float32         |
+| Double            | Float64         |
+| ByteArray         | Binary          |
+| FixedLenByteArray | FixedSizeBinary |
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 451131bb6c9..474fbc89200 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -10,7 +10,7 @@ use crate::{
     error::{ArrowError, Result},
 };
 
-use super::{get_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
+use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
 
 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
 
@@ -47,7 +47,7 @@ impl<R: Read + Seek> FileReader<R> {
     ) -> Result<Self> {
         let metadata = read_metadata(&mut reader)?;
 
-        let schema = get_schema(&metadata)?;
+        let schema = infer_schema(&metadata)?;
 
         let schema_metadata = schema.metadata;
         let fields: Vec<Field> = if let Some(projection) = &projection {
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 278610e298c..4a4c6be60f0 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -54,7 +54,7 @@ mod utils;
 pub use file::{FileReader, RowGroupReader};
 pub use row_group::*;
 pub(crate) use schema::is_type_nullable;
-pub use schema::{get_schema, FileMetaData};
+pub use schema::{infer_schema, FileMetaData};
 
 use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
 use deserialize::page_iter_to_arrays;
diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs
index 459e9f27e35..4fea0db5994 100644
--- a/src/io/parquet/read/schema/convert.rs
+++ b/src/io/parquet/read/schema/convert.rs
@@ -1,120 +1,79 @@
-// Convert a parquet schema into Arrow schema
-use parquet2::{
-    metadata::{KeyValue, SchemaDescriptor},
-    schema::{
-        types::{
-            BasicTypeInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType,
-            PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType,
-        },
-        Repetition,
+//! This module has a single entry point, [`parquet_to_arrow_schema`].
+use parquet2::schema::{
+    types::{
+        BasicTypeInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType,
+        PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType,
     },
+    Repetition,
 };
 
-use crate::datatypes::{DataType, Field, IntervalUnit, Metadata, Schema, TimeUnit};
-use crate::error::{ArrowError, Result};
-
-fn parse_key_value_metadata(key_value_metadata: &Option<Vec<KeyValue>>) -> Option<Metadata> {
-    key_value_metadata.as_ref().and_then(|key_values| {
-        let map: Metadata = key_values
-            .iter()
-            .filter_map(|kv| {
-                kv.value
-                    .as_ref()
-                    .map(|value| (kv.key.clone(), value.clone()))
-            })
-            .collect();
-
-        if map.is_empty() {
-            None
-        } else {
-            Some(map)
-        }
-    })
-}
-
-/// Convert parquet schema to arrow schema
-pub fn parquet_to_arrow_schema(
-    schema: &SchemaDescriptor,
-    key_value_metadata: &Option<Vec<KeyValue>>,
-) -> Result<Schema> {
-    let metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
+use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
 
-    schema
-        .fields()
-        .iter()
-        .map(to_field)
-        .filter_map(|x| x.transpose())
-        .collect::<Result<Vec<Field>>>()
-        .map(|fields| Schema { fields, metadata })
+/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
+/// any physical column.
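+///
+/// For example (the message string below is illustrative; see also the tests at the
+/// bottom of this module):
+/// ```ignore
+/// use parquet2::metadata::SchemaDescriptor;
+///
+/// let parquet_schema = SchemaDescriptor::try_from_message("message root { OPTIONAL INT32 id; }")?;
+/// let fields = parquet_to_arrow_schema(parquet_schema.fields());
+/// ```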
+pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {
+    fields.iter().map(to_field).flatten().collect::<Vec<_>>()
 }
 
-pub fn from_int32(
+fn from_int32(
     logical_type: &Option<LogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
-) -> Result<DataType> {
+) -> DataType {
     match (logical_type, converted_type) {
-        (None, None) => Ok(DataType::Int32),
+        // handle logical types first
         (Some(LogicalType::INTEGER(t)), _) => match (t.bit_width, t.is_signed) {
-            (8, true) => Ok(DataType::Int8),
-            (16, true) => Ok(DataType::Int16),
-            (32, true) => Ok(DataType::Int32),
-            (8, false) => Ok(DataType::UInt8),
-            (16, false) => Ok(DataType::UInt16),
-            (32, false) => Ok(DataType::UInt32),
-            _ => Err(ArrowError::ExternalFormat(format!(
-                "Cannot create INT32 physical type from {:?}",
-                t
-            ))),
+            (8, true) => DataType::Int8,
+            (16, true) => DataType::Int16,
+            (32, true) => DataType::Int32,
+            (8, false) => DataType::UInt8,
+            (16, false) => DataType::UInt16,
+            (32, false) => DataType::UInt32,
+            // The above are the only possible annotations for parquet's int32. Anything else
+            // is a deviation from the parquet specification, which we ignore
+            _ => DataType::Int32,
        },
         (Some(LogicalType::DECIMAL(t)), _) => {
-            Ok(DataType::Decimal(t.precision as usize, t.scale as usize))
+            DataType::Decimal(t.precision as usize, t.scale as usize)
         }
-        (Some(LogicalType::DATE(_)), _) => Ok(DataType::Date32),
+        (Some(LogicalType::DATE(_)), _) => DataType::Date32,
         (Some(LogicalType::TIME(t)), _) => match t.unit {
-            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
-            _ => Err(ArrowError::ExternalFormat(format!(
-                "Cannot create INT32 physical type from {:?}",
-                t.unit
-            ))),
+            ParquetTimeUnit::MILLIS(_) => DataType::Time32(TimeUnit::Millisecond),
+            // MILLIS is the only possible annotation for parquet's int32. Anything else
+            // is a deviation from the parquet specification, which we ignore
+            _ => DataType::Int32,
         },
-        (None, Some(PrimitiveConvertedType::Uint8)) => Ok(DataType::UInt8),
-        (None, Some(PrimitiveConvertedType::Uint16)) => Ok(DataType::UInt16),
-        (None, Some(PrimitiveConvertedType::Uint32)) => Ok(DataType::UInt32),
-        (None, Some(PrimitiveConvertedType::Int8)) => Ok(DataType::Int8),
-        (None, Some(PrimitiveConvertedType::Int16)) => Ok(DataType::Int16),
-        (None, Some(PrimitiveConvertedType::Int32)) => Ok(DataType::Int32),
-        (None, Some(PrimitiveConvertedType::Date)) => Ok(DataType::Date32),
-        (None, Some(PrimitiveConvertedType::TimeMillis)) => {
-            Ok(DataType::Time32(TimeUnit::Millisecond))
-        }
-        (None, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
-            Ok(DataType::Decimal(*precision as usize, *scale as usize))
+        // handle converted types:
+        (_, Some(PrimitiveConvertedType::Uint8)) => DataType::UInt8,
+        (_, Some(PrimitiveConvertedType::Uint16)) => DataType::UInt16,
+        (_, Some(PrimitiveConvertedType::Uint32)) => DataType::UInt32,
+        (_, Some(PrimitiveConvertedType::Int8)) => DataType::Int8,
+        (_, Some(PrimitiveConvertedType::Int16)) => DataType::Int16,
+        (_, Some(PrimitiveConvertedType::Int32)) => DataType::Int32,
+        (_, Some(PrimitiveConvertedType::Date)) => DataType::Date32,
+        (_, Some(PrimitiveConvertedType::TimeMillis)) => DataType::Time32(TimeUnit::Millisecond),
+        (_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
+            DataType::Decimal(*precision as usize, *scale as usize)
         }
-        (logical, converted) => Err(ArrowError::ExternalFormat(format!(
-            "Unable to convert parquet INT32 logical type {:?} or converted type {:?}",
-            logical, converted
-        ))),
+        (_, _) => DataType::Int32,
     }
 }
 
-pub fn from_int64(
+fn from_int64(
     logical_type: &Option<LogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
-) -> Result<DataType> {
-    Ok(match (converted_type, logical_type) {
-        (None, None) => DataType::Int64,
-        (_, Some(LogicalType::INTEGER(t))) if t.bit_width == 64 => match t.is_signed {
+) -> DataType {
+    match (logical_type, converted_type) {
+        // handle logical types first
+        (Some(LogicalType::INTEGER(t)), _) if t.bit_width == 64 => match t.is_signed {
             true => DataType::Int64,
             false => DataType::UInt64,
         },
-        (Some(PrimitiveConvertedType::Int64), None) => DataType::Int64,
-        (Some(PrimitiveConvertedType::Uint64), None) => DataType::UInt64,
         (
-            _,
             Some(LogicalType::TIMESTAMP(TimestampType {
                 is_adjusted_to_u_t_c,
                 unit,
             })),
+            _,
         ) => {
             let timezone = if *is_adjusted_to_u_t_c {
                 // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
@@ -139,61 +98,52 @@ pub fn from_int64(
                 ParquetTimeUnit::NANOS(_) => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
             }
         }
-        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
-        // *Backward compatibility:*
-        // TIME_MILLIS | TimeType (isAdjustedToUTC = true, unit = MILLIS)
-        // TIME_MICROS | TimeType (isAdjustedToUTC = true, unit = MICROS)
-        (Some(PrimitiveConvertedType::TimeMillis), None) => DataType::Time32(TimeUnit::Millisecond),
-        (Some(PrimitiveConvertedType::TimeMicros), None) => DataType::Time64(TimeUnit::Microsecond),
-        (Some(PrimitiveConvertedType::TimestampMillis), None) => {
+        (Some(LogicalType::TIME(t)), _) => match t.unit {
+            ParquetTimeUnit::MICROS(_) => DataType::Time64(TimeUnit::Microsecond),
+            ParquetTimeUnit::NANOS(_) => DataType::Time64(TimeUnit::Nanosecond),
+            // MILLIS is only possible for int32. Appearing in int64 is a deviation
+            // from parquet's spec, which we ignore
+            _ => DataType::Int64,
+        },
+        (Some(LogicalType::DECIMAL(t)), _) => {
+            DataType::Decimal(t.precision as usize, t.scale as usize)
+        }
+        // handle converted types:
+        (_, Some(PrimitiveConvertedType::TimeMicros)) => DataType::Time64(TimeUnit::Microsecond),
+        (_, Some(PrimitiveConvertedType::TimestampMillis)) => {
             DataType::Timestamp(TimeUnit::Millisecond, None)
         }
-        (Some(PrimitiveConvertedType::TimestampMicros), None) => {
+        (_, Some(PrimitiveConvertedType::TimestampMicros)) => {
             DataType::Timestamp(TimeUnit::Microsecond, None)
         }
-        (_, Some(LogicalType::TIME(t))) => match t.unit {
-            ParquetTimeUnit::MILLIS(_) => {
-                return Err(ArrowError::ExternalFormat(
-                    "Cannot create INT64 from MILLIS time unit".to_string(),
-                ))
-            }
-            ParquetTimeUnit::MICROS(_) => DataType::Time64(TimeUnit::Microsecond),
-            ParquetTimeUnit::NANOS(_) => DataType::Time64(TimeUnit::Nanosecond),
-        },
-        (Some(PrimitiveConvertedType::Decimal(precision, scale)), _) => {
+        (_, Some(PrimitiveConvertedType::Int64)) => DataType::Int64,
+        (_, Some(PrimitiveConvertedType::Uint64)) => DataType::UInt64,
+        (_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
             DataType::Decimal(*precision as usize, *scale as usize)
         }
-        (c, l) => {
-            return Err(ArrowError::NotYetImplemented(format!(
-                "The conversion of (Int64, {:?}, {:?}) to arrow still not implemented",
-                c, l
-            )))
-        }
-    })
+
+        (_, _) => DataType::Int64,
+    }
 }
 
-pub fn from_byte_array(
+fn from_byte_array(
     logical_type: &Option<LogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
-) -> Result<DataType> {
+) -> DataType {
     match (logical_type, converted_type) {
-        (Some(LogicalType::STRING(_)), _) => Ok(DataType::Utf8),
-        (Some(LogicalType::JSON(_)), _) => Ok(DataType::Binary),
-        (Some(LogicalType::BSON(_)), _) => Ok(DataType::Binary),
-        (Some(LogicalType::ENUM(_)), _) => Ok(DataType::Binary),
-        (None, None) => Ok(DataType::Binary),
-        (None, Some(PrimitiveConvertedType::Json)) => Ok(DataType::Binary),
-        (None, Some(PrimitiveConvertedType::Bson)) => Ok(DataType::Binary),
-        (None, Some(PrimitiveConvertedType::Enum)) => Ok(DataType::Binary),
-        (None, Some(PrimitiveConvertedType::Utf8)) => Ok(DataType::Utf8),
-        (logical, converted) => Err(ArrowError::ExternalFormat(format!(
-            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {:?}",
-            logical, converted
-        ))),
+        (Some(LogicalType::STRING(_)), _) => DataType::Utf8,
+        (Some(LogicalType::JSON(_)), _) => DataType::Binary,
+        (Some(LogicalType::BSON(_)), _) => DataType::Binary,
+        (Some(LogicalType::ENUM(_)), _) => DataType::Binary,
+        (_, Some(PrimitiveConvertedType::Json)) => DataType::Binary,
+        (_, Some(PrimitiveConvertedType::Bson)) => DataType::Binary,
+        (_, Some(PrimitiveConvertedType::Enum)) => DataType::Binary,
+        (_, Some(PrimitiveConvertedType::Utf8)) => DataType::Utf8,
+        (_, _) => DataType::Binary,
     }
 }
 
-pub fn from_fixed_len_byte_array(
+fn from_fixed_len_byte_array(
     length: &i32,
     logical_type: &Option<LogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
@@ -215,25 +165,23 @@ pub fn from_fixed_len_byte_array(
     }
 }
 
-/// Converting parquet primitive type to arrow data type.
+/// Maps a [`PhysicalType`] with optional metadata to a [`DataType`].
 fn to_primitive_type_inner(
     physical_type: &PhysicalType,
     logical_type: &Option<LogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
-) -> Result<DataType> {
+) -> DataType {
     match physical_type {
-        PhysicalType::Boolean => Ok(DataType::Boolean),
+        PhysicalType::Boolean => DataType::Boolean,
         PhysicalType::Int32 => from_int32(logical_type, converted_type),
         PhysicalType::Int64 => from_int64(logical_type, converted_type),
-        PhysicalType::Int96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
-        PhysicalType::Float => Ok(DataType::Float32),
-        PhysicalType::Double => Ok(DataType::Float64),
+        PhysicalType::Int96 => DataType::Timestamp(TimeUnit::Nanosecond, None),
+        PhysicalType::Float => DataType::Float32,
+        PhysicalType::Double => DataType::Float64,
         PhysicalType::ByteArray => from_byte_array(logical_type, converted_type),
-        PhysicalType::FixedLenByteArray(length) => Ok(from_fixed_len_byte_array(
-            length,
-            logical_type,
-            converted_type,
-        )),
+        PhysicalType::FixedLenByteArray(length) => {
+            from_fixed_len_byte_array(length, logical_type, converted_type)
+        }
     }
 }
 
@@ -245,27 +193,27 @@ fn to_primitive_type(
     physical_type: &PhysicalType,
     logical_type: &Option<LogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
-) -> Result<Option<DataType>> {
-    to_primitive_type_inner(physical_type, logical_type, converted_type).map(|dt| {
-        Some(if basic_info.repetition() == &Repetition::Repeated {
-            DataType::List(Box::new(Field::new(
-                basic_info.name(),
-                dt,
-                is_nullable(basic_info),
-            )))
-        } else {
-            dt
-        })
-    })
+) -> DataType {
+    let base_type = to_primitive_type_inner(physical_type, logical_type, converted_type);
+
+    if basic_info.repetition() == &Repetition::Repeated {
+        DataType::List(Box::new(Field::new(
+            basic_info.name(),
+            base_type,
+            is_nullable(basic_info),
+        )))
+    } else {
+        base_type
+    }
 }
 
-/// Converting parquet primitive type to arrow data type.
-fn to_group_type_inner(
+fn non_repeated_group(
     logical_type: &Option<LogicalType>,
     converted_type: &Option<GroupConvertedType>,
     fields: &[ParquetType],
     parent_name: &str,
-) -> Result<Option<DataType>> {
+) -> Option<DataType> {
+    debug_assert!(!fields.is_empty());
     match (logical_type, converted_type) {
         (Some(LogicalType::LIST(_)), _) => to_list(fields, parent_name),
         (None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
@@ -273,20 +221,19 @@
     }
 }
 
-/// Converts a parquet group type to arrow struct.
-fn to_struct(fields: &[ParquetType]) -> Result<Option<DataType>> {
-    fields
+/// Converts a parquet group type to an arrow [`DataType::Struct`].
+/// Returns [`None`] if all its fields are empty.
+fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
+    let fields = fields
         .iter()
         .map(to_field)
-        .collect::<Result<Vec<Option<Field>>>>()
-        .map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
-        .map(|fields| {
-            if fields.is_empty() {
-                None
-            } else {
-                Some(DataType::Struct(fields))
-            }
-        })
+        .flatten()
+        .collect::<Vec<_>>();
+    if fields.is_empty() {
+        None
+    } else {
+        Some(DataType::Struct(fields))
+    }
 }
 
 /// Entry point for converting parquet group type.
@@ -298,19 +245,16 @@ fn to_group_type(
     converted_type: &Option<GroupConvertedType>,
     fields: &[ParquetType],
     parent_name: &str,
-) -> Result<Option<DataType>> {
+) -> Option<DataType> {
+    debug_assert!(!fields.is_empty());
     if basic_info.repetition() == &Repetition::Repeated {
-        to_struct(fields).map(|opt| {
-            opt.map(|dt| {
-                DataType::List(Box::new(Field::new(
-                    basic_info.name(),
-                    dt,
-                    is_nullable(basic_info),
-                )))
-            })
-        })
+        Some(DataType::List(Box::new(Field::new(
+            basic_info.name(),
+            to_struct(fields)?,
+            is_nullable(basic_info),
+        ))))
     } else {
-        to_group_type_inner(logical_type, converted_type, fields, parent_name)
+        non_repeated_group(logical_type, converted_type, fields, parent_name)
     }
 }
 
@@ -324,59 +268,53 @@ pub(crate) fn is_nullable(basic_info: &BasicTypeInfo) -> bool {
 }
 
 /// Converts parquet schema to arrow field.
-fn to_field(type_: &ParquetType) -> Result<Option<Field>> {
-    to_data_type(type_).map(|opt| {
-        opt.map(|dt| {
-            Field::new(
-                type_.get_basic_info().name(),
-                dt,
-                is_nullable(type_.get_basic_info()),
-            )
-        })
-    })
+/// Returns `None` iff the parquet type has no associated primitive types,
+/// i.e. if it is a column-less group type.
+fn to_field(type_: &ParquetType) -> Option<Field> {
+    Some(Field::new(
+        type_.get_basic_info().name(),
+        to_data_type(type_)?,
+        is_nullable(type_.get_basic_info()),
+    ))
 }
 
 /// Converts a parquet list to arrow list.
 ///
 /// To fully understand this algorithm, please refer to
 /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
-fn to_list(fields: &[ParquetType], parent_name: &str) -> Result<Option<DataType>> {
-    let list_item = fields.first().unwrap();
+fn to_list(fields: &[ParquetType], parent_name: &str) -> Option<DataType> {
+    let item = fields.first().unwrap();
 
-    let item_type = match list_item {
+    let item_type = match item {
         ParquetType::PrimitiveType {
-            basic_info,
             physical_type,
             logical_type,
             converted_type,
             ..
-        } => {
-            if basic_info.repetition() == &Repetition::Repeated {
-                to_primitive_type_inner(physical_type, logical_type, converted_type).map(Some)
-            } else {
-                Err(ArrowError::ExternalFormat(
-                    "PrimitiveArray element type of list must be repeated.".to_string(),
-                ))
-            }
-        }
+        } => Some(to_primitive_type_inner(
+            physical_type,
+            logical_type,
+            converted_type,
+        )),
         ParquetType::GroupType { fields, .. } => {
             if fields.len() == 1
-                && list_item.name() != "array"
-                && list_item.name() != format!("{}_tuple", parent_name)
+                && item.name() != "array"
+                && item.name() != format!("{}_tuple", parent_name)
             {
+                // extract the repetition field
                 let nested_item = fields.first().unwrap();
                 to_data_type(nested_item)
             } else {
                 to_struct(fields)
            }
        }
-    };
+    }?;
 
     // Check that the name of the list child is "list", in which case we
     // get the child nullability and name (normally "element") from the nested
     // group type.
     // Without this step, the child incorrectly inherits the parent's optionality
-    let (list_item_name, item_is_optional) = match list_item {
+    let (list_item_name, item_is_optional) = match item {
         ParquetType::GroupType {
             basic_info, fields, ..
        } if basic_info.name() == "list" && fields.len() == 1 => {
@@ -387,14 +325,16 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Result<Option<DataType>>
             )
         }
         _ => (
-            list_item.name(),
-            list_item.get_basic_info().repetition() != &Repetition::Required,
+            item.name(),
+            item.get_basic_info().repetition() != &Repetition::Required,
         ),
     };
 
-    item_type.map(|opt| {
-        opt.map(|dt| DataType::List(Box::new(Field::new(list_item_name, dt, item_is_optional))))
-    })
+    Some(DataType::List(Box::new(Field::new(
+        list_item_name,
+        item_type,
+        item_is_optional,
+    ))))
 }
 
 /// Converts parquet schema to arrow data type.
@@ -406,31 +346,44 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Result<Option<DataType>>
 ///
 /// If this schema is a group type and none of its children is reserved in the
 /// conversion, the result is Ok(None).
-pub(crate) fn to_data_type(type_: &ParquetType) -> Result<Option<DataType>> {
+pub(crate) fn to_data_type(type_: &ParquetType) -> Option<DataType> {
     match type_ {
         ParquetType::PrimitiveType {
             basic_info,
             physical_type,
             logical_type,
             converted_type,
-        } => to_primitive_type(basic_info, physical_type, logical_type, converted_type),
-        ParquetType::GroupType {
+        } => Some(to_primitive_type(
             basic_info,
+            physical_type,
             logical_type,
             converted_type,
-            fields,
-        } => to_group_type(
+        )),
+        ParquetType::GroupType {
             basic_info,
             logical_type,
             converted_type,
             fields,
-            basic_info.name(),
-        ),
+        } => {
+            if fields.is_empty() {
+                None
+            } else {
+                to_group_type(
+                    basic_info,
+                    logical_type,
+                    converted_type,
+                    fields,
+                    basic_info.name(),
+                )
+            }
+        }
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use parquet2::metadata::SchemaDescriptor;
+
     use super::*;
     use crate::datatypes::{DataType, Field, TimeUnit};
 
@@ -468,9 +421,9 @@ mod tests {
         ];
 
         let parquet_schema = SchemaDescriptor::try_from_message(message)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(converted_arrow_schema.fields, expected);
+        assert_eq!(fields, expected);
         Ok(())
     }
 
@@ -488,9 +441,9 @@
         ];
 
         let parquet_schema = SchemaDescriptor::try_from_message(message)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(converted_arrow_schema.fields, expected);
+        assert_eq!(fields, expected);
         Ok(())
     }
 
@@ -508,9 +461,9 @@
         ];
 
         let parquet_schema = SchemaDescriptor::try_from_message(message)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(converted_arrow_schema.fields, expected);
+        assert_eq!(fields, expected);
         Ok(())
     }
 
@@ -707,9 +660,9 @@
         }
 
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(arrow_fields, converted_arrow_schema.fields);
+        assert_eq!(arrow_fields, fields);
         Ok(())
     }
 
@@ -780,9 +733,9 @@
         }
 
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(arrow_fields, converted_arrow_schema.fields);
+        assert_eq!(arrow_fields, fields);
         Ok(())
     }
 
@@ -812,9 +765,9 @@
         ";
 
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(arrow_fields, converted_arrow_schema.fields);
+        assert_eq!(arrow_fields, fields);
         Ok(())
     }
 
@@ -862,9 +815,9 @@
         ";
 
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(arrow_fields, converted_arrow_schema.fields);
+        assert_eq!(arrow_fields, fields);
         Ok(())
     }
 
@@ -930,9 +883,9 @@
         ];
 
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(arrow_fields, converted_arrow_schema.fields);
+        assert_eq!(arrow_fields, fields);
         Ok(())
     }
 
@@ -1025,33 +978,9 @@
         ];
 
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, &None)?;
-
-        assert_eq!(arrow_fields, converted_arrow_schema.fields);
-        Ok(())
-    }
-
-    #[test]
-    fn test_metadata() -> Result<()> {
-        let message_type = "
-        message test_schema {
-            OPTIONAL BINARY string (STRING);
-        }
-        ";
-        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
-
-        let key_value_metadata = vec![
-            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
-            KeyValue::new("baz".to_owned(), None),
-        ];
-
-        let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &Some(key_value_metadata))?;
-
-        let mut expected_metadata = Metadata::new();
-        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
+        let fields = parquet_to_arrow_schema(parquet_schema.fields());
 
-        assert_eq!(converted_arrow_schema.metadata, expected_metadata);
+        assert_eq!(arrow_fields, fields);
         Ok(())
     }
 }
diff --git a/src/io/parquet/read/schema/metadata.rs b/src/io/parquet/read/schema/metadata.rs
index a54406bc6a8..8a3e0f16da5 100644
--- a/src/io/parquet/read/schema/metadata.rs
+++ b/src/io/parquet/read/schema/metadata.rs
@@ -1,8 +1,6 @@
-use std::collections::HashMap;
-
 pub use parquet2::metadata::KeyValue;
 
-use crate::datatypes::Schema;
+use crate::datatypes::{Metadata, Schema};
 use crate::error::{ArrowError, Result};
 use crate::io::ipc::read::deserialize_schema;
 
@@ -11,10 +9,7 @@ use super::super::super::ARROW_SCHEMA_META_KEY;
 /// Reads an arrow schema from Parquet's file metadata. Returns `None` if no schema was found.
 /// # Errors
 /// Errors iff the schema cannot be correctly parsed.
-pub fn read_schema_from_metadata(
-    key_value_metadata: &Option<Vec<KeyValue>>,
-) -> Result<Option<Schema>> {
-    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
+pub fn read_schema_from_metadata(metadata: &mut Metadata) -> Result<Option<Schema>> {
     metadata
         .remove(ARROW_SCHEMA_META_KEY)
         .map(|encoded| get_arrow_schema_from_metadata(&encoded))
@@ -43,26 +38,18 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
     }
 }
 
-fn parse_key_value_metadata(
-    key_value_metadata: &Option<Vec<KeyValue>>,
-) -> Option<HashMap<String, String>> {
-    match key_value_metadata {
-        Some(key_values) => {
-            let map: HashMap<String, String> = key_values
+pub(super) fn parse_key_value_metadata(key_value_metadata: &Option<Vec<KeyValue>>) -> Metadata {
+    key_value_metadata
+        .as_ref()
+        .map(|key_values| {
+            key_values
                 .iter()
                 .filter_map(|kv| {
                     kv.value
                         .as_ref()
                         .map(|value| (kv.key.clone(), value.clone()))
                 })
-                .collect();
-
-            if map.is_empty() {
-                None
-            } else {
-                Some(map)
-            }
-        }
-        None => None,
-    }
+                .collect()
+        })
+        .unwrap_or_default()
 }
diff --git a/src/io/parquet/read/schema/mod.rs b/src/io/parquet/read/schema/mod.rs
index d499799b188..0c7e7d4d665 100644
--- a/src/io/parquet/read/schema/mod.rs
+++ b/src/io/parquet/read/schema/mod.rs
@@ -5,21 +5,28 @@ use crate::error::Result;
 mod convert;
 mod metadata;
 
-pub use convert::parquet_to_arrow_schema;
 pub use metadata::read_schema_from_metadata;
 pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor};
 pub use parquet2::schema::types::ParquetType;
 
 pub(crate) use convert::*;
 
-/// Parses parquet's metadata into a [`Schema`]. This first looks for the metadata key
-/// `"ARROW:schema"`; if it does not exist, it converts logical and converted parquet types to
-/// Arrow equivalents.
-pub fn get_schema(metadata: &FileMetaData) -> Result<Schema> {
-    let schema = read_schema_from_metadata(metadata.key_value_metadata())?;
-    Ok(schema).transpose().unwrap_or_else(|| {
-        parquet_to_arrow_schema(metadata.schema(), metadata.key_value_metadata())
-    })
+use self::metadata::parse_key_value_metadata;
+
+/// Infers a [`Schema`] from parquet's [`FileMetaData`]. This first looks for the metadata key
+/// `"ARROW:schema"`; if it does not exist, it converts the parquet types declared in the
+/// file's parquet schema to Arrow's equivalent.
+/// # Errors
+/// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
+/// indicating that the file's arrow metadata was incorrectly written.
+pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
+    let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());
+
+    let schema = read_schema_from_metadata(&mut metadata)?;
+    Ok(schema.unwrap_or_else(|| {
+        let fields = parquet_to_arrow_schema(file_metadata.schema().fields());
+        Schema { fields, metadata }
+    }))
 }
 
 pub(crate) fn is_type_nullable(type_: &ParquetType) -> bool {
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 550a8f924b6..d5446a8c8b9 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -19,7 +19,7 @@ pub fn read_column(
     column: usize,
 ) -> Result<ArrayStats> {
     let metadata = read_metadata(&mut reader)?;
-    let schema = get_schema(&metadata)?;
+    let schema = infer_schema(&metadata)?;
 
     let mut reader = FileReader::try_new(reader, Some(&[column]), None, None, None)?;