Simpler GroupFilter API
jorgecarleitao committed Aug 8, 2022
1 parent 728c955 commit 4e45ccd
Showing 5 changed files with 32 additions and 56 deletions.
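In short: `read::FileReader::new` no longer takes the whole `FileMetaData` plus an optional `GroupFilter` closure; it now takes the `Vec<RowGroupMetaData>` to read, and callers select row groups up front with ordinary iterator combinators. A minimal migration sketch, assembled from the call sites changed below (the `reader`, `metadata`, and `schema` bindings are assumed to be set up as in the examples):

// before: selection via an optional closure, e.g. only the first row group
// let reader = read::FileReader::new(
//     reader, metadata, schema, None, None,
//     Some(std::sync::Arc::new(|index, _| index == 0)),
// );

// after: filter `metadata.row_groups` yourself and pass the Vec directly
let row_groups = metadata
    .row_groups
    .into_iter()
    .enumerate()
    .filter(|(index, _)| *index == 0)
    .map(|(_, row_group)| row_group)
    .collect();
let reader = read::FileReader::new(reader, row_groups, schema, None, None);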
2 changes: 1 addition & 1 deletion benches/read_parquet.rs
@@ -41,7 +41,7 @@ fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 
     let schema = schema.filter(|index, _| index == column);
 
-    let reader = read::FileReader::new(reader, metadata, schema, None, None, None);
+    let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     for maybe_chunk in reader {
         let columns = maybe_chunk?;
15 changes: 12 additions & 3 deletions examples/parquet_read.rs
@@ -25,11 +25,20 @@ fn main() -> Result<(), Error> {
 
     println!("{:#?}", statistics);
 
-    // and create an iterator of
-    let reader = read::FileReader::new(reader, metadata, schema, Some(1024 * 8 * 8), None, None);
+    // say we found that we only need to read the first two row groups, "0" and "1"
+    let row_groups = metadata
+        .row_groups
+        .into_iter()
+        .enumerate()
+        .filter(|(index, _)| *index == 0 || *index == 1)
+        .map(|(_, row_group)| row_group)
+        .collect();
+
+    // we can then read the row groups into chunks
+    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None);
 
     let start = SystemTime::now();
-    for maybe_chunk in reader {
+    for maybe_chunk in chunks {
         let chunk = maybe_chunk?;
         assert!(!chunk.is_empty());
     }
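Since selection now happens over plain `RowGroupMetaData` values rather than through an index-based closure, any metadata-driven predicate works, not just positional ones. A hedged sketch of selecting by content instead of by position (assuming `RowGroupMetaData::num_rows`, as exposed by parquet2):

// keep only non-empty row groups; the predicate can inspect any metadata,
// because selection happens before the reader is constructed
let row_groups: Vec<_> = metadata
    .row_groups
    .into_iter()
    .filter(|row_group| row_group.num_rows() > 0)
    .collect();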
39 changes: 4 additions & 35 deletions src/io/parquet/read/file.rs
@@ -1,15 +1,12 @@
 use std::io::{Read, Seek};
-use std::sync::Arc;
 
 use crate::array::Array;
 use crate::chunk::Chunk;
 use crate::datatypes::Schema;
 use crate::error::Result;
 use crate::io::parquet::read::read_columns_many;
 
-use super::{FileMetaData, RowGroupDeserializer, RowGroupMetaData};
-
-type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
+use super::{RowGroupDeserializer, RowGroupMetaData};
 
 /// An iterator of [`Chunk`]s coming from row groups of a parquet file.
 ///
@@ -28,20 +25,12 @@ impl<R: Read + Seek> FileReader<R> {
     /// Returns a new [`FileReader`].
     pub fn new(
         reader: R,
-        metadata: FileMetaData,
+        row_groups: Vec<RowGroupMetaData>,
         schema: Schema,
         chunk_size: Option<usize>,
         limit: Option<usize>,
-        groups_filter: Option<GroupFilter>,
     ) -> Self {
-        let row_groups = RowGroupReader::new(
-            reader,
-            schema,
-            groups_filter,
-            metadata.row_groups,
-            chunk_size,
-            limit,
-        );
+        let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit);
 
         Self {
             row_groups,
@@ -50,11 +39,6 @@
         }
     }
 
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.row_groups.set_groups_filter(groups_filter);
-    }
-
     fn next_row_group(&mut self) -> Result<Option<RowGroupDeserializer>> {
         let result = self.row_groups.next().transpose()?;
 
@@ -121,7 +105,6 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
 pub struct RowGroupReader<R: Read + Seek> {
     reader: R,
     schema: Schema,
-    groups_filter: Option<GroupFilter>,
     row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
     chunk_size: Option<usize>,
     remaining_rows: usize,
@@ -132,26 +115,19 @@ impl<R: Read + Seek> RowGroupReader<R> {
     pub fn new(
         reader: R,
         schema: Schema,
-        groups_filter: Option<GroupFilter>,
         row_groups: Vec<RowGroupMetaData>,
         chunk_size: Option<usize>,
         limit: Option<usize>,
     ) -> Self {
         Self {
             reader,
             schema,
-            groups_filter,
             row_groups: row_groups.into_iter().enumerate(),
             chunk_size,
             remaining_rows: limit.unwrap_or(usize::MAX),
         }
     }
 
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.groups_filter = Some(groups_filter);
-    }
-
     #[inline]
     fn _next(&mut self) -> Result<Option<RowGroupDeserializer>> {
         if self.schema.fields.is_empty() {
@@ -162,14 +138,7 @@
             return Ok(None);
         }
 
-        let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() {
-            self.row_groups
-                .by_ref()
-                .find(|(index, row_group)| (groups_filter)(*index, row_group))
-        } else {
-            self.row_groups.next()
-        };
-        let row_group = if let Some((_, row_group)) = row_group {
+        let row_group = if let Some((_, row_group)) = self.row_groups.next() {
             row_group
         } else {
             return Ok(None);
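Note that with `set_groups_filter` gone, the selection can no longer be changed after the reader exists: what the old lazy `find` over `(index, row_group)` pairs did inside `_next` now happens eagerly at the call site. A sketch of how a closure that previously went through `set_groups_filter` maps onto the new API (the `wanted` predicate is hypothetical):

// a predicate with the old GroupFilter shape: (usize, &RowGroupMetaData) -> bool
let wanted = |index: usize, _row_group: &RowGroupMetaData| index % 2 == 0;

// applied eagerly, before FileReader::new, instead of lazily inside the reader
let row_groups: Vec<RowGroupMetaData> = metadata
    .row_groups
    .into_iter()
    .enumerate()
    .filter(|(index, row_group)| wanted(*index, row_group))
    .map(|(_, row_group)| row_group)
    .collect();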
26 changes: 12 additions & 14 deletions tests/it/io/parquet/mod.rs
@@ -39,9 +39,7 @@ pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> Result<ArrayS
 
     let statistics = deserialize(field, &metadata.row_groups)?;
 
-    let metadata = p_read::read_metadata(&mut reader)?;
-    let schema = p_read::infer_schema(&metadata)?;
-    let mut reader = p_read::FileReader::new(reader, metadata, schema, None, None, None);
+    let mut reader = p_read::FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     Ok((
         reader.next().unwrap()?.into_arrays().pop().unwrap(),
@@ -1157,11 +1155,10 @@ fn integration_read(data: &[u8], limit: Option<usize>) -> Result<IntegrationRead
 
     let reader = p_read::FileReader::new(
         Cursor::new(data),
-        metadata,
+        metadata.row_groups,
         schema.clone(),
         None,
         limit,
-        None,
     );
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
@@ -1541,15 +1538,16 @@ fn filter_chunk() -> Result<()> {
     let new_schema = p_read::infer_schema(&metadata)?;
     assert_eq!(new_schema, schema);
 
-    let reader = p_read::FileReader::new(
-        reader,
-        metadata,
-        schema,
-        None,
-        None,
-        // select chunk 1
-        Some(std::sync::Arc::new(|i, _| i == 0)),
-    );
+    // select chunk 1
+    let row_groups = metadata
+        .row_groups
+        .into_iter()
+        .enumerate()
+        .filter(|(index, _)| *index == 0)
+        .map(|(_, row_group)| row_group)
+        .collect();
+
+    let reader = p_read::FileReader::new(reader, row_groups, schema, None, None);
 
     let new_chunks = reader.collect::<Result<Vec<_>>>()?;
 
6 changes: 3 additions & 3 deletions tests/it/io/parquet/read.rs
@@ -500,7 +500,7 @@ fn all_types() -> Result<()> {
 
     let metadata = read_metadata(&mut reader)?;
     let schema = infer_schema(&metadata)?;
-    let reader = FileReader::new(reader, metadata, schema, None, None, None);
+    let reader = FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 1);
@@ -542,7 +542,7 @@ fn all_types_chunked() -> Result<()> {
     let metadata = read_metadata(&mut reader)?;
     let schema = infer_schema(&metadata)?;
     // chunk it in 5 (so, (5,3))
-    let reader = FileReader::new(reader, metadata, schema, Some(5), None, None);
+    let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None);
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
     assert_eq!(batches.len(), 2);
@@ -605,7 +605,7 @@ fn invalid_utf8() -> Result<()> {
 
     let metadata = read_metadata(&mut reader)?;
     let schema = infer_schema(&metadata)?;
-    let reader = FileReader::new(reader, metadata, schema, Some(5), None, None);
+    let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None);
 
     let error = reader.collect::<Result<Vec<_>>>().unwrap_err();
     assert!(