Commit

Made chunk_size optional in parquet (#1055)
jorgecarleitao authored Jun 13, 2022
1 parent 7587a27 commit 698c7b8
Showing 22 changed files with 75 additions and 53 deletions.
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -423,12 +423,12 @@ pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
- pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
Self {
iter,
data_type,
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
@@ -25,7 +25,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}

@@ -35,7 +35,7 @@
O: Offset,
I: DataPages,
{
- pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/mod.rs
@@ -23,7 +23,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
9 changes: 7 additions & 2 deletions src/io/parquet/read/deserialize/binary/nested.rs
@@ -146,12 +146,17 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
init: Vec<InitNested>,
items: VecDeque<(Binary<O>, MutableBitmap)>,
nested: VecDeque<NestedState>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
- pub fn new(iter: I, init: Vec<InitNested>, data_type: DataType, chunk_size: usize) -> Self {
+ pub fn new(
+ iter: I,
+ init: Vec<InitNested>,
+ data_type: DataType,
+ chunk_size: Option<usize>,
+ ) -> Self {
Self {
iter,
data_type,
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/boolean/basic.rs
@@ -188,11 +188,11 @@ pub struct Iter<I: DataPages> {
iter: I,
data_type: DataType,
items: VecDeque<(MutableBitmap, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
}

impl<I: DataPages> Iter<I> {
- pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
Self {
iter,
data_type,
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/boolean/mod.rs
@@ -13,7 +13,7 @@ pub use self::basic::Iter;
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/boolean/nested.rs
@@ -118,11 +118,11 @@ pub struct ArrayIterator<I: DataPages> {
// invariant: items.len() == nested.len()
items: VecDeque<(MutableBitmap, MutableBitmap)>,
nested: VecDeque<NestedState>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
}

impl<I: DataPages> ArrayIterator<I> {
- pub fn new(iter: I, init: Vec<InitNested>, chunk_size: usize) -> Self {
+ pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
Self {
iter,
init,
6 changes: 3 additions & 3 deletions src/io/parquet/read/deserialize/dictionary.rs
@@ -216,7 +216,7 @@ pub(super) fn next_dict<
iter: &'a mut I,
items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
dict: &mut Dict,
- chunk_size: usize,
+ chunk_size: Option<usize>,
read_dict: F,
) -> MaybeNext<Result<DictionaryArray<K>>> {
if items.len() > 1 {
@@ -249,7 +249,7 @@ pub(super) fn next_dict<

utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::<K>::default());

- if items.front().unwrap().len() < chunk_size {
+ if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
MaybeNext::More
} else {
let (values, validity) = items.pop_front().unwrap();
@@ -262,7 +262,7 @@
if let Some((values, validity)) = items.pop_front() {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
- debug_assert!(values.len() <= chunk_size);
+ debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);

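A note on the comparison pattern above, not part of the diff itself: with `chunk_size` now optional, `chunk_size.unwrap_or(usize::MAX)` acts as a sentinel, so when the caller passes `None` a buffered item is never considered full and the decoder keeps consuming pages until they are exhausted, emitting a single array at the end. A minimal, self-contained sketch of that convention (the helper name is hypothetical):

```rust
// Illustrative only: `None` behaves as "no upper bound on chunk length",
// so a buffer counts as full only once an explicit chunk size is reached.
fn is_chunk_full(buffered_len: usize, chunk_size: Option<usize>) -> bool {
    buffered_len >= chunk_size.unwrap_or(usize::MAX)
}

fn main() {
    assert!(is_chunk_full(1024, Some(1024))); // explicit bound reached: emit the chunk
    assert!(!is_chunk_full(1024, None)); // unbounded: keep accumulating pages
}
```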
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
@@ -289,11 +289,11 @@ pub struct Iter<I: DataPages> {
data_type: DataType,
size: usize,
items: VecDeque<(FixedSizeBinary, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
}

impl<I: DataPages> Iter<I> {
- pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let size = FixedSizeBinaryArray::get_size(&data_type);
Self {
iter,
@@ -24,15 +24,15 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
}

impl<K, I> DictIter<K, I>
where
K: DictionaryKey,
I: DataPages,
{
- pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
11 changes: 8 additions & 3 deletions src/io/parquet/read/deserialize/mod.rs
@@ -10,14 +10,16 @@ mod simple;
mod struct_;
mod utils;

+ use parquet2::read::get_page_iterator as _get_page_iterator;
+ use parquet2::schema::types::PrimitiveType;
+
use crate::{
array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
datatypes::{DataType, Field},
error::{Error, Result},
};

use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
- use parquet2::schema::types::PrimitiveType;
use simple::page_iter_to_arrays;

use super::*;
@@ -94,7 +96,7 @@ fn columns_to_iter_recursive<'a, I: 'a>(
mut types: Vec<&PrimitiveType>,
field: Field,
mut init: Vec<InitNested>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) -> Result<NestedArrayIter<'a>>
where
I: DataPages,
@@ -359,12 +361,15 @@ fn n_columns(data_type: &DataType) -> usize {

/// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s.
///
+ /// For a non-nested datatypes such as [`DataType::Int32`], this function requires a single element in `columns` and `types`.
+ /// For nested types, `columns` must be composed by all parquet columns with associated types `types`.
+ ///
/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
pub fn column_iter_to_arrays<'a, I: 'a>(
columns: Vec<I>,
types: Vec<&PrimitiveType>,
field: Field,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>>
where
I: DataPages,
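For callers, the visible effect of this commit is the final parameter of `column_iter_to_arrays` (and of the `iter_to_arrays*` helpers): it is now `Option<usize>`. Below is a minimal call-site sketch, assuming arrow2's public re-exports at the time of this commit (`DataPages`, `ArrayIter`, and `column_iter_to_arrays` under `arrow2::io::parquet::read`) plus a direct `parquet2` dependency for `PrimitiveType`; building the page iterators themselves is out of scope here.

```rust
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{column_iter_to_arrays, ArrayIter, DataPages};
use parquet2::schema::types::PrimitiveType;

/// Decode one Arrow field from its parquet columns.
/// `Some(n)` caps every emitted array at `n` rows; `None` leaves the length unbounded.
fn decode_field<'a, I: 'a + DataPages>(
    columns: Vec<I>,
    types: Vec<&PrimitiveType>,
    field: Field,
    chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>> {
    column_iter_to_arrays(columns, types, field, chunk_size)
}
```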
15 changes: 9 additions & 6 deletions src/io/parquet/read/deserialize/nested_utils.rs
@@ -363,8 +363,11 @@ pub fn extend_offsets1<'a>(
page: &mut NestedPage<'a>,
init: &[InitNested],
items: &mut VecDeque<NestedState>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) {
+ let capacity = chunk_size.unwrap_or(0);
+ let chunk_size = chunk_size.unwrap_or(usize::MAX);
+
let mut nested = if let Some(nested) = items.pop_back() {
// there is a already a state => it must be incomplete...
debug_assert!(
@@ -374,7 +377,7 @@
nested
} else {
// there is no state => initialize it
- init_nested(init, chunk_size)
+ init_nested(init, capacity)
};

let remaining = chunk_size - nested.len();
@@ -384,7 +387,7 @@
items.push_back(nested);

while page.len() > 0 {
- let mut nested = init_nested(init, chunk_size);
+ let mut nested = init_nested(init, capacity);
extend_offsets2(page, &mut nested, chunk_size);
items.push_back(nested);
}
@@ -425,7 +428,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi

let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0);

- if next_rep == 0 && rows == additional + 1 {
+ if next_rep == 0 && rows == additional.saturating_add(1) {
break;
}
}
@@ -478,7 +481,7 @@ pub(super) fn next<'a, I, D>(
items: &mut VecDeque<D::DecodedState>,
nested_items: &mut VecDeque<NestedState>,
init: &[InitNested],
- chunk_size: usize,
+ chunk_size: Option<usize>,
decoder: &D,
) -> MaybeNext<Result<(NestedState, D::DecodedState)>>
where
@@ -517,7 +520,7 @@

extend_from_new_page(page, items, nested_items, decoder);

- if nested_items.front().unwrap().len() < chunk_size {
+ if nested_items.front().unwrap().len() < chunk_size.unwrap_or(0) {
MaybeNext::More
} else {
let nested = nested_items.pop_front().unwrap();
8 changes: 7 additions & 1 deletion src/io/parquet/read/deserialize/null.rs
@@ -3,7 +3,11 @@ use crate::{array::NullArray, datatypes::DataType};
use super::super::{ArrayIter, DataPages};

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
- pub fn iter_to_arrays<'a, I>(mut iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
+ pub fn iter_to_arrays<'a, I>(
+ mut iter: I,
+ data_type: DataType,
+ chunk_size: Option<usize>,
+ ) -> ArrayIter<'a>
where
I: 'a + DataPages,
{
@@ -16,6 +20,8 @@ where
return Box::new(std::iter::empty());
}

+ let chunk_size = chunk_size.unwrap_or(len);
+
let complete_chunks = chunk_size / len;
let remainder = chunk_size % len;
let i_data_type = data_type.clone();
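In `null.rs` the fallback is the total length itself, `chunk_size.unwrap_or(len)`, so passing `None` produces a single `NullArray` covering every row. A hypothetical helper (not part of the commit) illustrating the intended splitting of a total row count into chunk lengths:

```rust
// Hypothetical: split `len` rows into chunk lengths; `None` means one chunk of `len` rows.
fn chunk_lengths(len: usize, chunk_size: Option<usize>) -> Vec<usize> {
    let chunk_size = chunk_size.unwrap_or(len).max(1);
    let mut lengths = vec![chunk_size; len / chunk_size];
    if len % chunk_size > 0 {
        lengths.push(len % chunk_size);
    }
    lengths
}

fn main() {
    assert_eq!(chunk_lengths(10, Some(4)), vec![4, 4, 2]);
    assert_eq!(chunk_lengths(10, None), vec![10]);
}
```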
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/basic.rs
@@ -296,7 +296,7 @@ where
iter: I,
data_type: DataType,
items: VecDeque<(Vec<T>, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
op: F,
phantom: std::marker::PhantomData<P>,
}
@@ -309,7 +309,7 @@
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
- pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
Self {
iter,
data_type,
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/dictionary.rs
@@ -47,7 +47,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
op: F,
phantom: std::marker::PhantomData<P>,
}
@@ -61,7 +61,7 @@
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
- pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self {
+ pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => *values,
_ => data_type,
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/primitive/mod.rs
@@ -16,7 +16,7 @@ pub fn iter_to_arrays_nested<'a, I, T, P, F>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
op: F,
) -> NestedArrayIter<'a>
where
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/nested.rs
@@ -179,7 +179,7 @@ where
// invariant: items.len() == nested.len()
items: VecDeque<(Vec<T>, MutableBitmap)>,
nested: VecDeque<NestedState>,
- chunk_size: usize,
+ chunk_size: Option<usize>,
decoder: PrimitiveDecoder<T, P, F>,
}

@@ -195,7 +195,7 @@
iter: I,
init: Vec<InitNested>,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
op: F,
) -> Self {
Self {
8 changes: 4 additions & 4 deletions src/io/parquet/read/deserialize/simple.rs
@@ -60,7 +60,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages: I,
type_: &PrimitiveType,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>> {
use DataType::*;

@@ -230,7 +230,7 @@ fn timestamp<'a, I: 'a + DataPages>(
physical_type: &PhysicalType,
logical_type: &Option<PrimitiveLogicalType>,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
@@ -289,7 +289,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
physical_type: &PhysicalType,
logical_type: &Option<PrimitiveLogicalType>,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
@@ -424,7 +424,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
physical_type: &PhysicalType,
logical_type: &Option<PrimitiveLogicalType>,
data_type: DataType,
- chunk_size: usize,
+ chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>> {
use DataType::*;
let values_data_type = if let Dictionary(_, v, _) = &data_type {