Skip to content

Commit

Permalink
Merge pull request #9261 from zhyass/feature_update
Browse files Browse the repository at this point in the history
feat(storage): update  interpreter and storage support
  • Loading branch information
BohuTANG authored Jan 16, 2023
2 parents 1bd62f3 + 030fc95 commit f014d56
Show file tree
Hide file tree
Showing 32 changed files with 1,402 additions and 676 deletions.
17 changes: 17 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,23 @@ pub trait Table: Sync + Send {
)))
}

/// Default `UPDATE` entry point of a table.
///
/// This default body does no work: it consumes every argument and returns
/// `ErrorCode::Unimplemented` with a message naming the table and its
/// engine type. Presumably engines that support `UPDATE` override it.
async fn update(
    &self,
    ctx: Arc<dyn TableContext>,
    filter: Option<RemoteExpr<String>>,
    col_indices: Vec<usize>,
    update_list: Vec<(usize, RemoteExpr<String>)>,
    pipeline: &mut Pipeline,
) -> Result<()> {
    // None of the arguments are needed here; move them into a throwaway
    // tuple so they count as used.
    let _ = (ctx, filter, col_indices, update_list, pipeline);

    let message = format!(
        "table {}, of engine type {}, does not support UPDATE",
        self.name(),
        self.get_table_info().engine(),
    );
    Err(ErrorCode::Unimplemented(message))
}

fn get_block_compact_thresholds(&self) -> BlockCompactThresholds {
BlockCompactThresholds {
max_rows_per_block: 1000 * 1000,
Expand Down
7 changes: 4 additions & 3 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use std::sync::Arc;
use common_exception::Result;
use common_expression::DataSchemaRef;
use common_pipeline_core::Pipeline;
use common_sql::plans::DeletePlan;

use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::plans::DeletePlan;
use crate::sql::plans::ScalarExpr;

/// interprets DeletePlan
Expand All @@ -34,7 +34,7 @@ pub struct DeleteInterpreter {
}

impl DeleteInterpreter {
/// Create the DelectInterpreter from DelectPlan
/// Create the DeleteInterpreter from DeletePlan
pub fn try_create(ctx: Arc<QueryContext>, plan: DeletePlan) -> Result<Self> {
Ok(DeleteInterpreter { ctx, plan })
}
Expand All @@ -47,7 +47,7 @@ impl Interpreter for DeleteInterpreter {
"DeleteInterpreter"
}

/// Get the schema of SelectPlan
/// Get the schema of DeletePlan
fn schema(&self) -> DataSchemaRef {
self.plan.schema()
}
Expand All @@ -59,6 +59,7 @@ impl Interpreter for DeleteInterpreter {
let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();
let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
let filter = scalar.as_expr()?.as_remote_expr();
let col_indices = scalar.used_columns().into_iter().collect();
Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use common_ast::ast::ExplainKind;
use common_exception::ErrorCode;
use common_exception::Result;
use tracing::error;

Expand All @@ -35,6 +34,7 @@ use crate::interpreters::CreateShareInterpreter;
use crate::interpreters::DropShareInterpreter;
use crate::interpreters::DropUserInterpreter;
use crate::interpreters::SetRoleInterpreter;
use crate::interpreters::UpdateInterpreter;
use crate::sessions::QueryContext;
use crate::sql::plans::Plan;

Expand Down Expand Up @@ -200,9 +200,10 @@ impl InterpreterFactory {
*delete.clone(),
)?)),

Plan::Update(_update) => Err(ErrorCode::Unimplemented(
"Unimplement for update".to_string(),
)),
Plan::Update(update) => Ok(Arc::new(UpdateInterpreter::try_create(
ctx,
*update.clone(),
)?)),

// Roles
Plan::CreateRole(create_role) => Ok(Arc::new(CreateRoleInterpreter::try_create(
Expand Down
161 changes: 161 additions & 0 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_pipeline_core::Pipeline;
use common_sql::plans::BoundColumnRef;
use common_sql::plans::CastExpr;
use common_sql::plans::FunctionCall;
use common_sql::BindContext;
use common_sql::ColumnBinding;
use common_sql::Scalar;
use common_sql::Visibility;

use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::plans::ScalarExpr;
use crate::sql::plans::UpdatePlan;

/// Interpreter that executes an `UpdatePlan`.
pub struct UpdateInterpreter {
/// Query context used to look up the target table, read settings and
/// register the running executor.
ctx: Arc<QueryContext>,
/// The `UPDATE` plan to execute: target catalog/database/table names,
/// the optional selection, the update list and the bind context.
plan: UpdatePlan,
}

impl UpdateInterpreter {
    /// Wraps the query context and the `UpdatePlan` into an interpreter.
    ///
    /// Construction itself cannot fail; the result is always `Ok`.
    pub fn try_create(ctx: Arc<QueryContext>, plan: UpdatePlan) -> Result<Self> {
        let interpreter = Self { ctx, plan };
        Ok(interpreter)
    }
}

#[async_trait::async_trait]
impl Interpreter for UpdateInterpreter {
    /// Name under which this interpreter identifies itself.
    fn name(&self) -> &str {
        "UpdateInterpreter"
    }

    /// Output schema, delegated to the wrapped `UpdatePlan`.
    fn schema(&self) -> DataSchemaRef {
        self.plan.schema()
    }

    /// Executes the `UPDATE` statement described by the plan.
    ///
    /// Steps:
    /// 1. Resolve the target table through the query context.
    /// 2. Turn the optional selection into a remote filter expression and
    ///    collect the columns it uses.
    /// 3. Rewrite every `(field index, expression)` of the plan's update
    ///    list into a remote expression: either `CAST(expression, type)`
    ///    or `if(_predicate, CAST(expression, type), column)`.
    /// 4. Let the table populate a pipeline via `Table::update` and, when
    ///    the pipeline has any pipes, run it to completion.
    ///
    /// Errors from table lookup, expression conversion, the table's
    /// `update` and the executor are propagated. A field of the update
    /// list with no matching column binding yields `ErrorCode::Internal`.
    #[tracing::instrument(level = "debug", name = "update_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
    async fn execute2(&self) -> Result<PipelineBuildResult> {
        // TODO check privilege
        let catalog_name = self.plan.catalog.as_str();
        let db_name = self.plan.database.as_str();
        let tbl_name = self.plan.table.as_str();
        let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

        // Remote form of the selection plus the columns it reads; both are
        // empty when the statement has no selection.
        let (filter, col_indices) = match &self.plan.selection {
            Some(condition) => {
                let remote_filter = condition.as_expr()?.as_remote_expr();
                let used = condition.used_columns().into_iter().collect();
                (Some(remote_filter), used)
            }
            None => (None, vec![]),
        };

        // Boolean column reference named `_predicate`, indexed one past the
        // table's last field.
        // NOTE(review): presumably filled in by the table's `update`
        // implementation with the filter result — confirm there.
        let predicate = Scalar::BoundColumnRef(BoundColumnRef {
            column: ColumnBinding {
                database_name: None,
                table_name: None,
                column_name: "_predicate".to_string(),
                index: tbl.schema().num_fields(),
                data_type: Box::new(DataType::Boolean),
                visibility: Visibility::Visible,
            },
        });

        let schema: DataSchema = tbl.schema().into();
        // When the filter reads no column, the original author treats the
        // condition as always true and skips the `if(...)` wrapper.
        let unconditional = col_indices.is_empty();

        let mut update_list = Vec::with_capacity(self.plan.update_list.len());
        for (id, new_value) in self.plan.update_list.iter() {
            let field = schema.field(*id);

            // CAST(expression, type): coerce the new value to the field's type.
            let casted = Scalar::CastExpr(CastExpr {
                argument: Box::new(new_value.clone()),
                from_type: Box::new(new_value.data_type()),
                target_type: Box::new(field.data_type().clone()),
            });

            let replacement = if unconditional {
                casted
            } else {
                // if(condition, CAST(expression, type), column): rows that do
                // not satisfy the predicate keep their current value, which
                // is read through the field's own column binding.
                let current = self
                    .plan
                    .bind_context
                    .columns
                    .iter()
                    .find(|binding| {
                        BindContext::match_column_binding(
                            Some(db_name),
                            Some(tbl_name),
                            field.name(),
                            *binding,
                        )
                    })
                    .map(|binding| {
                        Scalar::BoundColumnRef(BoundColumnRef {
                            column: binding.clone(),
                        })
                    })
                    .ok_or_else(|| ErrorCode::Internal("It's a bug"))?;

                let return_type = current.data_type();
                Scalar::FunctionCall(FunctionCall {
                    params: vec![],
                    arguments: vec![predicate.clone(), casted, current],
                    func_name: "if".to_string(),
                    return_type: Box::new(return_type),
                })
            };

            update_list.push((*id, replacement.as_expr()?.as_remote_expr()));
        }

        let mut pipeline = Pipeline::create();
        tbl.update(
            self.ctx.clone(),
            filter,
            col_indices,
            update_list,
            &mut pipeline,
        )
        .await?;

        // The table added no pipes, so there is nothing to execute.
        if pipeline.pipes.is_empty() {
            return Ok(PipelineBuildResult::create());
        }

        let settings = self.ctx.get_settings();
        pipeline.set_max_threads(settings.get_max_threads()? as usize);
        let query_id = self.ctx.get_id();
        let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
        let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;

        // The context only keeps a weak handle to the executor.
        self.ctx.set_executor(Arc::downgrade(&executor.get_inner()));
        executor.execute()?;
        drop(executor);

        Ok(PipelineBuildResult::create())
    }
}
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mod interpreter_table_show_create;
mod interpreter_table_truncate;
mod interpreter_table_undrop;
mod interpreter_unsetting;
mod interpreter_update;
mod interpreter_use_database;
mod interpreter_user_alter;
mod interpreter_user_create;
Expand Down Expand Up @@ -135,6 +136,7 @@ pub use interpreter_table_show_create::ShowCreateTableInterpreter;
pub use interpreter_table_truncate::TruncateTableInterpreter;
pub use interpreter_table_undrop::UndropTableInterpreter;
pub use interpreter_unsetting::UnSettingInterpreter;
pub use interpreter_update::UpdateInterpreter;
pub use interpreter_use_database::UseDatabaseInterpreter;
pub use interpreter_user_alter::AlterUserInterpreter;
pub use interpreter_user_create::CreateUserInterpreter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.

mod block_compact_mutator;
mod deletion_mutator;
mod deletion;
mod recluster_mutator;
mod segments_compact_mutator;

pub use deletion_mutator::do_deletion;
pub use deletion::do_deletion;
10 changes: 0 additions & 10 deletions src/query/sql/src/planner/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ impl<'a> Binder {
&[],
);

let table = self
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;

let tbl_info = table.get_table_info();
let table_id = tbl_info.ident;

let selection = if let Some(expr) = filter {
let (scalar, _) = scalar_binder.bind(expr).await?;
Some(scalar)
Expand All @@ -84,8 +76,6 @@ impl<'a> Binder {
catalog_name,
database_name,
table_name,
table_id,
metadata: self.metadata.clone(),
selection,
};
Ok(Plan::Delete(Box::new(plan)))
Expand Down
15 changes: 13 additions & 2 deletions src/query/sql/src/planner/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::binder::Binder;
use crate::binder::ScalarBinder;
use crate::normalize_identifier;
use crate::plans::Plan;
use crate::plans::Scalar;
use crate::plans::UpdatePlan;
use crate::BindContext;

Expand Down Expand Up @@ -67,7 +68,6 @@ impl<'a> Binder {
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;
let table_id = table.get_id();

let mut scalar_binder = ScalarBinder::new(
&context,
Expand All @@ -88,12 +88,23 @@ impl<'a> Binder {
)));
}

// TODO(zhyass): selection and update_list support subquery.
let (scalar, _) = scalar_binder.bind(&update_expr.expr).await?;
if matches!(scalar, Scalar::SubqueryExpr(_)) {
return Err(ErrorCode::Internal(
"Update does not support subquery temporarily",
));
}
update_columns.insert(index, scalar);
}

let push_downs = if let Some(expr) = selection {
let (scalar, _) = scalar_binder.bind(expr).await?;
if matches!(scalar, Scalar::SubqueryExpr(_)) {
return Err(ErrorCode::Internal(
"Update does not support subquery temporarily",
));
}
Some(scalar)
} else {
None
Expand All @@ -103,9 +114,9 @@ impl<'a> Binder {
catalog: catalog_name,
database: database_name,
table: table_name,
table_id,
update_list: update_columns,
selection: push_downs,
bind_context: Box::new(context.clone()),
};
Ok(Plan::Update(Box::new(plan)))
}
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use expression_parser::parse_exprs;
pub use expression_parser::parse_to_remote_string_exprs;
pub use metadata::*;
pub use planner::Planner;
pub use plans::Scalar;
pub use plans::ScalarExpr;
pub use semantic::normalize_identifier;
pub use semantic::validate_function_arg;
Expand Down
4 changes: 0 additions & 4 deletions src/query/sql/src/planner/plans/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ use std::sync::Arc;

use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_meta_app::schema::TableIdent;

use crate::plans::Scalar;
use crate::MetadataRef;

#[derive(Clone, Debug)]
pub struct DeletePlan {
pub catalog_name: String,
pub database_name: String,
pub table_name: String,
pub table_id: TableIdent,
pub metadata: MetadataRef,
pub selection: Option<Scalar>,
}

Expand Down
Loading

1 comment on commit f014d56

@vercel
Copy link

@vercel vercel bot commented on f014d56 Jan 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.rs
databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.vercel.app

Please sign in to comment.