Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for filtered dictionary-encoded read
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 12, 2022
1 parent b26e738 commit 1e8256a
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,28 @@ impl<'a> RequiredDictionary<'a> {
}
}

#[derive(Debug)]
pub(super) struct FilteredRequiredDictionary<'a> {
pub values: SliceFilteredIter<hybrid_rle::HybridRleDecoder<'a>>,
pub dict: &'a BinaryPageDict,
}

impl<'a> FilteredRequiredDictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = utils::dict_indices_decoder(page);

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values, dict }
}

#[inline]
pub fn len(&self) -> usize {
self.values.size_hint().0
}
}

#[derive(Debug)]
pub(super) struct ValuesDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
Expand All @@ -143,6 +165,8 @@ enum State<'a> {
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
FilteredRequired(FilteredRequired<'a>),
FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>),
FilteredRequiredDictionary(FilteredRequiredDictionary<'a>),
FilteredOptionalDictionary(FilteredOptionalPageValidity<'a>, ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
Expand All @@ -154,6 +178,8 @@ impl<'a> utils::PageState<'a> for State<'a> {
State::OptionalDictionary(optional, _) => optional.len(),
State::FilteredRequired(state) => state.len(),
State::FilteredOptional(validity, _) => validity.len(),
State::FilteredRequiredDictionary(values) => values.len(),
State::FilteredOptionalDictionary(optional, _) => optional.len(),
}
}
}
Expand Down Expand Up @@ -231,6 +257,21 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
ValuesDictionary::new(page, dict),
))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::FilteredRequiredDictionary(
FilteredRequiredDictionary::new(page, dict),
))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::FilteredOptionalDictionary(
FilteredOptionalPageValidity::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, _, true, false) => {
let (_, _, values) = utils::split_buffer(page);

Expand Down Expand Up @@ -327,6 +368,38 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
page_values.by_ref(),
);
}
State::FilteredRequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};

for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
State::FilteredOptionalDictionary(page_validity, page_values) => {
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
)
}
}
}
}
Expand Down

0 comments on commit 1e8256a

Please sign in to comment.