
Added parquet StructArray
jorgecarleitao committed Nov 13, 2021
1 parent 73ee16d commit 7e86a85
Showing 17 changed files with 587 additions and 452 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -66,7 +66,9 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
#parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "struct", optional = true, default_features = false, features = ["stream"] }
#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }

avro-rs = { version = "0.13", optional = true, default_features = false }

34 changes: 14 additions & 20 deletions examples/parquet_read.rs
@@ -4,49 +4,43 @@ use std::io::BufReader;
use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
// Open a file, a common operation in Rust
let mut file = BufReader::new(File::open(path)?);

// Read the files' metadata. This has a small IO cost because it requires seeking to the end
// of the file to read its footer.
let file_metadata = read::read_metadata(&mut file)?;
let metadata = read::read_metadata(&mut file)?;

// Convert the file's metadata into an arrow schema. This is CPU-only: it amounts to
// parsing thrift if the arrow schema is stored in the metadata's key-value pairs, or to
// inferring the arrow schema from the parquet's physical, converted and logical types.
let arrow_schema = read::get_schema(&file_metadata)?;
let arrow_schema = read::get_schema(&metadata)?;

// get the columns' metadata
let metadata = file_metadata.row_groups[row_group].column(column);

// Construct an iterator over pages. This binds `file` to this iterator, and each iteration
// is IO intensive as it will read a compressed page into memory. There is almost no CPU work
// on this operation
let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;
// Create an iterator of column chunks. Each iteration
// yields an iterator of compressed pages. There is almost no CPU work in iterating.
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// get the columns' logical type
let data_type = arrow_schema.fields()[column].data_type().clone();
let data_type = arrow_schema.fields()[field].data_type().clone();

// This is the actual work. In this case, pages are read (by calling `iter.next()`) and are
// immediately decompressed, decoded, deserialized to arrow and deallocated.
// This uses a combination of IO and CPU. At this point, `array` is the arrow-corresponding
// array of the parquets' physical type.
// `Decompressor` re-uses an internal buffer for de-compression, thereby maximizing memory re-use.
let mut pages = read::Decompressor::new(pages, vec![]);
// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
// Because `columns` is an iterator, it uses a combination of IO and CPU.
let (array, _, _) = read::column_iter_to_array(columns, data_type, vec![])?;

read::page_iter_to_array(&mut pages, metadata, data_type)
Ok(array)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let column = args[2].parse::<usize>().unwrap();
let field = args[2].parse::<usize>().unwrap();
let row_group = args[3].parse::<usize>().unwrap();

let array = read_column_chunk(file_path, row_group, column)?;
let array = read_field(file_path, row_group, field)?;
println!("{}", array);
Ok(())
}
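
For orientation, here is a minimal sketch (not part of this commit) of how `read_field` above could be composed to materialize every field of a row group into a `RecordBatch`. It assumes the APIs used elsewhere in this diff (`read_metadata`, `get_schema`, `RecordBatch::try_new`) and, for simplicity, re-opens the file on every call to `read_field`:

```rust
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::{error::Result, io::parquet::read, record_batch::RecordBatch};

fn read_row_group(path: &str, row_group: usize) -> Result<RecordBatch> {
    // read the metadata once, only to learn the arrow schema and its number of fields
    let mut file = BufReader::new(File::open(path)?);
    let metadata = read::read_metadata(&mut file)?;
    let schema = Arc::new(read::get_schema(&metadata)?);

    // call `read_field` (defined above) once per field; each call deserializes one column
    let columns: Vec<Arc<dyn Array>> = (0..schema.fields().len())
        .map(|field| read_field(path, row_group, field).map(|array| array.into()))
        .collect::<Result<_>>()?;

    RecordBatch::try_new(schema, columns)
}
```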
123 changes: 72 additions & 51 deletions examples/parquet_read_parallel.rs
@@ -5,86 +5,107 @@ use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

use arrow2::{array::Array, error::Result, io::parquet::read};
use arrow2::{
array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
};

fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
// prepare a channel to send serialized records from threads
fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
// prepare a channel to send compressed pages across threads.
let (tx, rx) = unbounded();

let mut file = File::open(path)?;
let file_metadata = read::read_metadata(&mut file)?;
let arrow_schema = Arc::new(read::get_schema(&file_metadata)?);

let file_metadata = Arc::new(file_metadata);

let start = SystemTime::now();
// spawn a thread to produce `Vec<CompressedPage>` (IO bounded)
let producer_metadata = file_metadata.clone();
let child = thread::spawn(move || {
for column in 0..producer_metadata.schema().num_columns() {
for row_group in 0..producer_metadata.row_groups.len() {
let start = SystemTime::now();
let column_metadata = producer_metadata.row_groups[row_group].column(column);
println!("produce start: {} {}", column, row_group);
let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![])
.unwrap()
.collect::<Vec<_>>();
println!(
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
column,
row_group
);
tx.send((column, row_group, pages)).unwrap();
}
}
});

let mut children = Vec::new();
// use 3 consumers of to decompress, decode and deserialize.
for _ in 0..3 {
let rx_consumer = rx.clone();
let metadata_consumer = file_metadata.clone();
let arrow_schema_consumer = arrow_schema.clone();
let child = thread::spawn(move || {
let (column, row_group, pages) = rx_consumer.recv().unwrap();
let producer = thread::spawn(move || {
for field in 0..file_metadata.schema().fields().len() {
let start = SystemTime::now();
println!("consumer start - {} {}", column, row_group);
let metadata = metadata_consumer.row_groups[row_group].column(column);
let data_type = arrow_schema_consumer.fields()[column].data_type().clone();

let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
let mut columns = read::get_column_iterator(
&mut file,
&file_metadata,
row_group,
field,
None,
vec![],
);

println!("produce start - field: {}", field);

while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

let array = read::page_iter_to_array(&mut pages, metadata, data_type);
tx.send((field, metadata.clone(), pages)).unwrap();
}
columns = new_iter;
}
println!(
"consumer end - {:?}: {} {}",
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
column,
field,
row_group
);
array
});
children.push(child);
}
}
});

// use 2 consumers for the CPU-intensive work: decompress, decode and deserialize.
#[allow(clippy::needless_collect)] // we need to collect to parallelize
let consumers = (0..2)
.map(|i| {
let rx_consumer = rx.clone();
let arrow_schema_consumer = arrow_schema.clone();
thread::spawn(move || {
let mut arrays = vec![];
while let Ok((field, metadata, pages)) = rx_consumer.recv() {
let start = SystemTime::now();
let data_type = arrow_schema_consumer.fields()[field].data_type().clone();
println!("consumer {} start - {}", i, field);

let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);

let array = read::page_iter_to_array(&mut pages, &metadata, data_type);
println!(
"consumer {} end - {:?}: {}",
i,
start.elapsed().unwrap(),
field
);

arrays.push((field, array))
}
arrays
})
})
.collect::<Vec<_>>();

child.join().expect("child thread panicked");
producer.join().expect("producer thread panicked");

let arrays = children
// collect all columns (join threads)
let mut columns = consumers
.into_iter()
.map(|x| x.join().unwrap())
.collect::<Result<Vec<_>>>()?;
.flatten()
.map(|x| Ok((x.0, x.1?)))
.collect::<Result<Vec<(usize, Box<dyn Array>)>>>()?;
// order may not be the same
columns.sort_unstable_by_key(|x| x.0);
let columns = columns.into_iter().map(|x| x.1.into()).collect();
println!("Finished - {:?}", start.elapsed().unwrap());

Ok(arrays)
RecordBatch::try_new(arrow_schema, columns)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();
let file_path = &args[1];

let arrays = parallel_read(file_path)?;
for array in arrays {
let batch = parallel_read(file_path, 0)?;
for array in batch.columns() {
println!("{}", array)
}
Ok(())
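
As an aside, the threading structure above is a plain producer/consumer pattern over a channel: one IO-bound producer sends `(field, column metadata, compressed pages)` triples, two CPU-bound consumers decompress, decode and deserialize them, and the results are sorted by field index at the end because channel delivery order is not deterministic. A stripped-down sketch of just that pattern, assuming the `crossbeam-channel` crate (from which the example's `unbounded` presumably comes):

```rust
use std::thread;

use crossbeam_channel::unbounded;

fn main() {
    let (tx, rx) = unbounded::<usize>();

    // producer: stands in for the IO-bound work of reading compressed pages
    let producer = thread::spawn(move || {
        for field in 0..4 {
            tx.send(field).unwrap();
        }
        // `tx` is dropped here, which disconnects the channel and ends the consumers' loops
    });

    // consumers: stand in for the CPU-bound work; results carry the field index so they
    // can be re-ordered after joining
    let consumers: Vec<_> = (0..2)
        .map(|i| {
            let rx = rx.clone();
            thread::spawn(move || {
                let mut out = vec![];
                while let Ok(field) = rx.recv() {
                    out.push((field, format!("consumer {} handled field {}", i, field)));
                }
                out
            })
        })
        .collect();

    producer.join().unwrap();
    let mut results: Vec<_> = consumers
        .into_iter()
        .flat_map(|consumer| consumer.join().unwrap())
        .collect();
    results.sort_unstable_by_key(|result| result.0);
    for (_, message) in results {
        println!("{}", message);
    }
}
```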
28 changes: 27 additions & 1 deletion parquet_integration/write_parquet.py
@@ -184,6 +184,32 @@ def case_nested(size):
)


def case_struct(size):
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
struct_fields = [
("f1", pa.utf8()),
("f2", pa.bool_()),
]
fields = [
pa.field(
"struct",
pa.struct(struct_fields),
)
]
schema = pa.schema(fields)
return (
{
"struct": pa.StructArray.from_arrays(
[pa.array(string * size), pa.array(boolean * size)],
fields=struct_fields,
),
},
schema,
f"struct_nullable_{size*10}.parquet",
)


def write_pyarrow(
case,
size: int,
Expand Down Expand Up @@ -228,7 +254,7 @@ def write_pyarrow(
)


for case in [case_basic_nullable, case_basic_required, case_nested]:
for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]:
for version in [1, 2]:
for use_dict in [True, False]:
write_pyarrow(case, 1, version, use_dict, False, False)
33 changes: 33 additions & 0 deletions src/io/parquet/read/README.md
@@ -0,0 +1,33 @@
## Observations

### LSB equivalence between definition levels and bitmaps

* When the maximum repetition level is 0 and the maximum definition level is 1,
the RLE-encoded definition levels correspond exactly to Arrow's validity bitmap and can be
memcopied without further transformations (see the sketch after this list).

* Reading a parquet nested field can be done by reading each individual primitive
column and "building" the nested struct in arrow.
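
To make the first observation concrete, here is an element-by-element sketch (not taken from the crate, which can instead copy the bit-packed buffer directly) of how definition levels with a maximum of 1 map onto a validity bitmap:

```rust
use arrow2::bitmap::MutableBitmap;

/// With maximum repetition level 0 and maximum definition level 1, a definition level
/// equal to the maximum means "value present" and 0 means "null", i.e. exactly one
/// Arrow validity bit per value.
fn def_levels_to_validity(def_levels: &[u32], max_def_level: u32) -> MutableBitmap {
    let mut validity = MutableBitmap::with_capacity(def_levels.len());
    for &level in def_levels {
        validity.push(level == max_def_level);
    }
    validity
}
```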

## Nested parquet groups are deserialized recursively

Rows of nested parquet groups are encoded in the repetition and definition levels.
In arrow, they correspond to:
* list's offsets and validity
* struct's validity

An implementation that leverages this observation:

When nested types are observed in a parquet column, we recurse over the struct to gather
whether the type is a Struct or List and whether it is required or optional, which we store
in a `Vec<Nested>`. `Nested` is an enum that can process definition and repetition
levels depending on the type and nullability.

When processing pages, we decode the definition and repetition levels into this `Vec<Nested>`.

When we finish a column chunk, we recursively pop from this `Vec` as we build the `StructArray`
or `ListArray`.

With this approach, the only difference versus flat (non-nested) columns is that we cannot
leverage the bitmap optimization described above, and instead need to deserialize the
repetition and definition levels to `i32`.
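
For illustration only, one hypothetical shape of such an enum (the actual `Nested` in the crate may differ): each nesting level records whether it is a struct or a list and whether it is optional, and accumulates the decoded levels that belong to that level.

```rust
/// Hypothetical sketch of per-level state built while decoding definition and
/// repetition levels; the real `Nested` enum in the crate may look different.
enum Nested {
    /// optional struct: only needs a validity, derived from definition levels
    StructOptional { validity: Vec<bool> },
    /// required struct: nothing to accumulate
    StructRequired,
    /// optional list: offsets come from repetition levels, validity from definition levels
    ListOptional { offsets: Vec<i32>, validity: Vec<bool> },
    /// required list: offsets only
    ListRequired { offsets: Vec<i32> },
}
```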