Skip to content

Commit

Permalink
feat(pruning): add predicate rewrite for `CASE WHEN x_null_count = x_…
Browse files Browse the repository at this point in the history
…row_count THEN false ELSE ... END`
  • Loading branch information
appletreeisyellow committed Feb 13, 2024
1 parent 83a8f5c commit e9a7592
Showing 1 changed file with 148 additions and 3 deletions.
151 changes: 148 additions & 3 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ pub trait PruningStatistics {
/// these statistics.
///
/// This value corresponds to the size of the [`ArrayRef`] returned by
/// [`Self::min_values`], [`Self::max_values`], and [`Self::null_counts`].
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize;

/// Return the number of null values for the named column as an
Expand Down Expand Up @@ -325,6 +326,19 @@ pub trait PruningStatistics {
/// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max`
/// `x IS NULL` | `x_null_count > 0`
///
/// In addition, for a given column `x`, the `x_null_count` and `x_row_count` will
/// be wrapped around the above rewritten predicate to form the final rewritten predicate.
/// This step is necessary to handle the case where the column `x` is kown to be all nulls,
/// This is different from knowing nothing about the column `x`, which confusionly is
/// enconded by returning `NULL` for the min/max values from [`PruningStatistics::min_values`].
///
/// Original Predicate | Rewritten Predicate
/// ------------------ | --------------------
/// `x = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END`
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_null_count > 0 END`
///
/// ## Predicate Evaluation
/// The PruningPredicate works in two passes
///
Expand Down Expand Up @@ -753,6 +767,22 @@ impl RequiredColumns {
"null_count",
)
}

/// rewrite col --> col_row_count
fn row_count_column_expr(
&mut self,
column: &phys_expr::Column,
column_expr: &Arc<dyn PhysicalExpr>,
field: &Field,
) -> Result<Arc<dyn PhysicalExpr>> {
self.stat_column_expr(
column,
column_expr,
field,
StatisticsType::RowCount,
"row_count",
)
}
}

impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
Expand Down Expand Up @@ -803,6 +833,7 @@ fn build_statistics_record_batch<S: PruningStatistics>(
StatisticsType::Min => statistics.min_values(&column),
StatisticsType::Max => statistics.max_values(&column),
StatisticsType::NullCount => statistics.null_counts(&column),
StatisticsType::RowCount => statistics.row_counts(&column),
};
let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));

Expand Down Expand Up @@ -912,6 +943,22 @@ impl<'a> PruningExpressionBuilder<'a> {
self.required_columns
.max_column_expr(&self.column, &self.column_expr, self.field)
}

fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
self.required_columns.null_count_column_expr(
&self.column,
&self.column_expr,
self.field,
)
}

fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
self.required_columns.row_count_column_expr(
&self.column,
&self.column_expr,
self.field,
)
}
}

/// This function is designed to rewrite the column_expr to
Expand Down Expand Up @@ -1329,14 +1376,51 @@ fn build_statistics_expr(
);
}
};
let statistics_expr = wrap_case_expr(statistics_expr, expr_builder)?;
Ok(statistics_expr)
}

/// Wrap the statistics expression in a case expression.
/// This is necessary to handle the case where the column is known
/// to be all nulls.
///
/// For example:
///
/// `x_min <= 10 AND x_max >= 10`
///
/// will become
///
/// ```
/// CASE
/// WHEN y_null_count = y_row_count THEN false
/// ELSE x_min <= 10 AND x_max >= 10
/// END
/// ```
fn wrap_case_expr(
statistics_expr: Arc<dyn PhysicalExpr>,
expr_builder: &mut PruningExpressionBuilder,
) -> Result<Arc<dyn PhysicalExpr>> {
let nul_count_row_count_comparesion = Arc::new(phys_expr::BinaryExpr::new(
expr_builder.null_count_column_expr()?,
Operator::Eq,
expr_builder.row_count_column_expr()?,
));
Ok(Arc::new(phys_expr::CaseExpr::try_new(
Some(nul_count_row_count_comparesion),
vec![(
Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true)))),
Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false)))),
)],
Some(statistics_expr),
)?))
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
Min,
Max,
NullCount,
RowCount,
}

#[cfg(test)]
Expand Down Expand Up @@ -2881,15 +2965,76 @@ mod tests {
let expected_ret = &[false, true, true, true, false];

prune_with_expr(
// i IS NULL, with actual null statistcs
// i IS NULL, with actual null statistics
col("i").is_null(),
&schema,
&statistics,
expected_ret,
);
}

// TODO chunchun: add test for a column is all null, use row_counts
#[test]
fn prune_int32_column_is_known_all_null() {
let (schema, statistics) = int32_setup();

// Expression "i < 0"
// i [-5, 5] ==> some rows could pass (must keep)
// i [1, 11] ==> no rows can pass (not keep)
// i [-11, -1] ==> all rows must pass (must keep)
// i [NULL, NULL] ==> unknown (must keep)
// i [1, NULL] ==> no rows can pass (not keep)
let expected_ret = &[true, false, true, true, false];

prune_with_expr(
// i < 0
col("i").lt(lit(0)),
&schema,
&statistics,
expected_ret,
);

// provide row counts for each column
let statistics = statistics.with_row_counts(
"i",
vec![
Some(10),
Some(9),
None, // unknown row counts
Some(4), // 4 rows of data
Some(10),
],
);

// provide null counts for each column
let statistics = statistics.with_null_counts(
"i",
vec![
Some(0), // no nulls
Some(1), // 1 null
None, // unknown nulls
Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
Some(0), // 0 nulls (max=null too which means no known max)
],
);

// Expression "i < 0" with actual null and row counts statistics
// col | min, max | row counts | null counts |
// ----+--------------+------------+-------------+
// i | [-5, 5] | 10 | 0 | ==> Some rows could pass (must keep)
// i | [1, 11] | 9 | 1 | ==> No rows can pass (not keep)
// i | [-11,-1] | Unknown | Unknown | ==> All rows must pass (must keep)
// i | [NULL, NULL] | 4 | 4 | ==> The column is all null (not keep)
// i | [1, NULL] | 10 | 0 | ==> No rows can pass (not keep)
let expected_ret = &[true, false, true, false, false];

prune_with_expr(
// i < 0, with actual null and row counts statistics
col("i").lt(lit(0)),
&schema,
&statistics,
expected_ret,
);
}

#[test]
fn prune_cast_column_scalar() {
Expand Down

0 comments on commit e9a7592

Please sign in to comment.