From 49e38f5c83eff08ca5674c07c16fd67fc6440b96 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 17 Jan 2022 01:59:29 -0800
Subject: [PATCH 1/5] Fix null comparison for Parquet pruning predicate

---
 datafusion/src/physical_optimizer/pruning.rs  | 74 +++++++++++++++++--
 .../src/physical_plan/file_format/parquet.rs  | 35 +++++++--
 2 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 24334d7983d5..bedab232917c 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -37,12 +37,14 @@ use arrow::{
     record_batch::RecordBatch,
 };

+use crate::prelude::lit;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Column, DFSchema, Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };

 /// Interface to pass statistics information to [`PruningPredicates`]
@@ -75,6 +77,10 @@ pub trait PruningStatistics {
     /// return the number of containers (e.g. row groups) being
     /// pruned with these statistics
     fn num_containers(&self) -> usize;
+
+    /// return the number of null values for the named column.
+    /// Note: the returned array must contain `num_containers()` rows.
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
 }

 /// Evaluates filter expressions on statistics in order to
@@ -200,7 +206,7 @@ impl PruningPredicate {
 struct RequiredStatColumns {
     /// The statistics required to evaluate this predicate:
     /// * The unqualified column in the input schema
-    /// * Statistics type (e.g. Min or Max)
+    /// * Statistics type (e.g. Min or Max or Null_Count)
     /// * The field the statistics value should be placed in for
     ///   pruning predicate evaluation
     columns: Vec<(Column, StatisticsType, Field)>,
@@ -281,6 +287,22 @@ impl RequiredStatColumns {
     ) -> Result<Expr> {
         self.stat_column_expr(column, column_expr, field, StatisticsType::Max, "max")
     }
+
+    /// rewrite col --> col_null_count
+    fn null_count_column_expr(
+        &mut self,
+        column: &Column,
+        column_expr: &Expr,
+        field: &Field,
+    ) -> Result<Expr> {
+        self.stat_column_expr(
+            column,
+            column_expr,
+            field,
+            StatisticsType::NullCount,
+            "null_count",
+        )
+    }
 }

 impl From<Vec<(Column, StatisticsType, Field)>> for RequiredStatColumns {
@@ -329,6 +351,7 @@ fn build_statistics_record_batch(
         let array = match statistics_type {
             StatisticsType::Min => statistics.min_values(column),
             StatisticsType::Max => statistics.max_values(column),
+            StatisticsType::NullCount => statistics.null_counts(column),
         };
         let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));

@@ -421,6 +444,13 @@ impl<'a> PruningExpressionBuilder<'a> {
         &self.scalar_expr
     }

+    fn scalar_expr_value(&self) -> Result<&ScalarValue> {
+        match &self.scalar_expr {
+            Expr::Literal(s) => Ok(s),
+            _ => Err(DataFusionError::Plan("Not literal".to_string())),
+        }
+    }
+
     fn min_column_expr(&mut self) -> Result<Expr> {
         self.required_columns
             .min_column_expr(&self.column, &self.column_expr, self.field)
@@ -430,6 +460,15 @@ impl<'a> PruningExpressionBuilder<'a> {
         self.required_columns
             .max_column_expr(&self.column, &self.column_expr, self.field)
     }
+
+    fn null_count_column_expr(&mut self) -> Result<Expr> {
+        let null_count_field = &Field::new(self.field.name(), DataType::Int64, false);
+        self.required_columns.null_count_column_expr(
+            &self.column,
+            &self.column_expr,
+            null_count_field,
+        )
+    }
 }

 /// This function is designed to rewrite the column_expr to
@@ -657,13 +696,23 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
                 .or(expr_builder.scalar_expr().clone().not_eq(max_column_expr))
         }
         Operator::Eq => {
-            // column = literal => (min, max) = literal => min <= literal && literal <= max
-            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
-            let min_column_expr = expr_builder.min_column_expr()?;
-            let max_column_expr = expr_builder.max_column_expr()?;
-            min_column_expr
-                .lt_eq(expr_builder.scalar_expr().clone())
-                .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
+            if expr_builder
+                .scalar_expr_value()
+                .map(|s| s.is_null())
+                .unwrap_or(false)
+            {
+                // column = null => null_count > 0
+                let null_count_column_expr = expr_builder.null_count_column_expr()?;
+                null_count_column_expr.gt(lit::<i64>(0))
+            } else {
+                // column = literal => (min, max) = literal => min <= literal && literal <= max
+                // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+                let min_column_expr = expr_builder.min_column_expr()?;
+                let max_column_expr = expr_builder.max_column_expr()?;
+                min_column_expr
+                    .lt_eq(expr_builder.scalar_expr().clone())
+                    .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
+            }
         }
         Operator::Gt => {
             // column > literal => (min, max) > literal => max > literal
@@ -702,6 +751,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
 enum StatisticsType {
     Min,
     Max,
+    NullCount,
 }

 #[cfg(test)]
@@ -812,6 +862,10 @@ mod tests {
             .map(|container_stats| container_stats.len())
             .unwrap_or(0)
     }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        None
+    }
 }

 /// Returns the specified min/max container values
@@ -833,6 +887,10 @@ mod tests {
     fn num_containers(&self) -> usize {
         self.num_containers
     }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        None
+    }
 }

 #[test]
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 73f0b8ddb639..0b2b05244865 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -342,6 +342,29 @@ macro_rules! get_min_max_values {
     }}
 }

+// Extract the null count value on the ParquetStatistics
+macro_rules! get_null_count_values {
+    ($self:expr, $column:expr) => {{
+        let column_index =
+            if let Some((v, _)) = $self.parquet_schema.column_with_name(&$column.name) {
+                v
+            } else {
+                // Named column was not present
+                return None;
+            };
+
+        let scalar_values: Vec<ScalarValue> = $self
+            .row_group_metadata
+            .iter()
+            .flat_map(|meta| meta.column(column_index).statistics())
+            .map(|stats| ScalarValue::Int64(Some(stats.null_count().try_into().unwrap())))
+            .collect();
+
+        // ignore errors converting to arrays (e.g. different types)
+        ScalarValue::iter_to_array(scalar_values).ok()
+    }};
+}
+
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         get_min_max_values!(self, column, min, min_bytes)
@@ -354,6 +377,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn num_containers(&self) -> usize {
         self.row_group_metadata.len()
     }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        get_null_count_values!(self, column)
+    }
 }

 fn build_row_group_predicate(
@@ -743,7 +770,7 @@ mod tests {
             &schema_descr,
             vec![
                 ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
-                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
@@ -757,10 +784,8 @@ mod tests {
             .enumerate()
             .map(|(i, g)| row_group_predicate(g, i))
             .collect::<Vec<_>>();
-        // no row group is filtered out because the predicate expression can't be evaluated
-        // when a null array is generated for a statistics column,
-        // because the null values propagate to the end result, making the predicate result undefined
-        assert_eq!(row_group_filter, vec![true, true]);
+        // The first row group is filtered out because it contains no null values in "c2".
+        assert_eq!(row_group_filter, vec![false, true]);

         Ok(())
     }

From 416806f5c67f76378cce1412363d2a25598d7fe2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 17 Jan 2022 10:26:30 -0800
Subject: [PATCH 2/5] Fix clippy

---
 datafusion/src/physical_optimizer/pruning.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index bedab232917c..6d6d9b658bdf 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -863,7 +863,7 @@ mod tests {
             .unwrap_or(0)
     }

-    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
         None
     }
 }
@@ -888,7 +888,7 @@ mod tests {
         self.num_containers
     }

-    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
         None
     }
 }

From c9718cf121eac63dfafa7979d745f7d9b44f65d0 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 18 Jan 2022 23:19:35 -0800
Subject: [PATCH 3/5] Use u64

---
 datafusion/src/physical_optimizer/pruning.rs        | 4 ++--
 datafusion/src/physical_plan/file_format/parquet.rs | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 6d6d9b658bdf..77fae425129e 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -462,7 +462,7 @@ impl<'a> PruningExpressionBuilder<'a> {
     }

     fn null_count_column_expr(&mut self) -> Result<Expr> {
-        let null_count_field = &Field::new(self.field.name(), DataType::Int64, false);
+        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, false);
         self.required_columns.null_count_column_expr(
             &self.column,
             &self.column_expr,
             null_count_field,
         )
     }
@@ -703,7 +703,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
             {
                 // column = null => null_count > 0
                 let null_count_column_expr = expr_builder.null_count_column_expr()?;
-                null_count_column_expr.gt(lit::<i64>(0))
+                null_count_column_expr.gt(lit::<u64>(0))
             } else {
                 // column = literal => (min, max) = literal => min <= literal && literal <= max
                 // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 0b2b05244865..818b2fb85278 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -357,7 +357,9 @@ macro_rules! get_null_count_values {
             .row_group_metadata
             .iter()
             .flat_map(|meta| meta.column(column_index).statistics())
-            .map(|stats| ScalarValue::Int64(Some(stats.null_count().try_into().unwrap())))
+            .map(|stats| {
+                ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
+            })
             .collect();

         // ignore errors converting to arrays (e.g. different types)

From eaedebbd94a6ef29b616239c6e66fbf4137d0cd3 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 18 Jan 2022 23:23:45 -0800
Subject: [PATCH 4/5] Address comments

---
 datafusion/src/physical_optimizer/pruning.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 77fae425129e..703571fc7b41 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -78,7 +78,9 @@ pub trait PruningStatistics {
     /// pruned with these statistics
     fn num_containers(&self) -> usize;

-    /// return the number of null values for the named column.
+    /// return the number of null values for the named column as an
+    /// `Option<ArrayRef>`.
+    ///
     /// Note: the returned array must contain `num_containers()` rows.
     fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
 }
@@ -444,10 +446,10 @@ impl<'a> PruningExpressionBuilder<'a> {
         &self.scalar_expr
     }

-    fn scalar_expr_value(&self) -> Result<&ScalarValue> {
+    fn scalar_expr_value(&self) -> Option<&ScalarValue> {
         match &self.scalar_expr {
-            Expr::Literal(s) => Ok(s),
-            _ => Err(DataFusionError::Plan("Not literal".to_string())),
+            Expr::Literal(s) => Some(s),
+            _ => None,
         }
     }

From bc6b9b5ddcdfdf9841856e3709dde7ac00e0b4cf Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 19 Jan 2022 09:38:13 -0800
Subject: [PATCH 5/5] Use IsNull for null count predicate pruning

---
 datafusion/src/physical_optimizer/pruning.rs  | 72 ++++++++++---------
 .../src/physical_plan/file_format/parquet.rs  | 65 ++++++++++++-----
 2 files changed, 87 insertions(+), 50 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 703571fc7b41..72279f49f57d 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -44,7 +44,6 @@ use crate::{
     logical_plan::{Column, DFSchema, Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
-    scalar::ScalarValue,
 };

 /// Interface to pass statistics information to [`PruningPredicates`]
@@ -446,13 +445,6 @@ impl<'a> PruningExpressionBuilder<'a> {
         &self.scalar_expr
     }

-    fn scalar_expr_value(&self) -> Option<&ScalarValue> {
-        match &self.scalar_expr {
-            Expr::Literal(s) => Some(s),
-            _ => None,
-        }
-    }
-
     fn min_column_expr(&mut self) -> Result<Expr> {
         self.required_columns
             .min_column_expr(&self.column, &self.column_expr, self.field)
@@ -462,15 +454,6 @@ impl<'a> PruningExpressionBuilder<'a> {
         self.required_columns
             .max_column_expr(&self.column, &self.column_expr, self.field)
     }
-
-    fn null_count_column_expr(&mut self) -> Result<Expr> {
-        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, false);
-        self.required_columns.null_count_column_expr(
-            &self.column,
-            &self.column_expr,
-            null_count_field,
-        )
-    }
 }

 /// This function is designed to rewrite the column_expr to
@@ -623,6 +606,32 @@ fn build_single_column_expr(
     }
 }

+/// Given an expression reference to `expr`, if `expr` is a column expression,
+/// returns a pruning expression in terms of IsNull that will evaluate to true
+/// if the column may contain null, and false if it definitely does not
+/// contain null.
+fn build_is_null_column_expr(
+    expr: &Expr,
+    schema: &Schema,
+    required_columns: &mut RequiredStatColumns,
+) -> Option<Expr> {
+    match expr {
+        Expr::Column(ref col) => {
+            let field = schema.field_with_name(&col.name).ok()?;
+
+            let null_count_field = &Field::new(field.name(), DataType::UInt64, false);
+            required_columns
+                .null_count_column_expr(col, expr, null_count_field)
+                .map(|null_count_column_expr| {
+                    // IsNull(column) => null_count > 0
+                    null_count_column_expr.gt(lit::<u64>(0))
+                })
+                .ok()
+        }
+        _ => None,
+    }
+}
+
 /// Translate logical filter expression into pruning predicate
 /// expression that will evaluate to FALSE if it can be determined no
 /// rows between the min/max values could pass the predicates.
@@ -643,6 +652,11 @@ fn build_predicate_expression(
     // predicate expression can only be a binary expression
     let (left, op, right) = match expr {
         Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        Expr::IsNull(expr) => {
+            let expr = build_is_null_column_expr(expr, schema, required_columns)
+                .unwrap_or(unhandled);
+            return Ok(expr);
+        }
         Expr::Column(col) => {
             let expr = build_single_column_expr(col, schema, required_columns, false)
                 .unwrap_or(unhandled);
@@ -698,23 +712,13 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
                 .or(expr_builder.scalar_expr().clone().not_eq(max_column_expr))
         }
         Operator::Eq => {
-            if expr_builder
-                .scalar_expr_value()
-                .map(|s| s.is_null())
-                .unwrap_or(false)
-            {
-                // column = null => null_count > 0
-                let null_count_column_expr = expr_builder.null_count_column_expr()?;
-                null_count_column_expr.gt(lit::<u64>(0))
-            } else {
-                // column = literal => (min, max) = literal => min <= literal && literal <= max
-                // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
-                let min_column_expr = expr_builder.min_column_expr()?;
-                let max_column_expr = expr_builder.max_column_expr()?;
-                min_column_expr
-                    .lt_eq(expr_builder.scalar_expr().clone())
-                    .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
-            }
+            // column = literal => (min, max) = literal => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            let min_column_expr = expr_builder.min_column_expr()?;
+            let max_column_expr = expr_builder.max_column_expr()?;
+            min_column_expr
+                .lt_eq(expr_builder.scalar_expr().clone())
+                .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
         }
         Operator::Gt => {
             // column > literal => (min, max) > literal => max > literal
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 818b2fb85278..780ad68d2582 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -742,21 +742,7 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn row_group_pruning_predicate_null_expr() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        // test row group predicate with an unknown (Null) expr
-        //
-        // int > 1 and bool = NULL => c1_max > 1 and null
-        let expr = col("c1")
-            .gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None)))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; - + fn gen_row_group_meta_data_for_pruning_predicate() -> Vec { let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), ("c2", PhysicalType::BOOLEAN), @@ -775,7 +761,21 @@ mod tests { ParquetStatistics::boolean(Some(false), Some(true), None, 1, false), ], ); - let row_group_metadata = vec![rgm1, rgm2]; + vec![rgm1, rgm2] + } + + #[test] + fn row_group_pruning_predicate_null_expr() -> Result<()> { + use crate::logical_plan::{col, lit}; + // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 + let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Boolean, false), + ])); + let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; + let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + let row_group_predicate = build_row_group_predicate( &pruning_predicate, parquet_file_metrics(), @@ -792,6 +792,39 @@ mod tests { Ok(()) } + #[test] + fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { + use crate::logical_plan::{col, lit}; + // test row group predicate with an unknown (Null) expr + // + // int > 1 and bool = NULL => c1_max > 1 and null + let expr = col("c1") + .gt(lit(15)) + .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Boolean, false), + ])); + let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; + let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + + let row_group_predicate = build_row_group_predicate( + &pruning_predicate, + parquet_file_metrics(), + &row_group_metadata, + ); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + // no row group is filtered out because the predicate expression can't be evaluated + // when a null array is generated for a statistics column, + assert_eq!(row_group_filter, vec![true, true]); + + Ok(()) + } + fn get_row_group_meta_data( schema_descr: &SchemaDescPtr, column_statistics: Vec,