diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs index 2d8900a09e2..fae1f54882e 100644 --- a/src/io/parquet/read/deserialize/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -7,7 +7,7 @@ use crate::{ bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, - io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState}, + io::parquet::read::deserialize::nested_utils::{InitNested, NestedState}, }; use super::super::dictionary::*; @@ -108,6 +108,7 @@ where } } +/// An iterator adapter that converts [`DataPages`] into an [`Iterator`] of [`DictionaryArray`] #[derive(Debug)] pub struct NestedDictIter where @@ -178,25 +179,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, K, O, I>( - iter: I, - init: Vec, - data_type: DataType, - num_rows: usize, - chunk_size: Option, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - O: Offset, - K: DictionaryKey, -{ - Box::new( - NestedDictIter::::new(iter, init, data_type, num_rows, chunk_size).map(|result| { - let (mut nested, array) = result?; - let _ = nested.nested.pop().unwrap(); // the primitive - Ok((nested, array.boxed())) - }), - ) -} diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index e17557c4b41..c48bfe276bc 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -3,7 +3,6 @@ mod dictionary; mod nested; mod utils; -pub use self::nested::NestedIter; pub use basic::Iter; -pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; -pub use nested::iter_to_arrays_nested; +pub use dictionary::{DictIter, NestedDictIter}; +pub use nested::NestedIter; diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 19267f3a38c..355cf3d6cb5 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -6,7 +6,6 @@ use parquet2::{ schema::Repetition, }; -use crate::array::Array; use crate::{ array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result, io::parquet::read::DataPages, @@ -194,27 +193,3 @@ impl, I: DataPages> Iterator for NestedIter( - iter: I, - init: Vec, - data_type: DataType, - num_rows: usize, - chunk_size: Option, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - A: TraitBinaryArray, - O: Offset, -{ - Box::new( - NestedIter::::new(iter, init, data_type, num_rows, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = Box::new(array) as Box; - (nested, values) - }) - }), - ) -} diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs index 01ca1fb1122..840d8210161 100644 --- a/src/io/parquet/read/deserialize/boolean/mod.rs +++ b/src/io/parquet/read/deserialize/boolean/mod.rs @@ -2,4 +2,4 @@ mod basic; mod nested; pub use self::basic::Iter; -pub use nested::iter_to_arrays_nested; +pub use nested::NestedIter; diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 7285d440f7b..d04e0d50bd1 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -147,22 +147,3 @@ impl Iterator for NestedIter { } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`BooleanArray`] -pub fn iter_to_arrays_nested<'a, I: 'a>( - iter: I, - init: Vec, - num_rows: usize, - chunk_size: Option, -) -> NestedArrayIter<'a> -where - I: DataPages, -{ - Box::new(NestedIter::new(iter, init, num_rows, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = array.boxed(); - (nested, values) - }) - })) -} diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs index af8a90b3f0e..6c73577472c 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs @@ -7,7 +7,7 @@ use crate::{ bitmap::MutableBitmap, datatypes::DataType, error::Result, - io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState}, + io::parquet::read::deserialize::nested_utils::{InitNested, NestedState}, }; use super::super::dictionary::*; @@ -90,6 +90,7 @@ where } } +/// An iterator adapter that converts [`DataPages`] into an [`Iterator`] of [`DictionaryArray`]. #[derive(Debug)] pub struct NestedDictIter where @@ -155,24 +156,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, K, I>( - iter: I, - init: Vec, - data_type: DataType, - num_rows: usize, - chunk_size: Option, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - K: DictionaryKey, -{ - Box::new( - NestedDictIter::::new(iter, init, data_type, num_rows, chunk_size).map(|result| { - let (mut nested, array) = result?; - let _ = nested.nested.pop().unwrap(); // the primitive - Ok((nested, array.boxed())) - }), - ) -} diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs index 55b57a519f6..0ed9e60eaca 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs @@ -3,4 +3,4 @@ mod dictionary; mod utils; pub use basic::Iter; -pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; +pub use dictionary::{DictIter, NestedDictIter}; diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 8ec016f6333..0d0b54449f8 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -3,6 +3,7 @@ mod binary; mod boolean; mod dictionary; mod fixed_size_binary; +mod nested; mod nested_utils; mod null; mod primitive; @@ -14,11 +15,9 @@ use parquet2::read::get_page_iterator as _get_page_iterator; use parquet2::schema::types::PrimitiveType; use crate::{ - array::{ - Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array, - }, + array::{Array, DictionaryKey, FixedSizeListArray, ListArray}, datatypes::{DataType, Field, IntervalUnit}, - error::{Error, Result}, + error::Result, }; use self::nested_utils::{InitNested, NestedArrayIter, NestedState}; @@ -97,16 +96,13 @@ fn columns_to_iter_recursive<'a, I: 'a>( mut columns: Vec, mut types: Vec<&PrimitiveType>, field: Field, - mut init: Vec, + init: Vec, num_rows: usize, chunk_size: Option, ) -> Result> where I: DataPages, { - use crate::datatypes::PhysicalType::*; - use crate::datatypes::PrimitiveType::*; - if init.is_empty() && is_primitive(&field.data_type) { return Ok(Box::new( page_iter_to_arrays( @@ -120,272 +116,7 @@ where )); } - Ok(match field.data_type().to_physical_type() { - Boolean => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, num_rows, chunk_size) - } - Primitive(Int8) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i32| x as i8, - ) - } - Primitive(Int16) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i32| x as i16, - ) - } - Primitive(Int32) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i32| x, - ) - } - Primitive(Int64) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i64| x, - ) - } - Primitive(UInt8) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i32| x as u8, - ) - } - Primitive(UInt16) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i32| x as u16, - ) - } - Primitive(UInt32) => { - init.push(InitNested::Primitive(field.is_nullable)); - let type_ = types.pop().unwrap(); - match type_.physical_type { - PhysicalType::Int32 => primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i32| x as u32, - ), - // some implementations of parquet write arrow's u32 into i64. - PhysicalType::Int64 => primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i64| x as u32, - ), - other => { - return Err(Error::nyi(format!( - "Deserializing UInt32 from {other:?}'s parquet" - ))) - } - } - } - Primitive(UInt64) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: i64| x as u64, - ) - } - Primitive(Float32) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: f32| x, - ) - } - Primitive(Float64) => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - |x: f64| x, - ) - } - Utf8 => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - binary::iter_to_arrays_nested::, _>( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - ) - } - LargeUtf8 => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - binary::iter_to_arrays_nested::, _>( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - ) - } - Binary => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - binary::iter_to_arrays_nested::, _>( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - ) - } - LargeBinary => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - binary::iter_to_arrays_nested::, _>( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - ) - } - _ => match field.data_type().to_logical_type() { - DataType::Dictionary(key_type, _, _) => { - init.push(InitNested::Primitive(field.is_nullable)); - let type_ = types.pop().unwrap(); - let iter = columns.pop().unwrap(); - let data_type = field.data_type().clone(); - match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(iter, init, type_, data_type, num_rows, chunk_size) - })? - } - DataType::List(inner) - | DataType::LargeList(inner) - | DataType::FixedSizeList(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - num_rows, - chunk_size, - )?; - let iter = iter.map(move |x| { - let (mut nested, array) = x?; - let array = create_list(field.data_type().clone(), &mut nested, array); - Ok((nested, array)) - }); - Box::new(iter) as _ - } - DataType::Struct(fields) => { - let columns = fields - .iter() - .rev() - .map(|f| { - let mut init = init.clone(); - init.push(InitNested::Struct(field.is_nullable)); - let n = n_columns(&f.data_type); - let columns = columns.drain(columns.len() - n..).collect(); - let types = types.drain(types.len() - n..).collect(); - columns_to_iter_recursive( - columns, - types, - f.clone(), - init, - num_rows, - chunk_size, - ) - }) - .collect::>>()?; - let columns = columns.into_iter().rev().collect(); - Box::new(struct_::StructIterator::new(columns, fields.clone())) - } - DataType::Map(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - num_rows, - chunk_size, - )?; - Box::new(iter.map(move |x| { - let (nested, inner) = x?; - let array = MapArray::new( - field.data_type().clone(), - vec![0, inner.len() as i32].into(), - inner, - None, - ); - Ok((nested, array.boxed())) - })) - } - other => { - return Err(Error::nyi(format!( - "Deserializing type {other:?} from parquet" - ))) - } - }, - }) + nested::columns_to_iter_recursive(columns, types, field, init, num_rows, chunk_size) } /// Returns the number of (parquet) columns that a [`DataType`] contains. @@ -438,127 +169,3 @@ where .map(|x| x.map(|x| x.1)), )) } - -fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( - iter: I, - init: Vec, - _type_: &PrimitiveType, - data_type: DataType, - num_rows: usize, - chunk_size: Option, -) -> Result> { - use DataType::*; - let values_data_type = if let Dictionary(_, v, _) = &data_type { - v.as_ref() - } else { - panic!() - }; - - Ok(match values_data_type.to_logical_type() { - UInt8 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i32| x as u8, - ), - UInt16 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i32| x as u16, - ), - UInt32 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i32| x as u32, - ), - Int8 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i32| x as i8, - ), - Int16 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i32| x as i16, - ), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { - primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i32| x, - ) - } - Int64 | Date64 | Time64(_) | Duration(_) => { - primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: i64| x as i32, - ) - } - Float32 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: f32| x, - ), - Float64 => primitive::iter_to_dict_arrays_nested::( - iter, - init, - data_type, - num_rows, - chunk_size, - |x: f64| x, - ), - Utf8 | Binary => binary::iter_to_dict_arrays_nested::( - iter, init, data_type, num_rows, chunk_size, - ), - LargeUtf8 | LargeBinary => binary::iter_to_dict_arrays_nested::( - iter, init, data_type, num_rows, chunk_size, - ), - FixedSizeBinary(_) => fixed_size_binary::iter_to_dict_arrays_nested::( - iter, init, data_type, num_rows, chunk_size, - ), - /* - - Timestamp(time_unit, _) => { - let time_unit = *time_unit; - return timestamp_dict::( - iter, - physical_type, - logical_type, - data_type, - chunk_size, - time_unit, - ); - } - */ - other => { - return Err(Error::nyi(format!( - "Reading nested dictionaries of type {:?}", - other - ))) - } - }) -} diff --git a/src/io/parquet/read/deserialize/nested.rs b/src/io/parquet/read/deserialize/nested.rs new file mode 100644 index 00000000000..b24a9a8b11a --- /dev/null +++ b/src/io/parquet/read/deserialize/nested.rs @@ -0,0 +1,436 @@ +use parquet2::schema::types::PrimitiveType; + +use crate::{ + array::{BinaryArray, MapArray, Utf8Array}, + datatypes::{DataType, Field}, + error::{Error, Result}, +}; + +use super::nested_utils::{InitNested, NestedArrayIter}; +use super::*; + +/// Converts an iterator of arrays to a trait object returning trait objects +#[inline] +fn primitive<'a, A, I>(iter: I) -> NestedArrayIter<'a> +where + A: Array, + I: Iterator> + Send + Sync + 'a, +{ + Box::new(iter.map(|x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive + (nested, Box::new(array) as _) + }) + })) +} + +pub fn columns_to_iter_recursive<'a, I: 'a>( + mut columns: Vec, + mut types: Vec<&PrimitiveType>, + field: Field, + mut init: Vec, + num_rows: usize, + chunk_size: Option, +) -> Result> +where + I: DataPages, +{ + use crate::datatypes::PhysicalType::*; + use crate::datatypes::PrimitiveType::*; + + Ok(match field.data_type().to_physical_type() { + Boolean => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(boolean::NestedIter::new( + columns.pop().unwrap(), + init, + num_rows, + chunk_size, + )) + } + Primitive(Int8) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i32| x as i8, + )) + } + Primitive(Int16) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i32| x as i16, + )) + } + Primitive(Int32) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i32| x, + )) + } + Primitive(Int64) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i64| x, + )) + } + Primitive(UInt8) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i32| x as u8, + )) + } + Primitive(UInt16) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i32| x as u16, + )) + } + Primitive(UInt32) => { + init.push(InitNested::Primitive(field.is_nullable)); + let type_ = types.pop().unwrap(); + match type_.physical_type { + PhysicalType::Int32 => primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i32| x as u32, + )), + // some implementations of parquet write arrow's u32 into i64. + PhysicalType::Int64 => primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i64| x as u32, + )), + other => { + return Err(Error::nyi(format!( + "Deserializing UInt32 from {other:?}'s parquet" + ))) + } + } + } + Primitive(UInt64) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: i64| x as u64, + )) + } + Primitive(Float32) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: f32| x, + )) + } + Primitive(Float64) => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(primitive::NestedIter::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + |x: f64| x, + )) + } + Utf8 => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(binary::NestedIter::, _>::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + )) + } + LargeUtf8 => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(binary::NestedIter::, _>::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + )) + } + Binary => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(binary::NestedIter::, _>::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + )) + } + LargeBinary => { + init.push(InitNested::Primitive(field.is_nullable)); + types.pop(); + primitive(binary::NestedIter::, _>::new( + columns.pop().unwrap(), + init, + field.data_type().clone(), + num_rows, + chunk_size, + )) + } + _ => match field.data_type().to_logical_type() { + DataType::Dictionary(key_type, _, _) => { + init.push(InitNested::Primitive(field.is_nullable)); + let type_ = types.pop().unwrap(); + let iter = columns.pop().unwrap(); + let data_type = field.data_type().clone(); + match_integer_type!(key_type, |$K| { + dict_read::<$K, _>(iter, init, type_, data_type, num_rows, chunk_size) + })? + } + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::FixedSizeList(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + num_rows, + chunk_size, + )?; + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_list(field.data_type().clone(), &mut nested, array); + Ok((nested, array)) + }); + Box::new(iter) as _ + } + DataType::Struct(fields) => { + let columns = fields + .iter() + .rev() + .map(|f| { + let mut init = init.clone(); + init.push(InitNested::Struct(field.is_nullable)); + let n = n_columns(&f.data_type); + let columns = columns.drain(columns.len() - n..).collect(); + let types = types.drain(types.len() - n..).collect(); + columns_to_iter_recursive( + columns, + types, + f.clone(), + init, + num_rows, + chunk_size, + ) + }) + .collect::>>()?; + let columns = columns.into_iter().rev().collect(); + Box::new(struct_::StructIterator::new(columns, fields.clone())) + } + DataType::Map(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + num_rows, + chunk_size, + )?; + Box::new(iter.map(move |x| { + let (nested, inner) = x?; + let array = MapArray::new( + field.data_type().clone(), + vec![0, inner.len() as i32].into(), + inner, + None, + ); + Ok((nested, array.boxed())) + })) + } + other => { + return Err(Error::nyi(format!( + "Deserializing type {other:?} from parquet" + ))) + } + }, + }) +} + +fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( + iter: I, + init: Vec, + _type_: &PrimitiveType, + data_type: DataType, + num_rows: usize, + chunk_size: Option, +) -> Result> { + use DataType::*; + let values_data_type = if let Dictionary(_, v, _) = &data_type { + v.as_ref() + } else { + panic!() + }; + + Ok(match values_data_type.to_logical_type() { + UInt8 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i32| x as u8, + )), + UInt16 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i32| x as u16, + )), + UInt32 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i32| x as u32, + )), + Int8 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i32| x as i8, + )), + Int16 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i32| x as i16, + )), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { + primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i32| x, + )) + } + Int64 | Date64 | Time64(_) | Duration(_) => { + primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: i64| x as i32, + )) + } + Float32 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: f32| x, + )), + Float64 => primitive(primitive::NestedDictIter::::new( + iter, + init, + data_type, + num_rows, + chunk_size, + |x: f64| x, + )), + Utf8 | Binary => primitive(binary::NestedDictIter::::new( + iter, init, data_type, num_rows, chunk_size, + )), + LargeUtf8 | LargeBinary => primitive(binary::NestedDictIter::::new( + iter, init, data_type, num_rows, chunk_size, + )), + FixedSizeBinary(_) => primitive(fixed_size_binary::NestedDictIter::::new( + iter, init, data_type, num_rows, chunk_size, + )), + /* + + Timestamp(time_unit, _) => { + let time_unit = *time_unit; + return timestamp_dict::( + iter, + physical_type, + logical_type, + data_type, + chunk_size, + time_unit, + ); + } + */ + other => { + return Err(Error::nyi(format!( + "Reading nested dictionaries of type {:?}", + other + ))) + } + }) +} diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index 56366ec0ffa..cb9d63c4cc6 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -15,7 +15,7 @@ use crate::{ use super::super::dictionary::nested_next_dict; use super::super::dictionary::*; -use super::super::nested_utils::{InitNested, NestedArrayIter, NestedState}; +use super::super::nested_utils::{InitNested, NestedState}; use super::super::utils::MaybeNext; use super::super::DataPages; @@ -116,6 +116,7 @@ where } } +/// An iterator adapter that converts [`DataPages`] into an [`Iterator`] of [`DictionaryArray`] #[derive(Debug)] pub struct NestedDictIter where @@ -196,30 +197,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, K, I, T, P, F>( - iter: I, - init: Vec, - data_type: DataType, - num_rows: usize, - chunk_size: Option, - op: F, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - K: DictionaryKey, - T: crate::types::NativeType, - P: parquet2::types::NativeType, - F: 'a + Copy + Send + Sync + Fn(P) -> T, -{ - Box::new( - NestedDictIter::::new(iter, init, data_type, num_rows, chunk_size, op).map( - |result| { - let (mut nested, array) = result?; - let _ = nested.nested.pop().unwrap(); // the primitive - Ok((nested, array.boxed())) - }, - ), - ) -} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index e30d813d20a..70d743dcc27 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -3,5 +3,5 @@ mod dictionary; mod nested; pub use basic::Iter; -pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; -pub use nested::iter_to_arrays_nested; +pub use dictionary::{DictIter, NestedDictIter}; +pub use nested::NestedIter; diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index e72bd5190c7..b37aa955e96 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -159,7 +159,7 @@ fn finish( /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct ArrayIterator +pub struct NestedIter where I: DataPages, T: NativeType, @@ -176,7 +176,7 @@ where decoder: PrimitiveDecoder, } -impl ArrayIterator +impl NestedIter where I: DataPages, T: NativeType, @@ -204,7 +204,7 @@ where } } -impl Iterator for ArrayIterator +impl Iterator for NestedIter where I: DataPages, T: NativeType, @@ -233,30 +233,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, I, T, P, F>( - iter: I, - init: Vec, - data_type: DataType, - num_rows: usize, - chunk_size: Option, - op: F, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - T: crate::types::NativeType, - P: parquet2::types::NativeType, - F: 'a + Copy + Send + Sync + Fn(P) -> T, -{ - Box::new( - ArrayIterator::::new(iter, init, data_type, num_rows, chunk_size, op).map( - |x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - (nested, array.boxed()) - }) - }, - ), - ) -}