This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 224
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added support to read and write delta-bitpacked
- Loading branch information
1 parent
3b29c82
commit 487d8bb
Showing
13 changed files
with
493 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,260 @@ | ||
use std::collections::VecDeque; | ||
|
||
use num_traits::AsPrimitive; | ||
use parquet2::{ | ||
deserialize::SliceFilteredIter, | ||
encoding::{delta_bitpacked::Decoder, Encoding}, | ||
page::{split_buffer, DataPage, DictPage}, | ||
schema::Repetition, | ||
types::NativeType as ParquetNativeType, | ||
}; | ||
|
||
use crate::{ | ||
array::MutablePrimitiveArray, | ||
bitmap::MutableBitmap, | ||
datatypes::DataType, | ||
error::Result, | ||
io::parquet::read::deserialize::utils::{ | ||
get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity, | ||
}, | ||
types::NativeType, | ||
}; | ||
|
||
use super::super::utils; | ||
use super::super::Pages; | ||
|
||
use super::basic::{finish, PrimitiveDecoder, State as PrimitiveState}; | ||
|
||
/// The state of a [`DataPage`] of an integer parquet type (i32 or i64) | ||
#[derive(Debug)] | ||
enum State<'a, T> | ||
where | ||
T: NativeType, | ||
{ | ||
Common(PrimitiveState<'a, T>), | ||
DeltaBinaryPackedRequired(Decoder<'a>), | ||
DeltaBinaryPackedOptional(OptionalPageValidity<'a>, Decoder<'a>), | ||
FilteredDeltaBinaryPackedRequired(SliceFilteredIter<Decoder<'a>>), | ||
FilteredDeltaBinaryPackedOptional(FilteredOptionalPageValidity<'a>, Decoder<'a>), | ||
} | ||
|
||
impl<'a, T> utils::PageState<'a> for State<'a, T> | ||
where | ||
T: NativeType, | ||
{ | ||
fn len(&self) -> usize { | ||
match self { | ||
State::Common(state) => state.len(), | ||
State::DeltaBinaryPackedRequired(state) => state.size_hint().0, | ||
State::DeltaBinaryPackedOptional(state, _) => state.len(), | ||
State::FilteredDeltaBinaryPackedRequired(state) => state.size_hint().0, | ||
State::FilteredDeltaBinaryPackedOptional(state, _) => state.len(), | ||
} | ||
} | ||
} | ||
|
||
/// Decoder of integer parquet type | ||
#[derive(Debug)] | ||
struct IntDecoder<T, P, F>(PrimitiveDecoder<T, P, F>) | ||
where | ||
T: NativeType, | ||
P: ParquetNativeType, | ||
i64: num_traits::AsPrimitive<P>, | ||
F: Fn(P) -> T; | ||
|
||
impl<T, P, F> IntDecoder<T, P, F> | ||
where | ||
T: NativeType, | ||
P: ParquetNativeType, | ||
i64: num_traits::AsPrimitive<P>, | ||
F: Fn(P) -> T, | ||
{ | ||
#[inline] | ||
fn new(op: F) -> Self { | ||
Self(PrimitiveDecoder::new(op)) | ||
} | ||
} | ||
|
||
impl<'a, T, P, F> utils::Decoder<'a> for IntDecoder<T, P, F> | ||
where | ||
T: NativeType, | ||
P: ParquetNativeType, | ||
i64: num_traits::AsPrimitive<P>, | ||
F: Copy + Fn(P) -> T, | ||
{ | ||
type State = State<'a, T>; | ||
type Dict = Vec<T>; | ||
type DecodedState = (Vec<T>, MutableBitmap); | ||
|
||
fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result<Self::State> { | ||
let is_optional = | ||
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; | ||
let is_filtered = page.selected_rows().is_some(); | ||
|
||
match (page.encoding(), dict, is_optional, is_filtered) { | ||
(Encoding::DeltaBinaryPacked, _, false, false) => { | ||
let (_, _, values) = split_buffer(page)?; | ||
Ok(State::DeltaBinaryPackedRequired(Decoder::new(values))) | ||
} | ||
(Encoding::DeltaBinaryPacked, _, true, false) => { | ||
let (_, _, values) = split_buffer(page)?; | ||
Ok(State::DeltaBinaryPackedOptional( | ||
OptionalPageValidity::try_new(page)?, | ||
Decoder::new(values), | ||
)) | ||
} | ||
(Encoding::DeltaBinaryPacked, _, false, true) => { | ||
let (_, _, values) = split_buffer(page)?; | ||
let values = Decoder::new(values); | ||
|
||
let rows = get_selected_rows(page); | ||
let values = SliceFilteredIter::new(values, rows); | ||
|
||
Ok(State::FilteredDeltaBinaryPackedRequired(values)) | ||
} | ||
(Encoding::DeltaBinaryPacked, _, true, true) => { | ||
let (_, _, values) = split_buffer(page)?; | ||
let values = Decoder::new(values); | ||
|
||
Ok(State::FilteredDeltaBinaryPackedOptional( | ||
FilteredOptionalPageValidity::try_new(page)?, | ||
values, | ||
)) | ||
} | ||
_ => self.0.build_state(page, dict).map(State::Common), | ||
} | ||
} | ||
|
||
fn with_capacity(&self, capacity: usize) -> Self::DecodedState { | ||
self.0.with_capacity(capacity) | ||
} | ||
|
||
fn extend_from_state( | ||
&self, | ||
state: &mut Self::State, | ||
decoded: &mut Self::DecodedState, | ||
remaining: usize, | ||
) { | ||
let (values, validity) = decoded; | ||
match state { | ||
State::Common(state) => self.0.extend_from_state(state, decoded, remaining), | ||
State::DeltaBinaryPackedRequired(state) => { | ||
values.extend( | ||
state | ||
.by_ref() | ||
.map(|x| x.as_()) | ||
.map(self.0.op) | ||
.take(remaining), | ||
); | ||
} | ||
State::DeltaBinaryPackedOptional(page_validity, page_values) => { | ||
utils::extend_from_decoder( | ||
validity, | ||
page_validity, | ||
Some(remaining), | ||
values, | ||
page_values.by_ref().map(|x| x.as_()).map(self.0.op), | ||
) | ||
} | ||
State::FilteredDeltaBinaryPackedRequired(page) => { | ||
values.extend( | ||
page.by_ref() | ||
.map(|x| x.as_()) | ||
.map(self.0.op) | ||
.take(remaining), | ||
); | ||
} | ||
State::FilteredDeltaBinaryPackedOptional(page_validity, page_values) => { | ||
utils::extend_from_decoder( | ||
validity, | ||
page_validity, | ||
Some(remaining), | ||
values, | ||
page_values.by_ref().map(|x| x.as_()).map(self.0.op), | ||
); | ||
} | ||
} | ||
} | ||
|
||
fn deserialize_dict(&self, page: &DictPage) -> Self::Dict { | ||
self.0.deserialize_dict(page) | ||
} | ||
} | ||
|
||
/// An [`Iterator`] adapter over [`Pages`] assumed to be encoded as primitive arrays | ||
/// encoded as parquet integer types | ||
#[derive(Debug)] | ||
pub struct IntegerIter<T, I, P, F> | ||
where | ||
I: Pages, | ||
T: NativeType, | ||
P: ParquetNativeType, | ||
F: Fn(P) -> T, | ||
{ | ||
iter: I, | ||
data_type: DataType, | ||
items: VecDeque<(Vec<T>, MutableBitmap)>, | ||
remaining: usize, | ||
chunk_size: Option<usize>, | ||
dict: Option<Vec<T>>, | ||
op: F, | ||
phantom: std::marker::PhantomData<P>, | ||
} | ||
|
||
impl<T, I, P, F> IntegerIter<T, I, P, F> | ||
where | ||
I: Pages, | ||
T: NativeType, | ||
|
||
P: ParquetNativeType, | ||
F: Copy + Fn(P) -> T, | ||
{ | ||
pub fn new( | ||
iter: I, | ||
data_type: DataType, | ||
num_rows: usize, | ||
chunk_size: Option<usize>, | ||
op: F, | ||
) -> Self { | ||
Self { | ||
iter, | ||
data_type, | ||
items: VecDeque::new(), | ||
dict: None, | ||
remaining: num_rows, | ||
chunk_size, | ||
op, | ||
phantom: Default::default(), | ||
} | ||
} | ||
} | ||
|
||
impl<T, I, P, F> Iterator for IntegerIter<T, I, P, F> | ||
where | ||
I: Pages, | ||
T: NativeType, | ||
P: ParquetNativeType, | ||
i64: num_traits::AsPrimitive<P>, | ||
F: Copy + Fn(P) -> T, | ||
{ | ||
type Item = Result<MutablePrimitiveArray<T>>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
let maybe_state = utils::next( | ||
&mut self.iter, | ||
&mut self.items, | ||
&mut self.dict, | ||
&mut self.remaining, | ||
self.chunk_size, | ||
&IntDecoder::new(self.op), | ||
); | ||
match maybe_state { | ||
utils::MaybeNext::Some(Ok((values, validity))) => { | ||
Some(Ok(finish(&self.data_type, values, validity))) | ||
} | ||
utils::MaybeNext::Some(Err(e)) => Some(Err(e)), | ||
utils::MaybeNext::None => None, | ||
utils::MaybeNext::More => self.next(), | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
mod basic; | ||
mod dictionary; | ||
mod integer; | ||
mod nested; | ||
|
||
pub use basic::Iter; | ||
pub use dictionary::{DictIter, NestedDictIter}; | ||
pub use integer::IntegerIter; | ||
pub use nested::NestedIter; |
Oops, something went wrong.