
Added support to read parquet row groups in chunks #789

Merged: 23 commits, merged on Feb 4, 2022
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -92,7 +92,7 @@ flate2 = "1"
doc-comment = "0.3"
crossbeam-channel = "0.5.1"
# used to test async readers
tokio = { version = "1", features = ["macros", "rt", "fs"] }
tokio = { version = "1", features = ["macros", "rt", "fs", "io-util"] }
tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }
@@ -134,6 +134,8 @@ io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_compression = [
"parquet2/zstd",
"parquet2/snappy",
@@ -207,8 +209,6 @@ compute = [
"compute_lower",
"compute_upper"
]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures"]
benchmarks = ["rand"]
simd = ["packed_simd"]

2 changes: 1 addition & 1 deletion benches/read_parquet.rs
@@ -35,7 +35,7 @@ fn to_buffer(
fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
let file = Cursor::new(buffer);

let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?;

for maybe_chunk in reader {
let columns = maybe_chunk?;
45 changes: 11 additions & 34 deletions examples/parquet_read.rs
@@ -1,46 +1,23 @@
use std::fs::File;
use std::io::BufReader;
use std::time::SystemTime;

use arrow2::error::Result;
use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
// Open a file, a common operation in Rust
let mut file = BufReader::new(File::open(path)?);

// Read the files' metadata. This has a small IO cost because it requires seeking to the end
// of the file to read its footer.
let metadata = read::read_metadata(&mut file)?;

// Convert the file's metadata into an arrow schema. This is CPU-only and amounts to
// parsing thrift if the arrow schema is available on a key, or inferring the arrow schema from
// the parquet's physical, converted and logical types.
let arrow_schema = read::get_schema(&metadata)?;

// Create an iterator of column chunks. Each iteration
// yields an iterator of compressed pages. There is almost no CPU work in iterating.
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// get the columns' field
let field = &arrow_schema.fields[field];

// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
// Because `columns` is an iterator, it uses a combination of IO and CPU.
let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;

Ok(array)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let field = args[2].parse::<usize>().unwrap();
let row_group = args[3].parse::<usize>().unwrap();

let array = read_field(file_path, row_group, field)?;
println!("{:?}", array);
let reader = File::open(file_path)?;
let reader = read::FileReader::try_new(reader, None, None, None, None)?;

let start = SystemTime::now();
for maybe_chunk in reader {
let columns = maybe_chunk?;
assert!(!columns.is_empty());
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
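
For reference, a minimal sketch (not part of this diff) of using the new `read::FileReader` with the column projection shown in the benchmark change above; the remaining `None` arguments are the reader's optional settings and are left at their defaults here:

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

/// Count the rows of a parquet file while reading only its first column.
/// (Sketch reusing the call shapes from this diff; `Some(&[0])` projects to column 0.)
fn count_rows(path: &str) -> Result<usize> {
    let reader = File::open(path)?;
    let reader = read::FileReader::try_new(reader, Some(&[0]), None, None, None)?;

    let mut rows = 0;
    for maybe_chunk in reader {
        // each item is a `Chunk`, whose length is its number of rows
        rows += maybe_chunk?.len();
    }
    Ok(rows)
}
```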
57 changes: 57 additions & 0 deletions examples/parquet_read_async.rs
@@ -0,0 +1,57 @@
use std::sync::Arc;
use std::time::SystemTime;

use futures::future::BoxFuture;
use futures::FutureExt;
use tokio;
use tokio::fs::File;
use tokio::io::BufReader;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::parquet::read::{self, RowGroupDeserializer};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let start = SystemTime::now();

use std::env;
let args: Vec<String> = env::args().collect();
let file_path = Arc::new(args[1].clone());

// # Read metadata
let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();

// this operation is usually done before reading the data, during planning.
// This is a mix of IO and CPU-bounded tasks but both of them are O(1)
let metadata = read::read_metadata_async(&mut reader).await?;
let schema = read::get_schema(&metadata)?;

// This factory yields one file descriptor per column and is used to read columns concurrently.
// They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
let factory = || {
Box::pin(async { Ok(File::open(file_path.clone().as_ref()).await?.compat()) })
as BoxFuture<_>
};

// This is the row group loop. Groups can be skipped based on the statistics they carry.
for row_group in &metadata.row_groups {
// A row group is consumed in two steps: the first step is to read the (compressed)
// columns into memory, which is IO-bounded.
let column_chunks =
read::read_columns_async(factory, row_group, schema.fields.clone(), None).await?;

// the second step is to iterate over the columns in chunks.
// this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
// the runtime.
// Furthermore, this operation is trivially parallelizable e.g. via rayon, as each iterator
// can be advanced in parallel (parallel decompression and deserialization).
let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
for maybe_chunk in chunks {
let chunk = maybe_chunk?;
println!("{}", chunk.len());
}
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
115 changes: 0 additions & 115 deletions examples/parquet_read_parallel.rs

This file was deleted.

62 changes: 15 additions & 47 deletions examples/parquet_read_parallel/src/main.rs
@@ -6,61 +6,29 @@ use std::time::SystemTime;

use rayon::prelude::*;

use arrow2::{
array::Array, chunk::Chunk, error::Result, io::parquet::read,
io::parquet::read::MutStreamingIterator,
};
use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};

fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
let mut file = BufReader::new(File::open(path)?);
let file_metadata = read::read_metadata(&mut file)?;
let schema = read::get_schema(&file_metadata)?;
let metadata = read::read_metadata(&mut file)?;
let schema = read::get_schema(&metadata)?;

// IO-bounded
let columns = file_metadata
.schema()
.fields()
.iter()
.enumerate()
.map(|(field_i, field)| {
let start = SystemTime::now();
println!("read start - field: {}", field_i);
let mut columns = read::get_column_iterator(
&mut file,
&file_metadata,
row_group,
field_i,
None,
vec![],
);

let mut column_chunks = vec![];
while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

column_chunks.push((pages, metadata.clone()));
}
columns = new_iter;
}
println!(
"read end - {:?}: {} {}",
start.elapsed().unwrap(),
field_i,
row_group
);
(field_i, field.clone(), column_chunks)
})
.collect::<Vec<_>>();
// read (IO-bounded) all columns into memory (use a subset of the fields to project)
let columns = read::read_columns(
&mut file,
&metadata.row_groups[row_group],
schema.fields,
None,
)?;

// CPU-bounded
let columns = columns
.into_par_iter()
.map(|(field_i, parquet_field, column_chunks)| {
let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
let field = &schema.fields()[field_i];

read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into())
.map(|mut iter| {
// when chunk_size != None, `iter` must be iterated multiple times to get all the chunks
// see the implementation of `arrow2::io::parquet::read::RowGroupDeserializer::next`
// to see how this can be done.
iter.next().unwrap()
})
.collect::<Result<Vec<_>>>()?;

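
A hedged sketch of the alternative mentioned in the comment above: when a chunk size is set, each per-column iterator can be drained rather than advanced once, collecting every chunk of that column. The helper below is hypothetical and only illustrates the collect pattern; the `Arc<dyn Array>` item type follows the `Chunk<Arc<dyn Array>>` used in this example.

```rust
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;

/// Drain a fallible iterator of arrays, i.e. gather *all* chunks of one column
/// instead of only the first (hypothetical helper, not part of this PR).
fn collect_all_chunks<I>(iter: I) -> Result<Vec<Arc<dyn Array>>>
where
    I: Iterator<Item = Result<Arc<dyn Array>>>,
{
    iter.collect()
}
```

In the `rayon` map above, this pattern would replace `iter.next().unwrap()`, yielding a `Vec` of chunks per column instead of a single array.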
23 changes: 0 additions & 23 deletions examples/parquet_read_record.rs

This file was deleted.

7 changes: 7 additions & 0 deletions src/array/struct_/mod.rs
@@ -74,6 +74,13 @@ impl StructArray {
let fields = Self::get_fields(&data_type);
assert!(!fields.is_empty());
assert_eq!(fields.len(), values.len());
assert!(
fields
.iter()
.map(|f| f.data_type())
.eq(values.iter().map(|a| a.data_type())),
"The fields' datatypes must equal the values datatypes"
);
assert!(values.iter().all(|x| x.len() == values[0].len()));
if let Some(ref validity) = validity {
assert_eq!(values[0].len(), validity.len());
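
As an illustration of the new assertion, a small hedged sketch of a construction that satisfies it (the `from_data` constructor name and the `Vec<Arc<dyn Array>>` values type are assumptions for this arrow2 version):

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array, StructArray};
use arrow2::datatypes::{DataType, Field};

fn main() {
    // the field's datatype (Int32) matches the value array's datatype,
    // so the new assertion passes
    let fields = vec![Field::new("a", DataType::Int32, true)];
    let values: Vec<Arc<dyn Array>> = vec![Arc::new(Int32Array::from_slice([1, 2, 3]))];
    let _array = StructArray::from_data(DataType::Struct(fields), values, None);
}
```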