diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs
index 56230b4a4bb..b928a1428c5 100644
--- a/examples/parquet_write_parallel/src/main.rs
+++ b/examples/parquet_write_parallel/src/main.rs
@@ -9,7 +9,7 @@ use arrow2::{
     chunk::Chunk as AChunk,
     datatypes::*,
     error::{Error, Result},
-    io::parquet::write::*,
+    io::parquet::{read::ParquetError, write::*},
 };
 
 type Chunk = AChunk<Box<dyn Array>>;
@@ -42,7 +42,7 @@ impl FallibleStreamingIterator for Bla {
     }
 }
 
-fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()> {
+fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
     // declare the options
     let options = WriteOptions {
         write_statistics: true,
@@ -50,9 +50,8 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
         version: Version::V2,
     };
 
-    // declare encodings
-    let encodings = schema.fields().par_iter().map(|field| {
-        match field.data_type().to_physical_type() {
+    let encoding_map = |data_type: &DataType| {
+        match data_type.to_physical_type() {
            // let's be fancy and use delta-encoding for binary fields
            PhysicalType::Binary
            | PhysicalType::LargeBinary
@@ -61,25 +60,41 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
            // remaining is plain
            _ => Encoding::Plain,
        }
-    });
+    };
+
+    // declare encodings
+    let encodings = (&schema.fields)
+        .par_iter()
+        .map(|f| transverse(&f.data_type, encoding_map))
+        .collect::<Vec<_>>();
 
     // derive the parquet schema (physical types) from arrow's schema.
-    let parquet_schema = to_parquet_schema(schema)?;
+    let parquet_schema = to_parquet_schema(&schema)?;
 
-    let a = parquet_schema.clone();
     let row_groups = batches.iter().map(|batch| {
         // write batch to pages; parallelized by rayon
         let columns = batch
             .columns()
             .par_iter()
-            .zip(a.columns().to_vec().into_par_iter())
+            .zip(parquet_schema.fields().to_vec())
             .zip(encodings.clone())
-            .map(|((array, descriptor), encoding)| {
-                // create encoded and compressed pages this column
-                let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
-                encoded_pages
-                    .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
-                    .collect::<Result<VecDeque<_>>>()
+            .flat_map(move |((array, type_), encoding)| {
+                let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap();
+                encoded_columns
+                    .into_iter()
+                    .map(|encoded_pages| {
+                        let encoded_pages = DynIter::new(
+                            encoded_pages
+                                .into_iter()
+                                .map(|x| x.map_err(|e| ParquetError::General(e.to_string()))),
+                        );
+                        encoded_pages
+                            .map(|page| {
+                                compress(page?, vec![], options.compression).map_err(|x| x.into())
+                            })
+                            .collect::<Result<VecDeque<_>>>()
+                    })
+                    .collect::<Vec<_>>()
             })
            .collect::<Result<Vec<VecDeque<CompressedPage>>>>()?;
 
@@ -88,7 +103,7 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
                 .into_iter()
                 .map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
         );
-        Ok(row_group)
+        Result::Ok(row_group)
     });
 
     // Create a new empty file
@@ -135,5 +150,5 @@ fn main() -> Result<()> {
     };
 
     let batch = create_batch(5_000_000)?;
-    parallel_write("example.parquet", &schema, &[batch.clone(), batch])
+    parallel_write("example.parquet", schema, &[batch.clone(), batch])
 }
diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs
index 95c8057a459..e666c8df05f 100644
--- a/src/io/parquet/write/pages.rs
+++ b/src/io/parquet/write/pages.rs
@@ -194,7 +194,7 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec
     }
 }
 
-pub fn array_to_columns<A: AsRef<dyn Array> + 'static + Send + Sync>(
+pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     array: A,
     type_: ParquetType,
     options: WriteOptions,
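
Note (not part of the patch): a minimal sketch of the encoding setup the example switches to. Encodings are now derived per parquet leaf column with `transverse`, giving one `Vec<Encoding>` per Arrow field, which is what lets nested types flow through `array_to_columns`. The schema and field names below are hypothetical; only `transverse`, `Encoding`, and the public `schema.fields` used in the diff above are assumed.

// Sketch: one Vec<Encoding> per field, one entry per parquet leaf.
// Schema and field names are made up for illustration.
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::write::{transverse, Encoding};

fn main() {
    let schema = Schema::from(vec![
        Field::new("id", DataType::Int64, false),
        Field::new(
            "tags",
            DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
    ]);

    // Map every leaf of each field's data type to an encoding; nested fields
    // expand to as many entries as they have leaf columns.
    let encodings: Vec<Vec<Encoding>> = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    assert_eq!(encodings.len(), 2); // one Vec<Encoding> per top-level field
    println!("{:?}", encodings);
}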