Skip to content

Commit

Permalink
feat: add eliminate union optimizer rule (#16478)
Browse files Browse the repository at this point in the history
* feat: add optimizer rule to eliminate union

* fix test

* fix
  • Loading branch information
xudong963 authored Sep 20, 2024
1 parent 1fac99d commit fdd7d3b
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/rule/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use databend_common_exception::Result;

use super::rewrite::RuleCommuteJoin;
use super::rewrite::RuleEliminateEvalScalar;
use super::rewrite::RuleEliminateUnion;
use super::rewrite::RuleFoldCountAggregate;
use super::rewrite::RuleNormalizeScalarFilter;
use super::rewrite::RulePushDownFilterAggregate;
Expand Down Expand Up @@ -58,6 +59,7 @@ pub const MAX_PUSH_DOWN_LIMIT: usize = 10000;
impl RuleFactory {
pub fn create_rule(id: RuleID, metadata: MetadataRef) -> Result<RulePtr> {
match id {
RuleID::EliminateUnion => Ok(Box::new(RuleEliminateUnion::new(metadata))),
RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new(metadata))),
RuleID::PushDownFilterUnion => Ok(Box::new(RulePushDownFilterUnion::new())),
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod rule_commute_join;
mod rule_eliminate_eval_scalar;
mod rule_eliminate_filter;
mod rule_eliminate_sort;
mod rule_eliminate_union;
mod rule_fold_count_aggregate;
mod rule_merge_eval_scalar;
mod rule_merge_filter;
Expand Down Expand Up @@ -49,6 +50,7 @@ pub use rule_commute_join::RuleCommuteJoin;
pub use rule_eliminate_eval_scalar::RuleEliminateEvalScalar;
pub use rule_eliminate_filter::RuleEliminateFilter;
pub use rule_eliminate_sort::RuleEliminateSort;
pub use rule_eliminate_union::RuleEliminateUnion;
pub use rule_fold_count_aggregate::RuleFoldCountAggregate;
pub use rule_merge_eval_scalar::RuleMergeEvalScalar;
pub use rule_merge_filter::RuleMergeFilter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRefExt;

use crate::optimizer::extract::Matcher;
use crate::optimizer::rule::Rule;
use crate::optimizer::rule::RuleID;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::plans::ConstantTableScan;
use crate::plans::Operator;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::plans::UnionAll;
use crate::MetadataRef;

/// Rewrite rule that matches a `UnionAll` operator and simplifies it when one or both
/// of its inputs are detected as empty constant scans (see the `Rule::apply` impl).
pub struct RuleEliminateUnion {
// Identifier returned by `Rule::id`; set to `RuleID::EliminateUnion` in `new`.
id: RuleID,
// Patterns returned by `Rule::matchers`; `new` installs a single `UnionAll` pattern
// with two leaf children.
matchers: Vec<Matcher>,
// Metadata handle; read in `apply` to look up column data types when building the
// schema of the replacement empty scan.
metadata: MetadataRef,
}

impl RuleEliminateUnion {
    /// Builds the rule with a single matcher: a `UnionAll` operator over two leaf children.
    ///
    /// `metadata` is stored on the rule so that `apply` can look up column data types.
    pub fn new(metadata: MetadataRef) -> Self {
        let union_pattern = Matcher::MatchOp {
            op_type: RelOp::UnionAll,
            children: vec![Matcher::Leaf, Matcher::Leaf],
        };
        Self {
            id: RuleID::EliminateUnion,
            matchers: vec![union_pattern],
            metadata,
        }
    }

    /// Reports whether the single-child chain starting at `s_expr` bottoms out in a
    /// `ConstantTableScan` with `num_rows == 0`.
    ///
    /// Returns `Ok(false)` as soon as a node with more than one child is reached, or
    /// when the leaf is any other operator.
    ///
    /// NOTE(review): every single-child operator on the way down is treated as
    /// row-preserving-or-reducing. An operator that can emit rows from empty input
    /// (presumably a scalar aggregate) would be misclassified as empty — confirm.
    fn is_empty_scan(s_expr: &SExpr) -> Result<bool> {
        let mut node = s_expr;
        loop {
            match node.children.len() {
                // Leaf: empty only if it is a constant scan with zero rows.
                0 => {
                    return Ok(matches!(
                        node.plan(),
                        RelOperator::ConstantTableScan(ConstantTableScan { num_rows: 0, .. })
                    ));
                }
                // Unary operator: keep descending.
                1 => node = node.child(0)?,
                // Two or more inputs: not handled, report non-empty.
                _ => return Ok(false),
            }
        }
    }
}

impl Rule for RuleEliminateUnion {
    /// Returns the identifier stored at construction time.
    fn id(&self) -> RuleID {
        self.id
    }

    /// Simplifies a matched `UnionAll` according to which inputs are empty scans:
    ///
    /// * both empty  -> a new empty `ConstantTableScan` over the union's output columns;
    /// * only left   -> the right child as-is;
    /// * only right  -> the left child as-is;
    /// * neither     -> no result is added.
    fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
        let union: UnionAll = s_expr.plan().clone().try_into()?;
        let left_child = s_expr.child(0)?;
        let right_child = s_expr.child(1)?;

        let left_is_empty = Self::is_empty_scan(left_child)?;
        let right_is_empty = Self::is_empty_scan(right_child)?;

        match (left_is_empty, right_is_empty) {
            (true, true) => {
                // Both sides produce nothing: emit an empty scan that keeps the
                // union's output columns and their data types.
                let union_output_columns = union
                    .derive_relational_prop(&RelExpr::with_s_expr(s_expr))?
                    .output_columns
                    .clone();
                let metadata = self.metadata.read();
                let fields = union_output_columns
                    .iter()
                    .map(|col| {
                        DataField::new(&col.to_string(), metadata.column(*col).data_type())
                    })
                    .collect::<Vec<_>>();

                let empty_scan = ConstantTableScan::new_empty_scan(
                    DataSchemaRefExt::create(fields),
                    union_output_columns,
                );
                let result =
                    SExpr::create_leaf(Arc::new(RelOperator::ConstantTableScan(empty_scan)));
                state.add_result(result);
            }
            (true, false) => {
                // Left side is empty, so only the right side contributes rows.
                // NOTE(review): the right child is returned unchanged; its output
                // columns may not be the ones the union exposes to its parent — confirm.
                state.add_result(right_child.clone());
            }
            (false, true) => {
                // Right side is empty, so only the left side contributes rows.
                state.add_result(left_child.clone());
            }
            (false, false) => {}
        }

        Ok(())
    }

    /// Returns the patterns this rule is matched against.
    fn matchers(&self) -> &[Matcher] {
        &self.matchers
    }
}
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/optimizer/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::optimizer::SExpr;
pub static DEFAULT_REWRITE_RULES: LazyLock<Vec<RuleID>> = LazyLock::new(|| {
vec![
RuleID::EliminateSort,
RuleID::EliminateUnion,
RuleID::MergeEvalScalar,
// Filter
RuleID::EliminateFilter,
Expand Down Expand Up @@ -78,6 +79,7 @@ pub trait Rule {
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, FromPrimitive, ToPrimitive)]
pub enum RuleID {
// Rewrite rules
EliminateUnion,
NormalizeScalarFilter,
PushDownFilterAggregate,
PushDownFilterEvalScalar,
Expand Down Expand Up @@ -152,6 +154,7 @@ impl Display for RuleID {
RuleID::EagerAggregation => write!(f, "EagerAggregation"),
RuleID::TryApplyAggIndex => write!(f, "TryApplyAggIndex"),
RuleID::SemiToInnerJoin => write!(f, "SemiToInnerJoin"),
RuleID::EliminateUnion => write!(f, "EliminateUnion"),
}
}
}
53 changes: 53 additions & 0 deletions tests/sqllogictests/suites/mode/standalone/explain/union.test
Original file line number Diff line number Diff line change
Expand Up @@ -224,5 +224,58 @@ drop table t1
statement ok
drop table t2

statement ok
create table t1 as select number as a from numbers(10000);

statement ok
create table t2 as select number as b from numbers(10000);

query T
explain select * from t1 where t1.a < 0 union all select a from t2 join t1 on t1.a = t2.b where a <0;
----
EmptyResultScan


query T
explain select * from t1 where t1.a < 0 union all select * from t2 ;
----
TableScan
├── table: default.default.t2
├── output columns: []
├── read rows: 10000
├── read size: 0
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10000.00


query T
explain select * from t1 union all select * from t2 where t2.b < 0;
----
TableScan
├── table: default.default.t1
├── output columns: [a (#0)]
├── read rows: 10000
├── read size: 10.59 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10000.00

query T
explain select * from t1 where t1.a < 0 union all select * from t2 where t2.b < 0;
----
EmptyResultScan

statement ok
drop table t1;

statement ok
drop table t2;

statement ok
drop view v

0 comments on commit fdd7d3b

Please sign in to comment.