From 0e0817425aa648ae3090cc554c015146a1c9b0d9 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 22 Jul 2022 04:38:26 +0000 Subject: [PATCH] Simpler --- examples/parquet_write.rs | 10 +- src/array/primitive/mod.rs | 2 +- .../read/deserialize/binary/dictionary.rs | 89 +++++++++ src/io/parquet/read/deserialize/binary/mod.rs | 2 +- .../fixed_size_binary/dictionary.rs | 83 +++++++++ .../read/deserialize/fixed_size_binary/mod.rs | 2 +- src/io/parquet/read/deserialize/mod.rs | 98 +++++----- .../read/deserialize/primitive/dictionary.rs | 8 +- src/io/parquet/write/binary/nested.rs | 6 +- src/io/parquet/write/boolean/nested.rs | 6 +- src/io/parquet/write/dictionary.rs | 170 +++++++++++------- src/io/parquet/write/mod.rs | 9 +- src/io/parquet/write/pages.rs | 2 +- src/io/parquet/write/primitive/nested.rs | 6 +- src/io/parquet/write/utf8/nested.rs | 6 +- tests/it/io/parquet/mod.rs | 4 +- tests/it/io/parquet/read_indexes.rs | 4 +- 17 files changed, 357 insertions(+), 150 deletions(-) diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index fef11079ac9..5b9e4331bf9 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -11,19 +11,19 @@ use arrow2::{ }, }; -fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Result<()> { +fn write_chunk(path: &str, schema: Schema, chunk: Chunk>) -> Result<()> { let options = WriteOptions { write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V2, }; - let iter = vec![Ok(columns)]; + let iter = vec![Ok(chunk)]; let encodings = schema .fields .iter() - .map(|f| transverse(&f.data_type, |_| Encoding::Plain)) + .map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary)) .collect(); let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?; @@ -52,7 +52,7 @@ fn main() -> Result<()> { ]); let field = Field::new("c1", array.data_type().clone(), true); let schema = Schema::from(vec![field]); - let columns = Chunk::new(vec![array.boxed()]); + let chunk = Chunk::new(vec![array.boxed()]); - write_batch("test.parquet", schema, columns) + write_chunk("test.parquet", schema, chunk) } diff --git a/src/array/primitive/mod.rs b/src/array/primitive/mod.rs index 74604ed7e8b..e83c56bbffd 100644 --- a/src/array/primitive/mod.rs +++ b/src/array/primitive/mod.rs @@ -69,7 +69,7 @@ fn check( if data_type.to_physical_type() != PhysicalType::Primitive(T::PRIMITIVE) { return Err(Error::oos( - "BooleanArray can only be initialized with a DataType whose physical type is Primitive", + "PrimitiveArray can only be initialized with a DataType whose physical type is Primitive", )); } Ok(()) diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs index bf656929e65..72c56396dd2 100644 --- a/src/io/parquet/read/deserialize/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -7,6 +7,7 @@ use crate::{ bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, + io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState}, }; use super::super::dictionary::*; @@ -104,3 +105,91 @@ where } } } + +#[derive(Debug)] +pub struct NestedDictIter +where + I: DataPages, + O: Offset, + K: DictionaryKey, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + chunk_size: Option, + phantom: std::marker::PhantomData, +} + +impl NestedDictIter +where + I: DataPages, + O: Offset, + K: DictionaryKey, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + ) -> Self { + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + chunk_size, + phantom: Default::default(), + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + O: Offset, + K: DictionaryKey, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, O, I>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + O: Offset, + K: DictionaryKey, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 613b95e9ca9..e17557c4b41 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -5,5 +5,5 @@ mod utils; pub use self::nested::NestedIter; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs index 4d44ef4f724..ee28924189d 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs @@ -7,6 +7,7 @@ use crate::{ bitmap::MutableBitmap, datatypes::DataType, error::Result, + io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState}, }; use super::super::dictionary::*; @@ -87,3 +88,85 @@ where } } } + +#[derive(Debug)] +pub struct NestedDictIter +where + I: DataPages, + K: DictionaryKey, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + chunk_size: Option, +} + +impl NestedDictIter +where + I: DataPages, + K: DictionaryKey, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + ) -> Self { + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + chunk_size, + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + K: DictionaryKey, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict(self.data_type.clone(), dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, I>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + K: DictionaryKey, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs index 8173065d37c..55b57a519f6 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs @@ -3,4 +3,4 @@ mod dictionary; mod utils; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 2ee84b6ca7d..ffe19ac8d9f 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -17,7 +17,7 @@ use crate::{ array::{ Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array, }, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, IntervalUnit}, error::{Error, Result}, }; @@ -289,9 +289,9 @@ where chunk_size, ) } - _ => match field.data_type().to_logical_type() { DataType::Dictionary(key_type, _, _) => { + init.push(InitNested::Primitive(field.is_nullable)); let type_ = types.pop().unwrap(); let iter = columns.pop().unwrap(); let data_type = field.data_type().clone(); @@ -434,50 +434,76 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( chunk_size, |x: i32| x as u8, ), - Float32 => primitive::iter_to_dict_arrays_nested::( + UInt16 => primitive::iter_to_dict_arrays_nested::( iter, init, data_type, chunk_size, - |x: f32| x, + |x: i32| x as u16, ), - Float64 => primitive::iter_to_dict_arrays_nested::( + UInt32 => primitive::iter_to_dict_arrays_nested::( iter, init, data_type, chunk_size, - |x: f64| x, + |x: i32| x as u32, ), - /* - UInt16 => dyn_iter(primitive::DictIter::::new( + Int8 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as u16, - )), - UInt32 => dyn_iter(primitive::DictIter::::new( + |x: i32| x as i8, + ), + Int16 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as u32, - )), - Int8 => dyn_iter(primitive::DictIter::::new( + |x: i32| x as i16, + ), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { + primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i32| x, + ) + } + Int64 | Date64 | Time64(_) | Duration(_) => { + primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i64| x as i32, + ) + } + Float32 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as i8, - )), - Int16 => dyn_iter(primitive::DictIter::::new( + |x: f32| x, + ), + Float64 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as i16, - )), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { - x as i32 - }), + |x: f64| x, ), + Utf8 | Binary => { + binary::iter_to_dict_arrays_nested::(iter, init, data_type, chunk_size) + } + LargeUtf8 | LargeBinary => { + binary::iter_to_dict_arrays_nested::(iter, init, data_type, chunk_size) + } + FixedSizeBinary(_) => { + fixed_size_binary::iter_to_dict_arrays_nested::(iter, init, data_type, chunk_size) + } + /* Timestamp(time_unit, _) => { let time_unit = *time_unit; @@ -490,32 +516,6 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( time_unit, ); } - - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), - ), - Float32 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: f32| x, - )), - Float64 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: f64| x, - )), - - Utf8 | Binary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, - )), - LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, - )), - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( - iter, data_type, chunk_size, - )), */ other => { return Err(Error::nyi(format!( diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index bc0a5f43e8f..a0e0d9e0633 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -25,6 +25,10 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; let dict = dict .as_any() .downcast_ref::>() @@ -144,10 +148,6 @@ where chunk_size: Option, op: F, ) -> Self { - let data_type = match data_type { - DataType::Dictionary(_, values, _) => *values, - _ => data_type, - }; Self { iter, init, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 198e1d2156c..55f5d2ef247 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -14,7 +14,7 @@ pub fn array_to_page( array: &BinaryArray, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result where O: Offset, @@ -23,7 +23,7 @@ where let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -35,7 +35,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 4bd741ab52a..9d9e49100f6 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -14,13 +14,13 @@ pub fn array_to_page( array: &BooleanArray, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result { let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer)?; @@ -32,7 +32,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index f6a9bcabbb6..befd8a5447c 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -6,8 +6,17 @@ use parquet2::{ write::DynIter, }; -use super::binary::build_statistics as binary_build_statistics; -use super::binary::encode_plain as binary_encode_plain; +use crate::io::parquet::write::utils; +use crate::{ + array::{Array, DictionaryArray, DictionaryKey}, + io::parquet::read::schema::is_nullable, +}; +use crate::{bitmap::Bitmap, datatypes::DataType}; +use crate::{ + bitmap::MutableBitmap, + error::{Error, Result}, +}; + use super::fixed_len_bytes::build_statistics as fixed_binary_build_statistics; use super::fixed_len_bytes::encode_plain as fixed_binary_encode_plain; use super::primitive::build_statistics as primitive_build_statistics; @@ -15,94 +24,118 @@ use super::primitive::encode_plain as primitive_encode_plain; use super::utf8::build_statistics as utf8_build_statistics; use super::utf8::encode_plain as utf8_encode_plain; use super::WriteOptions; -use crate::bitmap::Bitmap; -use crate::datatypes::DataType; -use crate::error::{Error, Result}; -use crate::io::parquet::write::utils; -use crate::{ - array::{Array, DictionaryArray, DictionaryKey}, - io::parquet::read::schema::is_nullable, -}; +use super::{binary::build_statistics as binary_build_statistics, Nested}; +use super::{binary::encode_plain as binary_encode_plain, nested}; -fn encode_keys( - array: &DictionaryArray, - type_: PrimitiveType, - statistics: ParquetStatistics, +fn serialize_def_levels_simple( + validity: Option<&Bitmap>, + length: usize, + is_optional: bool, options: WriteOptions, -) -> Result { - let validity = array.values().validity(); - let is_optional = is_nullable(&type_.field_info); - - let mut buffer = vec![]; - - let null_count = if let Some(validity) = validity { - let projected_validity = array - .keys_iter() - .map(|x| x.map(|x| validity.get_bit(x)).unwrap_or(false)); - let projected_val = Bitmap::from_trusted_len_iter(projected_validity); - - let null_count = projected_val.unset_bits(); - - utils::write_def_levels( - &mut buffer, - is_optional, - Some(&projected_val), - array.len(), - options.version, - )?; - null_count - } else { - utils::write_def_levels( - &mut buffer, - is_optional, - array.validity(), - array.len(), - options.version, - )?; - array.null_count() - }; - - let definition_levels_byte_length = buffer.len(); + buffer: &mut Vec, +) -> Result<()> { + utils::write_def_levels(buffer, is_optional, validity, length, options.version) +} - // encode indices - // compute the required number of bits +fn serialize_keys_values( + array: &DictionaryArray, + validity: Option<&Bitmap>, + buffer: &mut Vec, +) -> Result<()> { + let keys = array.keys_values_iter().map(|x| x as u32); if let Some(validity) = validity { - let keys = array.keys_iter().flatten().filter_map(|index| { - // discard indices whose values are null, since they are part of the def levels. - if validity.get_bit(index) { - Some(index as u32) - } else { - None - } - }); + // discard indices whose values are null. + let keys = keys + .zip(validity.iter()) + .filter_map(|(key, is_valid)| is_valid.then(|| key)); let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; - let keys = utils::ExactSizedIter::new(keys, array.len() - null_count); + let keys = utils::ExactSizedIter::new(keys, array.len() - validity.unset_bits()); // num_bits as a single byte buffer.push(num_bits); // followed by the encoded indices. - encode_u32(&mut buffer, keys, num_bits)?; + Ok(encode_u32(buffer, keys, num_bits)?) } else { - let keys = array.keys_iter().flatten().map(|x| x as u32); let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; - let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count()); - // num_bits as a single byte buffer.push(num_bits); // followed by the encoded indices. - encode_u32(&mut buffer, keys, num_bits)?; + Ok(encode_u32(buffer, keys, num_bits)?) } +} + +fn serialize_levels( + validity: Option<&Bitmap>, + length: usize, + type_: &PrimitiveType, + nested: &[Nested], + options: WriteOptions, + buffer: &mut Vec, +) -> Result<(usize, usize)> { + if nested.len() == 1 { + let is_optional = is_nullable(&type_.field_info); + serialize_def_levels_simple(validity, length, is_optional, options, buffer)?; + let definition_levels_byte_length = buffer.len(); + Ok((0, definition_levels_byte_length)) + } else { + nested::write_rep_and_def(options.version, nested, buffer) + } +} + +fn normalized_validity(array: &DictionaryArray) -> Option { + match (array.keys().validity(), array.values().validity()) { + (None, None) => None, + (None, rhs) => rhs.cloned(), + (lhs, None) => lhs.cloned(), + (Some(_), Some(rhs)) => { + let projected_validity = array + .keys_iter() + .map(|x| x.map(|x| rhs.get_bit(x)).unwrap_or(false)); + MutableBitmap::from_trusted_len_iter(projected_validity).into() + } + } +} + +fn serialize_keys( + array: &DictionaryArray, + type_: PrimitiveType, + nested: &[Nested], + statistics: ParquetStatistics, + options: WriteOptions, +) -> Result { + let mut buffer = vec![]; + + // parquet only accepts a single validity - we "&" the validities into a single one + // and ignore keys whole _value_ is null. + let validity = normalized_validity(array); + + let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels( + validity.as_ref(), + array.len(), + &type_, + nested, + options, + &mut buffer, + )?; + + serialize_keys_values(array, validity.as_ref(), &mut buffer)?; + + let (num_values, num_rows) = if nested.len() == 1 { + (array.len(), array.len()) + } else { + (nested::num_values(nested), nested[0].len()) + }; utils::build_plain_page( buffer, - array.len(), - array.len(), + num_values, + num_rows, array.null_count(), - 0, + repetition_levels_byte_length, definition_levels_byte_length, Some(statistics), type_, @@ -127,6 +160,7 @@ macro_rules! dyn_prim { pub fn array_to_pages( array: &DictionaryArray, type_: PrimitiveType, + nested: &[Nested], options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -200,7 +234,7 @@ pub fn array_to_pages( let dict_page = EncodedPage::Dict(dict_page); // write DataPage pointing to DictPage - let data_page = encode_keys(array, type_, statistics, options)?; + let data_page = serialize_keys(array, type_, nested, statistics, options)?; let iter = std::iter::once(Ok(dict_page)).chain(std::iter::once(Ok(data_page))); Ok(DynIter::new(Box::new(iter))) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 4fea3d9c4aa..9f16ea9fb01 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -95,7 +95,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { pub fn array_to_pages( array: &dyn Array, type_: ParquetPrimitiveType, - nested: Vec, + nested: &[Nested], options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -110,7 +110,7 @@ pub fn array_to_pages( let right = array.slice(split_at, array.len() - split_at); Ok(DynIter::new( - array_to_pages(&*left, type_.clone(), nested.clone(), options, encoding)? + array_to_pages(&*left, type_.clone(), nested, options, encoding)? .chain(array_to_pages(&*right, type_, nested, options, encoding)?), )) } else { @@ -120,6 +120,7 @@ pub fn array_to_pages( dictionary::array_to_pages::<$T>( array.as_any().downcast_ref().unwrap(), type_, + nested, options, encoding, ) @@ -135,7 +136,7 @@ pub fn array_to_pages( pub fn array_to_page( array: &dyn Array, type_: ParquetPrimitiveType, - nested: Vec, + nested: &[Nested], options: WriteOptions, encoding: Encoding, ) -> Result { @@ -373,7 +374,7 @@ pub fn array_to_page_simple( fn array_to_page_nested( array: &dyn Array, type_: ParquetPrimitiveType, - nested: Vec, + nested: &[Nested], options: WriteOptions, _encoding: Encoding, ) -> Result { diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index 64f6da9a0f6..0e8dcf3d69d 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -215,7 +215,7 @@ pub fn array_to_columns + Send + Sync>( .zip(types.into_iter()) .zip(encoding.iter()) .map(|(((values, nested), type_), encoding)| { - array_to_pages(*values, type_, nested, options, *encoding) + array_to_pages(*values, type_, &nested, options, *encoding) }) .collect() } diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 9bd13184ca4..ffbfde6554c 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -18,7 +18,7 @@ pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result where T: ArrowNativeType, @@ -29,7 +29,7 @@ where let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -44,7 +44,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 7a6e0ee05be..42babd46cd7 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -14,7 +14,7 @@ pub fn array_to_page( array: &Utf8Array, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result where O: Offset, @@ -23,7 +23,7 @@ where let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -35,7 +35,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 2eab739a4ad..369d69d73a3 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1441,9 +1441,9 @@ fn nested_dict() -> Result<()> { DataType::List(Box::new(Field::new( "item", floats.data_type().clone(), - true, + false, ))), - vec![0i32, 0, 2, 3, 3].into(), + vec![0i32, 0, 0, 2, 3].into(), floats.boxed(), Some([true, false, true, true].into()), )?; diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index ee91ba846a5..b5fb2ba9c19 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -41,7 +41,7 @@ fn pages( .descriptor .primitive_type .clone(), - vec![Nested::Primitive(None, true, array.len())], + &[Nested::Primitive(None, true, array.len())], options, Encoding::Plain, ) @@ -57,7 +57,7 @@ fn pages( .descriptor .primitive_type .clone(), - vec![Nested::Primitive(None, true, array.len())], + &[Nested::Primitive(None, true, array.len())], options, encoding, )