diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index ac33073e46d..81da62548fd 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -72,61 +72,40 @@ impl<'a> Required<'a> { } } -#[inline] -fn values_iter1<'a>( - indices_buffer: &'a [u8], - dict: &'a BinaryPageDict, - additional: usize, -) -> std::iter::Map, Box &'a [u8] + 'a>> { - let dict_values = dict.values(); - let dict_offsets = dict.offsets(); - - let op = Box::new(move |index: u32| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }) as _; - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - indices.map(op) -} - struct RequiredDictionary<'a> { - pub values: std::iter::Map, Box &'a [u8] + 'a>>, + pub values: hybrid_rle::HybridRleDecoder<'a>, pub remaining: usize, + pub dict: &'a BinaryPageDict, } impl<'a> RequiredDictionary<'a> { fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let values = values_iter1(page.buffer(), dict, page.num_values()); + let values = utils::dict_indices_decoder(page.buffer(), page.num_values()); Self { values, remaining: page.num_values(), + dict, } } } struct OptionalDictionary<'a> { - values: std::iter::Map, Box &'a [u8] + 'a>>, + values: hybrid_rle::HybridRleDecoder<'a>, validity: OptionalPageValidity<'a>, + dict: &'a BinaryPageDict, } impl<'a> OptionalDictionary<'a> { fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, indices_buffer) = utils::split_buffer(page); - let values = values_iter1(values_buffer, dict, page.num_values()); + let values = utils::dict_indices_decoder(indices_buffer, page.num_values()); Self { values, validity: OptionalPageValidity::new(page), + dict, } } } @@ -208,7 +187,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder ))) } (Encoding::Plain, _, true) => { - let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, values) = utils::split_buffer(page); let values = BinaryIter::new(values); @@ -230,6 +209,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder } fn extend_from_state( + &self, state: &mut Self::State, values: &mut Binary, validity: &mut MutableBitmap, @@ -249,16 +229,36 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder values.push(x) } } - State::OptionalDictionary(page) => extend_from_decoder( - validity, - &mut page.validity, - Some(additional), - values, - &mut page.values, - ), + State::OptionalDictionary(page) => { + let dict_values = page.dict.values(); + let dict_offsets = page.dict.offsets(); + + let op = move |index: u32| { + let index = index as usize; + let dict_offset_i = dict_offsets[index] as usize; + let dict_offset_ip1 = dict_offsets[index + 1] as usize; + &dict_values[dict_offset_i..dict_offset_ip1] + }; + extend_from_decoder( + validity, + &mut page.validity, + Some(additional), + values, + &mut page.values.by_ref().map(op), + ) + } State::RequiredDictionary(page) => { + let dict_values = page.dict.values(); + let dict_offsets = page.dict.offsets(); + let op = move |index: u32| { + let index = index as usize; + let dict_offset_i = dict_offsets[index] as usize; + let dict_offset_ip1 = dict_offsets[index + 1] as usize; + &dict_values[dict_offset_i..dict_offset_ip1] + }; + page.remaining = page.remaining.saturating_sub(additional); - for x in page.values.by_ref().take(additional) { + for x in page.values.by_ref().map(op).take(additional) { values.push(x) } } diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 1efe7e79be7..2276829e70c 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -47,7 +47,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, None, true) => { - let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, values) = utils::split_buffer(page); let values = utils::BinaryIter::new(values); @@ -69,6 +69,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder } fn extend_from_state( + &self, state: &mut Self::State, values: &mut Binary, validity: &mut MutableBitmap, diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 8172356411b..ced615b6101 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -15,16 +15,6 @@ use super::super::utils::{ }; use super::super::DataPages; -#[inline] -pub(super) fn values_iter(values: &[u8]) -> BitmapIter { - // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. - // note that `values_buffer` contains only non-null values. - // thus, at this point, it is not known how many values this buffer contains - // values_len is the upper bound. The actual number depends on how many nulls there is. - let values_len = values.len() * 8; - BitmapIter::new(values, 0, values_len) -} - // The state of an optional DataPage with a boolean physical type #[derive(Debug)] struct Optional<'a> { @@ -34,10 +24,10 @@ struct Optional<'a> { impl<'a> Optional<'a> { pub fn new(page: &'a DataPage) -> Self { - let (_, _, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, _, values_buffer) = split_buffer(page); Self { - values: values_iter(values_buffer), + values: BitmapIter::new(values_buffer, 0, values_buffer.len() * 8), validity: OptionalPageValidity::new(page), } } @@ -112,6 +102,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { } fn extend_from_state( + &self, state: &mut Self::State, values: &mut MutableBitmap, validity: &mut MutableBitmap, diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index ade7e82719d..2010e3633f6 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -13,7 +13,6 @@ use super::super::nested_utils::*; use super::super::utils; use super::super::utils::{Decoder, MaybeNext}; use super::super::DataPages; -use super::basic::values_iter; // The state of a required DataPage with a boolean physical type #[derive(Debug)] @@ -69,8 +68,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { match (page.encoding(), is_optional) { (Encoding::Plain, true) => { - let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); - Ok(State::Optional(Optional::new(page), values_iter(values))) + let (_, _, values) = utils::split_buffer(page); + let values = BitmapIter::new(values, 0, values.len() * 8); + + Ok(State::Optional(Optional::new(page), values)) } (Encoding::Plain, false) => Ok(State::Required(Required::new(page))), _ => Err(utils::not_implemented( @@ -88,6 +89,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { } fn extend_from_state( + &self, state: &mut State, values: &mut MutableBitmap, validity: &mut MutableBitmap, diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index aff31d43998..fe870d904d2 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -12,7 +12,6 @@ use crate::{ array::{Array, BinaryArray, DictionaryKey, PrimitiveArray, Utf8Array}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, - io::parquet::read::primitive::read_item, }; use super::binary; @@ -50,39 +49,34 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( pages, data_type, chunk_size, - read_item, |x: i32| x as u8, ))), UInt16 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: i32| x as u16, ))), UInt32 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: i32| x as u32, ))), Int8 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: i32| x as i8, ))), Int16 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: i32| x as i16, ))), Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden( - primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32), + primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32), )), Timestamp(time_unit, None) => { @@ -104,14 +98,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( pages, data_type, chunk_size, - read_item, |x: i32| x as i128, ))), PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: i64| x as i128, ))), &PhysicalType::FixedLenByteArray(n) if n > 16 => { @@ -159,13 +151,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( // INT64 Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_iter(iden( - primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64), + primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x as i64), )), UInt64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: i64| x as u64, ))), @@ -173,14 +164,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( pages, data_type, chunk_size, - read_item, |x: f32| x, ))), Float64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, chunk_size, - read_item, |x: f64| x, ))), @@ -226,7 +215,6 @@ fn timestamp<'a, I: 'a + DataPages>( pages, data_type, chunk_size, - read_item, int96_to_i64_ns, )))); } else { @@ -241,7 +229,7 @@ fn timestamp<'a, I: 'a + DataPages>( )); } - let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x); + let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x); let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { unit diff --git a/src/io/parquet/read/dictionary.rs b/src/io/parquet/read/dictionary.rs index 00a2ccdf8cb..250facbd5a8 100644 --- a/src/io/parquet/read/dictionary.rs +++ b/src/io/parquet/read/dictionary.rs @@ -57,7 +57,7 @@ where K: DictionaryKey, { fn new(page: &'a DataPage) -> Self { - let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, indices_buffer) = utils::split_buffer(page); let values = values_iter1(indices_buffer, page.num_values()); @@ -79,7 +79,7 @@ where K: DictionaryKey, { fn new(page: &'a DataPage) -> Self { - let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, indices_buffer) = utils::split_buffer(page); let values = values_iter1(indices_buffer, page.num_values()); @@ -154,6 +154,7 @@ where } fn extend_from_state( + &self, state: &mut Self::State, values: &mut Vec, validity: &mut MutableBitmap, diff --git a/src/io/parquet/read/fixed_size_binary/basic.rs b/src/io/parquet/read/fixed_size_binary/basic.rs index 0c7801ac5fa..e3e8e593b7f 100644 --- a/src/io/parquet/read/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/fixed_size_binary/basic.rs @@ -13,8 +13,8 @@ use crate::{ error::Result, io::parquet::read::{ utils::{ - extend_from_decoder, next, not_implemented, split_buffer, Decoder, MaybeNext, - OptionalPageValidity, PageState, + dict_indices_decoder, extend_from_decoder, next, not_implemented, split_buffer, + Decoder, MaybeNext, OptionalPageValidity, PageState, }, DataPages, }, @@ -29,7 +29,7 @@ struct Optional<'a> { impl<'a> Optional<'a> { fn new(page: &'a DataPage, size: usize) -> Self { - let (_, _, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, _, values_buffer) = split_buffer(page); let values = values_buffer.chunks_exact(size); @@ -54,59 +54,40 @@ impl<'a> Required<'a> { } } -#[inline] -fn values_iter1<'a>( - indices_buffer: &'a [u8], - dict: &'a FixedLenByteArrayPageDict, - additional: usize, -) -> std::iter::Map, Box &'a [u8] + 'a>> { - let dict_values = dict.values(); - let size = dict.size(); - - let op = Box::new(move |index: u32| { - let index = index as usize; - &dict_values[index * size..(index + 1) * size] - }) as _; - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - indices.map(op) -} - struct RequiredDictionary<'a> { - pub values: std::iter::Map, Box &'a [u8] + 'a>>, + pub values: hybrid_rle::HybridRleDecoder<'a>, pub remaining: usize, + dict: &'a FixedLenByteArrayPageDict, } impl<'a> RequiredDictionary<'a> { fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self { - let values = values_iter1(page.buffer(), dict, page.num_values()); + let values = dict_indices_decoder(page.buffer(), page.num_values()); Self { values, remaining: page.num_values(), + dict, } } } struct OptionalDictionary<'a> { - values: std::iter::Map, Box &'a [u8] + 'a>>, + values: hybrid_rle::HybridRleDecoder<'a>, validity: OptionalPageValidity<'a>, + dict: &'a FixedLenByteArrayPageDict, } impl<'a> OptionalDictionary<'a> { fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self { - let (_, _, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, _, indices_buffer) = split_buffer(page); - let values = values_iter1(values_buffer, dict, page.num_values()); + let values = dict_indices_decoder(indices_buffer, page.num_values()); Self { values, validity: OptionalPageValidity::new(page), + dict, } } } @@ -170,6 +151,7 @@ impl<'a> Decoder<'a, &'a [u8], FixedSizeBinary> for BinaryDecoder { } fn extend_from_state( + &self, state: &mut Self::State, values: &mut FixedSizeBinary, validity: &mut MutableBitmap, @@ -189,16 +171,32 @@ impl<'a> Decoder<'a, &'a [u8], FixedSizeBinary> for BinaryDecoder { values.push(x) } } - State::OptionalDictionary(page) => extend_from_decoder( - validity, - &mut page.validity, - Some(remaining), - values, - &mut page.values, - ), + State::OptionalDictionary(page) => { + let dict_values = page.dict.values(); + let size = page.dict.size(); + let op = |index: u32| { + let index = index as usize; + &dict_values[index * size..(index + 1) * size] + }; + + extend_from_decoder( + validity, + &mut page.validity, + Some(remaining), + values, + page.values.by_ref().map(op), + ) + } State::RequiredDictionary(page) => { + let dict_values = page.dict.values(); + let size = page.dict.size(); + let op = |index: u32| { + let index = index as usize; + &dict_values[index * size..(index + 1) * size] + }; + page.remaining -= remaining; - for x in page.values.by_ref().take(remaining) { + for x in page.values.by_ref().map(op).take(remaining) { values.push(x) } } diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 4a4c6be60f0..9539652a7de 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -33,7 +33,6 @@ use crate::{ }, datatypes::{DataType, Field}, error::{ArrowError, Result}, - io::parquet::read::primitive::read_item, types::NativeType, }; @@ -243,7 +242,6 @@ where init.pop().unwrap(), field.data_type().clone(), chunk_size, - read_item, |x: i32| x as i16, ) } @@ -254,7 +252,6 @@ where init.pop().unwrap(), field.data_type().clone(), chunk_size, - read_item, |x: i64| x, ) } diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index 603c3bcdddb..8f9e2d32bf2 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -280,7 +280,7 @@ pub struct NestedPage<'a> { impl<'a> NestedPage<'a> { pub fn new(page: &'a DataPage) -> Self { - let (rep_levels, def_levels, _, _) = split_buffer(page, page.descriptor()); + let (rep_levels, def_levels, _) = split_buffer(page); let max_rep_level = page.descriptor().max_rep_level(); let max_def_level = page.descriptor().max_def_level(); @@ -366,7 +366,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push let remaining = needed - values.len(); // extend the current state - T::extend_from_state(&mut page, &mut values, &mut validity, remaining); + decoder.extend_from_state(&mut page, &mut values, &mut validity, remaining); // the number of values required is always fulfilled because // dremel assigns one (rep, def) to each value and we request @@ -377,7 +377,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push let num_values = nest.num_values(); let mut values = decoder.with_capacity(num_values); let mut validity = MutableBitmap::with_capacity(num_values); - T::extend_from_state(&mut page, &mut values, &mut validity, num_values); + decoder.extend_from_state(&mut page, &mut values, &mut validity, num_values); items.push_back((values, validity)); } @@ -502,7 +502,7 @@ pub struct Optional<'a> { impl<'a> Optional<'a> { pub fn new(page: &'a DataPage) -> Self { - let (_, def_levels, _, _) = split_buffer(page, page.descriptor()); + let (_, def_levels, _) = split_buffer(page); let max_def = page.descriptor().max_def_level(); diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index f160cd408cf..5f289bb050a 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -4,6 +4,7 @@ use parquet2::{ encoding::{hybrid_rle, Encoding}, page::{DataPage, PrimitivePageDict}, schema::Repetition, + types::decode, types::NativeType as ParquetNativeType, }; @@ -17,33 +18,24 @@ use super::super::utils::OptionalPageValidity; use super::super::DataPages; #[derive(Debug)] -pub(super) struct Values<'a, T, P, G, F> +pub(super) struct Values<'a, P> where - T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, - F: Fn(P) -> T, { - pub values: std::iter::Map, G>, F>, + pub values: std::slice::ChunksExact<'a, u8>, phantom: std::marker::PhantomData

, } -impl<'a, T, P, G, F> Values<'a, T, P, G, F> +impl<'a, P> Values<'a, P> where - T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, - F: Fn(P) -> T, { - pub fn new(page: &'a DataPage, op1: G, op2: F) -> Self { - let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + pub fn new(page: &'a DataPage) -> Self { + let (_, _, values) = utils::split_buffer(page); assert_eq!(values.len() % std::mem::size_of::

(), 0); Self { - phantom: Default::default(), - values: values - .chunks_exact(std::mem::size_of::

()) - .map(op1) - .map(op2), + values: values.chunks_exact(std::mem::size_of::

()), + phantom: std::marker::PhantomData, } } @@ -53,55 +45,24 @@ where } } -#[inline] -fn values_iter1( - indices_buffer: &[u8], - additional: usize, - op1: G, - op2: F, -) -> std::iter::Map, F> -where - T: NativeType, - G: Fn(u32) -> P, - F: Fn(P) -> T, -{ - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - indices.map(op1).map(op2) -} - #[derive(Debug)] -pub(super) struct ValuesDictionary<'a, T, P, F> +pub(super) struct ValuesDictionary<'a, P> where - T: NativeType, P: ParquetNativeType, - F: Fn(P) -> T, { - values: std::iter::Map< - std::iter::Map, Box P + 'a>>, - F, - >, - phantom: std::marker::PhantomData

, + values: hybrid_rle::HybridRleDecoder<'a>, + dict: &'a [P], } -impl<'a, T, P, F> ValuesDictionary<'a, T, P, F> +impl<'a, P> ValuesDictionary<'a, P> where - T: NativeType, P: ParquetNativeType, - F: Fn(P) -> T, { - fn new(data: &'a [u8], length: usize, dict: &'a PrimitivePageDict

, op2: F) -> Self { - let values = dict.values(); - let op1 = Box::new(move |index: u32| values[index as usize]) as Box P>; - - let values = values_iter1(data, length, op1, op2); + fn new(data: &'a [u8], length: usize, dict: &'a PrimitivePageDict

) -> Self { + let values = utils::dict_indices_decoder(data, length); Self { - phantom: Default::default(), + dict: dict.values(), values, } } @@ -114,25 +75,19 @@ where // The state of a `DataPage` of `Primitive` parquet primitive type #[derive(Debug)] -enum State<'a, T, P, G, F> +enum State<'a, P> where - T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, - F: Copy + Fn(P) -> T, { - Optional(OptionalPageValidity<'a>, Values<'a, T, P, G, F>), - Required(Values<'a, T, P, G, F>), - RequiredDictionary(ValuesDictionary<'a, T, P, F>), - OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, T, P, F>), + Optional(OptionalPageValidity<'a>, Values<'a, P>), + Required(Values<'a, P>), + RequiredDictionary(ValuesDictionary<'a, P>), + OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, P>), } -impl<'a, T, P, G, F> utils::PageState<'a> for State<'a, T, P, G, F> +impl<'a, P> utils::PageState<'a> for State<'a, P> where - T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, - F: Copy + Fn(P) -> T, { fn len(&self) -> usize { match self { @@ -145,45 +100,40 @@ where } #[derive(Debug)] -struct PrimitiveDecoder +struct PrimitiveDecoder where T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { phantom: std::marker::PhantomData, phantom_p: std::marker::PhantomData

, - op1: G, - op2: F, + op: F, } -impl<'a, T, P, G, F> PrimitiveDecoder +impl<'a, T, P, F> PrimitiveDecoder where T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { #[inline] - fn new(op1: G, op2: F) -> Self { + fn new(op: F) -> Self { Self { phantom: std::marker::PhantomData, phantom_p: std::marker::PhantomData, - op1, - op2, + op, } } } -impl<'a, T, P, G, F> utils::Decoder<'a, T, Vec> for PrimitiveDecoder +impl<'a, T, P, F> utils::Decoder<'a, T, Vec> for PrimitiveDecoder where T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { - type State = State<'a, T, P, G, F>; + type State = State<'a, P>; fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = @@ -196,28 +146,25 @@ where page.buffer(), page.num_values(), dict, - self.op2, ))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); - let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor()); + let (_, _, values_buffer) = utils::split_buffer(page); Ok(State::OptionalDictionary( OptionalPageValidity::new(page), - ValuesDictionary::new(values_buffer, page.num_values(), dict, self.op2), + ValuesDictionary::new(values_buffer, page.num_values(), dict), )) } (Encoding::Plain, _, true) => { let validity = OptionalPageValidity::new(page); - let values = Values::new(page, self.op1, self.op2); + let values = Values::new(page); Ok(State::Optional(validity, values)) } - (Encoding::Plain, _, false) => { - Ok(State::Required(Values::new(page, self.op1, self.op2))) - } + (Encoding::Plain, _, false) => Ok(State::Required(Values::new(page))), _ => Err(utils::not_implemented( &page.encoding(), is_optional, @@ -233,6 +180,7 @@ where } fn extend_from_state( + &self, state: &mut Self::State, values: &mut Vec, validity: &mut MutableBitmap, @@ -244,20 +192,30 @@ where page_validity, Some(remaining), values, - &mut page_values.values, + page_values.values.by_ref().map(decode).map(self.op), ), State::Required(page) => { - values.extend(page.values.by_ref().take(remaining)); + values.extend( + page.values + .by_ref() + .map(decode) + .map(self.op) + .take(remaining), + ); + } + State::OptionalDictionary(page_validity, page_values) => { + let op1 = |index: u32| page_values.dict[index as usize]; + utils::extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + &mut page_values.values.by_ref().map(op1).map(self.op), + ) } - State::OptionalDictionary(page_validity, page_values) => utils::extend_from_decoder( - validity, - page_validity, - Some(remaining), - values, - &mut page_values.values, - ), State::RequiredDictionary(page) => { - values.extend(page.values.by_ref().take(remaining)); + let op1 = |index: u32| page.dict[index as usize]; + values.extend(page.values.by_ref().map(op1).map(self.op).take(remaining)); } } } @@ -278,51 +236,46 @@ pub(super) fn finish( /// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays #[derive(Debug)] -pub struct Iter +pub struct Iter where I: DataPages, T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { iter: I, data_type: DataType, items: VecDeque<(Vec, MutableBitmap)>, chunk_size: usize, - op1: G, - op2: F, + op: F, phantom: std::marker::PhantomData

, } -impl Iter +impl Iter where I: DataPages, T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { - pub fn new(iter: I, data_type: DataType, chunk_size: usize, op1: G, op2: F) -> Self { + pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self { Self { iter, data_type, items: VecDeque::new(), chunk_size, - op1, - op2, + op, phantom: Default::default(), } } } -impl Iterator for Iter +impl Iterator for Iter where I: DataPages, T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { type Item = Result>; @@ -332,7 +285,7 @@ where &mut self.iter, &mut self.items, self.chunk_size, - &PrimitiveDecoder::new(self.op1, self.op2), + &PrimitiveDecoder::new(self.op), ); match maybe_state { utils::MaybeNext::Some(Ok((values, validity))) => { diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index a6cf2ad21c5..b2eba1534ff 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -1,10 +1,8 @@ mod basic; mod dictionary; mod nested; -mod utils; pub use dictionary::DictIter; -pub use utils::read_item; use std::sync::Arc; @@ -16,23 +14,21 @@ pub use basic::Iter; use nested::ArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, I, T, P, G, F>( +pub fn iter_to_arrays_nested<'a, I, T, P, F>( iter: I, init: InitNested, data_type: DataType, chunk_size: usize, - op1: G, - op2: F, + op: F, ) -> NestedArrayIter<'a> where I: 'a + DataPages, T: crate::types::NativeType, P: parquet2::types::NativeType, - G: 'a + Copy + Send + Sync + for<'b> Fn(&'b [u8]) -> P, F: 'a + Copy + Send + Sync + Fn(P) -> T, { Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size, op1, op2).map(|x| { + ArrayIterator::::new(iter, init, data_type, chunk_size, op).map(|x| { x.map(|(mut nested, array)| { let _ = nested.nested.pop().unwrap(); // the primitive let values = Arc::new(array) as Arc; diff --git a/src/io/parquet/read/primitive/nested.rs b/src/io/parquet/read/primitive/nested.rs index 96f65959d91..62b00119315 100644 --- a/src/io/parquet/read/primitive/nested.rs +++ b/src/io/parquet/read/primitive/nested.rs @@ -1,7 +1,8 @@ use std::collections::VecDeque; use parquet2::{ - encoding::Encoding, page::DataPage, schema::Repetition, types::NativeType as ParquetNativeType, + encoding::Encoding, page::DataPage, schema::Repetition, types::decode, + types::NativeType as ParquetNativeType, }; use crate::{ @@ -17,25 +18,19 @@ use super::basic::Values; // The state of a `DataPage` of `Primitive` parquet primitive type #[allow(clippy::large_enum_variant)] #[derive(Debug)] -enum State<'a, T, P, G, F> +enum State<'a, P> where - T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, - F: Copy + Fn(P) -> T, { - Optional(Optional<'a>, Values<'a, T, P, G, F>), - Required(Values<'a, T, P, G, F>), + Optional(Optional<'a>, Values<'a, P>), + Required(Values<'a, P>), //RequiredDictionary(ValuesDictionary<'a, T, P, F>), //OptionalDictionary(Optional<'a>, ValuesDictionary<'a, T, P, F>), } -impl<'a, T, P, G, F> utils::PageState<'a> for State<'a, T, P, G, F> +impl<'a, P> utils::PageState<'a> for State<'a, P> where - T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, - F: Copy + Fn(P) -> T, { fn len(&self) -> usize { match self { @@ -48,45 +43,40 @@ where } #[derive(Debug)] -struct PrimitiveDecoder +struct PrimitiveDecoder where T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { phantom: std::marker::PhantomData, phantom_p: std::marker::PhantomData

, - op1: G, - op2: F, + op: F, } -impl<'a, T, P, G, F> PrimitiveDecoder +impl<'a, T, P, F> PrimitiveDecoder where T: NativeType, P: ParquetNativeType, - G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { #[inline] - fn new(op1: G, op2: F) -> Self { + fn new(op: F) -> Self { Self { phantom: std::marker::PhantomData, phantom_p: std::marker::PhantomData, - op1, - op2, + op, } } } -impl<'a, T, P, G, F> utils::Decoder<'a, T, Vec> for PrimitiveDecoder +impl<'a, T, P, F> utils::Decoder<'a, T, Vec> for PrimitiveDecoder where T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { - type State = State<'a, T, P, G, F>; + type State = State<'a, P>; fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = @@ -102,13 +92,10 @@ where page, dict, self.op2, ))) }*/ - (Encoding::Plain, None, true) => Ok(State::Optional( - Optional::new(page), - Values::new(page, self.op1, self.op2), - )), - (Encoding::Plain, None, false) => { - Ok(State::Required(Values::new(page, self.op1, self.op2))) + (Encoding::Plain, None, true) => { + Ok(State::Optional(Optional::new(page), Values::new(page))) } + (Encoding::Plain, None, false) => Ok(State::Required(Values::new(page))), _ => Err(utils::not_implemented( &page.encoding(), is_optional, @@ -124,6 +111,7 @@ where } fn extend_from_state( + &self, state: &mut Self::State, values: &mut Vec, validity: &mut MutableBitmap, @@ -135,14 +123,14 @@ where read_optional_values( page_validity.definition_levels.by_ref(), max_def, - page_values.values.by_ref(), + page_values.values.by_ref().map(decode).map(self.op), values, validity, remaining, ) } State::Required(page) => { - values.extend(page.values.by_ref().take(remaining)); + values.extend(page.values.by_ref().map(decode).map(self.op).take(remaining)); } //State::OptionalDictionary(page) => todo!(), //State::RequiredDictionary(page) => todo!(), @@ -160,13 +148,12 @@ fn finish( /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct ArrayIterator +pub struct ArrayIterator where I: DataPages, T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { iter: I, @@ -176,26 +163,18 @@ where items: VecDeque<(Vec, MutableBitmap)>, nested: VecDeque, chunk_size: usize, - decoder: PrimitiveDecoder, + decoder: PrimitiveDecoder, } -impl ArrayIterator +impl ArrayIterator where I: DataPages, T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { - pub fn new( - iter: I, - init: InitNested, - data_type: DataType, - chunk_size: usize, - op1: G, - op2: F, - ) -> Self { + pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize, op: F) -> Self { Self { iter, init, @@ -203,18 +182,17 @@ where items: VecDeque::new(), nested: VecDeque::new(), chunk_size, - decoder: PrimitiveDecoder::new(op1, op2), + decoder: PrimitiveDecoder::new(op), } } } -impl Iterator for ArrayIterator +impl Iterator for ArrayIterator where I: DataPages, T: NativeType, P: ParquetNativeType, - G: Copy + for<'b> Fn(&'b [u8]) -> P, F: Copy + Fn(P) -> T, { type Item = Result<(NestedState, PrimitiveArray)>; diff --git a/src/io/parquet/read/primitive/utils.rs b/src/io/parquet/read/primitive/utils.rs deleted file mode 100644 index 97dac6f5489..00000000000 --- a/src/io/parquet/read/primitive/utils.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::convert::TryInto; - -use parquet2::types::NativeType; - -#[inline] -pub fn read_item(chunk: &[u8]) -> T { - let chunk: ::Bytes = match chunk.try_into() { - Ok(v) => v, - Err(_) => unreachable!(), - }; - T::from_le_bytes(chunk) -} diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index a040bfc2592..b86c8da58f3 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -2,8 +2,7 @@ use std::collections::VecDeque; use std::convert::TryInto; use parquet2::encoding::{hybrid_rle, Encoding}; -use parquet2::metadata::ColumnDescriptor; -use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader}; +use parquet2::page::{split_buffer as _split_buffer, DataPage}; use streaming_iterator::{convert, Convert, StreamingIterator}; use crate::bitmap::utils::BitmapIter; @@ -55,17 +54,8 @@ pub fn not_implemented( } #[inline] -pub fn split_buffer<'a>( - page: &'a DataPage, - descriptor: &ColumnDescriptor, -) -> (&'a [u8], &'a [u8], &'a [u8], &'static str) { - let (rep_levels, validity_buffer, values_buffer) = _split_buffer(page, descriptor); - - let version = match page.header() { - DataPageHeader::V1(_) => "V1", - DataPageHeader::V2(_) => "V2", - }; - (rep_levels, validity_buffer, values_buffer, version) +pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) { + _split_buffer(page, page.descriptor()) } /// A private trait representing structs that can receive elements. @@ -135,7 +125,7 @@ pub struct OptionalPageValidity<'a> { impl<'a> OptionalPageValidity<'a> { #[inline] pub fn new(page: &'a DataPage) -> Self { - let (_, validity, _, _) = split_buffer(page, page.descriptor()); + let (_, validity, _) = split_buffer(page); let validity = convert(hybrid_rle::Decoder::new(validity, 1)); Self { @@ -248,6 +238,7 @@ pub(super) trait Decoder<'a, C: Default, P: Pushable> { /// extends (values, validity) by deserializing items in `State`. /// It guarantees that the length of `values` is at most `values.len() + remaining`. fn extend_from_state( + &self, page: &mut Self::State, values: &mut P, validity: &mut MutableBitmap, @@ -280,7 +271,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push let remaining = chunk_size - values.len(); // extend the current state - T::extend_from_state(&mut page, &mut values, &mut validity, remaining); + decoder.extend_from_state(&mut page, &mut values, &mut validity, remaining); if values.len() < chunk_size { // the whole page was consumed and we still do not have enough items @@ -293,7 +284,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push while page.len() > 0 { let mut values = decoder.with_capacity(chunk_size); let mut validity = MutableBitmap::with_capacity(chunk_size); - T::extend_from_state(&mut page, &mut values, &mut validity, chunk_size); + decoder.extend_from_state(&mut page, &mut values, &mut validity, chunk_size); items.push_back((values, validity)) } @@ -347,3 +338,16 @@ pub(super) fn next<'a, I: DataPages, C: Default, P: Pushable, D: Decoder<'a, } } } + +#[inline] +pub(super) fn dict_indices_decoder( + indices_buffer: &[u8], + additional: usize, +) -> hybrid_rle::HybridRleDecoder { + // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), + // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). + let bit_width = indices_buffer[0]; + let indices_buffer = &indices_buffer[1..]; + + hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional) +}