From e8686b7ac941d5196c86a0c28aca8822170f7fbd Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Mon, 8 Aug 2022 04:20:14 +0000
Subject: [PATCH 1/4] Allow sidecar

---
 examples/parquet_read.rs    |  6 ++++--
 src/io/parquet/read/file.rs | 11 ++++-------
 tests/it/io/parquet/mod.rs  | 26 +++++++++++++++++---------
 tests/it/io/parquet/read.rs | 19 ++++++++++++-------
 4 files changed, 37 insertions(+), 25 deletions(-)

diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
index 92fa83c521e..241e845854d 100644
--- a/examples/parquet_read.rs
+++ b/examples/parquet_read.rs
@@ -10,8 +10,10 @@ fn main() -> Result<()> {
 
     let file_path = &args[1];
 
-    let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;
+    let mut reader = File::open(file_path)?;
+
+    let metadata = read::read_metadata(&mut reader)?;
+    let reader = read::FileReader::try_new(reader, metadata, None, Some(1024 * 8 * 8), None, None)?;
 
     println!("{:#?}", reader.schema());
 
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 638089668af..d4926cfc72f 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -10,7 +10,7 @@ use crate::{
     error::{Error, Result},
 };
 
-use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
+use super::{infer_schema, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
 
 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
 
@@ -29,23 +29,20 @@ pub struct FileReader<R: Read + Seek> {
 }
 
 impl<R: Read + Seek> FileReader<R> {
-    /// Creates a new [`FileReader`] by reading the metadata from `reader` and constructing
-    /// Arrow's schema from it.
+    /// Returns a new [`FileReader`].
     ///
     /// # Error
     /// This function errors iff:
-    /// * reading the metadata from the reader fails
     /// * it is not possible to derive an arrow schema from the parquet file
     /// * the projection contains columns that do not exist
     pub fn try_new(
-        mut reader: R,
+        reader: R,
+        metadata: FileMetaData,
         projection: Option<&[usize]>,
         chunk_size: Option<usize>,
        limit: Option<usize>,
         groups_filter: Option<GroupFilter>,
     ) -> Result<Self> {
-        let metadata = read_metadata(&mut reader)?;
-
         let schema = infer_schema(&metadata)?;
 
         let schema_metadata = schema.metadata;
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 8b8a3c2038d..7bff1a08e0f 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -7,8 +7,8 @@ use arrow2::{
     chunk::Chunk,
     datatypes::*,
     error::Result,
+    io::parquet::read as p_read,
     io::parquet::read::statistics::*,
-    io::parquet::read::*,
     io::parquet::write::*,
     types::{days_ms, NativeType},
 };
@@ -23,11 +23,11 @@ mod write_async;
 type ArrayStats = (Box<dyn Array>, Statistics);
 
 pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> Result<ArrayStats> {
-    let metadata = read_metadata(&mut reader)?;
-    let schema = infer_schema(&metadata)?;
+    let metadata = p_read::read_metadata(&mut reader)?;
+    let schema = p_read::infer_schema(&metadata)?;
 
     // verify that we can read indexes
-    let _indexes = read_columns_indexes(
+    let _indexes = p_read::read_columns_indexes(
         &mut reader,
         metadata.row_groups[0].columns(),
         &schema.fields,
@@ -40,12 +40,13 @@ pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> Resul
 type IntegrationRead = (Schema, Vec<Chunk<Box<dyn Array>>>);
 
 fn integration_read(data: &[u8], limit: Option<usize>) -> Result<IntegrationRead> {
-    let reader = FileReader::try_new(Cursor::new(data), None, None, limit, None)?;
+    let mut reader = Cursor::new(data);
+    let metadata = p_read::read_metadata(&mut reader)?;
+    let reader = p_read::FileReader::try_new(Cursor::new(data), metadata, None, None, limit, None)?;
     let schema = reader.schema().clone();
 
     for field in &schema.fields {
@@ -1527,8 +1530,13 @@ fn filter_chunk() -> Result<()> {
 
     let r = integration_write(&schema, &[chunk1.clone(), chunk2.clone()])?;
 
-    let reader = FileReader::try_new(
-        Cursor::new(r),
+    let mut reader = Cursor::new(r);
+
+    let metadata = p_read::read_metadata(&mut reader)?;
+
+    let reader = p_read::FileReader::try_new(
+        reader,
+        metadata,
         None,
         None,
         None,
diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs
index 45c5b714d6a..2872889290d 100644
--- a/tests/it/io/parquet/read.rs
+++ b/tests/it/io/parquet/read.rs
@@ -496,9 +496,10 @@ fn v1_map_nullable() -> Result<()> {
 #[test]
 fn all_types() -> Result<()> {
     let path = "testing/parquet-testing/data/alltypes_plain.parquet";
-    let reader = std::fs::File::open(path)?;
+    let mut reader = std::fs::File::open(path)?;
 
-    let reader = FileReader::try_new(reader, None, None, None, None)?;
+    let metadata = read_metadata(&mut reader)?;
+    let reader = FileReader::try_new(reader, metadata, None, None, None, None)?;
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 1);
@@ -535,10 +536,11 @@ fn all_types_chunked() -> Result<()> {
     // this has one batch with 8 elements
     let path = "testing/parquet-testing/data/alltypes_plain.parquet";
-    let reader = std::fs::File::open(path)?;
+    let mut reader = std::fs::File::open(path)?;
 
+    let metadata = read_metadata(&mut reader)?;
     // chunk it in 5 (so, (5,3))
-    let reader = FileReader::try_new(reader, None, Some(5), None, None)?;
+    let reader = FileReader::try_new(reader, metadata, None, Some(5), None, None)?;
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 2);
 
@@ -584,7 +586,7 @@ fn all_types_chunked() -> Result<()> {
 
 #[cfg(feature = "io_parquet_compression")]
 #[test]
-fn invalid_utf8() {
+fn invalid_utf8() -> Result<()> {
     let invalid_data = &[
         0x50, 0x41, 0x52, 0x31, 0x15, 0x00, 0x15, 0x24, 0x15, 0x28, 0x2c, 0x15, 0x02, 0x15, 0x00,
         0x15, 0x06, 0x15, 0x08, 0x00, 0x00, 0x12, 0x44, 0x02, 0x00, 0x00, 0x00, 0x03, 0xff, 0x08,
@@ -597,8 +599,10 @@ fn invalid_utf8() {
         0x42, 0x00, 0x51, 0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31,
     ];
 
-    let reader = Cursor::new(invalid_data);
-    let reader = FileReader::try_new(reader, None, Some(5), None, None).unwrap();
+    let mut reader = Cursor::new(invalid_data);
+
+    let metadata = read_metadata(&mut reader)?;
+    let reader = FileReader::try_new(reader, metadata, None, Some(5), None, None)?;
 
     let error = reader.collect::<Result<Vec<_>>>().unwrap_err();
     assert!(
@@ -606,4 +610,5 @@ fn invalid_utf8() {
         "unexpected error: {}",
         error
     );
+    Ok(())
 }
From 728c9555439e5ac3651dfcc4b39582a1424a342d Mon Sep 17 00:00:00 2001
From: "Jorge C.
Leitao" Date: Mon, 8 Aug 2022 05:09:48 +0000 Subject: [PATCH 2/4] Allow a different arrow schema into FileReader --- benches/read_parquet.rs | 34 +++++++++------- examples/parquet_read.rs | 23 ++++++----- src/datatypes/schema.rs | 22 +++++++++++ src/io/parquet/read/file.rs | 78 ++++++------------------------------- tests/it/io/parquet/mod.rs | 43 ++++++++++---------- tests/it/io/parquet/read.rs | 9 +++-- 6 files changed, 94 insertions(+), 115 deletions(-) diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 3d1dc858588..85f36f9d3aa 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -32,10 +32,16 @@ fn to_buffer( buffer } -fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> { - let file = Cursor::new(buffer); +fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> { + let mut reader = Cursor::new(buffer); - let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?; + let metadata = read::read_metadata(&mut reader)?; + + let schema = read::infer_schema(&metadata)?; + + let schema = schema.filter(|index, _| index == column); + + let reader = read::FileReader::new(reader, metadata, schema, None, None, None); for maybe_chunk in reader { let columns = maybe_chunk?; @@ -49,43 +55,43 @@ fn add_benchmark(c: &mut Criterion) { let size = 2usize.pow(i); let buffer = to_buffer(size, true, false, false, false); let a = format!("read i64 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap())); let a = format!("read utf8 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap())); let a = format!("read utf8 large 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 6).unwrap())); let a = format!("read utf8 emoji 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 12).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 12).unwrap())); let a = format!("read bool 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 3).unwrap())); let buffer = to_buffer(size, true, true, false, false); let a = format!("read utf8 dict 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap())); let buffer = to_buffer(size, true, false, false, true); let a = format!("read i64 snappy 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap())); let buffer = to_buffer(size, true, false, true, false); let a = format!("read utf8 multi 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap())); let buffer = to_buffer(size, true, false, true, true); let a = format!("read utf8 multi snappy 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap())); let buffer = to_buffer(size, true, false, true, true); let a = format!("read i64 multi snappy 2^{}", i); - c.bench_function(&a, 
|b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap())); let buffer = to_buffer(size, false, false, false, false); let a = format!("read required utf8 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap())); }); } diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 241e845854d..490b74638d9 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -1,30 +1,33 @@ use std::fs::File; use std::time::SystemTime; -use arrow2::error::Result; +use arrow2::error::Error; use arrow2::io::parquet::read; -fn main() -> Result<()> { +fn main() -> Result<(), Error> { + // say we have a file use std::env; let args: Vec = env::args().collect(); - let file_path = &args[1]; - let mut reader = File::open(file_path)?; + // we can read its metadata: let metadata = read::read_metadata(&mut reader)?; - let reader = read::FileReader::try_new(reader, metadata, None, Some(1024 * 8 * 8), None, None)?; - println!("{:#?}", reader.schema()); + // and infer a [`Schema`] from the `metadata`. + let schema = read::infer_schema(&metadata)?; - // say we want to evaluate if the we can skip some row groups based on a field's value - let field = &reader.schema().fields[0]; + // we can filter the columns we need (here we select all) + let schema = schema.filter(|_index, _field| true); - // we can deserialize the parquet statistics from this field - let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?; + // we can read the statistics of all parquet's row groups (here for the first field) + let statistics = read::statistics::deserialize(&schema.fields[0], &metadata.row_groups)?; println!("{:#?}", statistics); + // and create an iterator of + let reader = read::FileReader::new(reader, metadata, schema, Some(1024 * 8 * 8), None, None); + let start = SystemTime::now(); for maybe_chunk in reader { let chunk = maybe_chunk?; diff --git a/src/datatypes/schema.rs b/src/datatypes/schema.rs index baa04476360..671c9438622 100644 --- a/src/datatypes/schema.rs +++ b/src/datatypes/schema.rs @@ -26,6 +26,28 @@ impl Schema { metadata, } } + + /// Returns a new [`Schema`] with a subset of all fields whose `predicate` + /// evaluates to true. + pub fn filter bool>(self, predicate: F) -> Self { + let fields = self + .fields + .into_iter() + .enumerate() + .filter_map(|(index, f)| { + if (predicate)(index, &f) { + Some(f) + } else { + None + } + }) + .collect(); + + Schema { + fields, + metadata: self.metadata, + } + } } impl From> for Schema { diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs index d4926cfc72f..6bf34fd5f23 100644 --- a/src/io/parquet/read/file.rs +++ b/src/io/parquet/read/file.rs @@ -4,13 +4,10 @@ use std::sync::Arc; use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; +use crate::error::Result; use crate::io::parquet::read::read_columns_many; -use crate::{ - datatypes::Field, - error::{Error, Result}, -}; -use super::{infer_schema, FileMetaData, RowGroupDeserializer, RowGroupMetaData}; +use super::{FileMetaData, RowGroupDeserializer, RowGroupMetaData}; type GroupFilter = Arc bool + Send + Sync>; @@ -20,88 +17,37 @@ type GroupFilter = Arc bool + Send + Sync>; /// mapped to an [`Iterator`] and each iterator is iterated upon until either the limit /// or the last iterator ends. 
 /// # Implementation
-/// This iterator mixes IO-bounded and CPU-bounded operations.
+/// This iterator is single threaded on both IO-bounded and CPU-bounded tasks, and mixes them.
 pub struct FileReader<R: Read + Seek> {
     row_groups: RowGroupReader<R>,
-    metadata: FileMetaData,
     remaining_rows: usize,
     current_row_group: Option<RowGroupDeserializer>,
 }
 
 impl<R: Read + Seek> FileReader<R> {
     /// Returns a new [`FileReader`].
-    ///
-    /// # Error
-    /// This function errors iff:
-    /// * it is not possible to derive an arrow schema from the parquet file
-    /// * the projection contains columns that do not exist
-    pub fn try_new(
+    pub fn new(
         reader: R,
         metadata: FileMetaData,
-        projection: Option<&[usize]>,
+        schema: Schema,
         chunk_size: Option<usize>,
         limit: Option<usize>,
         groups_filter: Option<GroupFilter>,
-    ) -> Result<Self> {
-        let schema = infer_schema(&metadata)?;
-
-        let schema_metadata = schema.metadata;
-        let fields: Vec<Field> = if let Some(projection) = &projection {
-            schema
-                .fields
-                .into_iter()
-                .enumerate()
-                .filter_map(|(index, f)| {
-                    if projection.iter().any(|&i| i == index) {
-                        Some(f)
-                    } else {
-                        None
-                    }
-                })
-                .collect()
-        } else {
-            schema.fields.into_iter().collect()
-        };
-
-        if let Some(projection) = &projection {
-            if fields.len() != projection.len() {
-                return Err(Error::InvalidArgumentError(
-                    "While reading parquet, some columns in the projection do not exist in the file"
-                        .to_string(),
-                ));
-            }
-        }
-
-        let schema = Schema {
-            fields,
-            metadata: schema_metadata,
-        };
-
+    ) -> Self {
         let row_groups = RowGroupReader::new(
             reader,
             schema,
             groups_filter,
-            metadata.row_groups.clone(),
+            metadata.row_groups,
             chunk_size,
             limit,
         );
 
-        Ok(Self {
+        Self {
             row_groups,
-            metadata,
             remaining_rows: limit.unwrap_or(usize::MAX),
             current_row_group: None,
-        })
-    }
-
-    /// Returns the derived arrow [`Schema`] of the file
-    pub fn schema(&self) -> &Schema {
-        &self.row_groups.schema
-    }
-
-    /// Returns parquet's [`FileMetaData`].
- pub fn metadata(&self) -> &FileMetaData { - &self.metadata + } } /// Sets the groups filter @@ -239,12 +185,10 @@ impl RowGroupReader { let result = RowGroupDeserializer::new( column_chunks, - row_group.num_rows() as usize, + row_group.num_rows(), Some(self.remaining_rows), ); - self.remaining_rows = self - .remaining_rows - .saturating_sub(row_group.num_rows() as usize); + self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows()); Ok(Some(result)) } } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7bff1a08e0f..42e00aa4aa0 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -33,19 +33,15 @@ pub fn read_column(mut reader: R, column: &str) -> Result>>); fn integration_read(data: &[u8], limit: Option) -> Result { let mut reader = Cursor::new(data); let metadata = p_read::read_metadata(&mut reader)?; - let reader = p_read::FileReader::try_new(Cursor::new(data), metadata, None, None, limit, None)?; - let schema = reader.schema().clone(); + let schema = p_read::infer_schema(&metadata)?; for field in &schema.fields { - let mut _statistics = deserialize(field, &reader.metadata().row_groups)?; + let mut _statistics = deserialize(field, &metadata.row_groups)?; } + let reader = p_read::FileReader::new( + Cursor::new(data), + metadata, + schema.clone(), + None, + limit, + None, + ); + let batches = reader.collect::>>()?; Ok((schema, batches)) @@ -1534,24 +1538,21 @@ fn filter_chunk() -> Result<()> { let metadata = p_read::read_metadata(&mut reader)?; - let reader = p_read::FileReader::try_new( + let new_schema = p_read::infer_schema(&metadata)?; + assert_eq!(new_schema, schema); + + let reader = p_read::FileReader::new( reader, metadata, - None, + schema, None, None, // select chunk 1 Some(std::sync::Arc::new(|i, _| i == 0)), - )?; - let new_schema = reader.schema().clone(); - - for field in &schema.fields { - let mut _statistics = deserialize(field, &reader.metadata().row_groups)?; - } + ); let new_chunks = reader.collect::>>()?; - assert_eq!(new_schema, schema); assert_eq!(new_chunks, vec![chunk1]); Ok(()) } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 2872889290d..0349eb4b03f 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -499,7 +499,8 @@ fn all_types() -> Result<()> { let mut reader = std::fs::File::open(path)?; let metadata = read_metadata(&mut reader)?; - let reader = FileReader::try_new(reader, metadata, None, None, None, None)?; + let schema = infer_schema(&metadata)?; + let reader = FileReader::new(reader, metadata, schema, None, None, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); @@ -539,8 +540,9 @@ fn all_types_chunked() -> Result<()> { let mut reader = std::fs::File::open(path)?; let metadata = read_metadata(&mut reader)?; + let schema = infer_schema(&metadata)?; // chunk it in 5 (so, (5,3)) - let reader = FileReader::try_new(reader, metadata, None, Some(5), None, None)?; + let reader = FileReader::new(reader, metadata, schema, Some(5), None, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 2); @@ -602,7 +604,8 @@ fn invalid_utf8() -> Result<()> { let mut reader = Cursor::new(invalid_data); let metadata = read_metadata(&mut reader)?; - let reader = FileReader::try_new(reader, metadata, None, Some(5), None, None)?; + let schema = infer_schema(&metadata)?; + let reader = FileReader::new(reader, metadata, schema, Some(5), None, None); let error = reader.collect::>>().unwrap_err(); assert!( From 
4e45ccd1d30f7b6fa4daa5753fc218906529af04 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 8 Aug 2022 05:19:41 +0000 Subject: [PATCH 3/4] Simpler GroupFilter API --- benches/read_parquet.rs | 2 +- examples/parquet_read.rs | 15 +++++++++++--- src/io/parquet/read/file.rs | 39 ++++--------------------------------- tests/it/io/parquet/mod.rs | 26 ++++++++++++------------- tests/it/io/parquet/read.rs | 6 +++--- 5 files changed, 32 insertions(+), 56 deletions(-) diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 85f36f9d3aa..94c089931a2 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -41,7 +41,7 @@ fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> { let schema = schema.filter(|index, _| index == column); - let reader = read::FileReader::new(reader, metadata, schema, None, None, None); + let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None); for maybe_chunk in reader { let columns = maybe_chunk?; diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 490b74638d9..e0e5a220e8a 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -25,11 +25,20 @@ fn main() -> Result<(), Error> { println!("{:#?}", statistics); - // and create an iterator of - let reader = read::FileReader::new(reader, metadata, schema, Some(1024 * 8 * 8), None, None); + // say we found that we only need to read the first two row groups, "0" and "1" + let row_groups = metadata + .row_groups + .into_iter() + .enumerate() + .filter(|(index, _)| *index == 0 || *index == 1) + .map(|(_, row_group)| row_group) + .collect(); + + // we can then read the row groups into chunks + let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None); let start = SystemTime::now(); - for maybe_chunk in reader { + for maybe_chunk in chunks { let chunk = maybe_chunk?; assert!(!chunk.is_empty()); } diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs index 6bf34fd5f23..ff8944d38cf 100644 --- a/src/io/parquet/read/file.rs +++ b/src/io/parquet/read/file.rs @@ -1,5 +1,4 @@ use std::io::{Read, Seek}; -use std::sync::Arc; use crate::array::Array; use crate::chunk::Chunk; @@ -7,9 +6,7 @@ use crate::datatypes::Schema; use crate::error::Result; use crate::io::parquet::read::read_columns_many; -use super::{FileMetaData, RowGroupDeserializer, RowGroupMetaData}; - -type GroupFilter = Arc bool + Send + Sync>; +use super::{RowGroupDeserializer, RowGroupMetaData}; /// An iterator of [`Chunk`]s coming from row groups of a parquet file. /// @@ -28,20 +25,12 @@ impl FileReader { /// Returns a new [`FileReader`]. 
pub fn new( reader: R, - metadata: FileMetaData, + row_groups: Vec, schema: Schema, chunk_size: Option, limit: Option, - groups_filter: Option, ) -> Self { - let row_groups = RowGroupReader::new( - reader, - schema, - groups_filter, - metadata.row_groups, - chunk_size, - limit, - ); + let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit); Self { row_groups, @@ -50,11 +39,6 @@ impl FileReader { } } - /// Sets the groups filter - pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { - self.row_groups.set_groups_filter(groups_filter); - } - fn next_row_group(&mut self) -> Result> { let result = self.row_groups.next().transpose()?; @@ -121,7 +105,6 @@ impl Iterator for FileReader { pub struct RowGroupReader { reader: R, schema: Schema, - groups_filter: Option, row_groups: std::iter::Enumerate>, chunk_size: Option, remaining_rows: usize, @@ -132,7 +115,6 @@ impl RowGroupReader { pub fn new( reader: R, schema: Schema, - groups_filter: Option, row_groups: Vec, chunk_size: Option, limit: Option, @@ -140,18 +122,12 @@ impl RowGroupReader { Self { reader, schema, - groups_filter, row_groups: row_groups.into_iter().enumerate(), chunk_size, remaining_rows: limit.unwrap_or(usize::MAX), } } - /// Sets the groups filter - pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { - self.groups_filter = Some(groups_filter); - } - #[inline] fn _next(&mut self) -> Result> { if self.schema.fields.is_empty() { @@ -162,14 +138,7 @@ impl RowGroupReader { return Ok(None); } - let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() { - self.row_groups - .by_ref() - .find(|(index, row_group)| (groups_filter)(*index, row_group)) - } else { - self.row_groups.next() - }; - let row_group = if let Some((_, row_group)) = row_group { + let row_group = if let Some((_, row_group)) = self.row_groups.next() { row_group } else { return Ok(None); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 42e00aa4aa0..55baceaa9af 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -39,9 +39,7 @@ pub fn read_column(mut reader: R, column: &str) -> Result) -> Result>>()?; @@ -1541,15 +1538,16 @@ fn filter_chunk() -> Result<()> { let new_schema = p_read::infer_schema(&metadata)?; assert_eq!(new_schema, schema); - let reader = p_read::FileReader::new( - reader, - metadata, - schema, - None, - None, - // select chunk 1 - Some(std::sync::Arc::new(|i, _| i == 0)), - ); + // select chunk 1 + let row_groups = metadata + .row_groups + .into_iter() + .enumerate() + .filter(|(index, _)| *index == 0) + .map(|(_, row_group)| row_group) + .collect(); + + let reader = p_read::FileReader::new(reader, row_groups, schema, None, None); let new_chunks = reader.collect::>>()?; diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 0349eb4b03f..efb3943da94 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -500,7 +500,7 @@ fn all_types() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - let reader = FileReader::new(reader, metadata, schema, None, None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, None, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); @@ -542,7 +542,7 @@ fn all_types_chunked() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; // chunk it in 5 (so, (5,3)) - let reader = FileReader::new(reader, metadata, schema, Some(5), None, None); 
+ let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 2); @@ -605,7 +605,7 @@ fn invalid_utf8() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - let reader = FileReader::new(reader, metadata, schema, Some(5), None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None); let error = reader.collect::>>().unwrap_err(); assert!( From 34c3539e4615441f7e7217bd0fa78b460e6ddf7e Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 8 Aug 2022 06:07:47 +0000 Subject: [PATCH 4/4] lint --- examples/parquet_read_async.rs | 2 +- examples/s3/src/main.rs | 2 +- src/io/parquet/read/row_group.rs | 4 +--- tests/it/io/parquet/read_indexes.rs | 2 +- tests/it/io/parquet/write_async.rs | 2 +- 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/examples/parquet_read_async.rs b/examples/parquet_read_async.rs index 2428b368869..3f1abc4faf6 100644 --- a/examples/parquet_read_async.rs +++ b/examples/parquet_read_async.rs @@ -43,7 +43,7 @@ async fn main() -> Result<()> { // the runtime. // Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator // can be advanced in parallel (parallel decompression and deserialization). - let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None); + let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows(), None); for maybe_chunk in chunks { let chunk = maybe_chunk?; println!("{}", chunk.len()); diff --git a/examples/s3/src/main.rs b/examples/s3/src/main.rs index 5aa24431dd9..d3f668d4421 100644 --- a/examples/s3/src/main.rs +++ b/examples/s3/src/main.rs @@ -71,7 +71,7 @@ async fn main() -> Result<()> { // this is CPU-bounded and should be sent to a separate thread-pool. 
     // We do it here for simplicity
-    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
 
     let chunks = chunks.collect::<Result<Vec<_>>>()?;
 
     // this is a single chunk because chunk_size is `None`
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index 4dde21e467d..59fba886a65 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -260,8 +260,6 @@ pub async fn read_columns_many_async<
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, row_group.num_rows(), chunk_size))
         .collect()
 }
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index 68f09bae804..328a826eaf0 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -125,7 +125,7 @@ fn read_with_indexes(
         vec![&c1.descriptor().descriptor.primitive_type],
         schema.fields[1].clone(),
         None,
-        row_group.num_rows() as usize,
+        row_group.num_rows(),
     )?;
 
     let arrays = arrays.collect::<Result<Vec<_>>>()?;
diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs
index 5644caf67f5..000c39ca64c 100644
--- a/tests/it/io/parquet/write_async.rs
+++ b/tests/it/io/parquet/write_async.rs
@@ -63,7 +63,7 @@ async fn test_parquet_async_roundtrip() {
         let column_chunks = read_columns_many_async(factory, group, schema.fields.clone(), None)
             .await
             .unwrap();
-        let chunks = RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+        let chunks = RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
         let mut chunks = chunks.collect::<Result<Vec<_>>>().unwrap();
         out.append(&mut chunks);
     }