diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 32c22adb106..51213c13e65 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -46,8 +46,6 @@ fn read_dict_buffer( values: &mut Binary, validity: &mut MutableBitmap, ) { - let length = values.len() + additional; - let values_iterator = values_iter(indices_buffer, dict, additional); let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -55,7 +53,7 @@ fn read_dict_buffer( extend_from_decoder( validity, &mut validity_iterator, - length, + additional, values, values_iterator, ); @@ -69,12 +67,11 @@ fn read_dict_required( values: &mut Binary, validity: &mut MutableBitmap, ) { + debug_assert_eq!(0, validity.len()); let values_iterator = values_iter(indices_buffer, dict, additional); - for value in values_iterator { values.push(value); } - validity.extend_constant(additional, true); } struct Offsets<'a, O: Offset>(pub &'a mut Vec); @@ -108,8 +105,6 @@ fn read_delta_optional( values: &mut Binary, validity: &mut MutableBitmap, ) { - let length = values.len() + additional; - let Binary { offsets, values, @@ -129,7 +124,7 @@ fn read_delta_optional( extend_from_decoder( validity, &mut validity_iterator, - length, + additional, &mut Offsets::(offsets), offsets_iterator, ); @@ -146,8 +141,6 @@ fn read_plain_optional( values: &mut Binary, validity: &mut MutableBitmap, ) { - let length = values.len() + additional; - // values_buffer: first 4 bytes are len, remaining is values let values_iterator = utils::BinaryIter::new(values_buffer); @@ -156,7 +149,7 @@ fn read_plain_optional( extend_from_decoder( validity, &mut validity_iterator, - length, + additional, values, values_iterator, ) diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index a0422f1a92d..3129ea1a415 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -31,16 +31,17 @@ where ArrowError: From, I: FallibleStreamingIterator, { + let is_nullable = nested.pop().unwrap().is_nullable(); let capacity = metadata.num_values() as usize; let mut values = Binary::::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - - let is_nullable = nested.pop().unwrap().is_nullable(); + let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable)); if nested.is_empty() { while let Some(page) = iter.next()? { basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? } + debug_assert_eq!(values.len(), capacity); + debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable)); } else { while let Some(page) = iter.next()? { nested::extend_from_page( diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index ad6ef57cdee..e338f1e156d 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -24,16 +24,17 @@ where ArrowError: From, I: FallibleStreamingIterator, { + let is_nullable = nested.pop().unwrap().is_nullable(); let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - - let is_nullable = nested.pop().unwrap().is_nullable(); + let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable)); if nested.is_empty() { while let Some(page) = iter.next()? { basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? } + debug_assert_eq!(values.len(), capacity); + debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable)); } else { while let Some(page) = iter.next()? { nested::extend_from_page( diff --git a/src/io/parquet/read/fixed_size_binary/mod.rs b/src/io/parquet/read/fixed_size_binary/mod.rs index 2e1a93f991a..2a464dd5cf5 100644 --- a/src/io/parquet/read/fixed_size_binary/mod.rs +++ b/src/io/parquet/read/fixed_size_binary/mod.rs @@ -69,14 +69,12 @@ pub(crate) fn read_dict_required( values: &mut FixedSizeBinary, validity: &mut MutableBitmap, ) { - let size = values.size; + debug_assert!(validity.is_empty()); let values_iter = values_iter(indices_buffer, dict.values(), values.size, additional); - for value in values_iter { values.push(value); } - validity.extend_constant(additional * size, true); } pub(crate) fn read_optional( @@ -114,14 +112,17 @@ where ArrowError: From, I: FallibleStreamingIterator, { + let is_nullable = metadata.descriptor().max_def_level() == 1; let size = FixedSizeBinaryArray::get_size(&data_type); - let capacity = metadata.num_values() as usize; let mut values = FixedSizeBinary::with_capacity(capacity, size); - let mut validity = MutableBitmap::with_capacity(capacity); + let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable)); + while let Some(page) = iter.next()? { extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? } + debug_assert_eq!(values.len(), capacity); + debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable)); Ok(FixedSizeBinaryArray::from_data( data_type, diff --git a/src/io/parquet/read/fixed_size_binary/utils.rs b/src/io/parquet/read/fixed_size_binary/utils.rs index 1e35aaba0d1..903af762993 100644 --- a/src/io/parquet/read/fixed_size_binary/utils.rs +++ b/src/io/parquet/read/fixed_size_binary/utils.rs @@ -26,6 +26,11 @@ impl FixedSizeBinary { self.values .resize(self.values.len() + additional * self.size, 0); } + + #[inline] + pub fn len(&mut self) -> usize { + self.values.len() / self.size + } } impl Pushable<&[u8]> for FixedSizeBinary { diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index 7cebf343b10..b973876bd92 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -46,8 +46,6 @@ fn read_dict_buffer_optional( A: ArrowNativeType, F: Fn(T) -> A, { - let length = additional + values.len(); - let values_iterator = values_iter(indices_buffer, dict.values(), additional, op); let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -55,10 +53,10 @@ fn read_dict_buffer_optional( extend_from_decoder( validity, &mut validity_iterator, - length, + additional, values, values_iterator, - ) + ); } fn read_dict_buffer_required( @@ -73,11 +71,9 @@ fn read_dict_buffer_required( A: ArrowNativeType, F: Fn(T) -> A, { + debug_assert_eq!(0, validity.len()); let values_iterator = values_iter(indices_buffer, dict.values(), additional, op); - values.extend(values_iterator); - - validity.extend_constant(additional, true); } fn read_nullable( diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 99c666b784f..7111672ef4f 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -75,16 +75,17 @@ where F: Copy + Fn(T) -> A, I: FallibleStreamingIterator, { + let is_nullable = nested.pop().unwrap().is_nullable(); let capacity = metadata.num_values() as usize; let mut values = Vec::::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - - let is_nullable = nested.pop().unwrap().is_nullable(); + let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable)); if nested.is_empty() { while let Some(page) = iter.next()? { basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity, op)? } + debug_assert_eq!(values.len(), capacity); + debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable)); } else { while let Some(page) = iter.next()? { nested::extend_from_page( diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index 25aa171cbc3..a7e5fbe77b7 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -117,16 +117,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable, I: Iterator { // compute the length of the pack let pack_size = pack.len() * 8; - let pack_remaining = page_length; - let length = std::cmp::min(pack_size, pack_remaining); - - let additional = remaining.min(length); + let additional = pack_size.min(remaining); // extend validity validity.extend_from_slice(pack, 0, additional); @@ -140,13 +137,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable, I: Iterator { + hybrid_rle::HybridEncoded::Rle(value, additional) => { let is_set = value[0] == 1; // extend validity - let length = length; - let additional = remaining.min(length); validity.extend_constant(additional, is_set); // extend values @@ -155,9 +152,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable, I: Iterator(