Skip to content

Commit

Permalink
Merge pull request #3253 from GrapeBaBa/issue_2900
Browse files Browse the repository at this point in the history
[Feature]Support create stage statement
  • Loading branch information
BohuTANG authored Dec 6, 2021
2 parents 96f953c + 1840c52 commit 160b73a
Show file tree
Hide file tree
Showing 22 changed files with 718 additions and 11 deletions.
7 changes: 7 additions & 0 deletions common/management/tests/it/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use common_exception::Result;
use common_management::*;
use common_meta_api::KVApi;
use common_meta_embedded::MetaEmbedded;
use common_meta_types::Credentials;
use common_meta_types::SeqV;
use common_meta_types::StageParams;
use common_meta_types::UserStageInfo;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -107,6 +109,11 @@ async fn test_unknown_stage_drop_stage() -> Result<()> {
fn create_test_stage_info() -> UserStageInfo {
UserStageInfo {
stage_name: "mystage".to_string(),
stage_params: StageParams::new("test", Credentials::S3 {
access_key_id: String::from("test"),
secret_access_key: String::from("test"),
}),
file_format: None,
comments: "".to_string(),
}
}
Expand Down
4 changes: 4 additions & 0 deletions common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,8 @@ pub use user_info::UserInfo;
pub use user_privilege::UserPrivilege;
pub use user_privilege::UserPrivilegeType;
pub use user_quota::UserQuota;
pub use user_stage::Compression;
pub use user_stage::Credentials;
pub use user_stage::FileFormat;
pub use user_stage::StageParams;
pub use user_stage::UserStageInfo;
83 changes: 81 additions & 2 deletions common/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,103 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use common_exception::ErrorCode;
use common_exception::Result;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct StageParams {
pub url: String,
pub credentials: Credentials,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum Credentials {
S3 {
access_key_id: String,
secret_access_key: String,
},
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum FileFormat {
Csv {
compression: Compression,
record_delimiter: String,
},
Parquet {
compression: Compression,
},
Json,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum Compression {
Auto,
Gzip,
Bz2,
Brotli,
Zstd,
Deflate,
RawDeflate,
Lzo,
Snappy,
None,
}

impl FromStr for Compression {
type Err = &'static str;

fn from_str(s: &str) -> std::result::Result<Compression, &'static str> {
let s = s.to_uppercase();
match s.as_str() {
"AUTO" => Ok(Compression::Auto),
"GZIP" => Ok(Compression::Gzip),
"BZ2" => Ok(Compression::Bz2),
"BROTLI" => Ok(Compression::Brotli),
"ZSTD" => Ok(Compression::Zstd),
"DEFLATE" => Ok(Compression::Deflate),
"RAW_DEFLATE" => Ok(Compression::RawDeflate),
"NONE" => Ok(Compression::None),
_ => Err("no match for compression"),
}
}
}

impl StageParams {
pub fn new(url: &str, credentials: Credentials) -> Self {
StageParams {
url: url.to_string(),
credentials,
}
}
}
/// Stage for data stage location.
/// Need to add more fields by need.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct UserStageInfo {
#[serde(default)]
pub stage_name: String,

pub stage_params: StageParams,
#[serde(default)]
pub file_format: Option<FileFormat>,
#[serde(default)]
pub comments: String,
}

impl UserStageInfo {
pub fn new(stage_name: &str, comments: &str) -> Self {
pub fn new(
stage_name: &str,
comments: &str,
stage_params: StageParams,
file_format: Option<FileFormat>,
) -> Self {
UserStageInfo {
stage_name: stage_name.to_string(),
comments: comments.to_string(),
stage_params,
file_format,
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion common/meta/types/tests/it/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,25 @@
// limitations under the License.

use common_exception::exception::Result;
use common_meta_types::Compression;
use common_meta_types::Credentials;
use common_meta_types::FileFormat;
use common_meta_types::StageParams;
use common_meta_types::UserStageInfo;

#[test]
fn test_user_stage() -> Result<()> {
let stage = UserStageInfo::new("databend", "this is a comment");
let stage = UserStageInfo::new(
"databend",
"this is a comment",
StageParams::new("test", Credentials::S3 {
access_key_id: "test".to_string(),
secret_access_key: "test".to_string(),
}),
Some(FileFormat::Parquet {
compression: Compression::None,
}),
);
let ser = serde_json::to_string(&stage)?;

let de = UserStageInfo::try_from(ser.into_bytes())?;
Expand Down
2 changes: 2 additions & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod plan_use_database;
mod plan_user_alter;
mod plan_user_create;
mod plan_user_drop;
mod plan_user_stage_create;
mod plan_visitor;

pub use plan_aggregator_final::AggregatorFinalPlan;
Expand Down Expand Up @@ -146,4 +147,5 @@ pub use plan_use_database::UseDatabasePlan;
pub use plan_user_alter::AlterUserPlan;
pub use plan_user_create::CreateUserPlan;
pub use plan_user_drop::DropUserPlan;
pub use plan_user_stage_create::CreateUserStagePlan;
pub use plan_visitor::PlanVisitor;
4 changes: 4 additions & 0 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_datavalues::DataSchemaRef;

use crate::plan_broadcast::BroadcastPlan;
use crate::plan_subqueries_set::SubQueriesSetPlan;
use crate::plan_user_stage_create::CreateUserStagePlan;
use crate::AggregatorFinalPlan;
use crate::AggregatorPartialPlan;
use crate::AlterUserPlan;
Expand Down Expand Up @@ -90,6 +91,7 @@ pub enum PlanNode {
DropUser(DropUserPlan),
GrantPrivilege(GrantPrivilegePlan),
RevokePrivilege(RevokePrivilegePlan),
CreateUserStage(CreateUserStagePlan),
}

impl PlanNode {
Expand Down Expand Up @@ -131,6 +133,7 @@ impl PlanNode {
PlanNode::RevokePrivilege(v) => v.schema(),
PlanNode::Sink(v) => v.schema(),
PlanNode::Copy(v) => v.schema(),
PlanNode::CreateUserStage(v) => v.schema(),
}
}

Expand Down Expand Up @@ -171,6 +174,7 @@ impl PlanNode {
PlanNode::RevokePrivilege(_) => "RevokePrivilegePlan",
PlanNode::Sink(_) => "SinkPlan",
PlanNode::Copy(_) => "CopyPlan",
PlanNode::CreateUserStage(_) => "CreateUserStagePlan",
}
}

Expand Down
6 changes: 6 additions & 0 deletions common/planners/src/plan_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::CopyPlan;
use crate::CreateDatabasePlan;
use crate::CreateTablePlan;
use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::DescribeTablePlan;
use crate::DropDatabasePlan;
use crate::DropTablePlan;
Expand Down Expand Up @@ -117,6 +118,7 @@ pub trait PlanRewriter {
PlanNode::DropUser(plan) => self.drop_user(plan),
PlanNode::GrantPrivilege(plan) => self.grant_privilege(plan),
PlanNode::RevokePrivilege(plan) => self.revoke_privilege(plan),
PlanNode::CreateUserStage(plan) => self.rewrite_create_stage(plan),
PlanNode::Sink(plan) => self.rewrite_sink(plan),
}
}
Expand Down Expand Up @@ -376,6 +378,10 @@ pub trait PlanRewriter {
Ok(PlanNode::RevokePrivilege(plan.clone()))
}

fn rewrite_create_stage(&mut self, plan: &CreateUserStagePlan) -> Result<PlanNode> {
Ok(PlanNode::CreateUserStage(plan.clone()))
}

fn rewrite_sink(&mut self, plan: &SinkPlan) -> Result<PlanNode> {
Ok(PlanNode::Sink(plan.clone()))
}
Expand Down
31 changes: 31 additions & 0 deletions common/planners/src/plan_user_stage_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_types::UserStageInfo;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct CreateUserStagePlan {
pub if_not_exists: bool,
pub user_stage_info: UserStageInfo,
}

impl CreateUserStagePlan {
pub fn schema(&self) -> DataSchemaRef {
Arc::new(DataSchema::empty())
}
}
6 changes: 6 additions & 0 deletions common/planners/src/plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::CopyPlan;
use crate::CreateDatabasePlan;
use crate::CreateTablePlan;
use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::DescribeTablePlan;
use crate::DropDatabasePlan;
use crate::DropTablePlan;
Expand Down Expand Up @@ -131,6 +132,7 @@ pub trait PlanVisitor {
PlanNode::GrantPrivilege(plan) => self.visit_grant_privilege(plan),
PlanNode::RevokePrivilege(plan) => self.visit_revoke_privilege(plan),
PlanNode::Sink(plan) => self.visit_append(plan),
PlanNode::CreateUserStage(plan) => self.visit_create_stage(plan),
}
}

Expand Down Expand Up @@ -308,4 +310,8 @@ pub trait PlanVisitor {
fn visit_append(&mut self, _: &SinkPlan) -> Result<()> {
Ok(())
}

fn visit_create_stage(&mut self, _: &CreateUserStagePlan) -> Result<()> {
Ok(())
}
}
2 changes: 2 additions & 0 deletions query/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_planners::PlanNode;

use crate::interpreters::AlterUserInterpreter;
use crate::interpreters::CopyInterpreter;
use crate::interpreters::CreatStageInterpreter;
use crate::interpreters::CreatUserInterpreter;
use crate::interpreters::CreateDatabaseInterpreter;
use crate::interpreters::CreateTableInterpreter;
Expand Down Expand Up @@ -66,6 +67,7 @@ impl InterpreterFactory {
PlanNode::GrantPrivilege(v) => GrantPrivilegeInterpreter::try_create(ctx_clone, v),
PlanNode::RevokePrivilege(v) => RevokePrivilegeInterpreter::try_create(ctx_clone, v),
PlanNode::Copy(v) => CopyInterpreter::try_create(ctx_clone, v),
PlanNode::CreateUserStage(v) => CreatStageInterpreter::try_create(ctx_clone, v),
_ => Result::Err(ErrorCode::UnknownTypeOfQuery(format!(
"Can't get the interpreter by plan:{}",
plan.name()
Expand Down
73 changes: 73 additions & 0 deletions query/src/interpreters/interpreter_stage_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_planners::CreateUserStagePlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::sessions::QueryContext;

#[derive(Debug)]
pub struct CreatStageInterpreter {
ctx: Arc<QueryContext>,
plan: CreateUserStagePlan,
}

impl CreatStageInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: CreateUserStagePlan) -> Result<InterpreterPtr> {
Ok(Arc::new(CreatStageInterpreter { ctx, plan }))
}
}

#[async_trait::async_trait]
impl Interpreter for CreatStageInterpreter {
fn name(&self) -> &str {
"CreatStageInterpreter"
}

#[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))]
async fn execute(
&self,
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_stage = plan.user_stage_info;
let create_stage = user_mgr.add_stage(user_stage).await;
if plan.if_not_exists {
create_stage.or_else(|e| {
// StageAlreadyExists(4061)
if e.code() == 4061 {
Ok(u64::MIN)
} else {
Err(e)
}
})?;
} else {
create_stage?;
}

Ok(Box::pin(DataBlockStream::create(
self.plan.schema(),
None,
vec![],
)))
}
}
Loading

0 comments on commit 160b73a

Please sign in to comment.