diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 8c55e4d2844..f0fc329a8be 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -3,7 +3,6 @@ use std::default::Default; use parquet2::{ encoding::{delta_length_byte_array, hybrid_rle, Encoding}, - metadata::ColumnDescriptor, page::{BinaryPageDict, DataPage}, }; @@ -13,69 +12,12 @@ use crate::{ buffer::Buffer, datatypes::DataType, error::Result, - io::parquet::read::{ - utils::{extend_from_decoder, Decoder, OptionalPageValidity, Pushable}, - DataPages, - }, }; +use super::super::DataPages; +use super::super::utils::{extend_from_decoder, Decoder, OptionalPageValidity}; use super::{super::utils, utils::Binary}; -#[inline] -fn values_iter<'a>( - indices_buffer: &'a [u8], - dict: &'a BinaryPageDict, - additional: usize, -) -> impl Iterator + 'a { - let dict_values = dict.values(); - let dict_offsets = dict.offsets(); - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - indices.map(move |index| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }) -} - -/// Assumptions: No rep levels -#[allow(clippy::too_many_arguments)] -fn read_dict_buffer( - validity_buffer: &[u8], - indices_buffer: &[u8], - additional: usize, - dict: &BinaryPageDict, - values: &mut Binary, - validity: &mut MutableBitmap, -) { - let values_iter = values_iter(indices_buffer, dict, additional); - - let mut page_validity = OptionalPageValidity::new(validity_buffer, additional); - - extend_from_decoder(validity, &mut page_validity, None, values, values_iter); -} - -#[allow(clippy::too_many_arguments)] -fn read_dict_required( - indices_buffer: &[u8], - additional: usize, - dict: &BinaryPageDict, - values: &mut Binary, - validity: &mut MutableBitmap, -) { - debug_assert_eq!(0, validity.len()); - let values_iterator = values_iter(indices_buffer, dict, additional); - for value in values_iterator { - values.push(value); - } -} - fn read_delta_optional( validity_buffer: &[u8], values_buffer: &[u8], @@ -112,93 +54,6 @@ fn read_delta_optional( values.extend_from_slice(new_values); } -fn read_plain_optional( - validity_buffer: &[u8], - values_buffer: &[u8], - additional: usize, - values: &mut Binary, - validity: &mut MutableBitmap, -) { - // values_buffer: first 4 bytes are len, remaining is values - let values_iter = utils::BinaryIter::new(values_buffer); - - let mut page_validity = OptionalPageValidity::new(validity_buffer, additional); - - extend_from_decoder(validity, &mut page_validity, None, values, values_iter) -} - -pub(super) fn read_plain_required( - buffer: &[u8], - additional: usize, - values: &mut Binary, -) { - let values_iterator = utils::BinaryIter::new(buffer); - - // each value occupies 4 bytes + len declared in 4 bytes => reserve accordingly. - values.offsets.reserve(additional); - values.values.reserve(buffer.len() - 4 * additional); - let a = values.values.capacity(); - for value in values_iterator { - values.push(value); - } - debug_assert_eq!(a, values.values.capacity()); -} - -pub(super) fn extend_from_page( - page: &DataPage, - descriptor: &ColumnDescriptor, - values: &mut Binary, - validity: &mut MutableBitmap, -) -> Result<()> { - let additional = page.num_values(); - assert_eq!(descriptor.max_rep_level(), 0); - assert!(descriptor.max_def_level() <= 1); - let is_optional = descriptor.max_def_level() == 1; - - let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor); - - match (&page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { - read_dict_buffer::( - validity_buffer, - values_buffer, - additional, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - ) - } - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { - read_dict_required::( - values_buffer, - additional, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - ) - } - (Encoding::DeltaLengthByteArray, None, true) => { - read_delta_optional::(validity_buffer, values_buffer, additional, values, validity) - } - (Encoding::Plain, _, true) => { - read_plain_optional::(validity_buffer, values_buffer, additional, values, validity) - } - (Encoding::Plain, _, false) => { - read_plain_required::(page.buffer(), page.num_values(), values) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_optional, - page.dictionary_page().is_some(), - version, - "Binary", - )) - } - }; - Ok(()) -} - struct Optional<'a> { values: utils::BinaryIter<'a>, validity: OptionalPageValidity<'a>, diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 4839c882dd7..3ac7d92d697 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -1,14 +1,9 @@ use std::sync::Arc; -use futures::{pin_mut, Stream, StreamExt}; -use parquet2::{metadata::ColumnChunkMetaData, page::DataPage, FallibleStreamingIterator}; - use crate::{ array::{Array, Offset}, - bitmap::MutableBitmap, datatypes::DataType, - error::{ArrowError, Result}, - io::parquet::read::binary::utils::finish_array, + error::Result, }; mod basic; @@ -18,40 +13,11 @@ mod utils; pub use dictionary::iter_to_arrays as iter_to_dict_arrays; -use self::{basic::TraitBinaryArray, utils::Binary}; +use self::basic::TraitBinaryArray; use super::{nested_utils::Nested, DataPages}; use basic::BinaryArrayIterator; -pub async fn stream_to_array( - pages: I, - metadata: &ColumnChunkMetaData, - data_type: &DataType, -) -> Result> -where - ArrowError: From, - O: Offset, - E: Clone, - I: Stream>, -{ - let capacity = metadata.num_values() as usize; - let mut values = Binary::::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - - pin_mut!(pages); // needed for iteration - - while let Some(page) = pages.next().await { - basic::extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut values, - &mut validity, - )? - } - - Ok(finish_array(data_type.clone(), values, validity)) -} - /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays<'a, O, A, I>( iter: I, diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index b927e7fa617..5ae93116a0f 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -6,11 +6,24 @@ use parquet2::{ }; use super::super::utils; -use super::basic::read_plain_required; +use super::super::utils::Pushable; use super::{super::nested_utils::*, utils::Binary}; use crate::{array::Offset, bitmap::MutableBitmap, error::Result}; +fn read_plain_required(buffer: &[u8], additional: usize, values: &mut Binary) { + let values_iterator = utils::BinaryIter::new(buffer); + + // each value occupies 4 bytes + len declared in 4 bytes => reserve accordingly. + values.offsets.reserve(additional); + values.values.reserve(buffer.len() - 4 * additional); + let a = values.values.capacity(); + for value in values_iterator { + values.push(value); + } + debug_assert_eq!(a, values.values.capacity()); +} + fn read_values<'a, O, D, G>( def_levels: D, max_def: u32, diff --git a/src/io/parquet/read/binary/utils.rs b/src/io/parquet/read/binary/utils.rs index 7468975ca5a..612dbccb0c9 100644 --- a/src/io/parquet/read/binary/utils.rs +++ b/src/io/parquet/read/binary/utils.rs @@ -1,31 +1,4 @@ -use crate::{ - array::{Array, BinaryArray, Offset, Utf8Array}, - bitmap::MutableBitmap, - datatypes::DataType, - io::parquet::read::utils::Pushable, -}; - -pub(super) fn finish_array( - data_type: DataType, - values: Binary, - validity: MutableBitmap, -) -> Box { - match data_type { - DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( - data_type, - values.offsets.0.into(), - values.values.into(), - validity.into(), - )), - DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( - data_type, - values.offsets.0.into(), - values.values.into(), - validity.into(), - )), - _ => unreachable!(), - } -} +use crate::{array::Offset, io::parquet::read::utils::Pushable}; /// [`Pushable`] for variable length binary data. #[derive(Debug)] diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 4b5b70f6f51..fbeb771f311 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -10,14 +10,11 @@ use crate::{ }; use super::super::utils; -use super::super::utils::{extend_from_decoder, split_buffer, Decoder, OptionalPageValidity}; +use super::super::utils::{ + extend_from_decoder, extend_from_new_page, split_buffer, Decoder, OptionalPageValidity, +}; use super::super::DataPages; -pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { - // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. - values.extend_from_slice(buffer, 0, additional); -} - // The state of an optional DataPage with a boolean physical type #[derive(Debug)] struct Optional<'a> { @@ -101,7 +98,7 @@ fn build_state(page: &DataPage, is_optional: bool) -> Result { #[derive(Default)] struct BooleanDecoder {} -impl<'a> utils::Decoder<'a, bool, MutableBitmap> for BooleanDecoder { +impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { type State = BooleanPageState<'a>; type Array = BooleanArray; @@ -183,7 +180,7 @@ impl Iterator for BooleanArrayIterator { Err(e) => return Some(Err(e)), }; - let maybe_array = utils::extend_from_new_page::( + let maybe_array = extend_from_new_page::( page, state, &self.data_type, diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 9bb4c5869cd..bf3d4125759 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -7,12 +7,16 @@ use parquet2::{ use super::super::nested_utils::*; use super::super::utils; -use super::basic::read_required; use crate::{ bitmap::{utils::BitmapIter, MutableBitmap}, error::Result, }; +fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { + // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. + values.extend_from_slice(buffer, 0, additional); +} + fn read_values( def_levels: D, max_def: u32, diff --git a/src/io/parquet/read/fixed_size_binary/mod.rs b/src/io/parquet/read/fixed_size_binary/mod.rs index 471f378d6f9..4acd297d52a 100644 --- a/src/io/parquet/read/fixed_size_binary/mod.rs +++ b/src/io/parquet/read/fixed_size_binary/mod.rs @@ -1,205 +1,4 @@ mod basic; mod utils; -use futures::{pin_mut, Stream, StreamExt}; -use parquet2::{ - encoding::{hybrid_rle, Encoding}, - page::{DataPage, FixedLenByteArrayPageDict}, - FallibleStreamingIterator, -}; - -use self::utils::FixedSizeBinary; - -use super::{utils::extend_from_decoder, ColumnChunkMetaData, ColumnDescriptor}; -use crate::{ - array::FixedSizeBinaryArray, - bitmap::MutableBitmap, - datatypes::DataType, - error::{ArrowError, Result}, - io::parquet::read::utils::OptionalPageValidity, -}; pub use basic::BinaryArrayIterator; - -use super::utils as a_utils; - -#[inline] -fn values_iter<'a>( - indices_buffer: &'a [u8], - dict_values: &'a [u8], - size: usize, - additional: usize, -) -> impl Iterator + 'a { - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - indices.map(move |index| { - let index = index as usize; - &dict_values[index * size..(index + 1) * size] - }) -} - -/// Assumptions: No rep levels -#[allow(clippy::too_many_arguments)] -pub(crate) fn read_dict_buffer( - validity_buffer: &[u8], - indices_buffer: &[u8], - additional: usize, - dict: &FixedLenByteArrayPageDict, - values: &mut FixedSizeBinary, - validity: &mut MutableBitmap, -) { - let values_iter = values_iter(indices_buffer, dict.values(), values.size, additional); - - let mut page_validity = OptionalPageValidity::new(validity_buffer, additional); - - extend_from_decoder(validity, &mut page_validity, None, values, values_iter) -} - -/// Assumptions: No rep levels -pub(crate) fn read_dict_required( - indices_buffer: &[u8], - additional: usize, - dict: &FixedLenByteArrayPageDict, - values: &mut FixedSizeBinary, - validity: &mut MutableBitmap, -) { - debug_assert!(validity.is_empty()); - - let values_iter = values_iter(indices_buffer, dict.values(), values.size, additional); - for value in values_iter { - values.push(value); - } -} - -pub(crate) fn read_optional( - validity_buffer: &[u8], - values_buffer: &[u8], - additional: usize, - values: &mut FixedSizeBinary, - validity: &mut MutableBitmap, -) { - assert_eq!(values_buffer.len() % values.size, 0); - let values_iter = values_buffer.chunks_exact(values.size); - - let mut page_validity = OptionalPageValidity::new(validity_buffer, additional); - - extend_from_decoder(validity, &mut page_validity, None, values, values_iter) -} - -pub(crate) fn read_required(buffer: &[u8], additional: usize, values: &mut FixedSizeBinary) { - assert_eq!(buffer.len(), additional * values.size); - values.values.extend_from_slice(buffer); -} - -pub fn iter_to_array( - mut iter: I, - data_type: DataType, - metadata: &ColumnChunkMetaData, -) -> Result -where - ArrowError: From, - I: FallibleStreamingIterator, -{ - let is_nullable = metadata.descriptor().max_def_level() == 1; - let size = FixedSizeBinaryArray::get_size(&data_type); - let capacity = metadata.num_values() as usize; - let mut values = FixedSizeBinary::with_capacity(capacity, size); - let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable)); - - while let Some(page) = iter.next()? { - extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? - } - debug_assert_eq!(values.len(), capacity); - debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable)); - - Ok(FixedSizeBinaryArray::from_data( - data_type, - values.values.into(), - validity.into(), - )) -} - -pub async fn stream_to_array( - pages: I, - data_type: DataType, - metadata: &ColumnChunkMetaData, -) -> Result -where - ArrowError: From, - E: Clone, - I: Stream>, -{ - let size = FixedSizeBinaryArray::get_size(&data_type); - - let capacity = metadata.num_values() as usize; - let mut values = FixedSizeBinary::with_capacity(capacity, size); - let mut validity = MutableBitmap::with_capacity(capacity); - - pin_mut!(pages); // needed for iteration - - while let Some(page) = pages.next().await { - extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut values, - &mut validity, - )? - } - - Ok(FixedSizeBinaryArray::from_data( - data_type, - values.values.into(), - validity.into(), - )) -} - -pub(crate) fn extend_from_page( - page: &DataPage, - descriptor: &ColumnDescriptor, - values: &mut FixedSizeBinary, - validity: &mut MutableBitmap, -) -> Result<()> { - let additional = page.num_values(); - assert_eq!(descriptor.max_rep_level(), 0); - assert!(descriptor.max_def_level() <= 1); - let is_optional = descriptor.max_def_level() == 1; - - let (_, validity_buffer, values_buffer, version) = a_utils::split_buffer(page, descriptor); - - match (page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer( - validity_buffer, - values_buffer, - additional, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - ), - (Encoding::PlainDictionary, Some(dict), false) => read_dict_required( - values_buffer, - additional, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - ), - (Encoding::Plain, _, true) => { - read_optional(validity_buffer, values_buffer, additional, values, validity) - } - // it can happen that there is a dictionary but the encoding is plain because - // it falled back. - (Encoding::Plain, _, false) => read_required(page.buffer(), additional, values), - _ => { - return Err(a_utils::not_implemented( - &page.encoding(), - is_optional, - page.dictionary_page().is_some(), - version, - "FixedSizeBinary", - )) - } - } - Ok(()) -} diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 54928d700d1..257141c93ea 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use futures::{AsyncRead, AsyncSeek, Stream}; +use futures::{AsyncRead, AsyncSeek}; pub use parquet2::{ error::ParquetError, fallible_streaming_iterator, @@ -74,17 +74,6 @@ pub fn get_page_iterator( )?) } -/// Creates a new iterator of compressed pages. -pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>( - column_metadata: &'a ColumnChunkMetaData, - reader: &'a mut RR, - pages_filter: Option, - buffer: Vec, -) -> Result> + 'a> { - let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true)); - Ok(_get_page_stream(column_metadata, reader, buffer, pages_filter).await?) -} - /// Reads parquets' metadata syncronously. pub fn read_metadata(reader: &mut R) -> Result { Ok(_read_metadata(reader)?)