
feat: distributed execution of replace into statement #12119

Merged 78 commits on Aug 10, 2023

Commits
b564f78
refactor copy into
SkyFan2002 Jul 17, 2023
38b1843
fix panic
SkyFan2002 Jul 17, 2023
ef1c6e3
fix
SkyFan2002 Jul 17, 2023
6e47822
fix
SkyFan2002 Jul 17, 2023
aad1069
fix
SkyFan2002 Jul 17, 2023
89a74fa
make lint
SkyFan2002 Jul 17, 2023
a5782f5
fix logic error
SkyFan2002 Jul 18, 2023
02ef7ff
replace into values
SkyFan2002 Jul 22, 2023
0805f3c
fix
SkyFan2002 Jul 23, 2023
c010430
fix
SkyFan2002 Jul 25, 2023
ca6b203
fix render result
SkyFan2002 Jul 25, 2023
2bc78b9
fix schema cast
SkyFan2002 Jul 25, 2023
3379134
Merge remote-tracking branch 'upstream/main' into replace
SkyFan2002 Jul 25, 2023
48fb10e
temp
SkyFan2002 Jul 25, 2023
8406189
respect #12147
SkyFan2002 Jul 26, 2023
c812079
respect #12100
SkyFan2002 Jul 26, 2023
8c74402
make lint
SkyFan2002 Jul 26, 2023
477fdfd
respect #12130
SkyFan2002 Jul 26, 2023
4815cfb
Merge remote-tracking branch 'upstream/main' into replace
SkyFan2002 Jul 28, 2023
41f6361
fix merge
SkyFan2002 Jul 28, 2023
0d3ada3
add exchange
SkyFan2002 Jul 31, 2023
350eabb
Merge remote-tracking branch 'upstream/main' into replace
SkyFan2002 Jul 31, 2023
5b65dbe
fix conflict
SkyFan2002 Jul 31, 2023
5445e6b
fix schema cast
SkyFan2002 Jul 31, 2023
a1ec5fa
Merge remote-tracking branch 'upstream/main' into replace
SkyFan2002 Jul 31, 2023
0b340c7
fix conflict
SkyFan2002 Jul 31, 2023
88726fd
fix
SkyFan2002 Jul 31, 2023
5f7f3c7
fix copy plan
SkyFan2002 Jul 31, 2023
fd3587e
clear log
SkyFan2002 Jul 31, 2023
c9db180
fix copy
SkyFan2002 Jul 31, 2023
84dd7c9
fix copy
SkyFan2002 Aug 1, 2023
0f0b058
run ci
SkyFan2002 Aug 1, 2023
43ecd8a
fix purge
SkyFan2002 Aug 1, 2023
56dc5e2
make lint
SkyFan2002 Aug 1, 2023
1cb294e
add exchange
SkyFan2002 Aug 2, 2023
ea68663
disable dist for value source
SkyFan2002 Aug 4, 2023
7c6da18
adjust exchange
SkyFan2002 Aug 4, 2023
03a6cc7
remove top exchange
SkyFan2002 Aug 4, 2023
154bbb7
adjust replace into
SkyFan2002 Aug 4, 2023
29d18ac
reshuffle
SkyFan2002 Aug 4, 2023
5ab41ab
fix
SkyFan2002 Aug 4, 2023
6db338e
fix reshuffle
SkyFan2002 Aug 4, 2023
866d475
move segment_partition_num
SkyFan2002 Aug 4, 2023
5a30ece
Merge remote-tracking branch 'origin/main' into dist-replace
dantengsky Aug 4, 2023
691ef8c
resolve conflicts
dantengsky Aug 4, 2023
efbfebc
Merge pull request #5 from dantengsky/dist-replace
SkyFan2002 Aug 4, 2023
93dd2bd
add need insert flag
SkyFan2002 Aug 5, 2023
5efb091
unbranched_replace_into_processor
SkyFan2002 Aug 5, 2023
b6bd9ef
merge only pipeline
SkyFan2002 Aug 5, 2023
fce6969
fix segment index
SkyFan2002 Aug 6, 2023
e08a920
Merge remote-tracking branch 'upstream/main' into replace
SkyFan2002 Aug 6, 2023
762902d
fix conflict
SkyFan2002 Aug 6, 2023
1462d2e
remove log
SkyFan2002 Aug 6, 2023
4d47ae3
fix empty table
SkyFan2002 Aug 6, 2023
5e99f76
fix stateful test
SkyFan2002 Aug 7, 2023
4d45942
fix stateful test
SkyFan2002 Aug 7, 2023
97f63ab
modify test
SkyFan2002 Aug 7, 2023
7aff967
fix typo
SkyFan2002 Aug 7, 2023
13b0de8
Merge branch 'main' into replace
JackTan25 Aug 7, 2023
760e93b
Merge branch 'main' into replace
SkyFan2002 Aug 7, 2023
9e99082
fix random source
SkyFan2002 Aug 7, 2023
ab06155
Merge branch 'main' into replace
SkyFan2002 Aug 7, 2023
baa7d6d
add setting
SkyFan2002 Aug 8, 2023
1227d4e
remove empty file
SkyFan2002 Aug 8, 2023
687a9c9
remove dead code
SkyFan2002 Aug 8, 2023
24ed0a7
add default setting
SkyFan2002 Aug 8, 2023
d784f0e
Update src/query/service/src/interpreters/interpreter_replace.rs
SkyFan2002 Aug 8, 2023
003b091
Update src/query/sql/src/executor/physical_plan_display.rs
SkyFan2002 Aug 8, 2023
1358ed0
Update src/query/storages/fuse/src/operations/replace_into/processors…
SkyFan2002 Aug 8, 2023
4ab93c4
Update src/query/sql/src/executor/physical_plan.rs
SkyFan2002 Aug 8, 2023
75e97ef
Update src/query/sql/src/executor/physical_plan_display.rs
SkyFan2002 Aug 8, 2023
835733e
rename struct
SkyFan2002 Aug 8, 2023
62e99e0
default 0
SkyFan2002 Aug 8, 2023
d5d3fac
regen golden file
SkyFan2002 Aug 9, 2023
d3bd729
set enable_distributed_replace_into = 1 in slt
SkyFan2002 Aug 9, 2023
288df86
Merge remote-tracking branch 'upstream/main' into replace
SkyFan2002 Aug 10, 2023
f545f7c
make lint
SkyFan2002 Aug 10, 2023
de3b7b4
Merge branch 'main' into replace
dantengsky Aug 10, 2023
@@ -21,7 +21,6 @@ use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;

#[async_trait::async_trait]
pub trait AsyncAccumulatingTransform: Send {
const NAME: &'static str;
345 changes: 61 additions & 284 deletions src/query/service/src/interpreters/interpreter_copy.rs

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -31,10 +31,11 @@ use common_meta_app::schema::CatalogInfo;
use common_meta_app::schema::TableInfo;
use common_sql::binder::ColumnBindingBuilder;
use common_sql::executor::cast_expr_to_non_null_boolean;
use common_sql::executor::DeleteFinal;
use common_sql::executor::DeletePartial;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::MutationAggregate;
use common_sql::executor::MutationKind;
use common_sql::executor::PhysicalPlan;
use common_sql::optimizer::CascadesOptimizer;
use common_sql::optimizer::HeuristicOptimizer;
@@ -296,12 +297,15 @@ impl DeleteInterpreter {
});
}

Ok(PhysicalPlan::DeleteFinal(Box::new(DeleteFinal {
input: Box::new(root),
snapshot,
table_info,
catalog_info,
})))
Ok(PhysicalPlan::MutationAggregate(Box::new(
MutationAggregate {
input: Box::new(root),
snapshot,
table_info,
catalog_info,
mutation_kind: MutationKind::Delete,
},
)))
}
}

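The hunk above replaces the delete-specific `DeleteFinal` node with the shared `MutationAggregate` node, distinguished by a `mutation_kind` field so the same commit-phase plan node can serve both DELETE and REPLACE INTO. A minimal sketch of that generalization (simplified stand-in types modeled on the PR, not the real databend definitions):

```rust
// Hedged sketch: one aggregate plan node parameterized by the kind of
// mutation it finalizes, instead of one node type per statement.
// Names mirror the PR (MutationAggregate, MutationKind) but the fields
// and behavior here are illustrative only.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MutationKind {
    Delete,
    Replace,
}

#[derive(Debug)]
struct MutationAggregate {
    table: String,
    mutation_kind: MutationKind,
}

impl MutationAggregate {
    // What the finalization step would commit for this node.
    fn describe(&self) -> String {
        match self.mutation_kind {
            MutationKind::Delete => format!("commit deletes on {}", self.table),
            MutationKind::Replace => format!("commit replaces on {}", self.table),
        }
    }
}

fn main() {
    let node = MutationAggregate {
        table: "t".to_string(),
        mutation_kind: MutationKind::Delete,
    };
    println!("{}", node.describe());
}
```

The benefit is that distributed execution (exchange, fragment scheduling) only has to understand one finalization node, with the kind flag selecting the commit behavior.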
283 changes: 1 addition & 282 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -12,42 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::io::BufRead;
use std::io::Cursor;
use std::ops::Not;
use std::str::FromStr;
use std::sync::Arc;

use aho_corasick::AhoCorasick;
use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::tokenize_sql;
use common_catalog::table::AppendMode;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ColumnBuilder;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_formats::FastFieldDecoderValues;
use common_io::cursor_ext::ReadBytesExt;
use common_io::cursor_ext::ReadCheckPointExt;
use common_meta_app::principal::StageFileFormatType;
use common_pipeline_sources::AsyncSource;
use common_pipeline_sources::AsyncSourcer;
use common_sql::executor::DistributedInsertSelect;
use common_sql::executor::PhysicalPlan;
use common_sql::executor::PhysicalPlanBuilder;
use common_sql::plans::Insert;
use common_sql::plans::InsertInputSource;
use common_sql::plans::Plan;
use common_sql::BindContext;
use common_sql::Metadata;
use common_sql::MetadataRef;
use common_sql::NameResolutionContext;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use parking_lot::RwLock;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
@@ -56,15 +37,11 @@ use crate::pipelines::builders::build_append2table_with_commit_pipeline;
use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::SourcePipeBuilder;
use crate::pipelines::ValueSource;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

// Pre-generate the positions of `(`, `'` and `\`
static PATTERNS: &[&str] = &["(", "'", "\\"];

static INSERT_TOKEN_FINDER: Lazy<AhoCorasick> = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap());

pub struct InsertInterpreter {
ctx: Arc<QueryContext>,
plan: Insert,
@@ -291,261 +268,3 @@ impl Interpreter for InsertInterpreter {
Ok(())
}
}

pub struct ValueSource {
data: String,
ctx: Arc<dyn TableContext>,
name_resolution_ctx: NameResolutionContext,
bind_context: BindContext,
schema: DataSchemaRef,
metadata: MetadataRef,
is_finished: bool,
}

#[async_trait::async_trait]
impl AsyncSource for ValueSource {
const NAME: &'static str = "ValueSource";
const SKIP_EMPTY_DATA_BLOCK: bool = true;

#[async_trait::unboxed_simple]
#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
if self.is_finished {
return Ok(None);
}

// Use the number of '(' to estimate the number of rows
let mut estimated_rows = 0;
let mut positions = VecDeque::new();
for mat in INSERT_TOKEN_FINDER.find_iter(&self.data) {
if mat.pattern() == 0.into() {
estimated_rows += 1;
continue;
}
positions.push_back(mat.start());
}

let mut reader = Cursor::new(self.data.as_bytes());
let block = self
.read(estimated_rows, &mut reader, &mut positions)
.await?;
self.is_finished = true;
Ok(Some(block))
}
}
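The `generate` step above makes a single pass over the raw VALUES text: it counts `(` occurrences as a cheap upper bound on the number of rows (used to pre-size the column builders) and records the positions of quote and escape bytes for the parser. A plain byte-scan sketch of that pass (the removed code uses an Aho-Corasick search over the patterns `(`, `'`, `\`; this stand-in is illustrative only):

```rust
// Hedged sketch of the row-estimation pre-pass: count '(' for the row
// estimate, remember where quotes and backslashes sit so the field
// parser can find string boundaries quickly. Simplified single-byte
// scan instead of the PR's Aho-Corasick automaton.

fn scan_values(data: &str) -> (usize, Vec<usize>) {
    let mut estimated_rows = 0;
    let mut positions = Vec::new();
    for (i, b) in data.bytes().enumerate() {
        match b {
            b'(' => estimated_rows += 1, // each row literal opens with '('
            b'\'' | b'\\' => positions.push(i), // quote/escape positions
            _ => {}
        }
    }
    (estimated_rows, positions)
}

fn main() {
    let (rows, positions) = scan_values("(1, 'a'), (2, 'b')");
    println!("estimated rows: {rows}, marker positions: {positions:?}");
}
```

The count is only an estimate — a `(` inside a string literal also increments it — but over-reserving builder capacity is harmless, which is why a one-pass heuristic suffices.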

impl ValueSource {
pub fn new(
data: String,
ctx: Arc<dyn TableContext>,
name_resolution_ctx: NameResolutionContext,
schema: DataSchemaRef,
) -> Self {
let bind_context = BindContext::new();
let metadata = Arc::new(RwLock::new(Metadata::default()));

Self {
data,
ctx,
name_resolution_ctx,
schema,
bind_context,
metadata,
is_finished: false,
}
}

#[async_backtrace::framed]
pub async fn read<R: AsRef<[u8]>>(
&self,
estimated_rows: usize,
reader: &mut Cursor<R>,
positions: &mut VecDeque<usize>,
) -> Result<DataBlock> {
let mut columns = self
.schema
.fields()
.iter()
.map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows))
.collect::<Vec<_>>();

let mut bind_context = self.bind_context.clone();

let format = self.ctx.get_format_settings()?;
let field_decoder = FastFieldDecoderValues::create_for_insert(format);

for row in 0.. {
let _ = reader.ignore_white_spaces();
if reader.eof() {
break;
}
// Not the first row
if row != 0 {
reader.must_ignore_byte(b',')?;
}

self.parse_next_row(
&field_decoder,
reader,
&mut columns,
positions,
&mut bind_context,
self.metadata.clone(),
)
.await?;
}

let columns = columns
.into_iter()
.map(|col| col.build())
.collect::<Vec<_>>();
Ok(DataBlock::new_from_columns(columns))
}

/// Parse single row value, like ('111', 222, 1 + 1)
#[async_backtrace::framed]
async fn parse_next_row<R: AsRef<[u8]>>(
&self,
field_decoder: &FastFieldDecoderValues,
reader: &mut Cursor<R>,
columns: &mut [ColumnBuilder],
positions: &mut VecDeque<usize>,
bind_context: &mut BindContext,
metadata: MetadataRef,
) -> Result<()> {
let _ = reader.ignore_white_spaces();
let col_size = columns.len();
let start_pos_of_row = reader.checkpoint();

// Start of the row --- '('
if !reader.ignore_byte(b'(') {
return Err(ErrorCode::BadDataValueType(
"Must start with parentheses".to_string(),
));
}
// Ignore the positions in the previous row.
while let Some(pos) = positions.front() {
if *pos < start_pos_of_row as usize {
positions.pop_front();
} else {
break;
}
}

for col_idx in 0..col_size {
let _ = reader.ignore_white_spaces();
let col_end = if col_idx + 1 == col_size { b')' } else { b',' };

let col = columns
.get_mut(col_idx)
.ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?;

let (need_fallback, pop_count) = field_decoder
.read_field(col, reader, positions)
.map(|_| {
let _ = reader.ignore_white_spaces();
let need_fallback = reader.ignore_byte(col_end).not();
(need_fallback, col_idx + 1)
})
.unwrap_or((true, col_idx));

// ColumnBuilder and expr-parser both will eat the end ')' of the row.
if need_fallback {
for col in columns.iter_mut().take(pop_count) {
col.pop();
}
// rollback to start position of the row
reader.rollback(start_pos_of_row + 1);
skip_to_next_row(reader, 1)?;
let end_pos_of_row = reader.position();

// Parse from expression and append all columns.
reader.set_position(start_pos_of_row);
let row_len = end_pos_of_row - start_pos_of_row;
let buf = &reader.remaining_slice()[..row_len as usize];

let sql = std::str::from_utf8(buf).unwrap();
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;
let tokens = tokenize_sql(sql)?;
let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?;

let values = bind_context
.exprs_to_scalar(
exprs,
&self.schema,
self.ctx.clone(),
&self.name_resolution_ctx,
metadata,
)
.await?;

for (col, scalar) in columns.iter_mut().zip(values) {
col.push(scalar.as_ref());
}
reader.set_position(end_pos_of_row);
return Ok(());
}
}

Ok(())
}
}
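`parse_next_row` above tries a fast literal decoder first and, when a field turns out to be a full expression such as `1 + 1`, pops the partially built column values, rolls the cursor back, and re-parses the whole row through the expression pipeline. A toy sketch of that fast-path/fallback shape (hypothetical helper functions, not databend APIs):

```rust
// Hedged sketch of the fast-path/fallback pattern: decode cheap literal
// values directly, and only fall back to a (slower) expression-style
// evaluation when the literal decoder fails. The "expression evaluator"
// here is a stand-in that only understands '+' between integers.

fn parse_field_fast(s: &str) -> Option<i64> {
    // Fast path: the field is a plain integer literal.
    s.trim().parse::<i64>().ok()
}

fn parse_field_fallback(s: &str) -> i64 {
    // Fallback stand-in for the expression pipeline, e.g. "1 + 1".
    s.split('+').map(|p| p.trim().parse::<i64>().unwrap_or(0)).sum()
}

fn parse_field(s: &str) -> i64 {
    parse_field_fast(s).unwrap_or_else(|| parse_field_fallback(s))
}

fn main() {
    println!("{} {}", parse_field("42"), parse_field("1 + 1"));
}
```

The design bet is that most VALUES fields are plain literals, so the common case never pays the cost of tokenizing and binding an expression.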

// Values |(xxx), (yyy), (zzz)
pub fn skip_to_next_row<R: AsRef<[u8]>>(reader: &mut Cursor<R>, mut balance: i32) -> Result<()> {
let _ = reader.ignore_white_spaces();

let mut quoted = false;
let mut escaped = false;

while balance > 0 {
let buffer = reader.remaining_slice();
if buffer.is_empty() {
break;
}

let size = buffer.len();

let it = buffer
.iter()
.position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\'');

if let Some(it) = it {
let c = buffer[it];
reader.consume(it + 1);

if it == 0 && escaped {
escaped = false;
continue;
}
escaped = false;

match c {
b'\\' => {
escaped = true;
continue;
}
b'\'' => {
quoted ^= true;
continue;
}
b')' => {
if !quoted {
balance -= 1;
}
}
b'(' => {
if !quoted {
balance += 1;
}
}
_ => {}
}
} else {
escaped = false;
reader.consume(size);
}
}
Ok(())
}
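`skip_to_next_row` above relies on balance counting: parentheses adjust a depth counter only when outside single quotes, and a backslash escapes the byte after it, so `)` inside a string never closes the row. A condensed sketch of the same idea over a byte slice (an assumed simplification of the PR's cursor-based loop, not the databend code):

```rust
// Hedged sketch of balance-counting row skipping: given the bytes after
// a row's opening '(', return how many bytes to consume to reach the
// end of the row, honoring single-quoted strings and backslash escapes.

fn skip_row(input: &[u8]) -> Option<usize> {
    let mut balance: i32 = 1; // the row's opening '(' is already consumed
    let mut quoted = false;
    let mut escaped = false;
    for (i, &c) in input.iter().enumerate() {
        if escaped {
            // Previous byte was '\', so this byte is literal.
            escaped = false;
            continue;
        }
        match c {
            b'\\' => escaped = true,
            b'\'' => quoted = !quoted,
            b'(' if !quoted => balance += 1,
            b')' if !quoted => {
                balance -= 1;
                if balance == 0 {
                    return Some(i + 1); // bytes consumed, ')' included
                }
            }
            _ => {}
        }
    }
    None // row never closed: malformed input
}

fn main() {
    // ')' inside the quoted 'a)b' does not close the row.
    println!("{:?}", skip_row(b"'a)b', (1))x"));
}
```

Returning `None` on unbalanced input mirrors how the real function can only report an error once the buffer is exhausted with `balance` still positive.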