Skip to content

Commit

Permalink
Fixed writing nested/sliced arrays to parquet (jorgecarleitao#1326)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 5, 2023
1 parent 5365f4a commit cbf28db
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 55 deletions.
14 changes: 10 additions & 4 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::Nested;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::{
array::{Array, BinaryArray},
error::Result,
Expand All @@ -22,14 +22,20 @@ where
{
let is_optional = is_nullable(&type_.field_info);

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;

encode_plain(array, is_optional, &mut buffer);
let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
Some(build_statistics(&array, type_.clone()))
} else {
None
};
Expand Down
15 changes: 10 additions & 5 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::Nested;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::{
array::{Array, BooleanArray},
error::Result,
Expand All @@ -18,14 +18,19 @@ pub fn array_to_page(
) -> Result<DataPage> {
let is_optional = is_nullable(&type_.field_info);

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer)?;
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
Some(build_statistics(&array))
} else {
None
};
Expand Down
10 changes: 7 additions & 3 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use parquet2::{
write::DynIter,
};

use crate::io::parquet::write::utils;
use crate::io::parquet::write::{slice_nested_leaf, utils};
use crate::{
array::{Array, DictionaryArray, DictionaryKey},
io::parquet::read::schema::is_nullable,
Expand Down Expand Up @@ -75,14 +75,15 @@ fn serialize_levels(
nested: &[Nested],
options: WriteOptions,
buffer: &mut Vec<u8>,
offset: usize,
) -> Result<(usize, usize)> {
if nested.len() == 1 {
let is_optional = is_nullable(&type_.field_info);
serialize_def_levels_simple(validity, length, is_optional, options, buffer)?;
let definition_levels_byte_length = buffer.len();
Ok((0, definition_levels_byte_length))
} else {
nested::write_rep_and_def(options.version, nested, buffer)
nested::write_rep_and_def(options.version, nested, buffer, offset)
}
}

Expand Down Expand Up @@ -112,6 +113,7 @@ fn serialize_keys<K: DictionaryKey>(
// parquet only accepts a single validity - we "&" the validities into a single one
// and ignore keys whose _value_ is null.
let validity = normalized_validity(array);
let (start, len) = slice_nested_leaf(nested);

let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels(
validity.as_ref(),
Expand All @@ -120,9 +122,11 @@ fn serialize_keys<K: DictionaryKey>(
nested,
options,
&mut buffer,
start,
)?;
let array = array.slice(start, len);

serialize_keys_values(array, validity.as_ref(), &mut buffer)?;
serialize_keys_values(&array, validity.as_ref(), &mut buffer)?;

let (num_values, num_rows) = if nested.len() == 1 {
(array.len(), array.len())
Expand Down
112 changes: 102 additions & 10 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,30 @@ pub use sink::FileSink;
pub use pages::array_to_columns;
pub use pages::Nested;

/// Returns the `(offset, length)` pair with which the leaf values array
/// should be sliced before being written.
pub(self) fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
    // Walk from the innermost level outwards: the deepest list level is the
    // one that dictates how many leaf values must actually be written.
    let mut result = (0, 0);
    for level in nested.iter().rev() {
        match level {
            Nested::LargeList(list) => {
                let first = *list.offsets.first().unwrap();
                let last = *list.offsets.last().unwrap();
                return (first as usize, (last - first) as usize);
            }
            Nested::List(list) => {
                let first = *list.offsets.first().unwrap();
                let last = *list.offsets.last().unwrap();
                return (first as usize, (last - first) as usize);
            }
            // no list level below this point: the primitive's own length wins
            // unless an enclosing list is found further out
            Nested::Primitive(_, _, len) => result = (0, *len),
            _ => {}
        }
    }
    result
}

pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
// digits = floor(log_10(2^(8*n - 1) - 1))
// ceil(digits) = log10(2^(8*n - 1) - 1)
Expand Down Expand Up @@ -130,6 +154,52 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
)
}

/// Slices an array (and its accompanying [`Nested`] structure) for writing.
///
/// For nested (list) arrays, only the list offsets are sliced and the leaf
/// array is returned whole: the leaf is sliced later via `slice_nested_leaf`,
/// because dremel levels are computed from the offsets. For flat arrays the
/// array itself is sliced directly.
fn slice_parquet_array<'a>(
    array: &'a dyn Array,
    nested: &'a [Nested<'a>],
    offset: usize,
    length: usize,
) -> (Box<dyn Array>, Vec<Nested<'a>>) {
    let mut nested = nested.to_vec();

    let mut is_nested = false;
    for nested in nested.iter_mut() {
        match nested {
            Nested::LargeList(l_nested) => {
                is_nested = true;
                // the slice is a bit awkward because we always want the latest
                // value to compute the next length; we take `length + 1`
                // offsets, clamped to the end of the offsets buffer.
                // NOTE: the end bound must be clamped as a whole
                // (`min(offset + length + 1, len)`), otherwise a non-zero
                // `offset` could push the end past the buffer and panic.
                l_nested.offsets = &l_nested.offsets
                    [offset..std::cmp::min(offset + length + 1, l_nested.offsets.len())];
            }
            Nested::List(l_nested) => {
                is_nested = true;
                l_nested.offsets = &l_nested.offsets
                    [offset..std::cmp::min(offset + length + 1, l_nested.offsets.len())];
            }
            _ => {}
        }
    }
    if is_nested {
        (array.to_boxed(), nested)
    } else {
        (array.slice(offset, length), nested)
    }
}

/// Returns the number of rows that should be sliced over: the row count of
/// the innermost list level if any, otherwise the array's own length.
fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize {
    // the innermost nested structure dictates how often the primitive
    // values are repeated, so search from the inside out
    nested
        .iter()
        .rev()
        .find_map(|level| match level {
            Nested::LargeList(l_nested) => Some(l_nested.offsets.len() - 1),
            Nested::List(l_nested) => Some(l_nested.offsets.len() - 1),
            _ => None,
        })
        .unwrap_or_else(|| array.len())
}

/// Returns an iterator of [`Page`].
#[allow(clippy::needless_collect)]
pub fn array_to_pages(
Expand All @@ -147,13 +217,27 @@ pub fn array_to_pages(

let array_byte_size = estimated_bytes_size(array);
if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
let split_at = array.len() / 2;
let left = array.slice(0, split_at);
let right = array.slice(split_at, array.len() - split_at);
let length = get_max_length(array, nested);
let split_at = length / 2;
let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at);
let (sub_array_right, subnested_right) =
slice_parquet_array(array, nested, split_at, length - split_at);

Ok(DynIter::new(
array_to_pages(&*left, type_.clone(), nested, options, encoding)?
.chain(array_to_pages(&*right, type_, nested, options, encoding)?),
array_to_pages(
sub_array_left.as_ref(),
type_.clone(),
subnested_left.as_ref(),
options,
encoding,
)?
.chain(array_to_pages(
sub_array_right.as_ref(),
type_,
subnested_right.as_ref(),
options,
encoding,
)?),
))
} else {
match array.data_type() {
Expand All @@ -175,17 +259,25 @@ pub fn array_to_pages(
((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);

let vs: Vec<Result<Page>> = (0..array.len())
let length = get_max_length(array, nested);
let vs: Vec<Result<Page>> = (0..length)
.step_by(rows_per_page)
.map(|offset| {
let length = if offset + rows_per_page > array.len() {
array.len() - offset
let length = if offset + rows_per_page > length {
length - offset
} else {
rows_per_page
};

let sub_array = array.slice(offset, length);
array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding)
let (sub_array, subnested) =
slice_parquet_array(array, nested, offset, length);
array_to_page(
sub_array.as_ref(),
type_.clone(),
&subnested,
options,
encoding,
)
})
.collect();

Expand Down
9 changes: 6 additions & 3 deletions src/io/parquet/write/nested/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ pub struct DefLevelsIter<'a> {
}

impl<'a> DefLevelsIter<'a> {
pub fn new(nested: &'a [Nested]) -> Self {
pub fn new(nested: &'a [Nested], offset: usize) -> Self {
let remaining_values = num_values(nested);

let primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap();
let mut primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap();
if offset > 0 {
primitive_validity.nth(offset - 1);
}

let iter = iter(&nested[..nested.len() - 1]);
let remaining = std::iter::repeat(0).take(iter.len()).collect();
Expand Down Expand Up @@ -164,7 +167,7 @@ mod tests {
use super::*;

fn test(nested: Vec<Nested>, expected: Vec<u32>) {
let mut iter = DefLevelsIter::new(&nested);
let mut iter = DefLevelsIter::new(&nested, 0);
assert_eq!(iter.size_hint().0, expected.len());
let result = iter.by_ref().collect::<Vec<_>>();
assert_eq!(result, expected);
Expand Down
14 changes: 11 additions & 3 deletions src/io/parquet/write/nested/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
}

/// writes the def levels to a `Vec<u8>`.
fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
fn write_def_levels(
buffer: &mut Vec<u8>,
nested: &[Nested],
version: Version,
offset: usize,
) -> Result<()> {
let max_level = max_def_level(nested) as i16;
if max_level == 0 {
return Ok(());
}
let num_bits = get_bit_width(max_level);

let levels = def::DefLevelsIter::new(nested);
let levels = def::DefLevelsIter::new(nested, offset);

match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
Expand Down Expand Up @@ -105,11 +110,14 @@ pub fn write_rep_and_def(
page_version: Version,
nested: &[Nested],
buffer: &mut Vec<u8>,
// needed to offset the validity iterator in case
// the list was sliced.
offset: usize,
) -> Result<(usize, usize)> {
write_rep_levels(buffer, nested, page_version)?;
let repetition_levels_byte_length = buffer.len();

write_def_levels(buffer, nested, page_version)?;
write_def_levels(buffer, nested, page_version, offset)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

Ok((repetition_levels_byte_length, definition_levels_byte_length))
Expand Down
Loading

0 comments on commit cbf28db

Please sign in to comment.