Skip to content

Commit

Permalink
Add Parquet RowFilter API (#2335)
Browse files Browse the repository at this point in the history
* Add RowFilter API

* Review feedback

* Fix doc

* Fix handling of NULL boolean array

* Add tests, fix bugs

* Fix clippy

* Review feedback

* Fix doc
  • Loading branch information
tustvold authored Aug 11, 2022
1 parent 4481993 commit 21ba02e
Show file tree
Hide file tree
Showing 7 changed files with 910 additions and 196 deletions.
19 changes: 6 additions & 13 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,18 @@ use crate::data_type::{
Int64Type, Int96Type,
};
use crate::errors::Result;
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

/// Create array reader from parquet schema, projection mask, and parquet file reader.
pub fn build_array_reader(
parquet_schema: SchemaDescPtr,
arrow_schema: SchemaRef,
mask: ProjectionMask,
row_groups: Box<dyn RowGroupCollection>,
row_groups: &dyn RowGroupCollection,
) -> Result<Box<dyn ArrayReader>> {
let field =
convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
let field = convert_schema(&row_groups.schema(), mask, Some(arrow_schema.as_ref()))?;

match &field {
Some(field) => build_reader(field, row_groups.as_ref()),
Some(field) => build_reader(field, row_groups),
None => Ok(make_empty_array_reader(row_groups.num_rows())),
}
}
Expand Down Expand Up @@ -333,13 +331,8 @@ mod tests {
)
.unwrap();

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
Arc::new(arrow_schema),
mask,
Box::new(file_reader),
)
.unwrap();
let array_reader =
build_array_reader(Arc::new(arrow_schema), mask, &file_reader).unwrap();

// Create arrow types
let arrow_type = DataType::Struct(vec![Field::new(
Expand Down
10 changes: 3 additions & 7 deletions parquet/src/arrow/array_reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,13 +582,9 @@ mod tests {
let schema = file_metadata.schema_descr_ptr();
let mask = ProjectionMask::leaves(&schema, vec![0]);

let mut array_reader = build_array_reader(
schema,
Arc::new(arrow_schema),
mask,
Box::new(file_reader),
)
.unwrap();
let mut array_reader =
build_array_reader(Arc::new(arrow_schema), mask, &file_reader)
.unwrap();

let batch = array_reader.next_batch(100).unwrap();
assert_eq!(batch.data_type(), array_reader.get_data_type());
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub trait ArrayReader: Send {
/// A collection of row groups
pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;
fn schema(&self) -> SchemaDescPtr;

/// Get the number of rows in this collection
fn num_rows(&self) -> usize;
Expand All @@ -110,8 +110,8 @@ pub trait RowGroupCollection {
}

impl RowGroupCollection for Arc<dyn FileReader> {
fn schema(&self) -> Result<SchemaDescPtr> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
fn schema(&self) -> SchemaDescPtr {
self.metadata().file_metadata().schema_descr_ptr()
}

fn num_rows(&self) -> usize {
Expand Down
109 changes: 109 additions & 0 deletions parquet/src/arrow/arrow_reader/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::ProjectionMask;
use arrow::array::BooleanArray;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

/// A predicate operating on [`RecordBatch`]
///
/// Implementations are owned and invoked by the parquet reader to filter rows
/// while decoding, hence the `Send + 'static` bounds.
pub trait ArrowPredicate: Send + 'static {
    /// Returns the [`ProjectionMask`] that describes the columns required
    /// to evaluate this predicate. All projected columns will be provided in the `batch`
    /// passed to [`evaluate`](Self::evaluate)
    fn projection(&self) -> &ProjectionMask;

    /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
    /// identified by [`Self::projection`]
    ///
    /// Rows that are `true` in the returned [`BooleanArray`] will be returned by the
    /// parquet reader, whereas rows that are `false` or `Null` will not be
    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
}

/// An [`ArrowPredicate`] created from an [`FnMut`]
pub struct ArrowPredicateFn<F> {
    /// The closure invoked by [`ArrowPredicate::evaluate`]
    f: F,
    /// The columns required by `f`
    projection: ProjectionMask,
}

impl<F> ArrowPredicateFn<F>
where
F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
{
/// Create a new [`ArrowPredicateFn`]. `f` will be passed batches
/// that contains the columns specified in `projection`
/// and returns a [`BooleanArray`] that describes which rows should
/// be passed along
pub fn new(projection: ProjectionMask, f: F) -> Self {
Self { f, projection }
}
}

impl<F> ArrowPredicate for ArrowPredicateFn<F>
where
    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
{
    /// The columns needed by the wrapped closure
    fn projection(&self) -> &ProjectionMask {
        &self.projection
    }

    /// Delegate evaluation to the wrapped closure
    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
        let predicate = &mut self.f;
        predicate(batch)
    }
}

/// A [`RowFilter`] allows pushing down a filter predicate to skip IO and decode
///
/// This consists of a list of [`ArrowPredicate`] where only the rows that satisfy all
/// of the predicates will be returned. Any [`RowSelection`] will be applied prior
/// to the first predicate, and each predicate in turn will then be used to compute
/// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
///
/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
///
/// This design has a couple of implications:
///
/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
/// * IO will be deferred until needed by a [`ProjectionMask`]
///
/// As such there is a trade-off between a single large predicate, or multiple predicates,
/// that will depend on the shape of the data. Whilst multiple smaller predicates may
/// minimise the amount of data scanned/decoded, it may not be faster overall.
///
/// For example, if a predicate that needs a single column of data filters out all but
/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
/// improve performance.
///
/// As a counter example, if a predicate needs several columns of data to evaluate but
/// leaves 99% of the rows, it may be better to not filter the data from parquet and
/// apply the filter after the RecordBatch has been fully decoded.
///
/// [`RowSelection`]: super::selection::RowSelection
pub struct RowFilter {
    /// A list of [`ArrowPredicate`]
    pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
}

impl RowFilter {
/// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
Self { predicates }
}
}
Loading

0 comments on commit 21ba02e

Please sign in to comment.