From 289e66429e027fe04daf399243e5c168021fb7f0 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 11 Dec 2022 16:39:38 +0100 Subject: [PATCH 1/5] fix/perf writing nested/sliced arrays to parquet --- src/io/parquet/write/binary/nested.rs | 11 ++- src/io/parquet/write/boolean/nested.rs | 11 ++- src/io/parquet/write/mod.rs | 118 +++++++++++++++++++++-- src/io/parquet/write/pages.rs | 16 +-- src/io/parquet/write/primitive/nested.rs | 11 ++- src/io/parquet/write/utf8/nested.rs | 11 ++- 6 files changed, 149 insertions(+), 29 deletions(-) diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 950ea4190ca..ccb47775290 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, BinaryArray}, error::Result, @@ -26,10 +26,15 @@ where let (repetition_levels_byte_length, definition_levels_byte_length) = nested::write_rep_and_def(options.version, nested, &mut buffer)?; - encode_plain(array, is_optional, &mut buffer); + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. + let (start, len) = slice_nested_leaf(nested); + let array = array.slice(start, len); + encode_plain(&array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + Some(build_statistics(&array, type_.clone())) } else { None }; diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 9d9e49100f6..a65ebf8534b 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, BooleanArray}, error::Result, @@ -22,10 +22,15 @@ pub fn array_to_page( let (repetition_levels_byte_length, definition_levels_byte_length) = nested::write_rep_and_def(options.version, nested, &mut buffer)?; - encode_plain(array, is_optional, &mut buffer)?; + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. 
+ let (start, len) = slice_nested_leaf(nested); + let array = array.slice(start, len); + encode_plain(&array, is_optional, &mut buffer)?; let statistics = if options.write_statistics { - Some(build_statistics(array)) + Some(build_statistics(&array)) } else { None }; diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 84b4f1cab58..681886c374c 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -69,6 +69,30 @@ pub use sink::FileSink; pub use pages::array_to_columns; pub use pages::Nested; +/// returns offset and length to slice the leaf values +pub(self) fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) { + // find the deepest recursive dremel structure as that one determines how many values we must + // take + let mut out = (0, 0); + for nested in nested.iter().rev() { + match nested { + Nested::LargeList(l_nested) => { + let start = *l_nested.offsets.first().unwrap(); + let end = *l_nested.offsets.last().unwrap(); + return (start as usize, (end - start) as usize); + } + Nested::List(l_nested) => { + let start = *l_nested.offsets.first().unwrap(); + let end = *l_nested.offsets.last().unwrap(); + return (start as usize, (end - start) as usize); + } + Nested::Primitive(_, _, len) => out = (0, *len), + _ => {} + } + } + out +} + pub(self) fn decimal_length_from_precision(precision: usize) -> usize { // digits = floor(log_10(2^(8*n - 1) - 1)) // ceil(digits) = log10(2^(8*n - 1) - 1) @@ -130,6 +154,58 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { ) } +fn slice_parquet_array<'a>( + array: &'a dyn Array, + nested: &'a [Nested<'a>], + offset: usize, + length: usize, +) -> (Box, Vec>) { + let mut nested = nested.to_vec(); + + let mut is_nested = false; + for nested in nested.iter_mut() { + match nested { + Nested::LargeList(l_nested) => { + is_nested = true; + // the slice is a bit awkward because we always want the latest value to compute the next length; + l_nested.offsets = &l_nested.offsets + [offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())]; + } + Nested::List(l_nested) => { + is_nested = true; + l_nested.offsets = &l_nested.offsets + [offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())]; + } + _ => {} + } + } + if is_nested { + (array.to_boxed(), nested) + } else { + (array.slice(offset, length), nested) + } +} + +fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize { + let mut sum = 0; + for nested in nested.iter() { + match nested { + Nested::LargeList(l_nested) => { + sum += l_nested.offsets.len() - 1; + } + Nested::List(l_nested) => { + sum += l_nested.offsets.len() - 1; + } + _ => {} + } + } + if sum > 0 { + sum + } else { + array.len() + } +} + /// Returns an iterator of [`Page`]. #[allow(clippy::needless_collect)] pub fn array_to_pages( @@ -147,13 +223,27 @@ pub fn array_to_pages( let array_byte_size = estimated_bytes_size(array); if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { - let split_at = array.len() / 2; - let left = array.slice(0, split_at); - let right = array.slice(split_at, array.len() - split_at); + let length = get_max_length(array, nested); + let split_at = length / 2; + let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at); + let (sub_array_right, subnested_right) = + slice_parquet_array(array, nested, split_at, length - split_at); Ok(DynIter::new( - array_to_pages(&*left, type_.clone(), nested, options, encoding)? 
- .chain(array_to_pages(&*right, type_, nested, options, encoding)?), + array_to_pages( + sub_array_left.as_ref(), + type_.clone(), + subnested_left.as_ref(), + options, + encoding, + )? + .chain(array_to_pages( + sub_array_right.as_ref(), + type_, + subnested_right.as_ref(), + options, + encoding, + )?), )) } else { match array.data_type() { @@ -175,17 +265,25 @@ pub fn array_to_pages( ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize; let rows_per_page = (page_size / (bytes_per_row + 1)).max(1); - let vs: Vec> = (0..array.len()) + let length = get_max_length(array, nested); + let vs: Vec> = (0..length) .step_by(rows_per_page) .map(|offset| { - let length = if offset + rows_per_page > array.len() { - array.len() - offset + let length = if offset + rows_per_page > length { + length - offset } else { rows_per_page }; - let sub_array = array.slice(offset, length); - array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding) + let (sub_array, subnested) = + slice_parquet_array(array, nested, offset, length); + array_to_page( + sub_array.as_ref(), + type_.clone(), + &subnested, + options, + encoding, + ) }) .collect(); diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index e3e1eec410f..ba292ae82f1 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -1,5 +1,6 @@ use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType}; use parquet2::{page::Page, write::DynIter}; +use std::fmt::Debug; use crate::array::{ListArray, StructArray}; use crate::bitmap::Bitmap; @@ -34,6 +35,7 @@ impl<'a, O: Offset> ListNested<'a, O> { #[derive(Debug, Clone, PartialEq)] pub enum Nested<'a> { /// a primitive (leaf or parquet column) + /// bitmap, _, length Primitive(Option<&'a Bitmap>, bool, usize), /// a list List(ListNested<'a, i32>), @@ -147,13 +149,13 @@ fn to_nested_recursive<'a>( Ok(()) } -fn to_leafs(array: &dyn Array) -> Vec<&dyn Array> { +fn to_leaves(array: &dyn Array) -> Vec<&dyn Array> { let mut leafs = vec![]; - to_leafs_recursive(array, &mut leafs); + to_leaves_recursive(array, &mut leafs); leafs } -fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) { +fn to_leaves_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) { use PhysicalType::*; match array.data_type().to_physical_type() { Struct => { @@ -161,15 +163,15 @@ fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) array .values() .iter() - .for_each(|a| to_leafs_recursive(a.as_ref(), leafs)); + .for_each(|a| to_leaves_recursive(a.as_ref(), leafs)); } List => { let array = array.as_any().downcast_ref::>().unwrap(); - to_leafs_recursive(array.values().as_ref(), leafs); + to_leaves_recursive(array.values().as_ref(), leafs); } LargeList => { let array = array.as_any().downcast_ref::>().unwrap(); - to_leafs_recursive(array.values().as_ref(), leafs); + to_leaves_recursive(array.values().as_ref(), leafs); } Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 | LargeUtf8 | Dictionary(_) => leafs.push(array), @@ -206,7 +208,7 @@ pub fn array_to_columns + Send + Sync>( let types = to_parquet_leafs(type_); - let values = to_leafs(array); + let values = to_leaves(array); assert_eq!(encoding.len(), types.len()); diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index e2909b02ceb..74248851787 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -7,7 +7,7 @@ use 
super::super::utils; use super::super::WriteOptions; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, PrimitiveArray}, error::Result, @@ -31,11 +31,16 @@ where let (repetition_levels_byte_length, definition_levels_byte_length) = nested::write_rep_and_def(options.version, nested, &mut buffer)?; - let buffer = encode_plain(array, is_optional, buffer); + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. + let (start, len) = slice_nested_leaf(nested); + let array = array.slice(start, len); + let buffer = encode_plain(&array, is_optional, buffer); let statistics = if options.write_statistics { Some(serialize_statistics(&build_statistics( - array, + &array, type_.clone(), ))) } else { diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 2792ef35712..244dd148691 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, Utf8Array}, error::Result, @@ -26,10 +26,15 @@ where let (repetition_levels_byte_length, definition_levels_byte_length) = nested::write_rep_and_def(options.version, nested, &mut buffer)?; - encode_plain(array, is_optional, &mut buffer); + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. + let (start, len) = slice_nested_leaf(nested); + let array = array.slice(start, len); + encode_plain(&array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + Some(build_statistics(&array, type_.clone())) } else { None }; From 924f88d1566ef586497a57829f339ccfc6e08466 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 11 Dec 2022 17:53:02 +0100 Subject: [PATCH 2/5] fix double nestedness --- src/io/parquet/write/mod.rs | 20 +++++++------------- src/io/parquet/write/pages.rs | 32 ++++++++++++++++---------------- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 681886c374c..54815ccbd3e 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -187,23 +187,17 @@ fn slice_parquet_array<'a>( } fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize { - let mut sum = 0; - for nested in nested.iter() { + // get the length that should be sliced. 
+ // that is the inner nested structure that + // dictates how often the primitive should be repeated + for nested in nested.iter().rev() { match nested { - Nested::LargeList(l_nested) => { - sum += l_nested.offsets.len() - 1; - } - Nested::List(l_nested) => { - sum += l_nested.offsets.len() - 1; - } + Nested::LargeList(l_nested) => return l_nested.offsets.len() - 1, + Nested::List(l_nested) => return l_nested.offsets.len() - 1, _ => {} } } - if sum > 0 { - sum - } else { - array.len() - } + array.len() } /// Returns an iterator of [`Page`]. diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index ba292ae82f1..b83335aa570 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -150,12 +150,12 @@ fn to_nested_recursive<'a>( } fn to_leaves(array: &dyn Array) -> Vec<&dyn Array> { - let mut leafs = vec![]; - to_leaves_recursive(array, &mut leafs); - leafs + let mut leaves = vec![]; + to_leaves_recursive(array, &mut leaves); + leaves } -fn to_leaves_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) { +fn to_leaves_recursive<'a>(array: &'a dyn Array, leaves: &mut Vec<&'a dyn Array>) { use PhysicalType::*; match array.data_type().to_physical_type() { Struct => { @@ -163,35 +163,35 @@ fn to_leaves_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) array .values() .iter() - .for_each(|a| to_leaves_recursive(a.as_ref(), leafs)); + .for_each(|a| to_leaves_recursive(a.as_ref(), leaves)); } List => { let array = array.as_any().downcast_ref::>().unwrap(); - to_leaves_recursive(array.values().as_ref(), leafs); + to_leaves_recursive(array.values().as_ref(), leaves); } LargeList => { let array = array.as_any().downcast_ref::>().unwrap(); - to_leaves_recursive(array.values().as_ref(), leafs); + to_leaves_recursive(array.values().as_ref(), leaves); } Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | LargeUtf8 | Dictionary(_) => leafs.push(array), + | LargeUtf8 | Dictionary(_) => leaves.push(array), other => todo!("Writing {:?} to parquet not yet implemented", other), } } -fn to_parquet_leafs(type_: ParquetType) -> Vec { - let mut leafs = vec![]; - to_parquet_leafs_recursive(type_, &mut leafs); - leafs +fn to_parquet_leaves(type_: ParquetType) -> Vec { + let mut leaves = vec![]; + to_parquet_leaves_recursive(type_, &mut leaves); + leaves } -fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec) { +fn to_parquet_leaves_recursive(type_: ParquetType, leaves: &mut Vec) { match type_ { - ParquetType::PrimitiveType(primitive) => leafs.push(primitive), + ParquetType::PrimitiveType(primitive) => leaves.push(primitive), ParquetType::GroupType { fields, .. 
} => { fields .into_iter() - .for_each(|type_| to_parquet_leafs_recursive(type_, leafs)); + .for_each(|type_| to_parquet_leaves_recursive(type_, leaves)); } } } @@ -206,7 +206,7 @@ pub fn array_to_columns + Send + Sync>( let array = array.as_ref(); let nested = to_nested(array, &type_)?; - let types = to_parquet_leafs(type_); + let types = to_parquet_leaves(type_); let values = to_leaves(array); From be7662dc8e16ae206e73d92f4df472034f113915 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 11 Dec 2022 18:09:08 +0100 Subject: [PATCH 3/5] add test --- tests/it/io/parquet/write.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 365ad9e3fd6..3b7f1db120c 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -361,6 +361,17 @@ fn list_large_binary_optional_v1() -> Result<()> { ) } +#[test] +fn list_nested_inner_required_required_i64() -> Result<()> { + round_trip( + "list_nested_inner_required_required_i64", + "nested", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn utf8_optional_v2_delta() -> Result<()> { round_trip( From 8216c755ce7ad13d355fbb02adae4a89767d945e Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 11 Dec 2022 21:09:25 +0100 Subject: [PATCH 4/5] ensure the validity also has the right offset in case of sliced lists --- src/io/parquet/write/binary/nested.rs | 9 +++++---- src/io/parquet/write/boolean/nested.rs | 8 ++++---- src/io/parquet/write/dictionary.rs | 10 +++++++--- src/io/parquet/write/nested/def.rs | 9 ++++++--- src/io/parquet/write/nested/mod.rs | 14 +++++++++++--- src/io/parquet/write/primitive/nested.rs | 6 ++++-- src/io/parquet/write/utf8/nested.rs | 10 +++++----- 7 files changed, 42 insertions(+), 24 deletions(-) diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index ccb47775290..08c21d616f9 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -22,14 +22,15 @@ where { let is_optional = is_nullable(&type_.field_info); - let mut buffer = vec![]; - let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; - // we slice the leaf by the offsets as dremel only computes lengths and thus // does NOT take the starting offset into account. // By slicing the leaf array we also don't write too many values. let (start, len) = slice_nested_leaf(nested); + + let mut buffer = vec![]; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + let array = array.slice(start, len); encode_plain(&array, is_optional, &mut buffer); diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index a65ebf8534b..2b8d430ac4d 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -18,14 +18,14 @@ pub fn array_to_page( ) -> Result { let is_optional = is_nullable(&type_.field_info); - let mut buffer = vec![]; - let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; - // we slice the leaf by the offsets as dremel only computes lengths and thus // does NOT take the starting offset into account. // By slicing the leaf array we also don't write too many values. 
let (start, len) = slice_nested_leaf(nested); + + let mut buffer = vec![]; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; let array = array.slice(start, len); encode_plain(&array, is_optional, &mut buffer)?; diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 9f20d4692de..01a5f8412fa 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -6,7 +6,7 @@ use parquet2::{ write::DynIter, }; -use crate::io::parquet::write::utils; +use crate::io::parquet::write::{slice_nested_leaf, utils}; use crate::{ array::{Array, DictionaryArray, DictionaryKey}, io::parquet::read::schema::is_nullable, @@ -75,6 +75,7 @@ fn serialize_levels( nested: &[Nested], options: WriteOptions, buffer: &mut Vec, + offset: usize, ) -> Result<(usize, usize)> { if nested.len() == 1 { let is_optional = is_nullable(&type_.field_info); @@ -82,7 +83,7 @@ fn serialize_levels( let definition_levels_byte_length = buffer.len(); Ok((0, definition_levels_byte_length)) } else { - nested::write_rep_and_def(options.version, nested, buffer) + nested::write_rep_and_def(options.version, nested, buffer, offset) } } @@ -112,6 +113,7 @@ fn serialize_keys( // parquet only accepts a single validity - we "&" the validities into a single one // and ignore keys whole _value_ is null. let validity = normalized_validity(array); + let (start, len) = slice_nested_leaf(nested); let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels( validity.as_ref(), @@ -120,9 +122,11 @@ fn serialize_keys( nested, options, &mut buffer, + start, )?; + let array = array.slice(start, len); - serialize_keys_values(array, validity.as_ref(), &mut buffer)?; + serialize_keys_values(&array, validity.as_ref(), &mut buffer)?; let (num_values, num_rows) = if nested.len() == 1 { (array.len(), array.len()) diff --git a/src/io/parquet/write/nested/def.rs b/src/io/parquet/write/nested/def.rs index eb945a4c683..4e82ad88af3 100644 --- a/src/io/parquet/write/nested/def.rs +++ b/src/io/parquet/write/nested/def.rs @@ -86,10 +86,13 @@ pub struct DefLevelsIter<'a> { } impl<'a> DefLevelsIter<'a> { - pub fn new(nested: &'a [Nested]) -> Self { + pub fn new(nested: &'a [Nested], offset: usize) -> Self { let remaining_values = num_values(nested); - let primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap(); + let mut primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap(); + if offset > 0 { + primitive_validity.nth(offset - 1); + } let iter = iter(&nested[..nested.len() - 1]); let remaining = std::iter::repeat(0).take(iter.len()).collect(); @@ -164,7 +167,7 @@ mod tests { use super::*; fn test(nested: Vec, expected: Vec) { - let mut iter = DefLevelsIter::new(&nested); + let mut iter = DefLevelsIter::new(&nested, 0); assert_eq!(iter.size_hint().0, expected.len()); let result = iter.by_ref().collect::>(); assert_eq!(result, expected); diff --git a/src/io/parquet/write/nested/mod.rs b/src/io/parquet/write/nested/mod.rs index 5f6cd4d3524..bd6747a1530 100644 --- a/src/io/parquet/write/nested/mod.rs +++ b/src/io/parquet/write/nested/mod.rs @@ -53,14 +53,19 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - } /// writes the rep levels to a `Vec`. 
-fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { +fn write_def_levels( + buffer: &mut Vec, + nested: &[Nested], + version: Version, + offset: usize, +) -> Result<()> { let max_level = max_def_level(nested) as i16; if max_level == 0 { return Ok(()); } let num_bits = get_bit_width(max_level); - let levels = def::DefLevelsIter::new(nested); + let levels = def::DefLevelsIter::new(nested, offset); match version { Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { @@ -105,11 +110,14 @@ pub fn write_rep_and_def( page_version: Version, nested: &[Nested], buffer: &mut Vec, + // needed to take offset the validity iterator in case + // the list was sliced. + offset: usize, ) -> Result<(usize, usize)> { write_rep_levels(buffer, nested, page_version)?; let repetition_levels_byte_length = buffer.len(); - write_def_levels(buffer, nested, page_version)?; + write_def_levels(buffer, nested, page_version, offset)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; Ok((repetition_levels_byte_length, definition_levels_byte_length)) diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 74248851787..1a9a8cf7d75 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -28,13 +28,15 @@ where let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; - let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; // we slice the leaf by the offsets as dremel only computes lengths and thus // does NOT take the starting offset into account. // By slicing the leaf array we also don't write too many values. let (start, len) = slice_nested_leaf(nested); + + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + let array = array.slice(start, len); let buffer = encode_plain(&array, is_optional, buffer); diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 244dd148691..7e8d018c957 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -21,15 +21,15 @@ where O: Offset, { let is_optional = is_nullable(&type_.field_info); - - let mut buffer = vec![]; - let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; - // we slice the leaf by the offsets as dremel only computes lengths and thus // does NOT take the starting offset into account. // By slicing the leaf array we also don't write too many values. 
let (start, len) = slice_nested_leaf(nested); + + let mut buffer = vec![]; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + let array = array.slice(start, len); encode_plain(&array, is_optional, &mut buffer); From bbd2b3ad923c6d56c7958d9c371fdf8b4f18a7fc Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 12 Dec 2022 14:39:22 +0100 Subject: [PATCH 5/5] don't check stats --- tests/it/io/parquet/write.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 3b7f1db120c..ef5a18888d2 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -11,6 +11,17 @@ fn round_trip( version: Version, compression: CompressionOptions, encodings: Vec, +) -> Result<()> { + round_trip_opt_stats(column, file, version, compression, encodings, true) +} + +fn round_trip_opt_stats( + column: &str, + file: &str, + version: Version, + compression: CompressionOptions, + encodings: Vec, + check_stats: bool, ) -> Result<()> { let (array, statistics) = match file { "nested" => ( @@ -56,7 +67,9 @@ fn round_trip( let (result, stats) = read_column(&mut Cursor::new(data), "a1")?; assert_eq!(array.as_ref(), result.as_ref()); - assert_eq!(statistics, stats); + if check_stats { + assert_eq!(statistics, stats); + } Ok(()) } @@ -363,12 +376,13 @@ fn list_large_binary_optional_v1() -> Result<()> { #[test] fn list_nested_inner_required_required_i64() -> Result<()> { - round_trip( + round_trip_opt_stats( "list_nested_inner_required_required_i64", "nested", Version::V1, CompressionOptions::Uncompressed, vec![Encoding::Plain], + false, ) }
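
A minimal standalone sketch of the idea behind `slice_nested_leaf` in this patch series. This is not arrow2 code: the function name `slice_leaf_by_offsets` and the example offsets are invented for illustration. Dremel-style repetition/definition levels are derived from list lengths (offsets[i + 1] - offsets[i]), so they never encode the starting offset of a sliced list array; the leaf values therefore have to be sliced to the window [offsets.first(), offsets.last()) before being plain-encoded, otherwise values outside the slice are written without matching levels. Patch 4 additionally threads the same start offset into the definition-level writer so the leaf validity iterator is advanced consistently.

// Standalone illustration of slicing the leaf values of a sliced list array.
fn slice_leaf_by_offsets(offsets: &[i64]) -> (usize, usize) {
    // start = first offset, length = last offset - first offset
    // (this mirrors what `slice_nested_leaf` computes for List/LargeList).
    let start = *offsets.first().unwrap() as usize;
    let end = *offsets.last().unwrap() as usize;
    (start, end - start)
}

fn main() {
    // Leaf values of a list array [[0, 1], [2], [3, 4], [5, 6, 7]];
    // its full offsets would be [0, 2, 3, 5, 8].
    let leaf_values = [0i64, 1, 2, 3, 4, 5, 6, 7];

    // After slicing the list array to its last two rows, the offsets window
    // becomes [3, 5, 8]: it no longer starts at 0, while the child (leaf)
    // buffer is still the full, unsliced one.
    let sliced_offsets = [3i64, 5, 8];

    let (start, len) = slice_leaf_by_offsets(&sliced_offsets);
    assert_eq!((start, len), (3, 5));

    // Only these leaf values belong to the sliced rows and should be encoded.
    assert_eq!(&leaf_values[start..start + len], &[3, 4, 5, 6, 7]);
    println!("write leaf[{}..{}]", start, start + len);
}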