
Added data_pagesize_limit to write parquet pages #1303

Merged · 10 commits · Nov 25, 2022
1 change: 1 addition & 0 deletions arrow-parquet-integration-testing/src/main.rs
@@ -172,6 +172,7 @@ fn main() -> Result<()> {
write_statistics: true,
compression: args.compression.into(),
version: args.version.into(),
data_pagesize_limit: None,
};

let encodings = schema
1 change: 1 addition & 0 deletions benches/write_parquet.rs
@@ -17,6 +17,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
write_statistics: false,
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
};

let row_groups = RowGroupIterator::try_new(
1 change: 1 addition & 0 deletions examples/parquet_write.rs
@@ -16,6 +16,7 @@ fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Resu
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
data_pagesize_limit: None,
};

let iter = vec![Ok(chunk)];
1 change: 1 addition & 0 deletions examples/parquet_write_async.rs
@@ -17,6 +17,7 @@ async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>)
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
data_pagesize_limit: None,
};

let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());
1 change: 1 addition & 0 deletions examples/parquet_write_parallel/src/main.rs
@@ -47,6 +47,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
write_statistics: true,
compression: CompressionOptions::Snappy,
version: Version::V2,
data_pagesize_limit: None,
};

let encoding_map = |data_type: &DataType| {
1 change: 1 addition & 0 deletions src/doc/lib.md
@@ -42,6 +42,7 @@ fn main() -> Result<()> {
write_statistics: true,
compression: CompressionOptions::Snappy,
version: Version::V1,
data_pagesize_limit: None,
};

let row_groups = RowGroupIterator::try_new(
30 changes: 28 additions & 2 deletions src/io/parquet/write/mod.rs
@@ -56,6 +56,8 @@ pub struct WriteOptions {
pub version: Version,
/// The compression to apply to every page
pub compression: CompressionOptions,
/// The size (in bytes) at which a page is flushed; defaults to 1024 * 1024 when `None`
pub data_pagesize_limit: Option<usize>,
}

use crate::compute::aggregate::estimated_bytes_size;
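
In short, `WriteOptions` gains an optional page-size limit, so existing call sites only need the extra `data_pagesize_limit: None` field. A minimal sketch of opting into a custom limit (assuming the crate is consumed as `arrow2`; the other field values follow the examples above):

use arrow2::io::parquet::write::{CompressionOptions, Version, WriteOptions};

// Ask the writer to flush data pages at roughly 64 KiB instead of the
// 1 MiB default; `None` keeps the default behaviour.
let options = WriteOptions {
    write_statistics: true,
    compression: CompressionOptions::Uncompressed,
    version: Version::V2,
    data_pagesize_limit: Some(64 * 1024),
};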
@@ -141,6 +143,7 @@ pub fn array_to_pages(
// we also check for array.len() > 3 to prevent infinite recursion
// still have to figure out how to deal with values close to i32::MAX in size, such as very large
// strings or a list column with many elements

if (estimated_bytes_size(array)) >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
let split_at = array.len() / 2;
let left = array.slice(0, split_at);
@@ -163,8 +166,31 @@
                )
            })
        }
-        _ => array_to_page(array, type_, nested, options, encoding)
-            .map(|page| DynIter::new(std::iter::once(Ok(page)))),
        _ => {
            const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
            let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);

            let bytes_per_row = estimated_bytes_size(array.slice(0, 1).as_ref()).max(1);
            let row_per_page = page_size / bytes_per_row + 1;

            if array.len() <= row_per_page {
                array_to_page(array, type_, nested, options, encoding)
                    .map(|page| DynIter::new(std::iter::once(Ok(page))))
            } else {
                let first = array.slice(0, row_per_page);
                let other = array.slice(row_per_page, array.len() - row_per_page);
                Ok(DynIter::new(
                    array_to_pages(first.as_ref(), type_.clone(), nested, options, encoding)?
                        .chain(array_to_pages(
                            other.as_ref(),
                            type_,
                            nested,
                            options,
                            encoding,
                        )?),
                ))
            }
        }
}
}
}
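
The split above is driven by a per-row size estimate: `row_per_page = page_size / bytes_per_row + 1`, with `bytes_per_row` estimated from the first row and clamped to at least 1, and oversized arrays split recursively via `chain`. A standalone sketch of that arithmetic (a hypothetical helper, not part of this PR):

// Mirrors `page_size / bytes_per_row + 1` from the diff above.
fn rows_per_page(page_size: usize, bytes_per_row: usize) -> usize {
    page_size / bytes_per_row.max(1) + 1
}

fn main() {
    // With the 1 MiB default and an estimated 8 bytes per row, a page holds
    // at most 131_073 rows: a 300_000-row array is split into chunks of
    // 131_073, 131_073 and 37_854 rows.
    assert_eq!(rows_per_page(1024 * 1024, 8), 131_073);
}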
1 change: 1 addition & 0 deletions tests/it/io/parquet/mod.rs
@@ -1128,6 +1128,7 @@ fn integration_write(schema: &Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Resul
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
};

let encodings = schema
2 changes: 2 additions & 0 deletions tests/it/io/parquet/read_indexes.rs
@@ -30,6 +30,7 @@ fn pages(
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
};

let pages1 = [array11, array12, array13]
@@ -79,6 +80,7 @@ fn read_with_indexes(
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V1,
data_pagesize_limit: None,
};

let to_compressed = |pages: Vec<EncodedPage>| {
1 change: 1 addition & 0 deletions tests/it/io/parquet/write.rs
@@ -36,6 +36,7 @@ fn round_trip(
write_statistics: true,
compression,
version,
data_pagesize_limit: None,
};

let iter = vec![Chunk::try_new(vec![array.clone()])];
1 change: 1 addition & 0 deletions tests/it/io/parquet/write_async.rs
@@ -31,6 +31,7 @@ async fn test_parquet_async_roundtrip() {
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
data_pagesize_limit: None,
};

let mut buffer = Cursor::new(Vec::new());