Skip to content

Commit

Permalink
Use IsNull for null count predicate pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 19, 2022
1 parent eaedebb commit bc6b9b5
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 50 deletions.
72 changes: 38 additions & 34 deletions datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use crate::{
logical_plan::{Column, DFSchema, Expr, Operator},
optimizer::utils,
physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
scalar::ScalarValue,
};

/// Interface to pass statistics information to [`PruningPredicates`]
Expand Down Expand Up @@ -446,13 +445,6 @@ impl<'a> PruningExpressionBuilder<'a> {
&self.scalar_expr
}

fn scalar_expr_value(&self) -> Option<&ScalarValue> {
match &self.scalar_expr {
Expr::Literal(s) => Some(s),
_ => None,
}
}

fn min_column_expr(&mut self) -> Result<Expr> {
self.required_columns
.min_column_expr(&self.column, &self.column_expr, self.field)
Expand All @@ -462,15 +454,6 @@ impl<'a> PruningExpressionBuilder<'a> {
self.required_columns
.max_column_expr(&self.column, &self.column_expr, self.field)
}

fn null_count_column_expr(&mut self) -> Result<Expr> {
let null_count_field = &Field::new(self.field.name(), DataType::UInt64, false);
self.required_columns.null_count_column_expr(
&self.column,
&self.column_expr,
null_count_field,
)
}
}

/// This function is designed to rewrite the column_expr to
Expand Down Expand Up @@ -623,6 +606,32 @@ fn build_single_column_expr(
}
}

/// Given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNull that will evaluate to true
/// if the column may contain null, and false if it definitely does not
/// contain null.
fn build_is_null_column_expr(
    expr: &Expr,
    schema: &Schema,
    required_columns: &mut RequiredStatColumns,
) -> Option<Expr> {
    // Only a plain column reference can be rewritten; any other expression
    // shape is unsupported and yields no pruning expression.
    let col = match expr {
        Expr::Column(ref col) => col,
        _ => return None,
    };
    let field = schema.field_with_name(&col.name).ok()?;

    // The null-count statistic for a column is tracked as a non-nullable u64.
    let null_count_field = &Field::new(field.name(), DataType::UInt64, false);
    let null_count_column_expr = required_columns
        .null_count_column_expr(col, expr, null_count_field)
        .ok()?;

    // IsNull(column) => null_count > 0
    Some(null_count_column_expr.gt(lit::<u64>(0)))
}

/// Translate logical filter expression into pruning predicate
/// expression that will evaluate to FALSE if it can be determined no
/// rows between the min/max values could pass the predicates.
Expand All @@ -643,6 +652,11 @@ fn build_predicate_expression(
// predicate expression can only be a binary expression
let (left, op, right) = match expr {
Expr::BinaryExpr { left, op, right } => (left, *op, right),
Expr::IsNull(expr) => {
let expr = build_is_null_column_expr(expr, schema, required_columns)
.unwrap_or(unhandled);
return Ok(expr);
}
Expr::Column(col) => {
let expr = build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
Expand Down Expand Up @@ -698,23 +712,13 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
.or(expr_builder.scalar_expr().clone().not_eq(max_column_expr))
}
Operator::Eq => {
if expr_builder
.scalar_expr_value()
.map(|s| s.is_null())
.unwrap_or(false)
{
// column = null => null_count > 0
let null_count_column_expr = expr_builder.null_count_column_expr()?;
null_count_column_expr.gt(lit::<u64>(0))
} else {
// column = literal => (min, max) = literal => min <= literal && literal <= max
// (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
let min_column_expr = expr_builder.min_column_expr()?;
let max_column_expr = expr_builder.max_column_expr()?;
min_column_expr
.lt_eq(expr_builder.scalar_expr().clone())
.and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
}
// column = literal => (min, max) = literal => min <= literal && literal <= max
// (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
let min_column_expr = expr_builder.min_column_expr()?;
let max_column_expr = expr_builder.max_column_expr()?;
min_column_expr
.lt_eq(expr_builder.scalar_expr().clone())
.and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
}
Operator::Gt => {
// column > literal => (min, max) > literal => max > literal
Expand Down
65 changes: 49 additions & 16 deletions datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,21 +742,7 @@ mod tests {
Ok(())
}

#[test]
fn row_group_pruning_predicate_null_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// test row group predicate with an unknown (Null) expr
//
// int > 1 and bool = NULL => c1_max > 1 and null
let expr = col("c1")
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;

fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
("c2", PhysicalType::BOOLEAN),
Expand All @@ -775,7 +761,21 @@ mod tests {
ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
],
);
let row_group_metadata = vec![rgm1, rgm2];
vec![rgm1, rgm2]
}

#[test]
fn row_group_pruning_predicate_null_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
Expand All @@ -792,6 +792,39 @@ mod tests {
Ok(())
}

#[test]
fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
    use crate::logical_plan::{col, lit};
    // test row group predicate with an unknown (Null) expr
    //
    // int > 1 and bool = NULL => c1_max > 1 and null
    let predicate_expr = col("c1")
        .gt(lit(15))
        .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Boolean, false),
    ]));
    let pruning_predicate = PruningPredicate::try_new(&predicate_expr, schema)?;
    let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

    let row_group_predicate = build_row_group_predicate(
        &pruning_predicate,
        parquet_file_metrics(),
        &row_group_metadata,
    );
    let row_group_filter: Vec<_> = row_group_metadata
        .iter()
        .enumerate()
        .map(|(idx, metadata)| row_group_predicate(metadata, idx))
        .collect();
    // no row group is filtered out because the predicate expression can't be evaluated
    // when a null array is generated for a statistics column,
    assert_eq!(row_group_filter, vec![true, true]);

    Ok(())
}

fn get_row_group_meta_data(
schema_descr: &SchemaDescPtr,
column_statistics: Vec<ParquetStatistics>,
Expand Down

0 comments on commit bc6b9b5

Please sign in to comment.