
Added support for parquet sidecar to FileReader #1215

Merged · 4 commits · Aug 9, 2022
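In short, this change replaces `FileReader::try_new` — which read the file's metadata, inferred the schema, and applied an index-based projection internally — with a plain constructor, `FileReader::new`, that receives the row groups and an already-filtered `Schema` explicitly. A minimal sketch of the new read path, assembled from the diffs below (the file path is a placeholder):

```rust
use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    let mut reader = File::open("data.parquet")?;

    // the caller now reads the metadata and infers the schema explicitly
    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata)?;

    // projection becomes a filter over the schema's fields (here: column 0 only)
    let schema = schema.filter(|index, _field| index == 0);

    // row-group selection becomes a filter over `metadata.row_groups`
    let chunks = read::FileReader::new(reader, metadata.row_groups, schema, None, None);

    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    Ok(())
}
```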
34 changes: 20 additions & 14 deletions benches/read_parquet.rs
@@ -32,10 +32,16 @@ fn to_buffer(
     buffer
 }
 
-fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
-    let file = Cursor::new(buffer);
+fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+    let mut reader = Cursor::new(buffer);
 
-    let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?;
+    let metadata = read::read_metadata(&mut reader)?;
+
+    let schema = read::infer_schema(&metadata)?;
+
+    let schema = schema.filter(|index, _| index == column);
+
+    let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     for maybe_chunk in reader {
         let columns = maybe_chunk?;
@@ -49,43 +55,43 @@ fn add_benchmark(c: &mut Criterion) {
         let size = 2usize.pow(i);
         let buffer = to_buffer(size, true, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 6).unwrap()));
 
         let a = format!("read utf8 emoji 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 12).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 12).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 3).unwrap()));
 
         let buffer = to_buffer(size, true, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, false, false, false, false);
         let a = format!("read required utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
     });
 }

36 changes: 25 additions & 11 deletions examples/parquet_read.rs
@@ -1,30 +1,44 @@
 use std::fs::File;
 use std::time::SystemTime;
 
-use arrow2::error::Result;
+use arrow2::error::Error;
 use arrow2::io::parquet::read;
 
-fn main() -> Result<()> {
+fn main() -> Result<(), Error> {
     // say we have a file
     use std::env;
     let args: Vec<String> = env::args().collect();
 
     let file_path = &args[1];
+    let mut reader = File::open(file_path)?;
 
-    let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;
+    // we can read its metadata:
+    let metadata = read::read_metadata(&mut reader)?;
 
-    println!("{:#?}", reader.schema());
+    // and infer a [`Schema`] from the `metadata`.
+    let schema = read::infer_schema(&metadata)?;
 
-    // say we want to evaluate if the we can skip some row groups based on a field's value
-    let field = &reader.schema().fields[0];
+    // we can filter the columns we need (here we select all)
+    let schema = schema.filter(|_index, _field| true);
 
-    // we can deserialize the parquet statistics from this field
-    let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;
+    // we can read the statistics of all parquet's row groups (here for the first field)
+    let statistics = read::statistics::deserialize(&schema.fields[0], &metadata.row_groups)?;
 
     println!("{:#?}", statistics);
 
+    // say we found that we only need to read the first two row groups, "0" and "1"
+    let row_groups = metadata
+        .row_groups
+        .into_iter()
+        .enumerate()
+        .filter(|(index, _)| *index == 0 || *index == 1)
+        .map(|(_, row_group)| row_group)
+        .collect();
+
+    // we can then read the row groups into chunks
+    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None);
+
     let start = SystemTime::now();
-    for maybe_chunk in reader {
+    for maybe_chunk in chunks {
         let chunk = maybe_chunk?;
         assert!(!chunk.is_empty());
     }
2 changes: 1 addition & 1 deletion examples/parquet_read_async.rs
@@ -43,7 +43,7 @@ async fn main() -> Result<()> {
     // the runtime.
     // Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator
     // can be advanced in parallel (parallel decompression and deserialization).
-    let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
+    let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows(), None);
    for maybe_chunk in chunks {
         let chunk = maybe_chunk?;
         println!("{}", chunk.len());
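Note: the `as usize` cast here (and in the s3 example and `file.rs` below) is dropped because `num_rows` now returns `usize` directly, so no conversion is needed at the call sites.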
2 changes: 1 addition & 1 deletion examples/s3/src/main.rs
@@ -71,7 +71,7 @@ async fn main() -> Result<()> {
 
     // this is CPU-bounded and should be sent to a separate thread-pool.
     // We do it here for simplicity
-    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
     let chunks = chunks.collect::<Result<Vec<_>>>()?;
 
     // this is a single chunk because chunk_size is `None`
22 changes: 22 additions & 0 deletions src/datatypes/schema.rs
@@ -26,6 +26,28 @@ impl Schema {
             metadata,
         }
     }
+
+    /// Returns a new [`Schema`] with a subset of all fields whose `predicate`
+    /// evaluates to true.
+    pub fn filter<F: Fn(usize, &Field) -> bool>(self, predicate: F) -> Self {
+        let fields = self
+            .fields
+            .into_iter()
+            .enumerate()
+            .filter_map(|(index, f)| {
+                if (predicate)(index, &f) {
+                    Some(f)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        Schema {
+            fields,
+            metadata: self.metadata,
+        }
+    }
 }
 
 impl From<Vec<Field>> for Schema {
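For reference, a minimal sketch of the new `Schema::filter` in isolation (the field names `c1`/`c2` are illustrative); note that `filter` consumes the schema:

```rust
use arrow2::datatypes::{DataType, Field, Schema};

fn main() {
    let schema = Schema::from(vec![
        Field::new("c1", DataType::Int64, true),
        Field::new("c2", DataType::Utf8, true),
    ]);

    // keep only the fields named "c2"; the predicate receives the
    // field's index and a reference to the field
    let filtered = schema.filter(|_index, field| field.name == "c2");
    assert_eq!(filtered.fields.len(), 1);
}
```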
120 changes: 15 additions & 105 deletions src/io/parquet/read/file.rs
@@ -1,115 +1,42 @@
 use std::io::{Read, Seek};
-use std::sync::Arc;
 
 use crate::array::Array;
 use crate::chunk::Chunk;
+use crate::datatypes::Schema;
+use crate::error::Result;
 use crate::io::parquet::read::read_columns_many;
-use crate::{
-    datatypes::Field,
-    error::{Error, Result},
-};
 
-use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
-
-type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
+use super::{RowGroupDeserializer, RowGroupMetaData};
 
 /// An iterator of [`Chunk`]s coming from row groups of a parquet file.
 ///
 /// This can be thought of a flatten chain of [`Iterator<Item=Chunk>`] - each row group is sequentially
 /// mapped to an [`Iterator<Item=Chunk>`] and each iterator is iterated upon until either the limit
 /// or the last iterator ends.
 /// # Implementation
-/// This iterator mixes IO-bounded and CPU-bounded operations.
+/// This iterator is single threaded on both IO-bounded and CPU-bounded tasks, and mixes them.
 pub struct FileReader<R: Read + Seek> {
     row_groups: RowGroupReader<R>,
-    metadata: FileMetaData,
     remaining_rows: usize,
     current_row_group: Option<RowGroupDeserializer>,
 }
 
 impl<R: Read + Seek> FileReader<R> {
-    /// Creates a new [`FileReader`] by reading the metadata from `reader` and constructing
-    /// Arrow's schema from it.
-    ///
-    /// # Error
-    /// This function errors iff:
-    /// * reading the metadata from the reader fails
-    /// * it is not possible to derive an arrow schema from the parquet file
-    /// * the projection contains columns that do not exist
-    pub fn try_new(
-        mut reader: R,
-        projection: Option<&[usize]>,
+    /// Returns a new [`FileReader`].
+    pub fn new(
+        reader: R,
+        row_groups: Vec<RowGroupMetaData>,
+        schema: Schema,
         chunk_size: Option<usize>,
         limit: Option<usize>,
-        groups_filter: Option<GroupFilter>,
-    ) -> Result<Self> {
-        let metadata = read_metadata(&mut reader)?;
-
-        let schema = infer_schema(&metadata)?;
-
-        let schema_metadata = schema.metadata;
-        let fields: Vec<Field> = if let Some(projection) = &projection {
-            schema
-                .fields
-                .into_iter()
-                .enumerate()
-                .filter_map(|(index, f)| {
-                    if projection.iter().any(|&i| i == index) {
-                        Some(f)
-                    } else {
-                        None
-                    }
-                })
-                .collect()
-        } else {
-            schema.fields.into_iter().collect()
-        };
-
-        if let Some(projection) = &projection {
-            if fields.len() != projection.len() {
-                return Err(Error::InvalidArgumentError(
-                    "While reading parquet, some columns in the projection do not exist in the file"
-                        .to_string(),
-                ));
-            }
-        }
-
-        let schema = Schema {
-            fields,
-            metadata: schema_metadata,
-        };
-
-        let row_groups = RowGroupReader::new(
-            reader,
-            schema,
-            groups_filter,
-            metadata.row_groups.clone(),
-            chunk_size,
-            limit,
-        );
+    ) -> Self {
+        let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit);
 
-        Ok(Self {
+        Self {
             row_groups,
-            metadata,
             remaining_rows: limit.unwrap_or(usize::MAX),
             current_row_group: None,
-        })
+        }
     }
 
     /// Returns the derived arrow [`Schema`] of the file
     pub fn schema(&self) -> &Schema {
         &self.row_groups.schema
     }
-
-    /// Returns parquet's [`FileMetaData`].
-    pub fn metadata(&self) -> &FileMetaData {
-        &self.metadata
-    }
-
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.row_groups.set_groups_filter(groups_filter);
-    }
 
     fn next_row_group(&mut self) -> Result<Option<RowGroupDeserializer>> {
@@ -178,7 +105,6 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
 pub struct RowGroupReader<R: Read + Seek> {
     reader: R,
     schema: Schema,
-    groups_filter: Option<GroupFilter>,
     row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
     chunk_size: Option<usize>,
     remaining_rows: usize,
@@ -189,26 +115,19 @@ impl<R: Read + Seek> RowGroupReader<R> {
     pub fn new(
         reader: R,
         schema: Schema,
-        groups_filter: Option<GroupFilter>,
         row_groups: Vec<RowGroupMetaData>,
         chunk_size: Option<usize>,
         limit: Option<usize>,
     ) -> Self {
         Self {
             reader,
             schema,
-            groups_filter,
             row_groups: row_groups.into_iter().enumerate(),
             chunk_size,
             remaining_rows: limit.unwrap_or(usize::MAX),
         }
     }
 
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.groups_filter = Some(groups_filter);
-    }
-
     #[inline]
     fn _next(&mut self) -> Result<Option<RowGroupDeserializer>> {
         if self.schema.fields.is_empty() {
@@ -219,14 +138,7 @@ impl<R: Read + Seek> RowGroupReader<R> {
             return Ok(None);
         }
 
-        let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() {
-            self.row_groups
-                .by_ref()
-                .find(|(index, row_group)| (groups_filter)(*index, row_group))
-        } else {
-            self.row_groups.next()
-        };
-        let row_group = if let Some((_, row_group)) = row_group {
+        let row_group = if let Some((_, row_group)) = self.row_groups.next() {
             row_group
         } else {
             return Ok(None);
@@ -242,12 +154,10 @@ impl<R: Read + Seek> RowGroupReader<R> {
 
         let result = RowGroupDeserializer::new(
             column_chunks,
-            row_group.num_rows() as usize,
+            row_group.num_rows(),
            Some(self.remaining_rows),
         );
-        self.remaining_rows = self
-            .remaining_rows
-            .saturating_sub(row_group.num_rows() as usize);
+        self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows());
         Ok(Some(result))
     }
 }
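The removed `GroupFilter` hook has no direct replacement: the equivalent is now to select the row groups before constructing the reader. A minimal sketch under that reading (`filtered_reader` and its predicate are illustrative helpers, not part of the crate):

```rust
use std::fs::File;

use arrow2::datatypes::Schema;
use arrow2::io::parquet::read::{FileReader, RowGroupMetaData};

// Select row groups with an index-aware predicate, as the old
// `GroupFilter` did, then hand the survivors to `FileReader::new`.
fn filtered_reader(
    reader: File,
    row_groups: Vec<RowGroupMetaData>,
    schema: Schema,
    predicate: impl Fn(usize, &RowGroupMetaData) -> bool,
) -> FileReader<File> {
    let row_groups = row_groups
        .into_iter()
        .enumerate()
        .filter(|(index, row_group)| predicate(*index, row_group))
        .map(|(_, row_group)| row_group)
        .collect();
    FileReader::new(reader, row_groups, schema, None, None)
}
```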
4 changes: 1 addition & 3 deletions src/io/parquet/read/row_group.rs
@@ -260,8 +260,6 @@ pub async fn read_columns_many_async<
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, row_group.num_rows(), chunk_size))
         .collect()
 }