From d1acd3f66419e2f5eb80bda78c607ce42d777f1d Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 13:48:30 +0800 Subject: [PATCH 01/10] feat(parquet): introduce data_pagesize_limit to write parquet pages --- src/io/parquet/write/mod.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 0b692e9835f..7105ad310d8 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -56,6 +56,8 @@ pub struct WriteOptions { pub version: Version, /// The compression to apply to every page pub compression: CompressionOptions, + /// The size to flush a page, defaults to 1024 * 1024 if zero + pub data_pagesize_limit: Option<usize>, } use crate::compute::aggregate::estimated_bytes_size; @@ -141,6 +143,7 @@ pub fn array_to_pages( // we also check for an array.len > 3 to prevent infinite recursion // still have to figure out how to deal with values that are i32::MAX size, such as very large // strings or a list column with many elements + if (estimated_bytes_size(array)) >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { let split_at = array.len() / 2; let left = array.slice(0, split_at); @@ -163,8 +166,31 @@ pub fn array_to_pages( ) }) } - _ => array_to_page(array, type_, nested, options, encoding) - .map(|page| DynIter::new(std::iter::once(Ok(page)))), + _ => { + const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; + let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); + + let bytes_per_row = estimated_bytes_size(array.slice(0, 1).as_ref()).max(1); + let row_per_page = PAGE_SIZE / bytes_per_row + 1; + + if array.len() <= row_per_page { + array_to_page(array, type_, nested, options, encoding) + .map(|page| DynIter::new(std::iter::once(Ok(page)))) + } else { + let first = array.slice(0, row_per_page); + let other = array.slice(row_per_page, array.len() - row_per_page); + Ok(DynIter::new( + 
array_to_pages(first.as_ref(), type_.clone(), nested, options, encoding)? + .chain(array_to_pages( + other.as_ref(), + type_, + nested, + options, + encoding, + )?), + )) + } + } } } } From adfc5791102232879ff222a95315e36fa8671fd4 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 14:03:33 +0800 Subject: [PATCH 02/10] feat(parquet): introduce data_pagesize_limit to write parquet pages --- arrow-parquet-integration-testing/src/main.rs | 1 + benches/write_parquet.rs | 1 + src/doc/lib.md | 1 + src/io/parquet/write/mod.rs | 2 +- tests/it/io/parquet/mod.rs | 1 + tests/it/io/parquet/read_indexes.rs | 2 ++ tests/it/io/parquet/write.rs | 1 + tests/it/io/parquet/write_async.rs | 1 + 8 files changed, 9 insertions(+), 1 deletion(-) diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index 831dd6b5f00..f9c95dc26b7 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -172,6 +172,7 @@ fn main() -> Result<()> { write_statistics: true, compression: args.compression.into(), version: args.version.into(), + data_pagesize_limit: None, }; let encodings = schema diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index f4053c92791..7062ab919d9 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -17,6 +17,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { write_statistics: false, compression: CompressionOptions::Uncompressed, version: Version::V1, + data_pagesize_limit: None, }; let row_groups = RowGroupIterator::try_new( diff --git a/src/doc/lib.md b/src/doc/lib.md index 6b278e08d63..9708a6cd3d1 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -42,6 +42,7 @@ fn main() -> Result<()> { write_statistics: true, compression: CompressionOptions::Snappy, version: Version::V1, + data_pagesize_limit: None, }; let row_groups = RowGroupIterator::try_new( diff --git a/src/io/parquet/write/mod.rs 
b/src/io/parquet/write/mod.rs index 7105ad310d8..132ab690fe7 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -171,7 +171,7 @@ pub fn array_to_pages( let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); let bytes_per_row = estimated_bytes_size(array.slice(0, 1).as_ref()).max(1); - let row_per_page = PAGE_SIZE / bytes_per_row + 1; + let row_per_page = page_size / bytes_per_row + 1; if array.len() <= row_per_page { array_to_page(array, type_, nested, options, encoding) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 0681fac1bf7..9097d32fb30 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1128,6 +1128,7 @@ fn integration_write(schema: &Schema, chunks: &[Chunk>]) -> Resul write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V1, + data_pagesize_limit: None, }; let encodings = schema diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index c30d28ea965..b17803b16fb 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -30,6 +30,7 @@ fn pages( write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V1, + data_pagesize_limit: None, }; let pages1 = [array11, array12, array13] @@ -79,6 +80,7 @@ fn read_with_indexes( write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V1, + data_pagesize_limit: None, }; let to_compressed = |pages: Vec| { diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index dfeed7ccd00..365ad9e3fd6 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -36,6 +36,7 @@ fn round_trip( write_statistics: true, compression, version, + data_pagesize_limit: None, }; let iter = vec![Chunk::try_new(vec![array.clone()])]; diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs index 3e1ba14fe9c..1197e31c0da 
100644 --- a/tests/it/io/parquet/write_async.rs +++ b/tests/it/io/parquet/write_async.rs @@ -31,6 +31,7 @@ async fn test_parquet_async_roundtrip() { write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V2, + data_pagesize_limit: None, }; let mut buffer = Cursor::new(Vec::new()); From e8b51325ddbf2c2fa522f4156e03b68ba7ceb06c Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 14:06:04 +0800 Subject: [PATCH 03/10] feat(parquet): introduce data_pagesize_limit to write parquet pages --- examples/parquet_write.rs | 1 + examples/parquet_write_async.rs | 1 + examples/parquet_write_parallel/src/main.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index fac426ab7de..1387f615ebc 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -16,6 +16,7 @@ fn write_chunk(path: &str, schema: Schema, chunk: Chunk>) -> Resu write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V2, + data_pagesize_limit: None, }; let iter = vec![Ok(chunk)]; diff --git a/examples/parquet_write_async.rs b/examples/parquet_write_async.rs index b6205c8fb0f..db2ae9e08ca 100644 --- a/examples/parquet_write_async.rs +++ b/examples/parquet_write_async.rs @@ -17,6 +17,7 @@ async fn write_batch(path: &str, schema: Schema, columns: Chunk>) write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V2, + data_pagesize_limit: None, }; let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter()); diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index 28cf299310c..6c87be6143a 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -47,6 +47,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> { write_statistics: true, compression: CompressionOptions::Snappy, version: 
Version::V2, + data_pagesize_limit: None, }; let encoding_map = |data_type: &DataType| { From 828830de4ca0b05eb3cd4ac808ab973c618214c3 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 15:11:50 +0800 Subject: [PATCH 04/10] feat(parquet): use loop to avoid recursion --- src/io/parquet/write/mod.rs | 41 ++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 132ab690fe7..2c5f1ca021e 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -143,8 +143,9 @@ pub fn array_to_pages( // we also check for an array.len > 3 to prevent infinite recursion // still have to figure out how to deal with values that are i32::MAX size, such as very large // strings or a list column with many elements - - if (estimated_bytes_size(array)) >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { + + let array_byte_size = estimated_bytes_size(array.as_ref()); + if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { let split_at = array.len() / 2; let left = array.slice(0, split_at); let right = array.slice(split_at, array.len() - split_at); @@ -169,27 +170,21 @@ pub fn array_to_pages( _ => { const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); - - let bytes_per_row = estimated_bytes_size(array.slice(0, 1).as_ref()).max(1); - let row_per_page = page_size / bytes_per_row + 1; - - if array.len() <= row_per_page { - array_to_page(array, type_, nested, options, encoding) - .map(|page| DynIter::new(std::iter::once(Ok(page)))) - } else { - let first = array.slice(0, row_per_page); - let other = array.slice(row_per_page, array.len() - row_per_page); - Ok(DynIter::new( - array_to_pages(first.as_ref(), type_.clone(), nested, options, encoding)? 
- .chain(array_to_pages( - other.as_ref(), - type_, - nested, - options, - encoding, - )?), - )) - } + let bytes_per_row = ((array_byte_size as f64) / (array.len() as f64)) as usize; + let row_per_page = (page_size / bytes_per_row).max(1); + + let iter = (0..array.len()).step_by(row_per_page).map(move |offset| { + let length = if offset + row_per_page > array.len() { + array.len() - offset + } else { + row_per_page + }; + + let sub_array = array.slice(offset, length); + array_to_page(sub_array.as_ref(), type_, nested, options, encoding) + }); + + Ok(DynIter::new(iter)) } } } From 541be086836ab77152ded0d16b27306b66f3083c Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 15:27:06 +0800 Subject: [PATCH 05/10] feat(parquet): use loop to avoid recursion --- src/io/parquet/write/mod.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 2c5f1ca021e..5302fb30dc1 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -143,7 +143,7 @@ pub fn array_to_pages( // we also check for an array.len > 3 to prevent infinite recursion // still have to figure out how to deal with values that are i32::MAX size, such as very large // strings or a list column with many elements - + let array_byte_size = estimated_bytes_size(array.as_ref()); if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { let split_at = array.len() / 2; @@ -171,20 +171,23 @@ pub fn array_to_pages( const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); let bytes_per_row = ((array_byte_size as f64) / (array.len() as f64)) as usize; - let row_per_page = (page_size / bytes_per_row).max(1); - - let iter = (0..array.len()).step_by(row_per_page).map(move |offset| { - let length = if offset + row_per_page > array.len() { - array.len() - offset - } else { - 
row_per_page - }; - - let sub_array = array.slice(offset, length); - array_to_page(sub_array.as_ref(), type_, nested, options, encoding) - }); - - Ok(DynIter::new(iter)) + let rows_per_page = (page_size / bytes_per_row).max(1); + + let vs: Vec<Result<EncodedPage>> = (0..array.len()) + .step_by(rows_per_page) + .map(|offset| { + let length = if offset + rows_per_page > array.len() { + array.len() - offset + } else { + rows_per_page + }; + + let sub_array = array.slice(offset, length); + array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding) + }) + .collect(); + + Ok(DynIter::new(vs.into_iter())) } } } From fab780dea26243fb4dbb867379abd3b01cae8baa Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 15:53:40 +0800 Subject: [PATCH 06/10] feat(parquet): fix div by zero --- src/io/parquet/write/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 5302fb30dc1..1f868d21e6e 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -170,7 +170,8 @@ pub fn array_to_pages( _ => { const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); - let bytes_per_row = ((array_byte_size as f64) / (array.len() as f64)) as usize; + let bytes_per_row = + (((array_byte_size as f64) / (array.len() as f64)) as usize).max(1); let rows_per_page = (page_size / bytes_per_row).max(1); let vs: Vec<Result<EncodedPage>> = (0..array.len()) From ee33260e3b788f596eb0e35771ea3b8cb12d47ca Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 16:01:40 +0800 Subject: [PATCH 07/10] feat(parquet): fix clippy --- src/io/parquet/write/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 1f868d21e6e..b2f7854174f 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -131,6 +131,7 @@ pub fn 
can_encode(data_type: &DataType, encoding: Encoding) -> bool { } /// Returns an iterator of [`EncodedPage`]. +#[allow(clippy::needless_collect)] pub fn array_to_pages( array: &dyn Array, type_: ParquetPrimitiveType, @@ -144,7 +145,7 @@ pub fn array_to_pages( // still have to figure out how to deal with values that are i32::MAX size, such as very large // strings or a list column with many elements - let array_byte_size = estimated_bytes_size(array.as_ref()); + let array_byte_size = estimated_bytes_size(array); if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 { let split_at = array.len() / 2; let left = array.slice(0, split_at); From 3a0561689443ef488c148c3ed89c0619908ea7ab Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 23 Nov 2022 17:12:05 +0800 Subject: [PATCH 08/10] feat(parquet): fix doc test --- src/io/parquet/write/sink.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs index 5086b8e09e2..1eeb83e21b2 100644 --- a/src/io/parquet/write/sink.rs +++ b/src/io/parquet/write/sink.rs @@ -35,6 +35,7 @@ use super::{Encoding, SchemaDescriptor, WriteOptions}; /// write_statistics: true, /// compression: CompressionOptions::Uncompressed, /// version: Version::V2, +/// data_pagesize_limit: None, /// }; /// /// let mut buffer = vec![]; From b272cd9d33239d1c71ec4c88b2ac1d7836d77d23 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 24 Nov 2022 09:32:59 +0800 Subject: [PATCH 09/10] feat(parquet): address comments --- src/io/parquet/write/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index b2f7854174f..eee7762a173 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -56,7 +56,7 @@ pub struct WriteOptions { pub version: Version, /// The compression to apply to every page pub compression: CompressionOptions, - /// The size to flush a 
page, defaults to 1024 * 1024 if zero + /// The size to flush a page, defaults to 1024 * 1024 if None pub data_pagesize_limit: Option<usize>, } @@ -172,8 +172,8 @@ pub fn array_to_pages( const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); let bytes_per_row = - (((array_byte_size as f64) / (array.len() as f64)) as usize).max(1); - let rows_per_page = (page_size / bytes_per_row).max(1); + ((array_byte_size as f64) / (array.len() as f64)) as usize; + let rows_per_page = page_size / (bytes_per_row + 1); let vs: Vec<Result<EncodedPage>> = (0..array.len()) .step_by(rows_per_page) From 795b09536ca2034353976d3459fb34ad08d313f6 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 24 Nov 2022 12:21:36 +0800 Subject: [PATCH 10/10] feat(parquet): address comments --- src/io/parquet/write/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index eee7762a173..f48c1a67ea6 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -172,8 +172,8 @@ pub fn array_to_pages( const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); let bytes_per_row = - ((array_byte_size as f64) / (array.len() as f64)) as usize; - let rows_per_page = page_size / (bytes_per_row + 1); + ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize; + let rows_per_page = (page_size / (bytes_per_row + 1)).max(1); let vs: Vec<Result<EncodedPage>> = (0..array.len()) .step_by(rows_per_page)