
ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables #9064

Closed
56 changes: 54 additions & 2 deletions rust/datafusion/src/datasource/parquet.rs
@@ -30,6 +30,8 @@ use crate::logical_plan::Expr;
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::ExecutionPlan;

use super::datasource::TableProviderFilterPushDown;

/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
path: String,
@@ -41,7 +43,7 @@ pub struct ParquetTable {
impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
pub fn try_new(path: &str, max_concurrency: usize) -> Result<Self> {
-        let parquet_exec = ParquetExec::try_from_path(path, None, 0, 1)?;
+        let parquet_exec = ParquetExec::try_from_path(path, None, None, 0, 1)?;
let schema = parquet_exec.schema();
Ok(Self {
path: path.to_string(),
@@ -62,17 +64,26 @@ impl TableProvider for ParquetTable {
self.schema.clone()
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Inexact)
}

/// Scan the file(s), using the provided projection, and return one BatchIterator per
/// partition.
fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
-        _filters: &[Expr],
+        filters: &[Expr],
Review comment on the `filters` parameter:

Member:
Ideally, I feel we should have a proper filter API defined in DataFusion that can be shared among the various data sources. On the other hand, the actual filtering logic should be implemented by the individual data sources / formats, probably by converting DataFusion's filter API into the corresponding one for each format.

But this is a very good start, and we can probably do those as follow-ups (if we don't care much about API changes).

Contributor:
I agree that starting with this PR and then extending the approach to something more generic is the right way to go.
) -> Result<Arc<dyn ExecutionPlan>> {
let predicate = combine_filters(filters);
Ok(Arc::new(ParquetExec::try_from_path(
&self.path,
projection.clone(),
predicate,
batch_size,
self.max_concurrency,
)?))
@@ -83,6 +94,22 @@ impl TableProvider for ParquetTable {
}
}
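The `Inexact` answer from `supports_filter_pushdown` above tells the query engine that the table provider may use the filter to prune data (e.g. skip Parquet row groups via statistics), but does not guarantee that every returned row matches, so the engine must still re-apply the filter. As a standalone illustration of that contract (using a hypothetical stand-in enum, not DataFusion's actual `TableProviderFilterPushDown` type):

```rust
// Illustrative stand-in for DataFusion's pushdown result (hypothetical names).
#[derive(Debug, PartialEq)]
enum FilterPushDown {
    Unsupported, // provider ignores the filter; engine filters every row
    Inexact,     // provider may prune coarsely; engine must re-check rows
    Exact,       // provider returns only matching rows; engine skips re-filtering
}

// The engine only skips its own filtering step for Exact pushdown.
fn engine_must_refilter(pushdown: &FilterPushDown) -> bool {
    !matches!(pushdown, FilterPushDown::Exact)
}

fn main() {
    // The Parquet provider answers Inexact: row-group statistics can prune
    // whole row groups, but surviving groups may still hold non-matching rows.
    assert!(engine_must_refilter(&FilterPushDown::Inexact));
    assert!(engine_must_refilter(&FilterPushDown::Unsupported));
    assert!(!engine_must_refilter(&FilterPushDown::Exact));
    println!("ok");
}
```

This is why returning `Inexact` here is safe even before the Parquet-side pruning is complete: correctness never depends on the provider's filtering.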

/// Combines an array of filter expressions into a single filter expression
/// consisting of the input filter expressions joined with logical AND.
/// Returns None if the filters array is empty.
fn combine_filters(filters: &[Expr]) -> Option<Expr> {
if filters.is_empty() {
return None;
}
let combined_filter = filters
.iter()
.skip(1)
.fold(filters[0].clone(), |acc, filter| {
crate::logical_plan::and(acc, filter.clone())
});
Some(combined_filter)
}
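The fold in `combine_filters` is a left-associative AND-reduction: `[a, b, c]` becomes `(a AND b) AND c`. The same shape can be sketched in isolation (with a minimal hypothetical `Expr` enum, not DataFusion's real one, so the snippet is self-contained):

```rust
// Minimal stand-in for DataFusion's Expr, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    And(Box<Expr>, Box<Expr>),
}

fn and(left: Expr, right: Expr) -> Expr {
    Expr::And(Box::new(left), Box::new(right))
}

/// Mirrors combine_filters: AND all filters together, left-associatively.
/// Returns None for an empty slice.
fn combine_filters(filters: &[Expr]) -> Option<Expr> {
    let (first, rest) = filters.split_first()?;
    Some(rest.iter().cloned().fold(first.clone(), and))
}

fn main() {
    let a = Expr::Column("c1".into());
    let b = Expr::Column("c2".into());
    let c = Expr::Column("c3".into());
    // Left-associative grouping: (c1 AND c2) AND c3.
    assert_eq!(
        combine_filters(&[a.clone(), b.clone(), c.clone()]),
        Some(and(and(a, b), c))
    );
    assert_eq!(combine_filters(&[]), None);
    println!("ok");
}
```

Since AND is associative, the grouping does not affect semantics; left-association simply matches what `fold` produces, which is what the `combine_multiple_filters` test below asserts.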

#[cfg(test)]
mod tests {
use super::*;
@@ -328,4 +355,29 @@ mod tests {
.expect("should have received at least one batch")
.map_err(|e| e.into())
}

#[test]
fn combine_zero_filters() {
let result = combine_filters(&[]);
assert_eq!(result, None);
}

#[test]
fn combine_one_filter() {
use crate::logical_plan::{binary_expr, col, lit, Operator};
let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
let result = combine_filters(&[filter.clone()]);
assert_eq!(result, Some(filter));
}

#[test]
fn combine_multiple_filters() {
use crate::logical_plan::{and, binary_expr, col, lit, Operator};
let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
let result =
combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
}
}
2 changes: 1 addition & 1 deletion rust/datafusion/src/logical_plan/operators.rs
@@ -20,7 +20,7 @@ use std::{fmt, ops};
use super::{binary_expr, Expr};

/// Operators applied to expressions
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Operator {
/// Expressions are equal
Eq,
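Deriving `Copy` on `Operator` is possible because it is a fieldless (C-like) enum: each value is just a discriminant, so a bitwise copy is always valid, and call sites no longer need `.clone()` or borrows. A small sketch of the effect (abbreviated hypothetical enum, not the full `Operator`):

```rust
// A fieldless enum can derive Copy: values are plain discriminants.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum Operator {
    Eq,
    Lt,
    Gt,
}

fn main() {
    let op = Operator::Lt;
    let taken = op; // with Copy this copies rather than moves...
    assert_eq!(op, taken); // ...so `op` remains usable afterwards
    assert_ne!(op, Operator::Gt);
    println!("{:?}", op);
}
```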