diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs
index 63aa8ff642a..3732839b7b0 100644
--- a/src/io/parquet/read/deserialize/binary/basic.rs
+++ b/src/io/parquet/read/deserialize/binary/basic.rs
@@ -426,16 +426,18 @@ pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
     data_type: DataType,
     items: VecDeque<(Binary<O>, MutableBitmap)>,
     chunk_size: Option<usize>,
+    remaining: usize,
     phantom_a: std::marker::PhantomData<A>,
 }
 
 impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
         Self {
             iter,
             data_type,
             items: VecDeque::new(),
             chunk_size,
+            remaining: num_rows,
             phantom_a: Default::default(),
         }
     }
@@ -448,6 +450,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I> {
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             self.chunk_size,
             &BinaryDecoder::<O>::default(),
         );
diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs
index 7095afdeaf2..2d8900a09e2 100644
--- a/src/io/parquet/read/deserialize/binary/dictionary.rs
+++ b/src/io/parquet/read/deserialize/binary/dictionary.rs
@@ -26,6 +26,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(Vec<K>, MutableBitmap)>,
+    remaining: usize,
     chunk_size: Option<usize>,
     phantom: std::marker::PhantomData<O>,
 }
@@ -36,12 +37,13 @@ where
     O: Offset,
     I: DataPages,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
+    pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
         Self {
             iter,
             data_type,
             values: Dict::Empty,
             items: VecDeque::new(),
+            remaining: num_rows,
             chunk_size,
             phantom: std::marker::PhantomData,
         }
     }
@@ -93,6 +95,7 @@ where
             &mut self.items,
             &mut self.values,
             self.data_type.clone(),
+            &mut self.remaining,
             self.chunk_size,
             |dict| read_dict::<O>(self.data_type.clone(), dict),
         );
@@ -117,6 +120,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
+    remaining: usize,
     chunk_size: Option<usize>,
     phantom: std::marker::PhantomData<O>,
 }
@@ -131,6 +135,7 @@ where
         iter: I,
         init: Vec<InitNested>,
         data_type: DataType,
+        num_rows: usize,
         chunk_size: Option<usize>,
     ) -> Self {
         Self {
@@ -139,6 +144,7 @@ where
             data_type,
             values: Dict::Empty,
             items: VecDeque::new(),
+            remaining: num_rows,
             chunk_size,
             phantom: Default::default(),
         }
     }
@@ -157,6 +163,7 @@ where
         let maybe_state = nested_next_dict(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             &self.init,
             &mut self.values,
             self.data_type.clone(),
@@ -177,6 +184,7 @@ pub fn iter_to_arrays_nested<'a, K, O, I>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> NestedArrayIter<'a>
 where
@@ -185,7 +193,7 @@ where
     K: DictionaryKey,
 {
     Box::new(
-        NestedDictIter::<K, O, I>::new(iter, init, data_type, chunk_size).map(|result| {
+        NestedDictIter::<K, O, I>::new(iter, init, data_type, num_rows, chunk_size).map(|result| {
             let (mut nested, array) = result?;
             let _ = nested.nested.pop().unwrap(); // the primitive
             Ok((nested, array.boxed()))
diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs
index b5703812940..19267f3a38c 100644
--- a/src/io/parquet/read/deserialize/binary/nested.rs
+++ b/src/io/parquet/read/deserialize/binary/nested.rs
@@ -148,6 +148,7 @@ pub struct NestedIter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
     init: Vec<InitNested>,
     items: VecDeque<(NestedState, (Binary<O>, MutableBitmap))>,
     chunk_size: Option<usize>,
+    remaining: usize,
     phantom_a: std::marker::PhantomData<A>,
 }
 
@@ -156,6 +157,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
         iter: I,
         init: Vec<InitNested>,
         data_type: DataType,
+        num_rows: usize,
         chunk_size: Option<usize>,
     ) -> Self {
         Self {
@@ -164,6 +166,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
             init,
             items: VecDeque::new(),
             chunk_size,
+            remaining: num_rows,
             phantom_a: Default::default(),
         }
     }
@@ -176,6 +179,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for NestedIter<O, A, I> {
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             &self.init,
             self.chunk_size,
             &BinaryDecoder::<O>::default(),
@@ -196,6 +200,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> NestedArrayIter<'a>
 where
@@ -204,11 +209,12 @@ where
     O: Offset,
 {
     Box::new(
-        NestedIter::<O, A, I>::new(iter, init, data_type, chunk_size).map(|result| {
-            let (mut nested, array) = result?;
-            let _ = nested.nested.pop().unwrap(); // the primitive
-            let array = Box::new(array) as Box<dyn Array>;
-            Ok((nested, array))
+        NestedIter::<O, A, I>::new(iter, init, data_type, num_rows, chunk_size).map(|x| {
+            x.map(|(mut nested, array)| {
+                let _ = nested.nested.pop().unwrap(); // the primitive
+                let values = Box::new(array) as Box<dyn Array>;
+                (nested, values)
+            })
         }),
     )
 }
diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs
index d3189b23f64..572565a6d5d 100644
--- a/src/io/parquet/read/deserialize/boolean/basic.rs
+++ b/src/io/parquet/read/deserialize/boolean/basic.rs
@@ -192,15 +192,17 @@ pub struct Iter<I: DataPages> {
     data_type: DataType,
     items: VecDeque<(MutableBitmap, MutableBitmap)>,
     chunk_size: Option<usize>,
+    remaining: usize,
 }
 
 impl<I: DataPages> Iter<I> {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
         Self {
             iter,
             data_type,
             items: VecDeque::new(),
             chunk_size,
+            remaining: num_rows,
         }
     }
 }
@@ -212,6 +214,7 @@ impl<I: DataPages> Iterator for Iter<I> {
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             self.chunk_size,
             &BooleanDecoder::default(),
         );
diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs
index f2b4ccd983f..7285d440f7b 100644
--- a/src/io/parquet/read/deserialize/boolean/nested.rs
+++ b/src/io/parquet/read/deserialize/boolean/nested.rs
@@ -105,15 +105,17 @@ pub struct NestedIter<I: DataPages> {
     iter: I,
     init: Vec<InitNested>,
     items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>,
+    remaining: usize,
     chunk_size: Option<usize>,
 }
 
 impl<I: DataPages> NestedIter<I> {
-    pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
+    pub fn new(iter: I, init: Vec<InitNested>, num_rows: usize, chunk_size: Option<usize>) -> Self {
         Self {
             iter,
             init,
             items: VecDeque::new(),
+            remaining: num_rows,
             chunk_size,
         }
     }
 }
@@ -130,6 +132,7 @@ impl<I: DataPages> Iterator for NestedIter<I> {
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             &self.init,
             self.chunk_size,
             &BooleanDecoder::default(),
@@ -149,14 +152,17 @@ impl<I: DataPages> Iterator for NestedIter<I> {
 pub fn iter_to_arrays_nested<'a, I: 'a>(
     iter: I,
     init: Vec<InitNested>,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> NestedArrayIter<'a>
 where
     I: DataPages,
 {
-    Box::new(NestedIter::new(iter, init, chunk_size).map(|result| {
-        let (mut nested, array) = result?;
-        let _ = nested.nested.pop().unwrap(); // the primitive
-        Ok((nested, array.boxed()))
+    Box::new(NestedIter::new(iter, init, num_rows, chunk_size).map(|x| {
+        x.map(|(mut nested, array)| {
+            let _ = nested.nested.pop().unwrap(); // the primitive
+            let values = array.boxed();
+            (nested, values)
+        })
     }))
 }
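Every leaf deserializer above gains the same two pieces of state: a `remaining` row budget initialized from `num_rows`, and a cap so that no chunk ever exceeds that budget. A minimal, self-contained sketch of the pattern — `Limited` and the `Vec<u32>` stand-in for decoded pages are illustrative only, not arrow2 types, and unlike the real iterators this sketch does not carry a partially consumed page over to the next call:

```rust
// A chunked iterator over "pages" that stops once `remaining` rows were emitted.
struct Limited<I: Iterator<Item = Vec<u32>>> {
    pages: I,
    chunk_size: usize,
    remaining: usize, // initialized with `num_rows`
}

impl<I: Iterator<Item = Vec<u32>>> Iterator for Limited<I> {
    type Item = Vec<u32>;

    fn next(&mut self) -> Option<Vec<u32>> {
        if self.remaining == 0 {
            return None; // limit reached: stop even if pages are left
        }
        let page = self.pages.next()?;
        // cap by the chunk size, the remaining budget, and the page length
        let take = self.chunk_size.min(self.remaining).min(page.len());
        self.remaining -= take;
        Some(page[..take].to_vec())
    }
}

fn main() {
    let pages = vec![vec![1, 2, 3], vec![4, 5, 6]].into_iter();
    let chunks: Vec<_> = Limited { pages, chunk_size: 2, remaining: 3 }.collect();
    // two rows fit the first chunk, only one row of budget is left for the second
    assert_eq!(chunks, vec![vec![1, 2], vec![4]]);
}
```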
diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs
index 4171c1f8118..4b37f6e8291 100644
--- a/src/io/parquet/read/deserialize/dictionary/mod.rs
+++ b/src/io/parquet/read/deserialize/dictionary/mod.rs
@@ -257,6 +257,7 @@ pub(super) fn next_dict<
     items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
     dict: &mut Dict,
     data_type: DataType,
+    remaining: &mut usize,
     chunk_size: Option<usize>,
     read_dict: F,
 ) -> MaybeNext<Result<DictionaryArray<K>>> {
@@ -288,7 +289,13 @@ pub(super) fn next_dict<
                 Err(e) => return MaybeNext::Some(Err(e)),
             };
 
-            utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::<K>::default());
+            utils::extend_from_new_page(
+                page,
+                chunk_size,
+                items,
+                remaining,
+                &PrimitiveDecoder::<K>::default(),
+            );
 
             if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
                 MaybeNext::More
diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs
index 23a8a5f5491..ced5808349d 100644
--- a/src/io/parquet/read/deserialize/dictionary/nested.rs
+++ b/src/io/parquet/read/deserialize/dictionary/nested.rs
@@ -136,9 +136,11 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder<K> {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box<dyn Array>>(
     iter: &'a mut I,
     items: &mut VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
+    remaining: &mut usize,
     init: &[InitNested],
     dict: &mut Dict,
     data_type: DataType,
@@ -171,6 +173,7 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box<dyn Array>>(
                 page,
                 init,
                 items,
+                remaining,
                 &DictionaryDecoder::<K>::default(),
                 chunk_size,
             );
diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
index 5f260967e71..b68df58613a 100644
--- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
+++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
@@ -286,10 +286,11 @@ pub struct Iter<I: DataPages> {
     size: usize,
     items: VecDeque<(FixedSizeBinary, MutableBitmap)>,
     chunk_size: Option<usize>,
+    remaining: usize,
 }
 
 impl<I: DataPages> Iter<I> {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
+    pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
         let size = FixedSizeBinaryArray::get_size(&data_type);
         Self {
             iter,
@@ -297,6 +298,7 @@ impl<I: DataPages> Iter<I> {
             size,
             items: VecDeque::new(),
             chunk_size,
+            remaining: num_rows,
         }
     }
 }
@@ -308,6 +310,7 @@ impl<I: DataPages> Iterator for Iter<I> {
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             self.chunk_size,
             &BinaryDecoder { size: self.size },
         );
diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
index 2fec20fcd64..af8a90b3f0e 100644
--- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
+++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
@@ -25,6 +25,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(Vec<K>, MutableBitmap)>,
+    remaining: usize,
     chunk_size: Option<usize>,
 }
 
@@ -33,12 +34,13 @@ where
     K: DictionaryKey,
     I: DataPages,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
+    pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
         Self {
             iter,
             data_type,
             values: Dict::Empty,
             items: VecDeque::new(),
+            remaining: num_rows,
             chunk_size,
         }
     }
@@ -75,6 +77,7 @@ where
             &mut self.items,
             &mut self.values,
             self.data_type.clone(),
+            &mut self.remaining,
             self.chunk_size,
             |dict| read_dict(self.data_type.clone(), dict),
         );
@@ -98,6 +101,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
+    remaining: usize,
     chunk_size: Option<usize>,
 }
 
@@ -110,6 +114,7 @@ where
         iter: I,
         init: Vec<InitNested>,
         data_type: DataType,
+        num_rows: usize,
         chunk_size: Option<usize>,
     ) -> Self {
         Self {
@@ -117,6 +122,7 @@ where
             init,
             data_type,
             values: Dict::Empty,
+            remaining: num_rows,
             items: VecDeque::new(),
             chunk_size,
         }
@@ -134,6 +140,7 @@ where
         let maybe_state = nested_next_dict(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             &self.init,
             &mut self.values,
             self.data_type.clone(),
@@ -154,6 +161,7 @@ pub fn iter_to_arrays_nested<'a, K, I>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> NestedArrayIter<'a>
 where
@@ -161,7 +169,7 @@ where
     K: DictionaryKey,
 {
     Box::new(
-        NestedDictIter::<K, I>::new(iter, init, data_type, chunk_size).map(|result| {
+        NestedDictIter::<K, I>::new(iter, init, data_type, num_rows, chunk_size).map(|result| {
             let (mut nested, array) = result?;
             let _ = nested.nested.pop().unwrap(); // the primitive
             Ok((nested, array.boxed()))
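The `next_dict`/`next` helpers threaded through these files all return a three-valued result: a finished chunk, "consumed a page but the chunk is not full yet", or exhaustion. A minimal sketch of that state machine and how the `Iterator` impls drive it — the enum here mirrors the crate's internal `MaybeNext` but is simplified and stand-alone:

```rust
enum MaybeNext<T> {
    Some(T), // a complete chunk is ready
    More,    // a page was consumed; call again to keep filling the chunk
    None,    // no more pages and no buffered state
}

// What each `Iterator::next` above effectively does: loop until the state
// machine either yields a chunk or runs out of input.
fn drive<T>(mut step: impl FnMut() -> MaybeNext<T>) -> Option<T> {
    loop {
        match step() {
            MaybeNext::Some(item) => return Some(item),
            MaybeNext::More => continue, // consume another page
            MaybeNext::None => return None,
        }
    }
}

fn main() {
    // two pages of 3 and 4 rows, accumulated towards a 5-row chunk
    let mut pages = vec![3usize, 4].into_iter();
    let mut buffered = 0usize;
    let result = drive(|| match pages.next() {
        Some(n) => {
            buffered += n;
            if buffered >= 5 { MaybeNext::Some(buffered) } else { MaybeNext::More }
        }
        None => MaybeNext::None,
    });
    assert_eq!(result, Some(7));
}
```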
diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs
index ffe19ac8d9f..8ec016f6333 100644
--- a/src/io/parquet/read/deserialize/mod.rs
+++ b/src/io/parquet/read/deserialize/mod.rs
@@ -98,6 +98,7 @@ fn columns_to_iter_recursive<'a, I: 'a>(
     mut types: Vec<&PrimitiveType>,
     field: Field,
     mut init: Vec<InitNested>,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> Result<NestedArrayIter<'a>>
 where
@@ -113,6 +114,7 @@ where
                 types.pop().unwrap(),
                 field.data_type,
                 chunk_size,
+                num_rows,
             )?
             .map(|x| Ok((NestedState::new(vec![]), x?))),
         ));
@@ -122,7 +124,7 @@ where
         Boolean => {
            init.push(InitNested::Primitive(field.is_nullable));
            types.pop();
-            boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size)
+            boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, num_rows, chunk_size)
         }
         Primitive(Int8) => {
             init.push(InitNested::Primitive(field.is_nullable));
@@ -131,6 +133,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i32| x as i8,
             )
@@ -142,6 +145,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i32| x as i16,
             )
@@ -153,6 +157,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i32| x,
             )
@@ -164,6 +169,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i64| x,
             )
@@ -175,6 +181,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i32| x as u8,
             )
@@ -186,6 +193,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i32| x as u16,
             )
@@ -198,6 +206,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i32| x as u32,
             ),
@@ -206,6 +215,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i64| x as u32,
             ),
@@ -223,6 +233,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: i64| x as u64,
             )
@@ -234,6 +245,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: f32| x,
             )
@@ -245,6 +257,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
                 |x: f64| x,
             )
@@ -256,6 +269,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
             )
         }
@@ -266,6 +280,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
             )
         }
@@ -276,6 +291,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
             )
         }
@@ -286,6 +302,7 @@ where
                 columns.pop().unwrap(),
                 init,
                 field.data_type().clone(),
+                num_rows,
                 chunk_size,
             )
         }
@@ -296,7 +313,7 @@ where
             let iter = columns.pop().unwrap();
             let data_type = field.data_type().clone();
             match_integer_type!(key_type, |$K| {
-                dict_read::<$K, _>(iter, init, type_, data_type, chunk_size)
+                dict_read::<$K, _>(iter, init, type_, data_type, num_rows, chunk_size)
             })?
         }
         DataType::List(inner)
@@ -308,6 +325,7 @@ where
                 types,
                 inner.as_ref().clone(),
                 init,
+                num_rows,
                 chunk_size,
             )?;
             let iter = iter.map(move |x| {
@@ -327,7 +345,14 @@ where
                     let n = n_columns(&f.data_type);
                     let columns = columns.drain(columns.len() - n..).collect();
                     let types = types.drain(types.len() - n..).collect();
-                    columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size)
+                    columns_to_iter_recursive(
+                        columns,
+                        types,
+                        f.clone(),
+                        init,
+                        num_rows,
+                        chunk_size,
+                    )
                 })
                 .collect::<Result<Vec<_>>>()?;
             let columns = columns.into_iter().rev().collect();
@@ -340,6 +365,7 @@ where
                 types,
                 inner.as_ref().clone(),
                 init,
+                num_rows,
                 chunk_size,
             )?;
             Box::new(iter.map(move |x| {
@@ -402,12 +428,13 @@ pub fn column_iter_to_arrays<'a, I: 'a>(
     types: Vec<&PrimitiveType>,
     field: Field,
     chunk_size: Option<usize>,
+    num_rows: usize,
 ) -> Result<ArrayIter<'a>>
 where
     I: DataPages,
 {
     Ok(Box::new(
-        columns_to_iter_recursive(columns, types, field, vec![], chunk_size)?
+        columns_to_iter_recursive(columns, types, field, vec![], num_rows, chunk_size)?
             .map(|x| x.map(|x| x.1)),
     ))
 }
@@ -417,6 +444,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
     init: Vec<InitNested>,
     _type_: &PrimitiveType,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> Result<NestedArrayIter<'a>> {
     use DataType::*;
@@ -431,6 +459,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u8,
         ),
@@ -438,6 +467,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u16,
         ),
@@ -445,6 +475,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u32,
         ),
@@ -452,6 +483,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i8,
         ),
@@ -459,6 +491,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i16,
         ),
@@ -467,6 +500,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x,
         )
@@ -476,6 +510,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: i64| x as i32,
         )
@@ -484,6 +519,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: f32| x,
         ),
@@ -491,18 +527,19 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             iter,
             init,
             data_type,
+            num_rows,
             chunk_size,
             |x: f64| x,
         ),
-        Utf8 | Binary => {
-            binary::iter_to_dict_arrays_nested::<K, i32, _>(iter, init, data_type, chunk_size)
-        }
-        LargeUtf8 | LargeBinary => {
-            binary::iter_to_dict_arrays_nested::<K, i64, _>(iter, init, data_type, chunk_size)
-        }
-        FixedSizeBinary(_) => {
-            fixed_size_binary::iter_to_dict_arrays_nested::<K, _>(iter, init, data_type, chunk_size)
-        }
+        Utf8 | Binary => binary::iter_to_dict_arrays_nested::<K, i32, _>(
+            iter, init, data_type, num_rows, chunk_size,
+        ),
+        LargeUtf8 | LargeBinary => binary::iter_to_dict_arrays_nested::<K, i64, _>(
+            iter, init, data_type, num_rows, chunk_size,
+        ),
+        FixedSizeBinary(_) => fixed_size_binary::iter_to_dict_arrays_nested::<K, _>(
+            iter, init, data_type, num_rows, chunk_size,
+        ),
         /*
         Timestamp(time_unit, _) => {
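With `columns_to_iter_recursive` now threading `num_rows` down to every leaf, the public entry point `column_iter_to_arrays` takes the limit alongside `chunk_size`. A hypothetical call site, assuming `column_iter_to_arrays` and `DataPages` are re-exported at `arrow2::io::parquet::read` and that `pages` and `types` come from `read_columns` as elsewhere in the crate (the function name `limited_arrays` is made up for this sketch):

```rust
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{column_iter_to_arrays, DataPages};
use parquet2::schema::types::PrimitiveType;

// Decode one field with both a per-chunk cap and an overall row budget.
fn limited_arrays<'a, I: 'a + DataPages>(
    pages: Vec<I>,
    types: Vec<&PrimitiveType>,
    field: Field,
) -> Result<()> {
    // at most 1024 rows per emitted array, at most 100 rows in total
    let arrays = column_iter_to_arrays(pages, types, field, Some(1024), 100)?;
    for array in arrays {
        assert!(array?.len() <= 100);
    }
    Ok(())
}
```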
diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs
index 9d9fe723306..627c1dc3a45 100644
--- a/src/io/parquet/read/deserialize/nested_utils.rs
+++ b/src/io/parquet/read/deserialize/nested_utils.rs
@@ -340,6 +340,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
     page: &'a DataPage,
     init: &[InitNested],
     items: &mut VecDeque<(NestedState, D::DecodedState)>,
+    remaining: &mut usize,
     decoder: &D,
     chunk_size: Option<usize>,
 ) -> Result<()> {
@@ -347,21 +348,20 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
     let mut page = NestedPage::try_new(page)?;
 
     let capacity = chunk_size.unwrap_or(0);
-    let chunk_size = chunk_size.unwrap_or(usize::MAX);
+    let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining);
 
     let (mut nested, mut decoded) = if let Some((nested, decoded)) = items.pop_back() {
-        // there is a already a state => it must be incomplete...
-        debug_assert!(
-            nested.len() < chunk_size,
-            "the temp array is expected to be incomplete"
-        );
+        *remaining += nested.len();
         (nested, decoded)
     } else {
         // there is no state => initialize it
         (init_nested(init, capacity), decoder.with_capacity(0))
     };
 
-    let remaining = chunk_size - nested.len();
+    // e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8
+    // e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100
+    // e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2
+    let additional = (chunk_size - nested.len()).min(*remaining);
 
     // extend the current state
     extend_offsets2(
@@ -370,12 +370,15 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
         &mut nested.nested,
         &mut decoded,
         decoder,
-        remaining,
+        additional,
     );
+    *remaining -= nested.len();
     items.push_back((nested, decoded));
 
-    while page.len() > 0 {
-        let mut nested = init_nested(init, capacity);
+    while page.len() > 0 && *remaining > 0 {
+        let additional = chunk_size.min(*remaining);
+
+        let mut nested = init_nested(init, additional);
         let mut decoded = decoder.with_capacity(0);
         extend_offsets2(
             &mut page,
@@ -383,8 +386,9 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
             &mut nested.nested,
             &mut decoded,
             decoder,
-            chunk_size,
+            additional,
         );
+        *remaining -= nested.len();
         items.push_back((nested, decoded));
     }
     Ok(())
@@ -465,6 +469,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
 pub(super) fn next<'a, I, D>(
     iter: &'a mut I,
     items: &mut VecDeque<(NestedState, D::DecodedState)>,
+    remaining: &mut usize,
     init: &[InitNested],
     chunk_size: Option<usize>,
     decoder: &D,
@@ -478,18 +483,24 @@ where
         let (nested, decoded) = items.pop_front().unwrap();
         return MaybeNext::Some(Ok((nested, decoded)));
     }
+    if *remaining == 0 {
+        return match items.pop_front() {
+            Some(decoded) => MaybeNext::Some(Ok(decoded)),
+            None => MaybeNext::None,
+        };
+    }
     match iter.next() {
         Err(e) => MaybeNext::Some(Err(e.into())),
         Ok(None) => {
-            if let Some((nested, decoded)) = items.pop_front() {
-                MaybeNext::Some(Ok((nested, decoded)))
+            if let Some(decoded) = items.pop_front() {
+                MaybeNext::Some(Ok(decoded))
             } else {
                 MaybeNext::None
             }
         }
         Ok(Some(page)) => {
             // there is a new page => consume the page from the start
-            let error = extend(page, init, items, decoder, chunk_size);
+            let error = extend(page, init, items, remaining, decoder, chunk_size);
             match error {
                 Ok(_) => {}
                 Err(e) => return MaybeNext::Some(Err(e)),
@@ -498,8 +509,7 @@ where
             if items.front().unwrap().0.len() < chunk_size.unwrap_or(0) {
                 MaybeNext::More
             } else {
-                let (nested, decoded) = items.pop_front().unwrap();
-                MaybeNext::Some(Ok((nested, decoded)))
+                MaybeNext::Some(Ok(items.pop_front().unwrap()))
             }
         }
     }
diff --git a/src/io/parquet/read/deserialize/null.rs b/src/io/parquet/read/deserialize/null.rs
index 2ea2bf06e1e..e8c18eb8c65 100644
--- a/src/io/parquet/read/deserialize/null.rs
+++ b/src/io/parquet/read/deserialize/null.rs
@@ -2,11 +2,12 @@ use crate::{array::NullArray, datatypes::DataType};
 
 use super::super::{ArrayIter, DataPages};
 
-/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
+/// Converts [`DataPages`] to an [`ArrayIter`]
 pub fn iter_to_arrays<'a, I>(
     mut iter: I,
     data_type: DataType,
     chunk_size: Option<usize>,
+    num_rows: usize,
 ) -> ArrayIter<'a>
 where
     I: 'a + DataPages,
@@ -14,16 +15,22 @@ where
     let mut len = 0usize;
 
     while let Ok(Some(x)) = iter.next() {
-        len += x.num_values()
+        let rows = x.num_values();
+        len = (len + rows).min(num_rows);
+        if len == num_rows {
+            break;
+        }
     }
+
     if len == 0 {
         return Box::new(std::iter::empty());
     }
 
     let chunk_size = chunk_size.unwrap_or(len);
 
-    let complete_chunks = chunk_size / len;
-    let remainder = chunk_size % len;
+    let complete_chunks = len / chunk_size;
+
+    let remainder = len - (complete_chunks * chunk_size);
+
     let i_data_type = data_type.clone();
     let complete = (0..complete_chunks)
         .map(move |_| Ok(NullArray::new(i_data_type.clone(), chunk_size).boxed()));
@@ -34,3 +41,56 @@ where
         Box::new(complete.chain(std::iter::once(Ok(array.boxed()))))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use parquet2::{
+        encoding::Encoding,
+        error::Error as ParquetError,
+        metadata::Descriptor,
+        page::{DataPage, DataPageHeader, DataPageHeaderV1},
+        schema::types::{PhysicalType, PrimitiveType},
+    };
+
+    use crate::{array::NullArray, datatypes::DataType, error::Error};
+
+    use super::iter_to_arrays;
+
+    #[test]
+    fn limit() {
+        let new_page = |values: i32| {
+            DataPage::new(
+                DataPageHeader::V1(DataPageHeaderV1 {
+                    num_values: values,
+                    encoding: Encoding::Plain.into(),
+                    definition_level_encoding: Encoding::Plain.into(),
+                    repetition_level_encoding: Encoding::Plain.into(),
+                    statistics: None,
+                }),
+                vec![],
+                None,
+                Descriptor {
+                    primitive_type: PrimitiveType::from_physical(
+                        "a".to_string(),
+                        PhysicalType::Int32,
+                    ),
+                    max_def_level: 0,
+                    max_rep_level: 0,
+                },
+                None,
+            )
+        };
+
+        let p1 = new_page(100);
+        let p2 = new_page(100);
+        let pages = vec![Result::<_, ParquetError>::Ok(&p1), Ok(&p2)];
+        let pages = fallible_streaming_iterator::convert(pages.into_iter());
+        let arrays = iter_to_arrays(pages, DataType::Null, Some(10), 101);
+
+        let arrays = arrays.collect::<Result<Vec<_>, Error>>().unwrap();
+        let expected = std::iter::repeat(NullArray::new(DataType::Null, 10).boxed())
+            .take(10)
+            .chain(std::iter::once(NullArray::new(DataType::Null, 1).boxed()));
+        assert_eq!(arrays, expected.collect::<Vec<_>>())
+    }
+}
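The `null.rs` change above also fixes an inverted division (`chunk_size / len` became `len / chunk_size`). The corrected layout arithmetic in isolation, matching the `limit` test just added — a stand-alone sketch, not crate code:

```rust
// `len` rows split into `chunk_size`-row arrays plus one remainder array.
fn chunk_layout(len: usize, chunk_size: usize) -> (usize, usize) {
    let complete_chunks = len / chunk_size;
    let remainder = len - complete_chunks * chunk_size;
    (complete_chunks, remainder)
}

fn main() {
    // 101 rows with chunk_size 10 => ten full chunks and one 1-row chunk,
    // which is exactly what the `limit` test above asserts.
    assert_eq!(chunk_layout(101, 10), (10, 1));
}
```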
diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs
index 1a3d1afef96..0ed1d8b51cb 100644
--- a/src/io/parquet/read/deserialize/primitive/basic.rs
+++ b/src/io/parquet/read/deserialize/primitive/basic.rs
@@ -284,7 +284,7 @@ pub(super) fn finish<T: NativeType>(
     MutablePrimitiveArray::from_data(data_type.clone(), values, validity)
 }
 
-/// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays
+/// An [`Iterator`] adapter over [`DataPages`] assumed to be encoded as primitive arrays
 #[derive(Debug)]
 pub struct Iter<T, I, P, F>
 where
@@ -296,6 +296,7 @@ where
     iter: I,
     data_type: DataType,
     items: VecDeque<(Vec<T>, MutableBitmap)>,
+    remaining: usize,
     chunk_size: Option<usize>,
     op: F,
     phantom: std::marker::PhantomData<P>,
@@ -309,11 +310,18 @@ where
     P: ParquetNativeType,
     F: Copy + Fn(P) -> T,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
+    pub fn new(
+        iter: I,
+        data_type: DataType,
+        num_rows: usize,
+        chunk_size: Option<usize>,
+        op: F,
+    ) -> Self {
         Self {
             iter,
             data_type,
             items: VecDeque::new(),
+            remaining: num_rows,
             chunk_size,
             op,
             phantom: Default::default(),
@@ -334,6 +342,7 @@ where
         let maybe_state = utils::next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             self.chunk_size,
             &PrimitiveDecoder::new(self.op),
         );
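The `op` closure that `primitive::Iter` carries exists because parquet only stores a handful of physical types (i32, i64, floats, …); narrower or different arrow types are rebuilt by converting each value on read, which is why the dispatch code passes closures like `|x: i32| x as u8`. A stand-alone sketch of that idea (`decode_page` is illustrative, not the crate's decoder):

```rust
// Convert a page of physical values into the target arrow-native type.
fn decode_page<P, T>(physical: &[P], op: impl Fn(P) -> T) -> Vec<T>
where
    P: Copy,
{
    physical.iter().copied().map(op).collect()
}

fn main() {
    // a parquet page of UInt8 data is physically stored as i32
    let page: Vec<i32> = vec![1, 2, 255];
    let decoded: Vec<u8> = decode_page(&page, |x: i32| x as u8);
    assert_eq!(decoded, vec![1u8, 2, 255]);
}
```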
diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs
index 9db058ba6e8..56366ec0ffa 100644
--- a/src/io/parquet/read/deserialize/primitive/dictionary.rs
+++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs
@@ -52,6 +52,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(Vec<K>, MutableBitmap)>,
+    remaining: usize,
     chunk_size: Option<usize>,
     op: F,
     phantom: std::marker::PhantomData<P>,
@@ -66,13 +67,20 @@ where
     P: ParquetNativeType,
     F: Copy + Fn(P) -> T,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
+    pub fn new(
+        iter: I,
+        data_type: DataType,
+        num_rows: usize,
+        chunk_size: Option<usize>,
+        op: F,
+    ) -> Self {
         Self {
             iter,
             data_type,
             values: Dict::Empty,
             items: VecDeque::new(),
             chunk_size,
+            remaining: num_rows,
             op,
             phantom: Default::default(),
         }
@@ -95,6 +103,7 @@ where
             &mut self.items,
             &mut self.values,
             self.data_type.clone(),
+            &mut self.remaining,
             self.chunk_size,
             |dict| read_dict::<T, P, F>(self.data_type.clone(), self.op, dict),
         );
@@ -121,6 +130,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
+    remaining: usize,
     chunk_size: Option<usize>,
     op: F,
     phantom: std::marker::PhantomData<P>,
@@ -139,6 +149,7 @@ where
         iter: I,
         init: Vec<InitNested>,
         data_type: DataType,
+        num_rows: usize,
         chunk_size: Option<usize>,
         op: F,
     ) -> Self {
@@ -148,6 +159,7 @@ where
             data_type,
             values: Dict::Empty,
             items: VecDeque::new(),
+            remaining: num_rows,
             chunk_size,
             op,
             phantom: Default::default(),
@@ -169,6 +181,7 @@ where
         let maybe_state = nested_next_dict(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             &self.init,
             &mut self.values,
             self.data_type.clone(),
@@ -189,6 +202,7 @@ pub fn iter_to_arrays_nested<'a, K, I, T, P, F>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
     op: F,
 ) -> NestedArrayIter<'a>
@@ -200,10 +214,12 @@ where
     F: 'a + Copy + Send + Sync + Fn(P) -> T,
 {
     Box::new(
-        NestedDictIter::<K, I, T, P, F>::new(iter, init, data_type, chunk_size, op).map(|result| {
-            let (mut nested, array) = result?;
-            let _ = nested.nested.pop().unwrap(); // the primitive
-            Ok((nested, array.boxed()))
-        }),
+        NestedDictIter::<K, I, T, P, F>::new(iter, init, data_type, num_rows, chunk_size, op).map(
+            |result| {
+                let (mut nested, array) = result?;
+                let _ = nested.nested.pop().unwrap(); // the primitive
+                Ok((nested, array.boxed()))
+            },
+        ),
     )
 }
diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs
index ce6d7a94eb5..e72bd5190c7 100644
--- a/src/io/parquet/read/deserialize/primitive/nested.rs
+++ b/src/io/parquet/read/deserialize/primitive/nested.rs
@@ -171,6 +171,7 @@ where
     init: Vec<InitNested>,
     data_type: DataType,
     items: VecDeque<(NestedState, (Vec<T>, MutableBitmap))>,
+    remaining: usize,
     chunk_size: Option<usize>,
     decoder: PrimitiveDecoder<T, P, F>,
 }
@@ -187,6 +188,7 @@ where
         iter: I,
         init: Vec<InitNested>,
         data_type: DataType,
+        num_rows: usize,
         chunk_size: Option<usize>,
         op: F,
     ) -> Self {
@@ -196,6 +198,7 @@ where
             data_type,
             items: VecDeque::new(),
             chunk_size,
+            remaining: num_rows,
             decoder: PrimitiveDecoder::new(op),
         }
     }
@@ -215,6 +218,7 @@ where
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
+            &mut self.remaining,
             &self.init,
             self.chunk_size,
             &self.decoder,
@@ -235,6 +239,7 @@ pub fn iter_to_arrays_nested<'a, I, T, P, F>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
     op: F,
 ) -> NestedArrayIter<'a>
@@ -245,10 +250,13 @@ where
     F: 'a + Copy + Send + Sync + Fn(P) -> T,
 {
     Box::new(
-        ArrayIterator::<I, T, P, F>::new(iter, init, data_type, chunk_size, op).map(|result| {
-            let (mut nested, array) = result?;
-            let _ = nested.nested.pop().unwrap(); // the primitive
-            Ok((nested, array.boxed()))
-        }),
+        ArrayIterator::<I, T, P, F>::new(iter, init, data_type, num_rows, chunk_size, op).map(
+            |x| {
+                x.map(|(mut nested, array)| {
+                    let _ = nested.nested.pop().unwrap(); // the primitive
+                    (nested, array.boxed())
+                })
+            },
+        ),
     )
 }
diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs
index ea6dacf609b..0ef14a4fd35 100644
--- a/src/io/parquet/read/deserialize/simple.rs
+++ b/src/io/parquet/read/deserialize/simple.rs
@@ -61,6 +61,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
     type_: &PrimitiveType,
     data_type: DataType,
     chunk_size: Option<usize>,
+    num_rows: usize,
 ) -> Result<ArrayIter<'a>> {
     use DataType::*;
 
@@ -68,17 +69,19 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
     let logical_type = &type_.logical_type;
 
     Ok(match data_type.to_logical_type() {
-        Null => null::iter_to_arrays(pages, data_type, chunk_size),
-        Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size)),
+        Null => null::iter_to_arrays(pages, data_type, chunk_size, num_rows),
+        Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size, num_rows)),
         UInt8 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u8,
         ))),
         UInt16 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u16,
         ))),
@@ -86,6 +89,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
             PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
                 pages,
                 data_type,
+                num_rows,
                 chunk_size,
                 |x: i32| x as u32,
             ))),
@@ -93,6 +97,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
             PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new(
                 pages,
                 data_type,
+                num_rows,
                 chunk_size,
                 |x: i64| x as u32,
             ))),
@@ -106,18 +111,21 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
         Int8 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i8,
         ))),
         Int16 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i16,
         ))),
         Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i32,
         ))),
@@ -129,17 +137,24 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
                 physical_type,
                 logical_type,
                 data_type,
+                num_rows,
                 chunk_size,
                 time_unit,
             );
         }
-        FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)),
+        FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(
+            pages, data_type, num_rows, chunk_size,
+        )),
         Interval(IntervalUnit::YearMonth) => {
             let n = 12;
-            let pages =
-                fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
+            let pages = fixed_size_binary::Iter::new(
+                pages,
+                DataType::FixedSizeBinary(n),
+                num_rows,
+                chunk_size,
+            );
 
             let pages = pages.map(move |maybe_array| {
                 let array = maybe_array?;
@@ -160,8 +175,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
         Interval(IntervalUnit::DayTime) => {
             let n = 12;
-            let pages =
-                fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
+            let pages = fixed_size_binary::Iter::new(
+                pages,
+                DataType::FixedSizeBinary(n),
+                num_rows,
+                chunk_size,
+            );
 
             let pages = pages.map(move |maybe_array| {
                 let array = maybe_array?;
@@ -184,12 +203,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
             PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
                 pages,
                 data_type,
+                num_rows,
                 chunk_size,
                 |x: i32| x as i128,
             ))),
             PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new(
                 pages,
                 data_type,
+                num_rows,
                 chunk_size,
                 |x: i64| x as i128,
             ))),
@@ -202,8 +223,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
             PhysicalType::FixedLenByteArray(n) => {
                 let n = *n;
-                let pages =
-                    fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
+                let pages = fixed_size_binary::Iter::new(
+                    pages,
+                    DataType::FixedSizeBinary(n),
+                    num_rows,
+                    chunk_size,
+                );
 
                 let pages = pages.map(move |maybe_array| {
                     let array = maybe_array?;
@@ -228,12 +253,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
         Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i64| x as i64,
         ))),
         UInt64 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: i64| x as u64,
         ))),
@@ -241,32 +268,34 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
         Float32 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: f32| x,
         ))),
         Float64 => dyn_iter(iden(primitive::Iter::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             |x: f64| x,
         ))),
 
         Binary => dyn_iter(binary::Iter::<i32, BinaryArray<i32>, _>::new(
-            pages, data_type, chunk_size,
+            pages, data_type, chunk_size, num_rows,
         )),
         LargeBinary => dyn_iter(binary::Iter::<i64, BinaryArray<i64>, _>::new(
-            pages, data_type, chunk_size,
+            pages, data_type, chunk_size, num_rows,
         )),
         Utf8 => dyn_iter(binary::Iter::<i32, Utf8Array<i32>, _>::new(
-            pages, data_type, chunk_size,
+            pages, data_type, chunk_size, num_rows,
        )),
         LargeUtf8 => dyn_iter(binary::Iter::<i64, Utf8Array<i64>, _>::new(
-            pages, data_type, chunk_size,
+            pages, data_type, chunk_size, num_rows,
         )),
 
         Dictionary(key_type, _, _) => {
             return match_integer_type!(key_type, |$K| {
-                dict_read::<$K, _>(pages, physical_type, logical_type, data_type, chunk_size)
+                dict_read::<$K, _>(pages, physical_type, logical_type, data_type, num_rows, chunk_size)
             })
         }
@@ -315,11 +344,12 @@ fn timestamp<'a, I: 'a + DataPages>(
     physical_type: &PhysicalType,
     logical_type: &Option<PrimitiveLogicalType>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
     time_unit: TimeUnit,
 ) -> Result<ArrayIter<'a>> {
     if physical_type == &PhysicalType::Int96 {
-        let iter = primitive::Iter::new(pages, data_type, chunk_size, int96_to_i64_ns);
+        let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns);
         let logical_type = PrimitiveLogicalType::Timestamp {
             unit: ParquetTimeUnit::Nanoseconds,
             is_adjusted_to_utc: false,
@@ -338,7 +368,7 @@ fn timestamp<'a, I: 'a + DataPages>(
         ));
     }
 
-    let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x);
+    let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, |x: i64| x);
     let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit);
     match (factor, is_multiplier) {
         (1, _) => Ok(dyn_iter(iden(iter))),
@@ -352,6 +382,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
     physical_type: &PhysicalType,
     logical_type: &Option<PrimitiveLogicalType>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
     time_unit: TimeUnit,
 ) -> Result<ArrayIter<'a>> {
@@ -365,12 +396,14 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
             (a, true) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
                 pages,
                 DataType::Timestamp(TimeUnit::Nanosecond, None),
+                num_rows,
                 chunk_size,
                 move |x| int96_to_i64_ns(x) * a,
             ))),
             (a, false) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
                 pages,
                 DataType::Timestamp(TimeUnit::Nanosecond, None),
+                num_rows,
                 chunk_size,
                 move |x| int96_to_i64_ns(x) / a,
             ))),
@@ -382,12 +415,14 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
         (a, true) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             move |x: i64| x * a,
         ))),
         (a, false) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             pages,
             data_type,
+            num_rows,
             chunk_size,
             move |x: i64| x / a,
         ))),
@@ -399,6 +434,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
     physical_type: &PhysicalType,
     logical_type: &Option<PrimitiveLogicalType>,
     data_type: DataType,
+    num_rows: usize,
     chunk_size: Option<usize>,
 ) -> Result<ArrayIter<'a>> {
     use DataType::*;
@@ -412,44 +448,54 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
         UInt8 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u8,
         )),
         UInt16 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u16,
         )),
         UInt32 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as u32,
         )),
         UInt64 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: i64| x as u64,
         )),
         Int8 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i8,
         )),
         Int16 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: i32| x as i16,
         )),
-        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(
-            primitive::DictIter::<K, _, _, _, _>::new(iter, data_type, chunk_size, |x: i32| {
-                x as i32
-            }),
-        ),
+        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
+            dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
+                iter,
+                data_type,
+                num_rows,
+                chunk_size,
+                |x: i32| x as i32,
+            ))
+        }
         Timestamp(time_unit, _) => {
             let time_unit = *time_unit;
             return timestamp_dict::<K, _>(
@@ -458,35 +504,44 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
                 physical_type,
                 logical_type,
                 data_type,
+                num_rows,
                 chunk_size,
                 time_unit,
             );
         }
-        Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(
-            primitive::DictIter::<K, _, _, _, _>::new(iter, data_type, chunk_size, |x: i64| x),
-        ),
+        Int64 | Date64 | Time64(_) | Duration(_) => {
+            dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
+                iter,
+                data_type,
+                num_rows,
+                chunk_size,
+                |x: i64| x,
+            ))
+        }
         Float32 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: f32| x,
         )),
         Float64 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
             iter,
             data_type,
+            num_rows,
             chunk_size,
             |x: f64| x,
         )),
 
         Utf8 | Binary => dyn_iter(binary::DictIter::<K, i32, _>::new(
-            iter, data_type, chunk_size,
+            iter, data_type, num_rows, chunk_size,
        )),
         LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::<K, i64, _>::new(
-            iter, data_type, chunk_size,
+            iter, data_type, num_rows, chunk_size,
         )),
         FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::<K, _>::new(
-            iter, data_type, chunk_size,
+            iter, data_type, num_rows, chunk_size,
         )),
         other => {
             return Err(Error::nyi(format!(
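The core budget arithmetic — how many rows a decoder may append to the current, possibly partial, chunk — appears twice, in `nested_utils.rs` above and in `utils.rs` below. Isolated as a stand-alone sketch (the crate code first adds a re-queued chunk's rows back to `remaining` before this computation; `saturating_sub` here is defensive and not in the original):

```rust
fn additional(chunk_size: Option<usize>, remaining: usize, decoded: usize) -> usize {
    // cap the chunk by the overall row budget
    let chunk_size = chunk_size.map(|x| x.min(remaining)).unwrap_or(remaining);
    // then only fill what the current chunk still misses
    chunk_size.saturating_sub(decoded).min(remaining)
}

fn main() {
    assert_eq!(additional(Some(10), 100, 2), 8); // chunk caps it: 10 - 2 already decoded
    assert_eq!(additional(None, 100, 0), 100);   // no chunking: decode the whole budget
    assert_eq!(additional(Some(10), 2, 2), 0);   // budget exhausted by the partial chunk
}
```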
diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs
index 4c022850130..5ff3d269537 100644
--- a/src/io/parquet/read/deserialize/utils.rs
+++ b/src/io/parquet/read/deserialize/utils.rs
@@ -372,33 +372,37 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
     mut page: T::State,
     chunk_size: Option<usize>,
     items: &mut VecDeque<T::DecodedState>,
+    remaining: &mut usize,
     decoder: &T,
 ) {
     let capacity = chunk_size.unwrap_or(0);
-    let chunk_size = chunk_size.unwrap_or(usize::MAX);
+    let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining);
 
     let mut decoded = if let Some(decoded) = items.pop_back() {
-        // there is a already a state => it must be incomplete...
-        debug_assert!(
-            decoded.len() <= chunk_size,
-            "the temp state is expected to be incomplete"
-        );
+        *remaining += decoded.len();
         decoded
     } else {
         // there is no state => initialize it
         decoder.with_capacity(capacity)
     };
 
-    let remaining = chunk_size - decoded.len();
+    // e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8
+    // e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100
+    // e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2
+    let additional = (chunk_size - decoded.len()).min(*remaining);
 
     // extend the current state
-    decoder.extend_from_state(&mut page, &mut decoded, remaining);
+    decoder.extend_from_state(&mut page, &mut decoded, additional);
+    *remaining -= decoded.len();
     items.push_back(decoded);
 
-    while page.len() > 0 {
-        let mut decoded = decoder.with_capacity(capacity);
-        decoder.extend_from_state(&mut page, &mut decoded, chunk_size);
+    while page.len() > 0 && *remaining > 0 {
+        let additional = chunk_size.min(*remaining);
+
+        let mut decoded = decoder.with_capacity(additional);
+        decoder.extend_from_state(&mut page, &mut decoded, additional);
+        *remaining -= decoded.len();
         items.push_back(decoded)
     }
 }
@@ -414,6 +418,7 @@ pub enum MaybeNext<P> {
 pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
     iter: &'a mut I,
     items: &mut VecDeque<D::DecodedState>,
+    remaining: &mut usize,
     chunk_size: Option<usize>,
     decoder: &D,
 ) -> MaybeNext<Result<D::DecodedState>> {
@@ -422,6 +427,12 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
         let item = items.pop_front().unwrap();
         return MaybeNext::Some(Ok(item));
     }
+    if *remaining == 0 {
+        return match items.pop_front() {
+            Some(decoded) => MaybeNext::Some(Ok(decoded)),
+            None => MaybeNext::None,
+        };
+    }
     match iter.next() {
         Err(e) => MaybeNext::Some(Err(e.into())),
         Ok(Some(page)) => {
@@ -432,7 +443,7 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
                 Err(e) => return MaybeNext::Some(Err(e)),
             };
 
-            extend_from_new_page(page, chunk_size, items, decoder);
+            extend_from_new_page(page, chunk_size, items, remaining, decoder);
 
             if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(0) {
                 MaybeNext::More
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 9f866a8dce5..474f29f7de6 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -240,6 +240,7 @@ impl<R: Read + Seek> RowGroupReader<R> {
             row_group,
             self.schema.fields.clone(),
             self.chunk_size,
+            Some(self.remaining_rows),
         )?;
 
         let result = RowGroupDeserializer::new(
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index deefaea3644..4dde21e467d 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -65,14 +65,7 @@ impl Iterator for RowGroupDeserializer {
         let chunk = self
             .column_chunks
             .iter_mut()
-            .map(|iter| {
-                let array = iter.next().unwrap()?;
-                Ok(if array.len() > self.remaining_rows {
-                    array.slice(0, array.len() - self.remaining_rows)
-                } else {
-                    array
-                })
-            })
+            .map(|iter| iter.next().unwrap())
             .collect::<Result<Vec<_>>>()
             .and_then(Chunk::try_new);
         self.remaining_rows = self.remaining_rows.saturating_sub(
@@ -182,7 +175,7 @@ pub fn to_deserializer<'a>(
     num_rows: usize,
     chunk_size: Option<usize>,
 ) -> Result<ArrayIter<'a>> {
-    let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);
+    let chunk_size = chunk_size.map(|c| c.min(num_rows));
 
     let (columns, types): (Vec<_>, Vec<_>) = columns
         .into_iter()
@@ -200,7 +193,7 @@ pub fn to_deserializer<'a>(
         })
         .unzip();
 
-    column_iter_to_arrays(columns, types, field, Some(chunk_size))
+    column_iter_to_arrays(columns, types, field, chunk_size, num_rows)
 }
 
 /// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top
@@ -218,7 +211,11 @@ pub fn read_columns_many<'a, R: Read + Seek>(
     row_group: &RowGroupMetaData,
     fields: Vec<Field>,
     chunk_size: Option<usize>,
+    limit: Option<usize>,
 ) -> Result<Vec<ArrayIter<'a>>> {
+    let num_rows = row_group.num_rows();
+    let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);
+
     // reads all the necessary columns for all fields from the row group
     // This operation is IO-bounded `O(C)` where C is the number of columns in the row group
     let field_columns = fields
@@ -229,9 +226,7 @@ pub fn read_columns_many<'a, R: Read + Seek>(
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size))
         .collect()
 }
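`read_columns_many` is the public surface where the limit enters: it clamps `num_rows` to the row group size and passes it down, so the deserializers stop decoding instead of decoding everything and slicing (the code removed from `RowGroupDeserializer::next` above). A sketch of a call site — `data.parquet` is a placeholder path, and `read_metadata`/`infer_schema` are the crate's usual companions to this API:

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

// Read at most 100 rows from the first row group of a file.
fn first_100_rows() -> Result<()> {
    let mut file = File::open("data.parquet")?;
    let metadata = read::read_metadata(&mut file)?;
    let schema = read::infer_schema(&metadata)?;

    let iters = read::read_columns_many(
        &mut file,
        &metadata.row_groups[0],
        schema.fields.clone(),
        Some(1024), // chunk_size
        Some(100),  // limit: stop decoding after 100 rows
    )?;
    for iter in iters {
        for array in iter {
            assert!(array?.len() <= 100);
        }
    }
    Ok(())
}
```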
diff --git a/src/io/print.rs b/src/io/print.rs
index 08cc3e3efb8..9cb0438f645 100644
--- a/src/io/print.rs
+++ b/src/io/print.rs
@@ -8,27 +8,27 @@ use crate::{
 use comfy_table::{Cell, Table};
 
 /// Returns a visual representation of [`Chunk`]
-pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(batches: &[Chunk<A>], names: &[N]) -> String {
+pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(chunks: &[Chunk<A>], names: &[N]) -> String {
     let mut table = Table::new();
     table.load_preset("||--+-++|    ++++++");
 
-    if batches.is_empty() {
+    if chunks.is_empty() {
         return table.to_string();
     }
 
     let header = names.iter().map(|name| Cell::new(name.as_ref()));
     table.set_header(header);
 
-    for batch in batches {
-        let displayes = batch
+    for chunk in chunks {
+        let displayes = chunk
             .arrays()
             .iter()
             .map(|array| get_display(array.as_ref(), ""))
             .collect::<Vec<_>>();
 
-        for row in 0..batch.len() {
+        for row in 0..chunk.len() {
             let mut cells = Vec::new();
-            (0..batch.arrays().len()).for_each(|col| {
+            (0..chunk.arrays().len()).for_each(|col| {
                 let mut string = String::new();
                 displayes[col](&mut string, row).unwrap();
                 cells.push(Cell::new(string));
diff --git a/tests/it/io/parquet/integration.rs b/tests/it/io/parquet/integration.rs
index 71ce7facaf8..7f84c433b0d 100644
--- a/tests/it/io/parquet/integration.rs
+++ b/tests/it/io/parquet/integration.rs
@@ -14,7 +14,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
 
     let data = integration_write(&schema, &batches)?;
 
-    let (read_schema, read_batches) = integration_read(&data)?;
+    let (read_schema, read_batches) = integration_read(&data, None)?;
 
     assert_eq!(schema, read_schema);
     assert_eq!(batches, read_batches);
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 9b701192143..1cf69d432c4 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -1149,9 +1149,9 @@ fn integration_write(schema: &Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Result<Vec<u8>> {
 
 type IntegrationRead = (Schema, Vec<Chunk<Box<dyn Array>>>);
 
-fn integration_read(data: &[u8]) -> Result<IntegrationRead> {
+fn integration_read(data: &[u8], limit: Option<usize>) -> Result<IntegrationRead> {
     let reader = Cursor::new(data);
-    let reader = FileReader::try_new(reader, None, None, None, None)?;
+    let reader = FileReader::try_new(reader, None, None, limit, None)?;
     let schema = reader.schema().clone();
 
     for field in &schema.fields {
@@ -1163,10 +1163,7 @@ fn integration_read(data: &[u8]) -> Result<IntegrationRead> {
     Ok((schema, batches))
 }
 
-/// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can rountrip its
-/// logical types.
-#[test]
-fn arrow_type() -> Result<()> {
+fn generic_data() -> Result<(Schema, Chunk<Box<dyn Array>>)> {
     let array1 = PrimitiveArray::<i64>::from([Some(1), None, Some(2)])
         .to(DataType::Duration(TimeUnit::Second));
     let array2 = Utf8Array::<i64>::from([Some("a"), None, Some("bb")]);
@@ -1254,15 +1251,42 @@ fn generic_data() -> Result<(Schema, Chunk<Box<dyn Array>>)> {
         array14.boxed(),
     ])?;
 
+    Ok((schema, chunk))
+}
+
+fn assert_roundtrip(
+    schema: Schema,
+    chunk: Chunk<Box<dyn Array>>,
+    limit: Option<usize>,
+) -> Result<()> {
     let r = integration_write(&schema, &[chunk.clone()])?;
 
-    let (new_schema, new_chunks) = integration_read(&r)?;
+    let (new_schema, new_chunks) = integration_read(&r, limit)?;
+
+    let expected = if let Some(limit) = limit {
+        let expected = chunk
+            .into_arrays()
+            .into_iter()
+            .map(|x| x.slice(0, limit))
+            .collect::<Vec<_>>();
+        Chunk::new(expected)
+    } else {
+        chunk
+    };
 
     assert_eq!(new_schema, schema);
-    assert_eq!(new_chunks, vec![chunk]);
+    assert_eq!(new_chunks, vec![expected]);
     Ok(())
 }
 
+/// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can rountrip its
+/// logical types.
+#[test]
+fn arrow_type() -> Result<()> {
+    let (schema, chunk) = generic_data()?;
+    assert_roundtrip(schema, chunk, None)
+}
+
 fn data<T: NativeType, I: Iterator<Item = T>>(
     mut iter: I,
     inner_is_nullable: bool,
@@ -1299,7 +1323,11 @@ fn data<T: NativeType, I: Iterator<Item = T>>(
     array.into()
 }
 
-fn list_array_generic<O: Offset>(is_nullable: bool, array: ListArray<O>) -> Result<()> {
+fn list_array_generic<O: Offset>(
+    is_nullable: bool,
+    array: ListArray<O>,
+    limit: Option<usize>,
+) -> Result<()> {
     let schema = Schema::from(vec![Field::new(
         "a1",
         array.data_type().clone(),
@@ -1307,42 +1335,44 @@ fn list_array_generic<O: Offset>(is_nullable: bool, array: ListArray<O>) -> Result<()> {
     )]);
     let chunk = Chunk::try_new(vec![array.boxed()])?;
 
-    let r = integration_write(&schema, &[chunk.clone()])?;
-
-    let (new_schema, new_chunks) = integration_read(&r)?;
+    assert_roundtrip(schema, chunk, limit)
+}
 
-    assert_eq!(new_schema, schema);
-    assert_eq!(new_chunks, vec![chunk]);
-    Ok(())
+fn test_list_array_required_required(limit: Option<usize>) -> Result<()> {
+    list_array_generic(false, data(0..12i8, false), limit)?;
+    list_array_generic(false, data(0..12i16, false), limit)?;
+    list_array_generic(false, data(0..12i32, false), limit)?;
+    list_array_generic(false, data(0..12i64, false), limit)?;
+    list_array_generic(false, data(0..12u8, false), limit)?;
+    list_array_generic(false, data(0..12u16, false), limit)?;
+    list_array_generic(false, data(0..12u32, false), limit)?;
+    list_array_generic(false, data(0..12u64, false), limit)?;
+    list_array_generic(false, data((0..12).map(|x| (x as f32) * 1.0), false), limit)?;
+    list_array_generic(
+        false,
+        data((0..12).map(|x| (x as f64) * 1.0f64), false),
+        limit,
+    )
 }
 
 #[test]
 fn list_array_required_required() -> Result<()> {
-    list_array_generic(false, data(0..12i8, false))?;
-    list_array_generic(false, data(0..12i16, false))?;
-    list_array_generic(false, data(0..12i32, false))?;
-    list_array_generic(false, data(0..12i64, false))?;
-    list_array_generic(false, data(0..12u8, false))?;
-    list_array_generic(false, data(0..12u16, false))?;
-    list_array_generic(false, data(0..12u32, false))?;
-    list_array_generic(false, data(0..12u64, false))?;
-    list_array_generic(false, data((0..12).map(|x| (x as f32) * 1.0), false))?;
-    list_array_generic(false, data((0..12).map(|x| (x as f64) * 1.0f64), false))
+    test_list_array_required_required(None)
 }
 
 #[test]
 fn list_array_optional_optional() -> Result<()> {
-    list_array_generic(true, data(0..12, true))
+    list_array_generic(true, data(0..12, true), None)
 }
 
 #[test]
 fn list_array_required_optional() -> Result<()> {
-    list_array_generic(true, data(0..12, false))
+    list_array_generic(true, data(0..12, false), None)
 }
 
 #[test]
 fn list_array_optional_required() -> Result<()> {
-    list_array_generic(false, data(0..12, true))
+    list_array_generic(false, data(0..12, true), None)
 }
 
 #[test]
@@ -1355,7 +1385,7 @@ fn list_utf8() -> Result<()> {
     let mut array =
         MutableListArray::<i32, MutableUtf8Array<i32>>::new_with_field(MutableUtf8Array::<i32>::new(), "item", true);
     array.try_extend(data).unwrap();
-    list_array_generic(false, array.into())
+    list_array_generic(false, array.into(), None)
 }
 
 #[test]
@@ -1368,7 +1398,7 @@ fn list_large_utf8() -> Result<()> {
     let mut array =
         MutableListArray::<i64, MutableUtf8Array<i64>>::new_with_field(MutableUtf8Array::<i64>::new(), "item", true);
     array.try_extend(data).unwrap();
-    list_array_generic(false, array.into())
+    list_array_generic(false, array.into(), None)
 }
 
 #[test]
@@ -1381,7 +1411,7 @@ fn list_binary() -> Result<()> {
     let mut array =
         MutableListArray::<i32, MutableBinaryArray<i32>>::new_with_field(MutableBinaryArray::<i32>::new(), "item", true);
     array.try_extend(data).unwrap();
-    list_array_generic(false, array.into())
+    list_array_generic(false, array.into(), None)
 }
 
 #[test]
@@ -1394,7 +1424,7 @@ fn large_list_large_binary() -> Result<()> {
     let mut array =
         MutableListArray::<i64, MutableBinaryArray<i64>>::new_with_field(MutableBinaryArray::<i64>::new(), "item", true);
     array.try_extend(data).unwrap();
-    list_array_generic(false, array.into())
+    list_array_generic(false, array.into(), None)
 }
 
 #[test]
@@ -1410,7 +1440,7 @@ fn list_utf8_nullable() -> Result<()> {
     let mut array =
         MutableListArray::<i32, MutableUtf8Array<i32>>::new_with_field(MutableUtf8Array::<i32>::new(), "item", true);
     array.try_extend(data).unwrap();
-    list_array_generic(true, array.into())
+    list_array_generic(true, array.into(), None)
 }
 
 #[test]
@@ -1429,61 +1459,63 @@ fn list_int_nullable() -> Result<()> {
         true,
     );
     array.try_extend(data).unwrap();
-    list_array_generic(true, array.into())
+    list_array_generic(true, array.into(), None)
 }
 
 #[test]
-fn nested_dict() -> Result<()> {
+fn limit() -> Result<()> {
+    let (schema, chunk) = generic_data()?;
+    assert_roundtrip(schema, chunk, Some(2))
+}
+
+#[test]
+fn limit_list() -> Result<()> {
+    test_list_array_required_required(Some(2))
+}
+
+fn nested_dict_data(data_type: DataType) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
+    let values = match data_type {
+        DataType::Float32 => PrimitiveArray::from_slice([1.0f32, 3.0]).boxed(),
+        DataType::Utf8 => Utf8Array::<i32>::from_slice(["a", "b"]).boxed(),
+        _ => unreachable!(),
+    };
+
     let indices = PrimitiveArray::from_values((0..3u64).map(|x| x % 2));
-    let values = PrimitiveArray::from_slice([1.0f32, 3.0]);
-    let floats = DictionaryArray::try_from_keys(indices, values.boxed()).unwrap();
-    let floats = ListArray::try_new(
+    let values = DictionaryArray::try_from_keys(indices, values).unwrap();
+    let values = ListArray::try_new(
         DataType::List(Box::new(Field::new(
             "item",
-            floats.data_type().clone(),
+            values.data_type().clone(),
             false,
         ))),
         vec![0i32, 0, 0, 2, 3].into(),
-        floats.boxed(),
+        values.boxed(),
         Some([true, false, true, true].into()),
     )?;
 
-    let schema = Schema::from(vec![Field::new("floats", floats.data_type().clone(), true)]);
-    let batch = Chunk::try_new(vec![floats.boxed()])?;
+    let schema = Schema::from(vec![Field::new("c1", values.data_type().clone(), true)]);
+    let chunk = Chunk::try_new(vec![values.boxed()])?;
 
-    let r = integration_write(&schema, &[batch.clone()])?;
+    Ok((schema, chunk))
+}
 
-    let (new_schema, new_batches) = integration_read(&r)?;
+#[test]
+fn nested_dict() -> Result<()> {
+    let (schema, chunk) = nested_dict_data(DataType::Float32)?;
 
-    assert_eq!(new_schema, schema);
-    assert_eq!(new_batches, vec![batch]);
-    Ok(())
+    assert_roundtrip(schema, chunk, None)
 }
 
 #[test]
 fn nested_dict_utf8() -> Result<()> {
-    let indices = PrimitiveArray::from_values((0..3u64).map(|x| x % 2));
-    let values = Utf8Array::<i32>::from_slice(["a", "b"]);
-    let floats = DictionaryArray::try_from_keys(indices, values.boxed()).unwrap();
-    let floats = ListArray::try_new(
-        DataType::List(Box::new(Field::new(
-            "item",
-            floats.data_type().clone(),
-            false,
-        ))),
-        vec![0i32, 0, 0, 2, 3].into(),
-        floats.boxed(),
-        Some([true, false, true, true].into()),
-    )?;
-
-    let schema = Schema::from(vec![Field::new("floats", floats.data_type().clone(), true)]);
-    let batch = Chunk::try_new(vec![floats.boxed()])?;
-
-    let r = integration_write(&schema, &[batch.clone()])?;
+    let (schema, chunk) = nested_dict_data(DataType::Utf8)?;
 
-    let (new_schema, new_batches) = integration_read(&r)?;
+    assert_roundtrip(schema, chunk, None)
+}
 
-    assert_eq!(new_schema, schema);
-    assert_eq!(new_batches, vec![batch]);
-    Ok(())
+#[test]
+fn nested_dict_limit() -> Result<()> {
+    let (schema, chunk) = nested_dict_data(DataType::Float32)?;
+
+    assert_roundtrip(schema, chunk, Some(2))
 }
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index b5fb2ba9c19..68f09bae804 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -124,7 +124,8 @@ fn read_with_indexes(
         vec![pages],
         vec![&c1.descriptor().descriptor.primitive_type],
         schema.fields[1].clone(),
-        Some(row_group.num_rows() as usize),
+        None,
+        row_group.num_rows() as usize,
     )?;
 
     let arrays = arrays.collect::<Result<Vec<_>>>()?;
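End to end, the user-visible entry point is `FileReader::try_new`, whose `limit` argument the tests above exercise via `integration_read`. A sketch of the effect of this PR at that level — `data.parquet` is a placeholder path, and the argument order follows the `FileReader::try_new(reader, None, None, limit, None)` call shown in the test diff:

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read::FileReader;

// The reader now stops decoding once the limit is reached, instead of
// decoding whole row groups and slicing afterwards.
fn read_first_two_rows() -> Result<()> {
    let file = File::open("data.parquet")?;
    // (projection, chunk_size, limit, groups_filter) — only `limit` is set
    let reader = FileReader::try_new(file, None, None, Some(2), None)?;

    let mut total = 0;
    for chunk in reader {
        total += chunk?.len();
    }
    assert!(total <= 2);
    Ok(())
}
```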