Skip to content

Commit

Permalink
feat(planner): improve push down filter join (databendlabs#14872)
Browse files Browse the repository at this point in the history
* feat: improve push down filter join

* fix: reverse operator when (constant, column_ref)

* chore: add more test

* chore: complete sqllogictest

* fix: outer join push down

* chore: remove z3 and add new method

* chore: move

* chore: refactor try push down filter join

* chore: update sqllogictest

* chore: update sqllogictest

* chore: update sqllogictest

* chore: fix push down single join

* chore: update subquery

* fix: after join reorder don not push down

* chore: update native

* chore: improve can_filter_null

* chore: fix test

* feat: improve infer and / or filter

* chore: update native sqllogictest

* chore: update sqllogictest

* feat: add SingleToInnerOptimizer

* chore: remove after_join_reorder

* chore: fix outer join to inner join

* test: update parquet explain sqllogictest

* test: update native explain sqllogictest

* chore: make lint code

* chore: fix Cascades optimizer

* chore: add z3-prove

* chore: update sqllogictest

* chore: fix infer filter for outer join

* chore: update sqllogictest

* chore: update ee test

* chore: fix tpcds test

* chore: update sqllogictest

* chore: update ee test

* chore: fix distributed query plan

* chore: update sqllogictet

* chore: update test comment

* chore: add comments

* chore: make lint

* chore: update sqllogictest

* chore: suuport more join type

* chore: refine code

* chore: add semi / anti test

* chore: update sqllogictest

* chore: improve infer filter

* chore: update sqllogictest

* chore: add more comments and remove useless code

* chore: make lint

* test: update memo sqllogictest

* chore: update memo test

* chore: update ee test
  • Loading branch information
Dousir9 authored and yufan022 committed Apr 25, 2024
1 parent d5eaebd commit cc47269
Show file tree
Hide file tree
Showing 63 changed files with 2,828 additions and 1,388 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct HashJoinDesc {
pub(crate) build_keys: Vec<Expr>,
pub(crate) probe_keys: Vec<Expr>,
pub(crate) join_type: JoinType,
pub(crate) original_join_type: Option<JoinType>,
pub(crate) single_to_inner: Option<JoinType>,
/// when we have non-equal conditions for hash join,
/// for example `a = b and c = d and e > f`, we will use `and_filters`
/// to wrap `e > f` as a other_predicate to do next step's check.
Expand Down Expand Up @@ -88,7 +88,7 @@ impl HashJoinDesc {
from_correlated_subquery: join.from_correlated_subquery,
probe_keys_rt,
broadcast: join.broadcast,
original_join_type: join.original_join_type.clone(),
single_to_inner: join.single_to_inner.clone(),
enable_bloom_runtime_filter: join.enable_bloom_runtime_filter,
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl HashJoinBuildState {
pub(crate) fn add_build_block(&self, data_block: DataBlock) -> Result<()> {
let block_outer_scan_map = if self.hash_join_state.need_outer_scan()
|| matches!(
self.hash_join_state.hash_join_desc.original_join_type,
self.hash_join_state.hash_join_desc.single_to_inner,
Some(JoinType::RightSingle)
) {
vec![false; data_block.num_rows()]
Expand All @@ -235,7 +235,7 @@ impl HashJoinBuildState {
let build_state = unsafe { &mut *self.hash_join_state.build_state.get() };
if self.hash_join_state.need_outer_scan()
|| matches!(
self.hash_join_state.hash_join_desc.original_join_type,
self.hash_join_state.hash_join_desc.single_to_inner,
Some(JoinType::RightSingle)
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl HashJoinProbeState {
.other_predicate
.is_none();
match self.hash_join_state.hash_join_desc.join_type {
JoinType::Inner => match self.hash_join_state.hash_join_desc.original_join_type {
JoinType::Inner => match self.hash_join_state.hash_join_desc.single_to_inner {
Some(JoinType::LeftSingle) => {
self.inner_join::<_, true, false>(input, keys, hash_table, probe_state)
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl PhysicalPlanReplacer for Fragmenter {
probe_keys_rt: plan.probe_keys_rt.clone(),
enable_bloom_runtime_filter: plan.enable_bloom_runtime_filter,
broadcast: plan.broadcast,
original_join_type: plan.original_join_type.clone(),
single_to_inner: plan.single_to_inner.clone(),
}))
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pub trait PhysicalPlanReplacer {
probe_keys_rt: plan.probe_keys_rt.clone(),
enable_bloom_runtime_filter: plan.enable_bloom_runtime_filter,
broadcast: plan.broadcast,
original_join_type: plan.original_join_type.clone(),
single_to_inner: plan.single_to_inner.clone(),
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ pub struct HashJoin {
pub enable_bloom_runtime_filter: bool,
// Under cluster, mark if the join is broadcast join.
pub broadcast: bool,
// Original join type. Left/Right single join may be convert to inner join
// Record the original join type and do some special processing during runtime.
pub original_join_type: Option<JoinType>,
// When left/right single join converted to inner join, record the original join type
// and do some special processing during runtime.
pub single_to_inner: Option<JoinType>,
}

impl HashJoin {
Expand Down Expand Up @@ -504,7 +504,7 @@ impl PhysicalPlanBuilder {
need_hold_hash_table: join.need_hold_hash_table,
stat_info: Some(stat_info),
broadcast: is_broadcast,
original_join_type: join.original_join_type.clone(),
single_to_inner: join.single_to_inner.clone(),
enable_bloom_runtime_filter: adjust_bloom_runtime_filter(
self.ctx.clone(),
&self.metadata,
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#![feature(try_blocks)]
#![feature(extend_one)]
#![feature(lazy_cell)]
#![feature(if_let_guard)]

pub mod evaluator;
pub mod executor;
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl Binder {
from_correlated_subquery: false,
need_hold_hash_table: false,
is_lateral,
original_join_type: None,
single_to_inner: None,
};
Ok(SExpr::create_binary(
Arc::new(logical_join.into()),
Expand Down
4 changes: 0 additions & 4 deletions src/query/sql/src/planner/binder/scalar_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl<'a> JoinPredicate<'a> {
left_prop: &RelationalProperty,
right_prop: &RelationalProperty,
) -> Self {
if contain_subquery(scalar) {
return Self::Other(scalar);
}

if scalar.used_columns().is_empty() {
return Self::ALL(scalar);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl ApplyRuleTask {
let group = optimizer.memo.group(self.target_group_index)?;
let m_expr = group.m_expr(self.m_expr_index)?;
let mut state = TransformResult::new();
let rule = RuleFactory::create_rule(self.rule_id, optimizer.metadata.clone(), false)?;
let rule = RuleFactory::create_rule(self.rule_id, optimizer.metadata.clone())?;
m_expr.apply_rule(&optimizer.memo, &rule, &mut state)?;
optimizer.insert_from_transform_state(self.target_group_index, state)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl SubqueryRewriter {
from_correlated_subquery: true,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
};

// Rewrite plan to semi-join.
Expand Down Expand Up @@ -285,7 +285,7 @@ impl SubqueryRewriter {
from_correlated_subquery: true,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
};
let s_expr = SExpr::create_binary(
Arc::new(join_plan.into()),
Expand Down Expand Up @@ -334,7 +334,7 @@ impl SubqueryRewriter {
from_correlated_subquery: true,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
};
let s_expr = SExpr::create_binary(
Arc::new(join_plan.into()),
Expand Down Expand Up @@ -398,7 +398,7 @@ impl SubqueryRewriter {
from_correlated_subquery: true,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
}
.into();
Ok((
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl SubqueryRewriter {
from_correlated_subquery: false,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
}
.into();

Expand Down Expand Up @@ -462,7 +462,7 @@ impl SubqueryRewriter {
from_correlated_subquery: false,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
}
.into(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl SubqueryRewriter {
from_correlated_subquery: false,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
}
.into();
Ok((
Expand Down Expand Up @@ -549,7 +549,7 @@ impl SubqueryRewriter {
from_correlated_subquery: false,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
}
.into();
let s_expr = SExpr::create_binary(
Expand Down Expand Up @@ -580,7 +580,7 @@ impl SubqueryRewriter {
from_correlated_subquery: false,
need_hold_hash_table: false,
is_lateral: false,
original_join_type: None,
single_to_inner: None,
}
.into();

Expand Down
Loading

0 comments on commit cc47269

Please sign in to comment.