From 31a8d3af6340290c95cd31e144def608bc8af5ae Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 21 Jun 2022 16:11:20 +0000 Subject: [PATCH 1/4] Moved file --- .../parquet/read/deserialize/{dictionary.rs => dictionary/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/io/parquet/read/deserialize/{dictionary.rs => dictionary/mod.rs} (100%) diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs similarity index 100% rename from src/io/parquet/read/deserialize/dictionary.rs rename to src/io/parquet/read/deserialize/dictionary/mod.rs From 96547fdd32072fdf7333021584c9b8854cccdc57 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 22 Jun 2022 04:00:57 +0000 Subject: [PATCH 2/4] Simpler --- src/io/parquet/read/deserialize/binary/mod.rs | 37 +------------------ .../parquet/read/deserialize/binary/nested.rs | 29 +++++++++++++-- .../parquet/read/deserialize/boolean/mod.rs | 25 +------------ .../read/deserialize/boolean/nested.rs | 22 +++++++++-- .../parquet/read/deserialize/primitive/mod.rs | 33 +---------------- .../read/deserialize/primitive/nested.rs | 23 ++++++++++++ 6 files changed, 73 insertions(+), 96 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 6eee318772d..613b95e9ca9 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -3,40 +3,7 @@ mod dictionary; mod nested; mod utils; -use crate::{ - array::{Array, Offset}, - datatypes::DataType, -}; - -use self::basic::TraitBinaryArray; -use self::nested::ArrayIterator; -use super::{ - nested_utils::{InitNested, NestedArrayIter}, - DataPages, -}; - +pub use self::nested::NestedIter; pub use basic::Iter; pub use dictionary::DictIter; - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, O, A, I>( - iter: I, - init: Vec, - data_type: DataType, - chunk_size: 
Option, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - A: TraitBinaryArray, - O: Offset, -{ - Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = Box::new(array) as Box; - (nested, values) - }) - }), - ) -} +pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index f7b16fe03ef..b5703812940 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -6,6 +6,7 @@ use parquet2::{ schema::Repetition, }; +use crate::array::Array; use crate::{ array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result, io::parquet::read::DataPages, @@ -141,7 +142,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { } } -pub struct ArrayIterator, I: DataPages> { +pub struct NestedIter, I: DataPages> { iter: I, data_type: DataType, init: Vec, @@ -150,7 +151,7 @@ pub struct ArrayIterator, I: DataPages> { phantom_a: std::marker::PhantomData, } -impl, I: DataPages> ArrayIterator { +impl, I: DataPages> NestedIter { pub fn new( iter: I, init: Vec, @@ -168,7 +169,7 @@ impl, I: DataPages> ArrayIterator { } } -impl, I: DataPages> Iterator for ArrayIterator { +impl, I: DataPages> Iterator for NestedIter { type Item = Result<(NestedState, A)>; fn next(&mut self) -> Option { @@ -189,3 +190,25 @@ impl, I: DataPages> Iterator for ArrayIterator } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`TraitBinaryArray`] +pub fn iter_to_arrays_nested<'a, O, A, I>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + A: TraitBinaryArray, + O: Offset, +{ + Box::new( + NestedIter::::new(iter, init, data_type, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the 
primitive + let array = Box::new(array) as Box; + Ok((nested, array)) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs index dde0a14852a..01ca1fb1122 100644 --- a/src/io/parquet/read/deserialize/boolean/mod.rs +++ b/src/io/parquet/read/deserialize/boolean/mod.rs @@ -1,28 +1,5 @@ mod basic; mod nested; -use self::nested::ArrayIterator; -use super::{ - nested_utils::{InitNested, NestedArrayIter}, - DataPages, -}; - pub use self::basic::Iter; - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, I: 'a>( - iter: I, - init: Vec, - chunk_size: Option, -) -> NestedArrayIter<'a> -where - I: DataPages, -{ - Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = array.boxed(); - (nested, values) - }) - })) -} +pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 8c20694ed5c..f2b4ccd983f 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -101,14 +101,14 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder { /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct ArrayIterator { +pub struct NestedIter { iter: I, init: Vec, items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>, chunk_size: Option, } -impl ArrayIterator { +impl NestedIter { pub fn new(iter: I, init: Vec, chunk_size: Option) -> Self { Self { iter, @@ -123,7 +123,7 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) BooleanArray::new(data_type.clone(), values.into(), validity.into()) } -impl Iterator for ArrayIterator { +impl Iterator for NestedIter { type Item = Result<(NestedState, BooleanArray)>; fn next(&mut self) -> Option 
{ @@ -144,3 +144,19 @@ impl Iterator for ArrayIterator { } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`BooleanArray`] +pub fn iter_to_arrays_nested<'a, I: 'a>( + iter: I, + init: Vec, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: DataPages, +{ + Box::new(NestedIter::new(iter, init, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + })) +} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index e49cdb80ea5..b9f87520c8d 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -2,35 +2,6 @@ mod basic; mod dictionary; mod nested; -pub use dictionary::DictIter; - -use crate::datatypes::DataType; - -use super::{nested_utils::*, DataPages}; - pub use basic::Iter; -use nested::ArrayIterator; - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, I, T, P, F>( - iter: I, - init: Vec, - data_type: DataType, - chunk_size: Option, - op: F, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - T: crate::types::NativeType, - P: parquet2::types::NativeType, - F: 'a + Copy + Send + Sync + Fn(P) -> T, -{ - Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size, op).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - (nested, array.boxed()) - }) - }), - ) -} +pub use dictionary::DictIter; +pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 587e1967adc..ce6d7a94eb5 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -229,3 +229,26 @@ where } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, I, T, P, 
F>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + T: crate::types::NativeType, + P: parquet2::types::NativeType, + F: 'a + Copy + Send + Sync + Fn(P) -> T, +{ + Box::new( + ArrayIterator::::new(iter, init, data_type, chunk_size, op).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} From f1a3a2a592ec7fdf9f89bed0f657adaf252699eb Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 22 Jun 2022 04:50:44 +0000 Subject: [PATCH 3/4] Initial take --- .../read/deserialize/dictionary/mod.rs | 8 +- .../read/deserialize/dictionary/nested.rs | 208 +++++++++++++++ src/io/parquet/read/deserialize/mod.rs | 238 +++++++++++++----- .../read/deserialize/primitive/dictionary.rs | 112 ++++++++- .../parquet/read/deserialize/primitive/mod.rs | 2 +- tests/it/io/parquet/mod.rs | 44 +++- 6 files changed, 538 insertions(+), 74 deletions(-) create mode 100644 src/io/parquet/read/deserialize/dictionary/nested.rs diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index f89a5e06d5a..4171c1f8118 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -1,3 +1,5 @@ +mod nested; + use std::collections::VecDeque; use parquet2::{ @@ -292,8 +294,7 @@ pub(super) fn next_dict< MaybeNext::More } else { let (values, validity) = items.pop_front().unwrap(); - let keys = - PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()); + let keys = finish_key(values, validity); MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) } } @@ -304,7 +305,6 @@ pub(super) fn next_dict< debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); let keys = finish_key(values, validity); - MaybeNext::Some(DictionaryArray::try_new(data_type, keys, 
dict.unwrap())) } else { MaybeNext::None @@ -312,3 +312,5 @@ pub(super) fn next_dict< } } } + +pub use nested::next_dict as nested_next_dict; diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs new file mode 100644 index 00000000000..2ec08e11650 --- /dev/null +++ b/src/io/parquet/read/deserialize/dictionary/nested.rs @@ -0,0 +1,208 @@ +use std::collections::VecDeque; + +use parquet2::{ + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, + page::{DataPage, DictPage}, + schema::Repetition, +}; + +use crate::{ + array::{Array, DictionaryArray, DictionaryKey}, + bitmap::MutableBitmap, + error::{Error, Result}, +}; +use crate::{datatypes::DataType, io::parquet::read::deserialize::utils::DecodedState}; + +use super::{ + super::super::DataPages, + super::nested_utils::*, + super::utils::{dict_indices_decoder, not_implemented, Decoder, MaybeNext, PageState}, + finish_key, Dict, +}; + +// The state of a required DataPage with a boolean physical type +#[derive(Debug)] +pub struct Required<'a> { + values: HybridRleDecoder<'a>, + length: usize, +} + +impl<'a> Required<'a> { + fn try_new(page: &'a DataPage) -> Result { + let values = dict_indices_decoder(page)?; + let length = page.num_values(); + Ok(Self { values, length }) + } +} + +// The state of a `DataPage` of `Boolean` parquet boolean type +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum State<'a> { + Optional(Optional<'a>, HybridRleDecoder<'a>), + Required(Required<'a>), +} + +impl<'a> State<'a> { + pub fn len(&self) -> usize { + match self { + State::Optional(optional, _) => optional.len(), + State::Required(page) => page.length, + } + } +} + +impl<'a> PageState<'a> for State<'a> { + fn len(&self) -> usize { + self.len() + } +} + +#[derive(Debug)] +pub struct PrimitiveDecoder +where + K: DictionaryKey, +{ + phantom_k: std::marker::PhantomData, +} + +impl Default for PrimitiveDecoder +where + K: DictionaryKey, +{ + #[inline] + fn 
default() -> Self { + Self { + phantom_k: std::marker::PhantomData, + } + } +} + +impl<'a, K: DictionaryKey> Decoder<'a> for PrimitiveDecoder { + type State = State<'a>; + type DecodedState = (Vec, MutableBitmap); + + fn build_state(&self, page: &'a DataPage) -> Result { + let is_optional = + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); + + match (page.encoding(), is_optional, is_filtered) { + (Encoding::Plain, true, false) => Ok(State::Optional( + Optional::try_new(page)?, + dict_indices_decoder(page)?, + )), + (Encoding::Plain, false, false) => Required::try_new(page).map(State::Required), + _ => Err(not_implemented(page)), + } + } + + fn with_capacity(&self, capacity: usize) -> Self::DecodedState { + ( + Vec::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ) + } + + fn extend_from_state( + &self, + state: &mut State, + decoded: &mut Self::DecodedState, + additional: usize, + ) { + let (values, validity) = decoded; + match state { + State::Optional(page_validity, page_values) => { + let items = page_validity.by_ref().take(additional); + let items = Zip::new(items, page_values.by_ref().map(|x| K::from_u32(x).unwrap())); + + read_optional_values(items, values, validity) + } + State::Required(page) => { + values.extend( + page.values + .by_ref() + .map(|x| K::from_u32(x).unwrap()) + .take(additional), + ); + } + } + } +} + +pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box>( + iter: &'a mut I, + items: &mut VecDeque<(Vec, MutableBitmap)>, + nested_items: &mut VecDeque, + init: &[InitNested], + dict: &mut Dict, + data_type: DataType, + chunk_size: Option, + read_dict: F, +) -> MaybeNext)>> { + if items.len() > 1 { + let nested = nested_items.pop_front().unwrap(); + let (values, validity) = items.pop_front().unwrap(); + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + 
return MaybeNext::Some(dict.map(|dict| (nested, dict))); + } + match iter.next() { + Err(e) => MaybeNext::Some(Err(e.into())), + Ok(None) => { + if let Some(nested) = nested_items.pop_front() { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + let (values, validity) = items.pop_front().unwrap(); + debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); + + let keys = finish_key(values, validity); + + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + return MaybeNext::Some(dict.map(|dict| (nested, dict))); + } else { + MaybeNext::None + } + } + Ok(Some(page)) => { + // consume the dictionary page + match (&dict, page.dictionary_page()) { + (Dict::Empty, None) => { + return MaybeNext::Some(Err(Error::nyi( + "dictionary arrays from non-dict-encoded pages", + ))); + } + (Dict::Empty, Some(dict_page)) => { + *dict = Dict::Complete(read_dict(dict_page.as_ref())) + } + (Dict::Complete(_), _) => {} + }; + + // there is a new page => consume the page from the start + let mut nested_page = NestedPage::try_new(page)?; + + extend_offsets1(&mut nested_page, init, nested_items, chunk_size); + + let decoder = PrimitiveDecoder::::default(); + + let maybe_page = decoder.build_state(page); + let page = match maybe_page { + Ok(page) => page, + Err(e) => return MaybeNext::Some(Err(e)), + }; + + extend_from_new_page(page, items, nested_items, &decoder); + + if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) { + MaybeNext::More + } else { + let nested = nested_items.pop_front().unwrap(); + let (values, validity) = items.pop_front().unwrap(); + let keys = finish_key(values, validity); + + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + return MaybeNext::Some(dict.map(|dict| (nested, dict))); + } + } + } +} diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 3d229555b79..a612830cfd3 100644 --- 
a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -14,7 +14,9 @@ use parquet2::read::get_page_iterator as _get_page_iterator; use parquet2::schema::types::PrimitiveType; use crate::{ - array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array}, + array::{ + Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array, + }, datatypes::{DataType, Field}, error::{Error, Result}, }; @@ -287,68 +289,71 @@ where chunk_size, ) } - - _ => match field.data_type().to_logical_type() { - DataType::List(inner) - | DataType::LargeList(inner) - | DataType::FixedSizeList(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - let iter = iter.map(move |x| { - let (mut nested, array) = x?; - let array = create_list(field.data_type().clone(), &mut nested, array); - Ok((nested, array)) - }); - Box::new(iter) as _ - } - DataType::Struct(fields) => { - let columns = fields - .iter() - .rev() - .map(|f| { - let mut init = init.clone(); - init.push(InitNested::Struct(field.is_nullable)); - let n = n_columns(&f.data_type); - let columns = columns.drain(columns.len() - n..).collect(); - let types = types.drain(types.len() - n..).collect(); - columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) - }) - .collect::>>()?; - let columns = columns.into_iter().rev().collect(); - Box::new(struct_::StructIterator::new(columns, fields.clone())) - } - DataType::Map(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - Box::new(iter.map(move |x| { - let (nested, inner) = x?; - let array = MapArray::new( - field.data_type().clone(), - vec![0, inner.len() as i32].into(), - inner, - None, - ); - Ok((nested, array.boxed())) - })) - } - other => { - return 
Err(Error::nyi(format!( - "Deserializing type {other:?} from parquet" - ))) - } - }, + DataType::List(inner) | DataType::LargeList(inner) | DataType::FixedSizeList(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_list(field.data_type().clone(), &mut nested, array); + Ok((nested, array)) + }); + Box::new(iter) as _ + } + DataType::Struct(fields) => { + let columns = fields + .iter() + .rev() + .map(|f| { + let mut init = init.clone(); + init.push(InitNested::Struct(field.is_nullable)); + let n = n_columns(&f.data_type); + let columns = columns.drain(columns.len() - n..).collect(); + let types = types.drain(types.len() - n..).collect(); + columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) + }) + .collect::>>()?; + let columns = columns.into_iter().rev().collect(); + Box::new(struct_::StructIterator::new(columns, fields.clone())) + } + DataType::Map(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + Box::new(iter.map(move |x| { + let (nested, inner) = x?; + let array = MapArray::new( + field.data_type().clone(), + vec![0, inner.len() as i32].into(), + inner, + None, + ); + Ok((nested, array.boxed())) + })) + } + Dictionary(key_type) => { + let type_ = types.pop().unwrap(); + let iter = columns.pop().unwrap(); + let data_type = field.data_type().clone(); + match_integer_type!(key_type, |$K| { + dict_read::<$K, _>(iter, init, type_, data_type, chunk_size) + })? 
+ } + other => { + return Err(Error::nyi(format!( + "Deserializing type {other:?} from parquet" + ))) + } }) } @@ -401,3 +406,110 @@ where .map(|x| x.map(|x| x.1)), )) } + +fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( + iter: I, + init: Vec, + _type_: &PrimitiveType, + data_type: DataType, + chunk_size: Option, +) -> Result> { + use DataType::*; + let values_data_type = if let Dictionary(_, v, _) = &data_type { + v.as_ref() + } else { + panic!() + }; + + Ok(match values_data_type.to_logical_type() { + UInt8 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i32| x as u8, + ), + Float32 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: f32| x, + ), + /* + UInt16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u16, + )), + UInt32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u32, + )), + Int8 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i8, + )), + Int16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i16, + )), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { + x as i32 + }), + ), + + Timestamp(time_unit, _) => { + let time_unit = *time_unit; + return timestamp_dict::( + iter, + physical_type, + logical_type, + data_type, + chunk_size, + time_unit, + ); + } + + Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), + ), + Float32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f32| x, + )), + Float64 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f64| x, + )), + + Utf8 | Binary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + LargeUtf8 | 
LargeBinary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( + iter, data_type, chunk_size, + )), + */ + other => { + return Err(Error::nyi(format!( + "Reading nested dictionaries of type {:?}", + other + ))) + } + }) +} diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index 2b3f0ca5491..c6b0f5e44a0 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -13,11 +13,12 @@ use crate::{ types::NativeType, }; +use super::super::dictionary::nested_next_dict; use super::super::dictionary::*; +use super::super::nested_utils::{InitNested, NestedArrayIter, NestedState}; use super::super::utils::MaybeNext; use super::super::DataPages; -#[inline] fn read_dict(data_type: DataType, op: F, dict: &dyn DictPage) -> Box where T: NativeType, @@ -107,3 +108,112 @@ where } } } + +#[derive(Debug)] +pub struct NestedDictIter +where + I: DataPages, + T: NativeType, + K: DictionaryKey, + P: ParquetNativeType, + F: Fn(P) -> T, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + // invariant: items.len() == nested.len() + items: VecDeque<(Vec, MutableBitmap)>, + nested: VecDeque, + chunk_size: Option, + op: F, + phantom: std::marker::PhantomData

, +} + +impl NestedDictIter +where + K: DictionaryKey, + I: DataPages, + T: NativeType, + + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, + ) -> Self { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + nested: VecDeque::new(), + chunk_size, + op, + phantom: Default::default(), + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + T: NativeType, + K: DictionaryKey, + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &mut self.nested, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), self.op, dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, I, T, P, F>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + K: DictionaryKey, + T: crate::types::NativeType, + P: parquet2::types::NativeType, + F: 'a + Copy + Send + Sync + Fn(P) -> T, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size, op).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index b9f87520c8d..e30d813d20a 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs 
+++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -3,5 +3,5 @@ mod dictionary; mod nested; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; pub use nested::iter_to_arrays_nested; diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 346208e8951..295b366c8f8 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1121,12 +1121,14 @@ fn integration_write(schema: &Schema, chunks: &[Chunk>]) -> Resul let encodings = schema .fields .iter() - .map(|x| { - vec![if let DataType::Dictionary(..) = x.data_type() { - Encoding::RleDictionary - } else { - Encoding::Plain - }] + .map(|f| { + transverse(&f.data_type, |x| { + if let DataType::Dictionary(..) = x { + Encoding::RleDictionary + } else { + Encoding::Plain + } + }) }) .collect(); @@ -1429,3 +1431,33 @@ fn list_int_nullable() -> Result<()> { array.try_extend(data).unwrap(); list_array_generic(true, array.into()) } + +/// Tests that when nested dictionary-encoded arrays are written to parquet, we can roundtrip +them losslessly. 
+#[test] +fn nested_dict() -> Result<()> { + let indices = PrimitiveArray::from_values((0..3u64).map(|x| x % 2)); + let values = PrimitiveArray::from_slice([1.0f32, 3.0]); + let floats = DictionaryArray::from_data(indices, values.boxed()); + let floats = ListArray::try_new( + DataType::List(Box::new(Field::new( + "item", + floats.data_type().clone(), + true, + ))), + vec![0i32, 0, 2, 3, 3].into(), + floats.boxed(), + Some([true, false, true, true].into()), + )?; + + let schema = Schema::from(vec![Field::new("floats", floats.data_type().clone(), true)]); + let batch = Chunk::try_new(vec![floats.boxed()])?; + + let r = integration_write(&schema, &[batch.clone()])?; + + let (new_schema, new_batches) = integration_read(&r)?; + + assert_eq!(new_schema, schema); + assert_eq!(new_batches, vec![batch]); + Ok(()) +} From f0b0b7af3f6cb2a8377413ac7bd184697a4284e3 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 20 Jul 2022 10:58:04 +0200 Subject: [PATCH 4/4] fix some compiler errors --- src/array/dictionary/mod.rs | 3 +- .../read/deserialize/dictionary/mod.rs | 4 + .../read/deserialize/dictionary/nested.rs | 9 +- src/io/parquet/read/deserialize/mod.rs | 134 +++++++++--------- 4 files changed, 82 insertions(+), 68 deletions(-) diff --git a/src/array/dictionary/mod.rs b/src/array/dictionary/mod.rs index 0d5b4502eb2..0880b35e323 100644 --- a/src/array/dictionary/mod.rs +++ b/src/array/dictionary/mod.rs @@ -1,3 +1,4 @@ +use num_traits::FromPrimitive; use std::hint::unreachable_unchecked; use crate::{ @@ -23,7 +24,7 @@ use super::{new_empty_array, primitive::PrimitiveArray, Array}; use super::{new_null_array, specification::check_indexes}; /// Trait denoting [`NativeType`]s that can be used as keys of a dictionary. 
-pub trait DictionaryKey: NativeType + TryInto + TryFrom { +pub trait DictionaryKey: NativeType + TryInto + TryFrom + FromPrimitive { /// The corresponding [`IntegerType`] of this key const KEY_TYPE: IntegerType; diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index 4171c1f8118..7bcb764573d 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -76,6 +76,10 @@ impl<'a> Optional<'a> { validity: OptionalPageValidity::try_new(page)?, }) } + + fn len(&self) -> usize { + self.values.len() + } } impl<'a> utils::PageState<'a> for State<'a> { diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs index 2ec08e11650..1e6b8aab48d 100644 --- a/src/io/parquet/read/deserialize/dictionary/nested.rs +++ b/src/io/parquet/read/deserialize/dictionary/nested.rs @@ -6,6 +6,8 @@ use parquet2::{ schema::Repetition, }; +use crate::io::parquet::read::deserialize::dictionary::Optional; +use crate::io::parquet::read::deserialize::utils::extend_from_new_page; use crate::{ array::{Array, DictionaryArray, DictionaryKey}, bitmap::MutableBitmap, @@ -179,7 +181,10 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box }; // there is a new page => consume the page from the start - let mut nested_page = NestedPage::try_new(page)?; + let mut nested_page = match NestedPage::try_new(page) { + Ok(nested_page) => nested_page, + Err(e) => return MaybeNext::Some(Err(e)), + }; extend_offsets1(&mut nested_page, init, nested_items, chunk_size); @@ -191,7 +196,7 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box Err(e) => return MaybeNext::Some(Err(e)), }; - extend_from_new_page(page, items, nested_items, &decoder); + extend_from_new_page(page, chunk_size, nested_items, &decoder); if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) { 
MaybeNext::More diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index a612830cfd3..43b3fa5c372 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -289,71 +289,75 @@ where chunk_size, ) } - DataType::List(inner) | DataType::LargeList(inner) | DataType::FixedSizeList(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - let iter = iter.map(move |x| { - let (mut nested, array) = x?; - let array = create_list(field.data_type().clone(), &mut nested, array); - Ok((nested, array)) - }); - Box::new(iter) as _ - } - DataType::Struct(fields) => { - let columns = fields - .iter() - .rev() - .map(|f| { - let mut init = init.clone(); - init.push(InitNested::Struct(field.is_nullable)); - let n = n_columns(&f.data_type); - let columns = columns.drain(columns.len() - n..).collect(); - let types = types.drain(types.len() - n..).collect(); - columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) - }) - .collect::>>()?; - let columns = columns.into_iter().rev().collect(); - Box::new(struct_::StructIterator::new(columns, fields.clone())) - } - DataType::Map(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - Box::new(iter.map(move |x| { - let (nested, inner) = x?; - let array = MapArray::new( - field.data_type().clone(), - vec![0, inner.len() as i32].into(), - inner, - None, - ); - Ok((nested, array.boxed())) - })) - } - Dictionary(key_type) => { - let type_ = types.pop().unwrap(); - let iter = columns.pop().unwrap(); - let data_type = field.data_type().clone(); - match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(iter, init, type_, data_type, chunk_size) - })? 
- } - other => { - return Err(Error::nyi(format!( - "Deserializing type {other:?} from parquet" - ))) - } + _ => match field.data_type().to_logical_type() { + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::FixedSizeList(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_list(field.data_type().clone(), &mut nested, array); + Ok((nested, array)) + }); + Box::new(iter) as _ + } + DataType::Struct(fields) => { + let columns = fields + .iter() + .rev() + .map(|f| { + let mut init = init.clone(); + init.push(InitNested::Struct(field.is_nullable)); + let n = n_columns(&f.data_type); + let columns = columns.drain(columns.len() - n..).collect(); + let types = types.drain(types.len() - n..).collect(); + columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) + }) + .collect::>>()?; + let columns = columns.into_iter().rev().collect(); + Box::new(struct_::StructIterator::new(columns, fields.clone())) + } + DataType::Map(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + Box::new(iter.map(move |x| { + let (nested, inner) = x?; + let array = MapArray::new( + field.data_type().clone(), + vec![0, inner.len() as i32].into(), + inner, + None, + ); + Ok((nested, array.boxed())) + })) + } + DataType::Dictionary(key_type, _, _) => { + let type_ = types.pop().unwrap(); + let iter = columns.pop().unwrap(); + let data_type = field.data_type().clone(); + match_integer_type!(key_type, |$K| { + dict_read::<$K, _>(iter, init, type_, data_type, chunk_size) + })? + } + other => { + return Err(Error::nyi(format!( + "Deserializing type {other:?} from parquet" + ))) + } + }, }) }