diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 7c567c72053..f13f3f0da32 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -21,7 +21,7 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<
     let file = Cursor::new(buffer);

     let reader =
-        read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true))?;
+        read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true), None)?;

     for maybe_batch in reader {
         let batch = maybe_batch?;
diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
index e3b82b4714d..a5fb9658791 100644
--- a/examples/parquet_read.rs
+++ b/examples/parquet_read.rs
@@ -19,7 +19,8 @@ fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<
         row_group,
         column,
         &mut file,
+        None,
         vec![],
     )
     .unwrap()
diff --git a/examples/parquet_read_record.rs b/examples/parquet_read_record.rs
index 6d74629a641..d55065125b1 100644
--- a/examples/parquet_read_record.rs
+++ b/examples/parquet_read_record.rs
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
     let file_path = &args[1];

     let reader = File::open(file_path)?;
-    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+    let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

     for maybe_batch in reader {
         let batch = maybe_batch?;
diff --git a/src/io/parquet/mod.rs b/src/io/parquet/mod.rs
index 0a4e80f0b5a..0ac846cabb5 100644
--- a/src/io/parquet/mod.rs
+++ b/src/io/parquet/mod.rs
@@ -36,8 +36,13 @@ mod tests {
     ) -> Result<ArrayStats> {
         let metadata = read::read_metadata(&mut reader)?;

-        let mut reader =
-            read::RecordReader::try_new(reader, Some(vec![column]), None, Arc::new(|_, _| true))?;
+        let mut reader = read::RecordReader::try_new(
+            reader,
+            Some(vec![column]),
+            None,
+            Arc::new(|_, _| true),
+            None,
+        )?;

         let statistics = metadata.row_groups[row_group]
             .column(column)
@@ -458,7 +463,7 @@ mod tests_integration {
     fn integration_read(data: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
         let reader = Cursor::new(data);

-        let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+        let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

         let schema = reader.schema().clone();
         let batches = reader.collect::<Result<Vec<_>>>()?;
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 8cddeb220bf..6c53a92211b 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -25,7 +25,7 @@ pub use parquet2::{
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
         decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
-        streaming_iterator, Decompressor, PageIterator, StreamingIterator,
+        streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
@@ -40,10 +40,16 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
     row_group: usize,
     column: usize,
     reader: &'b mut RR,
+    pages_filter: Option<PageFilter>,
     buffer: Vec<u8>,
 ) -> Result<PageIterator<'b, RR>> {
     Ok(_get_page_iterator(
-        metadata, row_group, column, reader, None, buffer,
+        metadata,
+        row_group,
+        column,
+        reader,
+        pages_filter,
+        buffer,
     )?)
 }
@@ -394,7 +400,7 @@ mod tests_integration {
         let path = "testing/parquet-testing/data/alltypes_plain.parquet";
         let reader = std::fs::File::open(path)?;

-        let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true))?;
+        let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;

         let batches = reader.collect::<Result<Vec<_>>>()?;
         assert_eq!(batches.len(), 1);
diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs
index 4d3f75fdfa7..b88f3703baa 100644
--- a/src/io/parquet/read/record_batch.rs
+++ b/src/io/parquet/read/record_batch.rs
@@ -12,7 +12,7 @@ use crate::{

 use super::{
     get_page_iterator, get_schema, page_iter_to_array, read_metadata, Decompressor, FileMetaData,
-    RowGroupMetaData,
+    PageFilter, RowGroupMetaData,
 };

 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
@@ -25,6 +25,7 @@ pub struct RecordReader<R: Read + Seek> {
     buffer: Vec<u8>,
     decompress_buffer: Vec<u8>,
     groups_filter: GroupFilter,
+    pages_filter: Option<PageFilter>,
     metadata: Rc<FileMetaData>,
     current_group: usize,
     remaining_rows: usize,
@@ -36,6 +37,7 @@ impl<R: Read + Seek> RecordReader<R> {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
         groups_filter: GroupFilter,
+        pages_filter: Option<PageFilter>,
     ) -> Result<Self> {
         let metadata = read_metadata(&mut reader)?;

@@ -75,6 +77,7 @@ impl<R: Read + Seek> RecordReader<R> {
             schema,
             indices: Rc::new(indices),
             groups_filter,
+            pages_filter,
             metadata: Rc::new(metadata),
             current_group: 0,
             buffer: vec![],
@@ -124,7 +127,15 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
                 // column according to the file's indexing
                 let column = self.indices[column];
                 let column_meta = &columns_meta[column];
-                let pages = get_page_iterator(&metadata, row_group, column, &mut self.reader, b1)?;
+                let pages = get_page_iterator(
+                    &metadata,
+                    row_group,
+                    column,
+                    &mut self.reader,
+                    self.pages_filter.clone(),
+                    b1,
+                )?;
+
                 let mut pages = Decompressor::new(pages, b2);
                 let array = page_iter_to_array(&mut pages, column_meta, field.data_type().clone())?;
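
Taken together, the change threads one new argument through the reader stack: `get_page_iterator` and `RecordReader::try_new` now accept an `Option<PageFilter>` (re-exported from parquet2), and every existing call site passes `None` to preserve the old behavior. Below is a minimal sketch of how a caller might opt in; it assumes the two-argument `PageFilter` closure alias from parquet2 (column descriptor and data-page header) and a hypothetical `example.parquet` path:

use std::fs::File;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    let file = File::open("example.parquet")?;

    // Page-level filter: consulted for every page of the selected column
    // chunks; returning `false` drops the page. `|_, _| true` keeps every
    // page; a real filter would inspect the page header (e.g. its
    // statistics) to prune pages that cannot satisfy a predicate.
    let pages_filter: read::PageFilter = Arc::new(|_, _| true);

    let reader = read::RecordReader::try_new(
        file,
        None,                  // projection: read all columns
        None,                  // limit: read all rows
        Arc::new(|_, _| true), // groups_filter: keep every row group
        Some(pages_filter),    // the new pages_filter argument
    )?;

    for maybe_batch in reader {
        let batch = maybe_batch?;
        println!("{} rows", batch.num_rows());
    }
    Ok(())
}

Judging from the iterator wiring in record_batch.rs, the filter is applied by the page iterator itself, before pages reach `Decompressor`, so a pruned page is never decompressed or deserialized.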