From 2b42135fa57ab336b67daf6e44e17f0eda623abf Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 7 Mar 2022 05:43:57 +0000 Subject: [PATCH] Added reading nested primitive dictionary --- .../read/deserialize/primitive/basic.rs | 6 +-- .../read/deserialize/primitive/nested.rs | 54 +++++++++++++------ 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index f9b1ca4eaa5..a223ae88535 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -50,15 +50,15 @@ pub(super) struct ValuesDictionary<'a, P> where P: ParquetNativeType, { - values: hybrid_rle::HybridRleDecoder<'a>, - dict: &'a [P], + pub values: hybrid_rle::HybridRleDecoder<'a>, + pub dict: &'a [P], } impl<'a, P> ValuesDictionary<'a, P> where P: ParquetNativeType, { - fn new(page: &'a DataPage, dict: &'a PrimitivePageDict

<P>) -> Self { + pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict<P>

) -> Self { let (_, _, indices_buffer) = utils::split_buffer(page); let values = utils::dict_indices_decoder(indices_buffer, page.num_values()); diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index cbfc2ae778c..e1353d41c52 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -13,7 +13,7 @@ use crate::{ use super::super::nested_utils::*; use super::super::utils; use super::super::DataPages; -use super::basic::Values; +use super::basic::{Values, ValuesDictionary}; // The state of a `DataPage` of `Primitive` parquet primitive type #[allow(clippy::large_enum_variant)] @@ -24,8 +24,8 @@ where { Optional(Optional<'a>, Values<'a, P>), Required(Values<'a, P>), - //RequiredDictionary(ValuesDictionary<'a, T, P, F>), - //OptionalDictionary(Optional<'a>, ValuesDictionary<'a, T, P, F>), + RequiredDictionary(ValuesDictionary<'a, P>), + OptionalDictionary(Optional<'a>, ValuesDictionary<'a, P>), } impl<'a, P> utils::PageState<'a> for State<'a, P> @@ -36,8 +36,8 @@ where match self { State::Optional(optional, _) => optional.len(), State::Required(required) => required.len(), - //State::RequiredDictionary(required) => required.len(), - //State::OptionalDictionary(optional, _) => optional.len(), + State::RequiredDictionary(required) => required.len(), + State::OptionalDictionary(optional, _) => optional.len(), } } } @@ -83,19 +83,21 @@ where page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { - /*(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { - todo!() + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + let dict = dict.as_any().downcast_ref().unwrap(); + Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) 
=> { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::OptionalDictionary(OptionalDictionaryPage::new( - page, dict, self.op2, - ))) - }*/ - (Encoding::Plain, None, true) => { + Ok(State::OptionalDictionary( + Optional::new(page), + ValuesDictionary::new(page, dict), + )) + } + (Encoding::Plain, _, true) => { Ok(State::Optional(Optional::new(page), Values::new(page))) } - (Encoding::Plain, None, false) => Ok(State::Required(Values::new(page))), + (Encoding::Plain, _, false) => Ok(State::Required(Values::new(page))), _ => Err(utils::not_implemented( &page.encoding(), is_optional, @@ -130,10 +132,30 @@ where ) } State::Required(page) => { - values.extend(page.values.by_ref().map(decode).map(self.op).take(remaining)); + values.extend( + page.values + .by_ref() + .map(decode) + .map(self.op) + .take(remaining), + ); + } + State::RequiredDictionary(page) => { + let op1 = |index: u32| page.dict[index as usize]; + values.extend(page.values.by_ref().map(op1).map(self.op).take(remaining)); + } + State::OptionalDictionary(page_validity, page_values) => { + let max_def = page_validity.max_def(); + let op1 = |index: u32| page_values.dict[index as usize]; + read_optional_values( + page_validity.definition_levels.by_ref(), + max_def, + page_values.values.by_ref().map(op1).map(self.op), + values, + validity, + remaining, + ) } - //State::OptionalDictionary(page) => todo!(), - //State::RequiredDictionary(page) => todo!(), } } }