
Added support to filter parquet pages. #256

Merged · 1 commit · Aug 11, 2021
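
This PR threads an optional page-level filter through the parquet read path: `get_page_iterator` and `RecordReader::try_new` gain a `pages_filter: Option<PageFilter>` argument (with `PageFilter` re-exported from parquet2), and passing `None` preserves the previous behavior. A minimal sketch of the new call shape, assuming the crate layout shown in this diff (the path argument and the printing are illustrative):

    use std::fs::File;
    use std::sync::Arc;

    use arrow2::error::Result;
    use arrow2::io::parquet::read;

    fn read_all(path: &str) -> Result<()> {
        let file = File::open(path)?;
        // `try_new` now takes an optional page filter as its last argument;
        // `None` keeps every page, matching the pre-change behavior.
        let reader = read::RecordReader::try_new(
            file,
            None,                  // no column projection
            None,                  // no row limit
            Arc::new(|_, _| true), // keep every row group
            None,                  // new: no page-level filter
        )?;
        for maybe_batch in reader {
            println!("read a batch with {} rows", maybe_batch?.num_rows());
        }
        Ok(())
    }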
benches/read_parquet.rs (1 addition, 1 deletion)
@@ -21,7 +21,7 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<
     let file = Cursor::new(buffer);

     let reader =
-        read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true))?;
+        read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true), None)?;

     for maybe_batch in reader {
         let batch = maybe_batch?;
examples/parquet_read.rs (2 additions, 1 deletion)
@@ -19,7 +19,8 @@ fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<
     // Construct an iterator over pages. This binds `file` to this iterator, and each iteration
     // is IO intensive as it will read a compressed page into memory. There is almost no CPU work
     // on this operation
-    let pages = read::get_page_iterator(&file_metadata, row_group, column, &mut file, vec![])?;
+    let pages =
+        read::get_page_iterator(&file_metadata, row_group, column, &mut file, None, vec![])?;

     // get the columns' metadata
     let metadata = file_metadata.row_groups[row_group].column(column);
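
The same filter can be supplied at the page-iterator level, where it slots in just before the scratch buffer. A sketch mirroring examples/parquet_read.rs with a filter supplied; the closure's two arguments (column descriptor and page header) follow parquet2's `PageFilter` as assumed here, not something this diff shows:

    use std::fs::File;
    use std::sync::Arc;

    use arrow2::array::Array;
    use arrow2::error::Result;
    use arrow2::io::parquet::read;

    fn read_column(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
        let mut file = File::open(path)?;
        let file_metadata = read::read_metadata(&mut file)?;
        let arrow_schema = read::get_schema(&file_metadata)?;

        // Keep every page. A real filter would inspect the page header (for
        // example its statistics) and return `false` to skip reading that page.
        // The closure's argument types are assumed from parquet2's `PageFilter`.
        let keep_all: read::PageFilter = Arc::new(|_descriptor, _header| true);

        let pages = read::get_page_iterator(
            &file_metadata,
            row_group,
            column,
            &mut file,
            Some(keep_all),
            vec![],
        )?;

        let column_meta = file_metadata.row_groups[row_group].column(column);
        let data_type = arrow_schema.fields()[column].data_type().clone();

        let mut pages = read::Decompressor::new(pages, vec![]);
        read::page_iter_to_array(&mut pages, column_meta, data_type)
    }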
examples/parquet_read_parallel.rs (1 addition)
@@ -30,6 +30,7 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
                 row_group,
                 column,
                 &mut file,
+                None,
                 vec![],
             )
             .unwrap()
examples/parquet_read_record.rs (1 addition, 1 deletion)
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
     let file_path = &args[1];

     let reader = File::open(file_path)?;
-    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

     for maybe_batch in reader {
         let batch = maybe_batch?;
src/io/parquet/mod.rs (8 additions, 3 deletions)
@@ -36,8 +36,13 @@ mod tests {
    ) -> Result<ArrayStats> {
        let metadata = read::read_metadata(&mut reader)?;

-        let mut reader =
-            read::RecordReader::try_new(reader, Some(vec![column]), None, Arc::new(|_, _| true))?;
+        let mut reader = read::RecordReader::try_new(
+            reader,
+            Some(vec![column]),
+            None,
+            Arc::new(|_, _| true),
+            None,
+        )?;

        let statistics = metadata.row_groups[row_group]
            .column(column)
@@ -458,7 +463,7 @@ mod tests_integration {

    fn integration_read(data: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
        let reader = Cursor::new(data);
-        let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+        let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;
        let schema = reader.schema().clone();
        let batches = reader.collect::<Result<Vec<_>>>()?;

src/io/parquet/read/mod.rs (9 additions, 3 deletions)
@@ -25,7 +25,7 @@ pub use parquet2::{
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
         decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
-        streaming_iterator, Decompressor, PageIterator, StreamingIterator,
+        streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
@@ -40,10 +40,16 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
     row_group: usize,
     column: usize,
     reader: &'b mut RR,
+    pages_filter: Option<PageFilter>,
     buffer: Vec<u8>,
 ) -> Result<PageIterator<'b, RR>> {
     Ok(_get_page_iterator(
-        metadata, row_group, column, reader, None, buffer,
+        metadata,
+        row_group,
+        column,
+        reader,
+        pages_filter,
+        buffer,
     )?)
 }

@@ -394,7 +400,7 @@ mod tests_integration {
        let path = "testing/parquet-testing/data/alltypes_plain.parquet";
        let reader = std::fs::File::open(path)?;

-        let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+        let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

        let batches = reader.collect::<Result<Vec<_>>>()?;
        assert_eq!(batches.len(), 1);
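
A note on the plumbing: the filter is an `Arc`'d closure wrapped in `Option`, so a reader can hand a cheap clone to each column's page iterator (as record_batch.rs does below with `self.pages_filter.clone()`). A self-contained sketch of that ownership pattern, using hypothetical stand-in types rather than parquet2's:

    use std::sync::Arc;

    // Hypothetical stand-ins, only to illustrate the ownership pattern.
    struct PageHeader {
        num_values: i32,
    }
    type Filter = Arc<dyn Fn(&PageHeader) -> bool>;

    fn scan_column(pages: &[PageHeader], filter: Option<Filter>) -> usize {
        pages
            .iter()
            // With no filter, every page is kept (the `None` default).
            .filter(|&p| filter.as_ref().map_or(true, |f| f(p)))
            .count()
    }

    fn main() {
        let pages = vec![PageHeader { num_values: 10 }, PageHeader { num_values: 0 }];
        let keep_non_empty: Filter = Arc::new(|p| p.num_values > 0);
        // The `Arc` makes per-column clones cheap: one closure, many readers.
        assert_eq!(scan_column(&pages, Some(keep_non_empty.clone())), 1);
        assert_eq!(scan_column(&pages, None), 2);
    }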
src/io/parquet/read/record_batch.rs (13 additions, 2 deletions)
@@ -12,7 +12,7 @@ use crate::{

 use super::{
     get_page_iterator, get_schema, page_iter_to_array, read_metadata, Decompressor, FileMetaData,
-    RowGroupMetaData,
+    PageFilter, RowGroupMetaData,
 };

 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
@@ -25,6 +25,7 @@ pub struct RecordReader<R: Read + Seek> {
     buffer: Vec<u8>,
     decompress_buffer: Vec<u8>,
     groups_filter: GroupFilter,
+    pages_filter: Option<PageFilter>,
     metadata: Rc<FileMetaData>,
     current_group: usize,
     remaining_rows: usize,
@@ -36,6 +37,7 @@ impl<R: Read + Seek> RecordReader<R> {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
         groups_filter: GroupFilter,
+        pages_filter: Option<PageFilter>,
     ) -> Result<Self> {
         let metadata = read_metadata(&mut reader)?;

@@ -75,6 +77,7 @@ impl<R: Read + Seek> RecordReader<R> {
            schema,
            indices: Rc::new(indices),
            groups_filter,
+            pages_filter,
            metadata: Rc::new(metadata),
            current_group: 0,
            buffer: vec![],
@@ -124,7 +127,15 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
                // column according to the file's indexing
                let column = self.indices[column];
                let column_meta = &columns_meta[column];
-                let pages = get_page_iterator(&metadata, row_group, column, &mut self.reader, b1)?;
+                let pages = get_page_iterator(
+                    &metadata,
+                    row_group,
+                    column,
+                    &mut self.reader,
+                    self.pages_filter.clone(),
+                    b1,
+                )?;
+
                let mut pages = Decompressor::new(pages, b2);

                let array = page_iter_to_array(&mut pages, column_meta, field.data_type().clone())?;
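
End to end, a caller opts in by passing `Some(filter)`; pages the filter rejects are skipped before decompression. A final sketch (same caveat as above: the closure's two argument types follow parquet2's `PageFilter` as assumed here, and "data.parquet" is a placeholder path):

    use std::fs::File;
    use std::sync::Arc;

    use arrow2::error::Result;
    use arrow2::io::parquet::read;

    fn main() -> Result<()> {
        let file = File::open("data.parquet")?; // placeholder path
        // Keep every page; a real filter would inspect the page header (for
        // example its statistics) and return `false` to skip that page.
        let pages_filter: read::PageFilter = Arc::new(|_descriptor, _header| true);
        let reader = read::RecordReader::try_new(
            file,
            None,
            None,
            Arc::new(|_, _| true),
            Some(pages_filter),
        )?;
        for maybe_batch in reader {
            println!("{} rows", maybe_batch?.num_rows());
        }
        Ok(())
    }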
Expand Down