From 99a91b2ce2500c69a2770b7986d5dc2d8bb84e09 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 17 Aug 2022 18:01:59 +0000 Subject: [PATCH] Bumped parquet2 --- Cargo.toml | 2 +- src/io/parquet/mod.rs | 2 +- .../parquet/read/deserialize/binary/basic.rs | 37 ++++++++++++------- .../parquet/read/deserialize/binary/nested.rs | 17 ++++++--- .../read/deserialize/boolean/nested.rs | 3 +- .../read/deserialize/dictionary/mod.rs | 12 ++++-- .../read/deserialize/dictionary/nested.rs | 7 ++-- .../deserialize/fixed_size_binary/basic.rs | 37 +++++++++---------- .../parquet/read/deserialize/nested_utils.rs | 25 +++++++++---- .../read/deserialize/primitive/basic.rs | 10 ++++- .../read/deserialize/primitive/integer.rs | 26 ++++++++----- .../read/deserialize/primitive/nested.rs | 15 +++++--- src/io/parquet/read/deserialize/utils.rs | 11 ++---- src/io/parquet/write/dictionary.rs | 8 ++-- src/io/parquet/write/nested/mod.rs | 4 +- src/io/parquet/write/row_group.rs | 2 +- 16 files changed, 133 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 26dd1acc388..e52ad355085 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ futures = { version = "0.3", optional = true } async-stream = { version = "0.3.2", optional = true } # parquet support -parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] } +parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] } # avro support avro-schema = { version = "0.3", optional = true } diff --git a/src/io/parquet/mod.rs b/src/io/parquet/mod.rs index 87090910bb3..04684369110 100644 --- a/src/io/parquet/mod.rs +++ b/src/io/parquet/mod.rs @@ -22,6 +22,6 @@ impl From for Error { impl From for parquet2::error::Error { fn from(error: Error) -> Self { - parquet2::error::Error::General(error.to_string()) + parquet2::error::Error::OutOfSpec(error.to_string()) } } diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 545008ad119..8790b43ea45 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -13,7 +13,7 @@ use crate::{ bitmap::{Bitmap, MutableBitmap}, buffer::Buffer, datatypes::DataType, - error::Result, + error::{Error, Result}, }; use super::super::utils::{ @@ -51,13 +51,13 @@ impl<'a> Delta<'a> { pub fn try_new(page: &'a DataPage) -> Result { let (_, _, values) = split_buffer(page)?; - let mut lengths_iter = delta_length_byte_array::Decoder::new(values); + let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?; #[allow(clippy::needless_collect)] // we need to consume it to get the values let lengths = lengths_iter .by_ref() - .map(|x| x as usize) - .collect::>(); + .map(|x| x.map(|x| x as usize).map_err(Error::from)) + .collect::>>()?; let values = lengths_iter.into_values(); Ok(Self { @@ -405,20 +405,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } State::OptionalDictionary(page_validity, page_values) => { let page_dict = &page_values.dict; - let op = move |index: u32| page_dict[index as usize].as_ref(); utils::extend_from_decoder( validity, page_validity, Some(additional), values, - &mut page_values.values.by_ref().map(op), + &mut page_values + .values + .by_ref() + .map(|index| page_dict[index.unwrap() as usize].as_ref()), ) } State::RequiredDictionary(page) => { let page_dict = &page.dict; - let op = move |index: u32| page_dict[index as usize].as_ref(); - for x in page.values.by_ref().map(op).take(additional) { + for x in page + .values + .by_ref() + .map(|index| page_dict[index.unwrap() as usize].as_ref()) + .take(additional) + { values.push(x) } } @@ -442,21 +448,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } State::FilteredRequiredDictionary(page) => { let page_dict = &page.dict; - let op = move |index: u32| page_dict[index as usize].as_ref(); - - for x in page.values.by_ref().map(op).take(additional) { + for x in page + .values + .by_ref() + .map(|index| page_dict[index.unwrap() as usize].as_ref()) + .take(additional) + { values.push(x) } } State::FilteredOptionalDictionary(page_validity, page_values) => { let page_dict = &page_values.dict; - let op = move |index: u32| page_dict[index as usize].as_ref(); utils::extend_from_decoder( validity, page_validity, Some(additional), values, - &mut page_values.values.by_ref().map(op), + &mut page_values + .values + .by_ref() + .map(|index| page_dict[index.unwrap() as usize].as_ref()), ) } } diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 19d362a967f..8c0f5ef419e 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -90,7 +90,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { ) } - fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> { let (values, validity) = decoded; match state { State::Optional(page) => { @@ -104,18 +104,25 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { } State::RequiredDictionary(page) => { let dict_values = &page.dict; - let op = move |index: u32| dict_values[index as usize].as_ref(); - let item = page.values.next().map(op).unwrap_or_default(); + let item = page + .values + .next() + .map(|index| dict_values[index.unwrap() as usize].as_ref()) + .unwrap_or_default(); values.push(item); } State::OptionalDictionary(page) => { let dict_values = &page.dict; - let op = move |index: u32| dict_values[index as usize].as_ref(); - let item = page.values.next().map(op).unwrap_or_default(); + let item = page + .values + .next() + .map(|index| dict_values[index.unwrap() as usize].as_ref()) + .unwrap_or_default(); values.push(item); validity.push(true); } } + Ok(()) } fn push_null(&self, decoded: &mut Self::DecodedState) { diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index d4c3e8ee124..e8a3b0d5c45 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -82,7 +82,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder { ) } - fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) { + fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) -> Result<()> { let (values, validity) = decoded; match state { State::Optional(page_values) => { @@ -95,6 +95,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder { values.push(value); } } + Ok(()) } fn push_null(&self, decoded: &mut Self::DecodedState) { diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index 0f379359086..43fcaa8ab21 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -163,7 +163,8 @@ where Some(remaining), values, &mut page.values.by_ref().map(|x| { - let x: usize = x.try_into().unwrap(); + // todo: rm unwrap + let x: usize = x.unwrap().try_into().unwrap(); match x.try_into() { Ok(key) => key, // todo: convert this to an error. @@ -176,7 +177,8 @@ where page.values .by_ref() .map(|x| { - let x: usize = x.try_into().unwrap(); + // todo: rm unwrap + let x: usize = x.unwrap().try_into().unwrap(); let x: K = match x.try_into() { Ok(key) => key, // todo: convert this to an error. @@ -195,7 +197,8 @@ where Some(remaining), values, &mut page_values.by_ref().map(|x| { - let x: usize = x.try_into().unwrap(); + // todo: rm unwrap + let x: usize = x.unwrap().try_into().unwrap(); let x: K = match x.try_into() { Ok(key) => key, // todo: convert this to an error. @@ -211,7 +214,8 @@ where page.values .by_ref() .map(|x| { - let x: usize = x.try_into().unwrap(); + // todo: rm unwrap + let x: usize = x.unwrap().try_into().unwrap(); let x: K = match x.try_into() { Ok(key) => key, // todo: convert this to an error. diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs index 6d01679f25e..89b8701231f 100644 --- a/src/io/parquet/read/deserialize/dictionary/nested.rs +++ b/src/io/parquet/read/deserialize/dictionary/nested.rs @@ -110,11 +110,11 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { ) } - fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> { let (values, validity) = decoded; match state { State::Optional(page_values) => { - let key = page_values.next(); + let key = page_values.next().transpose()?; // todo: convert unwrap to error let key = match K::try_from(key.unwrap_or_default() as usize) { Ok(key) => key, @@ -124,7 +124,7 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { validity.push(true); } State::Required(page_values) => { - let key = page_values.values.next(); + let key = page_values.values.next().transpose()?; let key = match K::try_from(key.unwrap_or_default() as usize) { Ok(key) => key, Err(_) => todo!(), @@ -132,6 +132,7 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { values.push(key); } } + Ok(()) } fn push_null(&self, decoded: &mut Self::DecodedState) { diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index e8de6cbf3f8..ab47aa98cf3 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -227,27 +227,26 @@ impl<'a> Decoder<'a> for BinaryDecoder { values.push(x) } } - State::OptionalDictionary(page) => { - let op = |index: u32| { - let index = index as usize; + State::OptionalDictionary(page) => extend_from_decoder( + validity, + &mut page.validity, + Some(remaining), + values, + page.values.by_ref().map(|index| { + let index = index.unwrap() as usize; &page.dict[index * self.size..(index + 1) * self.size] - }; - - extend_from_decoder( - validity, - &mut page.validity, - Some(remaining), - values, - page.values.by_ref().map(op), - ) - } + }), + ), State::RequiredDictionary(page) => { - let op = |index: u32| { - let index = index as usize; - &page.dict[index * self.size..(index + 1) * self.size] - }; - - for x in page.values.by_ref().map(op).take(remaining) { + for x in page + .values + .by_ref() + .map(|index| { + let index = index.unwrap() as usize; + &page.dict[index * self.size..(index + 1) * self.size] + }) + .take(remaining) + { values.push(x) } } diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 73679d34224..8bfc2e8c95e 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -258,7 +258,7 @@ pub(super) trait NestedDecoder<'a> { /// Initializes a new state fn with_capacity(&self, capacity: usize) -> Self::DecodedState; - fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState); + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()>; fn push_null(&self, decoded: &mut Self::DecodedState); fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary; @@ -309,9 +309,9 @@ impl<'a> NestedPage<'a> { let max_def_level = page.descriptor.max_def_level; let reps = - HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values()); + HybridRleDecoder::try_new(rep_levels, get_bit_width(max_rep_level), page.num_values())?; let defs = - HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); + HybridRleDecoder::try_new(def_levels, get_bit_width(max_def_level), page.num_values())?; let iter = reps.zip(defs).peekable(); @@ -377,7 +377,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( &mut decoded, decoder, additional, - ); + )?; *remaining -= nested.len() - existing; items.push_back((nested, decoded)); @@ -393,7 +393,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( &mut decoded, decoder, additional, - ); + )?; *remaining -= nested.len(); items.push_back((nested, decoded)); } @@ -407,7 +407,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>( decoded: &mut D::DecodedState, decoder: &D, additional: usize, -) { +) -> Result<()> { let mut values_count = vec![0; nested.len()]; for (depth, nest) in nested.iter().enumerate().skip(1) { @@ -431,6 +431,8 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>( let mut rows = 0; while let Some((rep, def)) = page.iter.next() { + let rep = rep?; + let def = def?; if rep == 0 { rows += 1; } @@ -455,7 +457,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>( // the leaf / primitive let is_valid = (def != cum_sum[depth]) || !nest.is_nullable(); if right_level && is_valid { - decoder.push_valid(values_state, decoded); + decoder.push_valid(values_state, decoded)?; } else { decoder.push_null(decoded); } @@ -463,12 +465,19 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>( } } - let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0); + let next_rep = *page + .iter + .peek() + .map(|x| x.0.as_ref()) + .transpose() + .unwrap() // todo: fix this + .unwrap_or(&0); if next_rep == 0 && rows == additional { break; } } + Ok(()) } #[inline] diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 67f017872ca..34a3702f3d8 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -232,12 +232,18 @@ where page_validity, Some(remaining), values, - &mut page_values.values.by_ref().map(op1), + &mut page_values.values.by_ref().map(|x| x.unwrap()).map(op1), ) } State::RequiredDictionary(page) => { let op1 = |index: u32| page.dict[index as usize]; - values.extend(page.values.by_ref().map(op1).take(remaining)); + values.extend( + page.values + .by_ref() + .map(|x| x.unwrap()) + .map(op1) + .take(remaining), + ); } State::FilteredRequired(page) => { values.extend( diff --git a/src/io/parquet/read/deserialize/primitive/integer.rs b/src/io/parquet/read/deserialize/primitive/integer.rs index 3e09e421505..f55b0bec3da 100644 --- a/src/io/parquet/read/deserialize/primitive/integer.rs +++ b/src/io/parquet/read/deserialize/primitive/integer.rs @@ -13,7 +13,7 @@ use crate::{ array::MutablePrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, - error::Result, + error::{Error, Result}, io::parquet::read::deserialize::utils::{ get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity, }, @@ -94,18 +94,20 @@ where match (page.encoding(), dict, is_optional, is_filtered) { (Encoding::DeltaBinaryPacked, _, false, false) => { let (_, _, values) = split_buffer(page)?; - Ok(State::DeltaBinaryPackedRequired(Decoder::new(values))) + Decoder::try_new(values) + .map(State::DeltaBinaryPackedRequired) + .map_err(Error::from) } (Encoding::DeltaBinaryPacked, _, true, false) => { let (_, _, values) = split_buffer(page)?; Ok(State::DeltaBinaryPackedOptional( OptionalPageValidity::try_new(page)?, - Decoder::new(values), + Decoder::try_new(values)?, )) } (Encoding::DeltaBinaryPacked, _, false, true) => { let (_, _, values) = split_buffer(page)?; - let values = Decoder::new(values); + let values = Decoder::try_new(values)?; let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); @@ -114,7 +116,7 @@ where } (Encoding::DeltaBinaryPacked, _, true, true) => { let (_, _, values) = split_buffer(page)?; - let values = Decoder::new(values); + let values = Decoder::try_new(values)?; Ok(State::FilteredDeltaBinaryPackedOptional( FilteredOptionalPageValidity::try_new(page)?, @@ -142,7 +144,7 @@ where values.extend( state .by_ref() - .map(|x| x.as_()) + .map(|x| x.unwrap().as_()) .map(self.0.op) .take(remaining), ); @@ -153,13 +155,16 @@ where page_validity, Some(remaining), values, - page_values.by_ref().map(|x| x.as_()).map(self.0.op), + page_values + .by_ref() + .map(|x| x.unwrap().as_()) + .map(self.0.op), ) } State::FilteredDeltaBinaryPackedRequired(page) => { values.extend( page.by_ref() - .map(|x| x.as_()) + .map(|x| x.unwrap().as_()) .map(self.0.op) .take(remaining), ); @@ -170,7 +175,10 @@ where page_validity, Some(remaining), values, - page_values.by_ref().map(|x| x.as_()).map(self.0.op), + page_values + .by_ref() + .map(|x| x.unwrap().as_()) + .map(self.0.op), ); } } diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 438ca11afab..4798409acb3 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -113,7 +113,7 @@ where ) } - fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> { let (values, validity) = decoded; match state { State::Optional(page_values) => { @@ -128,19 +128,24 @@ where values.push(value.unwrap_or_default()); } State::RequiredDictionary(page) => { - let op1 = |index: u32| page.dict[index as usize]; - let value = page.values.next().map(op1); + let value = page + .values + .next() + .map(|index| page.dict[index.unwrap() as usize]); values.push(value.unwrap_or_default()); } State::OptionalDictionary(page) => { - let op1 = |index: u32| page.dict[index as usize]; - let value = page.values.next().map(op1); + let value = page + .values + .next() + .map(|index| page.dict[index.unwrap() as usize]); values.push(value.unwrap_or_default()); validity.push(true); } } + Ok(()) } fn push_null(&self, decoded: &mut Self::DecodedState) { diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index b847709e7b2..2c137119b43 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -126,7 +126,7 @@ impl<'a> PageValidity<'a> for FilteredOptionalPageValidity<'a> { (run, offset) } else { // a new run - let run = self.iter.next()?; // no run -> None + let run = self.iter.next()?.unwrap(); // no run -> None self.current = Some((run, 0)); return self.next_limited(limit); }; @@ -234,7 +234,7 @@ impl<'a> OptionalPageValidity<'a> { (run, offset) } else { // a new run - let run = self.iter.next()?; // no run -> None + let run = self.iter.next()?.unwrap(); // no run -> None self.current = Some((run, 0)); return self.next_limited(limit); }; @@ -492,9 +492,6 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> Result( let keys = keys .zip(validity.iter()) .filter_map(|(key, is_valid)| is_valid.then(|| key)); - let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; + let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64); let keys = utils::ExactSizedIter::new(keys, array.len() - validity.unset_bits()); // num_bits as a single byte - buffer.push(num_bits); + buffer.push(num_bits as u8); // followed by the encoded indices. Ok(encode_u32(buffer, keys, num_bits)?) } else { - let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; + let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64); // num_bits as a single byte - buffer.push(num_bits); + buffer.push(num_bits as u8); // followed by the encoded indices. Ok(encode_u32(buffer, keys, num_bits)?) diff --git a/src/io/parquet/write/nested/mod.rs b/src/io/parquet/write/nested/mod.rs index 39a3f4eddf5..9331ec45f2b 100644 --- a/src/io/parquet/write/nested/mod.rs +++ b/src/io/parquet/write/nested/mod.rs @@ -33,7 +33,7 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - if max_level == 0 { return Ok(()); } - let num_bits = get_bit_width(max_level) as u8; + let num_bits = get_bit_width(max_level); let levels = rep::RepLevelsIter::new(nested); @@ -61,7 +61,7 @@ fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - if max_level == 0 { return Ok(()); } - let num_bits = get_bit_width(max_level) as u8; + let num_bits = get_bit_width(max_level); let levels = def::DefLevelsIter::new(nested); diff --git a/src/io/parquet/write/row_group.rs b/src/io/parquet/write/row_group.rs index 5558bab49e9..00fe53278d8 100644 --- a/src/io/parquet/write/row_group.rs +++ b/src/io/parquet/write/row_group.rs @@ -45,7 +45,7 @@ pub fn row_group_iter + 'static + Send + Sync>( let pages = DynIter::new( pages .into_iter() - .map(|x| x.map_err(|e| ParquetError::General(e.to_string()))), + .map(|x| x.map_err(|e| ParquetError::OutOfSpec(e.to_string()))), ); let compressed_pages = Compressor::new(pages, options.compression, vec![])