Fix null comparison for Parquet pruning predicate #1595

Merged · 5 commits · Jan 21, 2022

66 changes: 65 additions & 1 deletion datafusion/src/physical_optimizer/pruning.rs
@@ -37,6 +37,7 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::prelude::lit;
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
@@ -75,6 +76,12 @@ pub trait PruningStatistics {
/// return the number of containers (e.g. row groups) being
/// pruned with these statistics
fn num_containers(&self) -> usize;

/// return the number of null values for the named column as an
/// `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
}
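
The contract mirrors min_values/max_values: one UInt64 entry per container. A minimal sketch of how an implementor might satisfy it, using a hypothetical InMemoryStats type (not part of this PR):

use std::sync::Arc;
use arrow::array::{ArrayRef, UInt64Array};

// Hypothetical container holding one null count per row group.
struct InMemoryStats {
    null_counts_per_group: Vec<u64>,
}

impl InMemoryStats {
    // Same shape as PruningStatistics::null_counts: the returned array
    // must contain num_containers() rows, one per container.
    fn null_counts(&self) -> Option<ArrayRef> {
        Some(Arc::new(UInt64Array::from(self.null_counts_per_group.clone())))
    }
}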

/// Evaluates filter expressions on statistics in order to
@@ -200,7 +207,7 @@ impl PruningPredicate {
struct RequiredStatColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
/// * Statistics type (e.g. Min or Max)
/// * Statistics type (e.g. Min or Max or NullCount)
/// * The field the statistics value should be placed in for
/// pruning predicate evaluation
columns: Vec<(Column, StatisticsType, Field)>,
@@ -281,6 +288,22 @@ impl RequiredStatColumns {
) -> Result<Expr> {
self.stat_column_expr(column, column_expr, field, StatisticsType::Max, "max")
}

/// rewrite col --> col_null_count
fn null_count_column_expr(
&mut self,
column: &Column,
column_expr: &Expr,
field: &Field,
) -> Result<Expr> {
self.stat_column_expr(
column,
column_expr,
field,
StatisticsType::NullCount,
"null_count",
)
}
}

impl From<Vec<(Column, StatisticsType, Field)>> for RequiredStatColumns {
@@ -329,6 +352,7 @@ fn build_statistics_record_batch<S: PruningStatistics>(
let array = match statistics_type {
StatisticsType::Min => statistics.min_values(column),
StatisticsType::Max => statistics.max_values(column),
StatisticsType::NullCount => statistics.null_counts(column),
};
let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));

@@ -582,6 +606,32 @@ fn build_single_column_expr(
}
}

/// If `expr` is a column expression, returns a pruning expression for
/// `IsNull(column)` that evaluates to true if the column may contain a
/// null, and false if it definitely does not contain a null.
fn build_is_null_column_expr(
expr: &Expr,
schema: &Schema,
required_columns: &mut RequiredStatColumns,
) -> Option<Expr> {
match expr {
Expr::Column(ref col) => {
let field = schema.field_with_name(&col.name).ok()?;

let null_count_field = &Field::new(field.name(), DataType::UInt64, false);
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNull(column) => null_count > 0
null_count_column_expr.gt(lit::<u64>(0))
})
.ok()
}
_ => None,
}
}
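
In effect, a user filter IsNull(c2) is rewritten into a comparison against the synthetic statistics column. A hedged illustration of the before/after shapes using DataFusion's expression API (the name c2_null_count stands in for the column generated by null_count_column_expr above):

use datafusion::logical_plan::{col, lit};

// Filter as written by the user:
let original = col("c2").is_null();
// Pruning form produced by the rewrite above: true when a row group may
// contain nulls, false when its statistics prove it contains none.
let rewritten = col("c2_null_count").gt(lit(0u64));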

/// Translate logical filter expression into pruning predicate
/// expression that will evaluate to FALSE if it can be determined no
/// rows between the min/max values could pass the predicates.
@@ -602,6 +652,11 @@ fn build_predicate_expression(
// predicate expression can only be a binary expression
let (left, op, right) = match expr {
Expr::BinaryExpr { left, op, right } => (left, *op, right),
Expr::IsNull(expr) => {
let expr = build_is_null_column_expr(expr, schema, required_columns)
.unwrap_or(unhandled);
return Ok(expr);
}
Expr::Column(col) => {
let expr = build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
@@ -702,6 +757,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
enum StatisticsType {
Min,
Max,
NullCount,
}

#[cfg(test)]
@@ -812,6 +868,10 @@ mod tests {
.map(|container_stats| container_stats.len())
.unwrap_or(0)
}

fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}
}

/// Returns the specified min/max container values
@@ -833,6 +893,10 @@
fn num_containers(&self) -> usize {
self.num_containers
}

fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}
}

#[test]
96 changes: 78 additions & 18 deletions datafusion/src/physical_plan/file_format/parquet.rs
@@ -342,6 +342,31 @@ macro_rules! get_min_max_values {
}}
}

// Extract the null count value from the ParquetStatistics
macro_rules! get_null_count_values {
($self:expr, $column:expr) => {{
let column_index =
if let Some((v, _)) = $self.parquet_schema.column_with_name(&$column.name) {
v
} else {
// Named column was not present
return None;
};

let scalar_values: Vec<ScalarValue> = $self
.row_group_metadata
.iter()
.flat_map(|meta| meta.column(column_index).statistics())
.map(|stats| {
ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
})
.collect();

// ignore errors converting to arrays (e.g. different types)
ScalarValue::iter_to_array(scalar_values).ok()
}};
}
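
The macro gathers one ScalarValue per row group and lets ScalarValue::iter_to_array turn the collection into a single Arrow array. A small sketch of that final step, assuming two row groups with 0 and 1 null values respectively:

use datafusion::scalar::ScalarValue;

// One null count per row group, as produced by the iterator above.
let scalars = vec![
    ScalarValue::UInt64(Some(0)),
    ScalarValue::UInt64(Some(1)),
];
// Yields a UInt64Array of [0, 1]; in the macro, a conversion error is
// mapped to None via .ok().
let array = ScalarValue::iter_to_array(scalars).unwrap();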

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
@@ -354,6 +379,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn num_containers(&self) -> usize {
self.row_group_metadata.len()
}

fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
get_null_count_values!(self, column)
}
}

fn build_row_group_predicate(
Expand Down Expand Up @@ -713,21 +742,7 @@ mod tests {
Ok(())
}

#[test]
fn row_group_pruning_predicate_null_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// test row group predicate with an unknown (Null) expr
//
// int > 1 and bool = NULL => c1_max > 1 and null
let expr = col("c1")
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;

fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
("c2", PhysicalType::BOOLEAN),
@@ -743,10 +758,56 @@
&schema_descr,
vec![
ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
],
);
let row_group_metadata = vec![rgm1, rgm2];
vec![rgm1, rgm2]
}

#[test]
fn row_group_pruning_predicate_null_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// int > 15 and IsNull(bool) => c1_max > 15 and bool_null_count > 0
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();
// The first row group is filtered out because it contains no null values in "c2".
assert_eq!(row_group_filter, vec![false, true]);
Contributor

I actually think this could be vec![false, false], as the predicate can never be true (int > 1 AND bool = NULL is always NULL).

@viirya (Member Author) · Jan 18, 2022

I am not sure about the expression semantics in DataFusion. In Spark, the predicate that checks for null values would be IsNull. Here I followed the original expression bool = NULL.

I see there is also an IsNull predicate expression, but I don't see IsNull handled in predicate pushdown. I don't know if this is intentional (i.e., using = to do null predicate pushdown) or a bug.

I can fix it if you agree that IsNull is the correct way to handle null predicates here.

@viirya (Member Author) · Jan 18, 2022

I think this is related to "Confusion 1 and 2". I guess this is also why you felt confused about treating = specially.

Contributor

In SQL, IsNull is the correct way to test a column for null as well 👍

It would make a lot of sense to me to rewrite x IS NULL --> x_null_count > 0

@viirya (Member Author)

Yeah, I was surprised and confused too when I first looked at the bool = NULL. I guessed this was how DataFusion works, but it seems not :). Let me fix it together.

@viirya (Member Author)

Would you like me to fix it here or in a follow-up PR?

@viirya (Member Author)

I've updated to use IsNull for predicate pruning.


Ok(())
}

#[test]
fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
// test row group predicate with an unknown (Null) expr
//
// int > 15 and bool = NULL => c1_max > 15 and null
let expr = col("c1")
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
@@ -759,7 +820,6 @@
.collect::<Vec<_>>();
// no row group is filtered out because the predicate expression can't be evaluated
// when a null array is generated for a statistics column,
// because the null values propagate to the end result, making the predicate result undefined
assert_eq!(row_group_filter, vec![true, true]);

Ok(())
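
The comment above reflects SQL three-valued logic: comparing a value with NULL yields NULL, and AND-ing NULL into the conjunction leaves the overall result NULL rather than FALSE. Pruning may only drop a row group when the predicate is provably FALSE, so both groups are kept. A short sketch of the filter in question (the same expression the test builds):

use datafusion::logical_plan::{col, lit};
use datafusion::scalar::ScalarValue;

// c2 = NULL is never TRUE and never FALSE, so the conjunction below can
// never evaluate to FALSE, and no row group can be pruned.
let expr = col("c1")
    .gt(lit(15))
    .and(col("c2").eq(lit(ScalarValue::Boolean(None))));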