Commit d6f3966: Initial take
jorgecarleitao committed Jul 22, 2022 · 1 parent 9ad04ca
Showing 7 changed files with 480 additions and 13 deletions.
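Taken together, the hunks shown below add a `nested` submodule to the dictionary deserializer (`dictionary/nested.rs`), route `DataType::Dictionary` columns through a new `dict_read` function in `deserialize/mod.rs`, and widen `nested_utils::extend` to `pub(super)` so the new module can reuse the existing page-consumption loop.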
8 changes: 5 additions & 3 deletions src/io/parquet/read/deserialize/dictionary/mod.rs
@@ -1,3 +1,5 @@
+mod nested;
+
use std::collections::VecDeque;

use parquet2::{
@@ -292,8 +294,7 @@ pub(super) fn next_dict<
MaybeNext::More
} else {
let (values, validity) = items.pop_front().unwrap();
-            let keys =
-                PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into());
+            let keys = finish_key(values, validity);
MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
}
}
@@ -304,11 +305,12 @@ pub(super) fn next_dict<
debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);

MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
} else {
MaybeNext::None
}
}
}
}

pub use nested::next_dict as nested_next_dict;
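The hunks above fold the inline `PrimitiveArray` construction into a `finish_key` helper, which `nested.rs` below also imports from this module; its definition sits outside the shown hunks. Judging only from the line it replaces, a plausible shape is the following sketch:

```rust
use crate::array::{DictionaryKey, PrimitiveArray};
use crate::bitmap::MutableBitmap;

// Plausible reconstruction of `finish_key`, inferred from the line it
// replaces above; the actual definition may differ.
fn finish_key<K: DictionaryKey>(values: Vec<K>, validity: MutableBitmap) -> PrimitiveArray<K> {
    PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into())
}
```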
205 changes: 205 additions & 0 deletions src/io/parquet/read/deserialize/dictionary/nested.rs
@@ -0,0 +1,205 @@
use std::collections::VecDeque;

use parquet2::{
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
page::{DataPage, DictPage},
schema::Repetition,
};

use crate::datatypes::DataType;
use crate::{
array::{Array, DictionaryArray, DictionaryKey},
bitmap::MutableBitmap,
error::{Error, Result},
};

use super::{
super::super::DataPages,
super::nested_utils::*,
super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState},
finish_key, Dict,
};

// The state of a required DataPage with dictionary-encoded values
#[derive(Debug)]
pub struct Required<'a> {
values: HybridRleDecoder<'a>,
length: usize,
}

impl<'a> Required<'a> {
fn try_new(page: &'a DataPage) -> Result<Self> {
let values = dict_indices_decoder(page)?;
let length = page.num_values();
Ok(Self { values, length })
}
}

// The state of a `DataPage` of a `Dictionary` type
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum State<'a> {
Optional(HybridRleDecoder<'a>),
Required(Required<'a>),
}

impl<'a> State<'a> {
pub fn len(&self) -> usize {
match self {
State::Optional(page) => page.len(),
State::Required(page) => page.length,
}
}
}

impl<'a> PageState<'a> for State<'a> {
fn len(&self) -> usize {
self.len()
}
}

#[derive(Debug)]
pub struct DictionaryDecoder<K>
where
K: DictionaryKey,
{
phantom_k: std::marker::PhantomData<K>,
}

impl<K> Default for DictionaryDecoder<K>
where
K: DictionaryKey,
{
#[inline]
fn default() -> Self {
Self {
phantom_k: std::marker::PhantomData,
}
}
}

impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder<K> {
type State = State<'a>;
type DecodedState = (Vec<K>, MutableBitmap);

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), is_optional, is_filtered) {
(Encoding::RleDictionary | Encoding::PlainDictionary, true, false) => {
dict_indices_decoder(page).map(State::Optional)
}
(Encoding::RleDictionary | Encoding::PlainDictionary, false, false) => {
Required::try_new(page).map(State::Required)
}
_ => Err(not_implemented(page)),
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Vec::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
let (values, validity) = decoded;
match state {
State::Optional(page_values) => {
let key = page_values.next();
// todo: convert unwrap to error
let key = match K::try_from(key.unwrap_or_default() as usize) {
Ok(key) => key,
Err(_) => todo!(),
};
values.push(key);
validity.push(true);
}
State::Required(page_values) => {
let key = page_values.values.next();
let key = match K::try_from(key.unwrap_or_default() as usize) {
Ok(key) => key,
Err(_) => todo!(),
};
values.push(key);
}
}
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
let (values, validity) = decoded;
values.push(K::default());
validity.push(false)
}
}
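The `todo!()` branches in `push_valid` panic when a decoded index does not fit in the key type `K` (the `// todo` comment above flags the same gap for the `unwrap`). A hedged sketch of a fallible conversion, assuming `Error::oos` as the crate's out-of-spec error constructor and a refactor that lets the decoder return `Result` (neither is part of this commit):

```rust
use crate::array::DictionaryKey;
use crate::error::{Error, Result};

// Hypothetical helper: convert a dictionary index into the key type K,
// reporting an error instead of panicking on overflow.
fn convert_key<K: DictionaryKey>(index: usize) -> Result<K> {
    K::try_from(index)
        .map_err(|_| Error::oos("dictionary index overflows the dictionary key type"))
}
```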

pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box<dyn Array>>(
iter: &'a mut I,
items: &mut VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
init: &[InitNested],
dict: &mut Dict,
data_type: DataType,
chunk_size: Option<usize>,
read_dict: F,
) -> MaybeNext<Result<(NestedState, DictionaryArray<K>)>> {
if items.len() > 1 {
let (nested, (values, validity)) = items.pop_front().unwrap();
let keys = finish_key(values, validity);
let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap());
return MaybeNext::Some(dict.map(|dict| (nested, dict)));
}
match iter.next() {
Err(e) => MaybeNext::Some(Err(e.into())),
Ok(Some(page)) => {
// consume the dictionary page
match (&dict, page.dictionary_page()) {
(Dict::Empty, None) => {
return MaybeNext::Some(Err(Error::nyi(
"dictionary arrays from non-dict-encoded pages",
)));
}
(Dict::Empty, Some(dict_page)) => {
*dict = Dict::Complete(read_dict(dict_page.as_ref()))
}
(Dict::Complete(_), _) => {}
};

let error = extend(
page,
init,
items,
&DictionaryDecoder::<K>::default(),
chunk_size,
);
match error {
Ok(_) => {}
Err(e) => return MaybeNext::Some(Err(e)),
};

if items.front().unwrap().0.len() < chunk_size.unwrap_or(usize::MAX) {
MaybeNext::More
} else {
let (nested, (values, validity)) = items.pop_front().unwrap();
let keys = finish_key(values, validity);
let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap());
MaybeNext::Some(dict.map(|dict| (nested, dict)))
}
}
Ok(None) => {
if let Some((nested, (values, validity))) = items.pop_front() {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);
let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap());
MaybeNext::Some(dict.map(|dict| (nested, dict)))
} else {
MaybeNext::None
}
}
}
}
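For orientation, a minimal sketch of how a caller might drive `next_dict` (re-exported as `nested_next_dict`) through its `MaybeNext` states; the surrounding variables and the `read_dict_page` deserializer are illustrative names, not part of this commit:

```rust
// Illustrative driving loop; in the crate, adapters such as
// `iter_to_dict_arrays_nested` play this role.
loop {
    match next_dict(
        &mut pages,                  // impl DataPages
        &mut items,                  // VecDeque of partially decoded chunks
        &init,                       // &[InitNested] describing the nesting
        &mut dict,                   // cache for the decoded dictionary page
        data_type.clone(),
        chunk_size,
        |page| read_dict_page(page), // hypothetical values deserializer
    ) {
        MaybeNext::Some(Ok((nested, array))) => consume(nested, array),
        MaybeNext::Some(Err(e)) => return Err(e),
        MaybeNext::More => continue, // chunk incomplete: fetch more pages
        MaybeNext::None => break,    // all pages consumed
    }
}
```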
126 changes: 125 additions & 1 deletion src/io/parquet/read/deserialize/mod.rs
@@ -14,7 +14,9 @@ use parquet2::read::get_page_iterator as _get_page_iterator;
use parquet2::schema::types::PrimitiveType;

use crate::{
-    array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
+    array::{
+        Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array,
+    },
datatypes::{DataType, Field},
error::{Error, Result},
};
@@ -289,6 +291,14 @@
}

_ => match field.data_type().to_logical_type() {
DataType::Dictionary(key_type, _, _) => {
let type_ = types.pop().unwrap();
let iter = columns.pop().unwrap();
let data_type = field.data_type().clone();
match_integer_type!(key_type, |$K| {
dict_read::<$K, _>(iter, init, type_, data_type, chunk_size)
})?
}
DataType::List(inner)
| DataType::LargeList(inner)
| DataType::FixedSizeList(inner, _) => {
@@ -401,3 +411,117 @@
.map(|x| x.map(|x| x.1)),
))
}

fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
iter: I,
init: Vec<InitNested>,
_type_: &PrimitiveType,
data_type: DataType,
chunk_size: Option<usize>,
) -> Result<NestedArrayIter<'a>> {
use DataType::*;
let values_data_type = if let Dictionary(_, v, _) = &data_type {
v.as_ref()
} else {
panic!()
};

Ok(match values_data_type.to_logical_type() {
UInt8 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
iter,
init,
data_type,
chunk_size,
|x: i32| x as u8,
),
Float32 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
iter,
init,
data_type,
chunk_size,
|x: f32| x,
),
Float64 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
iter,
init,
data_type,
chunk_size,
|x: f64| x,
),
/*
UInt16 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
iter,
data_type,
chunk_size,
|x: i32| x as u16,
)),
UInt32 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
iter,
data_type,
chunk_size,
|x: i32| x as u32,
)),
Int8 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
iter,
data_type,
chunk_size,
|x: i32| x as i8,
)),
Int16 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
iter,
data_type,
chunk_size,
|x: i32| x as i16,
)),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(
primitive::DictIter::<K, _, _, _, _>::new(iter, data_type, chunk_size, |x: i32| {
x as i32
}),
),
Timestamp(time_unit, _) => {
let time_unit = *time_unit;
return timestamp_dict::<K, _>(
iter,
physical_type,
logical_type,
data_type,
chunk_size,
time_unit,
);
}
Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(
primitive::DictIter::<K, _, _, _, _>::new(iter, data_type, chunk_size, |x: i64| x),
),
Float32 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
iter,
data_type,
chunk_size,
|x: f32| x,
)),
Float64 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
iter,
data_type,
chunk_size,
|x: f64| x,
)),
Utf8 | Binary => dyn_iter(binary::DictIter::<K, i32, _>::new(
iter, data_type, chunk_size,
)),
LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::<K, i64, _>::new(
iter, data_type, chunk_size,
)),
FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::<K, _>::new(
iter, data_type, chunk_size,
)),
*/
other => {
return Err(Error::nyi(format!(
"Reading nested dictionaries of type {:?}",
other
)))
}
})
}
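The `match_integer_type!` invocation in the earlier hunk is what monomorphizes `dict_read` over the dictionary's key type. An abridged sketch of the dispatch it performs, assuming `IntegerType` from `crate::datatypes` (the real macro covers all eight integer key types):

```rust
use crate::datatypes::IntegerType;

// Hypothetical hand-written equivalent of the macro dispatch, abridged.
fn dispatch_dict_read<'a, I: 'a + DataPages>(
    key_type: &IntegerType,
    iter: I,
    init: Vec<InitNested>,
    type_: &PrimitiveType,
    data_type: DataType,
    chunk_size: Option<usize>,
) -> Result<NestedArrayIter<'a>> {
    match key_type {
        IntegerType::Int32 => dict_read::<i32, _>(iter, init, type_, data_type, chunk_size),
        IntegerType::Int64 => dict_read::<i64, _>(iter, init, type_, data_type, chunk_size),
        IntegerType::UInt8 => dict_read::<u8, _>(iter, init, type_, data_type, chunk_size),
        _ => unreachable!("remaining key types elided in this sketch"),
    }
}
```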
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/nested_utils.rs
@@ -336,7 +336,7 @@ impl NestedState {

/// Extends `items` by consuming `page`, first trying to complete the last `item`
/// and extending it if more are needed
-fn extend<'a, D: NestedDecoder<'a>>(
+pub(super) fn extend<'a, D: NestedDecoder<'a>>(
page: &'a DataPage,
init: &[InitNested],
items: &mut VecDeque<(NestedState, D::DecodedState)>,
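This one-line visibility change is what lets the new `dictionary/nested.rs` module call `extend` (see the call inside its `next_dict` above) instead of duplicating the page-consumption loop.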