Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Made parallel write parquet example over batches (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Nov 4, 2021
1 parent fab0bc1 commit 8f2c9ea
Showing 1 changed file with 64 additions and 30 deletions.
94 changes: 64 additions & 30 deletions examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
/// Example demonstrating how to write to parquet in parallel.
//! Example demonstrating how to write to parquet in parallel.
use std::collections::VecDeque;
use std::sync::Arc;

use rayon::prelude::*;

use arrow2::{
array::*, datatypes::PhysicalType, error::Result, io::parquet::write::*,
array::*,
datatypes::PhysicalType,
error::{ArrowError, Result},
io::parquet::write::*,
record_batch::RecordBatch,
};

fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
/// A queue-backed streaming iterator over one column's compressed parquet
/// pages. Pages are handed out in FIFO order via `FallibleStreamingIterator`.
struct Bla {
    // Pages still to be yielded, in write order.
    columns: VecDeque<CompressedPage>,
    // Page produced by the most recent `advance`, borrowed out by `get`.
    current: Option<CompressedPage>,
}

impl Bla {
    /// Wraps a queue of compressed pages; iteration is positioned *before*
    /// the first page until `advance` is called.
    pub fn new(pages: VecDeque<CompressedPage>) -> Self {
        Bla {
            current: None,
            columns: pages,
        }
    }
}

impl FallibleStreamingIterator for Bla {
    type Item = CompressedPage;
    type Error = ArrowError;

    /// Pops the next page off the front of the queue. Once the queue is
    /// drained, `current` stays `None` and `get` reports exhaustion.
    /// This never fails; the `Result` is required by the trait.
    fn advance(&mut self) -> Result<()> {
        let next = self.columns.pop_front();
        self.current = next;
        Ok(())
    }

    /// Borrows the page produced by the last `advance`, if any.
    fn get(&self) -> Option<&Self::Item> {
        self.current.as_ref()
    }
}

fn parallel_write(path: &str, batches: &[RecordBatch]) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
compression: Compression::Snappy,
version: Version::V2,
};
let encodings = batch.schema().fields().par_iter().map(|field| {
let encodings = batches[0].schema().fields().par_iter().map(|field| {
match field.data_type().to_physical_type() {
// let's be fancy and use delta-encoding for binary fields
PhysicalType::Binary
Expand All @@ -26,30 +58,32 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
}
});

let parquet_schema = to_parquet_schema(batch.schema())?;

// write batch to pages; parallelized by rayon
let columns = batch
.columns()
.par_iter()
.zip(parquet_schema.columns().to_vec().into_par_iter())
.zip(encodings)
.map(|((array, descriptor), encoding)| {
// create encoded and compressed pages this column
let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
encoded_pages
.map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
.collect::<Result<Vec<_>>>()
})
.collect::<Result<Vec<Vec<CompressedPage>>>>()?;
let parquet_schema = to_parquet_schema(batches[0].schema())?;

let a = parquet_schema.clone();
let row_groups = batches.iter().map(|batch| {
// write batch to pages; parallelized by rayon
let columns = batch
.columns()
.par_iter()
.zip(a.columns().to_vec().into_par_iter())
.zip(encodings.clone())
.map(|((array, descriptor), encoding)| {
// create encoded and compressed pages this column
let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
encoded_pages
.map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
.collect::<Result<VecDeque<_>>>()
})
.collect::<Result<Vec<VecDeque<CompressedPage>>>>()?;

// create the iterator over groups (one in this case)
// (for more batches, create the iterator from them here)
let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| {
Ok(DynStreamingIterator::new(
fallible_streaming_iterator::convert(column.iter().map(Ok)),
))
}))));
let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
Ok(row_group)
});

// Create a new empty file
let mut file = std::fs::File::create(path)?;
Expand All @@ -58,7 +92,7 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
let _file_size = write_file(
&mut file,
row_groups,
batch.schema(),
batches[0].schema(),
parquet_schema,
options,
None,
Expand Down Expand Up @@ -88,7 +122,7 @@ fn create_batch(size: usize) -> Result<RecordBatch> {
}

fn main() -> Result<()> {
    // Create one in-memory batch and write it twice (as two row groups) to
    // demonstrate writing multiple batches in parallel.
    let batch = create_batch(5_000_000)?;

    parallel_write("example.parquet", &[batch.clone(), batch])
}

0 comments on commit 8f2c9ea

Please sign in to comment.