From 9ea3406f32411a14facaf17a641159aa52276a57 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 28 Jan 2022 07:08:47 +0000 Subject: [PATCH] Simpler --- src/io/parquet/read/binary/basic.rs | 68 +++++++------------ src/io/parquet/read/binary/dictionary.rs | 19 ++++-- src/io/parquet/read/boolean/basic.rs | 55 +++++++-------- src/io/parquet/read/dictionary.rs | 10 ++- .../parquet/read/fixed_size_binary/basic.rs | 37 ++++------ src/io/parquet/read/primitive/basic.rs | 39 +++++------ src/io/parquet/read/primitive/dictionary.rs | 18 +++-- src/io/parquet/read/utils.rs | 10 +-- 8 files changed, 113 insertions(+), 143 deletions(-) diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index f0fc329a8be..ed6ec36df4b 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -14,8 +14,8 @@ use crate::{ error::Result, }; +use super::super::utils::{extend_from_decoder, OptionalPageValidity}; use super::super::DataPages; -use super::super::utils::{extend_from_decoder, Decoder, OptionalPageValidity}; use super::{super::utils, utils::Binary}; fn read_delta_optional( @@ -220,27 +220,13 @@ impl TraitBinaryArray for Utf8Array { } } -#[derive(Debug)] -struct BinaryDecoder> { +#[derive(Debug, Default)] +struct BinaryDecoder { phantom_o: std::marker::PhantomData, - phantom_a: std::marker::PhantomData, } -impl> Default for BinaryDecoder { - #[inline] - fn default() -> Self { - Self { - phantom_o: std::marker::PhantomData, - phantom_a: std::marker::PhantomData, - } - } -} - -impl<'a, O: Offset, A: TraitBinaryArray> utils::Decoder<'a, &'a [u8], Binary> - for BinaryDecoder -{ +impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder { type State = State<'a>; - type Array = A; fn with_capacity(&self, capacity: usize) -> Binary { Binary::::with_capacity(capacity) @@ -281,15 +267,19 @@ impl<'a, O: Offset, A: TraitBinaryArray> utils::Decoder<'a, &'a [u8], Binary< } } } +} - fn finish(data_type: DataType, values: Binary, validity: MutableBitmap) -> Self::Array { - A::from_data( - data_type, - values.offsets.0.into(), - values.values.into(), - validity.into(), - ) - } +fn finish>( + data_type: &DataType, + values: Binary, + validity: MutableBitmap, +) -> A { + A::from_data( + data_type.clone(), + values.offsets.0.into(), + values.values.into(), + validity.into(), + ) } pub struct BinaryArrayIterator, I: DataPages> { @@ -320,13 +310,10 @@ impl, I: DataPages> Iterator for BinaryArrayIt fn next(&mut self) -> Option { // back[a1, a2, a3, ...]front if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - Ok(BinaryDecoder::finish( - self.data_type.clone(), - values, - validity, - )) - }); + return self + .items + .pop_back() + .map(|(values, validity)| Ok(finish(&self.data_type, values, validity))); } match (self.items.pop_back(), self.iter.next()) { (_, Err(e)) => Some(Err(e.into())), @@ -340,17 +327,18 @@ impl, I: DataPages> Iterator for BinaryArrayIt Err(e) => return Some(Err(e)), }; - utils::extend_from_new_page::, _, _>( + utils::extend_from_new_page( page, state, - &self.data_type, self.chunk_size, &mut self.items, - &BinaryDecoder::::default(), + &BinaryDecoder::::default(), ) }; match maybe_array { - Ok(Some(array)) => Some(Ok(array)), + Ok(Some((values, validity))) => { + Some(Ok(finish(&self.data_type, values, validity))) + } Ok(None) => self.next(), Err(e) => Some(Err(e)), } @@ -359,11 +347,7 @@ impl, I: DataPages> Iterator for BinaryArrayIt // we have a populated item and no more pages // the only case where an item's length may be smaller than chunk_size debug_assert!(values.len() <= self.chunk_size); - Some(Ok(BinaryDecoder::finish( - self.data_type.clone(), - values, - validity, - ))) + Some(Ok(finish(&self.data_type, values, validity))) } } } diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index df530ec5a5b..55c34c171cc 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -3,7 +3,9 @@ use std::{collections::VecDeque, sync::Arc}; use parquet2::page::BinaryPageDict; use crate::{ - array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array}, + array::{ + Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array, + }, bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::{ArrowError, Result}, @@ -11,7 +13,6 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; -use super::super::utils::Decoder; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation @@ -66,7 +67,7 @@ where // back[a1, a2, a3, ...]front if self.items.len() > 1 { return self.items.pop_back().map(|(values, validity)| { - let keys = PrimitiveDecoder::::finish(self.data_type.clone(), values, validity); + let keys = finish_key(values, validity); let values = self.values.unwrap(); Ok(DictionaryArray::from_data(keys, values)) }); @@ -128,14 +129,19 @@ where utils::extend_from_new_page::, _, _>( page, state, - &self.data_type, self.chunk_size, &mut self.items, &PrimitiveDecoder::default(), ) }; match maybe_array { - Ok(Some(keys)) => { + Ok(Some((values, validity))) => { + let keys = PrimitiveArray::from_data( + K::PRIMITIVE.into(), + values.into(), + validity.into(), + ); + let values = self.values.unwrap(); Some(Ok(DictionaryArray::from_data(keys, values))) } @@ -148,7 +154,8 @@ where // the only case where an item's length may be smaller than chunk_size debug_assert!(values.len() <= self.chunk_size); - let keys = PrimitiveDecoder::::finish(self.data_type.clone(), values, validity); + let keys = finish_key(values, validity); + let values = self.values.unwrap(); Some(Ok(DictionaryArray::from_data(keys, values))) } diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index fbeb771f311..98e376d9cb9 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -59,32 +59,32 @@ impl<'a> Required<'a> { } } -// The state of a `DataPage` of `Boolean` parquet primitive type +// The state of a `DataPage` of `Boolean` parquet boolean type #[derive(Debug)] -enum BooleanPageState<'a> { +enum State<'a> { Optional(Optional<'a>), Required(Required<'a>), } -impl<'a> BooleanPageState<'a> { +impl<'a> State<'a> { pub fn len(&self) -> usize { match self { - BooleanPageState::Optional(page) => page.validity.len(), - BooleanPageState::Required(page) => page.length - page.offset, + State::Optional(page) => page.validity.len(), + State::Required(page) => page.length - page.offset, } } } -impl<'a> utils::PageState<'a> for BooleanPageState<'a> { +impl<'a> utils::PageState<'a> for State<'a> { fn len(&self) -> usize { self.len() } } -fn build_state(page: &DataPage, is_optional: bool) -> Result { +fn build_state(page: &DataPage, is_optional: bool) -> Result { match (page.encoding(), is_optional) { - (Encoding::Plain, true) => Ok(BooleanPageState::Optional(Optional::new(page))), - (Encoding::Plain, false) => Ok(BooleanPageState::Required(Required::new(page))), + (Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))), + (Encoding::Plain, false) => Ok(State::Required(Required::new(page))), _ => Err(utils::not_implemented( &page.encoding(), is_optional, @@ -99,8 +99,7 @@ fn build_state(page: &DataPage, is_optional: bool) -> Result { struct BooleanDecoder {} impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { - type State = BooleanPageState<'a>; - type Array = BooleanArray; + type State = State<'a>; fn with_capacity(&self, capacity: usize) -> MutableBitmap { MutableBitmap::with_capacity(capacity) @@ -113,24 +112,24 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { remaining: usize, ) { match state { - BooleanPageState::Optional(page) => extend_from_decoder( + State::Optional(page) => extend_from_decoder( validity, &mut page.validity, Some(remaining), values, &mut page.values, ), - BooleanPageState::Required(page) => { + State::Required(page) => { let remaining = remaining.min(page.length - page.offset); values.extend_from_slice(page.values, page.offset, remaining); page.offset += remaining; } } } +} - fn finish(data_type: DataType, values: MutableBitmap, validity: MutableBitmap) -> Self::Array { - BooleanArray::from_data(data_type, values.into(), validity.into()) - } +fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) -> BooleanArray { + BooleanArray::from_data(data_type.clone(), values.into(), validity.into()) } /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays @@ -161,13 +160,10 @@ impl Iterator for BooleanArrayIterator { fn next(&mut self) -> Option { // back[a1, a2, a3, ...]front if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - Ok(BooleanDecoder::finish( - self.data_type.clone(), - values, - validity, - )) - }); + return self + .items + .pop_back() + .map(|(values, validity)| Ok(finish(&self.data_type, values, validity))); } match (self.items.pop_back(), self.iter.next()) { (_, Err(e)) => Some(Err(e.into())), @@ -180,16 +176,17 @@ impl Iterator for BooleanArrayIterator { Err(e) => return Some(Err(e)), }; - let maybe_array = extend_from_new_page::( + let maybe_array = extend_from_new_page( page, state, - &self.data_type, self.chunk_size, &mut self.items, &BooleanDecoder::default(), ); match maybe_array { - Ok(Some(array)) => Some(Ok(array)), + Ok(Some((values, validity))) => { + Some(Ok(finish(&self.data_type, values, validity))) + } Ok(None) => self.next(), Err(e) => Some(Err(e)), } @@ -198,11 +195,7 @@ impl Iterator for BooleanArrayIterator { // we have a populated item and no more pages // the only case where an item's length may be smaller than chunk_size debug_assert!(values.len() <= self.chunk_size); - Some(Ok(BooleanDecoder::finish( - self.data_type.clone(), - values, - validity, - ))) + Some(Ok(finish(&self.data_type, values, validity))) } } } diff --git a/src/io/parquet/read/dictionary.rs b/src/io/parquet/read/dictionary.rs index 2b3e1163e2a..fc774bb249b 100644 --- a/src/io/parquet/read/dictionary.rs +++ b/src/io/parquet/read/dictionary.rs @@ -9,7 +9,6 @@ use super::utils; use crate::{ array::{Array, DictionaryKey, PrimitiveArray}, bitmap::MutableBitmap, - datatypes::DataType, error::Result, io::parquet::read::utils::{extend_from_decoder, OptionalPageValidity}, }; @@ -126,7 +125,6 @@ where K: DictionaryKey, { type State = State<'a, K>; - type Array = PrimitiveArray; fn with_capacity(&self, capacity: usize) -> Vec { Vec::::with_capacity(capacity) @@ -151,10 +149,6 @@ where }*/ } } - - fn finish(data_type: DataType, values: Vec, validity: MutableBitmap) -> Self::Array { - PrimitiveArray::from_data(data_type, values.into(), validity.into()) - } } #[derive(Debug)] @@ -171,3 +165,7 @@ impl Dict { } } } + +pub fn finish_key(values: Vec, validity: MutableBitmap) -> PrimitiveArray { + PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()) +} diff --git a/src/io/parquet/read/fixed_size_binary/basic.rs b/src/io/parquet/read/fixed_size_binary/basic.rs index d432d16c6ae..6b958f5318a 100644 --- a/src/io/parquet/read/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/fixed_size_binary/basic.rs @@ -160,7 +160,6 @@ struct BinaryDecoder { impl<'a> Decoder<'a, &'a [u8], FixedSizeBinary> for BinaryDecoder { type State = State<'a>; - type Array = FixedSizeBinaryArray; fn with_capacity(&self, capacity: usize) -> FixedSizeBinary { FixedSizeBinary::with_capacity(capacity, self.size) @@ -201,14 +200,14 @@ impl<'a> Decoder<'a, &'a [u8], FixedSizeBinary> for BinaryDecoder { } } } +} - fn finish( - data_type: DataType, - values: FixedSizeBinary, - validity: MutableBitmap, - ) -> Self::Array { - FixedSizeBinaryArray::from_data(data_type, values.values.into(), validity.into()) - } +fn finish( + data_type: &DataType, + values: FixedSizeBinary, + validity: MutableBitmap, +) -> FixedSizeBinaryArray { + FixedSizeBinaryArray::from_data(data_type.clone(), values.values.into(), validity.into()) } pub struct BinaryArrayIterator { @@ -240,13 +239,10 @@ impl Iterator for BinaryArrayIterator { fn next(&mut self) -> Option { // back[a1, a2, a3, ...]front if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - Ok(BinaryDecoder::finish( - self.data_type.clone(), - values, - validity, - )) - }); + return self + .items + .pop_back() + .map(|(values, validity)| Ok(finish(&self.data_type, values, validity))); } match (self.items.pop_back(), self.iter.next()) { (_, Err(e)) => Some(Err(e.into())), @@ -263,14 +259,15 @@ impl Iterator for BinaryArrayIterator { extend_from_new_page::( page, state, - &self.data_type, self.chunk_size, &mut self.items, &BinaryDecoder { size: self.size }, ) }; match maybe_array { - Ok(Some(array)) => Some(Ok(array)), + Ok(Some((values, validity))) => { + Some(Ok(finish(&self.data_type, values, validity))) + } Ok(None) => self.next(), Err(e) => Some(Err(e)), } @@ -279,11 +276,7 @@ impl Iterator for BinaryArrayIterator { // we have a populated item and no more pages // the only case where an item's length may be smaller than chunk_size debug_assert!(values.len() <= self.chunk_size); - Some(Ok(BinaryDecoder::finish( - self.data_type.clone(), - values, - validity, - ))) + Some(Ok(finish(&self.data_type, values, validity))) } } } diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index c272240de04..636625117cc 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -6,7 +6,7 @@ use parquet2::{ types::NativeType as ParquetNativeType, }; -use crate::io::parquet::read::utils::{Decoder, OptionalPageValidity}; +use crate::io::parquet::read::utils::OptionalPageValidity; use crate::{ array::PrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result, types::NativeType, @@ -287,7 +287,6 @@ where F: Copy + Fn(P) -> T, { type State = PrimitivePageState<'a, T, P, G, F>; - type Array = PrimitiveArray; fn with_capacity(&self, capacity: usize) -> Vec { Vec::::with_capacity(capacity) @@ -322,14 +321,14 @@ where } } } +} - fn finish(data_type: DataType, values: Vec, validity: MutableBitmap) -> Self::Array { - let data_type = match data_type { - DataType::Dictionary(_, values, _) => values.as_ref().clone(), - _ => data_type, - }; - PrimitiveArray::from_data(data_type, values.into(), validity.into()) - } +pub(super) fn finish( + data_type: &DataType, + values: Vec, + validity: MutableBitmap, +) -> PrimitiveArray { + PrimitiveArray::from_data(data_type.clone(), values.into(), validity.into()) } /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays @@ -395,13 +394,10 @@ where fn next(&mut self) -> Option { // back[a1, a2, a3, ...]front if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - Ok(PrimitiveDecoder::::finish( - self.data_type.clone(), - values, - validity, - )) - }); + return self + .items + .pop_back() + .map(|(values, validity)| Ok(finish(&self.data_type, values, validity))); } match (self.items.pop_back(), self.iter.next()) { (_, Err(e)) => Some(Err(e.into())), @@ -418,14 +414,15 @@ where utils::extend_from_new_page::, _, _>( page, state, - &self.data_type, self.chunk_size, &mut self.items, &PrimitiveDecoder::default(), ) }; match maybe_array { - Ok(Some(array)) => Some(Ok(array)), + Ok(Some((values, validity))) => { + Some(Ok(finish(&self.data_type, values, validity))) + } Ok(None) => self.next(), Err(e) => Some(Err(e)), } @@ -434,11 +431,7 @@ where // we have a populated item and no more pages // the only case where an item's length may be smaller than chunk_size debug_assert!(values.len() <= self.chunk_size); - Some(Ok(PrimitiveDecoder::::finish( - self.data_type.clone(), - values, - validity, - ))) + Some(Ok(finish(&self.data_type, values, validity))) } } } diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 6d3e3436222..56172494407 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -12,7 +12,6 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; -use super::super::utils::Decoder; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays @@ -45,6 +44,10 @@ where F: Copy + Fn(P) -> T, { fn new(iter: I, data_type: DataType, chunk_size: usize, is_optional: bool, op: F) -> Self { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; Self { iter, data_type, @@ -72,7 +75,7 @@ where // back[a1, a2, a3, ...]front if self.items.len() > 1 { return self.items.pop_back().map(|(values, validity)| { - let keys = PrimitiveDecoder::::finish(self.data_type.clone(), values, validity); + let keys = finish_key(values, validity); let values = self.values.unwrap(); Ok(DictionaryArray::from_data(keys, values)) }); @@ -96,7 +99,7 @@ where .collect::>(); Dict::Complete(Arc::new(PrimitiveArray::from_data( - T::PRIMITIVE.into(), + self.data_type.clone(), values.into(), None, )) as _) @@ -120,14 +123,15 @@ where utils::extend_from_new_page::, _, _>( page, state, - &self.data_type, self.chunk_size, &mut self.items, &PrimitiveDecoder::default(), ) }; match maybe_array { - Ok(Some(keys)) => { + Ok(Some((values, validity))) => { + let keys = finish_key(values, validity); + let values = self.values.unwrap(); Some(Ok(DictionaryArray::from_data(keys, values))) } @@ -140,7 +144,9 @@ where // the only case where an item's length may be smaller than chunk_size debug_assert!(values.len() <= self.chunk_size); - let keys = PrimitiveDecoder::::finish(self.data_type.clone(), values, validity); + let keys = + PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()); + let values = self.values.unwrap(); Some(Ok(DictionaryArray::from_data(keys, values))) } diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index 880f95e2e2a..c4c01c464e5 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -6,10 +6,9 @@ use parquet2::metadata::ColumnDescriptor; use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader}; use streaming_iterator::{convert, Convert, StreamingIterator}; -use crate::array::{Array, DictionaryKey}; +use crate::array::DictionaryKey; use crate::bitmap::utils::BitmapIter; use crate::bitmap::MutableBitmap; -use crate::datatypes::DataType; use crate::error::ArrowError; pub struct BinaryIter<'a> { @@ -269,7 +268,6 @@ pub(super) trait PageState<'a> { /// A decoder that knows how to map `State` -> Array pub(super) trait Decoder<'a, C: Default, P: Pushable> { type State: PageState<'a>; - type Array: Array; /// Initializes a new pushable fn with_capacity(&self, capacity: usize) -> P; @@ -282,17 +280,15 @@ pub(super) trait Decoder<'a, C: Default, P: Pushable> { validity: &mut MutableBitmap, additional: usize, ); - fn finish(data_type: DataType, values: P, validity: MutableBitmap) -> Self::Array; } pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Pushable>( mut page: T::State, state: Option<(P, MutableBitmap)>, - data_type: &DataType, chunk_size: usize, items: &mut VecDeque<(P, MutableBitmap)>, decoder: &T, -) -> Result, ArrowError> { +) -> Result, ArrowError> { let (mut values, mut validity) = if let Some((values, validity)) = state { // there is a already a state => it must be incomplete... debug_assert!( @@ -329,5 +325,5 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push } // and return this array - Ok(Some(T::finish(data_type.clone(), values, validity))) + Ok(Some((values, validity))) }