Skip to content

Commit

Permalink
Minor: Improve documentation for Filter Pushdown (#8023)
Browse files Browse the repository at this point in the history
* Minor: Improve documentation for Fulter Pushdown

* Update datafusion/optimizer/src/push_down_filter.rs

Co-authored-by: jakevin <[email protected]>

* Apply suggestions from code review

* Update datafusion/optimizer/src/push_down_filter.rs

Co-authored-by: Alex Huang <[email protected]>

---------

Co-authored-by: jakevin <[email protected]>
Co-authored-by: Alex Huang <[email protected]>
  • Loading branch information
3 people authored Nov 2, 2023
1 parent d2671cd commit b089137
Showing 1 changed file with 82 additions and 19 deletions.
101 changes: 82 additions & 19 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// specific language governing permissions and limitations
// under the License.

//! Push Down Filter optimizer rule ensures that filters are applied as early as possible in the plan
//! [`PushDownFilter`] Moves filters so they are applied as early as possible in
//! the plan.
use crate::optimizer::ApplyOrder;
use crate::utils::{conjunction, split_conjunction, split_conjunction_owned};
Expand All @@ -33,31 +34,93 @@ use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Push Down Filter optimizer rule pushes filter clauses down the plan
/// Optimizer rule for pushing (moving) filter expressions down in a plan so
/// they are applied as early as possible.
///
/// # Introduction
/// A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
/// An example of a filter-commutative operation is a projection; a counter-example is `limit`.
///
/// The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
/// can commute with a filter that depends on A only, but does not commute with a filter that depends
/// on SUM(B).
/// The goal of this rule is to improve query performance by eliminating
/// redundant work.
///
/// For example, given a plan that sorts all values where `a > 10`:
///
/// ```text
/// Filter (a > 10)
/// Sort (a, b)
/// ```
///
/// A better plan is to filter the data *before* the Sort, which sorts fewer
/// rows and therefore does less work overall:
///
/// ```text
/// Sort (a, b)
/// Filter (a > 10) <-- Filter is moved before the sort
/// ```
///
/// However it is not always possible to push filters down. For example, given a
/// plan that finds the top 3 values and then keeps only those that are greater
/// than 10, if the filter is pushed below the limit it would produce a
/// different result.
///
/// ```text
/// Filter (a > 10) <-- can not move this Filter before the limit
/// Limit (fetch=3)
/// Sort (a, b)
/// ```
///
///
/// More formally, a filter-commutative operation is an operation `op` that
/// satisfies `filter(op(data)) = op(filter(data))`.
///
/// The filter-commutative property is plan and column-specific. A filter on `a`
/// can be pushed through a `Aggregate(group_by = [a], agg=[SUM(b))`. However, a
/// filter on `SUM(b)` can not be pushed through the same aggregate.
///
/// # Handling Conjunctions
///
/// It is possible to only push down **part** of a filter expression if is
/// connected with `AND`s (more formally if it is a "conjunction").
///
/// For example, given the following plan:
///
/// ```text
/// Filter(a > 10 AND SUM(b) < 5)
/// Aggregate(group_by = [a], agg = [SUM(b))
/// ```
///
/// The `a > 10` is commutative with the `Aggregate` but `SUM(b) < 5` is not.
/// Therefore it is possible to only push part of the expression, resulting in:
///
/// ```text
/// Filter(SUM(b) < 5)
/// Aggregate(group_by = [a], agg = [SUM(b))
/// Filter(a > 10)
/// ```
///
/// # Handling Column Aliases
///
/// This optimizer commutes filters with filter-commutative operations to push the filters
/// the closest possible to the scans, re-writing the filter expressions by every
/// projection that changes the filter's expression.
/// This optimizer must sometimes handle re-writing filter expressions when they
/// pushed, for example if there is a projection that aliases `a+1` to `"b"`:
///
/// Filter: b Gt Int64(10)
/// Projection: a AS b
/// ```text
/// Filter (b > 10)
/// Projection: [a+1 AS "b"] <-- changes the name of `a+1` to `b`
/// ```
///
/// is optimized to
/// To apply the filter prior to the `Projection`, all references to `b` must be
/// rewritten to `a+1`:
///
/// Projection: a AS b
/// Filter: a Gt Int64(10) <--- changed from b to a
/// ```text
/// Projection: a AS "b"
/// Filter: (a + 1 > 10) <--- changed from b to a + 1
/// ```
/// # Implementation Notes
///
/// This performs a single pass through the plan. When it passes through a filter, it stores that filter,
/// and when it reaches a node that does not commute with it, it adds the filter to that place.
/// When it passes through a projection, it re-writes the filter's expression taking into account that projection.
/// When multiple filters would have been written, it `AND` their expressions into a single expression.
/// This implementation performs a single pass through the plan, "pushing" down
/// filters. When it passes through a filter, it stores that filter, and when it
/// reaches a plan node that does not commute with that filter, it adds the
/// filter to that place. When it passes through a projection, it re-writes the
/// filter's expression taking into account that projection.
#[derive(Default)]
pub struct PushDownFilter {}

Expand Down

0 comments on commit b089137

Please sign in to comment.