diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index 204de4e835e..415da272c59 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -35,20 +35,21 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> { .zip(parquet_schema.columns().to_vec().into_par_iter()) .zip(encodings) .map(|((array, descriptor), encoding)| { - let array = array.clone(); - // create encoded and compressed pages this column - Ok(array_to_pages(array, descriptor, options, encoding)?.collect::>()) + let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?; + encoded_pages + .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into())) + .collect::>>() }) - .collect::>>()?; + .collect::>>>()?; // create the iterator over groups (one in this case) // (for more batches, create the iterator from them here) - let row_groups = std::iter::once(Result::Ok(DynIter::new( - columns - .into_iter() - .map(|column| Ok(DynIter::new(column.into_iter()))), - ))); + let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| { + Ok(DynStreamingIterator::new( + fallible_streaming_iterator::convert(column.iter().map(Ok)), + )) + })))); // Create a new empty file let mut file = std::fs::File::create(path)?; diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index e1a6aa6b5e9..ab1f07c5e76 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -26,12 +26,13 @@ use parquet2::page::DataPage; pub use parquet2::{ compression::Compression, encoding::Encoding, + fallible_streaming_iterator, metadata::{ColumnDescriptor, KeyValue, SchemaDescriptor}, page::{CompressedDataPage, CompressedPage, EncodedPage}, schema::types::ParquetType, write::{ - write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter, - Version, WriteOptions, + compress, write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, + RowGroupIter, Version, WriteOptions, }, FallibleStreamingIterator, };