diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 5f289bb050a..a223ae88535 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -50,16 +50,17 @@ pub(super) struct ValuesDictionary<'a, P> where P: ParquetNativeType, { - values: hybrid_rle::HybridRleDecoder<'a>, - dict: &'a [P], + pub values: hybrid_rle::HybridRleDecoder<'a>, + pub dict: &'a [P], } impl<'a, P> ValuesDictionary<'a, P> where P: ParquetNativeType, { - fn new(data: &'a [u8], length: usize, dict: &'a PrimitivePageDict
) -> Self { - let values = utils::dict_indices_decoder(data, length); + pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict
) -> Self { + let (_, _, indices_buffer) = utils::split_buffer(page); + let values = utils::dict_indices_decoder(indices_buffer, page.num_values()); Self { dict: dict.values(), @@ -142,20 +143,14 @@ where match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::RequiredDictionary(ValuesDictionary::new( - page.buffer(), - page.num_values(), - dict, - ))) + Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); - let (_, _, values_buffer) = utils::split_buffer(page); - Ok(State::OptionalDictionary( OptionalPageValidity::new(page), - ValuesDictionary::new(values_buffer, page.num_values(), dict), + ValuesDictionary::new(page, dict), )) } (Encoding::Plain, _, true) => { diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index cbfc2ae778c..e1353d41c52 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -13,7 +13,7 @@ use crate::{ use super::super::nested_utils::*; use super::super::utils; use super::super::DataPages; -use super::basic::Values; +use super::basic::{Values, ValuesDictionary}; // The state of a `DataPage` of `Primitive` parquet primitive type #[allow(clippy::large_enum_variant)] @@ -24,8 +24,8 @@ where { Optional(Optional<'a>, Values<'a, P>), Required(Values<'a, P>), - //RequiredDictionary(ValuesDictionary<'a, T, P, F>), - //OptionalDictionary(Optional<'a>, ValuesDictionary<'a, T, P, F>), + RequiredDictionary(ValuesDictionary<'a, P>), + OptionalDictionary(Optional<'a>, ValuesDictionary<'a, P>), } impl<'a, P> utils::PageState<'a> for State<'a, P> @@ -36,8 +36,8 @@ where match self { State::Optional(optional, _) => optional.len(), State::Required(required) => required.len(), - //State::RequiredDictionary(required) => required.len(), - //State::OptionalDictionary(optional, _) => optional.len(), + State::RequiredDictionary(required) => required.len(), + State::OptionalDictionary(optional, _) => optional.len(), } } } @@ -83,19 +83,21 @@ where page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { - /*(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { - todo!() + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + let dict = dict.as_any().downcast_ref().unwrap(); + Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::OptionalDictionary(OptionalDictionaryPage::new( - page, dict, self.op2, - ))) - }*/ - (Encoding::Plain, None, true) => { + Ok(State::OptionalDictionary( + Optional::new(page), + ValuesDictionary::new(page, dict), + )) + } + (Encoding::Plain, _, true) => { Ok(State::Optional(Optional::new(page), Values::new(page))) } - (Encoding::Plain, None, false) => Ok(State::Required(Values::new(page))), + (Encoding::Plain, _, false) => Ok(State::Required(Values::new(page))), _ => Err(utils::not_implemented( &page.encoding(), is_optional, @@ -130,10 +132,30 @@ where ) } State::Required(page) => { - values.extend(page.values.by_ref().map(decode).map(self.op).take(remaining)); + values.extend( + page.values + .by_ref() + .map(decode) + .map(self.op) + .take(remaining), + ); + } + State::RequiredDictionary(page) => { + let op1 = |index: u32| page.dict[index as usize]; + values.extend(page.values.by_ref().map(op1).map(self.op).take(remaining)); + } + State::OptionalDictionary(page_validity, page_values) => { + let max_def = page_validity.max_def(); + let op1 = |index: u32| page_values.dict[index as usize]; + read_optional_values( + page_validity.definition_levels.by_ref(), + max_def, + page_values.values.by_ref().map(op1).map(self.op), + values, + validity, + remaining, + ) } - //State::OptionalDictionary(page) => todo!(), - //State::RequiredDictionary(page) => todo!(), } } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 16c52ead2e5..13fde93610e 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -232,6 +232,23 @@ fn v1_nested_i16() -> Result<()> { test_pyarrow_integration("list_int16", 1, "nested", false, false, None) } +#[test] +fn v1_nested_i16_dict() -> Result<()> { + test_pyarrow_integration("list_int16", 1, "nested", true, false, None) +} + +#[test] +fn v2_nested_i16_required_dict() -> Result<()> { + test_pyarrow_integration( + "list_int64_required_required", + 1, + "nested", + true, + false, + None, + ) +} + #[test] fn v2_nested_bool() -> Result<()> { test_pyarrow_integration("list_bool", 2, "nested", false, false, None)