Added data_pagesize_limit to write parquet pages (#1303)
sundy-li authored Nov 25, 2022
1 parent 619d8da commit 368aacc
Showing 12 changed files with 41 additions and 3 deletions.
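From the caller's side, the change is a single new field on WriteOptions. A minimal sketch of opting into a smaller page size (the 64 KiB figure is illustrative; None keeps the 1 MiB default):

    use arrow2::io::parquet::write::{CompressionOptions, Version, WriteOptions};

    // Flush each data page once it holds roughly 64 KiB,
    // instead of the 1 MiB default used when the field is None.
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: Some(64 * 1024),
    };

Every call site in the diff below sets data_pagesize_limit: None, preserving the previous behavior.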
1 change: 1 addition & 0 deletions arrow-parquet-integration-testing/src/main.rs
@@ -172,6 +172,7 @@ fn main() -> Result<()> {
         write_statistics: true,
         compression: args.compression.into(),
         version: args.version.into(),
+        data_pagesize_limit: None,
     };

     let encodings = schema
1 change: 1 addition & 0 deletions benches/write_parquet.rs
@@ -17,6 +17,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
         write_statistics: false,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let row_groups = RowGroupIterator::try_new(
1 change: 1 addition & 0 deletions examples/parquet_write.rs
@@ -16,6 +16,7 @@ fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let iter = vec![Ok(chunk)];
1 change: 1 addition & 0 deletions examples/parquet_write_async.rs
@@ -17,6 +17,7 @@ async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());
1 change: 1 addition & 0 deletions examples/parquet_write_parallel/src/main.rs
@@ -47,6 +47,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
         write_statistics: true,
         compression: CompressionOptions::Snappy,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let encoding_map = |data_type: &DataType| {
1 change: 1 addition & 0 deletions src/doc/lib.md
@@ -42,6 +42,7 @@ fn main() -> Result<()> {
         write_statistics: true,
         compression: CompressionOptions::Snappy,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let row_groups = RowGroupIterator::try_new(
32 changes: 29 additions & 3 deletions src/io/parquet/write/mod.rs
@@ -56,6 +56,8 @@ pub struct WriteOptions {
     pub version: Version,
     /// The compression to apply to every page
     pub compression: CompressionOptions,
+    /// The size in bytes at which a page is flushed; defaults to 1024 * 1024 if None
+    pub data_pagesize_limit: Option<usize>,
 }

 use crate::compute::aggregate::estimated_bytes_size;
@@ -129,6 +131,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
 }

 /// Returns an iterator of [`EncodedPage`].
+#[allow(clippy::needless_collect)]
 pub fn array_to_pages(
     array: &dyn Array,
     type_: ParquetPrimitiveType,
@@ -141,7 +144,9 @@ pub fn array_to_pages(
     // we also check for an array.len > 3 to prevent infinite recursion
     // still have to figure out how to deal with values that are i32::MAX size, such as very large
     // strings or a list column with many elements
-    if (estimated_bytes_size(array)) >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
+
+    let array_byte_size = estimated_bytes_size(array);
+    if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
         let split_at = array.len() / 2;
         let left = array.slice(0, split_at);
         let right = array.slice(split_at, array.len() - split_at);
@@ -163,8 +168,29 @@
                     )
                 })
             }
-            _ => array_to_page(array, type_, nested, options, encoding)
-                .map(|page| DynIter::new(std::iter::once(Ok(page)))),
+            _ => {
+                const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
+                let page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
+                let bytes_per_row =
+                    ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
+                let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);
+
+                let vs: Vec<Result<EncodedPage>> = (0..array.len())
+                    .step_by(rows_per_page)
+                    .map(|offset| {
+                        let length = if offset + rows_per_page > array.len() {
+                            array.len() - offset
+                        } else {
+                            rows_per_page
+                        };
+
+                        let sub_array = array.slice(offset, length);
+                        array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding)
+                    })
+                    .collect();
+
+                Ok(DynIter::new(vs.into_iter()))
+            }
         }
     }
 }
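The new fallback arm estimates bytes per row over the whole array and derives how many rows fit under the page-size limit; the two + 1 terms guard against division by zero for empty arrays and zero-byte rows. A standalone sketch of the same arithmetic (rows_per_page is a hypothetical helper, not code from this commit):

    // Sketch of the rows-per-page heuristic introduced above
    // (hypothetical helper; not part of this commit).
    fn rows_per_page(array_byte_size: usize, num_rows: usize, page_size: usize) -> usize {
        // + 1 avoids dividing by zero for an empty array.
        let bytes_per_row = ((array_byte_size as f64) / ((num_rows + 1) as f64)) as usize;
        // + 1 again protects against zero-byte rows; emit at least one row per page.
        (page_size / (bytes_per_row + 1)).max(1)
    }

    fn main() {
        // 10 MiB over 1_000_000 rows is ~10 bytes per row; under the 1 MiB default
        // limit this gives 1_048_576 / 11 = 95_325 rows per page (11 pages in total).
        assert_eq!(rows_per_page(10 * 1024 * 1024, 1_000_000, 1024 * 1024), 95_325);
    }

Lowering data_pagesize_limit therefore yields proportionally more, smaller pages, which can help page-level pruning at the cost of extra page headers.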
1 change: 1 addition & 0 deletions src/io/parquet/write/sink.rs
@@ -35,6 +35,7 @@ use super::{Encoding, SchemaDescriptor, WriteOptions};
 /// write_statistics: true,
 /// compression: CompressionOptions::Uncompressed,
 /// version: Version::V2,
+/// data_pagesize_limit: None,
 /// };
 ///
 /// let mut buffer = vec![];
1 change: 1 addition & 0 deletions tests/it/io/parquet/mod.rs
@@ -1128,6 +1128,7 @@ fn integration_write(schema: &Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Result<Vec<u8>> {
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let encodings = schema
2 changes: 2 additions & 0 deletions tests/it/io/parquet/read_indexes.rs
@@ -30,6 +30,7 @@ fn pages(
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let pages1 = [array11, array12, array13]
@@ -79,6 +80,7 @@ fn read_with_indexes(
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V1,
+        data_pagesize_limit: None,
     };

     let to_compressed = |pages: Vec<EncodedPage>| {
1 change: 1 addition & 0 deletions tests/it/io/parquet/write.rs
@@ -36,6 +36,7 @@ fn round_trip(
         write_statistics: true,
         compression,
         version,
+        data_pagesize_limit: None,
     };

     let iter = vec![Chunk::try_new(vec![array.clone()])];
1 change: 1 addition & 0 deletions tests/it/io/parquet/write_async.rs
@@ -31,6 +31,7 @@ async fn test_parquet_async_roundtrip() {
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
+        data_pagesize_limit: None,
     };

     let mut buffer = Cursor::new(Vec::new());
