This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit 779b7bd: Added support for filter of parquet pages.
jorgecarleitao committed Aug 10, 2021
1 parent bfb4910

Showing 7 changed files with 35 additions and 11 deletions.
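The change threads an optional page filter through the public read APIs: `RecordReader::try_new` and `get_page_iterator` gain a trailing `pages_filter: Option<PageFilter>` argument, and `None` preserves the old behaviour of reading every page. A minimal sketch of the new call shape, assembled from the calls in this diff (the file path is a placeholder):

use std::fs::File;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    let reader = File::open("data.parquet")?; // placeholder path
    let reader = read::RecordReader::try_new(
        reader,
        None,                  // projection: read all columns
        None,                  // limit: read all rows
        Arc::new(|_, _| true), // groups_filter: keep every row group
        None,                  // pages_filter (new): keep every page
    )?;
    for maybe_batch in reader {
        let batch = maybe_batch?;
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}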
2 changes: 1 addition & 1 deletion benches/read_parquet.rs
@@ -21,7 +21,7 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<
     let file = Cursor::new(buffer);

     let reader =
-        read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true))?;
+        read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true), None)?;

     for maybe_batch in reader {
         let batch = maybe_batch?;
3 changes: 2 additions & 1 deletion examples/parquet_read.rs
@@ -19,7 +19,8 @@ fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<
     // Construct an iterator over pages. This binds `file` to this iterator, and each iteration
     // is IO intensive as it will read a compressed page into memory. There is almost no CPU work
     // on this operation
-    let pages = read::get_page_iterator(&file_metadata, row_group, column, &mut file, vec![])?;
+    let pages =
+        read::get_page_iterator(&file_metadata, row_group, column, &mut file, None, vec![])?;

     // get the columns' metadata
     let metadata = file_metadata.row_groups[row_group].column(column);
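Here the example passes `None`, so every page is read. A sketch of supplying an actual filter instead; the closure's argument types (and arity) come from parquet2's `PageFilter` alias, re-exported further down in this diff, so this is an assumption to verify against parquet2 — the wildcard parameters sidestep the exact types, and a real filter would inspect the page header (e.g. its statistics) to skip pages:

use std::fs::File;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn count_pages_kept(path: &str, row_group: usize, column: usize) -> Result<usize> {
    let mut file = File::open(path)?;
    let file_metadata = read::read_metadata(&mut file)?;

    // Hypothetical filter that keeps every page; return `false` here to have
    // the iterator skip a page before it is decompressed and decoded.
    let filter: read::PageFilter = Arc::new(|_descriptor, _page_header| true);

    let pages = read::get_page_iterator(
        &file_metadata,
        row_group,
        column,
        &mut file,
        Some(filter),
        vec![],
    )?;
    Ok(pages.count())
}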
1 change: 1 addition & 0 deletions examples/parquet_read_parallel.rs
@@ -30,6 +30,7 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
                         row_group,
                         column,
                         &mut file,
+                        None,
                         vec![],
                     )
                     .unwrap()
2 changes: 1 addition & 1 deletion examples/parquet_read_record.rs
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
     let file_path = &args[1];

     let reader = File::open(file_path)?;
-    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

     for maybe_batch in reader {
         let batch = maybe_batch?;
11 changes: 8 additions & 3 deletions src/io/parquet/mod.rs
@@ -36,8 +36,13 @@ mod tests {
 ) -> Result<ArrayStats> {
     let metadata = read::read_metadata(&mut reader)?;

-    let mut reader =
-        read::RecordReader::try_new(reader, Some(vec![column]), None, Arc::new(|_, _| true))?;
+    let mut reader = read::RecordReader::try_new(
+        reader,
+        Some(vec![column]),
+        None,
+        Arc::new(|_, _| true),
+        None,
+    )?;

     let statistics = metadata.row_groups[row_group]
         .column(column)
@@ -458,7 +463,7 @@ mod tests_integration {

 fn integration_read(data: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
     let reader = Cursor::new(data);
-    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;
     let schema = reader.schema().clone();
     let batches = reader.collect::<Result<Vec<_>>>()?;
12 changes: 9 additions & 3 deletions src/io/parquet/read/mod.rs
@@ -25,7 +25,7 @@ pub use parquet2::{
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
         decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
-        streaming_iterator, Decompressor, PageIterator, StreamingIterator,
+        streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
@@ -40,10 +40,16 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
     row_group: usize,
     column: usize,
     reader: &'b mut RR,
+    pages_filter: Option<PageFilter>,
     buffer: Vec<u8>,
 ) -> Result<PageIterator<'b, RR>> {
     Ok(_get_page_iterator(
-        metadata, row_group, column, reader, None, buffer,
+        metadata,
+        row_group,
+        column,
+        reader,
+        pages_filter,
+        buffer,
     )?)
 }

@@ -394,7 +400,7 @@ mod tests_integration {
     let path = "testing/parquet-testing/data/alltypes_plain.parquet";
     let reader = std::fs::File::open(path)?;

-    let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+    let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 1);
15 changes: 13 additions & 2 deletions src/io/parquet/read/record_batch.rs
@@ -12,7 +12,7 @@ use crate::{

 use super::{
     get_page_iterator, get_schema, page_iter_to_array, read_metadata, Decompressor, FileMetaData,
-    RowGroupMetaData,
+    PageFilter, RowGroupMetaData,
 };

 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
@@ -25,6 +25,7 @@ pub struct RecordReader<R: Read + Seek> {
     buffer: Vec<u8>,
     decompress_buffer: Vec<u8>,
     groups_filter: GroupFilter,
+    pages_filter: Option<PageFilter>,
     metadata: Rc<FileMetaData>,
     current_group: usize,
     remaining_rows: usize,
@@ -36,6 +37,7 @@ impl<R: Read + Seek> RecordReader<R> {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
         groups_filter: GroupFilter,
+        pages_filter: Option<PageFilter>,
     ) -> Result<Self> {
         let metadata = read_metadata(&mut reader)?;

@@ -75,6 +77,7 @@ impl<R: Read + Seek> RecordReader<R> {
             schema,
             indices: Rc::new(indices),
             groups_filter,
+            pages_filter,
             metadata: Rc::new(metadata),
             current_group: 0,
             buffer: vec![],
@@ -124,7 +127,15 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
                 // column according to the file's indexing
                 let column = self.indices[column];
                 let column_meta = &columns_meta[column];
-                let pages = get_page_iterator(&metadata, row_group, column, &mut self.reader, b1)?;
+                let pages = get_page_iterator(
+                    &metadata,
+                    row_group,
+                    column,
+                    &mut self.reader,
+                    self.pages_filter.clone(),
+                    b1,
+                )?;
+
                 let mut pages = Decompressor::new(pages, b2);

                 let array = page_iter_to_array(&mut pages, column_meta, field.data_type().clone())?;
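The reader stores the filter once and clones it for each column it reads. Assuming `PageFilter` follows the same `Arc`-wrapped-closure pattern as `GroupFilter` above, `self.pages_filter.clone()` is a reference-count bump rather than a copy of the closure, so one filter is shared cheaply across every column and row group. A self-contained sketch of that property:

use std::sync::Arc;

fn main() {
    // Stand-in for an Arc'd filter closure like GroupFilter/PageFilter.
    let filter: Arc<dyn Fn(usize) -> bool> = Arc::new(|i| i % 2 == 0);
    let per_column = filter.clone(); // cheap: bumps the refcount, shares the closure
    assert!(per_column(2));
    assert!(!filter(3));
    assert_eq!(Arc::strong_count(&filter), 2);
}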
