
Fixed example.
jorgecarleitao committed Oct 29, 2021
1 parent 2cd20fe commit 22a187d
Showing 3 changed files with 36 additions and 31 deletions.
28 changes: 16 additions & 12 deletions examples/parquet_read_parallel.rs
@@ -1,4 +1,5 @@
use crossbeam_channel::unbounded;
+use parquet2::metadata::ColumnChunkMetaData;

use std::fs::File;
use std::sync::Arc;
@@ -21,32 +22,35 @@ fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
let start = SystemTime::now();
// spawn a thread to produce `Vec<CompressedPage>` (IO bounded)
let producer = thread::spawn(move || {
-for field in 0..file_metadata.schema().fields().len() {
+for (field_i, field) in file_metadata.schema().fields().iter().enumerate() {
let start = SystemTime::now();

let mut columns = read::get_column_iterator(
&mut file,
&file_metadata,
row_group,
-field,
+field_i,
None,
vec![],
);

println!("produce start - field: {}", field);
println!("produce start - field: {}", field_i);

+let mut column_chunks = vec![];
while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

-tx.send((field, metadata.clone(), pages)).unwrap();
+column_chunks.push((pages, metadata.clone()));
}
columns = new_iter;
}
+// todo: create API to allow sending each column (and not column chunks) to be processed in parallel
+tx.send((field_i, field.clone(), column_chunks)).unwrap();
println!(
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
-field,
+field_i,
row_group
);
}
@@ -60,22 +64,22 @@ fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
let arrow_schema_consumer = arrow_schema.clone();
thread::spawn(move || {
let mut arrays = vec![];
-while let Ok((field, metadata, pages)) = rx_consumer.recv() {
+while let Ok((field_i, field, column_chunks)) = rx_consumer.recv() {
let start = SystemTime::now();
-let data_type = arrow_schema_consumer.fields()[field].data_type().clone();
-println!("consumer {} start - {}", i, field);
+let data_type = arrow_schema_consumer.fields()[field_i].data_type().clone();
+println!("consumer {} start - {}", i, field_i);

-let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
+let columns = read::ReadColumnIterator::new(field, column_chunks);

-let array = read::page_iter_to_array(&mut pages, &metadata, data_type);
+let array = read::column_iter_to_array(columns, data_type, vec![]).map(|x| x.0);
println!(
"consumer {} end - {:?}: {}",
i,
start.elapsed().unwrap(),
-field
+field_i
);

-arrays.push((field, array))
+arrays.push((field_i, array))
}
arrays
})
37 changes: 19 additions & 18 deletions src/io/parquet/read/README.md
@@ -2,32 +2,33 @@

### LSB equivalence between definition levels and bitmaps

-* When the maximum repetition level is 0 and the maximum definition level is 1,
-the RLE-encoded definition levels correspond exactly to Arrow's bitmap and can be
-memcopied without further transformations.
-
-* Reading a parquet nested field can be done by reading each individual primitive
-column and "building" the nested struct in arrow.
+When the maximum repetition level is 0 and the maximum definition level is 1,
+the RLE-encoded definition levels correspond exactly to Arrow's bitmap and can be
+memcopied without further transformations.
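To make this equivalence concrete, here is a minimal, self-contained sketch (an illustration only, not code from this repository): it packs already-decoded definition levels of a flat optional column (maximum definition level 1) into an Arrow-style validity bitmap, least-significant bit first. The bit-packed definition levels in the parquet page already have exactly this layout, which is what allows the memcopy.

```rust
/// Packs decoded definition levels (each 0 or 1) of a flat optional column
/// into an Arrow validity bitmap, least-significant bit first within each byte.
fn def_levels_to_bitmap(def_levels: &[u32]) -> Vec<u8> {
    let mut bitmap = vec![0u8; (def_levels.len() + 7) / 8];
    for (i, &level) in def_levels.iter().enumerate() {
        // with a maximum definition level of 1, the level itself is the validity bit
        bitmap[i / 8] |= (level as u8 & 1) << (i % 8);
    }
    bitmap
}

fn main() {
    // column [Some, None, Some, Some, None] -> definition levels [1, 0, 1, 1, 0]
    assert_eq!(def_levels_to_bitmap(&[1, 0, 1, 1, 0]), vec![0b0000_1101]);
}
```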

+## Nested parquet groups are deserialized recursively
+
+Reading a parquet nested field is done by reading each primitive
+column sequentially, building the nested struct recursively.

Rows of nested parquet groups are encoded in the repetition and definition levels.
In arrow, they correspond to:
* list's offsets and validity
* struct's validity
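As a worked illustration (a sketch under the assumptions stated below, not this module's implementation): for a nullable `List<Int32>` with required items (maximum repetition level 1, maximum definition level 2), the rows `[[1, 2], None, []]` are stored as values `[1, 2]`, repetition levels `[0, 1, 0, 0]` and definition levels `[2, 2, 0, 1]`. The sketch rebuilds the list offsets and validity from those levels:

```rust
/// Rebuilds list offsets and validity for a nullable `List<Int32>` with required
/// items (max repetition level 1, max definition level 2) from the levels.
fn levels_to_list(rep: &[u32], def: &[u32]) -> (Vec<i32>, Vec<bool>) {
    let mut offsets: Vec<i32> = vec![0];
    let mut validity: Vec<bool> = vec![];
    let mut values_so_far: i32 = 0;
    for (&r, &d) in rep.iter().zip(def.iter()) {
        if r == 0 {
            // repetition level 0 starts a new row
            validity.push(d != 0); // definition level 0 => the list itself is null
            if d == 2 {
                values_so_far += 1; // definition level 2 => this entry carries a value
            }
            offsets.push(values_so_far);
        } else {
            // repetition level 1 continues the current row with one more value
            values_so_far += 1;
            *offsets.last_mut().unwrap() = values_so_far;
        }
    }
    (offsets, validity)
}

fn main() {
    // rows [[1, 2], None, []] -> values [1, 2]
    let (offsets, validity) = levels_to_list(&[0, 1, 0, 0], &[2, 2, 0, 1]);
    assert_eq!(offsets, vec![0, 2, 2, 2]);
    assert_eq!(validity, vec![true, false, true]);
}
```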

-An implementation that leverages this observation:
-
-When nested types are observed in a parquet column, we recurse over the struct to gather
-whether the type is a Struct or List and whether it is required or optional, which we store
-in a `Vec<Nested>`. `Nested` is an enum that can process definition and repetition
-levels depending on the type and nullability.
+The implementation in this module leverages this observation:

-When processing pages, we process the definition and repetition levels into `Vec`.
+Nested parquet fields are initially recursed over to gather
+whether the type is a Struct or List, and whether it is required or optional, which we store
+in `nested_info: Vec<Box<dyn Nested>>`. `Nested` is a trait object that receives definition
+and repetition levels depending on the type and nullability of the nested item.
+We process the definition and repetition levels into `nested_info`.
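The `Nested` trait itself is not part of this diff; purely as a hypothetical sketch (the names and signatures below are invented for illustration and are not arrow2's actual API), a trait object of this kind might look like:

```rust
// Hypothetical sketch only; arrow2's real `Nested` trait is defined elsewhere
// in this module and differs from this.
trait Nested {
    /// Receive one decoded (definition level, repetition level) pair
    /// for this nesting level.
    fn push(&mut self, def: u32, rep: u32);
    /// Whether this nesting level is optional, i.e. carries a validity bitmap.
    fn is_nullable(&self) -> bool;
    /// Consume the accumulated state into (list offsets, validity);
    /// offsets are empty for struct levels.
    fn take(self: Box<Self>) -> (Vec<i32>, Vec<bool>);
}

// The reader would keep one such state object per nesting level, outermost
// first, e.g. `let mut nested_info: Vec<Box<dyn Nested>> = ...;`, and pop
// from it innermost-first when the field finishes, building the
// `ListArray`/`StructArray` bottom-up.
```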

-When we finish a column chunk, we recursively pop `Vec` as we are building the `StructArray`
-or `ListArray`.
+When we finish a field, we recursively pop from `nested_info` as we build
+the `StructArray` or `ListArray`.

-With this approach, the only difference vs flat is that we cannot leverage the bitmap
-optimization, and instead need to deserialize the repetition and definition
-levels to `i32`.
+With this approach, the only differences vs flat are:
+1. we do not leverage the bitmap optimization, and instead need to deserialize the repetition
+and definition levels to `i32`.
+2. we deserialize definition levels twice, once to extend the values/nullability and
+once to extend `nested_info`.
2 changes: 1 addition & 1 deletion src/io/parquet/read/mod.rs
@@ -18,7 +18,7 @@ pub use parquet2::{
decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
get_page_stream as _get_page_stream, read_metadata as _read_metadata,
read_metadata_async as _read_metadata_async, BasicDecompressor, ColumnChunkIter,
-Decompressor, MutStreamingIterator, PageFilter, PageIterator, State,
+Decompressor, MutStreamingIterator, PageFilter, PageIterator, ReadColumnIterator, State,
},
schema::types::{
LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
