Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added reading nested primitive dictionary
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 7, 2022
1 parent 3d76b04 commit 75ee7df
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 19 deletions.
6 changes: 3 additions & 3 deletions src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ pub(super) struct ValuesDictionary<'a, P>
where
P: ParquetNativeType,
{
values: hybrid_rle::HybridRleDecoder<'a>,
dict: &'a [P],
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a [P],
}

impl<'a, P> ValuesDictionary<'a, P>
where
P: ParquetNativeType,
{
fn new(page: &'a DataPage, dict: &'a PrimitivePageDict<P>) -> Self {
pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict<P>) -> Self {
let (_, _, indices_buffer) = utils::split_buffer(page);
let values = utils::dict_indices_decoder(indices_buffer, page.num_values());

Expand Down
54 changes: 38 additions & 16 deletions src/io/parquet/read/deserialize/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
use super::super::nested_utils::*;
use super::super::utils;
use super::super::DataPages;
use super::basic::Values;
use super::basic::{Values, ValuesDictionary};

// The state of a `DataPage` of a parquet primitive type
#[allow(clippy::large_enum_variant)]
Expand All @@ -24,8 +24,8 @@ where
{
Optional(Optional<'a>, Values<'a, P>),
Required(Values<'a, P>),
//RequiredDictionary(ValuesDictionary<'a, T, P, F>),
//OptionalDictionary(Optional<'a>, ValuesDictionary<'a, T, P, F>),
RequiredDictionary(ValuesDictionary<'a, P>),
OptionalDictionary(Optional<'a>, ValuesDictionary<'a, P>),
}

impl<'a, P> utils::PageState<'a> for State<'a, P>
Expand All @@ -36,8 +36,8 @@ where
match self {
State::Optional(optional, _) => optional.len(),
State::Required(required) => required.len(),
//State::RequiredDictionary(required) => required.len(),
//State::OptionalDictionary(optional, _) => optional.len(),
State::RequiredDictionary(required) => required.len(),
State::OptionalDictionary(optional, _) => optional.len(),
}
}
}
Expand Down Expand Up @@ -83,19 +83,21 @@ where
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
/*(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
todo!()
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(OptionalDictionaryPage::new(
page, dict, self.op2,
)))
}*/
(Encoding::Plain, None, true) => {
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, _, true) => {
Ok(State::Optional(Optional::new(page), Values::new(page)))
}
(Encoding::Plain, None, false) => Ok(State::Required(Values::new(page))),
(Encoding::Plain, _, false) => Ok(State::Required(Values::new(page))),
_ => Err(utils::not_implemented(
&page.encoding(),
is_optional,
Expand Down Expand Up @@ -130,10 +132,30 @@ where
)
}
State::Required(page) => {
values.extend(page.values.by_ref().map(decode).map(self.op).take(remaining));
values.extend(
page.values
.by_ref()
.map(decode)
.map(self.op)
.take(remaining),
);
}
State::RequiredDictionary(page) => {
let op1 = |index: u32| page.dict[index as usize];
values.extend(page.values.by_ref().map(op1).map(self.op).take(remaining));
}
State::OptionalDictionary(page_validity, page_values) => {
let max_def = page_validity.max_def();
let op1 = |index: u32| page_values.dict[index as usize];
read_optional_values(
page_validity.definition_levels.by_ref(),
max_def,
page_values.values.by_ref().map(op1).map(self.op),
values,
validity,
remaining,
)
}
//State::OptionalDictionary(page) => todo!(),
//State::RequiredDictionary(page) => todo!(),
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,23 @@ fn v1_nested_i16() -> Result<()> {
test_pyarrow_integration("list_int16", 1, "nested", false, false, None)
}

/// Runs `test_pyarrow_integration` for `"list_int16"` / `"nested"` with the
/// fourth argument set to `true`.
///
/// The sibling `v1_nested_i16` test passes `false` in that position, so the
/// flag presumably selects dictionary-encoded pages — TODO confirm against
/// the signature of `test_pyarrow_integration`.
#[test]
fn v1_nested_i16_dict() -> Result<()> {
    // Matches the `v1` prefix in the test name.
    let version = 1;
    test_pyarrow_integration(
        "list_int16",
        version,
        "nested",
        true,
        false,
        None,
    )
}

/// Runs `test_pyarrow_integration` for `"list_int64_required_required"` /
/// `"nested"` with the fourth argument set to `true`.
///
/// NOTE(review): the test name says `v2` and `i16`, but the call passes `1`
/// as the second argument and an `int64` name as the first. Neighbouring
/// tests pair `v1` with `1` and `v2` with `2`, so either the name or the
/// arguments look off — confirm which was intended before renaming.
#[test]
fn v2_nested_i16_required_dict() -> Result<()> {
    let version = 1;
    test_pyarrow_integration("list_int64_required_required", version, "nested", true, false, None)
}

#[test]
fn v2_nested_bool() -> Result<()> {
test_pyarrow_integration("list_bool", 2, "nested", false, false, None)
Expand Down

0 comments on commit 75ee7df

Please sign in to comment.