Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
ensure the validity also has the right offset in case of sliced lists
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 12, 2022
1 parent be7662d commit 8216c75
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 24 deletions.
9 changes: 5 additions & 4 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ where
{
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;

let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);

Expand Down
8 changes: 4 additions & 4 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ pub fn array_to_page(
) -> Result<DataPage> {
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer)?;

Expand Down
10 changes: 7 additions & 3 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use parquet2::{
write::DynIter,
};

use crate::io::parquet::write::utils;
use crate::io::parquet::write::{slice_nested_leaf, utils};
use crate::{
array::{Array, DictionaryArray, DictionaryKey},
io::parquet::read::schema::is_nullable,
Expand Down Expand Up @@ -75,14 +75,15 @@ fn serialize_levels(
nested: &[Nested],
options: WriteOptions,
buffer: &mut Vec<u8>,
offset: usize,
) -> Result<(usize, usize)> {
if nested.len() == 1 {
let is_optional = is_nullable(&type_.field_info);
serialize_def_levels_simple(validity, length, is_optional, options, buffer)?;
let definition_levels_byte_length = buffer.len();
Ok((0, definition_levels_byte_length))
} else {
nested::write_rep_and_def(options.version, nested, buffer)
nested::write_rep_and_def(options.version, nested, buffer, offset)
}
}

Expand Down Expand Up @@ -112,6 +113,7 @@ fn serialize_keys<K: DictionaryKey>(
// parquet only accepts a single validity - we "&" the validities into a single one
// and ignore keys whole _value_ is null.
let validity = normalized_validity(array);
let (start, len) = slice_nested_leaf(nested);

let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels(
validity.as_ref(),
Expand All @@ -120,9 +122,11 @@ fn serialize_keys<K: DictionaryKey>(
nested,
options,
&mut buffer,
start,
)?;
let array = array.slice(start, len);

serialize_keys_values(array, validity.as_ref(), &mut buffer)?;
serialize_keys_values(&array, validity.as_ref(), &mut buffer)?;

let (num_values, num_rows) = if nested.len() == 1 {
(array.len(), array.len())
Expand Down
9 changes: 6 additions & 3 deletions src/io/parquet/write/nested/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ pub struct DefLevelsIter<'a> {
}

impl<'a> DefLevelsIter<'a> {
pub fn new(nested: &'a [Nested]) -> Self {
pub fn new(nested: &'a [Nested], offset: usize) -> Self {
let remaining_values = num_values(nested);

let primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap();
let mut primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap();
if offset > 0 {
primitive_validity.nth(offset - 1);
}

let iter = iter(&nested[..nested.len() - 1]);
let remaining = std::iter::repeat(0).take(iter.len()).collect();
Expand Down Expand Up @@ -164,7 +167,7 @@ mod tests {
use super::*;

fn test(nested: Vec<Nested>, expected: Vec<u32>) {
let mut iter = DefLevelsIter::new(&nested);
let mut iter = DefLevelsIter::new(&nested, 0);
assert_eq!(iter.size_hint().0, expected.len());
let result = iter.by_ref().collect::<Vec<_>>();
assert_eq!(result, expected);
Expand Down
14 changes: 11 additions & 3 deletions src/io/parquet/write/nested/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
}

/// writes the rep levels to a `Vec<u8>`.
fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
fn write_def_levels(
buffer: &mut Vec<u8>,
nested: &[Nested],
version: Version,
offset: usize,
) -> Result<()> {
let max_level = max_def_level(nested) as i16;
if max_level == 0 {
return Ok(());
}
let num_bits = get_bit_width(max_level);

let levels = def::DefLevelsIter::new(nested);
let levels = def::DefLevelsIter::new(nested, offset);

match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
Expand Down Expand Up @@ -105,11 +110,14 @@ pub fn write_rep_and_def(
page_version: Version,
nested: &[Nested],
buffer: &mut Vec<u8>,
// needed to take offset the validity iterator in case
// the list was sliced.
offset: usize,
) -> Result<(usize, usize)> {
write_rep_levels(buffer, nested, page_version)?;
let repetition_levels_byte_length = buffer.len();

write_def_levels(buffer, nested, page_version)?;
write_def_levels(buffer, nested, page_version, offset)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

Ok((repetition_levels_byte_length, definition_levels_byte_length))
Expand Down
6 changes: 4 additions & 2 deletions src/io/parquet/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ where
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;

let array = array.slice(start, len);
let buffer = encode_plain(&array, is_optional, buffer);

Expand Down
10 changes: 5 additions & 5 deletions src/io/parquet/write/utf8/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ where
O: Offset,
{
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;

let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);

Expand Down

0 comments on commit 8216c75

Please sign in to comment.