Read parquet row groups in chunks (#789)
jorgecarleitao authored Feb 4, 2022
1 parent b46a636 commit f35e02a
Showing 39 changed files with 4,163 additions and 2,140 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -92,7 +92,7 @@ flate2 = "1"
doc-comment = "0.3"
crossbeam-channel = "0.5.1"
# used to test async readers
tokio = { version = "1", features = ["macros", "rt", "fs"] }
tokio = { version = "1", features = ["macros", "rt", "fs", "io-util"] }
tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }
@@ -134,6 +134,8 @@ io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_compression = [
"parquet2/zstd",
"parquet2/snappy",
@@ -207,8 +209,6 @@ compute = [
"compute_lower",
"compute_upper"
]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures"]
benchmarks = ["rand"]
simd = ["packed_simd"]

2 changes: 1 addition & 1 deletion benches/read_parquet.rs
@@ -35,7 +35,7 @@ fn to_buffer(
fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
let file = Cursor::new(buffer);

let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?;

for maybe_chunk in reader {
let columns = maybe_chunk?;
45 changes: 11 additions & 34 deletions examples/parquet_read.rs
@@ -1,46 +1,23 @@
use std::fs::File;
use std::io::BufReader;
use std::time::SystemTime;

use arrow2::error::Result;
use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
// Open a file, a common operation in Rust
let mut file = BufReader::new(File::open(path)?);

// Read the files' metadata. This has a small IO cost because it requires seeking to the end
// of the file to read its footer.
let metadata = read::read_metadata(&mut file)?;

// Convert the files' metadata into an arrow schema. This is CPU-only and amounts to
// parsing thrift if the arrow schema is available in a metadata key, or inferring the arrow schema from
// the parquet's physical, converted and logical types.
let arrow_schema = read::get_schema(&metadata)?;

// Created an iterator of column chunks. Each iteration
// yields an iterator of compressed pages. There is almost no CPU work in iterating.
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// get the columns' field
let field = &arrow_schema.fields[field];

// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
// Because `columns` is an iterator, it uses a combination of IO and CPU.
let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;

Ok(array)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let field = args[2].parse::<usize>().unwrap();
let row_group = args[3].parse::<usize>().unwrap();

let array = read_field(file_path, row_group, field)?;
println!("{:?}", array);
let reader = File::open(file_path)?;
let reader = read::FileReader::try_new(reader, None, None, None, None)?;

let start = SystemTime::now();
for maybe_chunk in reader {
let columns = maybe_chunk?;
assert!(!columns.is_empty());
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
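
The example above deserializes every column of the file. As a complement, here is a minimal sketch of the same five-argument `FileReader::try_new` used with a column projection, mirroring the benchmark change above; the path handling, the projected indices `[0, 2]`, and keeping the remaining arguments as `None` are illustrative assumptions, not part of this commit.

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

// Hedged sketch: count rows while only deserializing the first and third columns.
// The three trailing `None`s are left untouched, exactly as in the examples above.
fn count_rows_projected(path: &str) -> Result<usize> {
    let reader = File::open(path)?;
    let reader = read::FileReader::try_new(reader, Some(&[0, 2]), None, None, None)?;

    let mut rows = 0;
    for maybe_chunk in reader {
        rows += maybe_chunk?.len();
    }
    Ok(rows)
}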
57 changes: 57 additions & 0 deletions examples/parquet_read_async.rs
@@ -0,0 +1,57 @@
use std::sync::Arc;
use std::time::SystemTime;

use futures::future::BoxFuture;
use futures::FutureExt;
use tokio;
use tokio::fs::File;
use tokio::io::BufReader;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::parquet::read::{self, RowGroupDeserializer};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let start = SystemTime::now();

use std::env;
let args: Vec<String> = env::args().collect();
let file_path = Arc::new(args[1].clone());

// # Read metadata
let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();

// this operation is usually done before reading the data, during planning.
// This is a mix of IO and CPU-bounded tasks but both of them are O(1)
let metadata = read::read_metadata_async(&mut reader).await?;
let schema = read::get_schema(&metadata)?;

// This factory yields one file descriptor per column and is used to read columns concurrently.
// They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
let factory = || {
Box::pin(async { Ok(File::open(file_path.clone().as_ref()).await?.compat()) })
as BoxFuture<_>
};

// This is the row group loop. Groups can be skipped based on the statistics they carry.
for row_group in &metadata.row_groups {
// A row group is consumed in two steps: the first step is to read the (compressed)
// columns into memory, which is IO-bounded.
let column_chunks =
read::read_columns_async(factory, row_group, schema.fields.clone(), None).await?;

// the second step is to iterate over the columns in chunks.
// this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
// the runtime.
// Furthermore, this operation is trivially parallelizable e.g. via rayon, as each iterator
// can be advanced in parallel (parallel decompression and deserialization).
let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
for maybe_chunk in chunks {
let chunk = maybe_chunk?;
println!("{}", chunk.len());
}
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
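
As a small sanity check on the loop above (reusing the `column_chunks` and `row_group` bindings from the example), the chunks yielded by a single `RowGroupDeserializer` should account for every row of the group; the assertion below is an assumption based on how this example uses the API, not a documented guarantee.

// Hedged sketch: sum the rows of all chunks produced for one row group.
let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
let mut rows = 0usize;
for maybe_chunk in chunks {
    rows += maybe_chunk?.len();
}
assert_eq!(rows, row_group.num_rows() as usize);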
115 changes: 0 additions & 115 deletions examples/parquet_read_parallel.rs

This file was deleted.

62 changes: 15 additions & 47 deletions examples/parquet_read_parallel/src/main.rs
@@ -6,61 +6,29 @@ use std::time::SystemTime;

use rayon::prelude::*;

use arrow2::{
array::Array, chunk::Chunk, error::Result, io::parquet::read,
io::parquet::read::MutStreamingIterator,
};
use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};

fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
let mut file = BufReader::new(File::open(path)?);
let file_metadata = read::read_metadata(&mut file)?;
let schema = read::get_schema(&file_metadata)?;
let metadata = read::read_metadata(&mut file)?;
let schema = read::get_schema(&metadata)?;

// IO-bounded
let columns = file_metadata
.schema()
.fields()
.iter()
.enumerate()
.map(|(field_i, field)| {
let start = SystemTime::now();
println!("read start - field: {}", field_i);
let mut columns = read::get_column_iterator(
&mut file,
&file_metadata,
row_group,
field_i,
None,
vec![],
);

let mut column_chunks = vec![];
while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

column_chunks.push((pages, metadata.clone()));
}
columns = new_iter;
}
println!(
"read end - {:?}: {} {}",
start.elapsed().unwrap(),
field_i,
row_group
);
(field_i, field.clone(), column_chunks)
})
.collect::<Vec<_>>();
// read (IO-bounded) all columns into memory (use a subset of the fields to project)
let columns = read::read_columns(
&mut file,
&metadata.row_groups[row_group],
schema.fields,
None,
)?;

// CPU-bounded
let columns = columns
.into_par_iter()
.map(|(field_i, parquet_field, column_chunks)| {
let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
let field = &schema.fields()[field_i];

read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into())
.map(|mut iter| {
// when chunk_size != None, `iter` must be iterated multiple times to get all the chunks
// see the implementation of `arrow2::io::parquet::read::RowGroupDeserializer::next`
// to see how this can be done.
iter.next().unwrap()
})
.collect::<Result<Vec<_>>>()?;

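The `iter.next().unwrap()` above takes only the first array of each column, which is sufficient here because no chunk size is configured. The sketch below (an assumption built on the same `columns` value) drains every array from each column iterator instead, which is what the comment about `RowGroupDeserializer::next` alludes to:

// Hedged sketch: collect every chunk of every column in parallel, rather than
// only the first one; each element of `columns` is an iterator of arrays.
let arrays_per_column = columns
    .into_par_iter()
    .map(|iter| iter.collect::<Result<Vec<_>>>())
    .collect::<Result<Vec<_>>>()?;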
23 changes: 0 additions & 23 deletions examples/parquet_read_record.rs

This file was deleted.

7 changes: 7 additions & 0 deletions src/array/struct_/mod.rs
@@ -74,6 +74,13 @@ impl StructArray {
let fields = Self::get_fields(&data_type);
assert!(!fields.is_empty());
assert_eq!(fields.len(), values.len());
assert!(
fields
.iter()
.map(|f| f.data_type())
.eq(values.iter().map(|a| a.data_type())),
"The fields' datatypes must equal the values datatypes"
);
assert!(values.iter().all(|x| x.len() == values[0].len()));
if let Some(ref validity) = validity {
assert_eq!(values[0].len(), validity.len());
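
For illustration, a hedged sketch of a construction that satisfies the new check; the field names, the values, and the use of the `from_data` constructor that this `impl` block belongs to are assumptions for the example.

use std::sync::Arc;

use arrow2::array::{Array, Int32Array, StructArray, Utf8Array};
use arrow2::datatypes::{DataType, Field};

fn main() {
    let fields = vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ];
    let values = vec![
        Arc::new(Int32Array::from_slice([1, 2, 3])) as Arc<dyn Array>,
        Arc::new(Utf8Array::<i32>::from_slice(["x", "y", "z"])) as Arc<dyn Array>,
    ];
    // The fields' datatypes match the values' datatypes, so this passes the new
    // assertion; swapping the two values would now panic instead of silently
    // producing an inconsistent StructArray.
    let array = StructArray::from_data(DataType::Struct(fields), values, None);
    println!("{:?}", array);
}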