Skip to content

Commit

Permalink
replace into values
Browse files Browse the repository at this point in the history
  • Loading branch information
SkyFan2002 committed Jul 22, 2023
1 parent a5782f5 commit 02ef7ff
Show file tree
Hide file tree
Showing 17 changed files with 744 additions and 571 deletions.
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl CopyInterpreter {
}

#[async_backtrace::framed]
async fn build_physical_plan(
pub async fn build_physical_plan(
&self,
plan: &CopyIntoTablePlan,
) -> Result<(PhysicalPlan, Vec<StageFileInfo>)> {
Expand Down
283 changes: 1 addition & 282 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::io::BufRead;
use std::io::Cursor;
use std::ops::Not;
use std::str::FromStr;
use std::sync::Arc;

use aho_corasick::AhoCorasick;
use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::tokenize_sql;
use common_catalog::table::AppendMode;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ColumnBuilder;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_formats::FastFieldDecoderValues;
use common_io::cursor_ext::ReadBytesExt;
use common_io::cursor_ext::ReadCheckPointExt;
use common_meta_app::principal::StageFileFormatType;
use common_pipeline_sources::AsyncSource;
use common_pipeline_sources::AsyncSourcer;
use common_sql::executor::DistributedInsertSelect;
use common_sql::executor::PhysicalPlan;
use common_sql::executor::PhysicalPlanBuilder;
use common_sql::plans::Insert;
use common_sql::plans::InsertInputSource;
use common_sql::plans::Plan;
use common_sql::BindContext;
use common_sql::Metadata;
use common_sql::MetadataRef;
use common_sql::NameResolutionContext;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use parking_lot::RwLock;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
Expand All @@ -56,15 +37,11 @@ use crate::pipelines::builders::build_append2table_with_commit_pipeline;
use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::SourcePipeBuilder;
use crate::pipelines::ValueSource;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

// Pre-generate the positions of `(`, `'` and `\`
static PATTERNS: &[&str] = &["(", "'", "\\"];

static INSERT_TOKEN_FINDER: Lazy<AhoCorasick> = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap());

pub struct InsertInterpreter {
ctx: Arc<QueryContext>,
plan: Insert,
Expand Down Expand Up @@ -289,261 +266,3 @@ impl Interpreter for InsertInterpreter {
Ok(())
}
}

pub struct ValueSource {
data: String,
ctx: Arc<dyn TableContext>,
name_resolution_ctx: NameResolutionContext,
bind_context: BindContext,
schema: DataSchemaRef,
metadata: MetadataRef,
is_finished: bool,
}

#[async_trait::async_trait]
impl AsyncSource for ValueSource {
const NAME: &'static str = "ValueSource";
const SKIP_EMPTY_DATA_BLOCK: bool = true;

#[async_trait::unboxed_simple]
#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
if self.is_finished {
return Ok(None);
}

// Use the number of '(' to estimate the number of rows
let mut estimated_rows = 0;
let mut positions = VecDeque::new();
for mat in INSERT_TOKEN_FINDER.find_iter(&self.data) {
if mat.pattern() == 0.into() {
estimated_rows += 1;
continue;
}
positions.push_back(mat.start());
}

let mut reader = Cursor::new(self.data.as_bytes());
let block = self
.read(estimated_rows, &mut reader, &mut positions)
.await?;
self.is_finished = true;
Ok(Some(block))
}
}

impl ValueSource {
pub fn new(
data: String,
ctx: Arc<dyn TableContext>,
name_resolution_ctx: NameResolutionContext,
schema: DataSchemaRef,
) -> Self {
let bind_context = BindContext::new();
let metadata = Arc::new(RwLock::new(Metadata::default()));

Self {
data,
ctx,
name_resolution_ctx,
schema,
bind_context,
metadata,
is_finished: false,
}
}

#[async_backtrace::framed]
pub async fn read<R: AsRef<[u8]>>(
&self,
estimated_rows: usize,
reader: &mut Cursor<R>,
positions: &mut VecDeque<usize>,
) -> Result<DataBlock> {
let mut columns = self
.schema
.fields()
.iter()
.map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows))
.collect::<Vec<_>>();

let mut bind_context = self.bind_context.clone();

let format = self.ctx.get_format_settings()?;
let field_decoder = FastFieldDecoderValues::create_for_insert(format);

for row in 0.. {
let _ = reader.ignore_white_spaces();
if reader.eof() {
break;
}
// Not the first row
if row != 0 {
reader.must_ignore_byte(b',')?;
}

self.parse_next_row(
&field_decoder,
reader,
&mut columns,
positions,
&mut bind_context,
self.metadata.clone(),
)
.await?;
}

let columns = columns
.into_iter()
.map(|col| col.build())
.collect::<Vec<_>>();
Ok(DataBlock::new_from_columns(columns))
}

/// Parse single row value, like ('111', 222, 1 + 1)
#[async_backtrace::framed]
async fn parse_next_row<R: AsRef<[u8]>>(
&self,
field_decoder: &FastFieldDecoderValues,
reader: &mut Cursor<R>,
columns: &mut [ColumnBuilder],
positions: &mut VecDeque<usize>,
bind_context: &mut BindContext,
metadata: MetadataRef,
) -> Result<()> {
let _ = reader.ignore_white_spaces();
let col_size = columns.len();
let start_pos_of_row = reader.checkpoint();

// Start of the row --- '('
if !reader.ignore_byte(b'(') {
return Err(ErrorCode::BadDataValueType(
"Must start with parentheses".to_string(),
));
}
// Ignore the positions in the previous row.
while let Some(pos) = positions.front() {
if *pos < start_pos_of_row as usize {
positions.pop_front();
} else {
break;
}
}

for col_idx in 0..col_size {
let _ = reader.ignore_white_spaces();
let col_end = if col_idx + 1 == col_size { b')' } else { b',' };

let col = columns
.get_mut(col_idx)
.ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?;

let (need_fallback, pop_count) = field_decoder
.read_field(col, reader, positions)
.map(|_| {
let _ = reader.ignore_white_spaces();
let need_fallback = reader.ignore_byte(col_end).not();
(need_fallback, col_idx + 1)
})
.unwrap_or((true, col_idx));

// ColumnBuilder and expr-parser both will eat the end ')' of the row.
if need_fallback {
for col in columns.iter_mut().take(pop_count) {
col.pop();
}
// rollback to start position of the row
reader.rollback(start_pos_of_row + 1);
skip_to_next_row(reader, 1)?;
let end_pos_of_row = reader.position();

// Parse from expression and append all columns.
reader.set_position(start_pos_of_row);
let row_len = end_pos_of_row - start_pos_of_row;
let buf = &reader.remaining_slice()[..row_len as usize];

let sql = std::str::from_utf8(buf).unwrap();
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;
let tokens = tokenize_sql(sql)?;
let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?;

let values = bind_context
.exprs_to_scalar(
exprs,
&self.schema,
self.ctx.clone(),
&self.name_resolution_ctx,
metadata,
)
.await?;

for (col, scalar) in columns.iter_mut().zip(values) {
col.push(scalar.as_ref());
}
reader.set_position(end_pos_of_row);
return Ok(());
}
}

Ok(())
}
}

// Values |(xxx), (yyy), (zzz)
pub fn skip_to_next_row<R: AsRef<[u8]>>(reader: &mut Cursor<R>, mut balance: i32) -> Result<()> {
let _ = reader.ignore_white_spaces();

let mut quoted = false;
let mut escaped = false;

while balance > 0 {
let buffer = reader.remaining_slice();
if buffer.is_empty() {
break;
}

let size = buffer.len();

let it = buffer
.iter()
.position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\'');

if let Some(it) = it {
let c = buffer[it];
reader.consume(it + 1);

if it == 0 && escaped {
escaped = false;
continue;
}
escaped = false;

match c {
b'\\' => {
escaped = true;
continue;
}
b'\'' => {
quoted ^= true;
continue;
}
b')' => {
if !quoted {
balance -= 1;
}
}
b'(' => {
if !quoted {
balance += 1;
}
}
_ => {}
}
} else {
escaped = false;
reader.consume(size);
}
}
Ok(())
}
Loading

0 comments on commit 02ef7ff

Please sign in to comment.