This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Exposed parquet indexed page filtering to FileReader (#1216)
jorgecarleitao authored Aug 15, 2022
1 parent d7f44ad commit 3b29c82
Showing 17 changed files with 546 additions and 185 deletions.
2 changes: 1 addition & 1 deletion benches/read_parquet.rs
@@ -41,7 +41,7 @@ fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {

let schema = schema.filter(|index, _| index == column);

let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);
let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None, None);

for maybe_chunk in reader {
let columns = maybe_chunk?;
2 changes: 1 addition & 1 deletion examples/parquet_read.rs
@@ -35,7 +35,7 @@ fn main() -> Result<(), Error> {
.collect();

// we can then read the row groups into chunks
let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None);
let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);

let start = SystemTime::now();
for maybe_chunk in chunks {
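Both call sites above only gain a trailing argument. A minimal, self-contained sketch of the updated synchronous API, assuming the `read_metadata`/`infer_schema` helpers already used by this repository's examples: passing `None` for the new sixth argument (`page_indexes`) keeps the previous behaviour, while `Some(...)` would carry one `Vec<Vec<Vec<FilteredPage>>>` per row group (one inner Vec per field, then per column, then per page), matching the signature added to `src/io/parquet/read/file.rs` below.

use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // the path is a placeholder for illustration
    let mut reader = File::open("example.parquet")?;

    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata)?;

    // The sixth argument is the one introduced by this commit: `None` disables
    // indexed page filtering, so every page of each row group is read as before.
    let chunks = read::FileReader::new(
        reader,
        metadata.row_groups,
        schema,
        Some(1024 * 8 * 8), // chunk size
        None,               // row limit
        None,               // page_indexes
    );

    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        println!("read a chunk with {} rows", chunk.len());
    }
    Ok(())
}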
11 changes: 9 additions & 2 deletions examples/parquet_read_async.rs
@@ -35,8 +35,15 @@ async fn main() -> Result<()> {
for row_group in &metadata.row_groups {
// A row group is consumed in two steps: the first step is to read the (compressed)
// columns into memory, which is IO-bounded.
let column_chunks =
read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;
let column_chunks = read::read_columns_many_async(
factory,
row_group,
schema.fields.clone(),
None,
None,
None,
)
.await?;

// the second step is to iterate over the columns in chunks.
// this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
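The three trailing `None`s in the new asynchronous call are not named in this hunk. By analogy with the synchronous `read_columns_many` call in `src/io/parquet/read/file.rs` below, they are presumably the chunk size, the row limit, and the filtered pages of this row group. A hedged restatement of the same call, with that assumption spelled out in comments (not standalone code):

let column_chunks = read::read_columns_many_async(
    factory,
    row_group,
    schema.fields.clone(),
    None, // chunk size (present before this commit)
    None, // row limit (assumed, by analogy with `read_columns_many`)
    None, // filtered pages for this row group (assumed); `None` reads all pages
)
.await?;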
1 change: 1 addition & 0 deletions src/io/ipc/read/mod.rs
@@ -29,6 +29,7 @@ pub mod stream_async;
pub mod file_async;

pub(crate) use common::first_dict_field;
#[cfg(feature = "io_flight")]
pub(crate) use common::{read_dictionary, read_record_batch};
pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata};
pub use reader::FileReader;
46 changes: 36 additions & 10 deletions src/io/parquet/read/file.rs
@@ -1,5 +1,7 @@
use std::io::{Read, Seek};

use parquet2::indexes::FilteredPage;

use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
@@ -29,8 +31,10 @@ impl<R: Read + Seek> FileReader<R> {
schema: Schema,
chunk_size: Option<usize>,
limit: Option<usize>,
page_indexes: Option<Vec<Vec<Vec<Vec<FilteredPage>>>>>,
) -> Self {
let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit);
let row_groups =
RowGroupReader::new(reader, schema, row_groups, chunk_size, limit, page_indexes);

Self {
row_groups,
@@ -105,9 +109,10 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
pub struct RowGroupReader<R: Read + Seek> {
reader: R,
schema: Schema,
row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
row_groups: std::vec::IntoIter<RowGroupMetaData>,
chunk_size: Option<usize>,
remaining_rows: usize,
page_indexes: Option<std::vec::IntoIter<Vec<Vec<Vec<FilteredPage>>>>>,
}

impl<R: Read + Seek> RowGroupReader<R> {
@@ -118,13 +123,18 @@ impl<R: Read + Seek> RowGroupReader<R> {
row_groups: Vec<RowGroupMetaData>,
chunk_size: Option<usize>,
limit: Option<usize>,
page_indexes: Option<Vec<Vec<Vec<Vec<FilteredPage>>>>>,
) -> Self {
if let Some(pages) = &page_indexes {
assert_eq!(pages.len(), row_groups.len())
}
Self {
reader,
schema,
row_groups: row_groups.into_iter().enumerate(),
row_groups: row_groups.into_iter(),
chunk_size,
remaining_rows: limit.unwrap_or(usize::MAX),
page_indexes: page_indexes.map(|pages| pages.into_iter()),
}
}

@@ -138,26 +148,42 @@ impl<R: Read + Seek> RowGroupReader<R> {
return Ok(None);
}

let row_group = if let Some((_, row_group)) = self.row_groups.next() {
let row_group = if let Some(row_group) = self.row_groups.next() {
row_group
} else {
return Ok(None);
};

let pages = self.page_indexes.as_mut().and_then(|iter| iter.next());

// the number of rows depends on whether indexes are selected or not.
let num_rows = pages
.as_ref()
.map(|x| {
// first field, first column within that field
x[0][0]
.iter()
.map(|page| {
page.selected_rows
.iter()
.map(|interval| interval.length)
.sum::<usize>()
})
.sum()
})
.unwrap_or_else(|| row_group.num_rows());

let column_chunks = read_columns_many(
&mut self.reader,
&row_group,
self.schema.fields.clone(),
self.chunk_size,
Some(self.remaining_rows),
pages,
)?;

let result = RowGroupDeserializer::new(
column_chunks,
row_group.num_rows(),
Some(self.remaining_rows),
);
self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows());
let result = RowGroupDeserializer::new(column_chunks, num_rows, Some(self.remaining_rows));
self.remaining_rows = self.remaining_rows.saturating_sub(num_rows);
Ok(Some(result))
}
}
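With page filtering active, the number of rows of a row group is no longer `row_group.num_rows()`: as computed above, it is the summed length of the selected row intervals of the first column of the first field. A small standalone sketch of that rule, using only the `selected_rows` and `length` fields that the hunk itself relies on:

use parquet2::indexes::FilteredPage;

/// Number of rows selected in one row group, given the filtered pages chosen
/// for it (outer Vec: field; middle Vec: column of that field; inner Vec: pages).
/// All columns select the same rows, so inspecting the first column suffices.
fn filtered_num_rows(pages: &[Vec<Vec<FilteredPage>>]) -> usize {
    pages[0][0]
        .iter()
        .flat_map(|page| page.selected_rows.iter())
        .map(|interval| interval.length)
        .sum()
}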
6 changes: 3 additions & 3 deletions src/io/parquet/read/indexes/binary.rs
@@ -7,13 +7,13 @@ use crate::{
trusted_len::TrustedLen,
};

use super::ColumnIndex;
use super::ColumnPageStatistics;

pub fn deserialize(
indexes: &[PageIndex<Vec<u8>>],
data_type: &DataType,
) -> Result<ColumnIndex, Error> {
Ok(ColumnIndex {
) -> Result<ColumnPageStatistics, Error> {
Ok(ColumnPageStatistics {
min: deserialize_binary_iter(indexes.iter().map(|index| index.min.as_ref()), data_type)?,
max: deserialize_binary_iter(indexes.iter().map(|index| index.max.as_ref()), data_type)?,
null_count: PrimitiveArray::from_trusted_len_iter(
6 changes: 3 additions & 3 deletions src/io/parquet/read/indexes/boolean.rs
@@ -2,10 +2,10 @@ use parquet2::indexes::PageIndex

use crate::array::{BooleanArray, PrimitiveArray};

use super::ColumnIndex;
use super::ColumnPageStatistics;

pub fn deserialize(indexes: &[PageIndex<bool>]) -> ColumnIndex {
ColumnIndex {
pub fn deserialize(indexes: &[PageIndex<bool>]) -> ColumnPageStatistics {
ColumnPageStatistics {
min: Box::new(BooleanArray::from_trusted_len_iter(
indexes.iter().map(|index| index.min),
)),
6 changes: 3 additions & 3 deletions src/io/parquet/read/indexes/fixed_len_binary.rs
@@ -6,10 +6,10 @@ use crate::{
trusted_len::TrustedLen,
};

use super::ColumnIndex;
use super::ColumnPageStatistics;

pub fn deserialize(indexes: &[PageIndex<Vec<u8>>], data_type: DataType) -> ColumnIndex {
ColumnIndex {
pub fn deserialize(indexes: &[PageIndex<Vec<u8>>], data_type: DataType) -> ColumnPageStatistics {
ColumnPageStatistics {
min: deserialize_binary_iter(
indexes.iter().map(|index| index.min.as_ref()),
data_type.clone(),
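These three hunks only rename the type at its use sites; the definition of `ColumnPageStatistics` itself is not part of the hunks shown. The shape implied by how the deserializers build it is roughly the following, with the exact field types being assumptions:

use arrow2::array::{Array, PrimitiveArray};

// Presumed shape of `ColumnPageStatistics` (formerly `ColumnIndex`), inferred
// from the deserializers above; each array holds one entry per data page of
// the column chunk. Field types are assumptions, not copied from the crate.
pub struct ColumnPageStatistics {
    /// Minimum value of each page, in the column's logical type.
    pub min: Box<dyn Array>,
    /// Maximum value of each page, in the column's logical type.
    pub max: Box<dyn Array>,
    /// Number of nulls in each page.
    pub null_count: PrimitiveArray<u64>,
}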