Improved lifetime in writing parquet (#1038)
jorgecarleitao authored Jun 1, 2022
1 parent 8ced21b commit 4477126
Showing 2 changed files with 33 additions and 18 deletions.
49 changes: 32 additions & 17 deletions examples/parquet_write_parallel/src/main.rs
@@ -9,7 +9,7 @@ use arrow2::{
     chunk::Chunk as AChunk,
     datatypes::*,
     error::{Error, Result},
-    io::parquet::write::*,
+    io::parquet::{read::ParquetError, write::*},
 };

 type Chunk = AChunk<Arc<dyn Array>>;
@@ -42,17 +42,16 @@ impl FallibleStreamingIterator for Bla {
     }
 }

-fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()> {
+fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
     // declare the options
     let options = WriteOptions {
         write_statistics: true,
         compression: CompressionOptions::Snappy,
         version: Version::V2,
     };

-    // declare encodings
-    let encodings = schema.fields().par_iter().map(|field| {
-        match field.data_type().to_physical_type() {
+    let encoding_map = |data_type: &DataType| {
+        match data_type.to_physical_type() {
             // let's be fancy and use delta-encoding for binary fields
             PhysicalType::Binary
             | PhysicalType::LargeBinary
@@ -61,25 +60,41 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
             // remaining is plain
             _ => Encoding::Plain,
         }
-    });
+    };
+
+    // declare encodings
+    let encodings = (&schema.fields)
+        .par_iter()
+        .map(|f| transverse(&f.data_type, encoding_map))
+        .collect::<Vec<_>>();

     // derive the parquet schema (physical types) from arrow's schema.
-    let parquet_schema = to_parquet_schema(schema)?;
+    let parquet_schema = to_parquet_schema(&schema)?;

-    let a = parquet_schema.clone();
     let row_groups = batches.iter().map(|batch| {
         // write batch to pages; parallelized by rayon
         let columns = batch
             .columns()
             .par_iter()
-            .zip(a.columns().to_vec().into_par_iter())
+            .zip(parquet_schema.fields().to_vec())
             .zip(encodings.clone())
-            .map(|((array, descriptor), encoding)| {
-                // create encoded and compressed pages this column
-                let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
-                encoded_pages
-                    .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
-                    .collect::<Result<VecDeque<_>>>()
+            .flat_map(move |((array, type_), encoding)| {
+                let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap();
+                encoded_columns
+                    .into_iter()
+                    .map(|encoded_pages| {
+                        let encoded_pages = DynIter::new(
+                            encoded_pages
+                                .into_iter()
+                                .map(|x| x.map_err(|e| ParquetError::General(e.to_string()))),
+                        );
+                        encoded_pages
+                            .map(|page| {
+                                compress(page?, vec![], options.compression).map_err(|x| x.into())
+                            })
+                            .collect::<Result<VecDeque<_>>>()
+                    })
+                    .collect::<Vec<_>>()
             })
             .collect::<Result<Vec<VecDeque<CompressedPage>>>>()?;

@@ -88,7 +103,7 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
                 .into_iter()
                 .map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
         );
-        Ok(row_group)
+        Result::Ok(row_group)
     });

     // Create a new empty file
@@ -135,5 +150,5 @@ fn main() -> Result<()> {
     };
     let batch = create_batch(5_000_000)?;

-    parallel_write("example.parquet", &schema, &[batch.clone(), batch])
+    parallel_write("example.parquet", schema, &[batch.clone(), batch])
 }
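
For context, the example's new pattern declares one encoding per parquet leaf rather than one per top-level field, which is what lets nested types (structs, lists) be written. A minimal standalone sketch of that declaration step, assuming arrow2's `transverse` helper as used in the diff; the encoding chosen for the binary arms is elided above, so `Encoding::DeltaLengthByteArray` is an assumption based on the "delta-encoding" comment:

use arrow2::datatypes::{DataType, PhysicalType, Schema};
use arrow2::io::parquet::write::{transverse, Encoding};

// One Vec<Encoding> per field: `transverse` walks a (possibly nested)
// DataType and applies the closure once per parquet leaf column.
fn declare_encodings(schema: &Schema) -> Vec<Vec<Encoding>> {
    let encoding_map = |data_type: &DataType| match data_type.to_physical_type() {
        // assumed arm: delta-encode binary leaves (elided in the diff above)
        PhysicalType::Binary | PhysicalType::LargeBinary => Encoding::DeltaLengthByteArray,
        // remaining is plain
        _ => Encoding::Plain,
    };
    schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, encoding_map))
        .collect()
}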
2 changes: 1 addition & 1 deletion src/io/parquet/write/pages.rs
@@ -194,7 +194,7 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec<ParquetPrimiti
 }

 /// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
-pub fn array_to_columns<A: AsRef<dyn Array> + 'static + Send + Sync>(
+pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     array: A,
     type_: ParquetType,
     options: WriteOptions,
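The second change removes the `'static` bound from `array_to_columns`. That bound rejected arrays borrowed from a caller-owned chunk, such as the `&Arc<dyn Array>` items that `par_iter()` yields in the example above, because a reference's lifetime is tied to its referent. A standalone sketch of the distinction, independent of arrow2's types:

// Standalone illustration (not arrow2 code) of relaxing a `'static` bound.
fn needs_static<A: AsRef<str> + 'static>(s: A) -> usize {
    s.as_ref().len()
}

fn relaxed<A: AsRef<str>>(s: A) -> usize {
    s.as_ref().len()
}

fn main() {
    let owned = String::from("parquet");
    let _ = needs_static(owned.clone()); // owned values satisfy `'static`
    // needs_static(&owned); // error[E0597]: `owned` does not live long enough
    let n = relaxed(&owned); // fine: the relaxed bound accepts the borrow
    assert_eq!(n, 7);
}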
