Skip to content

Commit

Permalink
fix: insert Plan should not have a schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Apr 24, 2024
1 parent 3d18e3e commit f307e30
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 90 deletions.
14 changes: 7 additions & 7 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Interpreter for InsertInterpreter {
InsertInputSource::Values(InsertValue::Values { rows }) => {
build_res.main_pipeline.add_source(
|output| {
let inner = ValueSource::new(rows.clone(), self.plan.schema());
let inner = ValueSource::new(rows.clone(), self.plan.dest_schema());
AsyncSourcer::create(self.ctx.clone(), output, inner)
},
1,
Expand All @@ -131,7 +131,7 @@ impl Interpreter for InsertInterpreter {
data.to_string(),
self.ctx.clone(),
name_resolution_ctx,
self.plan.schema(),
self.plan.dest_schema(),
*start,
);
AsyncSourcer::create(self.ctx.clone(), output, inner)
Expand All @@ -147,7 +147,7 @@ impl Interpreter for InsertInterpreter {

match StageFileFormatType::from_str(format) {
Ok(f) if f.has_inner_schema() => {
let dest_schema = self.plan.schema();
let dest_schema = self.plan.dest_schema();
let func_ctx = self.ctx.get_function_context()?;

build_res.main_pipeline.add_transform(
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Interpreter for InsertInterpreter {
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;

if format.get_type().has_inner_schema() {
let dest_schema = self.plan.schema();
let dest_schema = self.plan.dest_schema();
let func_ctx = self.ctx.get_function_context()?;

build_res.main_pipeline.add_transform(
Expand Down Expand Up @@ -230,7 +230,7 @@ impl Interpreter for InsertInterpreter {
table_info: table1.get_table_info().clone(),
select_schema: plan.schema(),
select_column_bindings,
insert_schema: self.plan.schema(),
insert_schema: self.plan.dest_schema(),
cast_needed: self.check_schema_cast(plan)?,
},
)));
Expand All @@ -247,7 +247,7 @@ impl Interpreter for InsertInterpreter {
table_info: table1.get_table_info().clone(),
select_schema: plan.schema(),
select_column_bindings,
insert_schema: self.plan.schema(),
insert_schema: self.plan.dest_schema(),
cast_needed: self.check_schema_cast(plan)?,
}))
}
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Interpreter for InsertInterpreter {
self.ctx.clone(),
&mut build_res.main_pipeline,
table.clone(),
self.plan.schema(),
self.plan.dest_schema(),
None,
vec![],
self.plan.overwrite,
Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ pub async fn clickhouse_handler_post(
.await
.map_err(BadRequest)?;

let schema = plan.schema();
let mut handle = None;
let output_schema = plan.schema();
if let Plan::Insert(insert) = &mut plan {
let dest_schema = insert.dest_schema();
if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) =
&mut insert.source
{
Expand All @@ -368,7 +369,7 @@ pub async fn clickhouse_handler_post(
.await
.map_err(InternalServerError)?;

let table_schema = infer_table_schema(&schema)
let table_schema = infer_table_schema(&dest_schema)
.map_err(|err| err.display_with_sql(&sql))
.map_err(InternalServerError)?;
let input_context = Arc::new(
Expand Down Expand Up @@ -419,7 +420,7 @@ pub async fn clickhouse_handler_post(
.await
.map_err(InternalServerError)?;

let table_schema = infer_table_schema(&schema)
let table_schema = infer_table_schema(&dest_schema)
.map_err(|err| err.display_with_sql(&sql))
.map_err(InternalServerError)?;
let input_context = Arc::new(
Expand Down Expand Up @@ -468,7 +469,7 @@ pub async fn clickhouse_handler_post(
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;

execute(ctx, interpreter, schema, format, params, handle)
execute(ctx, interpreter, output_schema, format, params, handle)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(InternalServerError)
Expand Down
156 changes: 79 additions & 77 deletions src/query/service/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,89 +141,91 @@ pub async fn streaming_load(
.await
.map_err(InternalServerError)?;

let schema = plan.schema();
match &mut plan {
Plan::Insert(insert) => match &mut insert.source {
InsertInputSource::StreamingWithFileFormat {
format,
on_error_mode,
start,
input_context_option,
} => {
let sql_rest = &insert_sql[*start..].trim();
if !sql_rest.is_empty() {
return Err(poem::Error::from_string(
"should NOT have data after `FILE_FORMAT` in streaming load.",
StatusCode::BAD_REQUEST,
));
};
let to_table = context
.get_table(&insert.catalog, &insert.database, &insert.table)
.await
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?;
let (tx, rx) = tokio::sync::mpsc::channel(2);
Plan::Insert(insert) => {
let schema = insert.dest_schema();
match &mut insert.source {
InsertInputSource::StreamingWithFileFormat {
format,
on_error_mode,
start,
input_context_option,
} => {
let sql_rest = &insert_sql[*start..].trim();
if !sql_rest.is_empty() {
return Err(poem::Error::from_string(
"should NOT have data after `FILE_FORMAT` in streaming load.",
StatusCode::BAD_REQUEST,
));
};
let to_table = context
.get_table(&insert.catalog, &insert.database, &insert.table)
.await
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?;
let (tx, rx) = tokio::sync::mpsc::channel(2);

let table_schema = infer_table_schema(&schema)
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?;
let input_context = Arc::new(
InputContext::try_create_from_insert_file_format(
context.clone(),
rx,
context.get_settings(),
format.clone(),
table_schema,
context.get_scan_progress(),
false,
to_table.get_block_thresholds(),
on_error_mode.clone(),
)
.await
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?,
);
*input_context_option = Some(input_context.clone());
info!("streaming load with file_format {:?}", input_context);
let table_schema = infer_table_schema(&schema)
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?;
let input_context = Arc::new(
InputContext::try_create_from_insert_file_format(
context.clone(),
rx,
context.get_settings(),
format.clone(),
table_schema,
context.get_scan_progress(),
false,
to_table.get_block_thresholds(),
on_error_mode.clone(),
)
.await
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?,
);
*input_context_option = Some(input_context.clone());
info!("streaming load with file_format {:?}", input_context);

let query_id = context.get_id();
let handler = context.spawn(query_id, execute_query(context.clone(), plan));
let files = read_multi_part(multipart, tx, &input_context).await?;
let query_id = context.get_id();
let handler = context.spawn(query_id, execute_query(context.clone(), plan));
let files = read_multi_part(multipart, tx, &input_context).await?;

match handler.await {
Ok(Ok(_)) => Ok(Json(LoadResponse {
error: None,
state: "SUCCESS".to_string(),
id: uuid::Uuid::new_v4().to_string(),
stats: context.get_scan_progress_value(),
files,
})),
Ok(Err(cause)) => Err(poem::Error::from_string(
format!(
"execute fail: {}",
cause.display_with_sql(insert_sql).message()
),
StatusCode::BAD_REQUEST,
)),
Err(_) => Err(poem::Error::from_string(
"Maybe panic.",
StatusCode::INTERNAL_SERVER_ERROR,
)),
match handler.await {
Ok(Ok(_)) => Ok(Json(LoadResponse {
error: None,
state: "SUCCESS".to_string(),
id: uuid::Uuid::new_v4().to_string(),
stats: context.get_scan_progress_value(),
files,
})),
Ok(Err(cause)) => Err(poem::Error::from_string(
format!(
"execute fail: {}",
cause.display_with_sql(insert_sql).message()
),
StatusCode::BAD_REQUEST,
)),
Err(_) => Err(poem::Error::from_string(
"Maybe panic.",
StatusCode::INTERNAL_SERVER_ERROR,
)),
}
}
}
InsertInputSource::StreamingWithFormat(_, _, _) => Err(poem::Error::from_string(
"'INSERT INTO $table FORMAT <type> is now only supported in clickhouse handler,\
InsertInputSource::StreamingWithFormat(_, _, _) => Err(poem::Error::from_string(
"'INSERT INTO $table FORMAT <type> is now only supported in clickhouse handler,\
please use 'FILE_FORMAT = (type = <type> ...)' instead.",
StatusCode::BAD_REQUEST,
)),
_non_supported_source => Err(poem::Error::from_string(
format!(
"streaming upload only support 'INSERT INTO $table FILE_FORMAT = (type = <type> ...)' got {}.",
plan
),
StatusCode::BAD_REQUEST,
)),
},
StatusCode::BAD_REQUEST,
)),
_non_supported_source => Err(poem::Error::from_string(
format!(
"streaming upload only support 'INSERT INTO $table FILE_FORMAT = (type = <type> ...)' got {}.",
plan
),
StatusCode::BAD_REQUEST,
)),
}
}
non_insert_plan => Err(poem::Error::from_string(
format!(
"Only supports INSERT statement in streaming load, but got {}",
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/plans/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl PartialEq for Insert {
}

impl Insert {
pub fn schema(&self) -> DataSchemaRef {
/// Schema of the insert *destination* (built from the stored table
/// schema), as opposed to the schema of the SELECT source feeding it.
/// Renamed from `schema()` so `Plan::schema()` no longer exposes it.
pub fn dest_schema(&self) -> DataSchemaRef {
Arc::new(self.schema.clone().into())
}

Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/plans/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ impl Plan {
Plan::ShowRoles(plan) => plan.schema(),
Plan::ShowGrants(plan) => plan.schema(),
Plan::ShowFileFormats(plan) => plan.schema(),
Plan::Insert(plan) => plan.schema(),
Plan::Replace(plan) => plan.schema(),
Plan::Presign(plan) => plan.schema(),
Plan::ShowShareEndpoint(plan) => plan.schema(),
Expand Down

0 comments on commit f307e30

Please sign in to comment.