From 9e96c9eceba6590cc346733480634aedad3beaa8 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 29 Jan 2022 08:02:36 +0000 Subject: [PATCH] Simpler --- src/io/parquet/read/boolean/basic.rs | 19 ++-- src/io/parquet/read/boolean/nested.rs | 150 ++++++-------------------- src/io/parquet/read/nested_utils.rs | 115 +++++++++++++++++++- src/io/parquet/read/utils.rs | 1 + 4 files changed, 158 insertions(+), 127 deletions(-) diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index a3a57e96da0..1f0dd5e8843 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -15,6 +15,16 @@ use super::super::utils::{ }; use super::super::DataPages; +#[inline] +pub(super) fn values_iter(values: &[u8]) -> BitmapIter { + // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. + // note that `values_buffer` contains only non-null values. + // thus, at this point, it is not known how many values this buffer contains + // values_len is the upper bound. The actual number depends on how many nulls there is. + let values_len = values.len() * 8; + BitmapIter::new(values, 0, values_len) +} + // The state of an optional DataPage with a boolean physical type #[derive(Debug)] struct Optional<'a> { @@ -26,15 +36,8 @@ impl<'a> Optional<'a> { pub fn new(page: &'a DataPage) -> Self { let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor()); - // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. - // note that `values_buffer` contains only non-null values. - // thus, at this point, it is not known how many values this buffer contains - // values_len is the upper bound. The actual number depends on how many nulls there is. - let values_len = values_buffer.len() * 8; - let values = BitmapIter::new(values_buffer, 0, values_len); - Self { - values, + values: values_iter(values_buffer), validity: OptionalPageValidity::new(validity_buffer, page.num_values()), } } diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 67ee315d6b5..942caf4a505 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -1,55 +1,19 @@ use std::collections::VecDeque; -use parquet2::{ - encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - page::DataPage, - read::levels::get_bit_width, - schema::Repetition, -}; +use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; use crate::{ array::BooleanArray, bitmap::{utils::BitmapIter, MutableBitmap}, datatypes::{DataType, Field}, error::Result, - io::parquet::read::{utils::Decoder, DataPages}, }; use super::super::nested_utils::*; use super::super::utils; - -// The state of an optional DataPage with a boolean physical type -#[derive(Debug)] -struct Optional<'a> { - values: BitmapIter<'a>, - definition_levels: HybridRleDecoder<'a>, - max_def: u32, -} - -impl<'a> Optional<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, def_levels, values_buffer, _) = utils::split_buffer(page, page.descriptor()); - - // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. - // note that `values_buffer` contains only non-null values. - // thus, at this point, it is not known how many values this buffer contains - // values_len is the upper bound. The actual number depends on how many nulls there is. - let values_len = values_buffer.len() * 8; - let values = BitmapIter::new(values_buffer, 0, values_len); - - let max_def = page.descriptor().max_def_level(); - - Self { - values, - definition_levels: HybridRleDecoder::new( - def_levels, - get_bit_width(max_def), - page.num_values(), - ), - max_def: max_def as u32, - } - } -} +use super::super::utils::{Decoder, MaybeNext}; +use super::super::DataPages; +use super::basic::values_iter; // The state of a required DataPage with a boolean physical type #[derive(Debug)] @@ -74,14 +38,14 @@ impl<'a> Required<'a> { #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum State<'a> { - Optional(Optional<'a>), + Optional(Optional<'a>, BitmapIter<'a>), Required(Required<'a>), } impl<'a> State<'a> { pub fn len(&self) -> usize { match self { - State::Optional(page) => page.definition_levels.size_hint().0, + State::Optional(optional, _) => optional.len(), State::Required(page) => page.length - page.offset, } } @@ -104,7 +68,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; match (page.encoding(), is_optional) { - (Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))), + (Encoding::Plain, true) => { + let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + Ok(State::Optional(Optional::new(page), values_iter(values))) + } (Encoding::Plain, false) => Ok(State::Required(Required::new(page))), _ => Err(utils::not_implemented( &page.encoding(), @@ -127,10 +94,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { required: usize, ) { match state { - State::Optional(page) => read_optional_values( - page.definition_levels.by_ref(), - page.max_def, - page.values.by_ref(), + State::Optional(page_validity, page_values) => read_optional_values( + page_validity.definition_levels.by_ref(), + page_validity.max_def(), + page_values.by_ref(), values, validity, required, @@ -166,82 +133,29 @@ impl ArrayIterator { } } +fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) -> BooleanArray { + BooleanArray::from_data(data_type.clone(), values.into(), validity.into()) +} + impl Iterator for ArrayIterator { type Item = Result<(NestedState, BooleanArray)>; fn next(&mut self) -> Option { - // back[a1, a2, a3, ...]front - if self.items.len() > 1 { - let nested = self.nested.pop_back().unwrap(); - let (values, validity) = self.items.pop_back().unwrap(); - let array = BooleanArray::from_data(DataType::Boolean, values.into(), validity.into()); - return Some(Ok((nested, array))); - } - match ( - self.nested.pop_back(), - self.items.pop_back(), - self.iter.next(), - ) { - (_, _, Err(e)) => Some(Err(e.into())), - (None, None, Ok(None)) => None, - (state, p_state, Ok(Some(page))) => { - // the invariant - assert_eq!(state.is_some(), p_state.is_some()); - - // there is a new page => consume the page from the start - let mut nested_page = NestedPage::new(page); - - // read next chunk from `nested_page` and get number of values to read - let maybe_nested = extend_offsets1( - &mut nested_page, - state, - &self.field, - &mut self.nested, - self.chunk_size, - ); - let nested = match maybe_nested { - Ok(nested) => nested, - Err(e) => return Some(Err(e)), - }; - // at this point we know whether there were enough rows in `page` - // to fill chunk_size or not (`nested.is_some()`) - // irrespectively, we need to consume the values from the page - - let maybe_page = BooleanDecoder::default().build_state(page); - let page = match maybe_page { - Ok(page) => page, - Err(e) => return Some(Err(e)), - }; - - let maybe_array = extend_from_new_page::( - page, - p_state, - &mut self.items, - &nested, - &self.nested, - &BooleanDecoder::default(), - ); - let state = match maybe_array { - Ok(s) => s, - Err(e) => return Some(Err(e)), - }; - match nested { - Some(p_state) => Some(Ok(( - p_state, - BooleanArray::from_data(DataType::Boolean, state.0.into(), state.1.into()), - ))), - None => self.next(), - } - } - (Some(nested), Some((values, validity)), Ok(None)) => { - // we have a populated item and no more pages - // the only case where an item's length may be smaller than chunk_size - let array = - BooleanArray::from_data(DataType::Boolean, values.into(), validity.into()); - Some(Ok((nested, array))) + let maybe_state = next( + &mut self.iter, + &mut self.items, + &mut self.nested, + &self.field, + self.chunk_size, + &BooleanDecoder::default(), + ); + match maybe_state { + MaybeNext::Some(Ok((nested, values, validity))) => { + Some(Ok((nested, finish(&DataType::Boolean, values, validity)))) } - (Some(_), None, _) => unreachable!(), - (None, Some(_), _) => unreachable!(), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), } } } diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index fbee8238277..bd3ba52b9d8 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -12,7 +12,10 @@ use crate::{ error::{ArrowError, Result}, }; -use super::utils::{split_buffer, Decoder, Pushable}; +use super::{ + utils::{split_buffer, Decoder, MaybeNext, Pushable}, + DataPages, +}; /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug { @@ -598,3 +601,113 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi nested.close(*length); }); } + +// The state of an optional DataPage with a boolean physical type +#[derive(Debug)] +pub struct Optional<'a> { + pub definition_levels: HybridRleDecoder<'a>, + max_def: u32, +} + +impl<'a> Optional<'a> { + pub fn new(page: &'a DataPage) -> Self { + let (_, def_levels, values_buffer, _) = split_buffer(page, page.descriptor()); + + let max_def = page.descriptor().max_def_level(); + + Self { + definition_levels: HybridRleDecoder::new( + def_levels, + get_bit_width(max_def), + page.num_values(), + ), + max_def: max_def as u32, + } + } + + #[inline] + pub fn len(&self) -> usize { + self.definition_levels.size_hint().0 + } + + #[inline] + pub fn max_def(&self) -> u32 { + self.max_def + } +} + +#[inline] +pub(super) fn next<'a, I, C, P, D>( + iter: &'a mut I, + items: &mut VecDeque<(P, MutableBitmap)>, + nested_items: &mut VecDeque, + field: &Field, + chunk_size: usize, + decoder: &D, +) -> MaybeNext> +where + I: DataPages, + C: Default, + P: Pushable, + D: Decoder<'a, C, P>, +{ + // back[a1, a2, a3, ...]front + if items.len() > 1 { + let nested = nested_items.pop_back().unwrap(); + let (values, validity) = items.pop_back().unwrap(); + //let array = BooleanArray::from_data(DataType::Boolean, values.into(), validity.into()); + return MaybeNext::Some(Ok((nested, values, validity))); + } + match (nested_items.pop_back(), items.pop_back(), iter.next()) { + (_, _, Err(e)) => MaybeNext::Some(Err(e.into())), + (None, None, Ok(None)) => MaybeNext::None, + (state, p_state, Ok(Some(page))) => { + // the invariant + assert_eq!(state.is_some(), p_state.is_some()); + + // there is a new page => consume the page from the start + let mut nested_page = NestedPage::new(page); + + // read next chunk from `nested_page` and get number of values to read + let maybe_nested = + extend_offsets1(&mut nested_page, state, field, nested_items, chunk_size); + let nested = match maybe_nested { + Ok(nested) => nested, + Err(e) => return MaybeNext::Some(Err(e)), + }; + // at this point we know whether there were enough rows in `page` + // to fill chunk_size or not (`nested.is_some()`) + // irrespectively, we need to consume the values from the page + + let maybe_page = decoder.build_state(page); + let page = match maybe_page { + Ok(page) => page, + Err(e) => return MaybeNext::Some(Err(e)), + }; + + let maybe_array = extend_from_new_page::( + page, + p_state, + items, + &nested, + nested_items, + decoder, + ); + let state = match maybe_array { + Ok(s) => s, + Err(e) => return MaybeNext::Some(Err(e)), + }; + match nested { + Some(p_state) => MaybeNext::Some(Ok((p_state, state.0, state.1))), + None => MaybeNext::More, + } + } + (Some(nested), Some((values, validity)), Ok(None)) => { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + MaybeNext::Some(Ok((nested, values, validity))) + } + (Some(_), None, _) => unreachable!(), + (None, Some(_), _) => unreachable!(), + } +} diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index 2d849cfdfef..8679763c330 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -54,6 +54,7 @@ pub fn not_implemented( )) } +#[inline] pub fn split_buffer<'a>( page: &'a DataPage, descriptor: &ColumnDescriptor,