Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Delayed dict #1185

Merged
merged 4 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.14.0", optional = true, default_features = false }
parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] }

# avro support
avro-schema = { version = "0.3", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions benches/read_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ fn add_benchmark(c: &mut Criterion) {
let a = format!("read i64 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));

let buffer = to_buffer(size, true, true, false, false);
let a = format!("read ts dict 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 11).unwrap()));

let a = format!("read utf8 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));

Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module for non-nested arrays is `simple::page_iter_to_arrays`.

This function expects

* a (fallible) streaming iterator of decompressed and encoded pages, `DataPages`
* a (fallible) streaming iterator of decompressed and encoded pages, `Pages`
* the source (parquet) column type, including its logical information
* the target (arrow) `DataType`
* the chunk size
Expand All @@ -18,7 +18,7 @@ This design is shared among _all_ `(parquet, arrow)` implemented tuples. Their m
difference is how they are deserialized, which depends on the source and target types.

When the array iterator is pulled the first time, the following happens:
* a page from `DataPages` is pulled
* a page from `Pages` is pulled
* a `PageState<'a>` is built from the page
* the `PageState` is consumed into a mutable array:
* if `chunk_size` is larger than the number of rows in the page, the mutable array state is preserved and a new page is pulled and the process repeated until we fill a chunk.
Expand Down
106 changes: 41 additions & 65 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::default::Default;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
page::{split_buffer, BinaryPageDict, DataPage},
page::{split_buffer, DataPage, DictPage},
schema::Repetition,
};

Expand All @@ -20,7 +20,7 @@ use super::super::utils::{
extend_from_decoder, get_selected_rows, next, DecodedState, FilteredOptionalPageValidity,
MaybeNext, OptionalPageValidity,
};
use super::super::DataPages;
use super::super::Pages;
use super::{super::utils, utils::*};

/*
Expand Down Expand Up @@ -99,14 +99,16 @@ impl<'a> FilteredRequired<'a> {
}
}

/// A deserialized binary dictionary page: one owned byte buffer per dictionary key,
/// indexable by the indices of dictionary-encoded data pages.
pub(super) type Dict = Vec<Vec<u8>>;

#[derive(Debug)]
pub(super) struct RequiredDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a BinaryPageDict,
pub dict: &'a Dict,
}

impl<'a> RequiredDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
Expand All @@ -121,11 +123,11 @@ impl<'a> RequiredDictionary<'a> {
#[derive(Debug)]
pub(super) struct FilteredRequiredDictionary<'a> {
pub values: SliceFilteredIter<hybrid_rle::HybridRleDecoder<'a>>,
pub dict: &'a BinaryPageDict,
pub dict: &'a Dict,
}

impl<'a> FilteredRequiredDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

let rows = get_selected_rows(page);
Expand All @@ -143,11 +145,11 @@ impl<'a> FilteredRequiredDictionary<'a> {
#[derive(Debug)]
pub(super) struct ValuesDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a BinaryPageDict,
pub dict: &'a Dict,
}

impl<'a> ValuesDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
Expand Down Expand Up @@ -219,7 +221,7 @@ impl<O: Offset> TraitBinaryArray<O> for Utf8Array<O> {
}
}

impl<'a, O: Offset> DecodedState<'a> for (Binary<O>, MutableBitmap) {
impl<O: Offset> DecodedState for (Binary<O>, MutableBitmap) {
fn len(&self) -> usize {
self.0.len()
}
Expand All @@ -232,42 +234,29 @@ struct BinaryDecoder<O: Offset> {

impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
type State = State<'a>;
type Dict = Dict;
type DecodedState = (Binary<O>, MutableBitmap);

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (
page.encoding(),
page.dictionary_page(),
is_optional,
is_filtered,
) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
Ok(State::RequiredDictionary(RequiredDictionary::try_new(
page,
dict.as_any().downcast_ref().unwrap(),
)?))
}
match (page.encoding(), dict, is_optional, is_filtered) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => Ok(
State::RequiredDictionary(RequiredDictionary::try_new(page, dict)?),
),
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::OptionalDictionary(
OptionalPageValidity::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, true) => {
let dict = dict.as_any().downcast_ref().unwrap();

FilteredRequiredDictionary::try_new(page, dict)
.map(State::FilteredRequiredDictionary)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::FilteredOptionalDictionary(
FilteredOptionalPageValidity::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
Expand Down Expand Up @@ -332,15 +321,8 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}
State::OptionalDictionary(page_validity, page_values) => {
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Expand All @@ -350,14 +332,8 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
)
}
State::RequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
Expand All @@ -373,29 +349,16 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
);
}
State::FilteredRequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
State::FilteredOptionalDictionary(page_validity, page_values) => {
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Expand All @@ -406,6 +369,10 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}
}

/// Deserializes this column's plain-encoded dictionary page into owned
/// byte buffers (`Dict`), so data pages can resolve their dictionary indices.
fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
    deserialize_plain(&page.buffer, page.num_values)
}
}

pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
Expand All @@ -421,35 +388,38 @@ pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
)
}

pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
/// Iterator adapter converting a `Pages` stream of a parquet binary column
/// into arrays of the arrow type `A`.
pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: Pages> {
    // underlying (fallible) iterator of decompressed, encoded pages
    iter: I,
    // target (arrow) logical type of the produced arrays
    data_type: DataType,
    // partially decoded chunks not yet emitted
    items: VecDeque<(Binary<O>, MutableBitmap)>,
    // dictionary decoded from the column's dict page, once one is seen
    dict: Option<Dict>,
    // maximum number of rows per emitted array (`None` presumably means a single chunk)
    chunk_size: Option<usize>,
    // number of rows still to be read from `iter`
    remaining: usize,
    // binds the concrete output array type `A` without storing a value of it
    phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iter<O, A, I> {
    /// Creates a new [`Iter`] reading at most `num_rows` rows from `iter`,
    /// producing arrays of `data_type` with up to `chunk_size` rows each.
    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
        Self {
            iter,
            data_type,
            items: VecDeque::new(),
            // no dictionary page decoded yet; filled lazily while iterating
            dict: None,
            chunk_size,
            remaining: num_rows,
            phantom_a: Default::default(),
        }
    }
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iterator for Iter<O, A, I> {
type Item = Result<A>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.dict,
&mut self.remaining,
self.chunk_size,
&BinaryDecoder::<O>::default(),
Expand All @@ -464,3 +434,9 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I>
}
}
}

pub(super) fn deserialize_plain(values: &[u8], num_values: usize) -> Dict {
SizedBinaryIter::new(values, num_values)
.map(|x| x.to_vec())
.collect()
}
Loading