Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed sliced
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 10, 2023
1 parent c36bd86 commit 698f16f
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 33 deletions.
2 changes: 1 addition & 1 deletion parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def case_benches_required(size):


# for read benchmarks
for i in range(22, 22, 2):
for i in range(10, 22, 2):
# two pages (dict)
write_pyarrow(case_benches(2**i), 1, True, False, None)
# single page
Expand Down
12 changes: 10 additions & 2 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ where
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);
Expand All @@ -42,7 +50,7 @@ where

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested::num_values(&nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
14 changes: 11 additions & 3 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@ pub fn array_to_page(
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
let array = array.slice(start, len);
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

encode_plain(&array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Expand All @@ -37,7 +45,7 @@ pub fn array_to_page(

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested::num_values(&nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
17 changes: 11 additions & 6 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ fn serialize_levels(
nested: &[Nested],
options: WriteOptions,
buffer: &mut Vec<u8>,
offset: usize,
) -> Result<(usize, usize)> {
if nested.len() == 1 {
let is_optional = is_nullable(&type_.field_info);
serialize_def_levels_simple(validity, length, is_optional, options, buffer)?;
let definition_levels_byte_length = buffer.len();
Ok((0, definition_levels_byte_length))
} else {
nested::write_rep_and_def(options.version, nested, buffer, offset)
nested::write_rep_and_def(options.version, nested, buffer)
}
}

Expand Down Expand Up @@ -115,23 +114,29 @@ fn serialize_keys<K: DictionaryKey>(
let validity = normalized_validity(array);
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels(
validity.as_ref(),
array.len(),
&type_,
nested,
&nested,
options,
&mut buffer,
start,
)?;
let array = array.slice(start, len);

serialize_keys_values(&array, validity.as_ref(), &mut buffer)?;

let (num_values, num_rows) = if nested.len() == 1 {
(array.len(), array.len())
} else {
(nested::num_values(nested), nested[0].len())
(nested::num_values(&nested), nested[0].len())
};

utils::build_plain_page(
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/write/nested/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub struct DefLevelsIter<'a> {
}

impl<'a> DefLevelsIter<'a> {
pub fn new(nested: &'a [Nested], _offset: usize) -> Self {
pub fn new(nested: &'a [Nested]) -> Self {
let remaining_values = num_values(nested);

let iter = iter(nested);
Expand Down Expand Up @@ -173,7 +173,7 @@ mod tests {
use super::*;

fn test(nested: Vec<Nested>, expected: Vec<u32>) {
let mut iter = DefLevelsIter::new(&nested, 0);
let mut iter = DefLevelsIter::new(&nested);
assert_eq!(iter.size_hint().0, expected.len());
let result = iter.by_ref().collect::<Vec<_>>();
assert_eq!(result, expected);
Expand Down
14 changes: 3 additions & 11 deletions src/io/parquet/write/nested/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,14 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
}

/// writes the rep levels to a `Vec<u8>`.
fn write_def_levels(
buffer: &mut Vec<u8>,
nested: &[Nested],
version: Version,
offset: usize,
) -> Result<()> {
fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
let max_level = max_def_level(nested) as i16;
if max_level == 0 {
return Ok(());
}
let num_bits = get_bit_width(max_level);

let levels = def::DefLevelsIter::new(nested, offset);
let levels = def::DefLevelsIter::new(nested);

match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
Expand Down Expand Up @@ -111,14 +106,11 @@ pub fn write_rep_and_def(
page_version: Version,
nested: &[Nested],
buffer: &mut Vec<u8>,
// needed to take offset the validity iterator in case
// the list was sliced.
offset: usize,
) -> Result<(usize, usize)> {
write_rep_levels(buffer, nested, page_version)?;
let repetition_levels_byte_length = buffer.len();

write_def_levels(buffer, nested, page_version, offset)?;
write_def_levels(buffer, nested, page_version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

Ok((repetition_levels_byte_length, definition_levels_byte_length))
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/write/nested/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<'a> Iterator for RepLevelsIter<'a> {
.zip(self.remaining.iter_mut())
.skip(self.current_level)
{
let length: usize = iter.next().unwrap_or_default();
let length: usize = iter.next()?;
*remaining = length;
if length == 0 {
break;
Expand Down Expand Up @@ -311,7 +311,7 @@ mod tests {
}),
Nested::Primitive(None, true, 9),
];
let expected = vec![0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 2, 0, 0];
let expected = vec![0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 2, 0];

test(nested, expected)
}
Expand Down
13 changes: 10 additions & 3 deletions src/io/parquet/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ where
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

let array = array.slice(start, len);
let buffer = encode_plain(&array, is_optional, buffer);

let statistics = if options.write_statistics {
Expand All @@ -51,7 +58,7 @@ where

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested::num_values(&nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
13 changes: 10 additions & 3 deletions src/io/parquet/write/utf8/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ where
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Expand All @@ -41,7 +48,7 @@ where

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested::num_values(&nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down

0 comments on commit 698f16f

Please sign in to comment.