From 02cda0e07f6d4098ac752cbbd6ff33bba015d78e Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 23 Oct 2021 05:53:32 +0000 Subject: [PATCH] Improved internals. --- src/io/parquet/read/binary/basic.rs | 35 +-------- src/io/parquet/read/binary/mod.rs | 1 + src/io/parquet/read/binary/nested.rs | 34 ++------ src/io/parquet/read/binary/utils.rs | 29 +++++++ src/io/parquet/read/boolean/nested.rs | 17 +--- src/io/parquet/read/mod.rs | 107 ++++++++++++++++---------- src/io/parquet/read/nested_utils.rs | 1 + src/io/parquet/read/primitive/mod.rs | 28 ++----- 8 files changed, 116 insertions(+), 136 deletions(-) create mode 100644 src/io/parquet/read/binary/utils.rs diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 655e77144eb..5a88b772522 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -7,7 +7,7 @@ use parquet2::{ }; use crate::{ - array::{Array, BinaryArray, Offset, Utf8Array}, + array::{Array, Offset}, bitmap::{utils::BitmapIter, MutableBitmap}, buffer::MutableBuffer, datatypes::DataType, @@ -15,6 +15,7 @@ use crate::{ }; use super::super::utils; +use super::utils::finish_array; /// Assumptions: No rep levels #[allow(clippy::too_many_arguments)] @@ -325,21 +326,7 @@ where )? } - Ok(match data_type { - DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - _ => unreachable!(), - }) + Ok(finish_array(data_type.clone(), offsets, values, validity)) } pub async fn stream_to_array( @@ -371,19 +358,5 @@ where )? } - Ok(match data_type { - DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - _ => unreachable!(), - }) + Ok(finish_array(data_type.clone(), offsets, values, validity)) } diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 1bfd2b04235..e37090c94f7 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -1,6 +1,7 @@ mod basic; mod dictionary; mod nested; +mod utils; pub use basic::iter_to_array; pub use basic::stream_to_array; diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 0482074c480..8640d4c84b3 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -11,8 +11,9 @@ use parquet2::{ use super::super::nested_utils::*; use super::super::utils; use super::basic::read_plain_required; +use super::utils::finish_array; use crate::{ - array::{Array, BinaryArray, Offset, Utf8Array}, + array::{Array, Offset}, bitmap::MutableBitmap, buffer::MutableBuffer, datatypes::DataType, @@ -150,7 +151,7 @@ pub fn iter_to_array( mut iter: I, metadata: &ColumnChunkMetaData, data_type: DataType, -) -> Result> +) -> Result<(Arc, Vec>)> where O: Offset, ArrowError: From, @@ -176,32 +177,7 @@ where )? } - let inner_data_type = match data_type { - DataType::List(ref inner) => inner.data_type(), - DataType::LargeList(ref inner) => inner.data_type(), - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }; - - let values = match inner_data_type { - DataType::LargeBinary | DataType::Binary => Arc::new(BinaryArray::from_data( - inner_data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )) as Arc, - DataType::LargeUtf8 | DataType::Utf8 => Arc::new(Utf8Array::from_data( - inner_data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )) as Arc, - _ => unreachable!(), - }; + let values = finish_array(data_type, offsets, values, validity).into(); - create_list(data_type, &mut nested, values) + Ok((values, nested)) } diff --git a/src/io/parquet/read/binary/utils.rs b/src/io/parquet/read/binary/utils.rs new file mode 100644 index 00000000000..3b72782b4e7 --- /dev/null +++ b/src/io/parquet/read/binary/utils.rs @@ -0,0 +1,29 @@ +use crate::{ + array::{Array, BinaryArray, Offset, Utf8Array}, + bitmap::MutableBitmap, + buffer::MutableBuffer, + datatypes::DataType, +}; + +pub(super) fn finish_array( + data_type: DataType, + offsets: MutableBuffer, + values: MutableBuffer, + validity: MutableBitmap, +) -> Box { + match data_type { + DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( + data_type, + offsets.into(), + values.into(), + validity.into(), + )), + DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( + data_type, + offsets.into(), + values.into(), + validity.into(), + )), + _ => unreachable!(), + } +} diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 5d99bbb2f7d..ac89cac8d4f 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -135,7 +135,7 @@ pub fn iter_to_array( mut iter: I, metadata: &ColumnChunkMetaData, data_type: DataType, -) -> Result> +) -> Result<(Arc, Vec>)> where ArrowError: From, I: FallibleStreamingIterator, @@ -157,22 +157,11 @@ where )? } - let inner_data_type = match data_type { - DataType::List(ref inner) => inner.data_type(), - DataType::LargeList(ref inner) => inner.data_type(), - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }; - let values = Arc::new(BooleanArray::from_data( - inner_data_type.clone(), + data_type, values.into(), validity.into(), )); - create_list(data_type, &mut nested, values) + Ok((values, nested)) } diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 1f6198c1e69..92f4d899f0e 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -1,4 +1,6 @@ //! APIs to read from Parquet format. +#![allow(clippy::type_complexity)] + use std::{ convert::TryInto, io::{Read, Seek}, @@ -28,6 +30,7 @@ use crate::{ array::{Array, DictionaryKey, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, + io::parquet::read::nested_utils::create_list, }; mod binary; @@ -44,6 +47,8 @@ pub use record_batch::RecordReader; pub(crate) use schema::is_type_nullable; pub use schema::{get_schema, FileMetaData}; +use self::nested_utils::Nested; + /// Creates a new iterator of compressed pages. pub fn get_page_iterator<'b, RR: Read + Seek>( column_metadata: &ColumnChunkMetaData, @@ -165,6 +170,57 @@ fn dict_read< } } +fn page_iter_to_array_nested< + I: FallibleStreamingIterator, +>( + iter: &mut I, + metadata: &ColumnChunkMetaData, + data_type: DataType, +) -> Result<(Arc, Vec>)> { + use DataType::*; + match data_type { + UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8), + UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16), + UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32), + Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8), + Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16), + Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32), + + Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { + ParquetType::PrimitiveType { physical_type, .. } => match physical_type { + PhysicalType::Int96 => primitive::iter_to_array_nested( + iter, + metadata, + DataType::Timestamp(TimeUnit::Nanosecond, None), + int96_to_i64_ns, + ), + _ => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x), + }, + _ => unreachable!(), + }, + + // INT64 + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { + primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x) + } + UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64), + + Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x), + Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x), + + Boolean => boolean::iter_to_array_nested(iter, metadata, data_type), + + Binary | Utf8 => binary::iter_to_array_nested::(iter, metadata, data_type), + LargeBinary | LargeUtf8 => { + binary::iter_to_array_nested::(iter, metadata, data_type) + } + other => Err(ArrowError::NotYetImplemented(format!( + "Reading {:?} from parquet still not implemented", + other + ))), + } +} + /// Converts an iterator of [`DataPage`] into a single [`Array`]. pub fn page_iter_to_array>( iter: &mut I, @@ -255,47 +311,16 @@ pub fn page_iter_to_array unreachable!(), }, - List(ref inner) => match inner.data_type() { - UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8), - UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16), - UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32), - Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8), - Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16), - Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32), - - Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { - ParquetType::PrimitiveType { physical_type, .. } => match physical_type { - PhysicalType::Int96 => primitive::iter_to_array_nested( - iter, - metadata, - DataType::Timestamp(TimeUnit::Nanosecond, None), - int96_to_i64_ns, - ), - _ => primitive::iter_to_array(iter, metadata, data_type, |x: i64| x), - }, - _ => unreachable!(), - }, - - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { - primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x) - } - UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64), - - Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x), - Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x), - - Boolean => boolean::iter_to_array_nested(iter, metadata, data_type), - - Binary | Utf8 => binary::iter_to_array_nested::(iter, metadata, data_type), - LargeBinary | LargeUtf8 => { - binary::iter_to_array_nested::(iter, metadata, data_type) - } - other => Err(ArrowError::NotYetImplemented(format!( - "Reading {:?} from parquet still not implemented", - other - ))), - }, + List(ref inner) => { + let (values, mut nested) = + page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; + create_list(data_type, &mut nested, values) + } + LargeList(ref inner) => { + let (values, mut nested) = + page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; + create_list(data_type, &mut nested, values) + } Dictionary(ref key, _) => match key.as_ref() { Int8 => dict_read::(iter, metadata, data_type), diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index bdf7ff589e3..04037f05ba7 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -10,6 +10,7 @@ use crate::{ error::{ArrowError, Result}, }; +/// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug { fn inner(&mut self) -> (Buffer, Option); diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 748c1285224..7839988127a 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -100,7 +100,7 @@ pub fn iter_to_array_nested( metadata: &ColumnChunkMetaData, data_type: DataType, op: F, -) -> Result> +) -> Result<(Arc, Vec>)> where ArrowError: From, T: NativeType, @@ -127,24 +127,10 @@ where )? } - let values = match data_type { - DataType::List(ref inner) => Arc::new(PrimitiveArray::::from_data( - inner.data_type().clone(), - values.into(), - validity.into(), - )), - DataType::LargeList(ref inner) => Arc::new(PrimitiveArray::::from_data( - inner.data_type().clone(), - values.into(), - validity.into(), - )), - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }; - - create_list(data_type, &mut nested, values) + let values = Arc::new(PrimitiveArray::::from_data( + data_type, + values.into(), + validity.into(), + )); + Ok((values, nested)) }