diff --git a/Cargo.toml b/Cargo.toml index 3fe3cb06c78..ac0c51c1809 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,8 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7" } # parquet support -parquet2 = { version = "0.14.0", optional = true, default_features = false } +#parquet2 = { version = "0.14.0", optional = true, default_features = false } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "delay_dict", optional = true, default_features = false } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/src/io/parquet/read/deserialize/README.md b/src/io/parquet/read/deserialize/README.md index 971ed424784..347424fca2f 100644 --- a/src/io/parquet/read/deserialize/README.md +++ b/src/io/parquet/read/deserialize/README.md @@ -7,7 +7,7 @@ module for non-nested arrays is `simple::page_iter_to_arrays`. This function expects -* a (fallible) streaming iterator of decompressed and encoded pages, `DataPages` +* a (fallible) streaming iterator of decompressed and encoded pages, `Pages` * the source (parquet) column type, including its logical information * the target (arrow) `DataType` * the chunk size @@ -18,7 +18,7 @@ This design is shared among _all_ `(parquet, arrow)` implemented tuples. Their m difference is how they are deserialized, which depends on the source and target types. When the array iterator is pulled the first time, the following happens: -* a page from `DataPages` is pulled +* a page from `Pages` is pulled * a `PageState<'a>` is built from the page * the `PageState` is consumed into a mutable array: * if `chunk_size` is larger than the number of rows in the page, the mutable array state is preserved and a new page is pulled and the process repeated until we fill a chunk. 
diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 3732839b7b0..cd4a7805bbb 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -4,7 +4,7 @@ use std::default::Default; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - page::{split_buffer, BinaryPageDict, DataPage}, + page::{split_buffer, DataPage, DictPage}, schema::Repetition, }; @@ -20,7 +20,7 @@ use super::super::utils::{ extend_from_decoder, get_selected_rows, next, DecodedState, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }; -use super::super::DataPages; +use super::super::Pages; use super::{super::utils, utils::*}; /* @@ -99,14 +99,16 @@ impl<'a> FilteredRequired<'a> { } } +pub(super) type Dict = Vec>; + #[derive(Debug)] pub(super) struct RequiredDictionary<'a> { pub values: hybrid_rle::HybridRleDecoder<'a>, - pub dict: &'a BinaryPageDict, + pub dict: &'a Dict, } impl<'a> RequiredDictionary<'a> { - pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result { let values = utils::dict_indices_decoder(page)?; Ok(Self { dict, values }) @@ -121,11 +123,11 @@ impl<'a> RequiredDictionary<'a> { #[derive(Debug)] pub(super) struct FilteredRequiredDictionary<'a> { pub values: SliceFilteredIter>, - pub dict: &'a BinaryPageDict, + pub dict: &'a Dict, } impl<'a> FilteredRequiredDictionary<'a> { - pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result { let values = utils::dict_indices_decoder(page)?; let rows = get_selected_rows(page); @@ -143,11 +145,11 @@ impl<'a> FilteredRequiredDictionary<'a> { #[derive(Debug)] pub(super) struct ValuesDictionary<'a> { pub values: hybrid_rle::HybridRleDecoder<'a>, - pub dict: &'a BinaryPageDict, + pub dict: &'a Dict, } impl<'a> ValuesDictionary<'a> { 
- pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result { let values = utils::dict_indices_decoder(page)?; Ok(Self { dict, values }) @@ -232,42 +234,29 @@ struct BinaryDecoder { impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { type State = State<'a>; + type Dict = Dict; type DecodedState = (Binary, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); - match ( - page.encoding(), - page.dictionary_page(), - is_optional, - is_filtered, - ) { - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - Ok(State::RequiredDictionary(RequiredDictionary::try_new( - page, - dict.as_any().downcast_ref().unwrap(), - )?)) - } + match (page.encoding(), dict, is_optional, is_filtered) { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => Ok( + State::RequiredDictionary(RequiredDictionary::try_new(page, dict)?), + ), (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { - let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::OptionalDictionary( OptionalPageValidity::try_new(page)?, ValuesDictionary::try_new(page, dict)?, )) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, true) => { - let dict = dict.as_any().downcast_ref().unwrap(); - FilteredRequiredDictionary::try_new(page, dict) .map(State::FilteredRequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, true) => { - let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::FilteredOptionalDictionary( FilteredOptionalPageValidity::try_new(page)?, ValuesDictionary::try_new(page, dict)?, @@ -332,15 +321,8 @@ impl<'a, O: Offset> 
utils::Decoder<'a> for BinaryDecoder { } } State::OptionalDictionary(page_validity, page_values) => { - let dict_values = page_values.dict.values(); - let dict_offsets = page_values.dict.offsets(); - - let op = move |index: u32| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }; + let page_dict = &page_values.dict; + let op = move |index: u32| page_dict[index as usize].as_ref(); utils::extend_from_decoder( validity, page_validity, @@ -350,14 +332,8 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { ) } State::RequiredDictionary(page) => { - let dict_values = page.dict.values(); - let dict_offsets = page.dict.offsets(); - let op = move |index: u32| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }; + let page_dict = &page.dict; + let op = move |index: u32| page_dict[index as usize].as_ref(); for x in page.values.by_ref().map(op).take(additional) { values.push(x) @@ -373,29 +349,16 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { ); } State::FilteredRequiredDictionary(page) => { - let dict_values = page.dict.values(); - let dict_offsets = page.dict.offsets(); - let op = move |index: u32| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }; + let page_dict = &page.dict; + let op = move |index: u32| page_dict[index as usize].as_ref(); for x in page.values.by_ref().map(op).take(additional) { values.push(x) } } State::FilteredOptionalDictionary(page_validity, page_values) => { - let dict_values = page_values.dict.values(); - let dict_offsets = page_values.dict.offsets(); - - let op = move |index: u32| { - let index = index as 
usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }; + let page_dict = &page_values.dict; + let op = move |index: u32| page_dict[index as usize].as_ref(); utils::extend_from_decoder( validity, page_validity, @@ -406,6 +369,10 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } } } + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dict { + deserialize_plain(&page.buffer, page.num_values) + } } pub(super) fn finish>( @@ -421,21 +388,23 @@ pub(super) fn finish>( ) } -pub struct Iter, I: DataPages> { +pub struct Iter, I: Pages> { iter: I, data_type: DataType, items: VecDeque<(Binary, MutableBitmap)>, + dict: Option, chunk_size: Option, remaining: usize, phantom_a: std::marker::PhantomData, } -impl, I: DataPages> Iter { +impl, I: Pages> Iter { pub fn new(iter: I, data_type: DataType, chunk_size: Option, num_rows: usize) -> Self { Self { iter, data_type, items: VecDeque::new(), + dict: None, chunk_size, remaining: num_rows, phantom_a: Default::default(), @@ -443,13 +412,14 @@ impl, I: DataPages> Iter { } } -impl, I: DataPages> Iterator for Iter { +impl, I: Pages> Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.dict, &mut self.remaining, self.chunk_size, &BinaryDecoder::::default(), @@ -464,3 +434,9 @@ impl, I: DataPages> Iterator for Iter } } } + +pub(super) fn deserialize_plain(values: &[u8], num_values: usize) -> Dict { + SizedBinaryIter::new(values, num_values) + .map(|x| x.to_vec()) + .collect() +} diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs index fae1f54882e..3000a7ca7d2 100644 --- a/src/io/parquet/read/deserialize/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use 
parquet2::page::{BinaryPageDict, DictPage}; +use parquet2::page::DictPage; use crate::{ array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array}, @@ -10,21 +10,21 @@ use crate::{ io::parquet::read::deserialize::nested_utils::{InitNested, NestedState}, }; -use super::super::dictionary::*; -use super::super::utils::MaybeNext; -use super::super::DataPages; +use super::super::Pages; +use super::{super::dictionary::*, utils::SizedBinaryIter}; +use super::{super::utils::MaybeNext, utils::Binary}; -/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation +/// An iterator adapter over [`Pages`] assumed to be encoded as parquet's dictionary-encoded binary representation #[derive(Debug)] pub struct DictIter where - I: DataPages, + I: Pages, O: Offset, K: DictionaryKey, { iter: I, data_type: DataType, - values: Dict, + values: Option>, items: VecDeque<(Vec, MutableBitmap)>, remaining: usize, chunk_size: Option, @@ -35,13 +35,13 @@ impl DictIter where K: DictionaryKey, O: Offset, - I: DataPages, + I: Pages, { pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option) -> Self { Self { iter, data_type, - values: Dict::Empty, + values: None, items: VecDeque::new(), remaining: num_rows, chunk_size, @@ -50,40 +50,35 @@ where } } -fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Box { +fn read_dict(data_type: DataType, dict: &DictPage) -> Box { let data_type = match data_type { DataType::Dictionary(_, values, _) => *values, _ => data_type, }; - let dict = dict.as_any().downcast_ref::().unwrap(); - let offsets = dict - .offsets() - .iter() - .map(|x| O::from_usize(*x as usize).unwrap()) - .collect::>(); - let values = dict.values().to_vec(); + let values = SizedBinaryIter::new(&dict.buffer, dict.num_values); + + let mut data = Binary::::with_capacity(dict.num_values); + data.values = Vec::with_capacity(dict.buffer.len() - 4 * dict.num_values); + for item in values { + 
data.push(item) + } match data_type.to_physical_type() { - PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Box::new(Utf8Array::::from_data( - data_type, - offsets.into(), - values.into(), - None, - )) as _, - PhysicalType::Binary | PhysicalType::LargeBinary => Box::new(BinaryArray::::from_data( - data_type, - offsets.into(), - values.into(), - None, - )) as _, + PhysicalType::Utf8 | PhysicalType::LargeUtf8 => { + Utf8Array::::new(data_type, data.offsets.0.into(), data.values.into(), None).boxed() + } + PhysicalType::Binary | PhysicalType::LargeBinary => { + BinaryArray::::new(data_type, data.offsets.0.into(), data.values.into(), None) + .boxed() + } _ => unreachable!(), } } impl Iterator for DictIter where - I: DataPages, + I: Pages, O: Offset, K: DictionaryKey, { @@ -112,14 +107,14 @@ where #[derive(Debug)] pub struct NestedDictIter where - I: DataPages, + I: Pages, O: Offset, K: DictionaryKey, { iter: I, init: Vec, data_type: DataType, - values: Dict, + values: Option>, items: VecDeque<(NestedState, (Vec, MutableBitmap))>, remaining: usize, chunk_size: Option, @@ -128,7 +123,7 @@ where impl NestedDictIter where - I: DataPages, + I: Pages, O: Offset, K: DictionaryKey, { @@ -143,7 +138,7 @@ where iter, init, data_type, - values: Dict::Empty, + values: None, items: VecDeque::new(), remaining: num_rows, chunk_size, @@ -154,7 +149,7 @@ where impl Iterator for NestedDictIter where - I: DataPages, + I: Pages, O: Offset, K: DictionaryKey, { diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 355cf3d6cb5..19d362a967f 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -2,22 +2,22 @@ use std::collections::VecDeque; use parquet2::{ encoding::Encoding, - page::{split_buffer, DataPage}, + page::{split_buffer, DataPage, DictPage}, schema::Repetition, }; use crate::{ array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result, - 
io::parquet::read::DataPages, + io::parquet::read::Pages, }; -use super::super::nested_utils::*; use super::super::utils::MaybeNext; use super::basic::ValuesDictionary; use super::utils::*; +use super::{super::nested_utils::*, basic::deserialize_plain}; use super::{ super::utils, - basic::{finish, TraitBinaryArray}, + basic::{finish, Dict, TraitBinaryArray}, }; #[derive(Debug)] @@ -46,25 +46,23 @@ struct BinaryDecoder { impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { type State = State<'a>; + type Dictionary = Dict; type DecodedState = (Binary, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state( + &self, + page: &'a DataPage, + dict: Option<&'a Self::Dictionary>, + ) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); - match ( - page.encoding(), - page.dictionary_page(), - is_optional, - is_filtered, - ) { + match (page.encoding(), dict, is_optional, is_filtered) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - let dict = dict.as_any().downcast_ref().unwrap(); ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { - let dict = dict.as_any().downcast_ref().unwrap(); ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary) } (Encoding::Plain, _, true, false) => { @@ -105,28 +103,14 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { values.push(value); } State::RequiredDictionary(page) => { - let dict_values = page.dict.values(); - let dict_offsets = page.dict.offsets(); - - let op = move |index: u32| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }; + let dict_values = &page.dict; + let op = move |index: u32| 
dict_values[index as usize].as_ref(); let item = page.values.next().map(op).unwrap_or_default(); values.push(item); } State::OptionalDictionary(page) => { - let dict_values = page.dict.values(); - let dict_offsets = page.dict.offsets(); - - let op = move |index: u32| { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - &dict_values[dict_offset_i..dict_offset_ip1] - }; + let dict_values = &page.dict; + let op = move |index: u32| dict_values[index as usize].as_ref(); let item = page.values.next().map(op).unwrap_or_default(); values.push(item); validity.push(true); @@ -139,19 +123,24 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { values.push(&[]); validity.push(false); } + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary { + deserialize_plain(&page.buffer, page.num_values) + } } -pub struct NestedIter, I: DataPages> { +pub struct NestedIter, I: Pages> { iter: I, data_type: DataType, init: Vec, items: VecDeque<(NestedState, (Binary, MutableBitmap))>, + dict: Option, chunk_size: Option, remaining: usize, phantom_a: std::marker::PhantomData, } -impl, I: DataPages> NestedIter { +impl, I: Pages> NestedIter { pub fn new( iter: I, init: Vec, @@ -164,6 +153,7 @@ impl, I: DataPages> NestedIter { data_type, init, items: VecDeque::new(), + dict: None, chunk_size, remaining: num_rows, phantom_a: Default::default(), @@ -171,13 +161,14 @@ impl, I: DataPages> NestedIter { } } -impl, I: DataPages> Iterator for NestedIter { +impl, I: Pages> Iterator for NestedIter { type Item = Result<(NestedState, A)>; fn next(&mut self) -> Option { let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.dict, &mut self.remaining, &self.init, self.chunk_size, diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs index 572565a6d5d..e74cc50af5e 100644 --- a/src/io/parquet/read/deserialize/boolean/basic.rs +++ 
b/src/io/parquet/read/deserialize/boolean/basic.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::Encoding, - page::{split_buffer, DataPage}, + page::{split_buffer, DataPage, DictPage}, schema::Repetition, }; @@ -19,7 +19,7 @@ use super::super::utils::{ extend_from_decoder, get_selected_rows, next, DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }; -use super::super::DataPages; +use super::super::Pages; #[derive(Debug)] struct Values<'a>(BitmapIter<'a>); @@ -111,9 +111,10 @@ struct BooleanDecoder {} impl<'a> Decoder<'a> for BooleanDecoder { type State = State<'a>; + type Dict = (); type DecodedState = (MutableBitmap, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state(&self, page: &'a DataPage, _: Option<&'a Self::Dict>) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); @@ -179,15 +180,17 @@ impl<'a> Decoder<'a> for BooleanDecoder { } } } + + fn deserialize_dict(&self, _: &DictPage) -> Self::Dict {} } fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) -> BooleanArray { BooleanArray::new(data_type.clone(), values.into(), validity.into()) } -/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +/// An iterator adapter over [`Pages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct Iter { +pub struct Iter { iter: I, data_type: DataType, items: VecDeque<(MutableBitmap, MutableBitmap)>, @@ -195,7 +198,7 @@ pub struct Iter { remaining: usize, } -impl Iter { +impl Iter { pub fn new(iter: I, data_type: DataType, chunk_size: Option, num_rows: usize) -> Self { Self { iter, @@ -207,13 +210,14 @@ impl Iter { } } -impl Iterator for Iter { +impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { let maybe_state = next( &mut self.iter, &mut 
self.items, + &mut None, &mut self.remaining, self.chunk_size, &BooleanDecoder::default(), diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index d04e0d50bd1..d4c3e8ee124 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use parquet2::{ encoding::Encoding, - page::{split_buffer, DataPage}, + page::{split_buffer, DataPage, DictPage}, schema::Repetition, }; @@ -16,7 +16,7 @@ use crate::{ use super::super::nested_utils::*; use super::super::utils; use super::super::utils::MaybeNext; -use super::super::DataPages; +use super::super::Pages; // The state of a `DataPage` of `Boolean` parquet boolean type #[allow(clippy::large_enum_variant)] @@ -46,9 +46,14 @@ struct BooleanDecoder {} impl<'a> NestedDecoder<'a> for BooleanDecoder { type State = State<'a>; + type Dictionary = (); type DecodedState = (MutableBitmap, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state( + &self, + page: &'a DataPage, + _: Option<&'a Self::Dictionary>, + ) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); @@ -97,11 +102,13 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder { values.push(false); validity.push(false); } + + fn deserialize_dict(&self, _: &DictPage) -> Self::Dictionary {} } -/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +/// An iterator adapter over [`Pages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct NestedIter { +pub struct NestedIter { iter: I, init: Vec, items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>, @@ -109,7 +116,7 @@ pub struct NestedIter { chunk_size: Option, } -impl NestedIter { +impl NestedIter { pub fn new(iter: I, init: Vec, num_rows: usize, chunk_size: Option) -> Self { 
Self { iter, @@ -125,13 +132,14 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) BooleanArray::new(data_type.clone(), values.into(), validity.into()) } -impl Iterator for NestedIter { +impl Iterator for NestedIter { type Item = Result<(NestedState, BooleanArray)>; fn next(&mut self) -> Option { let maybe_state = next( &mut self.iter, &mut self.items, + &mut None, &mut self.remaining, &self.init, self.chunk_size, diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index 4b37f6e8291..0f379359086 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - page::{DataPage, DictPage}, + page::{DataPage, DictPage, Page}, schema::Repetition, }; @@ -21,7 +21,7 @@ use super::{ self, dict_indices_decoder, extend_from_decoder, get_selected_rows, DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }, - DataPages, + Pages, }; // The state of a `DataPage` of `Primitive` parquet primitive type @@ -114,9 +114,10 @@ where K: DictionaryKey, { type State = State<'a>; + type Dict = (); type DecodedState = (Vec, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state(&self, page: &'a DataPage, _: Option<&'a Self::Dict>) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); @@ -225,21 +226,8 @@ where } } } -} -#[derive(Debug)] -pub enum Dict { - Empty, - Complete(Box), -} - -impl Dict { - pub fn unwrap(&self) -> Box { - match self { - Self::Empty => panic!(), - Self::Complete(array) => array.clone(), - } - } + fn deserialize_dict(&self, _: &DictPage) -> Self::Dict {} } fn finish_key(values: Vec, validity: 
MutableBitmap) -> PrimitiveArray { @@ -247,15 +235,10 @@ fn finish_key(values: Vec, validity: MutableBitmap) -> Prim } #[inline] -pub(super) fn next_dict< - 'a, - K: DictionaryKey, - I: DataPages, - F: Fn(&dyn DictPage) -> Box, ->( +pub(super) fn next_dict<'a, K: DictionaryKey, I: Pages, F: Fn(&DictPage) -> Box>( iter: &'a mut I, items: &mut VecDeque<(Vec, MutableBitmap)>, - dict: &mut Dict, + dict: &mut Option>, data_type: DataType, remaining: &mut usize, chunk_size: Option, @@ -264,26 +247,32 @@ pub(super) fn next_dict< if items.len() > 1 { let (values, validity) = items.pop_front().unwrap(); let keys = finish_key(values, validity); - return MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())); + return MaybeNext::Some(DictionaryArray::try_new( + data_type, + keys, + dict.clone().unwrap(), + )); } match iter.next() { Err(e) => MaybeNext::Some(Err(e.into())), Ok(Some(page)) => { - // consume the dictionary page - match (&dict, page.dictionary_page()) { - (Dict::Empty, None) => { + let (page, dict) = match (&dict, page) { + (None, Page::Data(_)) => { return MaybeNext::Some(Err(Error::nyi( "dictionary arrays from non-dict-encoded pages", ))); } - (Dict::Empty, Some(dict_page)) => { - *dict = Dict::Complete(read_dict(dict_page.as_ref())) + (_, Page::Dict(dict_page)) => { + *dict = Some(read_dict(dict_page)); + return next_dict( + iter, items, dict, data_type, remaining, chunk_size, read_dict, + ); } - (Dict::Complete(_), _) => {} + (Some(dict), Page::Data(page)) => (page, dict), }; // there is a new page => consume the page from the start - let maybe_page = PrimitiveDecoder::::default().build_state(page); + let maybe_page = PrimitiveDecoder::::default().build_state(page, None); let page = match maybe_page { Ok(page) => page, Err(e) => return MaybeNext::Some(Err(e)), @@ -302,7 +291,7 @@ pub(super) fn next_dict< } else { let (values, validity) = items.pop_front().unwrap(); let keys = finish_key(values, validity); - 
MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) + MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.clone())) } } Ok(None) => { @@ -312,7 +301,11 @@ pub(super) fn next_dict< debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); let keys = finish_key(values, validity); - MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) + MaybeNext::Some(DictionaryArray::try_new( + data_type, + keys, + dict.clone().unwrap(), + )) } else { MaybeNext::None } diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs index ced5808349d..6d01679f25e 100644 --- a/src/io/parquet/read/deserialize/dictionary/nested.rs +++ b/src/io/parquet/read/deserialize/dictionary/nested.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - page::{DataPage, DictPage}, + page::{DataPage, DictPage, Page}, schema::Repetition, }; @@ -14,10 +14,10 @@ use crate::{ }; use super::{ - super::super::DataPages, + super::super::Pages, super::nested_utils::*, super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState}, - finish_key, Dict, + finish_key, }; // The state of a required DataPage with a boolean physical type @@ -80,9 +80,14 @@ where impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { type State = State<'a>; + type Dictionary = (); type DecodedState = (Vec, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state( + &self, + page: &'a DataPage, + _: Option<&'a Self::Dictionary>, + ) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); @@ -134,15 +139,17 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { values.push(K::default()); validity.push(false) } + + fn deserialize_dict(&self, _: &DictPage) -> Self::Dictionary {} } 
#[allow(clippy::too_many_arguments)] -pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box>( +pub fn next_dict<'a, K: DictionaryKey, I: Pages, F: Fn(&DictPage) -> Box>( iter: &'a mut I, items: &mut VecDeque<(NestedState, (Vec, MutableBitmap))>, remaining: &mut usize, init: &[InitNested], - dict: &mut Dict, + dict: &mut Option>, data_type: DataType, chunk_size: Option, read_dict: F, @@ -150,29 +157,32 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box if items.len() > 1 { let (nested, (values, validity)) = items.pop_front().unwrap(); let keys = finish_key(values, validity); - let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + let dict = DictionaryArray::try_new(data_type, keys, dict.clone().unwrap()); return MaybeNext::Some(dict.map(|dict| (nested, dict))); } match iter.next() { Err(e) => MaybeNext::Some(Err(e.into())), Ok(Some(page)) => { - // consume the dictionary page - match (&dict, page.dictionary_page()) { - (Dict::Empty, None) => { + let (page, dict) = match (&dict, page) { + (None, Page::Data(_)) => { return MaybeNext::Some(Err(Error::nyi( "dictionary arrays from non-dict-encoded pages", ))); } - (Dict::Empty, Some(dict_page)) => { - *dict = Dict::Complete(read_dict(dict_page.as_ref())) + (_, Page::Dict(dict_page)) => { + *dict = Some(read_dict(dict_page)); + return next_dict( + iter, items, remaining, init, dict, data_type, chunk_size, read_dict, + ); } - (Dict::Complete(_), _) => {} + (Some(dict), Page::Data(page)) => (page, dict), }; let error = extend( page, init, items, + None, remaining, &DictionaryDecoder::::default(), chunk_size, @@ -187,7 +197,7 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box } else { let (nested, (values, validity)) = items.pop_front().unwrap(); let keys = finish_key(values, validity); - let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + let dict = DictionaryArray::try_new(data_type, keys, 
dict.clone()); MaybeNext::Some(dict.map(|dict| (nested, dict))) } } @@ -198,7 +208,7 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); let keys = finish_key(values, validity); - let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + let dict = DictionaryArray::try_new(data_type, keys, dict.clone().unwrap()); MaybeNext::Some(dict.map(|dict| (nested, dict))) } else { MaybeNext::None diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index a2699c0520c..0f9eb508158 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - page::{split_buffer, DataPage, FixedLenByteArrayPageDict}, + page::{split_buffer, DataPage, DictPage}, schema::Repetition, }; @@ -16,9 +16,11 @@ use super::super::utils::{ DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, PageState, Pushable, }; -use super::super::DataPages; +use super::super::Pages; use super::utils::FixedSizeBinary; +type Dict = Vec; + #[derive(Debug)] struct Optional<'a> { values: std::slice::ChunksExact<'a, u8>, @@ -83,11 +85,11 @@ impl<'a> FilteredRequired<'a> { #[derive(Debug)] struct RequiredDictionary<'a> { pub values: hybrid_rle::HybridRleDecoder<'a>, - dict: &'a FixedLenByteArrayPageDict, + dict: &'a Dict, } impl<'a> RequiredDictionary<'a> { - fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result { + fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result { let values = dict_indices_decoder(page)?; Ok(Self { dict, values }) @@ -103,11 +105,11 @@ impl<'a> RequiredDictionary<'a> { struct OptionalDictionary<'a> { values: hybrid_rle::HybridRleDecoder<'a>, validity: 
OptionalPageValidity<'a>, - dict: &'a FixedLenByteArrayPageDict, + dict: &'a Dict, } impl<'a> OptionalDictionary<'a> { - fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result { + fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result { let values = dict_indices_decoder(page)?; Ok(Self { @@ -156,19 +158,15 @@ impl<'a> DecodedState<'a> for (FixedSizeBinary, MutableBitmap) { impl<'a> Decoder<'a> for BinaryDecoder { type State = State<'a>; + type Dict = Dict; type DecodedState = (FixedSizeBinary, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); - match ( - page.encoding(), - page.dictionary_page(), - is_optional, - is_filtered, - ) { + match (page.encoding(), dict, is_optional, is_filtered) { (Encoding::Plain, _, true, false) => { Ok(State::Optional(Optional::try_new(page, self.size)?)) } @@ -176,12 +174,10 @@ impl<'a> Decoder<'a> for BinaryDecoder { Ok(State::Required(Required::new(page, self.size))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - RequiredDictionary::try_new(page, dict.as_any().downcast_ref().unwrap()) - .map(State::RequiredDictionary) + RequiredDictionary::try_new(page, dict).map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { - OptionalDictionary::try_new(page, dict.as_any().downcast_ref().unwrap()) - .map(State::OptionalDictionary) + OptionalDictionary::try_new(page, dict).map(State::OptionalDictionary) } (Encoding::Plain, None, false, true) => Ok(State::FilteredRequired( FilteredRequired::new(page, self.size), @@ -232,11 +228,9 @@ impl<'a> Decoder<'a> for BinaryDecoder { } } State::OptionalDictionary(page) => { - let dict_values = page.dict.values(); - let size = 
page.dict.size(); let op = |index: u32| { let index = index as usize; - &dict_values[index * size..(index + 1) * size] + &page.dict[index * self.size..(index + 1) * self.size] }; extend_from_decoder( @@ -248,11 +242,9 @@ impl<'a> Decoder<'a> for BinaryDecoder { ) } State::RequiredDictionary(page) => { - let dict_values = page.dict.values(); - let size = page.dict.size(); let op = |index: u32| { let index = index as usize; - &dict_values[index * size..(index + 1) * size] + &page.dict[index * self.size..(index + 1) * self.size] }; for x in page.values.by_ref().map(op).take(remaining) { @@ -270,6 +262,10 @@ impl<'a> Decoder<'a> for BinaryDecoder { } } } + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dict { + page.buffer.clone() + } } fn finish( @@ -280,16 +276,17 @@ fn finish( FixedSizeBinaryArray::new(data_type.clone(), values.values.into(), validity.into()) } -pub struct Iter { +pub struct Iter { iter: I, data_type: DataType, size: usize, items: VecDeque<(FixedSizeBinary, MutableBitmap)>, + dict: Option, chunk_size: Option, remaining: usize, } -impl Iter { +impl Iter { pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option) -> Self { let size = FixedSizeBinaryArray::get_size(&data_type); Self { @@ -297,19 +294,21 @@ impl Iter { data_type, size, items: VecDeque::new(), + dict: None, chunk_size, remaining: num_rows, } } } -impl Iterator for Iter { +impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.dict, &mut self.remaining, self.chunk_size, &BinaryDecoder { size: self.size }, diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs index 6c73577472c..70b4ccab699 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use 
parquet2::page::{DictPage, FixedLenByteArrayPageDict}; +use parquet2::page::DictPage; use crate::{ array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray}, @@ -12,18 +12,18 @@ use crate::{ use super::super::dictionary::*; use super::super::utils::MaybeNext; -use super::super::DataPages; +use super::super::Pages; -/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation +/// An iterator adapter over [`Pages`] assumed to be encoded as parquet's dictionary-encoded binary representation #[derive(Debug)] pub struct DictIter where - I: DataPages, + I: Pages, K: DictionaryKey, { iter: I, data_type: DataType, - values: Dict, + values: Option>, items: VecDeque<(Vec, MutableBitmap)>, remaining: usize, chunk_size: Option, @@ -32,13 +32,13 @@ where impl DictIter where K: DictionaryKey, - I: DataPages, + I: Pages, { pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option) -> Self { Self { iter, data_type, - values: Dict::Empty, + values: None, items: VecDeque::new(), remaining: num_rows, chunk_size, @@ -46,27 +46,20 @@ where } } -fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Box { +fn read_dict(data_type: DataType, dict: &DictPage) -> Box { let data_type = match data_type { DataType::Dictionary(_, values, _) => *values, _ => data_type, }; - let dict = dict - .as_any() - .downcast_ref::() - .unwrap(); - let values = dict.values().to_vec(); - Box::new(FixedSizeBinaryArray::from_data( - data_type, - values.into(), - None, - )) + let values = dict.buffer.clone(); + + FixedSizeBinaryArray::from_data(data_type, values.into(), None).boxed() } impl Iterator for DictIter where - I: DataPages, + I: Pages, K: DictionaryKey, { type Item = Result>; @@ -94,13 +87,13 @@ where #[derive(Debug)] pub struct NestedDictIter where - I: DataPages, + I: Pages, K: DictionaryKey, { iter: I, init: Vec, data_type: DataType, - values: Dict, + values: Option>, items: VecDeque<(NestedState, (Vec, 
MutableBitmap))>, remaining: usize, chunk_size: Option, @@ -108,7 +101,7 @@ where impl NestedDictIter where - I: DataPages, + I: Pages, K: DictionaryKey, { pub fn new( @@ -122,7 +115,7 @@ where iter, init, data_type, - values: Dict::Empty, + values: None, remaining: num_rows, items: VecDeque::new(), chunk_size, @@ -132,7 +125,7 @@ where impl Iterator for NestedDictIter where - I: DataPages, + I: Pages, K: DictionaryKey, { type Item = Result<(NestedState, DictionaryArray)>; diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 0d0b54449f8..d86b3d4615d 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -101,7 +101,7 @@ fn columns_to_iter_recursive<'a, I: 'a>( chunk_size: Option, ) -> Result> where - I: DataPages, + I: Pages, { if init.is_empty() && is_primitive(&field.data_type) { return Ok(Box::new( @@ -148,7 +148,7 @@ fn n_columns(data_type: &DataType) -> usize { } } -/// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s. +/// An iterator adapter that maps multiple iterators of [`Pages`] into an iterator of [`Array`]s. /// /// For a non-nested datatypes such as [`DataType::Int32`], this function requires a single element in `columns` and `types`. /// For nested types, `columns` must be composed by all parquet columns with associated types `types`. @@ -162,7 +162,7 @@ pub fn column_iter_to_arrays<'a, I: 'a>( num_rows: usize, ) -> Result> where - I: DataPages, + I: Pages, { Ok(Box::new( columns_to_iter_recursive(columns, types, field, vec![], num_rows, chunk_size)? 
diff --git a/src/io/parquet/read/deserialize/nested.rs b/src/io/parquet/read/deserialize/nested.rs index b24a9a8b11a..389d0479fec 100644 --- a/src/io/parquet/read/deserialize/nested.rs +++ b/src/io/parquet/read/deserialize/nested.rs @@ -33,7 +33,7 @@ pub fn columns_to_iter_recursive<'a, I: 'a>( chunk_size: Option, ) -> Result> where - I: DataPages, + I: Pages, { use crate::datatypes::PhysicalType::*; use crate::datatypes::PrimitiveType::*; @@ -311,7 +311,7 @@ where }) } -fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( +fn dict_read<'a, K: DictionaryKey, I: 'a + Pages>( iter: I, init: Vec, _type_: &PrimitiveType, diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 760cedd737e..882a1724f85 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use parquet2::{ encoding::hybrid_rle::HybridRleDecoder, - page::{split_buffer, DataPage}, + page::{split_buffer, DataPage, DictPage, Page}, read::levels::get_bit_width, }; @@ -10,7 +10,7 @@ use crate::{array::Array, bitmap::MutableBitmap, error::Result}; pub use super::utils::Zip; use super::utils::{DecodedState, MaybeNext}; -use super::{super::DataPages, utils::PageState}; +use super::{super::Pages, utils::PageState}; /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug + Send + Sync { @@ -246,15 +246,22 @@ impl Nested for NestedStruct { /// A decoder that knows how to map `State` -> Array pub(super) trait NestedDecoder<'a> { type State: PageState<'a>; + type Dictionary; type DecodedState: DecodedState<'a>; - fn build_state(&self, page: &'a DataPage) -> Result; + fn build_state( + &self, + page: &'a DataPage, + dict: Option<&'a Self::Dictionary>, + ) -> Result; /// Initializes a new state fn with_capacity(&self, capacity: usize) -> Self::DecodedState; fn push_valid(&self, state: &mut Self::State, 
decoded: &mut Self::DecodedState); fn push_null(&self, decoded: &mut Self::DecodedState); + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary; } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -340,11 +347,12 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( page: &'a DataPage, init: &[InitNested], items: &mut VecDeque<(NestedState, D::DecodedState)>, + dict: Option<&'a D::Dictionary>, remaining: &mut usize, decoder: &D, chunk_size: Option, ) -> Result<()> { - let mut values_page = decoder.build_state(page)?; + let mut values_page = decoder.build_state(page, dict)?; let mut page = NestedPage::try_new(page)?; let capacity = chunk_size.unwrap_or(0); @@ -467,13 +475,14 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>( pub(super) fn next<'a, I, D>( iter: &'a mut I, items: &mut VecDeque<(NestedState, D::DecodedState)>, + dict: &'a mut Option, remaining: &mut usize, init: &[InitNested], chunk_size: Option, decoder: &D, ) -> MaybeNext> where - I: DataPages, + I: Pages, D: NestedDecoder<'a>, { // front[a1, a2, a3, ...]back @@ -499,8 +508,24 @@ where } } Ok(Some(page)) => { + let page = match page { + Page::Data(page) => page, + Page::Dict(dict_page) => { + *dict = Some(decoder.deserialize_dict(dict_page)); + return MaybeNext::More; + } + }; + // there is a new page => consume the page from the start - let error = extend(page, init, items, remaining, decoder, chunk_size); + let error = extend( + page, + init, + items, + dict.as_ref(), + remaining, + decoder, + chunk_size, + ); match error { Ok(_) => {} Err(e) => return MaybeNext::Some(Err(e)), diff --git a/src/io/parquet/read/deserialize/null.rs b/src/io/parquet/read/deserialize/null.rs index e8c18eb8c65..e897a045b69 100644 --- a/src/io/parquet/read/deserialize/null.rs +++ b/src/io/parquet/read/deserialize/null.rs @@ -1,8 +1,10 @@ +use parquet2::page::Page; + use crate::{array::NullArray, datatypes::DataType}; -use super::super::{ArrayIter, DataPages}; +use super::super::{ArrayIter, Pages}; -/// Converts 
[`DataPages`] to an [`ArrayIter`] +/// Converts [`Pages`] to an [`ArrayIter`] pub fn iter_to_arrays<'a, I>( mut iter: I, data_type: DataType, @@ -10,15 +12,20 @@ pub fn iter_to_arrays<'a, I>( num_rows: usize, ) -> ArrayIter<'a> where - I: 'a + DataPages, + I: 'a + Pages, { let mut len = 0usize; - while let Ok(Some(x)) = iter.next() { - let rows = x.num_values(); - len = (len + rows).min(num_rows); - if len == num_rows { - break; + while let Ok(Some(page)) = iter.next() { + match page { + Page::Dict(_) => continue, + Page::Data(page) => { + let rows = page.num_values(); + len = (len + rows).min(num_rows); + if len == num_rows { + break; + } + } } } @@ -48,7 +55,7 @@ mod tests { encoding::Encoding, error::Error as ParquetError, metadata::Descriptor, - page::{DataPage, DataPageHeader, DataPageHeaderV1}, + page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}, schema::types::{PhysicalType, PrimitiveType}, }; @@ -59,7 +66,7 @@ mod tests { #[test] fn limit() { let new_page = |values: i32| { - DataPage::new( + Page::Data(DataPage::new( DataPageHeader::V1(DataPageHeaderV1 { num_values: values, encoding: Encoding::Plain.into(), @@ -68,7 +75,6 @@ mod tests { statistics: None, }), vec![], - None, Descriptor { primitive_type: PrimitiveType::from_physical( "a".to_string(), @@ -78,7 +84,7 @@ mod tests { max_rep_level: 0, }, None, - ) + )) }; let p1 = new_page(100); diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 0ed1d8b51cb..bb4648aeb07 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - page::{split_buffer, DataPage, PrimitivePageDict}, + page::{split_buffer, DataPage, DictPage}, schema::Repetition, types::decode, types::NativeType as ParquetNativeType, @@ -16,7 +16,7 @@ use crate::{ use 
super::super::utils; use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity}; -use super::super::DataPages; +use super::super::Pages; #[derive(Debug)] struct FilteredRequiredValues<'a> { @@ -63,25 +63,22 @@ impl<'a> Values<'a> { } #[derive(Debug)] -pub(super) struct ValuesDictionary<'a, P> +pub(super) struct ValuesDictionary<'a, T> where - P: ParquetNativeType, + T: NativeType, { pub values: hybrid_rle::HybridRleDecoder<'a>, - pub dict: &'a [P], + pub dict: &'a Vec, } -impl<'a, P> ValuesDictionary<'a, P> +impl<'a, T> ValuesDictionary<'a, T> where - P: ParquetNativeType, + T: NativeType, { - pub fn try_new(page: &'a DataPage, dict: &'a PrimitivePageDict
<P>
) -> Result { + pub fn try_new(page: &'a DataPage, dict: &'a Vec) -> Result { let values = utils::dict_indices_decoder(page)?; - Ok(Self { - dict: dict.values(), - values, - }) + Ok(Self { dict, values }) } #[inline] @@ -92,21 +89,21 @@ where // The state of a `DataPage` of `Primitive` parquet primitive type #[derive(Debug)] -enum State<'a, P> +enum State<'a, T> where - P: ParquetNativeType, + T: NativeType, { Optional(OptionalPageValidity<'a>, Values<'a>), Required(Values<'a>), - RequiredDictionary(ValuesDictionary<'a, P>), - OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, P>), + RequiredDictionary(ValuesDictionary<'a, T>), + OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, T>), FilteredRequired(FilteredRequiredValues<'a>), FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>), } -impl<'a, P> utils::PageState<'a> for State<'a, P> +impl<'a, T> utils::PageState<'a> for State<'a, T> where - P: ParquetNativeType, + T: NativeType, { fn len(&self) -> usize { match self { @@ -160,27 +157,20 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - type State = State<'a, P>; + type State = State<'a, T>; + type Dict = Vec; type DecodedState = (Vec, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); - match ( - page.encoding(), - page.dictionary_page(), - is_optional, - is_filtered, - ) { + match (page.encoding(), dict, is_optional, is_filtered) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - let dict = dict.as_any().downcast_ref().unwrap(); ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { - let dict = 
dict.as_any().downcast_ref().unwrap(); - Ok(State::OptionalDictionary( OptionalPageValidity::try_new(page)?, ValuesDictionary::try_new(page, dict)?, @@ -242,12 +232,12 @@ where page_validity, Some(remaining), values, - &mut page_values.values.by_ref().map(op1).map(self.op), + &mut page_values.values.by_ref().map(op1), ) } State::RequiredDictionary(page) => { let op1 = |index: u32| page.dict[index as usize]; - values.extend(page.values.by_ref().map(op1).map(self.op).take(remaining)); + values.extend(page.values.by_ref().map(op1).take(remaining)); } State::FilteredRequired(page) => { values.extend( @@ -269,6 +259,10 @@ where } } } + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dict { + deserialize_plain(&page.buffer, self.op) + } } pub(super) fn finish( @@ -284,11 +278,11 @@ pub(super) fn finish( MutablePrimitiveArray::from_data(data_type.clone(), values, validity) } -/// An [`Iterator`] adapter over [`DataPages`] assumed to be encoded as primitive arrays +/// An [`Iterator`] adapter over [`Pages`] assumed to be encoded as primitive arrays #[derive(Debug)] pub struct Iter where - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, F: Fn(P) -> T, @@ -298,13 +292,14 @@ where items: VecDeque<(Vec, MutableBitmap)>, remaining: usize, chunk_size: Option, + dict: Option>, op: F, phantom: std::marker::PhantomData
<P>
, } impl Iter where - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, @@ -321,6 +316,7 @@ where iter, data_type, items: VecDeque::new(), + dict: None, remaining: num_rows, chunk_size, op, @@ -331,7 +327,7 @@ where impl Iterator for Iter where - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, F: Copy + Fn(P) -> T, @@ -342,6 +338,7 @@ where let maybe_state = utils::next( &mut self.iter, &mut self.items, + &mut self.dict, &mut self.remaining, self.chunk_size, &PrimitiveDecoder::new(self.op), @@ -356,3 +353,16 @@ where } } } + +pub(super) fn deserialize_plain(values: &[u8], op: F) -> Vec +where + T: NativeType, + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + values + .chunks_exact(std::mem::size_of::
<P>
()) + .map(decode) + .map(op) + .collect::>() +} diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index cb9d63c4cc6..16fec526112 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -1,9 +1,6 @@ use std::collections::VecDeque; -use parquet2::{ - page::{DictPage, PrimitivePageDict}, - types::NativeType as ParquetNativeType, -}; +use parquet2::{page::DictPage, types::NativeType as ParquetNativeType}; use crate::{ array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, @@ -17,9 +14,10 @@ use super::super::dictionary::nested_next_dict; use super::super::dictionary::*; use super::super::nested_utils::{InitNested, NestedState}; use super::super::utils::MaybeNext; -use super::super::DataPages; +use super::super::Pages; +use super::basic::deserialize_plain; -fn read_dict(data_type: DataType, op: F, dict: &dyn DictPage) -> Box +fn read_dict(data_type: DataType, op: F, dict: &DictPage) -> Box where T: NativeType, P: ParquetNativeType, @@ -29,20 +27,16 @@ where DataType::Dictionary(_, values, _) => *values, _ => data_type, }; - let dict = dict - .as_any() - .downcast_ref::>() - .unwrap(); - let values = dict.values().iter().map(|x| (op)(*x)).collect::>(); + let values = deserialize_plain(&dict.buffer, op); Box::new(PrimitiveArray::new(data_type, values.into(), None)) } -/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +/// An iterator adapter over [`Pages`] assumed to be encoded as boolean arrays #[derive(Debug)] pub struct DictIter where - I: DataPages, + I: Pages, T: NativeType, K: DictionaryKey, P: ParquetNativeType, @@ -50,7 +44,7 @@ where { iter: I, data_type: DataType, - values: Dict, + values: Option>, items: VecDeque<(Vec, MutableBitmap)>, remaining: usize, chunk_size: Option, @@ -61,7 +55,7 @@ where impl DictIter where K: DictionaryKey, - I: DataPages, + I: Pages, T: 
NativeType, P: ParquetNativeType, @@ -77,7 +71,7 @@ where Self { iter, data_type, - values: Dict::Empty, + values: None, items: VecDeque::new(), chunk_size, remaining: num_rows, @@ -89,7 +83,7 @@ where impl Iterator for DictIter where - I: DataPages, + I: Pages, T: NativeType, K: DictionaryKey, P: ParquetNativeType, @@ -120,7 +114,7 @@ where #[derive(Debug)] pub struct NestedDictIter where - I: DataPages, + I: Pages, T: NativeType, K: DictionaryKey, P: ParquetNativeType, @@ -129,7 +123,7 @@ where iter: I, init: Vec, data_type: DataType, - values: Dict, + values: Option>, items: VecDeque<(NestedState, (Vec, MutableBitmap))>, remaining: usize, chunk_size: Option, @@ -140,7 +134,7 @@ where impl NestedDictIter where K: DictionaryKey, - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, @@ -158,7 +152,7 @@ where iter, init, data_type, - values: Dict::Empty, + values: None, items: VecDeque::new(), remaining: num_rows, chunk_size, @@ -170,7 +164,7 @@ where impl Iterator for NestedDictIter where - I: DataPages, + I: Pages, T: NativeType, K: DictionaryKey, P: ParquetNativeType, diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index b37aa955e96..438ca11afab 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -1,7 +1,10 @@ use std::collections::VecDeque; use parquet2::{ - encoding::Encoding, page::DataPage, schema::Repetition, types::decode, + encoding::Encoding, + page::{DataPage, DictPage}, + schema::Repetition, + types::decode, types::NativeType as ParquetNativeType, }; @@ -10,27 +13,27 @@ use crate::{ types::NativeType, }; -use super::super::nested_utils::*; use super::super::utils; -use super::super::DataPages; +use super::super::Pages; use super::basic::{Values, ValuesDictionary}; +use super::{super::nested_utils::*, basic::deserialize_plain}; // The state of a `DataPage` of `Primitive` parquet primitive type 
#[allow(clippy::large_enum_variant)] #[derive(Debug)] -enum State<'a, P> +enum State<'a, T> where - P: ParquetNativeType, + T: NativeType, { Optional(Values<'a>), Required(Values<'a>), - RequiredDictionary(ValuesDictionary<'a, P>), - OptionalDictionary(ValuesDictionary<'a, P>), + RequiredDictionary(ValuesDictionary<'a, T>), + OptionalDictionary(ValuesDictionary<'a, T>), } -impl<'a, P> utils::PageState<'a> for State<'a, P> +impl<'a, T> utils::PageState<'a> for State<'a, T> where - P: ParquetNativeType, + T: NativeType, { fn len(&self) -> usize { match self { @@ -76,26 +79,24 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - type State = State<'a, P>; + type State = State<'a, T>; + type Dictionary = Vec; type DecodedState = (Vec, MutableBitmap); - fn build_state(&self, page: &'a DataPage) -> Result { + fn build_state( + &self, + page: &'a DataPage, + dict: Option<&'a Self::Dictionary>, + ) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); - match ( - page.encoding(), - page.dictionary_page(), - is_optional, - is_filtered, - ) { + match (page.encoding(), dict, is_optional, is_filtered) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - let dict = dict.as_any().downcast_ref().unwrap(); ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { - let dict = dict.as_any().downcast_ref().unwrap(); ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary) } (Encoding::Plain, _, true, false) => Values::try_new::
<P>
(page).map(State::Optional), @@ -128,13 +129,13 @@ where } State::RequiredDictionary(page) => { let op1 = |index: u32| page.dict[index as usize]; - let value = page.values.next().map(op1).map(self.op); + let value = page.values.next().map(op1); values.push(value.unwrap_or_default()); } State::OptionalDictionary(page) => { let op1 = |index: u32| page.dict[index as usize]; - let value = page.values.next().map(op1).map(self.op); + let value = page.values.next().map(op1); values.push(value.unwrap_or_default()); validity.push(true); @@ -147,6 +148,10 @@ where values.push(T::default()); validity.push(false) } + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary { + deserialize_plain(&page.buffer, self.op) + } } fn finish( @@ -157,11 +162,11 @@ fn finish( PrimitiveArray::from_data(data_type.clone(), values.into(), validity.into()) } -/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +/// An iterator adapter over [`Pages`] assumed to be encoded as boolean arrays #[derive(Debug)] pub struct NestedIter where - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, @@ -171,6 +176,7 @@ where init: Vec, data_type: DataType, items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + dict: Option>, remaining: usize, chunk_size: Option, decoder: PrimitiveDecoder, @@ -178,7 +184,7 @@ where impl NestedIter where - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, @@ -197,6 +203,7 @@ where init, data_type, items: VecDeque::new(), + dict: None, chunk_size, remaining: num_rows, decoder: PrimitiveDecoder::new(op), @@ -206,7 +213,7 @@ where impl Iterator for NestedIter where - I: DataPages, + I: Pages, T: NativeType, P: ParquetNativeType, @@ -218,6 +225,7 @@ where let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.dict, &mut self.remaining, &self.init, self.chunk_size, diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 0ef14a4fd35..96a27228e96 100644 
--- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -12,7 +12,7 @@ use crate::{ types::{days_ms, NativeType}, }; -use super::super::{ArrayIter, DataPages}; +use super::super::{ArrayIter, Pages}; use super::binary; use super::boolean; use super::fixed_size_binary; @@ -54,9 +54,9 @@ where }) } -/// An iterator adapter that maps an iterator of DataPages into an iterator of Arrays +/// An iterator adapter that maps an iterator of Pages into an iterator of Arrays /// of [`DataType`] `data_type` and length `chunk_size`. -pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( +pub fn page_iter_to_arrays<'a, I: Pages + 'a>( pages: I, type_: &PrimitiveType, data_type: DataType, @@ -339,7 +339,7 @@ fn unifiy_timestmap_unit( } } -fn timestamp<'a, I: 'a + DataPages>( +fn timestamp<'a, I: Pages + 'a>( pages: I, physical_type: &PhysicalType, logical_type: &Option, @@ -377,7 +377,7 @@ fn timestamp<'a, I: 'a + DataPages>( } } -fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( +fn timestamp_dict<'a, K: DictionaryKey, I: Pages + 'a>( pages: I, physical_type: &PhysicalType, logical_type: &Option, @@ -429,7 +429,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( } } -fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( +fn dict_read<'a, K: DictionaryKey, I: Pages + 'a>( iter: I, physical_type: &PhysicalType, logical_type: &Option, diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index eaae1644df8..762ce62d1b2 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -5,30 +5,24 @@ use parquet2::deserialize::{ }; use parquet2::encoding::hybrid_rle; use parquet2::indexes::Interval; -use parquet2::page::{split_buffer, DataPage}; +use parquet2::page::{split_buffer, DataPage, DictPage, Page}; use parquet2::schema::Repetition; use crate::bitmap::utils::BitmapIter; use crate::bitmap::MutableBitmap; use crate::error::Error; -use 
super::super::DataPages; +use super::super::Pages; pub fn not_implemented(page: &DataPage) -> Error { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); let required = if is_optional { "optional" } else { "required" }; let is_filtered = if is_filtered { ", index-filtered" } else { "" }; - let dict = if page.dictionary_page().is_some() { - ", dictionary-encoded" - } else { - "" - }; Error::NotYetImplemented(format!( - "Decoding {:?} \"{:?}\"-encoded{} {} {} parquet pages", + "Decoding {:?} \"{:?}\"-encoded {} {} parquet pages", page.descriptor.primitive_type.physical_type, page.encoding(), - dict, required, is_filtered, )) @@ -351,9 +345,14 @@ pub(super) trait DecodedState<'a>: std::fmt::Debug { /// A decoder that knows how to map `State` -> Array pub(super) trait Decoder<'a> { type State: PageState<'a>; + type Dict; type DecodedState: DecodedState<'a>; - fn build_state(&self, page: &'a DataPage) -> Result; + fn build_state( + &self, + page: &'a DataPage, + dict: Option<&'a Self::Dict>, + ) -> Result; /// Initializes a new state fn with_capacity(&self, capacity: usize) -> Self::DecodedState; @@ -366,6 +365,9 @@ pub(super) trait Decoder<'a> { decoded: &mut Self::DecodedState, additional: usize, ); + + /// Deserializes a [`DictPage`] into a representation suited for decoding using it. + fn deserialize_dict(&self, page: &DictPage) -> Self::Dict; } pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( @@ -410,12 +412,13 @@ pub enum MaybeNext
<P>
{ } #[inline] -pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( +pub(super) fn next<'a, I: Pages, D: Decoder<'a>>( iter: &'a mut I, - items: &mut VecDeque, - remaining: &mut usize, + items: &'a mut VecDeque, + dict: &'a mut Option, + remaining: &'a mut usize, chunk_size: Option, - decoder: &D, + decoder: &'a D, ) -> MaybeNext> { // front[a1, a2, a3, ...]back if items.len() > 1 { @@ -430,11 +433,20 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( None => MaybeNext::None, }; } + match iter.next() { Err(e) => MaybeNext::Some(Err(e.into())), Ok(Some(page)) => { + let page = match page { + Page::Data(page) => page, + Page::Dict(dict_page) => { + *dict = Some(decoder.deserialize_dict(dict_page)); + return MaybeNext::More; + } + }; + // there is a new page => consume the page from the start - let maybe_page = decoder.build_state(page); + let maybe_page = decoder.build_state(page, dict.as_ref()); let page = match maybe_page { Ok(page) => page, Err(e) => return MaybeNext::Some(Err(e)), diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index b7609602e03..9276c303f69 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -17,7 +17,7 @@ pub use parquet2::{ error::Error as ParquetError, fallible_streaming_iterator, metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, - page::{CompressedDataPage, DataPage, DataPageHeader}, + page::{CompressedDataPage, DataPageHeader, Page}, read::{ decompress, get_column_iterator, get_page_stream, read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata, @@ -41,16 +41,13 @@ pub use indexes::{read_columns_indexes, ColumnIndex}; pub use row_group::*; pub use schema::{infer_schema, FileMetaData}; -/// Trait describing a [`FallibleStreamingIterator`] of [`DataPage`] -pub trait DataPages: - FallibleStreamingIterator + Send + Sync +/// Trait describing a [`FallibleStreamingIterator`] of [`Page`] +pub trait Pages: + FallibleStreamingIterator + Send + Sync { } 
-impl + Send + Sync> DataPages - for I -{ -} +impl + Send + Sync> Pages for I {} /// Type def for a sharable, boxed dyn [`Iterator`] of arrays pub type ArrayIter<'a> = Box>> + Send + Sync + 'a>; diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index befd8a5447c..f04acc2fc53 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -1,6 +1,6 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, - page::{EncodedDictPage, EncodedPage}, + page::{DictPage, EncodedPage}, schema::types::PrimitiveType, statistics::{serialize_statistics, ParquetStatistics}, write::DynIter, @@ -153,7 +153,7 @@ macro_rules! dyn_prim { primitive_encode_plain::<$from, $to>(values, false, &mut buffer); let stats = primitive_build_statistics::<$from, $to>(values, $type_.clone()); let stats = serialize_statistics(&stats); - (EncodedDictPage::new(buffer, values.len()), stats) + (DictPage::new(buffer, values.len(), false), stats) }}; } @@ -190,7 +190,7 @@ pub fn array_to_pages( let mut buffer = vec![]; utf8_encode_plain::(array, false, &mut buffer); let stats = utf8_build_statistics(array, type_.clone()); - (EncodedDictPage::new(buffer, array.len()), stats) + (DictPage::new(buffer, array.len(), false), stats) } DataType::LargeUtf8 => { let array = array.values().as_any().downcast_ref().unwrap(); @@ -198,7 +198,7 @@ pub fn array_to_pages( let mut buffer = vec![]; utf8_encode_plain::(array, false, &mut buffer); let stats = utf8_build_statistics(array, type_.clone()); - (EncodedDictPage::new(buffer, array.len()), stats) + (DictPage::new(buffer, array.len(), false), stats) } DataType::Binary => { let array = array.values().as_any().downcast_ref().unwrap(); @@ -206,7 +206,7 @@ pub fn array_to_pages( let mut buffer = vec![]; binary_encode_plain::(array, false, &mut buffer); let stats = binary_build_statistics(array, type_.clone()); - (EncodedDictPage::new(buffer, array.len()), stats) + (DictPage::new(buffer, array.len(), 
false), stats) } DataType::LargeBinary => { let array = array.values().as_any().downcast_ref().unwrap(); @@ -214,7 +214,7 @@ pub fn array_to_pages( let mut buffer = vec![]; binary_encode_plain::(array, false, &mut buffer); let stats = binary_build_statistics(array, type_.clone()); - (EncodedDictPage::new(buffer, array.len()), stats) + (DictPage::new(buffer, array.len(), false), stats) } DataType::FixedSizeBinary(_) => { let mut buffer = vec![]; @@ -222,7 +222,7 @@ pub fn array_to_pages( fixed_binary_encode_plain(array, false, &mut buffer); let stats = fixed_binary_build_statistics(array, type_.clone()); let stats = serialize_statistics(&stats); - (EncodedDictPage::new(buffer, array.len()), stats) + (DictPage::new(buffer, array.len(), false), stats) } other => { return Err(Error::NotYetImplemented(format!( diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 0e6ab465f56..6cfe79837e0 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -93,7 +93,6 @@ pub fn build_plain_page( Ok(DataPage::new( header, buffer, - None, Descriptor { primitive_type: type_, max_def_level: 0,