fix review comment

zhyass committed Mar 20, 2024
1 parent 9bbc56c commit b31d962

Showing 12 changed files with 105 additions and 66 deletions.
6 changes: 3 additions & 3 deletions scripts/benchmark/query/load/tpch10.sh
@@ -7,7 +7,7 @@ select version();
SQL

for t in customer lineitem nation orders partsupp part region supplier; do
echo "DROP TABLE IF EXISTS $t;" | bendsql
echo "DROP TABLE IF EXISTS $t;" | bendsql
done

cat <<SQL | bendsql
@@ -113,8 +113,8 @@ cat <<SQL | bendsql
SQL

for t in customer lineitem nation orders partsupp part region supplier; do
echo "loading into $t ..."
cat <<SQL | bendsql
echo "loading into $t ..."
cat <<SQL | bendsql
COPY INTO $t FROM 's3://repo.databend.rs/datasets/tpch10/${t}/' connection=(connection_name='repo') pattern ='${t}.*'
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=0);
ANALYZE TABLE "${t}";
10 changes: 5 additions & 5 deletions scripts/benchmark/query/load/tpch100.sh
@@ -7,7 +7,7 @@ select version();
SQL

for t in customer lineitem nation orders partsupp part region supplier; do
echo "DROP TABLE IF EXISTS $t;" | bendsql
echo "DROP TABLE IF EXISTS $t;" | bendsql
done

cat <<SQL | bendsql
@@ -113,8 +113,8 @@ cat <<SQL | bendsql
SQL

for t in nation region; do
echo "loading into $t ..."
cat <<SQL | bendsql
echo "loading into $t ..."
cat <<SQL | bendsql
COPY INTO $t FROM 's3://repo.databend.rs/tpch100/${t}.tbl'
credentials=(access_key_id ='$REPO_ACCESS_KEY_ID' secret_access_key ='$REPO_SECRET_ACCESS_KEY')
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=1);
@@ -124,8 +124,8 @@ SQL
done

for t in customer lineitem orders partsupp part supplier; do
echo "loading into $t ..."
cat <<SQL | bendsql
echo "loading into $t ..."
cat <<SQL | bendsql
COPY INTO $t FROM 's3://repo.databend.rs/tpch100/${t}/' connection=(connection_name='repo') pattern ='${t}.tbl.*'
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=1);
ANALYZE TABLE "${t}";
53 changes: 26 additions & 27 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -53,6 +53,7 @@ use databend_common_sql::MetadataRef;
use databend_common_sql::ScalarExpr;
use databend_common_sql::Visibility;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::TruncateMode;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_table_meta::meta::TableSnapshot;
use futures_util::TryStreamExt;
@@ -189,7 +190,7 @@ impl Interpreter for DeleteInterpreter {
(None, vec![])
};

let fuse_table = tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
let fuse_table = FuseTable::try_from_table(tbl.as_ref()).map_err(|_| {
ErrorCode::Unimplemented(format!(
"table {}, engine type {}, does not support DELETE FROM",
tbl.name(),
@@ -199,11 +200,8 @@

let mut build_res = PipelineBuildResult::create();

let snapshot_opt = fuse_table.read_table_snapshot().await?;
// check if table is empty
let snapshot = if let Some(val) = snapshot_opt {
val
} else {
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
// no snapshot, no deletion
return Ok(build_res);
};
@@ -214,20 +212,21 @@

build_res.main_pipeline.add_lock_guard(lock_guard);
// check if unconditional deletion
let deletion_filters = match filters {
None => {
let progress_values = ProgressValues {
rows: snapshot.summary.row_count as usize,
bytes: snapshot.summary.uncompressed_byte_size as usize,
};
self.ctx.get_write_progress().incr(&progress_values);
// deleting the whole table... just a truncate
fuse_table
.do_truncate(self.ctx.clone(), &mut build_res.main_pipeline, false)
.await?;
return Ok(build_res);
}
Some(filters) => filters,
let Some(filters) = filters else {
let progress_values = ProgressValues {
rows: snapshot.summary.row_count as usize,
bytes: snapshot.summary.uncompressed_byte_size as usize,
};
self.ctx.get_write_progress().incr(&progress_values);
// deleting the whole table... just a truncate
fuse_table
.do_truncate(
self.ctx.clone(),
&mut build_res.main_pipeline,
TruncateMode::Delete,
)
.await?;
return Ok(build_res);
};

let query_row_id_col = !self.plan.subquery_desc.is_empty();
@@ -240,11 +239,7 @@
// if the `filter_expr` is of "constant" nullary :
// for the whole block, whether all of the rows should be kept or dropped,
// we can just return from here, without accessing the block data
if fuse_table.try_eval_const(
self.ctx.clone(),
&fuse_table.schema(),
&deletion_filters.filter,
)? {
if fuse_table.try_eval_const(self.ctx.clone(), &fuse_table.schema(), &filters.filter)? {
let progress_values = ProgressValues {
rows: snapshot.summary.row_count as usize,
bytes: snapshot.summary.uncompressed_byte_size as usize,
@@ -253,7 +248,11 @@

// deleting the whole table... just a truncate
fuse_table
.do_truncate(self.ctx.clone(), &mut build_res.main_pipeline, false)
.do_truncate(
self.ctx.clone(),
&mut build_res.main_pipeline,
TruncateMode::Delete,
)
.await?;
return Ok(build_res);
}
@@ -266,14 +265,14 @@
self.ctx.clone(),
snapshot.clone(),
col_indices.clone(),
Some(deletion_filters.clone()),
Some(filters.clone()),
is_lazy,
true,
)
.await?;

let physical_plan = Self::build_physical_plan(
deletion_filters,
filters,
partitions,
fuse_table.get_table_info().clone(),
col_indices,
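
For orientation, here is a minimal standalone sketch (simplified, assumed types; not code from this commit) of the decision the rewritten interpreter makes above: a DELETE with no predicate, or whose predicate folds to a constant true, is routed through the truncate path with TruncateMode::Delete instead of building a row-level mutation pipeline. The hunk also swaps the manual as_any().downcast_ref::<FuseTable>() for FuseTable::try_from_table, which performs the same cast behind a named helper.

#[derive(Debug, Clone, Copy, PartialEq)]
enum TruncateMode {
    Normal,
    Delete,
    Purge,
}

#[derive(Debug, PartialEq)]
enum DeletePlan {
    // Whole-table delete: reuse the truncate path.
    Truncate(TruncateMode),
    // Row-level delete: build the filtered mutation pipeline.
    Filtered,
}

// `filters` mirrors the optional deletion predicate; `filter_is_const_true`
// stands in for FuseTable::try_eval_const reporting a constant-true filter.
fn plan_delete(filters: Option<String>, filter_is_const_true: bool) -> DeletePlan {
    // DELETE without a predicate: everything goes, so just truncate.
    let Some(_filter) = filters else {
        return DeletePlan::Truncate(TruncateMode::Delete);
    };
    if filter_is_const_true {
        // The predicate folds to a constant true: still a whole-table delete.
        return DeletePlan::Truncate(TruncateMode::Delete);
    }
    DeletePlan::Filtered
}

fn main() {
    assert_eq!(plan_delete(None, false), DeletePlan::Truncate(TruncateMode::Delete));
    assert_eq!(plan_delete(Some("1 = 1".into()), true), DeletePlan::Truncate(TruncateMode::Delete));
    assert_eq!(plan_delete(Some("c > 10".into()), false), DeletePlan::Filtered);
}
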
8 changes: 6 additions & 2 deletions src/query/service/src/interpreters/interpreter_table_drop.rs
@@ -21,6 +21,7 @@ use databend_common_management::RoleApi;
use databend_common_meta_app::principal::OwnershipObject;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_sql::plans::DropTablePlan;
use databend_common_storages_fuse::operations::TruncateMode;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_share::save_share_spec;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
@@ -136,9 +137,12 @@ impl Interpreter for DropTableInterpreter {
// if the target table is of type FuseTable, purge its historical data
// otherwise, plain truncate
if let Ok(fuse_table) = maybe_fuse_table {
let purge = true;
fuse_table
.do_truncate(self.ctx.clone(), &mut build_res.main_pipeline, purge)
.do_truncate(
self.ctx.clone(),
&mut build_res.main_pipeline,
TruncateMode::Purge,
)
.await?
} else {
latest
8 changes: 2 additions & 6 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -87,6 +87,7 @@ use crate::fuse_type::FuseTableType;
use crate::io::MetaReaders;
use crate::io::TableMetaLocationGenerator;
use crate::io::WriteSettings;
use crate::operations::TruncateMode;
use crate::table_functions::unwrap_tuple;
use crate::FuseStorageFormat;
use crate::NavigationPoint;
@@ -719,12 +720,7 @@
#[minitrace::trace]
#[async_backtrace::framed]
async fn truncate(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let loc = self.snapshot_loc().await?;
if loc.is_none() {
return Ok(());
}

self.do_truncate(ctx, pipeline, false).await
self.do_truncate(ctx, pipeline, TruncateMode::Normal).await
}

#[minitrace::trace]

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

@@ -86,7 +87,9 @@

#[async_trait::async_trait]
impl SnapshotGenerator for AppendGenerator {
const NAME: &'static str = "AppendGenerator";
fn as_any(&self) -> &dyn Any {
self
}

fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) {
self.conflict_resolve_ctx = ctx;

@@ -25,3 +25,4 @@ pub use conflict_resolve_context::SnapshotMerged;
pub use mutation_generator::MutationGenerator;
pub use snapshot_generator::SnapshotGenerator;
pub use truncate_generator::TruncateGenerator;
pub use truncate_generator::TruncateMode;

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use databend_common_exception::ErrorCode;
@@ -44,7 +45,9 @@
}

impl SnapshotGenerator for MutationGenerator {
const NAME: &'static str = "MutationGenerator";
fn as_any(&self) -> &dyn Any {
self
}

fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) {
self.conflict_resolve_ctx = ctx;

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use databend_common_exception::Result;
@@ -23,11 +24,8 @@ use crate::operations::common::ConflictResolveContext;

#[async_trait::async_trait]
pub trait SnapshotGenerator {
const NAME: &'static str;

fn purge(&self) -> bool {
false
}
/// Convert to `Any`, to enable dynamic casting.
fn as_any(&self) -> &dyn Any;

fn set_conflict_resolve_context(&mut self, _ctx: ConflictResolveContext) {}


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use databend_common_exception::Result;
@@ -22,23 +23,35 @@ use uuid::Uuid;

use crate::operations::common::SnapshotGenerator;

#[derive(Clone)]
pub enum TruncateMode {
// Truncate and keep the historical data.
Normal,
// Delete the data, used for delete operation.
Delete,
// Truncate and purge the historical data.
Purge,
}

#[derive(Clone)]
pub struct TruncateGenerator {
purge: bool,
mode: TruncateMode,
}

impl TruncateGenerator {
pub fn new(purge: bool) -> Self {
TruncateGenerator { purge }
pub fn new(mode: TruncateMode) -> Self {
TruncateGenerator { mode }
}

pub fn mode(&self) -> &TruncateMode {
&self.mode
}
}

#[async_trait::async_trait]
impl SnapshotGenerator for TruncateGenerator {
const NAME: &'static str = "TruncateGenerator";

fn purge(&self) -> bool {
self.purge
fn as_any(&self) -> &dyn Any {
self
}

fn generate_new_snapshot(
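
As a quick recap (hypothetical statement labels, not code from this commit), the three modes introduced above map onto the call sites touched elsewhere in this diff: TRUNCATE TABLE keeps history, an unconditional DELETE reuses the truncate path, and the DROP TABLE path purges history. Replacing the old purge: bool with this enum is what lets the commit sink below distinguish the Delete case, which must skip both the purge and the catalog-level truncate, from the other two.

#[derive(Debug, Clone, Copy)]
enum TruncateMode {
    Normal, // truncate, keep historical snapshots
    Delete, // whole-table DELETE routed through the truncate path
    Purge,  // truncate and drop historical data as well
}

// Hypothetical labels standing in for the interpreters changed in this commit.
#[derive(Debug)]
enum Statement {
    TruncateTable,       // FuseTable::truncate in fuse_table.rs
    UnconditionalDelete, // DeleteInterpreter in interpreter_delete.rs
    DropTable,           // DropTableInterpreter in interpreter_table_drop.rs
}

fn mode_for(stmt: &Statement) -> TruncateMode {
    match stmt {
        Statement::TruncateTable => TruncateMode::Normal,
        Statement::UnconditionalDelete => TruncateMode::Delete,
        Statement::DropTable => TruncateMode::Purge,
    }
}

fn main() {
    for stmt in [
        Statement::TruncateTable,
        Statement::UnconditionalDelete,
        Statement::DropTable,
    ] {
        println!("{:?} -> {:?}", stmt, mode_for(&stmt));
    }
}
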

@@ -52,6 +52,8 @@ use crate::operations::common::AbortOperation;
use crate::operations::common::CommitMeta;
use crate::operations::common::SnapshotGenerator;
use crate::operations::set_backoff;
use crate::operations::TruncateGenerator;
use crate::operations::TruncateMode;
use crate::FuseTable;

enum State {
@@ -118,7 +120,7 @@
prev_snapshot_id: Option<SnapshotId>,
deduplicated_label: Option<String>,
) -> Result<ProcessorPtr> {
let purge = snapshot_gen.purge() || table.transient();
let purge = Self::do_purge(table, &snapshot_gen);
Ok(ProcessorPtr::create(Box::new(CommitSink {
state: State::None,
ctx,
@@ -144,7 +146,7 @@
}

fn is_error_recoverable(&self, e: &ErrorCode) -> bool {
// When prev_snapshot_id is some, means it is an alter table column modification.
// When prev_snapshot_id is some, it means this is an alter table column modification or a truncate.
// In this case if commit to meta fail and error is TABLE_VERSION_MISMATCHED operation will be aborted.
if self.prev_snapshot_id.is_some() && e.code() == ErrorCode::TABLE_VERSION_MISMATCHED {
return false;
@@ -186,8 +188,22 @@
Ok(Event::Async)
}

fn is_truncate() -> bool {
F::NAME == "TruncateGenerator"
fn do_purge(table: &FuseTable, snapshot_gen: &F) -> bool {
if table.transient() {
return true;
}

snapshot_gen
.as_any()
.downcast_ref::<TruncateGenerator>()
.is_some_and(|gen| matches!(gen.mode(), TruncateMode::Purge))
}

fn do_truncate(&self) -> bool {
self.snapshot_gen
.as_any()
.downcast_ref::<TruncateGenerator>()
.is_some_and(|gen| !matches!(gen.mode(), TruncateMode::Delete))
}
}

@@ -349,7 +365,7 @@
.await
{
Ok(_) => {
if Self::is_truncate() {
if self.do_truncate() {
catalog
.truncate_table(&table_info, TruncateTableReq {
table_id: table_info.ident.table_id,
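
For illustration, a self-contained sketch (simplified, assumed types) of the dynamic-cast pattern the sink now relies on in place of the old F::NAME == "TruncateGenerator" string comparison and the purge() flag: the trait exposes as_any(), and the sink downcasts to the concrete TruncateGenerator to inspect its mode. This is also why AppendGenerator and MutationGenerator above only needed the boilerplate as_any implementation; the downcast simply fails for them and both checks return false.

use std::any::Any;

trait SnapshotGenerator {
    // Convert to `Any` so callers can downcast to a concrete generator.
    fn as_any(&self) -> &dyn Any;
}

#[derive(Debug, Clone, Copy)]
enum TruncateMode {
    Normal,
    Delete,
    Purge,
}

struct TruncateGenerator {
    mode: TruncateMode,
}

struct MutationGenerator;

impl SnapshotGenerator for TruncateGenerator {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl SnapshotGenerator for MutationGenerator {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Purge historical data only for an explicit purge-mode truncate (or a transient table).
fn should_purge(gen: &dyn SnapshotGenerator, table_is_transient: bool) -> bool {
    table_is_transient
        || gen
            .as_any()
            .downcast_ref::<TruncateGenerator>()
            .is_some_and(|g| matches!(g.mode, TruncateMode::Purge))
}

// Issue the catalog-level truncate for every truncate mode except Delete.
fn should_truncate(gen: &dyn SnapshotGenerator) -> bool {
    gen.as_any()
        .downcast_ref::<TruncateGenerator>()
        .is_some_and(|g| !matches!(g.mode, TruncateMode::Delete))
}

fn main() {
    let purge = TruncateGenerator { mode: TruncateMode::Purge };
    let delete = TruncateGenerator { mode: TruncateMode::Delete };
    let mutation = MutationGenerator;
    assert!(should_purge(&purge, false) && should_truncate(&purge));
    assert!(!should_purge(&delete, false) && !should_truncate(&delete));
    assert!(!should_purge(&mutation, false) && !should_truncate(&mutation));
}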