Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(planner): add disable merge into join reorder settings #15322

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("disable_merge_into_join_reorder", DefaultSettingValue {
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
value: UserSettingValue::UInt64(0),
desc: "Disable merge into join reorder optimization.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("inlist_to_join_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(1024),
desc: "Set the threshold for converting IN list to JOIN.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ impl Settings {
Ok(self.try_get_u64("enforce_broadcast_join")? != 0)
}

pub fn get_disable_merge_into_join_reorder(&self) -> Result<bool> {
Ok(self.try_get_u64("disable_merge_into_join_reorder")? != 0)
}

pub fn get_sql_dialect(&self) -> Result<Dialect> {
match self.try_get_string("sql_dialect")?.to_lowercase().as_str() {
"hive" => Ok(Dialect::Hive),
Expand Down
29 changes: 20 additions & 9 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct OptimizerContext {
enable_distributed_optimization: bool,
enable_join_reorder: bool,
enable_dphyp: bool,
enable_merge_into_join_reorder: bool,
}

impl OptimizerContext {
Expand All @@ -74,6 +75,7 @@ impl OptimizerContext {
enable_distributed_optimization: false,
enable_join_reorder: true,
enable_dphyp: true,
enable_merge_into_join_reorder: true,
}
}

Expand All @@ -91,6 +93,11 @@ impl OptimizerContext {
self.enable_dphyp = enable;
self
}

pub fn with_enable_merge_into_join_reorder(mut self, enable: bool) -> Self {
self.enable_merge_into_join_reorder = enable;
self
}
}

/// A recursive optimizer that will apply the given rules recursively.
Expand Down Expand Up @@ -391,15 +398,19 @@ async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) ->
// 3. for full merge into, we use right outer join
// for now, let's import the statistic info to determine left join or right join
// we just do optimization for the top join (target and source),won't do recursive optimization.
let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?;
let mut state = TransformResult::new();
// we will reorder the join order according to the cardinality of target and source.
rule.apply(&join_sexpr, &mut state)?;
assert!(state.results().len() <= 1);
// we need to check whether we do swap left and right.
let change_join_order = if state.results().len() == 1 {
join_sexpr = Box::new(state.results()[0].clone());
true
let change_join_order = if opt_ctx.enable_merge_into_join_reorder {
let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?;
let mut state = TransformResult::new();
// we will reorder the join order according to the cardinality of target and source.
rule.apply(&join_sexpr, &mut state)?;
assert!(state.results().len() <= 1);
// we need to check whether we do swap left and right.
if state.results().len() == 1 {
join_sexpr = Box::new(state.results()[0].clone());
true
} else {
false
}
} else {
false
};
Expand Down
9 changes: 5 additions & 4 deletions src/query/sql/src/planner/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ impl Planner {
// Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr
let opt_ctx = OptimizerContext::new(self.ctx.clone(), metadata.clone())
.with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty())
.with_enable_join_reorder(unsafe {
!self.ctx.get_settings().get_disable_join_reorder()?
})
.with_enable_dphyp(self.ctx.get_settings().get_enable_dphyp()?);
.with_enable_join_reorder(unsafe { !settings.get_disable_join_reorder()? })
.with_enable_dphyp(settings.get_enable_dphyp()?)
.with_enable_merge_into_join_reorder(
!settings.get_disable_merge_into_join_reorder()?,
);

let optimized_plan = optimize(opt_ctx, plan).await?;
Ok((optimized_plan, PlanExtras {
Expand Down
46 changes: 46 additions & 0 deletions tests/sqllogictests/suites/mode/standalone/explain/merge_into.test
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,52 @@ HashJoin
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 6.00

## test disable_merge_into_join_reorder settings
statement ok
set disable_merge_into_join_reorder = 1;

query T
explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2.employee_id = employees2.employee_id WHEN MATCHED AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00);
----
MergeInto:
target_table: default.default.salaries2
├── distributed: false
├── target_build_optimization: false
├── can_try_update_column_only: false
├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)]
├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)]
└── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))]
HashJoin
├── output columns: [salaries2.employee_id (#3), salaries2.salary (#4), salaries2._row_id (#5), employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2)]
├── join type: RIGHT OUTER
├── build keys: [employees2.employee_id (#0)]
├── probe keys: [salaries2.employee_id (#3)]
├── filters: []
├── estimated rows: 4.00
├── TableScan(Build)
│ ├── table: default.default.employees2
│ ├── output columns: [employee_id (#0), employee_name (#1), department (#2)]
│ ├── read rows: 4
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 4.00
└── TableScan(Probe)
├── table: default.default.salaries2
├── output columns: [employee_id (#3), salary (#4), _row_id (#5)]
├── read rows: 6
├── read size: < 1 KiB
├── partitions total: 2
├── partitions scanned: 2
├── pruning stats: [segments: <range pruning: 2 to 2>, blocks: <range pruning: 2 to 2>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 6.00

statement ok
set disable_merge_into_join_reorder = 0;

## test update column only optimization
statement ok
drop table if exists column_only_optimization_target;
Expand Down
Loading