Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): update interpreter and storage support #9261

Merged
merged 32 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d1422cd
create file and dir
zhyass Dec 15, 2022
23ae3e0
update
zhyass Dec 21, 2022
7710609
rename deletepart to mutationpart
zhyass Dec 22, 2022
d5eed3a
update source
zhyass Dec 29, 2022
d828040
make lint
zhyass Dec 29, 2022
5de1ef5
rename deletion_transform to mutation_transform
zhyass Dec 29, 2022
518fa74
add interpreter_update
zhyass Dec 29, 2022
aba9f90
enable update
zhyass Dec 30, 2022
5e17e8c
fix test case
zhyass Dec 30, 2022
b1a4d21
Add unit test
zhyass Jan 3, 2023
d3e76c4
Merge remote-tracking branch 'upstream/main' into feature_update
zhyass Jan 3, 2023
bdbc7db
fix conflict
zhyass Jan 3, 2023
4d25c4f
Add sqllogic test
zhyass Jan 3, 2023
e99bea0
add scan progress
zhyass Jan 3, 2023
3a707a5
Add serialize data transform
zhyass Jan 6, 2023
e01cb7a
Merge remote-tracking branch 'upstream/main' into feature_update
zhyass Jan 6, 2023
2e8e87e
resolve conflict
zhyass Jan 6, 2023
9fe6f0e
add mutation source
zhyass Jan 10, 2023
b49199d
remove deletion source
zhyass Jan 10, 2023
7249fe9
Merge remote-tracking branch 'upstream/main' into feature_update
zhyass Jan 10, 2023
93322b6
resolve conflict
zhyass Jan 10, 2023
cbe97ac
remove update source
zhyass Jan 10, 2023
09f50c8
remove unused codes
zhyass Jan 10, 2023
0dcc149
update
zhyass Jan 12, 2023
cc8f39a
Merge remote-tracking branch 'upstream/main' into feature_update
zhyass Jan 12, 2023
17b9c0b
fix bug
zhyass Jan 12, 2023
5696a6c
format codes
zhyass Jan 13, 2023
1b53a03
add mutation block pruning
zhyass Jan 13, 2023
b4ecec3
fix tests
zhyass Jan 13, 2023
fa27986
Add sql logic tests
zhyass Jan 15, 2023
789af98
Merge remote-tracking branch 'upstream/main' into feature_update
zhyass Jan 15, 2023
030fc95
Merge branch 'main' into feature_update
mergify[bot] Jan 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,23 @@ pub trait Table: Sync + Send {
)))
}

/// Default `UPDATE` entry point for a table engine.
///
/// This default implementation ignores every argument and always fails:
/// engines that can rewrite rows are expected to override it.
///
/// # Errors
///
/// Always returns `ErrorCode::Unimplemented`, naming the table and its engine type.
async fn update(
    &self,
    ctx: Arc<dyn TableContext>,
    filter: Option<RemoteExpr<String>>,
    col_indices: Vec<usize>,
    update_list: Vec<(usize, RemoteExpr<String>)>,
    pipeline: &mut Pipeline,
) -> Result<()> {
    // None of the arguments are needed here; discard them explicitly so the
    // default body does not trigger unused-variable warnings.
    let _ = (ctx, filter, col_indices, update_list, pipeline);

    let message = format!(
        "table {}, of engine type {}, does not support UPDATE",
        self.name(),
        self.get_table_info().engine(),
    );
    Err(ErrorCode::Unimplemented(message))
}

fn get_block_compact_thresholds(&self) -> BlockCompactThresholds {
BlockCompactThresholds {
max_rows_per_block: 1000 * 1000,
Expand Down
7 changes: 4 additions & 3 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use std::sync::Arc;
use common_exception::Result;
use common_expression::DataSchemaRef;
use common_pipeline_core::Pipeline;
use common_sql::plans::DeletePlan;

use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::plans::DeletePlan;
use crate::sql::plans::ScalarExpr;

/// interprets DeletePlan
Expand All @@ -34,7 +34,7 @@ pub struct DeleteInterpreter {
}

impl DeleteInterpreter {
/// Create the DelectInterpreter from DelectPlan
/// Create the DeleteInterpreter from DeletePlan
pub fn try_create(ctx: Arc<QueryContext>, plan: DeletePlan) -> Result<Self> {
Ok(DeleteInterpreter { ctx, plan })
}
Expand All @@ -47,7 +47,7 @@ impl Interpreter for DeleteInterpreter {
"DeleteInterpreter"
}

/// Get the schema of SelectPlan
/// Get the schema of DeletePlan
fn schema(&self) -> DataSchemaRef {
self.plan.schema()
}
Expand All @@ -59,6 +59,7 @@ impl Interpreter for DeleteInterpreter {
let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();
let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
let filter = scalar.as_expr()?.as_remote_expr();
let col_indices = scalar.used_columns().into_iter().collect();
Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use common_ast::ast::ExplainKind;
use common_exception::ErrorCode;
use common_exception::Result;
use tracing::error;

Expand All @@ -35,6 +34,7 @@ use crate::interpreters::CreateShareInterpreter;
use crate::interpreters::DropShareInterpreter;
use crate::interpreters::DropUserInterpreter;
use crate::interpreters::SetRoleInterpreter;
use crate::interpreters::UpdateInterpreter;
use crate::sessions::QueryContext;
use crate::sql::plans::Plan;

Expand Down Expand Up @@ -200,9 +200,10 @@ impl InterpreterFactory {
*delete.clone(),
)?)),

Plan::Update(_update) => Err(ErrorCode::Unimplemented(
"Unimplement for update".to_string(),
)),
Plan::Update(update) => Ok(Arc::new(UpdateInterpreter::try_create(
ctx,
*update.clone(),
)?)),

// Roles
Plan::CreateRole(create_role) => Ok(Arc::new(CreateRoleInterpreter::try_create(
Expand Down
161 changes: 161 additions & 0 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_pipeline_core::Pipeline;
use common_sql::plans::BoundColumnRef;
use common_sql::plans::CastExpr;
use common_sql::plans::FunctionCall;
use common_sql::BindContext;
use common_sql::ColumnBinding;
use common_sql::Scalar;
use common_sql::Visibility;

use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::plans::ScalarExpr;
use crate::sql::plans::UpdatePlan;

/// Interpreter that executes an `UpdatePlan`.
pub struct UpdateInterpreter {
    // Query context: used to look up the target table, read the settings,
    // obtain the query id and register the running executor.
    ctx: Arc<QueryContext>,
    // The bound UPDATE plan: catalog/database/table names, the update list,
    // the optional selection and the bind context it was bound with.
    plan: UpdatePlan,
}

impl UpdateInterpreter {
    /// Builds an `UpdateInterpreter` that will run `plan` within the query context `ctx`.
    ///
    /// Construction itself performs no work and currently always succeeds.
    pub fn try_create(ctx: Arc<QueryContext>, plan: UpdatePlan) -> Result<Self> {
        let interpreter = Self { ctx, plan };
        Ok(interpreter)
    }
}

#[async_trait::async_trait]
impl Interpreter for UpdateInterpreter {
    /// Get the name of current interpreter
    fn name(&self) -> &str {
        "UpdateInterpreter"
    }

    /// Get the schema of UpdatePlan
    fn schema(&self) -> DataSchemaRef {
        self.plan.schema()
    }

    /// Resolves the target table, rewrites the plan's update list into remote
    /// expressions, asks the table to build the update pipeline and, if the
    /// pipeline is not empty, runs it to completion.
    ///
    /// Each `(column index, expression)` entry of the plan becomes:
    /// * `CAST(expression, column type)` when the selection references no columns;
    /// * `if(_predicate, CAST(expression, column type), column)` otherwise.
    ///
    /// # Errors
    ///
    /// Propagates failures from table lookup, expression conversion,
    /// `Table::update`, settings access and pipeline execution. Returns
    /// `ErrorCode::Internal` when an updated column has no matching binding in
    /// the plan's bind context.
    #[tracing::instrument(level = "debug", name = "update_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
    async fn execute2(&self) -> Result<PipelineBuildResult> {
        // TODO check privilege
        let catalog_name = self.plan.catalog.as_str();
        let db_name = self.plan.database.as_str();
        let tbl_name = self.plan.table.as_str();
        let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

        // The filter expression plus the indices of the columns it reads.
        let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
            let filter = scalar.as_expr()?.as_remote_expr();
            let col_indices = scalar.used_columns().into_iter().collect();
            (Some(filter), col_indices)
        } else {
            (None, vec![])
        };

        // Reference to a boolean `_predicate` column bound one position past the
        // table's own fields.
        // NOTE(review): presumably the table's update pipeline appends the filter
        // result as an extra column at that index — confirm against the
        // `Table::update` implementations.
        let predicate = Scalar::BoundColumnRef(BoundColumnRef {
            column: ColumnBinding {
                database_name: None,
                table_name: None,
                column_name: "_predicate".to_string(),
                index: tbl.schema().num_fields(),
                data_type: Box::new(DataType::Boolean),
                visibility: Visibility::Visible,
            },
        });

        let schema: DataSchema = tbl.schema().into();
        let update_list = self
            .plan
            .update_list
            .iter()
            .map(|(id, scalar)| {
                let field = schema.field(*id);
                // CAST(expression, type): coerce the new value to the column's type.
                let casted = Scalar::CastExpr(CastExpr {
                    argument: Box::new(scalar.clone()),
                    from_type: Box::new(scalar.data_type()),
                    target_type: Box::new(field.data_type().clone()),
                });

                let new_value = if col_indices.is_empty() {
                    // The condition is always true.
                    // Replace column to the result of the following expression:
                    // CAST(expression, type)
                    casted
                } else {
                    // Replace column to the result of the following expression:
                    // if(condition, CAST(expression, type), column)
                    let original = self
                        .plan
                        .bind_context
                        .columns
                        .iter()
                        .find(|&binding| {
                            BindContext::match_column_binding(
                                Some(db_name),
                                Some(tbl_name),
                                field.name(),
                                binding,
                            )
                        })
                        .map(|binding| {
                            Scalar::BoundColumnRef(BoundColumnRef {
                                column: binding.clone(),
                            })
                        })
                        .ok_or_else(|| {
                            ErrorCode::Internal(format!(
                                "It's a bug: cannot find the column binding of '{}' in table '{}'.'{}'",
                                field.name(),
                                db_name,
                                tbl_name,
                            ))
                        })?;
                    let return_type = original.data_type();
                    Scalar::FunctionCall(FunctionCall {
                        params: vec![],
                        arguments: vec![predicate.clone(), casted, original],
                        func_name: "if".to_string(),
                        return_type: Box::new(return_type),
                    })
                };

                Ok::<_, ErrorCode>((*id, new_value.as_expr()?.as_remote_expr()))
            })
            .collect::<std::result::Result<Vec<_>, ErrorCode>>()?;

        let mut pipeline = Pipeline::create();
        tbl.update(
            self.ctx.clone(),
            filter,
            col_indices,
            update_list,
            &mut pipeline,
        )
        .await?;

        // An empty pipeline means the table had nothing to execute.
        if !pipeline.pipes.is_empty() {
            let settings = self.ctx.get_settings();
            pipeline.set_max_threads(settings.get_max_threads()? as usize);
            let query_id = self.ctx.get_id();
            let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
            let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;

            // The context only keeps a weak handle to the executor.
            self.ctx.set_executor(Arc::downgrade(&executor.get_inner()));
            executor.execute()?;
            drop(executor);
        }

        Ok(PipelineBuildResult::create())
    }
}
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mod interpreter_table_show_create;
mod interpreter_table_truncate;
mod interpreter_table_undrop;
mod interpreter_unsetting;
mod interpreter_update;
mod interpreter_use_database;
mod interpreter_user_alter;
mod interpreter_user_create;
Expand Down Expand Up @@ -135,6 +136,7 @@ pub use interpreter_table_show_create::ShowCreateTableInterpreter;
pub use interpreter_table_truncate::TruncateTableInterpreter;
pub use interpreter_table_undrop::UndropTableInterpreter;
pub use interpreter_unsetting::UnSettingInterpreter;
pub use interpreter_update::UpdateInterpreter;
pub use interpreter_use_database::UseDatabaseInterpreter;
pub use interpreter_user_alter::AlterUserInterpreter;
pub use interpreter_user_create::CreateUserInterpreter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.

mod block_compact_mutator;
mod deletion_mutator;
mod deletion;
mod recluster_mutator;
mod segments_compact_mutator;

pub use deletion_mutator::do_deletion;
pub use deletion::do_deletion;
10 changes: 0 additions & 10 deletions src/query/sql/src/planner/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ impl<'a> Binder {
&[],
);

let table = self
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;

let tbl_info = table.get_table_info();
let table_id = tbl_info.ident;

let selection = if let Some(expr) = filter {
let (scalar, _) = scalar_binder.bind(expr).await?;
Some(scalar)
Expand All @@ -84,8 +76,6 @@ impl<'a> Binder {
catalog_name,
database_name,
table_name,
table_id,
metadata: self.metadata.clone(),
selection,
};
Ok(Plan::Delete(Box::new(plan)))
Expand Down
15 changes: 13 additions & 2 deletions src/query/sql/src/planner/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::binder::Binder;
use crate::binder::ScalarBinder;
use crate::normalize_identifier;
use crate::plans::Plan;
use crate::plans::Scalar;
use crate::plans::UpdatePlan;
use crate::BindContext;

Expand Down Expand Up @@ -67,7 +68,6 @@ impl<'a> Binder {
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;
let table_id = table.get_id();

let mut scalar_binder = ScalarBinder::new(
&context,
Expand All @@ -88,12 +88,23 @@ impl<'a> Binder {
)));
}

// TODO(zhyass): selection and update_list support subquery.
let (scalar, _) = scalar_binder.bind(&update_expr.expr).await?;
if matches!(scalar, Scalar::SubqueryExpr(_)) {
return Err(ErrorCode::Internal(
"Update does not support subquery temporarily",
));
}
update_columns.insert(index, scalar);
}

let push_downs = if let Some(expr) = selection {
let (scalar, _) = scalar_binder.bind(expr).await?;
if matches!(scalar, Scalar::SubqueryExpr(_)) {
return Err(ErrorCode::Internal(
"Update does not support subquery temporarily",
));
}
Some(scalar)
} else {
None
Expand All @@ -103,9 +114,9 @@ impl<'a> Binder {
catalog: catalog_name,
database: database_name,
table: table_name,
table_id,
update_list: update_columns,
selection: push_downs,
bind_context: Box::new(context.clone()),
};
Ok(Plan::Update(Box::new(plan)))
}
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use expression_parser::parse_exprs;
pub use expression_parser::parse_to_remote_string_exprs;
pub use metadata::*;
pub use planner::Planner;
pub use plans::Scalar;
pub use plans::ScalarExpr;
pub use semantic::normalize_identifier;
pub use semantic::validate_function_arg;
Expand Down
4 changes: 0 additions & 4 deletions src/query/sql/src/planner/plans/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ use std::sync::Arc;

use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_meta_app::schema::TableIdent;

use crate::plans::Scalar;
use crate::MetadataRef;

#[derive(Clone, Debug)]
pub struct DeletePlan {
pub catalog_name: String,
pub database_name: String,
pub table_name: String,
pub table_id: TableIdent,
pub metadata: MetadataRef,
pub selection: Option<Scalar>,
}

Expand Down
Loading