Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): add plan cache #16333

Merged
merged 21 commits
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ ordq = "0.2.0"
parking_lot = "0.12.1"
parquet = { version = "52", features = ["async"] }
paste = "1.0.15"
sha2 = "0.10.8"
# TODO: let's use native tls instead.
iceberg = { version = "0.3.0" }
iceberg-catalog-hms = { version = "0.3.0" }
Expand Down
10 changes: 10 additions & 0 deletions src/common/cache/src/mem_sized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,13 @@ where T: MemSized
}
}
}

impl<T, U> MemSized for (T, U)
where
T: MemSized,
U: MemSized,
{
fn mem_bytes(&self) -> usize {
self.0.mem_bytes() + self.1.mem_bytes()
}
}
2 changes: 1 addition & 1 deletion src/meta/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha1 = "0.10.5"
sha2 = "0.10.6"
sha2 = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ parquet = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = "0.10.6"
sha2 = { workspace = true }
thrift = "0.17.0"
typetag = { workspace = true }
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ roaring = "0.10.1"
serde = { workspace = true }
serde_json = { workspace = true }
sha1 = "0.10.5"
sha2 = "0.10.6"
sha2 = { workspace = true }
simdutf8 = "0.1.4"
simple_hll = { version = "0.0.1", features = ["serde_borsh"] }
siphasher = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_stacker = { workspace = true }
serde_urlencoded = "0.7.1"
sha2 = "0.10.8"
sha2 = { workspace = true }
socket2 = "0.5.3"
strength_reduce = "0.2.4"
sysinfo = "0.30"
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: None,
}),
("enable_planner_cache", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Enables caching logic plan from same query.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_query_result_cache", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables caching query results to improve performance for identical queries.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ impl Settings {
Ok(self.try_get_u64("hide_options_in_show_create_table")? != 0)
}

pub fn get_enable_planner_cache(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_planner_cache")? != 0)
}

pub fn get_enable_query_result_cache(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_query_result_cache")? != 0)
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dashmap = { workspace = true }
databend-common-ast = { workspace = true }
databend-common-async-functions = { workspace = true }
databend-common-base = { workspace = true }
databend-common-cache = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-compress = { workspace = true }
databend-common-config = { workspace = true }
Expand Down Expand Up @@ -70,6 +71,7 @@ recursive = "0.1.1"
regex = { workspace = true }
roaring = "0.10.1"
serde = { workspace = true }
sha2 = { workspace = true }
simsearch = "0.2"
time = "0.3.14"
url = "2.3.1"
Expand Down
13 changes: 9 additions & 4 deletions src/query/sql/src/planner/binder/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use std::time::Instant;

use chrono_tz::Tz;
use databend_common_ast::ast::format_statement;
use databend_common_ast::ast::Hint;
use databend_common_ast::ast::Identifier;
use databend_common_ast::ast::Statement;
Expand Down Expand Up @@ -100,6 +99,8 @@ pub struct Binder {
/// For the recursive cte, the cte table name occurs in the recursive cte definition and main query
/// if meet recursive cte table name in cte definition, set `bind_recursive_cte` true and treat it as `CteScan`.
pub bind_recursive_cte: bool,

pub enable_result_cache: bool,
}

impl<'a> Binder {
Expand All @@ -110,6 +111,10 @@ impl<'a> Binder {
metadata: MetadataRef,
) -> Self {
let dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
let enable_result_cache = ctx
.get_settings()
.get_enable_query_result_cache()
.unwrap_or_default();
Binder {
ctx,
dialect,
Expand All @@ -121,6 +126,7 @@ impl<'a> Binder {
ctes_map: Box::default(),
expression_scan_context: ExpressionScanContext::new(),
bind_recursive_cte: false,
enable_result_cache,
}
}

Expand Down Expand Up @@ -165,9 +171,8 @@ impl<'a> Binder {

// Remove unused cache columns and join conditions and construct ExpressionScan's child.
(s_expr, _) = self.construct_expression_scan(&s_expr, self.metadata.clone())?;

let formatted_ast = if self.ctx.get_settings().get_enable_query_result_cache()? {
Some(format_statement(stmt.clone())?)
let formatted_ast = if self.enable_result_cache {
Some(stmt.to_string())
} else {
None
};
Expand Down
4 changes: 3 additions & 1 deletion src/query/sql/src/planner/binder/copy_into_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ impl<'a> Binder {
&table.database,
&table.table,
);
let subquery = format!("SELECT * FROM {catalog_name}.{database_name}.{table_name}");
let subquery = format!(
"SELECT * FROM \"{catalog_name}\".\"{database_name}\".\"{table_name}\""
);
let tokens = tokenize_sql(&subquery)?;
let sub_stmt_msg = parse_sql(&tokens, self.dialect)?;
let sub_stmt = sub_stmt_msg.0;
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod binder;
pub mod dataframe;
mod expression_parser;
pub mod optimizer;
mod planner_cache;
pub mod plans;
mod stream_column;
mod udf_validator;
Expand Down
47 changes: 40 additions & 7 deletions src/query/sql/src/planner/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_ast::parser::token::Token;
use databend_common_ast::parser::token::TokenKind;
use databend_common_ast::parser::token::Tokenizer;
use databend_common_ast::parser::Dialect;
use databend_common_cache::MemSized;
use databend_common_catalog::catalog::CatalogManager;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
Expand All @@ -45,24 +46,29 @@ use crate::plans::Plan;
use crate::Binder;
use crate::CountSetOps;
use crate::Metadata;
use crate::MetadataRef;
use crate::NameResolutionContext;
use crate::VariableNormalizer;

const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;

pub struct Planner {
ctx: Arc<dyn TableContext>,
pub(crate) ctx: Arc<dyn TableContext>,
}

#[derive(Debug, Clone)]
pub struct PlanExtras {
pub metadata: MetadataRef,
pub format: Option<String>,
pub statement: Statement,
}

impl MemSized for PlanExtras {
fn mem_bytes(&self) -> usize {
// fake
1024
}
}

impl Planner {
pub fn new(ctx: Arc<dyn TableContext>) -> Self {
Planner { ctx }
Expand Down Expand Up @@ -152,8 +158,29 @@ impl Planner {
self.replace_stmt(&mut stmt)?;

// Step 3: Bind AST with catalog, and generate a pure logical SExpr
let metadata = Arc::new(RwLock::new(Metadata::default()));
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut enable_planner_cache =
self.ctx.get_settings().get_enable_planner_cache()?;
let planner_cache_key = if enable_planner_cache {
Some(Self::planner_cache_key(&stmt.to_string()))
} else {
None
};

if enable_planner_cache {
let (c, plan) = self.get_cache(
name_resolution_ctx.clone(),
planner_cache_key.as_ref().unwrap(),
&stmt,
);
if let Some(plan) = plan {
info!("logical plan from cache, time used: {:?}", start.elapsed());
return Ok(plan);
}
enable_planner_cache = c;
}

let metadata = Arc::new(RwLock::new(Metadata::default()));
let binder = Binder::new(
self.ctx.clone(),
CatalogManager::instance(),
Expand All @@ -176,11 +203,17 @@ impl Planner {
.with_enable_dphyp(settings.get_enable_dphyp()?);

let optimized_plan = optimize(opt_ctx, plan).await?;
Ok((optimized_plan, PlanExtras {
metadata,
let result = (optimized_plan, PlanExtras {
format,
statement: stmt,
}))
});

if enable_planner_cache {
self.set_cache(planner_cache_key.clone().unwrap(), result.clone());
Ok(result)
} else {
Ok(result)
}
}
.await;

Expand Down
Loading
Loading