Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed writing of sliced arrays to parquet (#1397)
Browse files Browse the repository at this point in the history
* Fixed slice nested

* More tests
  • Loading branch information
jorgecarleitao authored Feb 13, 2023
1 parent d7d7239 commit b94e6f6
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 323 deletions.
2 changes: 1 addition & 1 deletion src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<T> Buffer<T> {
/// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
/// Doing so allows the same memory region to be shared between buffers.
/// # Panics
/// Panics iff `offset` is larger than `len`.
/// Panics iff `offset + length` is larger than `len`.
#[inline]
pub fn sliced(self, offset: usize, length: usize) -> Self {
assert!(
Expand Down
23 changes: 5 additions & 18 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::io::parquet::write::Nested;
use crate::{
array::{Array, BinaryArray},
error::Result,
Expand All @@ -22,34 +22,21 @@ where
{
let is_optional = is_nullable(&type_.field_info);

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.clone().sliced(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(&array, is_optional, &mut buffer);
encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(&array, type_.clone()))
Some(build_statistics(array, type_.clone()))
} else {
None
};

utils::build_plain_page(
buffer,
nested::num_values(&nested),
nested::num_values(nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
201 changes: 94 additions & 107 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
for nested in nested.iter().rev() {
match nested {
Nested::LargeList(l_nested) => {
let start = *l_nested.offsets.first().unwrap();
let end = *l_nested.offsets.last().unwrap();
let start = *l_nested.offsets.first();
let end = *l_nested.offsets.last();
return (start as usize, (end - start) as usize);
}
Nested::List(l_nested) => {
let start = *l_nested.offsets.first().unwrap();
let end = *l_nested.offsets.last().unwrap();
let start = *l_nested.offsets.first();
let end = *l_nested.offsets.last();
return (start as usize, (end - start) as usize);
}
Nested::Primitive(_, _, len) => out = (0, *len),
Expand Down Expand Up @@ -158,137 +158,124 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
}

/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
pub fn slice_parquet_array<'a>(
array: &'a dyn Array,
nested: &'a [Nested<'a>],
offset: usize,
length: usize,
) -> (Box<dyn Array>, Vec<Nested<'a>>) {
let mut nested = nested.to_vec();

let mut is_nested = false;
pub fn slice_parquet_array(
primitive_array: &mut dyn Array,
nested: &mut [Nested],
mut current_offset: usize,
mut current_length: usize,
) {
for nested in nested.iter_mut() {
match nested {
Nested::LargeList(l_nested) => {
is_nested = true;
// the slice is a bit awkward because we always want the latest value to compute the next length;
l_nested.offsets = &l_nested.offsets
[offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())];
l_nested.offsets.slice(current_offset, current_length + 1);
if let Some(validity) = l_nested.validity.as_mut() {
validity.slice(current_offset, current_length)
};

current_length = l_nested.offsets.range() as usize;
current_offset = *l_nested.offsets.first() as usize;
}
Nested::List(l_nested) => {
is_nested = true;
l_nested.offsets = &l_nested.offsets
[offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())];
l_nested.offsets.slice(current_offset, current_length + 1);
if let Some(validity) = l_nested.validity.as_mut() {
validity.slice(current_offset, current_length)
};

current_length = l_nested.offsets.range() as usize;
current_offset = *l_nested.offsets.first() as usize;
}
Nested::Struct(validity, _, length) => {
*length = current_length;
if let Some(validity) = validity.as_mut() {
validity.slice(current_offset, current_length)
};
}
Nested::Primitive(validity, _, length) => {
*length = current_length;
if let Some(validity) = validity.as_mut() {
validity.slice(current_offset, current_length)
};
primitive_array.slice(current_offset, current_length);
}
_ => {}
}
}
if is_nested {
(array.to_boxed(), nested)
} else {
(array.to_boxed().sliced(offset, length), nested)
}
}

/// Get the length of [`Array`] that should be sliced.
pub fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize {
// the inner nested structure that
// dictates how often the primitive should be repeated
for nested in nested.iter().rev() {
pub fn get_max_length(nested: &[Nested]) -> usize {
let mut length = 0;
for nested in nested.iter() {
match nested {
Nested::LargeList(l_nested) => return l_nested.offsets.len() - 1,
Nested::List(l_nested) => return l_nested.offsets.len() - 1,
Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
_ => {}
}
}
array.len()
length
}

/// Returns an iterator of [`Page`].
#[allow(clippy::needless_collect)]
pub fn array_to_pages(
array: &dyn Array,
primitive_array: &dyn Array,
type_: ParquetPrimitiveType,
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<Page>>> {
// maximum page size is i32::MAX, i.e. 2^31 - 1
// we split at 2^31 - 2^25 to err on the safe side
// we also check for an array.len > 3 to prevent infinite recursion
// still have to figure out how to deal with values that are i32::MAX size, such as very large
// strings or a list column with many elements

let array_byte_size = estimated_bytes_size(array);
if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
let length = get_max_length(array, nested);
let split_at = length / 2;
let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at);
let (sub_array_right, subnested_right) =
slice_parquet_array(array, nested, split_at, length - split_at);

Ok(DynIter::new(
array_to_pages(
sub_array_left.as_ref(),
type_.clone(),
subnested_left.as_ref(),
options,
encoding,
)?
.chain(array_to_pages(
sub_array_right.as_ref(),
if let DataType::Dictionary(key_type, _, _) = primitive_array.data_type().to_logical_type() {
return match_integer_type!(key_type, |$T| {
dictionary::array_to_pages::<$T>(
primitive_array.as_any().downcast_ref().unwrap(),
type_,
subnested_right.as_ref(),
&nested,
options,
encoding,
)?),
))
)
});
};

let nested = nested.to_vec();
let primitive_array = primitive_array.to_boxed();

let number_of_rows = nested[0].len();

// note: this is not correct if the array is sliced - the estimation should happen on the
// primitive after it has been sliced for parquet
let byte_size = estimated_bytes_size(primitive_array.as_ref());

const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
let bytes_per_row = if number_of_rows == 0 {
0
} else {
match array.data_type() {
DataType::Dictionary(key_type, _, _) => {
match_integer_type!(key_type, |$T| {
dictionary::array_to_pages::<$T>(
array.as_any().downcast_ref().unwrap(),
type_,
nested,
options,
encoding,
)
})
}
_ => {
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
let bytes_per_row =
((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);

let length = get_max_length(array, nested);
let vs: Vec<Result<Page>> = (0..length)
.step_by(rows_per_page)
.map(|offset| {
let length = if offset + rows_per_page > length {
length - offset
} else {
rows_per_page
};

let (sub_array, subnested) =
slice_parquet_array(array, nested, offset, length);
array_to_page(
sub_array.as_ref(),
type_.clone(),
&subnested,
options,
encoding,
)
})
.collect();

Ok(DynIter::new(vs.into_iter()))
}
}
}
((byte_size as f64) / (number_of_rows as f64)) as usize
};
let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);

let pages = (0..number_of_rows)
.step_by(rows_per_page)
.map(move |offset| {
let length = if offset + rows_per_page > number_of_rows {
number_of_rows - offset
} else {
rows_per_page
};

let mut right_array = primitive_array.clone();
let mut right_nested = nested.clone();
slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);

array_to_page(
right_array.as_ref(),
type_.clone(),
&right_nested,
options,
encoding,
)
});

Ok(DynIter::new(pages))
}

/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
Expand Down
Loading

0 comments on commit b94e6f6

Please sign in to comment.