diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 61d98bfbdbd..b088ffc78b4 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::default::Default; use parquet2::{ - encoding::{delta_length_byte_array, hybrid_rle, Encoding}, + encoding::{hybrid_rle, Encoding}, page::{BinaryPageDict, DataPage}, schema::Repetition, }; @@ -19,6 +19,7 @@ use super::super::utils::{extend_from_decoder, next, MaybeNext, OptionalPageVali use super::super::DataPages; use super::{super::utils, utils::Binary}; +/* fn read_delta_optional( validity_buffer: &[u8], values_buffer: &[u8], @@ -54,6 +55,7 @@ fn read_delta_optional( let new_values = values_iterator.into_values(); values.extend_from_slice(new_values); } + */ struct Optional<'a> { values: utils::BinaryIter<'a>, @@ -68,7 +70,7 @@ impl<'a> Optional<'a> { Self { values, - validity: OptionalPageValidity::new(validity_buffer, page.num_values()), + validity: OptionalPageValidity::new(page), } } } @@ -135,13 +137,13 @@ struct OptionalDictionary<'a> { impl<'a> OptionalDictionary<'a> { fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let (_, validity_buffer, values_buffer, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor()); let values = values_iter1(values_buffer, dict, page.num_values()); Self { values, - validity: OptionalPageValidity::new(validity_buffer, page.num_values()), + validity: OptionalPageValidity::new(page), } } } diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 170eaa65bc8..366b50af857 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -15,7 +15,7 @@ pub use dictionary::iter_to_arrays as iter_to_dict_arrays; use self::basic::TraitBinaryArray; -use super::{nested_utils::Nested, DataPages}; +use super::DataPages; use basic::BinaryArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 1f0dd5e8843..6fee13c3a0c 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -34,11 +34,11 @@ struct Optional<'a> { impl<'a> Optional<'a> { pub fn new(page: &'a DataPage) -> Self { - let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, _, values_buffer, _) = split_buffer(page, page.descriptor()); Self { values: values_iter(values_buffer), - validity: OptionalPageValidity::new(validity_buffer, page.num_values()), + validity: OptionalPageValidity::new(page), } } } diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 942caf4a505..73b70c337c6 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -94,14 +94,17 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { required: usize, ) { match state { - State::Optional(page_validity, page_values) => read_optional_values( - page_validity.definition_levels.by_ref(), - page_validity.max_def(), - page_values.by_ref(), - values, - validity, - required, - ), + State::Optional(page_validity, page_values) => { + let max_def = page_validity.max_def(); + read_optional_values( + page_validity.definition_levels.by_ref(), + max_def, + page_values.by_ref(), + values, + validity, + required, + ) + } State::Required(page) => { values.extend_from_slice(page.values, page.offset, required); page.offset += required; diff --git a/src/io/parquet/read/dictionary.rs b/src/io/parquet/read/dictionary.rs index 904ca112a48..ab3af462a4c 100644 --- a/src/io/parquet/read/dictionary.rs +++ b/src/io/parquet/read/dictionary.rs @@ -55,13 +55,13 @@ where K: DictionaryKey, { fn new(page: &'a DataPage) -> Self { - let (_, validity_buffer, indices_buffer, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor()); let values = values_iter1(indices_buffer, page.num_values()); Self { values, - validity: OptionalPageValidity::new(validity_buffer, page.num_values()), + validity: OptionalPageValidity::new(page), } } } diff --git a/src/io/parquet/read/fixed_size_binary/basic.rs b/src/io/parquet/read/fixed_size_binary/basic.rs index 367586a2212..90eda3a67a1 100644 --- a/src/io/parquet/read/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/fixed_size_binary/basic.rs @@ -29,13 +29,13 @@ struct Optional<'a> { impl<'a> Optional<'a> { fn new(page: &'a DataPage, size: usize) -> Self { - let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, _, values_buffer, _) = split_buffer(page, page.descriptor()); let values = values_buffer.chunks_exact(size); Self { values, - validity: OptionalPageValidity::new(validity_buffer, page.num_values()), + validity: OptionalPageValidity::new(page), } } } @@ -100,13 +100,13 @@ struct OptionalDictionary<'a> { impl<'a> OptionalDictionary<'a> { fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self { - let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, _, values_buffer, _) = split_buffer(page, page.descriptor()); let values = values_iter1(values_buffer, dict, page.num_values()); Self { values, - validity: OptionalPageValidity::new(validity_buffer, page.num_values()), + validity: OptionalPageValidity::new(page), } } } diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index ec199d46de6..d151da71fbe 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -13,7 +13,7 @@ use crate::{ }; use super::super::utils; -use super::super::utils::{split_buffer, OptionalPageValidity}; +use super::super::utils::OptionalPageValidity; use super::super::DataPages; #[derive(Debug)] @@ -35,10 +35,12 @@ where G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { - fn new(data: &'a [u8], op1: G, op2: F) -> Self { + fn new(page: &'a DataPage, op1: G, op2: F) -> Self { + let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + assert_eq!(values.len(), page.num_values() * std::mem::size_of::()); Self { phantom: Default::default(), - values: data + values: values .chunks_exact(std::mem::size_of::

()) .map(op1) .map(op2), @@ -190,32 +192,21 @@ where (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); - let (_, validity_buffer, values_buffer, _) = - utils::split_buffer(page, page.descriptor()); + let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor()); Ok(State::OptionalDictionary( - OptionalPageValidity::new(validity_buffer, page.num_values()), + OptionalPageValidity::new(page), ValuesDictionary::new(values_buffer, page.num_values(), dict, self.op2), )) } (Encoding::Plain, None, true) => { - let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor()); - - let validity = OptionalPageValidity::new(validity_buffer, page.num_values()); - let values = Values::new(values_buffer, self.op1, self.op2); + let validity = OptionalPageValidity::new(page); + let values = Values::new(page, self.op1, self.op2); Ok(State::Optional(validity, values)) } (Encoding::Plain, None, false) => { - assert_eq!( - page.buffer().len(), - page.num_values() * std::mem::size_of::() - ); - Ok(State::Required(Values::new( - page.buffer(), - self.op1, - self.op2, - ))) + Ok(State::Required(Values::new(page, self.op1, self.op2))) } _ => Err(utils::not_implemented( &page.encoding(), diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index 8679763c330..ae7dbf50ac7 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -6,7 +6,6 @@ use parquet2::metadata::ColumnDescriptor; use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader}; use streaming_iterator::{convert, Convert, StreamingIterator}; -use crate::array::DictionaryKey; use crate::bitmap::utils::BitmapIter; use crate::bitmap::MutableBitmap; use crate::error::ArrowError; @@ -144,13 +143,15 @@ pub struct OptionalPageValidity<'a> { impl<'a> OptionalPageValidity<'a> { #[inline] - pub fn new(validity: &'a [u8], length: usize) -> Self { + pub fn new(page: &'a DataPage) -> Self { + let (_, validity, _, _) = split_buffer(page, page.descriptor()); + let validity = convert(hybrid_rle::Decoder::new(validity, 1)); Self { validity, run_offset: 0, consumed: 0, - length, + length: page.num_values(), } } @@ -238,6 +239,7 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator( validity_buffer: &[u8], indices_buffer: &[u8], @@ -260,6 +262,7 @@ pub(super) fn read_dict_optional( extend_from_decoder(validity, &mut page_validity, None, indices, indices_iter) } + */ /// The state of a partially deserialized page pub(super) trait PageState<'a> {