Skip to content

Commit

Permalink
add truncate pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Mar 19, 2024
1 parent 3306bf3 commit 4126cb9
Show file tree
Hide file tree
Showing 25 changed files with 304 additions and 258 deletions.
4 changes: 2 additions & 2 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ pub trait Table: Sync + Send {
}

#[async_backtrace::framed]
async fn truncate(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
let _ = ctx;
async fn truncate(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let (_, _) = (ctx, pipeline);
Ok(())
}

Expand Down
118 changes: 82 additions & 36 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

use databend_common_base::base::ProgressValues;
use databend_common_catalog::plan::Filters;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::table::TableExt;
Expand Down Expand Up @@ -197,48 +198,93 @@ impl Interpreter for DeleteInterpreter {
})?;

let mut build_res = PipelineBuildResult::create();

let snapshot_opt = fuse_table.read_table_snapshot().await?;
// check if table is empty
let snapshot = if let Some(val) = snapshot_opt {
val
} else {
// no snapshot, no deletion
return Ok(build_res);
};
if snapshot.summary.row_count == 0 {
// empty snapshot, no deletion
return Ok(build_res);
}

build_res.main_pipeline.add_lock_guard(lock_guard);
// check if unconditional deletion
let deletion_filters = match filters {
None => {
let progress_values = ProgressValues {
rows: snapshot.summary.row_count as usize,
bytes: snapshot.summary.uncompressed_byte_size as usize,
};
self.ctx.get_write_progress().incr(&progress_values);
// deleting the whole table... just a truncate
fuse_table
.do_truncate(self.ctx.clone(), &mut build_res.main_pipeline, false)
.await?;
return Ok(build_res);
}
Some(filters) => filters,
};

let query_row_id_col = !self.plan.subquery_desc.is_empty();
if let Some(snapshot) = fuse_table
.fast_delete(
if col_indices.is_empty() && !query_row_id_col {
// Here is the situation: filter_expr is not null, but col_indices is empty, which
// indicates that the expr being evaluated is unrelated to the values of the rows,
// e.g.
// `delete from t where 1 = 1`, `delete from t where now()`,
// or `delete from t where RANDOM()::INT::BOOLEAN`.
// If the `filter_expr` is a "constant" nullary expression, it decides for the
// whole block whether all of the rows should be kept or dropped, so we can
// just return from here without accessing the block data.
if fuse_table.try_eval_const(
self.ctx.clone(),
&fuse_table.schema(),
&deletion_filters.filter,
)? {
let progress_values = ProgressValues {
rows: snapshot.summary.row_count as usize,
bytes: snapshot.summary.uncompressed_byte_size as usize,
};
self.ctx.get_write_progress().incr(&progress_values);

// deleting the whole table... just a truncate
fuse_table
.do_truncate(self.ctx.clone(), &mut build_res.main_pipeline, false)
.await?;
return Ok(build_res);
}
}

let cluster = self.ctx.get_cluster();
let is_lazy = !cluster.is_empty() && snapshot.segments.len() >= cluster.nodes.len();
let partitions = fuse_table
.mutation_read_partitions(
self.ctx.clone(),
filters.clone(),
snapshot.clone(),
col_indices.clone(),
query_row_id_col,
Some(deletion_filters.clone()),
is_lazy,
true,
)
.await?
{
let cluster = self.ctx.get_cluster();
let is_lazy = !cluster.is_empty() && snapshot.segments.len() >= cluster.nodes.len();
let partitions = fuse_table
.mutation_read_partitions(
self.ctx.clone(),
snapshot.clone(),
col_indices.clone(),
filters.clone(),
is_lazy,
true,
)
.await?;

// Safe to unwrap, because if filters is None, fast_delete will do truncate and return None.
let filters = filters.unwrap();
let physical_plan = Self::build_physical_plan(
filters,
partitions,
fuse_table.get_table_info().clone(),
col_indices,
snapshot,
catalog_info,
is_distributed,
query_row_id_col,
)?;

build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
}
.await?;

build_res.main_pipeline.add_lock_guard(lock_guard);
let physical_plan = Self::build_physical_plan(
deletion_filters,
partitions,
fuse_table.get_table_info().clone(),
col_indices,
snapshot,
catalog_info,
is_distributed,
query_row_id_col,
)?;

build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
Ok(build_res)
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/query/service/src/interpreters/interpreter_table_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl Interpreter for DropTableInterpreter {
role_api.revoke_ownership(&owner_object).await?;
RoleCacheManager::instance().invalidate_cache(&tenant);

let mut build_res = PipelineBuildResult::create();
// if `plan.all`, truncate, then purge the historical data
if self.plan.all {
// the above `catalog.drop_table` operation changed the table meta version,
Expand All @@ -136,9 +137,13 @@ impl Interpreter for DropTableInterpreter {
// otherwise, plain truncate
if let Ok(fuse_table) = maybe_fuse_table {
let purge = true;
fuse_table.do_truncate(self.ctx.clone(), purge).await?
fuse_table
.do_truncate(self.ctx.clone(), &mut build_res.main_pipeline, purge)
.await?
} else {
latest.truncate(self.ctx.clone()).await?
latest
.truncate(self.ctx.clone(), &mut build_res.main_pipeline)
.await?
}
}

Expand All @@ -153,6 +158,6 @@ impl Interpreter for DropTableInterpreter {
.await?;
}

Ok(PipelineBuildResult::create())
Ok(build_res)
}
}
19 changes: 17 additions & 2 deletions src/query/service/src/interpreters/interpreter_table_truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use databend_common_catalog::table::TableExt;
use databend_common_config::GlobalConfig;
use databend_common_exception::Result;
use databend_common_sql::plans::TruncateTablePlan;
use databend_common_storages_fuse::FuseTable;

use crate::api::Packet;
use crate::api::TruncateTablePacket;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
Expand Down Expand Up @@ -78,6 +80,15 @@ impl Interpreter for TruncateTableInterpreter {
// check mutability
table.check_mutable()?;

// Add table lock.
let maybe_fuse_table = FuseTable::try_from_table(table.as_ref()).is_ok();
let lock_guard = if maybe_fuse_table {
let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
table_lock.try_lock(self.ctx.clone()).await?
} else {
None
};

if self.proxy_to_cluster && table.broadcast_truncate_to_cluster() {
let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
Expand All @@ -96,7 +107,11 @@ impl Interpreter for TruncateTableInterpreter {
}
}

table.truncate(self.ctx.clone()).await?;
Ok(PipelineBuildResult::create())
let mut build_res = PipelineBuildResult::create();
build_res.main_pipeline.add_lock_guard(lock_guard);
table
.truncate(self.ctx.clone(), &mut build_res.main_pipeline)
.await?;
Ok(build_res)
}
}
9 changes: 9 additions & 0 deletions src/query/service/src/test_kits/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,15 @@ impl TestFixture {
execute_pipeline(ctx, build_res)
}

/// Issues a `Truncate table <db>.<table>` statement for the fixture's default
/// database and default table through `execute_command`, returning its result.
pub async fn truncate_default_table(&self) -> Result<()> {
let db_name = self.default_db_name();
let table_name = self.default_table_name();
let sql = format!("Truncate table {}.{}", db_name, table_name);
self.execute_command(&sql).await
}

pub async fn execute_command(&self, query: &str) -> Result<()> {
let res = self.execute_query(query).await?;
res.try_collect::<Vec<DataBlock>>().await?;
Expand Down
12 changes: 2 additions & 10 deletions src/query/service/tests/it/storages/fuse/operations/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_query::test_kits::*;
Expand Down Expand Up @@ -71,15 +70,8 @@ async fn test_fuse_snapshot_analyze_and_truncate() -> Result<()> {

// truncate table
{
let ctx = fixture.new_query_ctx().await?;
let catalog = ctx
.get_catalog(fixture.default_catalog_name().as_str())
.await?;
let table = catalog
.get_table(ctx.get_tenant().as_str(), &db, &tbl)
.await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
fuse_table.truncate(ctx).await?;
let r = fixture.truncate_default_table().await;
assert!(r.is_ok());
}

// optimize after truncate table, ts file location will become None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn test_fuse_table_truncate() -> databend_common_exception::Result<()> {

// 1. truncate empty table
let prev_version = table.get_table_info().ident.seq;
let r = table.truncate(ctx.clone()).await;
let r = fixture.truncate_default_table().await;
let table = fixture.latest_default_table().await?;
// no side effects
assert_eq!(prev_version, table.get_table_info().ident.seq);
Expand Down Expand Up @@ -66,7 +66,7 @@ async fn test_fuse_table_truncate() -> databend_common_exception::Result<()> {
assert_eq!(stats.read_rows, (num_blocks * rows_per_block));

// truncate
let r = table.truncate(ctx.clone()).await;
let r = fixture.truncate_default_table().await;
assert!(r.is_ok());

// get the latest tbl
Expand Down Expand Up @@ -109,7 +109,6 @@ async fn test_fuse_table_truncate_appending_concurrently() -> databend_common_ex
// s3 should be a valid snapshot, full-scan should work as expected

let fixture = Arc::new(TestFixture::setup().await?);
let ctx = fixture.new_query_ctx().await?;

fixture.create_default_database().await?;
fixture.create_default_table().await?;
Expand Down Expand Up @@ -149,7 +148,7 @@ async fn test_fuse_table_truncate_appending_concurrently() -> databend_common_ex
let s2_table_to_appended = fixture.latest_default_table().await?;

// 4. perform `truncate` operation on s1
let r = s1_table_to_be_truncated.truncate(ctx.clone()).await;
let r = fixture.truncate_default_table().await;
// version mismatched, and `truncate purge` should result in error (but nothing should have been removed)
assert!(r.is_err());

Expand Down
5 changes: 0 additions & 5 deletions src/query/service/tests/it/storages/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,5 @@ async fn test_null_table() -> Result<()> {
assert_eq!(block.num_columns(), 1);
}

// truncate.
{
table.truncate(ctx).await?;
}

Ok(())
}
10 changes: 7 additions & 3 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,13 @@ impl Table for FuseTable {

#[minitrace::trace]
#[async_backtrace::framed]
async fn truncate(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
let purge = false;
self.do_truncate(ctx, purge).await
async fn truncate(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let loc = self.snapshot_loc().await?;
if loc.is_none() {
return Ok(());
}

self.do_truncate(ctx, pipeline, false).await
}

#[minitrace::trace]
Expand Down
7 changes: 4 additions & 3 deletions src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use backoff::backoff::Backoff;
use chrono::Utc;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableExt;
use databend_common_catalog::table_context::TableContext;
Expand Down Expand Up @@ -143,9 +144,10 @@ impl FuseTable {
}

let table_statistics_location = snapshot.table_statistics_location.clone();
let catalog = ctx.get_catalog(table_info.catalog()).await?;
// 2. update table meta
let res = Self::update_table_meta(
ctx,
catalog,
table_info,
location_generator,
snapshot,
Expand Down Expand Up @@ -176,7 +178,7 @@ impl FuseTable {
#[allow(clippy::too_many_arguments)]
#[async_backtrace::framed]
pub async fn update_table_meta(
ctx: &dyn TableContext,
catalog: Arc<dyn Catalog>,
table_info: &TableInfo,
location_generator: &TableMetaLocationGenerator,
snapshot: TableSnapshot,
Expand Down Expand Up @@ -210,7 +212,6 @@ impl FuseTable {
new_table_meta.updated_on = Utc::now();

// 2. prepare the request
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let table_id = table_info.ident.table_id;
let table_version = table_info.ident.seq;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ impl AppendGenerator {

#[async_trait::async_trait]
impl SnapshotGenerator for AppendGenerator {
const NAME: &'static str = "AppendGenerator";

fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) {
self.conflict_resolve_ctx = ctx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ mod append_generator;
mod conflict_resolve_context;
mod mutation_generator;
mod snapshot_generator;
mod truncate_generator;

pub use append_generator::AppendGenerator;
pub use conflict_resolve_context::ConflictResolveContext;
pub use conflict_resolve_context::SnapshotChanges;
pub use conflict_resolve_context::SnapshotMerged;
pub use mutation_generator::MutationGenerator;
pub use snapshot_generator::SnapshotGenerator;
pub use truncate_generator::TruncateGenerator;
Loading

0 comments on commit 4126cb9

Please sign in to comment.