This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit
Updated examples.
jorgecarleitao committed Oct 24, 2021
1 parent 7d7d8e3 commit b816c7a
Showing 3 changed files with 94 additions and 75 deletions.
examples/parquet_read.rs (34 changes: 14 additions, 20 deletions)
@@ -4,49 +4,43 @@ use std::io::BufReader;
 use arrow2::io::parquet::read;
 use arrow2::{array::Array, error::Result};

-fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
+fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
     // Open a file, a common operation in Rust
     let mut file = BufReader::new(File::open(path)?);

     // Read the file's metadata. This has a small IO cost because it requires seeking to the end
     // of the file to read its footer.
-    let file_metadata = read::read_metadata(&mut file)?;
+    let metadata = read::read_metadata(&mut file)?;

     // Convert the file's metadata into an arrow schema. This is CPU-only and amounts to
     // parsing thrift if the arrow format is available on a key, or inferring the arrow schema from
     // the parquet's physical, converted and logical types.
-    let arrow_schema = read::get_schema(&file_metadata)?;
+    let arrow_schema = read::get_schema(&metadata)?;

-    // get the columns' metadata
-    let metadata = file_metadata.row_groups[row_group].column(column);
-
-    // Construct an iterator over pages. This binds `file` to this iterator, and each iteration
-    // is IO intensive as it will read a compressed page into memory. There is almost no CPU work
-    // on this operation
-    let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;
+    // Create an iterator of column chunks. Each iteration
+    // yields an iterator of compressed pages. There is almost no CPU work in iterating.
+    let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

     // get the column's logical type
-    let data_type = arrow_schema.fields()[column].data_type().clone();
+    let data_type = arrow_schema.fields()[field].data_type().clone();

-    // This is the actual work. In this case, pages are read (by calling `iter.next()`) and are
-    // immediately decompressed, decoded, deserialized to arrow and deallocated.
-    // This uses a combination of IO and CPU. At this point, `array` is the arrow-corresponding
-    // array of the parquets' physical type.
-    // `Decompressor` re-uses an internal buffer for de-compression, thereby maximizing memory re-use.
-    let mut pages = read::Decompressor::new(pages, vec![]);
+    // This is the actual work. In this case, pages are read and
+    // decompressed, decoded and deserialized to arrow.
+    // Because `columns` is an iterator, it uses a combination of IO and CPU.
+    let (array, _, _) = read::column_iter_to_array(columns, data_type, vec![])?;

-    read::page_iter_to_array(&mut pages, metadata, data_type)
+    Ok(array)
 }

 fn main() -> Result<()> {
     use std::env;
     let args: Vec<String> = env::args().collect();

     let file_path = &args[1];
-    let column = args[2].parse::<usize>().unwrap();
+    let field = args[2].parse::<usize>().unwrap();
     let row_group = args[3].parse::<usize>().unwrap();

-    let array = read_column_chunk(file_path, row_group, column)?;
+    let array = read_field(file_path, row_group, field)?;
     println!("{}", array);
     Ok(())
 }
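
For orientation, a minimal, hedged sketch (not part of the commit) that applies the same `get_column_iterator`/`column_iter_to_array` flow shown above to every field of a row group. The helper name `read_all_fields` is hypothetical:

use std::fs::File;
use std::io::BufReader;

use arrow2::{array::Array, error::Result, io::parquet::read};

// Hypothetical helper: read all fields of one row group into arrow arrays,
// using the same calls as the updated example above.
fn read_all_fields(path: &str, row_group: usize) -> Result<Vec<Box<dyn Array>>> {
    let mut file = BufReader::new(File::open(path)?);

    // One metadata read and one schema conversion for the whole file.
    let metadata = read::read_metadata(&mut file)?;
    let arrow_schema = read::get_schema(&metadata)?;

    (0..arrow_schema.fields().len())
        .map(|field| {
            let data_type = arrow_schema.fields()[field].data_type().clone();
            // Iterate over this field's column chunks and deserialize them to arrow.
            let columns =
                read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);
            let (array, _, _) = read::column_iter_to_array(columns, data_type, vec![])?;
            Ok(array)
        })
        .collect()
}

The example binary itself takes the file path, field index and row group index as positional arguments, so an invocation would look something like `cargo run --example parquet_read -- data.parquet 0 0`.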
examples/parquet_read_parallel.rs (123 changes: 72 additions, 51 deletions)
@@ -5,86 +5,107 @@ use std::sync::Arc;
 use std::thread;
 use std::time::SystemTime;

-use arrow2::{array::Array, error::Result, io::parquet::read};
+use arrow2::{
+    array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
+    record_batch::RecordBatch,
+};

-fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
-    // prepare a channel to send serialized records from threads
+fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
+    // prepare a channel to send compressed pages across threads.
     let (tx, rx) = unbounded();

     let mut file = File::open(path)?;
     let file_metadata = read::read_metadata(&mut file)?;
     let arrow_schema = Arc::new(read::get_schema(&file_metadata)?);

-    let file_metadata = Arc::new(file_metadata);
-
     let start = SystemTime::now();
     // spawn a thread to produce `Vec<CompressedPage>` (IO bounded)
-    let producer_metadata = file_metadata.clone();
-    let child = thread::spawn(move || {
-        for column in 0..producer_metadata.schema().num_columns() {
-            for row_group in 0..producer_metadata.row_groups.len() {
-                let start = SystemTime::now();
-                let column_metadata = producer_metadata.row_groups[row_group].column(column);
-                println!("produce start: {} {}", column, row_group);
-                let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![])
-                    .unwrap()
-                    .collect::<Vec<_>>();
-                println!(
-                    "produce end - {:?}: {} {}",
-                    start.elapsed().unwrap(),
-                    column,
-                    row_group
-                );
-                tx.send((column, row_group, pages)).unwrap();
-            }
-        }
-    });
-
-    let mut children = Vec::new();
-    // use 3 consumers of to decompress, decode and deserialize.
-    for _ in 0..3 {
-        let rx_consumer = rx.clone();
-        let metadata_consumer = file_metadata.clone();
-        let arrow_schema_consumer = arrow_schema.clone();
-        let child = thread::spawn(move || {
-            let (column, row_group, pages) = rx_consumer.recv().unwrap();
-            let start = SystemTime::now();
-            println!("consumer start - {} {}", column, row_group);
-            let metadata = metadata_consumer.row_groups[row_group].column(column);
-            let data_type = arrow_schema_consumer.fields()[column].data_type().clone();
-
-            let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
-
-            let array = read::page_iter_to_array(&mut pages, metadata, data_type);
-            println!(
-                "consumer end - {:?}: {} {}",
-                start.elapsed().unwrap(),
-                column,
-                row_group
-            );
-            array
-        });
-        children.push(child);
-    }
+    let producer = thread::spawn(move || {
+        for field in 0..file_metadata.schema().fields().len() {
+            let start = SystemTime::now();
+            let mut columns = read::get_column_iterator(
+                &mut file,
+                &file_metadata,
+                row_group,
+                field,
+                None,
+                vec![],
+            );
+
+            println!("produce start - field: {}", field);
+
+            while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
+                if let Some((pages, metadata)) = new_iter.get() {
+                    let pages = pages.collect::<Vec<_>>();
+
+                    tx.send((field, metadata.clone(), pages)).unwrap();
+                }
+                columns = new_iter;
+            }
+            println!(
+                "produce end - {:?}: {} {}",
+                start.elapsed().unwrap(),
+                field,
+                row_group
+            );
+        }
+    });
+
+    // use 2 consumers for the CPU-intensive work: decompress, decode and deserialize.
+    #[allow(clippy::needless_collect)] // we need to collect to parallelize
+    let consumers = (0..2)
+        .map(|i| {
+            let rx_consumer = rx.clone();
+            let arrow_schema_consumer = arrow_schema.clone();
+            thread::spawn(move || {
+                let mut arrays = vec![];
+                while let Ok((field, metadata, pages)) = rx_consumer.recv() {
+                    let start = SystemTime::now();
+                    let data_type = arrow_schema_consumer.fields()[field].data_type().clone();
+                    println!("consumer {} start - {}", i, field);

+                    let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
+
+                    let array = read::page_iter_to_array(&mut pages, &metadata, data_type);
+                    println!(
+                        "consumer {} end - {:?}: {}",
+                        i,
+                        start.elapsed().unwrap(),
+                        field
+                    );
+
+                    arrays.push((field, array))
+                }
+                arrays
+            })
+        })
+        .collect::<Vec<_>>();

-    child.join().expect("child thread panicked");
+    producer.join().expect("producer thread panicked");

-    let arrays = children
+    // collect all columns (join threads)
+    let mut columns = consumers
         .into_iter()
         .map(|x| x.join().unwrap())
-        .collect::<Result<Vec<_>>>()?;
+        .flatten()
+        .map(|x| Ok((x.0, x.1?)))
+        .collect::<Result<Vec<(usize, Box<dyn Array>)>>>()?;
+    // order may not be the same
+    columns.sort_unstable_by_key(|x| x.0);
+    let columns = columns.into_iter().map(|x| x.1.into()).collect();
     println!("Finished - {:?}", start.elapsed().unwrap());

-    Ok(arrays)
+    RecordBatch::try_new(arrow_schema, columns)
 }

 fn main() -> Result<()> {
     use std::env;
     let args: Vec<String> = env::args().collect();
     let file_path = &args[1];

-    let arrays = parallel_read(file_path)?;
-    for array in arrays {
+    let batch = parallel_read(file_path, 0)?;
+    for array in batch.columns() {
         println!("{}", array)
     }
     Ok(())
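
The updated example hinges on a single-producer/multi-consumer split: one IO-bound thread reads compressed pages and sends them over an unbounded channel, two CPU-bound threads decompress, decode and deserialize them, and the results are re-sorted at the end because consumers finish out of order. Below is a stripped-down, parquet-free sketch of just that pattern; it assumes the `unbounded` channel above comes from the crossbeam-channel crate, and all names in it are illustrative:

use std::thread;

use crossbeam_channel::unbounded;

fn main() {
    // The channel carries work units; in the real example these are compressed pages.
    let (tx, rx) = unbounded::<usize>();

    // Producer: IO-bound in the real example (it reads pages from the file).
    let producer = thread::spawn(move || {
        for field in 0..8 {
            tx.send(field).unwrap();
        }
        // `tx` is dropped here, which disconnects the channel and ends the consumers' loops.
    });

    // Two consumers: CPU-bound in the real example (decompress, decode, deserialize).
    let consumers: Vec<_> = (0..2)
        .map(|i| {
            let rx = rx.clone();
            thread::spawn(move || {
                let mut results = vec![];
                while let Ok(field) = rx.recv() {
                    results.push((field, i));
                }
                results
            })
        })
        .collect();

    producer.join().unwrap();

    // Join the consumers and restore a deterministic order, as the example does.
    let mut results: Vec<_> = consumers
        .into_iter()
        .flat_map(|c| c.join().unwrap())
        .collect();
    results.sort_unstable_by_key(|x| x.0);
    println!("{:?}", results);
}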
src/io/parquet/read/mod.rs (12 changes: 8 additions, 4 deletions)
@@ -6,7 +6,6 @@ use std::{
 };

 use futures::{AsyncRead, AsyncSeek, Stream};
-use parquet2::read::MutStreamingIterator;
 pub use parquet2::{
     error::ParquetError,
     fallible_streaming_iterator,
@@ -15,8 +14,8 @@ pub use parquet2::{
     read::{
         decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
         get_page_stream as _get_page_stream, read_metadata as _read_metadata,
-        read_metadata_async as _read_metadata_async, BasicDecompressor, Decompressor, PageFilter,
-        PageIterator, State,
+        read_metadata_async as _read_metadata_async, BasicDecompressor, Decompressor,
+        MutStreamingIterator, PageFilter, PageIterator, State,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
@@ -168,6 +167,7 @@ fn dict_read<
 }

 /// Returns an Array built from an iterator of column chunks
+#[allow(clippy::type_complexity)]
 pub fn column_iter_to_array<II, I>(
     mut columns: I,
     data_type: DataType,
@@ -212,7 +212,11 @@ where
 }

 /// Converts an iterator of [`DataPage`] into a single [`Array`].
-fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
+///
+/// This only handles types with a single column.
+/// Use [`column_iter_to_array`] to support multi-column arrays.
+/// This is useful to parallelize CPU work across nested types.
+pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
     iter: &mut I,
     metadata: &ColumnChunkMetaData,
     data_type: DataType,
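
Re-exporting `MutStreamingIterator` and `State` matters for the loop used in the parallel example: `advance()` consumes the iterator and hands it back inside `State::Some`, so the caller keeps ownership of it (and of its internal buffers) across iterations. Here is a self-contained sketch of that ownership-passing shape, using illustrative stand-in types rather than parquet2's actual trait:

// Stand-ins that only mimic the shape of parquet2's `State`/`MutStreamingIterator`.
enum State<I> {
    Some(I),
    Finished,
}

struct Counter {
    current: usize,
    limit: usize,
}

impl Counter {
    // Consumes `self` and returns it (with updated state) inside `State::Some`,
    // mirroring how the column iterator is advanced in parquet_read_parallel.rs.
    fn advance(mut self) -> State<Counter> {
        self.current += 1;
        if self.current > self.limit {
            State::Finished
        } else {
            State::Some(self)
        }
    }

    fn get(&self) -> Option<usize> {
        Some(self.current)
    }
}

fn main() {
    let mut counter = Counter { current: 0, limit: 3 };
    // Advance, use the yielded item, then take ownership back for the next round.
    while let State::Some(new_iter) = counter.advance() {
        if let Some(value) = new_iter.get() {
            println!("{}", value);
        }
        counter = new_iter;
    }
}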
