Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added data_pagesize_limit to write parquet pages #1303

Merged
merged 10 commits into from
Nov 25, 2022
30 changes: 28 additions & 2 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct WriteOptions {
pub version: Version,
/// The compression to apply to every page
pub compression: CompressionOptions,
/// The size to flush a page, defaults to 1024 * 1024 if zero
pub data_pagesize_limit: Option<usize>,
sundy-li marked this conversation as resolved.
Show resolved Hide resolved
}

use crate::compute::aggregate::estimated_bytes_size;
Expand Down Expand Up @@ -141,6 +143,7 @@ pub fn array_to_pages(
// we also check for an array.len > 3 to prevent infinite recursion
// still have to figure out how to deal with values that are i32::MAX size, such as very large
// strings or a list column with many elements

if (estimated_bytes_size(array)) >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
let split_at = array.len() / 2;
let left = array.slice(0, split_at);
Expand All @@ -163,8 +166,31 @@ pub fn array_to_pages(
)
})
}
_ => array_to_page(array, type_, nested, options, encoding)
.map(|page| DynIter::new(std::iter::once(Ok(page)))),
_ => {
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);

let bytes_per_row = estimated_bytes_size(array.slice(0, 1).as_ref()).max(1);
sundy-li marked this conversation as resolved.
Show resolved Hide resolved
let row_per_page = PAGE_SIZE / bytes_per_row + 1;

if array.len() <= row_per_page {
array_to_page(array, type_, nested, options, encoding)
.map(|page| DynIter::new(std::iter::once(Ok(page))))
} else {
let first = array.slice(0, row_per_page);
let other = array.slice(row_per_page, array.len() - row_per_page);
Ok(DynIter::new(
array_to_pages(first.as_ref(), type_.clone(), nested, options, encoding)?
sundy-li marked this conversation as resolved.
Show resolved Hide resolved
.chain(array_to_pages(
other.as_ref(),
type_,
nested,
options,
encoding,
)?),
))
}
}
}
}
}
Expand Down