diff --git a/common/management/tests/it/stage.rs b/common/management/tests/it/stage.rs index ee375690f0ab8..c627853bd2616 100644 --- a/common/management/tests/it/stage.rs +++ b/common/management/tests/it/stage.rs @@ -20,7 +20,9 @@ use common_exception::Result; use common_management::*; use common_meta_api::KVApi; use common_meta_embedded::MetaEmbedded; +use common_meta_types::Credentials; use common_meta_types::SeqV; +use common_meta_types::StageParams; use common_meta_types::UserStageInfo; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -107,6 +109,11 @@ async fn test_unknown_stage_drop_stage() -> Result<()> { fn create_test_stage_info() -> UserStageInfo { UserStageInfo { stage_name: "mystage".to_string(), + stage_params: StageParams::new("test", Credentials::S3 { + access_key_id: String::from("test"), + secret_access_key: String::from("test"), + }), + file_format: None, comments: "".to_string(), } } diff --git a/common/meta/types/src/lib.rs b/common/meta/types/src/lib.rs index 329b9780ba015..d6480a3ed94d7 100644 --- a/common/meta/types/src/lib.rs +++ b/common/meta/types/src/lib.rs @@ -90,4 +90,8 @@ pub use user_info::UserInfo; pub use user_privilege::UserPrivilege; pub use user_privilege::UserPrivilegeType; pub use user_quota::UserQuota; +pub use user_stage::Compression; +pub use user_stage::Credentials; +pub use user_stage::FileFormat; +pub use user_stage::StageParams; pub use user_stage::UserStageInfo; diff --git a/common/meta/types/src/user_stage.rs b/common/meta/types/src/user_stage.rs index 65df312196e48..3d51b07dd3760 100644 --- a/common/meta/types/src/user_stage.rs +++ b/common/meta/types/src/user_stage.rs @@ -12,24 +12,103 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::str::FromStr; + use common_exception::ErrorCode; use common_exception::Result; +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub struct StageParams { + pub url: String, + pub credentials: Credentials, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub enum Credentials { + S3 { + access_key_id: String, + secret_access_key: String, + }, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub enum FileFormat { + Csv { + compression: Compression, + record_delimiter: String, + }, + Parquet { + compression: Compression, + }, + Json, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub enum Compression { + Auto, + Gzip, + Bz2, + Brotli, + Zstd, + Deflate, + RawDeflate, + Lzo, + Snappy, + None, +} + +impl FromStr for Compression { + type Err = &'static str; + + fn from_str(s: &str) -> std::result::Result { + let s = s.to_uppercase(); + match s.as_str() { + "AUTO" => Ok(Compression::Auto), + "GZIP" => Ok(Compression::Gzip), + "BZ2" => Ok(Compression::Bz2), + "BROTLI" => Ok(Compression::Brotli), + "ZSTD" => Ok(Compression::Zstd), + "DEFLATE" => Ok(Compression::Deflate), + "RAW_DEFLATE" => Ok(Compression::RawDeflate), + "NONE" => Ok(Compression::None), + _ => Err("no match for compression"), + } + } +} + +impl StageParams { + pub fn new(url: &str, credentials: Credentials) -> Self { + StageParams { + url: url.to_string(), + credentials, + } + } +} /// Stage for data stage location. -/// Need to add more fields by need. 
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] pub struct UserStageInfo { #[serde(default)] pub stage_name: String, + + pub stage_params: StageParams, + #[serde(default)] + pub file_format: Option, #[serde(default)] pub comments: String, } impl UserStageInfo { - pub fn new(stage_name: &str, comments: &str) -> Self { + pub fn new( + stage_name: &str, + comments: &str, + stage_params: StageParams, + file_format: Option, + ) -> Self { UserStageInfo { stage_name: stage_name.to_string(), comments: comments.to_string(), + stage_params, + file_format, } } } diff --git a/common/meta/types/tests/it/user_stage.rs b/common/meta/types/tests/it/user_stage.rs index 83c2bb456d575..af4883e2f486a 100644 --- a/common/meta/types/tests/it/user_stage.rs +++ b/common/meta/types/tests/it/user_stage.rs @@ -13,11 +13,25 @@ // limitations under the License. use common_exception::exception::Result; +use common_meta_types::Compression; +use common_meta_types::Credentials; +use common_meta_types::FileFormat; +use common_meta_types::StageParams; use common_meta_types::UserStageInfo; #[test] fn test_user_stage() -> Result<()> { - let stage = UserStageInfo::new("databend", "this is a comment"); + let stage = UserStageInfo::new( + "databend", + "this is a comment", + StageParams::new("test", Credentials::S3 { + access_key_id: "test".to_string(), + secret_access_key: "test".to_string(), + }), + Some(FileFormat::Parquet { + compression: Compression::None, + }), + ); let ser = serde_json::to_string(&stage)?; let de = UserStageInfo::try_from(ser.into_bytes())?; diff --git a/common/planners/src/lib.rs b/common/planners/src/lib.rs index 97f69b8be7591..b11928cd21463 100644 --- a/common/planners/src/lib.rs +++ b/common/planners/src/lib.rs @@ -65,6 +65,7 @@ mod plan_use_database; mod plan_user_alter; mod plan_user_create; mod plan_user_drop; +mod plan_user_stage_create; mod plan_visitor; pub use plan_aggregator_final::AggregatorFinalPlan; @@ -146,4 +147,5 @@ pub use 
plan_use_database::UseDatabasePlan; pub use plan_user_alter::AlterUserPlan; pub use plan_user_create::CreateUserPlan; pub use plan_user_drop::DropUserPlan; +pub use plan_user_stage_create::CreateUserStagePlan; pub use plan_visitor::PlanVisitor; diff --git a/common/planners/src/plan_node.rs b/common/planners/src/plan_node.rs index 2e16baae6f662..fe7e86519d43e 100644 --- a/common/planners/src/plan_node.rs +++ b/common/planners/src/plan_node.rs @@ -18,6 +18,7 @@ use common_datavalues::DataSchemaRef; use crate::plan_broadcast::BroadcastPlan; use crate::plan_subqueries_set::SubQueriesSetPlan; +use crate::plan_user_stage_create::CreateUserStagePlan; use crate::AggregatorFinalPlan; use crate::AggregatorPartialPlan; use crate::AlterUserPlan; @@ -90,6 +91,7 @@ pub enum PlanNode { DropUser(DropUserPlan), GrantPrivilege(GrantPrivilegePlan), RevokePrivilege(RevokePrivilegePlan), + CreateUserStage(CreateUserStagePlan), } impl PlanNode { @@ -131,6 +133,7 @@ impl PlanNode { PlanNode::RevokePrivilege(v) => v.schema(), PlanNode::Sink(v) => v.schema(), PlanNode::Copy(v) => v.schema(), + PlanNode::CreateUserStage(v) => v.schema(), } } @@ -171,6 +174,7 @@ impl PlanNode { PlanNode::RevokePrivilege(_) => "RevokePrivilegePlan", PlanNode::Sink(_) => "SinkPlan", PlanNode::Copy(_) => "CopyPlan", + PlanNode::CreateUserStage(_) => "CreateUserStagePlan", } } diff --git a/common/planners/src/plan_rewriter.rs b/common/planners/src/plan_rewriter.rs index 51cfb314fdb56..87d99fa6d7299 100644 --- a/common/planners/src/plan_rewriter.rs +++ b/common/planners/src/plan_rewriter.rs @@ -30,6 +30,7 @@ use crate::CopyPlan; use crate::CreateDatabasePlan; use crate::CreateTablePlan; use crate::CreateUserPlan; +use crate::CreateUserStagePlan; use crate::DescribeTablePlan; use crate::DropDatabasePlan; use crate::DropTablePlan; @@ -117,6 +118,7 @@ pub trait PlanRewriter { PlanNode::DropUser(plan) => self.drop_user(plan), PlanNode::GrantPrivilege(plan) => self.grant_privilege(plan), 
PlanNode::RevokePrivilege(plan) => self.revoke_privilege(plan), + PlanNode::CreateUserStage(plan) => self.rewrite_create_stage(plan), PlanNode::Sink(plan) => self.rewrite_sink(plan), } } @@ -376,6 +378,10 @@ pub trait PlanRewriter { Ok(PlanNode::RevokePrivilege(plan.clone())) } + fn rewrite_create_stage(&mut self, plan: &CreateUserStagePlan) -> Result { + Ok(PlanNode::CreateUserStage(plan.clone())) + } + fn rewrite_sink(&mut self, plan: &SinkPlan) -> Result { Ok(PlanNode::Sink(plan.clone())) } diff --git a/common/planners/src/plan_user_stage_create.rs b/common/planners/src/plan_user_stage_create.rs new file mode 100644 index 0000000000000..7a1f7c96f9cee --- /dev/null +++ b/common/planners/src/plan_user_stage_create.rs @@ -0,0 +1,31 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_datavalues::DataSchema; +use common_datavalues::DataSchemaRef; +use common_meta_types::UserStageInfo; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] +pub struct CreateUserStagePlan { + pub if_not_exists: bool, + pub user_stage_info: UserStageInfo, +} + +impl CreateUserStagePlan { + pub fn schema(&self) -> DataSchemaRef { + Arc::new(DataSchema::empty()) + } +} diff --git a/common/planners/src/plan_visitor.rs b/common/planners/src/plan_visitor.rs index a2093c822eb4e..da0d6005df06a 100644 --- a/common/planners/src/plan_visitor.rs +++ b/common/planners/src/plan_visitor.rs @@ -23,6 +23,7 @@ use crate::CopyPlan; use crate::CreateDatabasePlan; use crate::CreateTablePlan; use crate::CreateUserPlan; +use crate::CreateUserStagePlan; use crate::DescribeTablePlan; use crate::DropDatabasePlan; use crate::DropTablePlan; @@ -131,6 +132,7 @@ pub trait PlanVisitor { PlanNode::GrantPrivilege(plan) => self.visit_grant_privilege(plan), PlanNode::RevokePrivilege(plan) => self.visit_revoke_privilege(plan), PlanNode::Sink(plan) => self.visit_append(plan), + PlanNode::CreateUserStage(plan) => self.visit_create_stage(plan), } } @@ -308,4 +310,8 @@ pub trait PlanVisitor { fn visit_append(&mut self, _: &SinkPlan) -> Result<()> { Ok(()) } + + fn visit_create_stage(&mut self, _: &CreateUserStagePlan) -> Result<()> { + Ok(()) + } } diff --git a/query/src/interpreters/interpreter_factory.rs b/query/src/interpreters/interpreter_factory.rs index 01b3ab85b939a..6c48074a54397 100644 --- a/query/src/interpreters/interpreter_factory.rs +++ b/query/src/interpreters/interpreter_factory.rs @@ -20,6 +20,7 @@ use common_planners::PlanNode; use crate::interpreters::AlterUserInterpreter; use crate::interpreters::CopyInterpreter; +use crate::interpreters::CreatStageInterpreter; use crate::interpreters::CreatUserInterpreter; use crate::interpreters::CreateDatabaseInterpreter; use crate::interpreters::CreateTableInterpreter; @@ -66,6 +67,7 @@ 
impl InterpreterFactory { PlanNode::GrantPrivilege(v) => GrantPrivilegeInterpreter::try_create(ctx_clone, v), PlanNode::RevokePrivilege(v) => RevokePrivilegeInterpreter::try_create(ctx_clone, v), PlanNode::Copy(v) => CopyInterpreter::try_create(ctx_clone, v), + PlanNode::CreateUserStage(v) => CreatStageInterpreter::try_create(ctx_clone, v), _ => Result::Err(ErrorCode::UnknownTypeOfQuery(format!( "Can't get the interpreter by plan:{}", plan.name() diff --git a/query/src/interpreters/interpreter_stage_create.rs b/query/src/interpreters/interpreter_stage_create.rs new file mode 100644 index 0000000000000..28beb50c78600 --- /dev/null +++ b/query/src/interpreters/interpreter_stage_create.rs @@ -0,0 +1,73 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_exception::Result; +use common_planners::CreateUserStagePlan; +use common_streams::DataBlockStream; +use common_streams::SendableDataBlockStream; +use common_tracing::tracing; + +use crate::interpreters::Interpreter; +use crate::interpreters::InterpreterPtr; +use crate::sessions::QueryContext; + +#[derive(Debug)] +pub struct CreatStageInterpreter { + ctx: Arc, + plan: CreateUserStagePlan, +} + +impl CreatStageInterpreter { + pub fn try_create(ctx: Arc, plan: CreateUserStagePlan) -> Result { + Ok(Arc::new(CreatStageInterpreter { ctx, plan })) + } +} + +#[async_trait::async_trait] +impl Interpreter for CreatStageInterpreter { + fn name(&self) -> &str { + "CreatStageInterpreter" + } + + #[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))] + async fn execute( + &self, + _input_stream: Option, + ) -> Result { + let plan = self.plan.clone(); + let user_mgr = self.ctx.get_sessions_manager().get_user_manager(); + let user_stage = plan.user_stage_info; + let create_stage = user_mgr.add_stage(user_stage).await; + if plan.if_not_exists { + create_stage.or_else(|e| { + // StageAlreadyExists(4061) + if e.code() == 4061 { + Ok(u64::MIN) + } else { + Err(e) + } + })?; + } else { + create_stage?; + } + + Ok(Box::pin(DataBlockStream::create( + self.plan.schema(), + None, + vec![], + ))) + } +} diff --git a/query/src/interpreters/interpreter_stage_create_test.rs b/query/src/interpreters/interpreter_stage_create_test.rs new file mode 100644 index 0000000000000..29016a0d3182e --- /dev/null +++ b/query/src/interpreters/interpreter_stage_create_test.rs @@ -0,0 +1,103 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::tokio; +use common_exception::Result; +use common_meta_types::Compression; +use common_meta_types::FileFormat; +use common_planners::*; +use futures::stream::StreamExt; +use pretty_assertions::assert_eq; + +use crate::interpreters::*; +use crate::sql::*; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_create_stage_interpreter() -> Result<()> { + common_tracing::init_default_ut_tracing(); + + let ctx = crate::tests::create_query_context()?; + + static TEST_QUERY: &str = "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=',') comments='test'"; + if let PlanNode::CreateUserStage(plan) = PlanParser::parse(TEST_QUERY, ctx.clone()).await? { + let executor = CreatStageInterpreter::try_create(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CreatStageInterpreter"); + let mut stream = executor.execute(None).await?; + while let Some(_block) = stream.next().await {} + let stage = ctx + .get_sessions_manager() + .get_user_manager() + .get_stage("test_stage") + .await?; + + assert_eq!( + stage.file_format, + Some(FileFormat::Csv { + compression: Compression::Gzip, + record_delimiter: ",".to_string() + }) + ); + assert_eq!(stage.comments, String::from("test")) + } else { + panic!() + } + + if let PlanNode::CreateUserStage(plan) = PlanParser::parse(TEST_QUERY, ctx.clone()).await? 
{ + let executor = CreatStageInterpreter::try_create(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CreatStageInterpreter"); + let is_err = executor.execute(None).await.is_err(); + assert!(!is_err); + let stage = ctx + .get_sessions_manager() + .get_user_manager() + .get_stage("test_stage") + .await?; + + assert_eq!( + stage.file_format, + Some(FileFormat::Csv { + compression: Compression::Gzip, + record_delimiter: ",".to_string() + }) + ); + assert_eq!(stage.comments, String::from("test")) + } else { + panic!() + } + + static TEST_QUERY1: &str = "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=',') comments='test'"; + if let PlanNode::CreateUserStage(plan) = PlanParser::parse(TEST_QUERY1, ctx.clone()).await? { + let executor = CreatStageInterpreter::try_create(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CreatStageInterpreter"); + let is_err = executor.execute(None).await.is_err(); + assert!(is_err); + let stage = ctx + .get_sessions_manager() + .get_user_manager() + .get_stage("test_stage") + .await?; + + assert_eq!( + stage.file_format, + Some(FileFormat::Csv { + compression: Compression::Gzip, + record_delimiter: ",".to_string() + }) + ); + assert_eq!(stage.comments, String::from("test")) + } else { + panic!() + } + Ok(()) +} diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs index 1867eb31a719b..1edb8843ef832 100644 --- a/query/src/interpreters/mod.rs +++ b/query/src/interpreters/mod.rs @@ -35,6 +35,8 @@ mod interpreter_setting_test; #[cfg(test)] mod interpreter_show_create_table_test; #[cfg(test)] +mod interpreter_stage_create_test; +#[cfg(test)] mod interpreter_table_create_test; #[cfg(test)] mod interpreter_table_drop_test; @@ -67,6 +69,7 @@ mod interpreter_revoke_privilege; mod interpreter_select; mod interpreter_setting; mod interpreter_show_create_table; +mod interpreter_stage_create; 
mod interpreter_table_create; mod interpreter_table_drop; mod interpreter_truncate_table; @@ -92,6 +95,7 @@ pub use interpreter_revoke_privilege::RevokePrivilegeInterpreter; pub use interpreter_select::SelectInterpreter; pub use interpreter_setting::SettingInterpreter; pub use interpreter_show_create_table::ShowCreateTableInterpreter; +pub use interpreter_stage_create::CreatStageInterpreter; pub use interpreter_table_create::CreateTableInterpreter; pub use interpreter_table_drop::DropTableInterpreter; pub use interpreter_truncate_table::TruncateTableInterpreter; diff --git a/query/src/sql/sql_parser.rs b/query/src/sql/sql_parser.rs index 6f11de3b378d2..8fd893ac1c344 100644 --- a/query/src/sql/sql_parser.rs +++ b/query/src/sql/sql_parser.rs @@ -16,10 +16,15 @@ // See notice.md use std::convert::TryFrom; +use std::str::FromStr; use std::time::Instant; use common_exception::ErrorCode; use common_meta_types::AuthType; +use common_meta_types::Compression; +use common_meta_types::Credentials; +use common_meta_types::FileFormat; +use common_meta_types::StageParams; use common_meta_types::UserPrivilege; use common_meta_types::UserPrivilegeType; use common_planners::ExplainType; @@ -47,6 +52,7 @@ use super::statements::DfCopy; use crate::sql::statements::DfAlterUser; use crate::sql::statements::DfCompactTable; use crate::sql::statements::DfCreateDatabase; +use crate::sql::statements::DfCreateStage; use crate::sql::statements::DfCreateTable; use crate::sql::statements::DfCreateUser; use crate::sql::statements::DfDescribeTable; @@ -466,12 +472,19 @@ impl<'a> DfParser<'a> { fn parse_create(&mut self) -> Result { match self.parser.next_token() { - Token::Word(w) => match w.keyword { - Keyword::TABLE => self.parse_create_table(), - Keyword::DATABASE => self.parse_create_database(), - Keyword::USER => self.parse_create_user(), - _ => self.expected("create statement", Token::Word(w)), - }, + Token::Word(w) => { + //TODO:make stage to sql parser keyword + if w.value.to_uppercase() 
== "STAGE" { + self.parse_create_stage() + } else { + match w.keyword { + Keyword::TABLE => self.parse_create_table(), + Keyword::DATABASE => self.parse_create_database(), + Keyword::USER => self.parse_create_user(), + _ => self.expected("create statement", Token::Word(w)), + } + } + } unexpected => self.expected("create statement", unexpected), } } @@ -682,6 +695,147 @@ impl<'a> DfParser<'a> { } } + fn parse_stage_file_format(&mut self) -> Result, ParserError> { + let file_format = if self.consume_token("FILE_FORMAT") { + self.parser.expect_token(&Token::Eq)?; + self.parser.expect_token(&Token::LParen)?; + + let format = if self.consume_token("FORMAT") { + self.parser.expect_token(&Token::Eq)?; + self.parser.next_token().to_string() + } else { + return parser_err!("Missing FORMAT"); + }; + + let file_format = match format.to_uppercase().as_str() { + "CSV" | "PARQUET" => { + let compression = if self.consume_token("COMPRESSION") { + self.parser.expect_token(&Token::Eq)?; + //TODO:check compression value correctness + let value = self.parser.next_token().to_string(); + Compression::from_str(value.as_str()) + .map_err(|e| ParserError::ParserError(e.to_string()))? 
+ } else { + Compression::None + }; + if "CSV" == format.to_uppercase().as_str() { + if self.consume_token("RECORD_DELIMITER") { + self.parser.expect_token(&Token::Eq)?; + + let record_delimiter = match self.parser.next_token() { + Token::Word(w) => match w.value.to_uppercase().as_str() { + "NONE" => String::from(""), + _ => { + return self + .expected("record delimiter NONE", Token::Word(w)) + } + }, + Token::SingleQuotedString(s) => s, + unexpected => { + return self + .expected("not supported record delimiter", unexpected) + } + }; + + Some(FileFormat::Csv { + compression, + record_delimiter, + }) + } else { + Some(FileFormat::Csv { + compression, + record_delimiter: String::from(""), + }) + } + } else { + Some(FileFormat::Parquet { compression }) + } + } + "JSON" => Some(FileFormat::Json), + unexpected => { + return parser_err!(format!( + "Expected format type {}, found: {}", + "CSV|PARQUET|JSON", unexpected + )) + } + }; + + self.parser.expect_token(&Token::RParen)?; + file_format + } else { + None + }; + + Ok(file_format) + } + + fn parse_stage_credentials(&mut self, url: String) -> Result { + if !self.consume_token("CREDENTIALS") { + return parser_err!("Missing CREDENTIALS"); + } + self.parser.expect_token(&Token::Eq)?; + self.parser.expect_token(&Token::LParen)?; + + let credentials = if url.to_uppercase().starts_with("S3") { + //TODO: current credential field order is hard code + let access_key_id = if self.consume_token("ACCESS_KEY_ID") { + self.parser.expect_token(&Token::Eq)?; + self.parser.parse_literal_string()? + } else { + return parser_err!("Missing S3 ACCESS_KEY_ID"); + }; + + let secret_access_key = if self.consume_token("SECRET_ACCESS_KEY") { + self.parser.expect_token(&Token::Eq)?; + self.parser.parse_literal_string()? 
+ } else { + return parser_err!("Missing S3 SECRET_ACCESS_KEY"); + }; + Credentials::S3 { + access_key_id, + secret_access_key, + } + } else { + return parser_err!("Not supported storage"); + }; + self.parser.expect_token(&Token::RParen)?; + Ok(credentials) + } + + fn parse_create_stage(&mut self) -> Result { + let if_not_exists = + self.parser + .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); + let name = self.parser.parse_literal_string()?; + let url = if self.consume_token("URL") { + self.parser.expect_token(&Token::Eq)?; + self.parser.parse_literal_string()? + } else { + return parser_err!("Missing URL"); + }; + + let credentials = self.parse_stage_credentials(url.clone())?; + let stage_params = StageParams::new(url.as_str(), credentials); + let file_format = self.parse_stage_file_format()?; + + let comments = if self.consume_token("COMMENTS") { + self.parser.expect_token(&Token::Eq)?; + self.parser.parse_literal_string()? + } else { + String::from("") + }; + + let create = DfCreateStage { + if_not_exists, + stage_name: name, + stage_params, + file_format, + comments, + }; + + Ok(DfStatement::CreateStage(create)) + } + fn parse_create_table(&mut self) -> Result { let if_not_exists = self.parser diff --git a/query/src/sql/sql_parser_test.rs b/query/src/sql/sql_parser_test.rs index 21cbefe239925..b25b58cb671e9 100644 --- a/query/src/sql/sql_parser_test.rs +++ b/query/src/sql/sql_parser_test.rs @@ -14,6 +14,10 @@ use common_exception::Result; use common_meta_types::AuthType; +use common_meta_types::Compression; +use common_meta_types::Credentials; +use common_meta_types::FileFormat; +use common_meta_types::StageParams; use common_meta_types::UserPrivilege; use common_meta_types::UserPrivilegeType; use sqlparser::ast::*; @@ -21,6 +25,7 @@ use sqlparser::ast::*; use crate::sql::statements::DfAlterUser; use crate::sql::statements::DfCopy; use crate::sql::statements::DfCreateDatabase; +use crate::sql::statements::DfCreateStage; use 
crate::sql::statements::DfCreateTable; use crate::sql::statements::DfCreateUser; use crate::sql::statements::DfDescribeTable; @@ -892,3 +897,130 @@ fn revoke_privilege_test() -> Result<()> { Ok(()) } + +#[test] +fn create_stage_test() -> Result<()> { + expect_parse_ok( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z')", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: false, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: None, + comments: "".to_string(), + }), + )?; + + expect_parse_ok( + "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z')", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: true, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: None, + comments: "".to_string(), + }), + )?; + + expect_parse_ok( + "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=',')", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: true, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: Some(FileFormat::Csv { compression: Compression::Gzip, record_delimiter: ",".to_string() }), + comments: "".to_string(), + }), + )?; + + expect_parse_ok( + "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP 
record_delimiter=',') comments='test'", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: true, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: Some(FileFormat::Csv { compression: Compression::Gzip, record_delimiter: ",".to_string() }), + comments: "test".to_string(), + }), + )?; + + expect_parse_ok( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=Parquet compression=AUTO) comments='test'", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: false, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: Some(FileFormat::Parquet { compression: Compression::Auto}), + comments: "test".to_string(), + }), + )?; + + expect_parse_ok( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: false, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: Some(FileFormat::Csv { compression: Compression::Auto, record_delimiter: "".to_string() }), + comments: "test".to_string(), + }), + )?; + + expect_parse_ok( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=json) comments='test'", + DfStatement::CreateStage(DfCreateStage { + if_not_exists: false, + stage_name: "test_stage".to_string(), + stage_params: StageParams::new("s3://load/files/", 
Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: Some(FileFormat::Json ), + comments: "test".to_string(), + }), + )?; + + expect_parse_err( + "CREATE STAGE test_stage credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Missing URL"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' password=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Missing CREDENTIALS"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s4://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Not supported storage"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Missing S3 ACCESS_KEY_ID"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' aecret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Missing S3 SECRET_ACCESS_KEY"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(type=csv compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Missing FORMAT"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') 
file_format=(format=csv compression=AUTO1 record_delimiter=NONE) comments='test'", + String::from("sql parser error: no match for compression"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(format=csv compression=AUTO record_delimiter=NONE1) comments='test'", + String::from("sql parser error: Expected record delimiter NONE, found: NONE1"), + )?; + + expect_parse_err( + "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(format=csv1 compression=AUTO record_delimiter=NONE) comments='test'", + String::from("sql parser error: Expected format type CSV|PARQUET|JSON, found: CSV1"), + )?; + + Ok(()) +} diff --git a/query/src/sql/sql_statement.rs b/query/src/sql/sql_statement.rs index 8f31dfa8c7dc4..94cba73a3fbac 100644 --- a/query/src/sql/sql_statement.rs +++ b/query/src/sql/sql_statement.rs @@ -23,6 +23,7 @@ use super::statements::DfCopy; use crate::sql::statements::DfAlterUser; use crate::sql::statements::DfCompactTable; use crate::sql::statements::DfCreateDatabase; +use crate::sql::statements::DfCreateStage; use crate::sql::statements::DfCreateTable; use crate::sql::statements::DfCreateUser; use crate::sql::statements::DfDescribeTable; @@ -98,6 +99,9 @@ pub enum DfStatement { // Grant GrantPrivilege(DfGrantStatement), RevokePrivilege(DfRevokeStatement), + + // Stage + CreateStage(DfCreateStage), } /// Comment hints from SQL. 
diff --git a/query/src/sql/statements/analyzer_statement.rs b/query/src/sql/statements/analyzer_statement.rs index 242426f4f581d..552d40b6514b4 100644 --- a/query/src/sql/statements/analyzer_statement.rs +++ b/query/src/sql/statements/analyzer_statement.rs @@ -172,6 +172,7 @@ impl AnalyzableStatement for DfStatement { DfStatement::RevokePrivilege(v) => v.analyze(ctx).await, DfStatement::DropUser(v) => v.analyze(ctx).await, DfStatement::Copy(v) => v.analyze(ctx).await, + DfStatement::CreateStage(v) => v.analyze(ctx).await, } } } diff --git a/query/src/sql/statements/mod.rs b/query/src/sql/statements/mod.rs index 9414eea2d003f..127c5fab793f4 100644 --- a/query/src/sql/statements/mod.rs +++ b/query/src/sql/statements/mod.rs @@ -24,6 +24,7 @@ mod statement_alter_user; mod statement_compact_table; mod statement_copy; mod statement_create_database; +mod statement_create_stage; mod statement_create_table; mod statement_create_user; mod statement_describe_table; @@ -57,6 +58,7 @@ pub use statement_alter_user::DfAlterUser; pub use statement_compact_table::DfCompactTable; pub use statement_copy::DfCopy; pub use statement_create_database::DfCreateDatabase; +pub use statement_create_stage::DfCreateStage; pub use statement_create_table::DfCreateTable; pub use statement_create_user::DfCreateUser; pub use statement_describe_table::DfDescribeTable; diff --git a/query/src/sql/statements/statement_create_stage.rs b/query/src/sql/statements/statement_create_stage.rs new file mode 100644 index 0000000000000..d2bd5afc0565f --- /dev/null +++ b/query/src/sql/statements/statement_create_stage.rs @@ -0,0 +1,54 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_meta_types::FileFormat; +use common_meta_types::StageParams; +use common_meta_types::UserStageInfo; +use common_planners::CreateUserStagePlan; +use common_planners::PlanNode; +use common_tracing::tracing; + +use crate::sessions::QueryContext; +use crate::sql::statements::AnalyzableStatement; +use crate::sql::statements::AnalyzedResult; + +#[derive(Debug, Clone, PartialEq)] +pub struct DfCreateStage { + pub if_not_exists: bool, + pub stage_name: String, + pub stage_params: StageParams, + pub file_format: Option<FileFormat>, + pub comments: String, +} + +#[async_trait::async_trait] +impl AnalyzableStatement for DfCreateStage { + #[tracing::instrument(level = "info", skip(self, _ctx), fields(ctx.id = _ctx.get_id().as_str()))] + async fn analyze(&self, _ctx: Arc<QueryContext>) -> Result<AnalyzedResult> { + Ok(AnalyzedResult::SimpleQuery(Box::new( + PlanNode::CreateUserStage(CreateUserStagePlan { + if_not_exists: self.if_not_exists, + user_stage_info: UserStageInfo::new( + self.stage_name.as_str(), + self.comments.as_str(), + self.stage_params.clone(), + self.file_format.clone(), + ), + }), + ))) + } +} diff --git a/query/src/users/user_stage_test.rs b/query/src/users/user_stage_test.rs index d059e142fa990..c816d409064f9 100644 --- a/query/src/users/user_stage_test.rs +++ b/query/src/users/user_stage_test.rs @@ -14,6 +14,8 @@ use common_base::tokio; use common_exception::Result; +use common_meta_types::Credentials; +use common_meta_types::StageParams; use common_meta_types::UserStageInfo; use pretty_assertions::assert_eq; @@
-32,13 +34,29 @@ async fn test_user_stage() -> Result<()> { // add 1. { - let stage_info = UserStageInfo::new(stage_name1, comments); + let stage_info = UserStageInfo::new( + stage_name1, + comments, + StageParams::new("test", Credentials::S3 { + access_key_id: String::from("test"), + secret_access_key: String::from("test"), + }), + None, + ); user_mgr.add_stage(stage_info).await?; } // add 2. { - let stage_info = UserStageInfo::new(stage_name2, comments); + let stage_info = UserStageInfo::new( + stage_name2, + comments, + StageParams::new("test", Credentials::S3 { + access_key_id: String::from("test"), + secret_access_key: String::from("test"), + }), + None, + ); user_mgr.add_stage(stage_info).await?; } diff --git a/tests/suites/0_stateless/05_0008_ddl_create_stage.result b/tests/suites/0_stateless/05_0008_ddl_create_stage.result new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/suites/0_stateless/05_0008_ddl_create_stage.sql b/tests/suites/0_stateless/05_0008_ddl_create_stage.sql new file mode 100644 index 0000000000000..7cfb8e538a7b0 --- /dev/null +++ b/tests/suites/0_stateless/05_0008_ddl_create_stage.sql @@ -0,0 +1,7 @@ +CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z'); +CREATE STAGE if not exists test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z'); +CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z'); -- {ErrorCode 4061} +CREATE STAGE test_stage1 url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=json) comments='test'; +CREATE STAGE test_stage2 url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'; +CREATE STAGE test_stage3 url='s3://load/files/' credentials=(access_key_id='1a2b3c' 
secret_access_key='4x5y6z') file_format=(FORMAT=Parquet compression=AUTO) comments='test'; +CREATE STAGE IF NOT EXISTS test_stage4 url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=','); \ No newline at end of file