Fix null comparison for Parquet pruning predicate #1595
Changes from 2 commits
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -37,12 +37,14 @@ use arrow::{ | |||||
record_batch::RecordBatch, | ||||||
}; | ||||||
|
||||||
use crate::prelude::lit; | ||||||
use crate::{ | ||||||
error::{DataFusionError, Result}, | ||||||
execution::context::ExecutionContextState, | ||||||
logical_plan::{Column, DFSchema, Expr, Operator}, | ||||||
optimizer::utils, | ||||||
physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr}, | ||||||
scalar::ScalarValue, | ||||||
}; | ||||||
|
||||||
/// Interface to pass statistics information to [`PruningPredicates`] | ||||||
|
@@ -75,6 +77,10 @@ pub trait PruningStatistics { | |||||
/// return the number of containers (e.g. row groups) being | ||||||
/// pruned with these statistics | ||||||
fn num_containers(&self) -> usize; | ||||||
|
||||||
/// return the number of null values for the named column. | ||||||
/// Note: the returned array must contain `num_containers()` rows. | ||||||
fn null_counts(&self, column: &Column) -> Option<ArrayRef>; | ||||||
} | ||||||
|
||||||
/// Evaluates filter expressions on statistics in order to | ||||||
|
@@ -200,7 +206,7 @@ impl PruningPredicate { | |||||
struct RequiredStatColumns { | ||||||
/// The statistics required to evaluate this predicate: | ||||||
/// * The unqualified column in the input schema | ||||||
/// * Statistics type (e.g. Min or Max) | ||||||
/// * Statistics type (e.g. Min or Max or Null_Count) | ||||||
/// * The field the statistics value should be placed in for | ||||||
/// pruning predicate evaluation | ||||||
columns: Vec<(Column, StatisticsType, Field)>, | ||||||
|
@@ -281,6 +287,22 @@ impl RequiredStatColumns { | |||||
) -> Result<Expr> { | ||||||
self.stat_column_expr(column, column_expr, field, StatisticsType::Max, "max") | ||||||
} | ||||||
|
||||||
/// rewrite col --> col_null_count | ||||||
fn null_count_column_expr( | ||||||
&mut self, | ||||||
column: &Column, | ||||||
column_expr: &Expr, | ||||||
field: &Field, | ||||||
) -> Result<Expr> { | ||||||
self.stat_column_expr( | ||||||
column, | ||||||
column_expr, | ||||||
field, | ||||||
StatisticsType::NullCount, | ||||||
"null_count", | ||||||
) | ||||||
} | ||||||
} | ||||||
|
||||||
impl From<Vec<(Column, StatisticsType, Field)>> for RequiredStatColumns { | ||||||
|
@@ -329,6 +351,7 @@ fn build_statistics_record_batch<S: PruningStatistics>( | |||||
let array = match statistics_type { | ||||||
StatisticsType::Min => statistics.min_values(column), | ||||||
StatisticsType::Max => statistics.max_values(column), | ||||||
StatisticsType::NullCount => statistics.null_counts(column), | ||||||
}; | ||||||
let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); | ||||||
|
||||||
|
@@ -421,6 +444,13 @@ impl<'a> PruningExpressionBuilder<'a> { | |||||
&self.scalar_expr | ||||||
} | ||||||
|
||||||
fn scalar_expr_value(&self) -> Result<&ScalarValue> { | ||||||
Review comment (suggested change): Would save a string creation on error (not that it really matters)
Reply: Updated.
||||||
match &self.scalar_expr { | ||||||
Expr::Literal(s) => Ok(s), | ||||||
_ => Err(DataFusionError::Plan("Not literal".to_string())), | ||||||
} | ||||||
} | ||||||
|
||||||
fn min_column_expr(&mut self) -> Result<Expr> { | ||||||
self.required_columns | ||||||
.min_column_expr(&self.column, &self.column_expr, self.field) | ||||||
|
@@ -430,6 +460,15 @@ impl<'a> PruningExpressionBuilder<'a> { | |||||
self.required_columns | ||||||
.max_column_expr(&self.column, &self.column_expr, self.field) | ||||||
} | ||||||
|
||||||
fn null_count_column_expr(&mut self) -> Result<Expr> { | ||||||
let null_count_field = &Field::new(self.field.name(), DataType::Int64, false); | ||||||
self.required_columns.null_count_column_expr( | ||||||
Review comment: 👍
||||||
&self.column, | ||||||
&self.column_expr, | ||||||
null_count_field, | ||||||
) | ||||||
} | ||||||
} | ||||||
|
||||||
/// This function is designed to rewrite the column_expr to | ||||||
|
@@ -657,13 +696,23 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result< | |||||
.or(expr_builder.scalar_expr().clone().not_eq(max_column_expr)) | ||||||
} | ||||||
Operator::Eq => { | ||||||
// column = literal => (min, max) = literal => min <= literal && literal <= max | ||||||
// (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) | ||||||
let min_column_expr = expr_builder.min_column_expr()?; | ||||||
let max_column_expr = expr_builder.max_column_expr()?; | ||||||
min_column_expr | ||||||
.lt_eq(expr_builder.scalar_expr().clone()) | ||||||
.and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr)) | ||||||
if expr_builder | ||||||
.scalar_expr_value() | ||||||
.map(|s| s.is_null()) | ||||||
.unwrap_or(false) | ||||||
{ | ||||||
// column = null => null_count > 0 | ||||||
let null_count_column_expr = expr_builder.null_count_column_expr()?; | ||||||
null_count_column_expr.gt(lit::<i64>(0)) | ||||||
Review comment: I am curious why we use a
Reply: Oh, you're right. This should be
Reply: Changed to
||||||
} else { | ||||||
// column = literal => (min, max) = literal => min <= literal && literal <= max | ||||||
// (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) | ||||||
let min_column_expr = expr_builder.min_column_expr()?; | ||||||
let max_column_expr = expr_builder.max_column_expr()?; | ||||||
min_column_expr | ||||||
.lt_eq(expr_builder.scalar_expr().clone()) | ||||||
.and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr)) | ||||||
} | ||||||
} | ||||||
Operator::Gt => { | ||||||
// column > literal => (min, max) > literal => max > literal | ||||||
|
@@ -702,6 +751,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result< | |||||
enum StatisticsType { | ||||||
Min, | ||||||
Max, | ||||||
NullCount, | ||||||
} | ||||||
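The `Operator::Eq` rewrite above, together with the new `StatisticsType::NullCount` variant, can be modeled without Arrow or DataFusion. The following is a minimal sketch, not the PR's implementation: `ColumnStats` and `keep_for_eq` are hypothetical names, values are simplified to `i64`, and a missing statistic is treated as "cannot prune", matching the behaviour the old test comment describes for null statistics arrays.

```rust
/// Hypothetical per-container statistics for one column, simplified to i64 values.
#[derive(Debug, Clone, Copy)]
struct ColumnStats {
    min: Option<i64>,
    max: Option<i64>,
    null_count: Option<i64>,
}

/// Returns `true` when the container may hold matching rows and must be read,
/// `false` when it can be skipped. A `None` literal models a null literal.
fn keep_for_eq(stats: ColumnStats, literal: Option<i64>) -> bool {
    match literal {
        // column = null  =>  null_count > 0
        None => match stats.null_count {
            Some(n) => n > 0,
            // Unknown statistic: the predicate is undefined, so keep the container.
            None => true,
        },
        // column = literal  =>  min <= literal && literal <= max
        Some(v) => match (stats.min, stats.max) {
            (Some(min), Some(max)) => min <= v && v <= max,
            // Unknown statistic: keep the container.
            _ => true,
        },
    }
}

fn main() {
    // Two row groups that differ only in their null count.
    let row_groups = [
        ColumnStats { min: Some(11), max: Some(20), null_count: Some(0) },
        ColumnStats { min: Some(11), max: Some(20), null_count: Some(1) },
    ];
    let filter: Vec<bool> = row_groups.iter().map(|s| keep_for_eq(*s, None)).collect();
    println!("{:?}", filter); // prints [false, true]
}
```

With null counts of 0 and 1, the sketch keeps only the second container.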
|
||||||
#[cfg(test)] | ||||||
|
@@ -812,6 +862,10 @@ mod tests { | |||||
.map(|container_stats| container_stats.len()) | ||||||
.unwrap_or(0) | ||||||
} | ||||||
|
||||||
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> { | ||||||
None | ||||||
} | ||||||
} | ||||||
|
||||||
/// Returns the specified min/max container values | ||||||
|
@@ -833,6 +887,10 @@ mod tests { | |||||
fn num_containers(&self) -> usize { | ||||||
self.num_containers | ||||||
} | ||||||
|
||||||
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> { | ||||||
None | ||||||
} | ||||||
} | ||||||
|
||||||
#[test] | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -342,6 +342,29 @@ macro_rules! get_min_max_values { | |
}} | ||
} | ||
|
||
// Extract the null count value on the ParquetStatistics | ||
macro_rules! get_null_count_values { | ||
($self:expr, $column:expr) => {{ | ||
let column_index = | ||
if let Some((v, _)) = $self.parquet_schema.column_with_name(&$column.name) { | ||
v | ||
} else { | ||
// Named column was not present | ||
return None; | ||
}; | ||
|
||
let scalar_values: Vec<ScalarValue> = $self | ||
.row_group_metadata | ||
.iter() | ||
.flat_map(|meta| meta.column(column_index).statistics()) | ||
.map(|stats| ScalarValue::Int64(Some(stats.null_count().try_into().unwrap()))) | ||
.collect(); | ||
|
||
// ignore errors converting to arrays (e.g. different types) | ||
ScalarValue::iter_to_array(scalar_values).ok() | ||
}}; | ||
} | ||
|
||
impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { | ||
fn min_values(&self, column: &Column) -> Option<ArrayRef> { | ||
get_min_max_values!(self, column, min, min_bytes) | ||
|
@@ -354,6 +377,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { | |
fn num_containers(&self) -> usize { | ||
self.row_group_metadata.len() | ||
} | ||
|
||
fn null_counts(&self, column: &Column) -> Option<ArrayRef> { | ||
get_null_count_values!(self, column) | ||
} | ||
} | ||
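The `PruningStatistics::null_counts` contract asks for exactly `num_containers()` rows. If `statistics()` returns an `Option`, as the `flat_map` in the macro suggests, a row group without statistics would be dropped from the result rather than represented as a null entry, and `try_into().unwrap()` would panic on a count that does not fit in `i64`. The sketch below shows one possible alternative under those assumptions. It is not what the PR does: `RowGroupStats` and this free-standing `null_counts` are hypothetical stand-ins, and the real Parquet metadata types are not used.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for the statistics of one column in one row group.
struct RowGroupStats {
    /// `None` models a row group whose column chunk carries no statistics.
    null_count: Option<u64>,
}

/// Collects one entry per row group. Missing statistics, or a count that does
/// not fit in `i64`, become `None` instead of being skipped or panicking, so
/// the result always has `row_groups.len()` entries.
fn null_counts(row_groups: &[RowGroupStats]) -> Vec<Option<i64>> {
    row_groups
        .iter()
        .map(|rg| rg.null_count.and_then(|n| i64::try_from(n).ok()))
        .collect()
}

fn main() {
    let row_groups = [
        RowGroupStats { null_count: Some(0) },
        RowGroupStats { null_count: None },
        RowGroupStats { null_count: Some(3) },
    ];
    let counts = null_counts(&row_groups);
    println!("{:?}", counts); // prints [Some(0), None, Some(3)]
}
```

Keeping a `None` per missing statistic preserves the alignment between array rows and row groups, at the cost of pruning nothing for those row groups.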
|
||
fn build_row_group_predicate( | ||
|
@@ -743,7 +770,7 @@ mod tests { | |
&schema_descr, | ||
vec![ | ||
ParquetStatistics::int32(Some(11), Some(20), None, 0, false), | ||
ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), | ||
ParquetStatistics::boolean(Some(false), Some(true), None, 1, false), | ||
], | ||
); | ||
let row_group_metadata = vec![rgm1, rgm2]; | ||
|
@@ -757,10 +784,8 @@ mod tests { | |
.enumerate() | ||
.map(|(i, g)| row_group_predicate(g, i)) | ||
.collect::<Vec<_>>(); | ||
// no row group is filtered out because the predicate expression can't be evaluated | ||
// when a null array is generated for a statistics column, | ||
// because the null values propagate to the end result, making the predicate result undefined | ||
assert_eq!(row_group_filter, vec![true, true]); | ||
// First row group was filtered out because it contains no null value on "c2". | ||
assert_eq!(row_group_filter, vec![false, true]); | ||
Review comment: I actually think this could be
Reply: I am not sure about the expression semantics in datafusion. In Spark, the predicate should be I see there is also I can fix it if you agree that
Reply: I think this is related to the "Confusion 1 and 2". I guess this is also why you feel confused about treating
Reply: In sql It would make a lot of sense to me to rewrite
Reply: yea, I'm surprised when I looked at the
Reply: Would you like me to fix it here or in a following PR?
Reply: I've updated to use IsNull for predicate pruning.
||
|
||
Ok(()) | ||
} | ||
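The thread above turns on how a comparison with a null literal differs from an `IS NULL` test. In standard SQL three-valued logic, `x = NULL` evaluates to NULL for every `x`, so a filter on it keeps no rows, while `x IS NULL` always yields a definite boolean. The sketch below illustrates only that general rule. `sql_eq`, `sql_is_null` and `filter_keeps` are hypothetical helpers and say nothing about how DataFusion evaluates these expressions.

```rust
/// SQL-style equality over nullable values: any comparison with NULL yields NULL (`None`).
fn sql_eq(a: Option<i64>, b: Option<i64>) -> Option<bool> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x == y),
        _ => None,
    }
}

/// SQL-style `IS NULL`: always yields a definite boolean.
fn sql_is_null(a: Option<i64>) -> bool {
    a.is_none()
}

/// A filter keeps a row only when the predicate is definitely true.
fn filter_keeps(pred: Option<bool>) -> bool {
    pred == Some(true)
}

fn main() {
    let column = [Some(1), None, Some(3)];
    let eq_null: Vec<bool> = column
        .iter()
        .map(|v| filter_keeps(sql_eq(*v, None)))
        .collect();
    let is_null: Vec<bool> = column.iter().map(|v| sql_is_null(*v)).collect();
    println!("{:?}", eq_null); // prints [false, false, false]
    println!("{:?}", is_null); // prints [false, true, false]
}
```

Under these semantics only the `IS NULL` form can select the null rows, which is why a `null_count > 0` pruning check pairs naturally with it.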
|
Review comment: I had to look this up to figure out what type this was required
Reply: Updated.