Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 25, 2022
1 parent c13c9c5 commit 86154a1
Show file tree
Hide file tree
Showing 23 changed files with 383 additions and 115 deletions.
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,16 +426,18 @@ pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
Self {
iter,
data_type,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
Expand All @@ -448,6 +450,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I>
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
self.chunk_size,
&BinaryDecoder::<O>::default(),
);
Expand Down
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ where
values_data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
remaining: usize,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}
Expand All @@ -36,7 +37,7 @@ where
O: Offset,
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
let values_data_type = match &data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
Expand All @@ -47,6 +48,7 @@ where
values_data_type,
values: Dict::Empty,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
phantom: std::marker::PhantomData,
}
Expand Down Expand Up @@ -93,6 +95,7 @@ where
&mut self.items,
&mut self.values,
self.data_type.clone(),
&mut self.remaining,
self.chunk_size,
|dict| read_dict::<O>(self.values_data_type.clone(), dict),
);
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
Expand All @@ -31,7 +32,7 @@ where
O: Offset,
{
Box::new(
ArrayIterator::<O, A, I>::new(iter, init, data_type, chunk_size).map(|x| {
ArrayIterator::<O, A, I>::new(iter, init, data_type, num_rows, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = Box::new(array) as Box<dyn Array>;
Expand Down
4 changes: 4 additions & 0 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
init: Vec<InitNested>,
items: VecDeque<(NestedState, (Binary<O>, MutableBitmap))>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

Expand All @@ -155,6 +156,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> Self {
Self {
Expand All @@ -163,6 +165,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
init,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
Expand All @@ -175,6 +178,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
&self.init,
self.chunk_size,
&BinaryDecoder::<O>::default(),
Expand Down
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,17 @@ pub struct Iter<I: DataPages> {
data_type: DataType,
items: VecDeque<(MutableBitmap, MutableBitmap)>,
chunk_size: Option<usize>,
remaining: usize,
}

impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
Self {
iter,
data_type,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
}
}
}
Expand All @@ -212,6 +214,7 @@ impl<I: DataPages> Iterator for Iter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
self.chunk_size,
&BooleanDecoder::default(),
);
Expand Down
17 changes: 10 additions & 7 deletions src/io/parquet/read/deserialize/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ pub use self::basic::Iter;
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
num_rows: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
{
Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = array.boxed();
(nested, values)
})
}))
Box::new(
ArrayIterator::new(iter, init, num_rows, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = array.boxed();
(nested, values)
})
}),
)
}
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,17 @@ pub struct ArrayIterator<I: DataPages> {
iter: I,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>,
remaining: usize,
chunk_size: Option<usize>,
}

impl<I: DataPages> ArrayIterator<I> {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, init: Vec<InitNested>, num_rows: usize, chunk_size: Option<usize>) -> Self {
Self {
iter,
init,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
}
}
Expand All @@ -130,6 +132,7 @@ impl<I: DataPages> Iterator for ArrayIterator<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
&self.init,
self.chunk_size,
&BooleanDecoder::default(),
Expand Down
9 changes: 8 additions & 1 deletion src/io/parquet/read/deserialize/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ pub(super) fn next_dict<
items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
dict: &mut Dict,
data_type: DataType,
remaining: &mut usize,
chunk_size: Option<usize>,
read_dict: F,
) -> MaybeNext<Result<DictionaryArray<K>>> {
Expand Down Expand Up @@ -286,7 +287,13 @@ pub(super) fn next_dict<
Err(e) => return MaybeNext::Some(Err(e)),
};

utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::<K>::default());
utils::extend_from_new_page(
page,
chunk_size,
items,
remaining,
&PrimitiveDecoder::<K>::default(),
);

if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
MaybeNext::More
Expand Down
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,19 @@ pub struct Iter<I: DataPages> {
size: usize,
items: VecDeque<(FixedSizeBinary, MutableBitmap)>,
chunk_size: Option<usize>,
remaining: usize,
}

impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
let size = FixedSizeBinaryArray::get_size(&data_type);
Self {
iter,
data_type,
size,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
}
}
}
Expand All @@ -308,6 +310,7 @@ impl<I: DataPages> Iterator for Iter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
self.chunk_size,
&BinaryDecoder { size: self.size },
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ where
values_data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
remaining: usize,
chunk_size: Option<usize>,
}

Expand All @@ -33,7 +34,7 @@ where
K: DictionaryKey,
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
let values_data_type = match &data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
Expand All @@ -44,6 +45,7 @@ where
values_data_type,
values: Dict::Empty,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
}
}
Expand Down Expand Up @@ -76,6 +78,7 @@ where
&mut self.items,
&mut self.values,
self.data_type.clone(),
&mut self.remaining,
self.chunk_size,
|dict| read_dict(self.values_data_type.clone(), dict),
);
Expand Down
Loading

0 comments on commit 86154a1

Please sign in to comment.