
Exposed missing APIs to write parquet in parallel #539

Merged: 1 commit on Oct 19, 2021
examples/parquet_write_parallel/src/main.rs (19 changes: 10 additions & 9 deletions)

```diff
@@ -35,20 +35,21 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
         .zip(parquet_schema.columns().to_vec().into_par_iter())
         .zip(encodings)
         .map(|((array, descriptor), encoding)| {
-            let array = array.clone();
-
             // create encoded and compressed pages this column
-            Ok(array_to_pages(array, descriptor, options, encoding)?.collect::<Vec<_>>())
+            let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
+            encoded_pages
+                .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
+                .collect::<Result<Vec<_>>>()
         })
-        .collect::<Result<Vec<_>>>()?;
+        .collect::<Result<Vec<Vec<CompressedPage>>>>()?;
 
     // create the iterator over groups (one in this case)
     // (for more batches, create the iterator from them here)
-    let row_groups = std::iter::once(Result::Ok(DynIter::new(
-        columns
-            .into_iter()
-            .map(|column| Ok(DynIter::new(column.into_iter()))),
-    )));
+    let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| {
+        Ok(DynStreamingIterator::new(
+            fallible_streaming_iterator::convert(column.iter().map(Ok)),
+        ))
+    }))));
 
     // Create a new empty file
     let mut file = std::fs::File::create(path)?;
```
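The new shape of the example encodes and compresses every page up front (one `Vec<CompressedPage>` per column, built in parallel by rayon) and then hands each column to the writer as a `FallibleStreamingIterator`, which lends pages by reference instead of moving them. Below is a minimal, self-contained sketch of just that conversion step, using the `fallible-streaming-iterator` crate that this PR re-exports; the `Vec<u8>` "pages" are stand-ins for illustration, not arrow2 types:

```rust
use fallible_streaming_iterator::{convert, FallibleStreamingIterator};

fn main() -> Result<(), std::io::Error> {
    // Stand-in for one column's compressed pages, already materialized
    // (in the example above this would be one `Vec<CompressedPage>`).
    let pages: Vec<Vec<u8>> = vec![vec![1, 2], vec![3]];

    // `convert` lifts an `Iterator<Item = Result<T, E>>` into a
    // `FallibleStreamingIterator`, which yields `&T` instead of moving `T`.
    let mut streaming = convert(pages.iter().map(Ok::<_, std::io::Error>));

    while let Some(page) = streaming.next()? {
        println!("page of {} bytes", page.len());
    }
    Ok(())
}
```

This `convert(iter.map(Ok))` shape is the same one the diff uses inside `DynStreamingIterator::new(fallible_streaming_iterator::convert(column.iter().map(Ok)))`.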
src/io/parquet/write/mod.rs (5 changes: 3 additions & 2 deletions)

```diff
@@ -26,12 +26,13 @@ use parquet2::page::DataPage;
 pub use parquet2::{
     compression::Compression,
     encoding::Encoding,
+    fallible_streaming_iterator,
     metadata::{ColumnDescriptor, KeyValue, SchemaDescriptor},
     page::{CompressedDataPage, CompressedPage, EncodedPage},
     schema::types::ParquetType,
     write::{
-        write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
-        Version, WriteOptions,
+        compress, write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator,
+        RowGroupIter, Version, WriteOptions,
     },
     FallibleStreamingIterator,
 };
```
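With `compress` and `fallible_streaming_iterator` added to these re-exports, a downstream user can build the parallel pipeline from the example without taking a direct dependency on `parquet2`. A sketch of the imports this makes available (crate name `arrow2` assumed from the repository; paths follow the module this diff touches):

```rust
// All of these now resolve through arrow2's own parquet write module;
// before this PR, `compress` and `fallible_streaming_iterator` were not re-exported.
use arrow2::io::parquet::write::{
    compress, fallible_streaming_iterator, CompressedPage, DynIter, DynStreamingIterator,
    FallibleStreamingIterator, WriteOptions,
};
```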