From 971d223688617d741a19033d1d43aa1b705fe664 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 10 Feb 2023 07:48:16 +0000 Subject: [PATCH] Fixed sliced --- parquet_integration/write_parquet.py | 2 +- src/io/parquet/write/binary/nested.rs | 12 ++++++++++-- src/io/parquet/write/boolean/nested.rs | 14 +++++++++++--- src/io/parquet/write/dictionary.rs | 17 +++++++++++------ src/io/parquet/write/nested/def.rs | 4 ++-- src/io/parquet/write/nested/mod.rs | 14 +++----------- src/io/parquet/write/nested/rep.rs | 6 +++--- src/io/parquet/write/primitive/nested.rs | 13 ++++++++++--- src/io/parquet/write/utf8/nested.rs | 13 ++++++++++--- 9 files changed, 61 insertions(+), 34 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 279567c8643..1bb262e5439 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -472,7 +472,7 @@ def case_benches_required(size): # for read benchmarks -for i in range(22, 22, 2): +for i in range(10, 22, 2): # two pages (dict) write_pyarrow(case_benches(2**i), 1, True, False, None) # single page diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 08c21d616f9..fd2fad7cf74 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -27,9 +27,17 @@ where // By slicing the leaf array we also don't write too many values. let (start, len) = slice_nested_leaf(nested); + let mut nested = nested.to_vec(); + let array = array.slice(start, len); + if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() { + *c = len; + } else { + unreachable!("") + } + let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; let array = array.slice(start, len); encode_plain(&array, is_optional, &mut buffer); @@ -42,7 +50,7 @@ where utils::build_plain_page( buffer, - nested::num_values(nested), + nested::num_values(&nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 2b8d430ac4d..0e6ce9f5718 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -23,10 +23,18 @@ pub fn array_to_page( // By slicing the leaf array we also don't write too many values. let (start, len) = slice_nested_leaf(nested); + let mut nested = nested.to_vec(); + let array = array.slice(start, len); + if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() { + *c = len; + } else { + unreachable!("") + } + let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; - let array = array.slice(start, len); + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + encode_plain(&array, is_optional, &mut buffer)?; let statistics = if options.write_statistics { @@ -37,7 +45,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, - nested::num_values(nested), + nested::num_values(&nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 38f4e2113cf..8cbe6c766e6 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -75,7 +75,6 @@ fn serialize_levels( nested: &[Nested], options: WriteOptions, buffer: &mut Vec, - offset: usize, ) -> Result<(usize, usize)> { if nested.len() == 1 { let is_optional = is_nullable(&type_.field_info); @@ -83,7 +82,7 @@ fn serialize_levels( let definition_levels_byte_length = buffer.len(); Ok((0, definition_levels_byte_length)) } else { - nested::write_rep_and_def(options.version, nested, buffer, offset) + nested::write_rep_and_def(options.version, nested, buffer) } } @@ -115,23 +114,29 @@ fn serialize_keys( let validity = normalized_validity(array); let (start, len) = slice_nested_leaf(nested); + let mut nested = nested.to_vec(); + let array = array.slice(start, len); + if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() { + *c = len; + } else { + unreachable!("") + } + let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels( validity.as_ref(), array.len(), &type_, - nested, + &nested, options, &mut buffer, - start, )?; - let array = array.slice(start, len); serialize_keys_values(&array, validity.as_ref(), &mut buffer)?; let (num_values, num_rows) = if nested.len() == 1 { (array.len(), array.len()) } else { - (nested::num_values(nested), nested[0].len()) + (nested::num_values(&nested), nested[0].len()) }; utils::build_plain_page( diff --git a/src/io/parquet/write/nested/def.rs b/src/io/parquet/write/nested/def.rs index 2cadc1a25b9..2b958cfcb9a 100644 --- a/src/io/parquet/write/nested/def.rs +++ b/src/io/parquet/write/nested/def.rs @@ -87,7 +87,7 @@ pub struct DefLevelsIter<'a> { } impl<'a> DefLevelsIter<'a> { - pub fn new(nested: &'a [Nested], _offset: usize) -> Self { + pub fn new(nested: &'a [Nested]) -> Self { let remaining_values = num_values(nested); let iter = iter(nested); @@ -173,7 +173,7 @@ mod tests { use super::*; fn test(nested: Vec, expected: Vec) { - let mut iter = DefLevelsIter::new(&nested, 0); + let mut iter = DefLevelsIter::new(&nested); assert_eq!(iter.size_hint().0, expected.len()); let result = iter.by_ref().collect::>(); assert_eq!(result, expected); diff --git a/src/io/parquet/write/nested/mod.rs b/src/io/parquet/write/nested/mod.rs index 3629f0a6220..49951ec7fb1 100644 --- a/src/io/parquet/write/nested/mod.rs +++ b/src/io/parquet/write/nested/mod.rs @@ -53,19 +53,14 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - } /// writes the rep levels to a `Vec`. -fn write_def_levels( - buffer: &mut Vec, - nested: &[Nested], - version: Version, - offset: usize, -) -> Result<()> { +fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { let max_level = max_def_level(nested) as i16; if max_level == 0 { return Ok(()); } let num_bits = get_bit_width(max_level); - let levels = def::DefLevelsIter::new(nested, offset); + let levels = def::DefLevelsIter::new(nested); match version { Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { @@ -111,14 +106,11 @@ pub fn write_rep_and_def( page_version: Version, nested: &[Nested], buffer: &mut Vec, - // needed to take offset the validity iterator in case - // the list was sliced. - offset: usize, ) -> Result<(usize, usize)> { write_rep_levels(buffer, nested, page_version)?; let repetition_levels_byte_length = buffer.len(); - write_def_levels(buffer, nested, page_version, offset)?; + write_def_levels(buffer, nested, page_version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; Ok((repetition_levels_byte_length, definition_levels_byte_length)) diff --git a/src/io/parquet/write/nested/rep.rs b/src/io/parquet/write/nested/rep.rs index 6dd8414792e..c8dc59e7fdc 100644 --- a/src/io/parquet/write/nested/rep.rs +++ b/src/io/parquet/write/nested/rep.rs @@ -94,7 +94,7 @@ impl<'a> Iterator for RepLevelsIter<'a> { .zip(self.remaining.iter_mut()) .skip(self.current_level) { - let length: usize = iter.next().unwrap_or_default(); + let length: usize = iter.next()?; *remaining = length; if length == 0 { break; @@ -309,9 +309,9 @@ mod tests { offsets: &[0, 1, 2, 3, 3, 4, 4, 4, 4, 5, 6, 8], validity: None, }), - Nested::Primitive(None, true, 9), + Nested::Primitive(None, true, 8), ]; - let expected = vec![0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 2, 0, 0]; + let expected = vec![0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 2, 0]; test(nested, expected) } diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 1a9a8cf7d75..6c7bf59f5a2 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -34,10 +34,17 @@ where // By slicing the leaf array we also don't write too many values. let (start, len) = slice_nested_leaf(nested); + let mut nested = nested.to_vec(); + let array = array.slice(start, len); + if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() { + *c = len; + } else { + unreachable!("") + } + let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; - let array = array.slice(start, len); let buffer = encode_plain(&array, is_optional, buffer); let statistics = if options.write_statistics { @@ -51,7 +58,7 @@ where utils::build_plain_page( buffer, - nested::num_values(nested), + nested::num_values(&nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 7e8d018c957..1054de6e311 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -26,11 +26,18 @@ where // By slicing the leaf array we also don't write too many values. let (start, len) = slice_nested_leaf(nested); + let mut nested = nested.to_vec(); + let array = array.slice(start, len); + if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() { + *c = len; + } else { + unreachable!("") + } + let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, nested, &mut buffer, start)?; + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; - let array = array.slice(start, len); encode_plain(&array, is_optional, &mut buffer); let statistics = if options.write_statistics { @@ -41,7 +48,7 @@ where utils::build_plain_page( buffer, - nested::num_values(nested), + nested::num_values(&nested), nested[0].len(), array.null_count(), repetition_levels_byte_length,