feat: distributed execution of replace into statement (#12119)
* refactor copy into

* fix panic

* fix

* fix

* fix

* make lint

* fix logic error

* replace into values

* fix

* fix

* fix render result

* fix schema cast

* temp

* respect #12147

* respect #12100

* make lint

* respect #12130

* fix merge

* add exchange

* fix conflict

* fix schema cast

* fix conflict

* fix

* fix copy plan

* clear log

* fix copy

* fix copy

* run ci

* fix purge

* make lint

* add exchange

* disable dist for value source

* adjust exchange

* remove top exchange

* adjust replace into

* reshuffle

* fix

* fix reshuffle

* move segment_partition_num

* resolve conflicts

* add need insert flag

* unbranched_replace_into_processor

* merge only pipeline

* fix segment index

* fix conflict

* remove log

* fix empty table

* fix stateful test

* fix stateful test

* modify test

* fix typo

* fix random source

* add setting

* remove empty file

* remove dead code

* add default setting

* Update src/query/service/src/interpreters/interpreter_replace.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan_display.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan_display.rs

Co-authored-by: dantengsky <[email protected]>

* rename struct

* default 0

* regen golden file

* set enable_distributed_replace_into = 1 in slt

* make lint

---------

Co-authored-by: dantengsky <[email protected]>
Co-authored-by: JackTan25 <[email protected]>
3 people authored Aug 10, 2023
1 parent d80df93 commit 93ad5f7
Showing 44 changed files with 1,604 additions and 1,288 deletions.
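
For context: the commit message above notes that the SQL logic tests enable the new behavior with set enable_distributed_replace_into = 1 (per the commit list, the setting defaults to 0). A minimal usage sketch follows; the table name, columns, and values are illustrative only and not taken from this commit, and the conflict-key clause is written against Databend's REPLACE INTO syntax, so it may need adjusting.

-- hypothetical target table, for illustration only
create table t (c1 int, c2 string);
-- enable the distributed REPLACE INTO execution path added by this PR (defaults to 0)
set enable_distributed_replace_into = 1;
-- upsert two rows, using c1 as the conflict key
replace into t on (c1) values (1, 'a'), (2, 'b');
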
@@ -21,7 +21,6 @@ use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;

#[async_trait::async_trait]
pub trait AsyncAccumulatingTransform: Send {
const NAME: &'static str;
345 changes: 61 additions & 284 deletions src/query/service/src/interpreters/interpreter_copy.rs

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -31,10 +31,11 @@ use common_meta_app::schema::CatalogInfo;
use common_meta_app::schema::TableInfo;
use common_sql::binder::ColumnBindingBuilder;
use common_sql::executor::cast_expr_to_non_null_boolean;
use common_sql::executor::DeleteFinal;
use common_sql::executor::DeletePartial;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::MutationAggregate;
use common_sql::executor::MutationKind;
use common_sql::executor::PhysicalPlan;
use common_sql::optimizer::CascadesOptimizer;
use common_sql::optimizer::DPhpy;
@@ -298,12 +299,15 @@ impl DeleteInterpreter {
});
}

Ok(PhysicalPlan::DeleteFinal(Box::new(DeleteFinal {
input: Box::new(root),
snapshot,
table_info,
catalog_info,
})))
Ok(PhysicalPlan::MutationAggregate(Box::new(
MutationAggregate {
input: Box::new(root),
snapshot,
table_info,
catalog_info,
mutation_kind: MutationKind::Delete,
},
)))
}
}

283 changes: 1 addition & 282 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -12,42 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::io::BufRead;
use std::io::Cursor;
use std::ops::Not;
use std::str::FromStr;
use std::sync::Arc;

use aho_corasick::AhoCorasick;
use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::tokenize_sql;
use common_catalog::table::AppendMode;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ColumnBuilder;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_formats::FastFieldDecoderValues;
use common_io::cursor_ext::ReadBytesExt;
use common_io::cursor_ext::ReadCheckPointExt;
use common_meta_app::principal::StageFileFormatType;
use common_pipeline_sources::AsyncSource;
use common_pipeline_sources::AsyncSourcer;
use common_sql::executor::DistributedInsertSelect;
use common_sql::executor::PhysicalPlan;
use common_sql::executor::PhysicalPlanBuilder;
use common_sql::plans::Insert;
use common_sql::plans::InsertInputSource;
use common_sql::plans::Plan;
use common_sql::BindContext;
use common_sql::Metadata;
use common_sql::MetadataRef;
use common_sql::NameResolutionContext;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use parking_lot::RwLock;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
@@ -56,15 +37,11 @@ use crate::pipelines::builders::build_append2table_with_commit_pipeline;
use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::SourcePipeBuilder;
use crate::pipelines::ValueSource;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

// Pre-generate the positions of `(`, `'` and `\`
static PATTERNS: &[&str] = &["(", "'", "\\"];

static INSERT_TOKEN_FINDER: Lazy<AhoCorasick> = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap());

pub struct InsertInterpreter {
ctx: Arc<QueryContext>,
plan: Insert,
@@ -294,261 +271,3 @@ impl Interpreter for InsertInterpreter {
Ok(())
}
}

pub struct ValueSource {
data: String,
ctx: Arc<dyn TableContext>,
name_resolution_ctx: NameResolutionContext,
bind_context: BindContext,
schema: DataSchemaRef,
metadata: MetadataRef,
is_finished: bool,
}

#[async_trait::async_trait]
impl AsyncSource for ValueSource {
const NAME: &'static str = "ValueSource";
const SKIP_EMPTY_DATA_BLOCK: bool = true;

#[async_trait::unboxed_simple]
#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
if self.is_finished {
return Ok(None);
}

// Use the number of '(' to estimate the number of rows
let mut estimated_rows = 0;
let mut positions = VecDeque::new();
for mat in INSERT_TOKEN_FINDER.find_iter(&self.data) {
if mat.pattern() == 0.into() {
estimated_rows += 1;
continue;
}
positions.push_back(mat.start());
}

let mut reader = Cursor::new(self.data.as_bytes());
let block = self
.read(estimated_rows, &mut reader, &mut positions)
.await?;
self.is_finished = true;
Ok(Some(block))
}
}

impl ValueSource {
pub fn new(
data: String,
ctx: Arc<dyn TableContext>,
name_resolution_ctx: NameResolutionContext,
schema: DataSchemaRef,
) -> Self {
let bind_context = BindContext::new();
let metadata = Arc::new(RwLock::new(Metadata::default()));

Self {
data,
ctx,
name_resolution_ctx,
schema,
bind_context,
metadata,
is_finished: false,
}
}

#[async_backtrace::framed]
pub async fn read<R: AsRef<[u8]>>(
&self,
estimated_rows: usize,
reader: &mut Cursor<R>,
positions: &mut VecDeque<usize>,
) -> Result<DataBlock> {
let mut columns = self
.schema
.fields()
.iter()
.map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows))
.collect::<Vec<_>>();

let mut bind_context = self.bind_context.clone();

let format = self.ctx.get_format_settings()?;
let field_decoder = FastFieldDecoderValues::create_for_insert(format);

for row in 0.. {
let _ = reader.ignore_white_spaces();
if reader.eof() {
break;
}
// Not the first row
if row != 0 {
reader.must_ignore_byte(b',')?;
}

self.parse_next_row(
&field_decoder,
reader,
&mut columns,
positions,
&mut bind_context,
self.metadata.clone(),
)
.await?;
}

let columns = columns
.into_iter()
.map(|col| col.build())
.collect::<Vec<_>>();
Ok(DataBlock::new_from_columns(columns))
}

/// Parse single row value, like ('111', 222, 1 + 1)
#[async_backtrace::framed]
async fn parse_next_row<R: AsRef<[u8]>>(
&self,
field_decoder: &FastFieldDecoderValues,
reader: &mut Cursor<R>,
columns: &mut [ColumnBuilder],
positions: &mut VecDeque<usize>,
bind_context: &mut BindContext,
metadata: MetadataRef,
) -> Result<()> {
let _ = reader.ignore_white_spaces();
let col_size = columns.len();
let start_pos_of_row = reader.checkpoint();

// Start of the row --- '('
if !reader.ignore_byte(b'(') {
return Err(ErrorCode::BadDataValueType(
"Must start with parentheses".to_string(),
));
}
// Ignore the positions in the previous row.
while let Some(pos) = positions.front() {
if *pos < start_pos_of_row as usize {
positions.pop_front();
} else {
break;
}
}

for col_idx in 0..col_size {
let _ = reader.ignore_white_spaces();
let col_end = if col_idx + 1 == col_size { b')' } else { b',' };

let col = columns
.get_mut(col_idx)
.ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?;

let (need_fallback, pop_count) = field_decoder
.read_field(col, reader, positions)
.map(|_| {
let _ = reader.ignore_white_spaces();
let need_fallback = reader.ignore_byte(col_end).not();
(need_fallback, col_idx + 1)
})
.unwrap_or((true, col_idx));

// ColumnBuilder and expr-parser both will eat the end ')' of the row.
if need_fallback {
for col in columns.iter_mut().take(pop_count) {
col.pop();
}
// rollback to start position of the row
reader.rollback(start_pos_of_row + 1);
skip_to_next_row(reader, 1)?;
let end_pos_of_row = reader.position();

// Parse from expression and append all columns.
reader.set_position(start_pos_of_row);
let row_len = end_pos_of_row - start_pos_of_row;
let buf = &reader.remaining_slice()[..row_len as usize];

let sql = std::str::from_utf8(buf).unwrap();
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;
let tokens = tokenize_sql(sql)?;
let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?;

let values = bind_context
.exprs_to_scalar(
exprs,
&self.schema,
self.ctx.clone(),
&self.name_resolution_ctx,
metadata,
)
.await?;

for (col, scalar) in columns.iter_mut().zip(values) {
col.push(scalar.as_ref());
}
reader.set_position(end_pos_of_row);
return Ok(());
}
}

Ok(())
}
}

// Values |(xxx), (yyy), (zzz)
pub fn skip_to_next_row<R: AsRef<[u8]>>(reader: &mut Cursor<R>, mut balance: i32) -> Result<()> {
let _ = reader.ignore_white_spaces();

let mut quoted = false;
let mut escaped = false;

while balance > 0 {
let buffer = reader.remaining_slice();
if buffer.is_empty() {
break;
}

let size = buffer.len();

let it = buffer
.iter()
.position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\'');

if let Some(it) = it {
let c = buffer[it];
reader.consume(it + 1);

if it == 0 && escaped {
escaped = false;
continue;
}
escaped = false;

match c {
b'\\' => {
escaped = true;
continue;
}
b'\'' => {
quoted ^= true;
continue;
}
b')' => {
if !quoted {
balance -= 1;
}
}
b'(' => {
if !quoted {
balance += 1;
}
}
_ => {}
}
} else {
escaped = false;
reader.consume(size);
}
}
Ok(())
}