diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 85f36f9d3aa..94c089931a2 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -41,7 +41,7 @@ fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> { let schema = schema.filter(|index, _| index == column); - let reader = read::FileReader::new(reader, metadata, schema, None, None, None); + let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None); for maybe_chunk in reader { let columns = maybe_chunk?; diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 490b74638d9..e0e5a220e8a 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -25,11 +25,20 @@ fn main() -> Result<(), Error> { println!("{:#?}", statistics); - // and create an iterator of - let reader = read::FileReader::new(reader, metadata, schema, Some(1024 * 8 * 8), None, None); + // say we found that we only need to read the first two row groups, "0" and "1" + let row_groups = metadata + .row_groups + .into_iter() + .enumerate() + .filter(|(index, _)| *index == 0 || *index == 1) + .map(|(_, row_group)| row_group) + .collect(); + + // we can then read the row groups into chunks + let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None); let start = SystemTime::now(); - for maybe_chunk in reader { + for maybe_chunk in chunks { let chunk = maybe_chunk?; assert!(!chunk.is_empty()); } diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs index 6bf34fd5f23..ff8944d38cf 100644 --- a/src/io/parquet/read/file.rs +++ b/src/io/parquet/read/file.rs @@ -1,5 +1,4 @@ use std::io::{Read, Seek}; -use std::sync::Arc; use crate::array::Array; use crate::chunk::Chunk; @@ -7,9 +6,7 @@ use crate::datatypes::Schema; use crate::error::Result; use crate::io::parquet::read::read_columns_many; -use super::{FileMetaData, RowGroupDeserializer, RowGroupMetaData}; - -type GroupFilter = Arc bool + Send + Sync>; +use super::{RowGroupDeserializer, RowGroupMetaData}; /// An iterator of [`Chunk`]s coming from row groups of a parquet file. /// @@ -28,20 +25,12 @@ impl FileReader { /// Returns a new [`FileReader`]. pub fn new( reader: R, - metadata: FileMetaData, + row_groups: Vec, schema: Schema, chunk_size: Option, limit: Option, - groups_filter: Option, ) -> Self { - let row_groups = RowGroupReader::new( - reader, - schema, - groups_filter, - metadata.row_groups, - chunk_size, - limit, - ); + let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit); Self { row_groups, @@ -50,11 +39,6 @@ impl FileReader { } } - /// Sets the groups filter - pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { - self.row_groups.set_groups_filter(groups_filter); - } - fn next_row_group(&mut self) -> Result> { let result = self.row_groups.next().transpose()?; @@ -121,7 +105,6 @@ impl Iterator for FileReader { pub struct RowGroupReader { reader: R, schema: Schema, - groups_filter: Option, row_groups: std::iter::Enumerate>, chunk_size: Option, remaining_rows: usize, @@ -132,7 +115,6 @@ impl RowGroupReader { pub fn new( reader: R, schema: Schema, - groups_filter: Option, row_groups: Vec, chunk_size: Option, limit: Option, @@ -140,18 +122,12 @@ impl RowGroupReader { Self { reader, schema, - groups_filter, row_groups: row_groups.into_iter().enumerate(), chunk_size, remaining_rows: limit.unwrap_or(usize::MAX), } } - /// Sets the groups filter - pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { - self.groups_filter = Some(groups_filter); - } - #[inline] fn _next(&mut self) -> Result> { if self.schema.fields.is_empty() { @@ -162,14 +138,7 @@ impl RowGroupReader { return Ok(None); } - let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() { - self.row_groups - .by_ref() - .find(|(index, row_group)| (groups_filter)(*index, row_group)) - } else { - self.row_groups.next() - }; - let row_group = if let Some((_, row_group)) = row_group { + let row_group = if let Some((_, row_group)) = self.row_groups.next() { row_group } else { return Ok(None); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 42e00aa4aa0..55baceaa9af 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -39,9 +39,7 @@ pub fn read_column(mut reader: R, column: &str) -> Result) -> Result>>()?; @@ -1541,15 +1538,16 @@ fn filter_chunk() -> Result<()> { let new_schema = p_read::infer_schema(&metadata)?; assert_eq!(new_schema, schema); - let reader = p_read::FileReader::new( - reader, - metadata, - schema, - None, - None, - // select chunk 1 - Some(std::sync::Arc::new(|i, _| i == 0)), - ); + // select chunk 1 + let row_groups = metadata + .row_groups + .into_iter() + .enumerate() + .filter(|(index, _)| *index == 0) + .map(|(_, row_group)| row_group) + .collect(); + + let reader = p_read::FileReader::new(reader, row_groups, schema, None, None); let new_chunks = reader.collect::>>()?; diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 0349eb4b03f..efb3943da94 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -500,7 +500,7 @@ fn all_types() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - let reader = FileReader::new(reader, metadata, schema, None, None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, None, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); @@ -542,7 +542,7 @@ fn all_types_chunked() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; // chunk it in 5 (so, (5,3)) - let reader = FileReader::new(reader, metadata, schema, Some(5), None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 2); @@ -605,7 +605,7 @@ fn invalid_utf8() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - let reader = FileReader::new(reader, metadata, schema, Some(5), None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None); let error = reader.collect::>>().unwrap_err(); assert!(