This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added example showing parallel writes to parquet (x num_cores) #436

Merged · 1 commit · Sep 23, 2021
1 change: 1 addition & 0 deletions examples/parquet_write_parallel/.gitignore
@@ -0,0 +1 @@
*.parquet
8 changes: 8 additions & 0 deletions examples/parquet_write_parallel/Cargo.toml
@@ -0,0 +1,8 @@
[package]
name = "parquet_write_parallel"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
rayon = { version = "1", default-features = false }
93 changes: 93 additions & 0 deletions examples/parquet_write_parallel/src/main.rs
@@ -0,0 +1,93 @@
//! Example demonstrating how to write to parquet in parallel.
use std::sync::Arc;

use rayon::prelude::*;

use arrow2::{
    array::*, datatypes::PhysicalType, error::Result, io::parquet::write::*,
    record_batch::RecordBatch,
};

fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Snappy,
        version: Version::V2,
    };
    let encodings = batch.schema().fields().par_iter().map(|field| {
        match field.data_type().to_physical_type() {
            // let's be fancy and use delta-encoding for binary fields
            PhysicalType::Binary
            | PhysicalType::LargeBinary
            | PhysicalType::Utf8
            | PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
            // remaining is plain
            _ => Encoding::Plain,
        }
    });

    let parquet_schema = to_parquet_schema(batch.schema())?;

    // write batch to pages; parallelized by rayon
    let columns = batch
        .columns()
        .par_iter()
        .zip(parquet_schema.columns().to_vec().into_par_iter())
        .zip(encodings)
        .map(|((array, descriptor), encoding)| {
            let array = array.clone();

            // create encoded and compressed pages for this column
            Ok(array_to_pages(array, descriptor, options, encoding)?.collect::<Vec<_>>())
        })
        .collect::<Result<Vec<_>>>()?;

    // create the iterator over row groups (a single group in this case;
    // for more batches, create the iterator from them here)
    let row_groups = std::iter::once(Result::Ok(DynIter::new(
        columns
            .into_iter()
            .map(|column| Ok(DynIter::new(column.into_iter()))),
    )));

    // Create a new empty file
    let mut file = std::fs::File::create(path)?;

    // Write the file.
    let _file_size = write_file(
        &mut file,
        row_groups,
        batch.schema(),
        parquet_schema,
        options,
        None,
    )?;

    Ok(())
}

fn create_batch(size: usize) -> Result<RecordBatch> {
    let c1: Int32Array = (0..size)
        .map(|x| if x % 9 == 0 { None } else { Some(x as i32) })
        .collect();
    let c2: Utf8Array<i32> = (0..size)
        .map(|x| if x % 8 == 0 { None } else { Some(x.to_string()) })
        .collect();

    RecordBatch::try_from_iter([
        ("c1", Arc::new(c1) as Arc<dyn Array>),
        ("c2", Arc::new(c2) as Arc<dyn Array>),
    ])
}

fn main() -> Result<()> {
    let batch = create_batch(10_000_000)?;

    parallel_write("example.parquet", &batch)
}
16 changes: 16 additions & 0 deletions guide/src/io/parquet_write.md
@@ -11,6 +11,8 @@ First, some notation:
* `column chunk`: composed of multiple pages (similar to an `Array`)
* `row group`: a group of columns of the same length (similar to a `RecordBatch` in Arrow)

## Single-threaded writing

Here is an example of how to write a single column chunk into a single row group:

```rust
@@ -23,3 +25,17 @@ assumes that a `RecordBatch` is mapped to a single row group with a single page
```rust
{{#include ../../../examples/parquet_write_record.rs}}
```

## Multi-threaded writing

As a user of this crate, you will need to decide how to parallelize,
and whether ordering matters. Below is an example where we
use [`rayon`](https://crates.io/crates/rayon) to do the heavy lifting of
encoding and compression.
This operation is [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel)
and results in a speedup equal to the minimum of the number of cores
and the number of columns in the record batch.

```rust
{{#include ../../../examples/parquet_write_parallel/src/main.rs}}
```
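
By default, `rayon` spreads this work across all available cores. If you need
to bound the parallelism, for example to leave cores free for other work, the
global thread pool can be configured before the first parallel call. A minimal
sketch, assuming `rayon`'s `ThreadPoolBuilder` (this is not part of the example
above):

```rust
use rayon::ThreadPoolBuilder;

fn main() {
    // Cap rayon's global pool at 4 threads. This must run before any
    // parallel work and can only succeed once per process.
    ThreadPoolBuilder::new()
        .num_threads(4)
        .build_global()
        .expect("the global thread pool was already initialized");

    // ... then call `parallel_write` as in the example above.
}
```

With fewer threads than columns, rayon simply queues the remaining column
tasks, so this trades throughput for a bounded core footprint.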