Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed writing of sliced arrays to parquet #1397

Merged
merged 2 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<T> Buffer<T> {
/// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
/// Doing so allows the same memory region to be shared between buffers.
/// # Panics
/// Panics iff `offset` is larger than `len`.
/// Panics iff `offset + length` is larger than `len`.
#[inline]
pub fn sliced(self, offset: usize, length: usize) -> Self {
assert!(
Expand Down
23 changes: 5 additions & 18 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::io::parquet::write::Nested;
use crate::{
array::{Array, BinaryArray},
error::Result,
Expand All @@ -22,34 +22,21 @@ where
{
let is_optional = is_nullable(&type_.field_info);

// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.clone().sliced(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(&array, is_optional, &mut buffer);
encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(&array, type_.clone()))
Some(build_statistics(array, type_.clone()))
} else {
None
};

utils::build_plain_page(
buffer,
nested::num_values(&nested),
nested::num_values(nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
201 changes: 94 additions & 107 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
for nested in nested.iter().rev() {
match nested {
Nested::LargeList(l_nested) => {
let start = *l_nested.offsets.first().unwrap();
let end = *l_nested.offsets.last().unwrap();
let start = *l_nested.offsets.first();
let end = *l_nested.offsets.last();
return (start as usize, (end - start) as usize);
}
Nested::List(l_nested) => {
let start = *l_nested.offsets.first().unwrap();
let end = *l_nested.offsets.last().unwrap();
let start = *l_nested.offsets.first();
let end = *l_nested.offsets.last();
return (start as usize, (end - start) as usize);
}
Nested::Primitive(_, _, len) => out = (0, *len),
Expand Down Expand Up @@ -158,137 +158,124 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
}

/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
pub fn slice_parquet_array<'a>(
array: &'a dyn Array,
nested: &'a [Nested<'a>],
offset: usize,
length: usize,
) -> (Box<dyn Array>, Vec<Nested<'a>>) {
let mut nested = nested.to_vec();

let mut is_nested = false;
pub fn slice_parquet_array(
primitive_array: &mut dyn Array,
nested: &mut [Nested],
mut current_offset: usize,
mut current_length: usize,
) {
for nested in nested.iter_mut() {
match nested {
Nested::LargeList(l_nested) => {
is_nested = true;
// the slice is a bit awkward because we always want the latest value to compute the next length;
l_nested.offsets = &l_nested.offsets
[offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())];
l_nested.offsets.slice(current_offset, current_length + 1);
if let Some(validity) = l_nested.validity.as_mut() {
validity.slice(current_offset, current_length)
};

current_length = l_nested.offsets.range() as usize;
current_offset = *l_nested.offsets.first() as usize;
}
Nested::List(l_nested) => {
is_nested = true;
l_nested.offsets = &l_nested.offsets
[offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())];
l_nested.offsets.slice(current_offset, current_length + 1);
if let Some(validity) = l_nested.validity.as_mut() {
validity.slice(current_offset, current_length)
};

current_length = l_nested.offsets.range() as usize;
current_offset = *l_nested.offsets.first() as usize;
}
Nested::Struct(validity, _, length) => {
*length = current_length;
if let Some(validity) = validity.as_mut() {
validity.slice(current_offset, current_length)
};
}
Nested::Primitive(validity, _, length) => {
*length = current_length;
if let Some(validity) = validity.as_mut() {
validity.slice(current_offset, current_length)
};
primitive_array.slice(current_offset, current_length);
}
_ => {}
}
}
if is_nested {
(array.to_boxed(), nested)
} else {
(array.to_boxed().sliced(offset, length), nested)
}
}

/// Get the length of [`Array`] that should be sliced.
pub fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize {
// the inner nested structure that
// dictates how often the primitive should be repeated
for nested in nested.iter().rev() {
pub fn get_max_length(nested: &[Nested]) -> usize {
let mut length = 0;
for nested in nested.iter() {
match nested {
Nested::LargeList(l_nested) => return l_nested.offsets.len() - 1,
Nested::List(l_nested) => return l_nested.offsets.len() - 1,
Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
_ => {}
}
}
array.len()
length
}

/// Returns an iterator of [`Page`].
#[allow(clippy::needless_collect)]
pub fn array_to_pages(
array: &dyn Array,
primitive_array: &dyn Array,
type_: ParquetPrimitiveType,
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<Page>>> {
// maximum page size is 2^31 e.g. i32::MAX
// we split at 2^31 - 2^25 to err on the safe side
// we also check for an array.len > 3 to prevent infinite recursion
// still have to figure out how to deal with values that are i32::MAX size, such as very large
// strings or a list column with many elements

let array_byte_size = estimated_bytes_size(array);
if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
let length = get_max_length(array, nested);
let split_at = length / 2;
let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at);
let (sub_array_right, subnested_right) =
slice_parquet_array(array, nested, split_at, length - split_at);

Ok(DynIter::new(
array_to_pages(
sub_array_left.as_ref(),
type_.clone(),
subnested_left.as_ref(),
options,
encoding,
)?
.chain(array_to_pages(
sub_array_right.as_ref(),
if let DataType::Dictionary(key_type, _, _) = primitive_array.data_type().to_logical_type() {
return match_integer_type!(key_type, |$T| {
dictionary::array_to_pages::<$T>(
primitive_array.as_any().downcast_ref().unwrap(),
type_,
subnested_right.as_ref(),
&nested,
options,
encoding,
)?),
))
)
});
};

let nested = nested.to_vec();
let primitive_array = primitive_array.to_boxed();

let number_of_rows = nested[0].len();

// note: this is not correct if the array is sliced - the estimation should happen on the
// primitive after it has been sliced for parquet
let byte_size = estimated_bytes_size(primitive_array.as_ref());

const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
let bytes_per_row = if number_of_rows == 0 {
0
} else {
match array.data_type() {
DataType::Dictionary(key_type, _, _) => {
match_integer_type!(key_type, |$T| {
dictionary::array_to_pages::<$T>(
array.as_any().downcast_ref().unwrap(),
type_,
nested,
options,
encoding,
)
})
}
_ => {
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
let bytes_per_row =
((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);

let length = get_max_length(array, nested);
let vs: Vec<Result<Page>> = (0..length)
.step_by(rows_per_page)
.map(|offset| {
let length = if offset + rows_per_page > length {
length - offset
} else {
rows_per_page
};

let (sub_array, subnested) =
slice_parquet_array(array, nested, offset, length);
array_to_page(
sub_array.as_ref(),
type_.clone(),
&subnested,
options,
encoding,
)
})
.collect();

Ok(DynIter::new(vs.into_iter()))
}
}
}
((byte_size as f64) / (number_of_rows as f64)) as usize
};
let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);

let pages = (0..number_of_rows)
.step_by(rows_per_page)
.map(move |offset| {
let length = if offset + rows_per_page > number_of_rows {
number_of_rows - offset
} else {
rows_per_page
};

let mut right_array = primitive_array.clone();
let mut right_nested = nested.clone();
slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);

array_to_page(
right_array.as_ref(),
type_.clone(),
&right_nested,
options,
encoding,
)
});

Ok(DynIter::new(pages))
}

/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
Expand Down
Loading