From d6f3966b37117d9bef57d7fd98b3462a6d2bf291 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 22 Jun 2022 04:50:44 +0000 Subject: [PATCH] Initial take --- .../read/deserialize/dictionary/mod.rs | 8 +- .../read/deserialize/dictionary/nested.rs | 205 ++++++++++++++++++ src/io/parquet/read/deserialize/mod.rs | 126 ++++++++++- .../parquet/read/deserialize/nested_utils.rs | 2 +- .../read/deserialize/primitive/dictionary.rs | 108 ++++++++- .../parquet/read/deserialize/primitive/mod.rs | 2 +- tests/it/io/parquet/mod.rs | 42 +++- 7 files changed, 480 insertions(+), 13 deletions(-) create mode 100644 src/io/parquet/read/deserialize/dictionary/nested.rs diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index f89a5e06d5a..4171c1f8118 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -1,3 +1,5 @@ +mod nested; + use std::collections::VecDeque; use parquet2::{ @@ -292,8 +294,7 @@ pub(super) fn next_dict< MaybeNext::More } else { let (values, validity) = items.pop_front().unwrap(); - let keys = - PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()); + let keys = finish_key(values, validity); MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) } } @@ -304,7 +305,6 @@ pub(super) fn next_dict< debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); let keys = finish_key(values, validity); - MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) } else { MaybeNext::None @@ -312,3 +312,5 @@ pub(super) fn next_dict< } } } + +pub use nested::next_dict as nested_next_dict; diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs new file mode 100644 index 00000000000..23a8a5f5491 --- /dev/null +++ b/src/io/parquet/read/deserialize/dictionary/nested.rs @@ -0,0 +1,205 @@ +use std::collections::VecDeque; + +use parquet2::{ + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, + page::{DataPage, DictPage}, + schema::Repetition, +}; + +use crate::datatypes::DataType; +use crate::{ + array::{Array, DictionaryArray, DictionaryKey}, + bitmap::MutableBitmap, + error::{Error, Result}, +}; + +use super::{ + super::super::DataPages, + super::nested_utils::*, + super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState}, + finish_key, Dict, +}; + +// The state of a required DataPage with a boolean physical type +#[derive(Debug)] +pub struct Required<'a> { + values: HybridRleDecoder<'a>, + length: usize, +} + +impl<'a> Required<'a> { + fn try_new(page: &'a DataPage) -> Result { + let values = dict_indices_decoder(page)?; + let length = page.num_values(); + Ok(Self { values, length }) + } +} + +// The state of a `DataPage` of a `Dictionary` type +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum State<'a> { + Optional(HybridRleDecoder<'a>), + Required(Required<'a>), +} + +impl<'a> State<'a> { + pub fn len(&self) -> usize { + match self { + State::Optional(page) => page.len(), + State::Required(page) => page.length, + } + } +} + +impl<'a> PageState<'a> for State<'a> { + fn len(&self) -> usize { + self.len() + } +} + +#[derive(Debug)] +pub struct DictionaryDecoder +where + K: DictionaryKey, +{ + phantom_k: std::marker::PhantomData, +} + +impl Default for DictionaryDecoder +where + K: DictionaryKey, +{ + #[inline] + fn default() -> Self { + Self { + phantom_k: std::marker::PhantomData, + } + } +} + +impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { + type State = State<'a>; + type DecodedState = (Vec, MutableBitmap); + + fn build_state(&self, page: &'a DataPage) -> Result { + let is_optional = + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); + + match (page.encoding(), is_optional, is_filtered) { + (Encoding::RleDictionary | Encoding::PlainDictionary, true, false) => { + dict_indices_decoder(page).map(State::Optional) + } + (Encoding::RleDictionary | Encoding::PlainDictionary, false, false) => { + Required::try_new(page).map(State::Required) + } + _ => Err(not_implemented(page)), + } + } + + fn with_capacity(&self, capacity: usize) -> Self::DecodedState { + ( + Vec::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ) + } + + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + match state { + State::Optional(page_values) => { + let key = page_values.next(); + // todo: convert unwrap to error + let key = match K::try_from(key.unwrap_or_default() as usize) { + Ok(key) => key, + Err(_) => todo!(), + }; + values.push(key); + validity.push(true); + } + State::Required(page_values) => { + let key = page_values.values.next(); + let key = match K::try_from(key.unwrap_or_default() as usize) { + Ok(key) => key, + Err(_) => todo!(), + }; + values.push(key); + } + } + } + + fn push_null(&self, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + values.push(K::default()); + validity.push(false) + } +} + +pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box>( + iter: &'a mut I, + items: &mut VecDeque<(NestedState, (Vec, MutableBitmap))>, + init: &[InitNested], + dict: &mut Dict, + data_type: DataType, + chunk_size: Option, + read_dict: F, +) -> MaybeNext)>> { + if items.len() > 1 { + let (nested, (values, validity)) = items.pop_front().unwrap(); + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + return MaybeNext::Some(dict.map(|dict| (nested, dict))); + } + match iter.next() { + Err(e) => MaybeNext::Some(Err(e.into())), + Ok(Some(page)) => { + // consume the dictionary page + match (&dict, page.dictionary_page()) { + (Dict::Empty, None) => { + return MaybeNext::Some(Err(Error::nyi( + "dictionary arrays from non-dict-encoded pages", + ))); + } + (Dict::Empty, Some(dict_page)) => { + *dict = Dict::Complete(read_dict(dict_page.as_ref())) + } + (Dict::Complete(_), _) => {} + }; + + let error = extend( + page, + init, + items, + &DictionaryDecoder::::default(), + chunk_size, + ); + match error { + Ok(_) => {} + Err(e) => return MaybeNext::Some(Err(e)), + }; + + if items.front().unwrap().0.len() < chunk_size.unwrap_or(usize::MAX) { + MaybeNext::More + } else { + let (nested, (values, validity)) = items.pop_front().unwrap(); + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + MaybeNext::Some(dict.map(|dict| (nested, dict))) + } + } + Ok(None) => { + if let Some((nested, (values, validity))) = items.pop_front() { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); + + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + MaybeNext::Some(dict.map(|dict| (nested, dict))) + } else { + MaybeNext::None + } + } + } +} diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 3d229555b79..2ee84b6ca7d 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -14,7 +14,9 @@ use parquet2::read::get_page_iterator as _get_page_iterator; use parquet2::schema::types::PrimitiveType; use crate::{ - array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array}, + array::{ + Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array, + }, datatypes::{DataType, Field}, error::{Error, Result}, }; @@ -289,6 +291,14 @@ where } _ => match field.data_type().to_logical_type() { + DataType::Dictionary(key_type, _, _) => { + let type_ = types.pop().unwrap(); + let iter = columns.pop().unwrap(); + let data_type = field.data_type().clone(); + match_integer_type!(key_type, |$K| { + dict_read::<$K, _>(iter, init, type_, data_type, chunk_size) + })? + } DataType::List(inner) | DataType::LargeList(inner) | DataType::FixedSizeList(inner, _) => { @@ -401,3 +411,117 @@ where .map(|x| x.map(|x| x.1)), )) } + +fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( + iter: I, + init: Vec, + _type_: &PrimitiveType, + data_type: DataType, + chunk_size: Option, +) -> Result> { + use DataType::*; + let values_data_type = if let Dictionary(_, v, _) = &data_type { + v.as_ref() + } else { + panic!() + }; + + Ok(match values_data_type.to_logical_type() { + UInt8 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i32| x as u8, + ), + Float32 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: f32| x, + ), + Float64 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: f64| x, + ), + /* + UInt16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u16, + )), + UInt32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u32, + )), + Int8 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i8, + )), + Int16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i16, + )), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { + x as i32 + }), + ), + + Timestamp(time_unit, _) => { + let time_unit = *time_unit; + return timestamp_dict::( + iter, + physical_type, + logical_type, + data_type, + chunk_size, + time_unit, + ); + } + + Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), + ), + Float32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f32| x, + )), + Float64 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f64| x, + )), + + Utf8 | Binary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( + iter, data_type, chunk_size, + )), + */ + other => { + return Err(Error::nyi(format!( + "Reading nested dictionaries of type {:?}", + other + ))) + } + }) +} diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 5fafdceee2f..d7851f76e05 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -336,7 +336,7 @@ impl NestedState { /// Extends `items` by consuming `page`, first trying to complete the last `item` /// and extending it if more are needed -fn extend<'a, D: NestedDecoder<'a>>( +pub(super) fn extend<'a, D: NestedDecoder<'a>>( page: &'a DataPage, init: &[InitNested], items: &mut VecDeque<(NestedState, D::DecodedState)>, diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index 2b3f0ca5491..bc0a5f43e8f 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -13,11 +13,12 @@ use crate::{ types::NativeType, }; +use super::super::dictionary::nested_next_dict; use super::super::dictionary::*; +use super::super::nested_utils::{InitNested, NestedArrayIter, NestedState}; use super::super::utils::MaybeNext; use super::super::DataPages; -#[inline] fn read_dict(data_type: DataType, op: F, dict: &dyn DictPage) -> Box where T: NativeType, @@ -107,3 +108,108 @@ where } } } + +#[derive(Debug)] +pub struct NestedDictIter +where + I: DataPages, + T: NativeType, + K: DictionaryKey, + P: ParquetNativeType, + F: Fn(P) -> T, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + chunk_size: Option, + op: F, + phantom: std::marker::PhantomData

, +} + +impl NestedDictIter +where + K: DictionaryKey, + I: DataPages, + T: NativeType, + + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, + ) -> Self { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + chunk_size, + op, + phantom: Default::default(), + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + T: NativeType, + K: DictionaryKey, + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), self.op, dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, I, T, P, F>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + K: DictionaryKey, + T: crate::types::NativeType, + P: parquet2::types::NativeType, + F: 'a + Copy + Send + Sync + Fn(P) -> T, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size, op).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index b9f87520c8d..e30d813d20a 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -3,5 +3,5 @@ mod dictionary; mod nested; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; pub use nested::iter_to_arrays_nested; diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 346208e8951..2eab739a4ad 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1121,12 +1121,14 @@ fn integration_write(schema: &Schema, chunks: &[Chunk>]) -> Resul let encodings = schema .fields .iter() - .map(|x| { - vec![if let DataType::Dictionary(..) = x.data_type() { - Encoding::RleDictionary - } else { - Encoding::Plain - }] + .map(|f| { + transverse(&f.data_type, |x| { + if let DataType::Dictionary(..) = x { + Encoding::RleDictionary + } else { + Encoding::Plain + } + }) }) .collect(); @@ -1429,3 +1431,31 @@ fn list_int_nullable() -> Result<()> { array.try_extend(data).unwrap(); list_array_generic(true, array.into()) } + +#[test] +fn nested_dict() -> Result<()> { + let indices = PrimitiveArray::from_values((0..3u64).map(|x| x % 2)); + let values = PrimitiveArray::from_slice([1.0f32, 3.0]); + let floats = DictionaryArray::try_from_keys(indices, values.boxed()).unwrap(); + let floats = ListArray::try_new( + DataType::List(Box::new(Field::new( + "item", + floats.data_type().clone(), + true, + ))), + vec![0i32, 0, 2, 3, 3].into(), + floats.boxed(), + Some([true, false, true, true].into()), + )?; + + let schema = Schema::from(vec![Field::new("floats", floats.data_type().clone(), true)]); + let batch = Chunk::try_new(vec![floats.boxed()])?; + + let r = integration_write(&schema, &[batch.clone()])?; + + let (new_schema, new_batches) = integration_read(&r)?; + + assert_eq!(new_schema, schema); + assert_eq!(new_batches, vec![batch]); + Ok(()) +}