
Commit d86f35c: Fixed parquet_write_parallel (#539)
jorgecarleitao authored Oct 19, 2021 (1 parent: bd8f02b)

Showing 2 changed files with 13 additions and 11 deletions.
19 changes: 10 additions & 9 deletions examples/parquet_write_parallel/src/main.rs
```diff
@@ -35,20 +35,21 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
         .zip(parquet_schema.columns().to_vec().into_par_iter())
         .zip(encodings)
         .map(|((array, descriptor), encoding)| {
-            let array = array.clone();
-
             // create encoded and compressed pages this column
-            Ok(array_to_pages(array, descriptor, options, encoding)?.collect::<Vec<_>>())
+            let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
+            encoded_pages
+                .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
+                .collect::<Result<Vec<_>>>()
         })
-        .collect::<Result<Vec<_>>>()?;
+        .collect::<Result<Vec<Vec<CompressedPage>>>>()?;
 
     // create the iterator over groups (one in this case)
     // (for more batches, create the iterator from them here)
-    let row_groups = std::iter::once(Result::Ok(DynIter::new(
-        columns
-            .into_iter()
-            .map(|column| Ok(DynIter::new(column.into_iter()))),
-    )));
+    let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| {
+        Ok(DynStreamingIterator::new(
+            fallible_streaming_iterator::convert(column.iter().map(Ok)),
+        ))
+    }))));
 
     // Create a new empty file
     let mut file = std::fs::File::create(path)?;
```
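For context, here is a minimal sketch (not part of the commit) of the per-column pipeline the fixed example uses: encode one Arrow array into Parquet pages, then compress each page. The helper name `encode_and_compress_column` is hypothetical; the imports assume the `arrow2::io::parquet::write` re-exports shown in the second changed file below.

```rust
use arrow2::{
    array::Array,
    error::Result,
    io::parquet::write::{
        array_to_pages, compress, ColumnDescriptor, CompressedPage, Encoding, WriteOptions,
    },
};

// Hypothetical helper mirroring the closure in the hunk above:
// one rayon task runs this per column.
fn encode_and_compress_column(
    array: &dyn Array,
    descriptor: ColumnDescriptor,
    options: WriteOptions,
    encoding: Encoding,
) -> Result<Vec<CompressedPage>> {
    // `array_to_pages` lazily yields the encoded pages of this column;
    // each page is then compressed into its own buffer so the task can
    // hand back an owned `Vec<CompressedPage>`.
    array_to_pages(array, descriptor, options, encoding)?
        .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
        .collect::<Result<Vec<_>>>()
}
```

Compared with the previous version of the example, the closure now compresses the pages it encodes instead of collecting only encoded pages, and it passes `array.as_ref()` to `array_to_pages` rather than cloning the array handle.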
5 changes: 3 additions & 2 deletions src/io/parquet/write/mod.rs
```diff
@@ -26,12 +26,13 @@ use parquet2::page::DataPage;
 pub use parquet2::{
     compression::Compression,
     encoding::Encoding,
+    fallible_streaming_iterator,
     metadata::{ColumnDescriptor, KeyValue, SchemaDescriptor},
     page::{CompressedDataPage, CompressedPage, EncodedPage},
     schema::types::ParquetType,
     write::{
-        write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
-        Version, WriteOptions,
+        compress, write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator,
+        RowGroupIter, Version, WriteOptions,
     },
     FallibleStreamingIterator,
 };
```
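The new `compress` and `fallible_streaming_iterator` re-exports are what let the example above build its compressed pages and streaming page iterators through arrow2 alone, presumably so the example does not need to name parquet2 directly. A minimal sketch of the imports this enables (paths assumed from the `pub use` block above):

```rust
// After this commit, all of these names resolve through arrow2's re-exports,
// so downstream code such as the fixed example never spells out parquet2 paths.
use arrow2::io::parquet::write::{
    compress, fallible_streaming_iterator, CompressedPage, DynIter, DynStreamingIterator,
    FallibleStreamingIterator,
};
```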
