This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read StructArray from parquet #547

Merged · 6 commits · Nov 17, 2021
6 changes: 4 additions & 2 deletions Cargo.toml
@@ -66,10 +66,12 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
# parquet support
parquet2 = { version = "0.7", optional = true, default_features = false, features = ["stream"] }

# avro
avro-rs = { version = "0.13", optional = true, default_features = false }

# compression of avro
libflate = { version = "1.1.1", optional = true }

# for division/remainder optimization at runtime
36 changes: 15 additions & 21 deletions examples/parquet_read.rs
@@ -4,49 +4,43 @@ use std::io::BufReader;
use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
// Open a file, a common operation in Rust
let mut file = BufReader::new(File::open(path)?);

// Read the files' metadata. This has a small IO cost because it requires seeking to the end
// of the file to read its footer.
let file_metadata = read::read_metadata(&mut file)?;
let metadata = read::read_metadata(&mut file)?;

// Convert the files' metadata into an arrow schema. This is CPU-only and amounts to
// parsing thrift if the arrow schema is available under a metadata key, or inferring the
// arrow schema from the parquet's physical, converted and logical types.
let arrow_schema = read::get_schema(&file_metadata)?;
let arrow_schema = read::get_schema(&metadata)?;

// get the columns' metadata
let metadata = file_metadata.row_groups[row_group].column(column);
// Create an iterator of column chunks. Each iteration
// yields an iterator of compressed pages. There is almost no CPU work in iterating.
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// Construct an iterator over pages. This binds `file` to this iterator, and each iteration
// is IO intensive as it will read a compressed page into memory. There is almost no CPU work
// on this operation
let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;
// get the column's arrow field
let field = &arrow_schema.fields()[field];

// get the columns' logical type
let data_type = arrow_schema.fields()[column].data_type().clone();
// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
// Because `columns` is an iterator, it uses a combination of IO and CPU.
let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;

// This is the actual work. In this case, pages are read (by calling `iter.next()`) and are
// immediately decompressed, decoded, deserialized to arrow and deallocated.
// This uses a combination of IO and CPU. At this point, `array` is the arrow-corresponding
// array of the parquets' physical type.
// `Decompressor` re-uses an internal buffer for de-compression, thereby maximizing memory re-use.
let mut pages = read::Decompressor::new(pages, vec![]);

read::page_iter_to_array(&mut pages, metadata, data_type)
Ok(array)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let column = args[2].parse::<usize>().unwrap();
let field = args[2].parse::<usize>().unwrap();
let row_group = args[3].parse::<usize>().unwrap();

let array = read_column_chunk(file_path, row_group, column)?;
let array = read_field(file_path, row_group, field)?;
println!("{}", array);
Ok(())
}
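
The same two calls can be repeated over every field to materialize a whole row group. The sketch below is illustrative only: it reuses the signatures shown in the example above (`read::get_column_iterator`, `read::column_iter_to_array`), elides buffer reuse, and the helper name `read_row_group` is hypothetical.

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

// Hedged sketch, not part of this PR: read every field of one row group by
// repeating the calls from `read_field` above.
fn read_row_group(path: &str, row_group: usize) -> Result<Vec<Box<dyn Array>>> {
    let mut file = BufReader::new(File::open(path)?);
    let metadata = read::read_metadata(&mut file)?;
    let arrow_schema = read::get_schema(&metadata)?;

    arrow_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            // each iteration fully consumes one column iterator (IO + CPU)
            let columns =
                read::get_column_iterator(&mut file, &metadata, row_group, i, None, vec![]);
            let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;
            Ok(array)
        })
        .collect()
}
```
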
127 changes: 76 additions & 51 deletions examples/parquet_read_parallel.rs
@@ -1,90 +1,115 @@
use crossbeam_channel::unbounded;
use parquet2::metadata::ColumnChunkMetaData;

use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

use arrow2::{array::Array, error::Result, io::parquet::read};
use arrow2::{
array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
};

fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
// prepare a channel to send serialized records from threads
fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
// prepare a channel to send compressed pages across threads.
let (tx, rx) = unbounded();

let mut file = File::open(path)?;
let file_metadata = read::read_metadata(&mut file)?;
let arrow_schema = Arc::new(read::get_schema(&file_metadata)?);

let file_metadata = Arc::new(file_metadata);

let start = SystemTime::now();
// spawn a thread to produce `Vec<CompressedPage>` (IO bounded)
let producer_metadata = file_metadata.clone();
let child = thread::spawn(move || {
for column in 0..producer_metadata.schema().num_columns() {
for row_group in 0..producer_metadata.row_groups.len() {
let start = SystemTime::now();
let column_metadata = producer_metadata.row_groups[row_group].column(column);
println!("produce start: {} {}", column, row_group);
let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![])
.unwrap()
.collect::<Vec<_>>();
println!(
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
column,
row_group
);
tx.send((column, row_group, pages)).unwrap();
}
}
});

let mut children = Vec::new();
// use 3 consumers to decompress, decode and deserialize.
for _ in 0..3 {
let rx_consumer = rx.clone();
let metadata_consumer = file_metadata.clone();
let arrow_schema_consumer = arrow_schema.clone();
let child = thread::spawn(move || {
let (column, row_group, pages) = rx_consumer.recv().unwrap();
let producer = thread::spawn(move || {
for (field_i, field) in file_metadata.schema().fields().iter().enumerate() {
let start = SystemTime::now();
println!("consumer start - {} {}", column, row_group);
let metadata = metadata_consumer.row_groups[row_group].column(column);
let data_type = arrow_schema_consumer.fields()[column].data_type().clone();

let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
let mut columns = read::get_column_iterator(
&mut file,
&file_metadata,
row_group,
field_i,
None,
vec![],
);

println!("produce start - field: {}", field_i);

let mut column_chunks = vec![];
while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

let array = read::page_iter_to_array(&mut pages, metadata, data_type);
column_chunks.push((pages, metadata.clone()));
}
columns = new_iter;
}
// todo: create API to allow sending each column (and not column chunks) to be processed in parallel
tx.send((field_i, field.clone(), column_chunks)).unwrap();
println!(
"consumer end - {:?}: {} {}",
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
column,
field_i,
row_group
);
array
});
children.push(child);
}
}
});

// use 2 consumers for the CPU-intensive work: decompress, decode and deserialize.
#[allow(clippy::needless_collect)] // we need to collect to parallelize
let consumers = (0..2)
.map(|i| {
let rx_consumer = rx.clone();
let arrow_schema_consumer = arrow_schema.clone();
thread::spawn(move || {
let mut arrays = vec![];
while let Ok((field_i, parquet_field, column_chunks)) = rx_consumer.recv() {
let start = SystemTime::now();
let field = &arrow_schema_consumer.fields()[field_i];
println!("consumer {} start - {}", i, field_i);

let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);

let array = read::column_iter_to_array(columns, field, vec![]).map(|x| x.0);
println!(
"consumer {} end - {:?}: {}",
i,
start.elapsed().unwrap(),
field_i
);

arrays.push((field_i, array))
}
arrays
})
})
.collect::<Vec<_>>();

child.join().expect("child thread panicked");
producer.join().expect("producer thread panicked");

let arrays = children
// collect all columns (join threads)
let mut columns = consumers
.into_iter()
.map(|x| x.join().unwrap())
.collect::<Result<Vec<_>>>()?;
.flatten()
.map(|x| Ok((x.0, x.1?)))
.collect::<Result<Vec<(usize, Box<dyn Array>)>>>()?;
// order may not be the same
columns.sort_unstable_by_key(|x| x.0);
let columns = columns.into_iter().map(|x| x.1.into()).collect();
println!("Finished - {:?}", start.elapsed().unwrap());

Ok(arrays)
RecordBatch::try_new(arrow_schema, columns)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();
let file_path = &args[1];

let arrays = parallel_read(file_path)?;
for array in arrays {
let batch = parallel_read(file_path, 0)?;
for array in batch.columns() {
println!("{}", array)
}
Ok(())
44 changes: 43 additions & 1 deletion parquet_integration/write_parquet.py
@@ -184,6 +184,48 @@ def case_nested(size):
)


def case_struct(size):
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
struct_fields = [
("f1", pa.utf8()),
("f2", pa.bool_()),
]
schema = pa.schema(
[
pa.field(
"struct",
pa.struct(struct_fields),
),
pa.field(
"struct_struct",
pa.struct(
[
("f1", pa.struct(struct_fields)),
("f2", pa.bool_()),
]
),
),
]
)

struct = pa.StructArray.from_arrays(
[pa.array(string * size), pa.array(boolean * size)],
fields=struct_fields,
)
return (
{
"struct": struct,
"struct_struct": pa.StructArray.from_arrays(
[struct, pa.array(boolean * size)],
names=["f1", "f2"],
),
},
schema,
f"struct_nullable_{size*10}.parquet",
)


def write_pyarrow(
case,
size: int,
@@ -228,7 +270,7 @@ def write_pyarrow(
)


for case in [case_basic_nullable, case_basic_required, case_nested]:
for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]:
for version in [1, 2]:
for use_dict in [True, False]:
write_pyarrow(case, 1, version, use_dict, False, False)
34 changes: 34 additions & 0 deletions src/io/parquet/read/README.md
@@ -0,0 +1,34 @@
## Observations

### LSB equivalence between definition levels and bitmaps

When the maximum repetition level is 0 and the maximum definition level is 1,
the RLE-encoded definition levels correspond exactly to Arrow's bitmap and can be
memcopied without further transformations.
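
A minimal sketch of why this holds (illustrative only, not this module's code), written over already-decoded levels; in practice the bit-packed representation can be copied directly:

```rust
/// With max repetition level 0 and max definition level 1, each definition level
/// is already 0 (null) or 1 (valid), so packing the levels' least-significant bits
/// yields Arrow's validity bitmap.
fn def_levels_to_validity(def_levels: &[u32]) -> Vec<u8> {
    let mut bitmap = vec![0u8; (def_levels.len() + 7) / 8];
    for (i, def) in def_levels.iter().enumerate() {
        if *def == 1 {
            bitmap[i / 8] |= 1 << (i % 8);
        }
    }
    bitmap
}

// definition levels for [Some, None, Some, Some, None] => bitmap 0b0000_1101
// assert_eq!(def_levels_to_validity(&[1, 0, 1, 1, 0]), vec![0b0000_1101]);
```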

## Nested parquet groups are deserialized recursively

Reading a parquet nested field is done by reading each primitive
column sequentially and building the nested struct recursively.

Rows of nested parquet groups are encoded in the repetition and definition levels
(see the worked example after this list). In arrow, they correspond to:
* list's offsets and validity
* struct's validity
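
A hedged worked example of this correspondence, using the standard Parquet three-level list encoding (illustrative values only, not code from this module):

```rust
/// An optional `list<optional i64>` column with rows [[1, 2], None, [3]]
/// has max repetition level 1 and max definition level 3.
fn levels_example() {
    // one (repetition, definition) pair per leaf value:
    // rep == 0 starts a new row, rep == 1 continues the current list;
    // def == 0 means the list itself is null, def == 3 means the value is present.
    let _levels: Vec<(i16, i16)> = vec![(0, 3), (1, 3), (0, 0), (0, 3)];

    // the corresponding Arrow `ListArray` pieces:
    let _offsets: Vec<i32> = vec![0, 2, 2, 3]; // list offsets
    let _validity: Vec<bool> = vec![true, false, true]; // list validity
    let _values: Vec<i64> = vec![1, 2, 3]; // flat inner values
}
```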

The implementation in this module leverages this observation:

Nested parquet fields are initially recursed over to gather
whether the type is a Struct or List, and whether it is required or optional, which we store
in `nested_info: Vec<Box<dyn Nested>>`. `Nested` is a trait object that receives definition
and repetition levels depending on the type and nullability of the nested item.
We process the definition and repetition levels into `nested_info`.

When we finish a field, we recursively pop from `nested_info` as we build
the `StructArray` or `ListArray`.
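
A deliberately simplified sketch of the idea behind `Nested` (the trait in this module has different methods and signatures; this only illustrates one container per nesting level accumulating Arrow-side state as levels stream in):

```rust
/// Conceptual sketch only, not the trait defined in this module.
trait Nested {
    /// push one definition level observed at this nesting level
    fn push(&mut self, definition: u32);
}

/// state gathered for an optional struct at a given depth
struct NestedStruct {
    /// the definition level at which a value at this depth is non-null
    max_definition: u32,
    /// becomes the `StructArray`'s validity
    validity: Vec<bool>,
}

impl Nested for NestedStruct {
    fn push(&mut self, definition: u32) {
        self.validity.push(definition >= self.max_definition);
    }
}
```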

With this approach, the only differences vs the flat case are:
1. we do not leverage the bitmap optimization, and instead need to deserialize the repetition
and definition levels to `i32`.
2. we deserialize definition levels twice, once to extend the values/nullability and
once to extend `nested_info`.