diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index f13f3f0da32..f455c66434b 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -1,5 +1,4 @@ use std::io::Read; -use std::sync::Arc; use std::{fs, io::Cursor, path::PathBuf}; use criterion::{criterion_group, criterion_main, Criterion}; @@ -20,8 +19,7 @@ fn to_buffer(size: usize) -> Vec { fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> { let file = Cursor::new(buffer); - let reader = - read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true), None)?; + let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?; for maybe_batch in reader { let batch = maybe_batch?; diff --git a/examples/parquet_read_record.rs b/examples/parquet_read_record.rs index d55065125b1..9c5467ad791 100644 --- a/examples/parquet_read_record.rs +++ b/examples/parquet_read_record.rs @@ -1,5 +1,4 @@ use std::fs::File; -use std::sync::Arc; use arrow2::error::Result; use arrow2::io::parquet::read; @@ -11,7 +10,7 @@ fn main() -> Result<()> { let file_path = &args[1]; let reader = File::open(file_path)?; - let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?; + let reader = read::RecordReader::try_new(reader, None, None, None, None)?; for maybe_batch in reader { let batch = maybe_batch?; diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs index 4335dd62058..8eb36e168e5 100644 --- a/src/io/parquet/read/record_batch.rs +++ b/src/io/parquet/read/record_batch.rs @@ -24,7 +24,7 @@ pub struct RecordReader { indices: Rc>, buffer: Vec, decompress_buffer: Vec, - groups_filter: GroupFilter, + groups_filter: Option, pages_filter: Option, metadata: Rc, current_group: usize, @@ -36,7 +36,7 @@ impl RecordReader { mut reader: R, projection: Option>, limit: Option, - groups_filter: GroupFilter, + groups_filter: Option, pages_filter: Option, ) -> Result { let metadata = read_metadata(&mut reader)?; @@ -89,6 +89,10 @@ impl RecordReader { pub fn schema(&self) -> &Arc { &self.schema } + + pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { + self.groups_filter = Some(groups_filter); + } } impl Iterator for RecordReader { @@ -109,9 +113,11 @@ impl Iterator for RecordReader { let row_group = self.current_group; let metadata = self.metadata.clone(); let group = &metadata.row_groups[row_group]; - if !(self.groups_filter)(row_group, group) { - self.current_group += 1; - return self.next(); + if let Some(groups_filter) = self.groups_filter.as_ref() { + if !(groups_filter)(row_group, group) { + self.current_group += 1; + return self.next(); + } } let columns_meta = group.columns(); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 31964d6d4ac..742fcfdcbf9 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -21,13 +21,7 @@ pub fn read_column( ) -> Result { let metadata = read_metadata(&mut reader)?; - let mut reader = RecordReader::try_new( - reader, - Some(vec![column]), - None, - Arc::new(|_, _| true), - None, - )?; + let mut reader = RecordReader::try_new(reader, Some(vec![column]), None, None, None)?; let statistics = metadata.row_groups[row_group] .column(column) @@ -432,7 +426,7 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result fn integration_read(data: &[u8]) -> Result<(Arc, Vec)> { let reader = Cursor::new(data); - let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?; + let reader = RecordReader::try_new(reader, None, None, None, None)?; let schema = reader.schema().clone(); let batches = reader.collect::>>()?; diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 31663927883..61d855edf79 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -1,5 +1,4 @@ use std::fs::File; -use std::sync::Arc; use arrow2::array::*; use arrow2::error::Result; @@ -223,7 +222,7 @@ fn all_types() -> Result<()> { let path = "testing/parquet-testing/data/alltypes_plain.parquet"; let reader = std::fs::File::open(path)?; - let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?; + let reader = RecordReader::try_new(reader, None, None, None, None)?; let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1);