diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 950ea4190ca..08c21d616f9 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, BinaryArray}, error::Result, @@ -22,14 +22,20 @@ where { let is_optional = is_nullable(&type_.field_info); + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. + let (start, len) = slice_nested_leaf(nested); + let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; - encode_plain(array, is_optional, &mut buffer); + let array = array.slice(start, len); + encode_plain(&array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + Some(build_statistics(&array, type_.clone())) } else { None }; diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 9d9e49100f6..2b8d430ac4d 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; 
use crate::{ array::{Array, BooleanArray}, error::Result, @@ -18,14 +18,19 @@ pub fn array_to_page( ) -> Result { let is_optional = is_nullable(&type_.field_info); + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. + let (start, len) = slice_nested_leaf(nested); + let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; - - encode_plain(array, is_optional, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + let array = array.slice(start, len); + encode_plain(&array, is_optional, &mut buffer)?; let statistics = if options.write_statistics { - Some(build_statistics(array)) + Some(build_statistics(&array)) } else { None }; diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 9f20d4692de..01a5f8412fa 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -6,7 +6,7 @@ use parquet2::{ write::DynIter, }; -use crate::io::parquet::write::utils; +use crate::io::parquet::write::{slice_nested_leaf, utils}; use crate::{ array::{Array, DictionaryArray, DictionaryKey}, io::parquet::read::schema::is_nullable, @@ -75,6 +75,7 @@ fn serialize_levels( nested: &[Nested], options: WriteOptions, buffer: &mut Vec, + offset: usize, ) -> Result<(usize, usize)> { if nested.len() == 1 { let is_optional = is_nullable(&type_.field_info); @@ -82,7 +83,7 @@ fn serialize_levels( let definition_levels_byte_length = buffer.len(); Ok((0, definition_levels_byte_length)) } else { - nested::write_rep_and_def(options.version, nested, buffer) + nested::write_rep_and_def(options.version, nested, buffer, offset) } } @@ -112,6 +113,7 @@ fn serialize_keys( // parquet only accepts a single validity - we "&" the validities into a single one // and 
ignore keys whole _value_ is null. let validity = normalized_validity(array); + let (start, len) = slice_nested_leaf(nested); let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels( validity.as_ref(), @@ -120,9 +122,11 @@ fn serialize_keys( nested, options, &mut buffer, + start, )?; + let array = array.slice(start, len); - serialize_keys_values(array, validity.as_ref(), &mut buffer)?; + serialize_keys_values(&array, validity.as_ref(), &mut buffer)?; let (num_values, num_rows) = if nested.len() == 1 { (array.len(), array.len()) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 84b4f1cab58..54815ccbd3e 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -69,6 +69,30 @@ pub use sink::FileSink; pub use pages::array_to_columns; pub use pages::Nested; +/// returns offset and length to slice the leaf values +pub(self) fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) { + // find the deepest recursive dremel structure as that one determines how many values we must + // take + let mut out = (0, 0); + for nested in nested.iter().rev() { + match nested { + Nested::LargeList(l_nested) => { + let start = *l_nested.offsets.first().unwrap(); + let end = *l_nested.offsets.last().unwrap(); + return (start as usize, (end - start) as usize); + } + Nested::List(l_nested) => { + let start = *l_nested.offsets.first().unwrap(); + let end = *l_nested.offsets.last().unwrap(); + return (start as usize, (end - start) as usize); + } + Nested::Primitive(_, _, len) => out = (0, *len), + _ => {} + } + } + out +} + pub(self) fn decimal_length_from_precision(precision: usize) -> usize { // digits = floor(log_10(2^(8*n - 1) - 1)) // ceil(digits) = log10(2^(8*n - 1) - 1) @@ -130,6 +154,52 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { ) } +fn slice_parquet_array<'a>( + array: &'a dyn Array, + nested: &'a [Nested<'a>], + offset: usize, + length: usize, +) -> (Box, Vec>) { + let mut 
nested = nested.to_vec(); + + let mut is_nested = false; + for nested in nested.iter_mut() { + match nested { + Nested::LargeList(l_nested) => { + is_nested = true; + // the slice is a bit awkward because we always want the latest value to compute the next length; + l_nested.offsets = &l_nested.offsets + [offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())]; + } + Nested::List(l_nested) => { + is_nested = true; + l_nested.offsets = &l_nested.offsets + [offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())]; + } + _ => {} + } + } + if is_nested { + (array.to_boxed(), nested) + } else { + (array.slice(offset, length), nested) + } +} + +fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize { + // get the length that should be sliced. + // that is the inner nested structure that + // dictates how often the primitive should be repeated + for nested in nested.iter().rev() { + match nested { + Nested::LargeList(l_nested) => return l_nested.offsets.len() - 1, + Nested::List(l_nested) => return l_nested.offsets.len() - 1, + _ => {} + } + } + array.len() +} + /// Returns an iterator of [`Page`]. #[allow(clippy::needless_collect)] pub fn array_to_pages( @@ -147,13 +217,27 @@ pub fn array_to_pages( let array_byte_size = estimated_bytes_size(array); if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { - let split_at = array.len() / 2; - let left = array.slice(0, split_at); - let right = array.slice(split_at, array.len() - split_at); + let length = get_max_length(array, nested); + let split_at = length / 2; + let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at); + let (sub_array_right, subnested_right) = + slice_parquet_array(array, nested, split_at, length - split_at); Ok(DynIter::new( - array_to_pages(&*left, type_.clone(), nested, options, encoding)? 
- .chain(array_to_pages(&*right, type_, nested, options, encoding)?), + array_to_pages( + sub_array_left.as_ref(), + type_.clone(), + subnested_left.as_ref(), + options, + encoding, + )? + .chain(array_to_pages( + sub_array_right.as_ref(), + type_, + subnested_right.as_ref(), + options, + encoding, + )?), )) } else { match array.data_type() { @@ -175,17 +259,25 @@ pub fn array_to_pages( ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize; let rows_per_page = (page_size / (bytes_per_row + 1)).max(1); - let vs: Vec> = (0..array.len()) + let length = get_max_length(array, nested); + let vs: Vec> = (0..length) .step_by(rows_per_page) .map(|offset| { - let length = if offset + rows_per_page > array.len() { - array.len() - offset + let length = if offset + rows_per_page > length { + length - offset } else { rows_per_page }; - let sub_array = array.slice(offset, length); - array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding) + let (sub_array, subnested) = + slice_parquet_array(array, nested, offset, length); + array_to_page( + sub_array.as_ref(), + type_.clone(), + &subnested, + options, + encoding, + ) }) .collect(); diff --git a/src/io/parquet/write/nested/def.rs b/src/io/parquet/write/nested/def.rs index eb945a4c683..4e82ad88af3 100644 --- a/src/io/parquet/write/nested/def.rs +++ b/src/io/parquet/write/nested/def.rs @@ -86,10 +86,13 @@ pub struct DefLevelsIter<'a> { } impl<'a> DefLevelsIter<'a> { - pub fn new(nested: &'a [Nested]) -> Self { + pub fn new(nested: &'a [Nested], offset: usize) -> Self { let remaining_values = num_values(nested); - let primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap(); + let mut primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap(); + if offset > 0 { + primitive_validity.nth(offset - 1); + } let iter = iter(&nested[..nested.len() - 1]); let remaining = std::iter::repeat(0).take(iter.len()).collect(); @@ -164,7 +167,7 @@ mod tests { use super::*; fn test(nested: Vec, 
expected: Vec) { - let mut iter = DefLevelsIter::new(&nested); + let mut iter = DefLevelsIter::new(&nested, 0); assert_eq!(iter.size_hint().0, expected.len()); let result = iter.by_ref().collect::>(); assert_eq!(result, expected); diff --git a/src/io/parquet/write/nested/mod.rs b/src/io/parquet/write/nested/mod.rs index 5f6cd4d3524..bd6747a1530 100644 --- a/src/io/parquet/write/nested/mod.rs +++ b/src/io/parquet/write/nested/mod.rs @@ -53,14 +53,19 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - } /// writes the rep levels to a `Vec`. -fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { +fn write_def_levels( + buffer: &mut Vec, + nested: &[Nested], + version: Version, + offset: usize, +) -> Result<()> { let max_level = max_def_level(nested) as i16; if max_level == 0 { return Ok(()); } let num_bits = get_bit_width(max_level); - let levels = def::DefLevelsIter::new(nested); + let levels = def::DefLevelsIter::new(nested, offset); match version { Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { @@ -105,11 +110,14 @@ pub fn write_rep_and_def( page_version: Version, nested: &[Nested], buffer: &mut Vec, + // needed to offset the validity iterator in case + // the list was sliced. 
+ offset: usize, ) -> Result<(usize, usize)> { write_rep_levels(buffer, nested, page_version)?; let repetition_levels_byte_length = buffer.len(); - write_def_levels(buffer, nested, page_version)?; + write_def_levels(buffer, nested, page_version, offset)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; Ok((repetition_levels_byte_length, definition_levels_byte_length)) diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index e3e1eec410f..b83335aa570 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -1,5 +1,6 @@ use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType}; use parquet2::{page::Page, write::DynIter}; +use std::fmt::Debug; use crate::array::{ListArray, StructArray}; use crate::bitmap::Bitmap; @@ -34,6 +35,7 @@ impl<'a, O: Offset> ListNested<'a, O> { #[derive(Debug, Clone, PartialEq)] pub enum Nested<'a> { /// a primitive (leaf or parquet column) + /// bitmap, _, length Primitive(Option<&'a Bitmap>, bool, usize), /// a list List(ListNested<'a, i32>), @@ -147,13 +149,13 @@ fn to_nested_recursive<'a>( Ok(()) } -fn to_leafs(array: &dyn Array) -> Vec<&dyn Array> { - let mut leafs = vec![]; - to_leafs_recursive(array, &mut leafs); - leafs +fn to_leaves(array: &dyn Array) -> Vec<&dyn Array> { + let mut leaves = vec![]; + to_leaves_recursive(array, &mut leaves); + leaves } -fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) { +fn to_leaves_recursive<'a>(array: &'a dyn Array, leaves: &mut Vec<&'a dyn Array>) { use PhysicalType::*; match array.data_type().to_physical_type() { Struct => { @@ -161,35 +163,35 @@ fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) array .values() .iter() - .for_each(|a| to_leafs_recursive(a.as_ref(), leafs)); + .for_each(|a| to_leaves_recursive(a.as_ref(), leaves)); } List => { let array = array.as_any().downcast_ref::>().unwrap(); - 
to_leafs_recursive(array.values().as_ref(), leafs); + to_leaves_recursive(array.values().as_ref(), leaves); } LargeList => { let array = array.as_any().downcast_ref::>().unwrap(); - to_leafs_recursive(array.values().as_ref(), leafs); + to_leaves_recursive(array.values().as_ref(), leaves); } Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | LargeUtf8 | Dictionary(_) => leafs.push(array), + | LargeUtf8 | Dictionary(_) => leaves.push(array), other => todo!("Writing {:?} to parquet not yet implemented", other), } } -fn to_parquet_leafs(type_: ParquetType) -> Vec { - let mut leafs = vec![]; - to_parquet_leafs_recursive(type_, &mut leafs); - leafs +fn to_parquet_leaves(type_: ParquetType) -> Vec { + let mut leaves = vec![]; + to_parquet_leaves_recursive(type_, &mut leaves); + leaves } -fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec) { +fn to_parquet_leaves_recursive(type_: ParquetType, leaves: &mut Vec) { match type_ { - ParquetType::PrimitiveType(primitive) => leafs.push(primitive), + ParquetType::PrimitiveType(primitive) => leaves.push(primitive), ParquetType::GroupType { fields, .. 
} => { fields .into_iter() - .for_each(|type_| to_parquet_leafs_recursive(type_, leafs)); + .for_each(|type_| to_parquet_leaves_recursive(type_, leaves)); } } } @@ -204,9 +206,9 @@ pub fn array_to_columns + Send + Sync>( let array = array.as_ref(); let nested = to_nested(array, &type_)?; - let types = to_parquet_leafs(type_); + let types = to_parquet_leaves(type_); - let values = to_leafs(array); + let values = to_leaves(array); assert_eq!(encoding.len(), types.len()); diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index e2909b02ceb..1a9a8cf7d75 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -7,7 +7,7 @@ use super::super::utils; use super::super::WriteOptions; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, PrimitiveArray}, error::Result, @@ -28,14 +28,21 @@ where let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; + + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. 
+ let (start, len) = slice_nested_leaf(nested); + let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; - let buffer = encode_plain(array, is_optional, buffer); + let array = array.slice(start, len); + let buffer = encode_plain(&array, is_optional, buffer); let statistics = if options.write_statistics { Some(serialize_statistics(&build_statistics( - array, + &array, type_.clone(), ))) } else { diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 2792ef35712..7e8d018c957 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::Nested; +use crate::io::parquet::write::{slice_nested_leaf, Nested}; use crate::{ array::{Array, Utf8Array}, error::Result, @@ -21,15 +21,20 @@ where O: Offset, { let is_optional = is_nullable(&type_.field_info); + // we slice the leaf by the offsets as dremel only computes lengths and thus + // does NOT take the starting offset into account. + // By slicing the leaf array we also don't write too many values. 
+ let (start, len) = slice_nested_leaf(nested); let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; - encode_plain(array, is_optional, &mut buffer); + let array = array.slice(start, len); + encode_plain(&array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + Some(build_statistics(&array, type_.clone())) } else { None }; diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 365ad9e3fd6..ef5a18888d2 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -11,6 +11,17 @@ fn round_trip( version: Version, compression: CompressionOptions, encodings: Vec, +) -> Result<()> { + round_trip_opt_stats(column, file, version, compression, encodings, true) +} + +fn round_trip_opt_stats( + column: &str, + file: &str, + version: Version, + compression: CompressionOptions, + encodings: Vec, + check_stats: bool, ) -> Result<()> { let (array, statistics) = match file { "nested" => ( @@ -56,7 +67,9 @@ fn round_trip( let (result, stats) = read_column(&mut Cursor::new(data), "a1")?; assert_eq!(array.as_ref(), result.as_ref()); - assert_eq!(statistics, stats); + if check_stats { + assert_eq!(statistics, stats); + } Ok(()) } @@ -361,6 +374,18 @@ fn list_large_binary_optional_v1() -> Result<()> { ) } +#[test] +fn list_nested_inner_required_required_i64() -> Result<()> { + round_trip_opt_stats( + "list_nested_inner_required_required_i64", + "nested", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + false, + ) +} + #[test] fn utf8_optional_v2_delta() -> Result<()> { round_trip(