diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs
index 831dd6b5f00..f9c95dc26b7 100644
--- a/arrow-parquet-integration-testing/src/main.rs
+++ b/arrow-parquet-integration-testing/src/main.rs
@@ -172,6 +172,7 @@ fn main() -> Result<()> {
         write_statistics: true,
         compression: args.compression.into(),
         version: args.version.into(),
+        data_pagesize_limit: None,
     };

     let encodings = schema
diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs
index f4053c92791..7062ab919d9 100644
--- a/benches/write_parquet.rs
+++ b/benches/write_parquet.rs
@@ -17,6 +17,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
         write_statistics: false,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let row_groups = RowGroupIterator::try_new(
diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs
index fac426ab7de..1387f615ebc 100644
--- a/examples/parquet_write.rs
+++ b/examples/parquet_write.rs
@@ -16,6 +16,7 @@ fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Resu
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let iter = vec![Ok(chunk)];
diff --git a/examples/parquet_write_async.rs b/examples/parquet_write_async.rs
index b6205c8fb0f..db2ae9e08ca 100644
--- a/examples/parquet_write_async.rs
+++ b/examples/parquet_write_async.rs
@@ -17,6 +17,7 @@ async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>)
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());
diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs
index 28cf299310c..6c87be6143a 100644
--- a/examples/parquet_write_parallel/src/main.rs
+++ b/examples/parquet_write_parallel/src/main.rs
@@ -47,6 +47,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
         write_statistics: true,
         compression: CompressionOptions::Snappy,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let encoding_map = |data_type: &DataType| {
diff --git a/src/doc/lib.md b/src/doc/lib.md
index 6b278e08d63..9708a6cd3d1 100644
--- a/src/doc/lib.md
+++ b/src/doc/lib.md
@@ -42,6 +42,7 @@ fn main() -> Result<()> {
         write_statistics: true,
         compression: CompressionOptions::Snappy,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let row_groups = RowGroupIterator::try_new(
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 0b692e9835f..f48c1a67ea6 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -56,6 +56,8 @@ pub struct WriteOptions {
     pub version: Version,
     /// The compression to apply to every page
     pub compression: CompressionOptions,
+    /// The size in bytes at which to flush a page; defaults to 1024 * 1024 if None
+    pub data_pagesize_limit: Option<usize>,
 }

 use crate::compute::aggregate::estimated_bytes_size;
@@ -129,6 +131,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
 }

 /// Returns an iterator of [`EncodedPage`].
+#[allow(clippy::needless_collect)]
 pub fn array_to_pages(
     array: &dyn Array,
     type_: ParquetPrimitiveType,
@@ -141,7 +144,9 @@ pub fn array_to_pages(
     // we also check for an array.len > 3 to prevent infinite recursion
     // still have to figure out how to deal with values that are i32::MAX size, such as very large
     // strings or a list column with many elements
-    if (estimated_bytes_size(array)) >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
+
+    let array_byte_size = estimated_bytes_size(array);
+    if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
         let split_at = array.len() / 2;
         let left = array.slice(0, split_at);
         let right = array.slice(split_at, array.len() - split_at);
@@ -163,8 +168,29 @@ pub fn array_to_pages(
                     )
                 })
             }
-            _ => array_to_page(array, type_, nested, options, encoding)
-                .map(|page| DynIter::new(std::iter::once(Ok(page)))),
+            _ => {
+                const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
+                let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
+                let bytes_per_row =
+                    ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
+                let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);
+
+                let vs: Vec<Result<EncodedPage>> = (0..array.len())
+                    .step_by(rows_per_page)
+                    .map(|offset| {
+                        let length = if offset + rows_per_page > array.len() {
+                            array.len() - offset
+                        } else {
+                            rows_per_page
+                        };
+
+                        let sub_array = array.slice(offset, length);
+                        array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding)
+                    })
+                    .collect();
+
+                Ok(DynIter::new(vs.into_iter()))
+            }
         }
     }
 }
diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs
index 5086b8e09e2..1eeb83e21b2 100644
--- a/src/io/parquet/write/sink.rs
+++ b/src/io/parquet/write/sink.rs
@@ -35,6 +35,7 @@ use super::{Encoding, SchemaDescriptor, WriteOptions};
 ///     write_statistics: true,
 ///     compression: CompressionOptions::Uncompressed,
 ///     version: Version::V2,
+///     data_pagesize_limit: None,
 /// };
 ///
 /// let mut buffer = vec![];
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 0681fac1bf7..9097d32fb30 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -1128,6 +1128,7 @@ fn integration_write(schema: &Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Resul
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let encodings = schema
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index c30d28ea965..b17803b16fb 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -30,6 +30,7 @@ fn pages(
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let pages1 = [array11, array12, array13]
@@ -79,6 +80,7 @@ fn read_with_indexes(
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let to_compressed = |pages: Vec<EncodedPage>| {
diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs
index dfeed7ccd00..365ad9e3fd6 100644
--- a/tests/it/io/parquet/write.rs
+++ b/tests/it/io/parquet/write.rs
@@ -36,6 +36,7 @@ fn round_trip(
         write_statistics: true,
         compression,
         version,
+        data_pagesize_limit: None,
     };

     let iter = vec![Chunk::try_new(vec![array.clone()])];
diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs
index 3e1ba14fe9c..1197e31c0da 100644
--- a/tests/it/io/parquet/write_async.rs
+++ b/tests/it/io/parquet/write_async.rs
@@ -31,6 +31,7 @@ async fn test_parquet_async_roundtrip() {
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let mut buffer = Cursor::new(Vec::new());
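
Note on usage: after this change, every `WriteOptions` literal must set the new field. Below is a minimal sketch of opting into a smaller page size, assuming the write API already used in `src/doc/lib.md` above; the 64 KiB limit, column name, and output path are illustrative only, not part of this diff.

    use arrow2::array::{Array, Int32Array};
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{DataType, Field, Schema};
    use arrow2::error::Result;
    use arrow2::io::parquet::write::{
        CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
    };

    fn main() -> Result<()> {
        let schema = Schema::from(vec![Field::new("c1", DataType::Int32, false)]);
        let array: Box<dyn Array> = Box::new(Int32Array::from_slice([1, 2, 3]));
        let chunk = Chunk::new(vec![array]);

        let options = WriteOptions {
            write_statistics: true,
            compression: CompressionOptions::Uncompressed,
            version: Version::V2,
            // flush data pages at ~64 KiB instead of the 1 MiB default
            data_pagesize_limit: Some(64 * 1024),
        };

        let row_groups = RowGroupIterator::try_new(
            vec![Ok(chunk)].into_iter(),
            &schema,
            options,
            vec![vec![Encoding::Plain]],
        )?;

        let file = std::fs::File::create("example.parquet")?;
        let mut writer = FileWriter::try_new(file, schema, options)?;
        for group in row_groups {
            writer.write(group?)?;
        }
        let _size = writer.end(None)?;
        Ok(())
    }

As implemented in `array_to_pages`, the limit is applied by estimating the bytes per row of the array and slicing it into sub-arrays of roughly `data_pagesize_limit` bytes, one page each. With `data_pagesize_limit: None` the 1024 * 1024 default applies, so arrays larger than 1 MiB are now split into multiple pages rather than written as a single page.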