Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]Support create stage statement #3253

Merged
merged 7 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/management/tests/it/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use common_exception::Result;
use common_management::*;
use common_meta_api::KVApi;
use common_meta_embedded::MetaEmbedded;
use common_meta_types::Credentials;
use common_meta_types::SeqV;
use common_meta_types::StageParams;
use common_meta_types::UserStageInfo;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -107,6 +109,11 @@ async fn test_unknown_stage_drop_stage() -> Result<()> {
fn create_test_stage_info() -> UserStageInfo {
UserStageInfo {
stage_name: "mystage".to_string(),
stage_params: StageParams::new("test", Credentials::S3 {
access_key_id: String::from("test"),
secret_access_key: String::from("test"),
}),
file_format: None,
comments: "".to_string(),
}
}
Expand Down
4 changes: 4 additions & 0 deletions common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,8 @@ pub use user_info::UserInfo;
pub use user_privilege::UserPrivilege;
pub use user_privilege::UserPrivilegeType;
pub use user_quota::UserQuota;
pub use user_stage::Compression;
pub use user_stage::Credentials;
pub use user_stage::FileFormat;
pub use user_stage::StageParams;
pub use user_stage::UserStageInfo;
83 changes: 81 additions & 2 deletions common/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,103 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use common_exception::ErrorCode;
use common_exception::Result;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct StageParams {
pub url: String,
pub credentials: Credentials,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum Credentials {
S3 {
access_key_id: String,
secret_access_key: String,
},
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum FileFormat {
Csv {
compression: Compression,
record_delimiter: String,
},
Parquet {
compression: Compression,
},
Json,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum Compression {
Auto,
Gzip,
Bz2,
Brotli,
Zstd,
Deflate,
RawDeflate,
Lzo,
Snappy,
None,
}

impl FromStr for Compression {
type Err = &'static str;

fn from_str(s: &str) -> std::result::Result<Compression, &'static str> {
let s = s.to_uppercase();
match s.as_str() {
"AUTO" => Ok(Compression::Auto),
"GZIP" => Ok(Compression::Gzip),
"BZ2" => Ok(Compression::Bz2),
"BROTLI" => Ok(Compression::Brotli),
"ZSTD" => Ok(Compression::Zstd),
"DEFLATE" => Ok(Compression::Deflate),
"RAW_DEFLATE" => Ok(Compression::RawDeflate),
"NONE" => Ok(Compression::None),
_ => Err("no match for compression"),
}
}
}

impl StageParams {
pub fn new(url: &str, credentials: Credentials) -> Self {
StageParams {
url: url.to_string(),
credentials,
}
}
}
/// Stage for data stage location.
/// Need to add more fields by need.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct UserStageInfo {
#[serde(default)]
pub stage_name: String,

pub stage_params: StageParams,
#[serde(default)]
pub file_format: Option<FileFormat>,
#[serde(default)]
pub comments: String,
}

impl UserStageInfo {
pub fn new(stage_name: &str, comments: &str) -> Self {
pub fn new(
stage_name: &str,
comments: &str,
stage_params: StageParams,
file_format: Option<FileFormat>,
) -> Self {
UserStageInfo {
stage_name: stage_name.to_string(),
comments: comments.to_string(),
stage_params,
file_format,
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion common/meta/types/tests/it/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,25 @@
// limitations under the License.

use common_exception::exception::Result;
use common_meta_types::Compression;
use common_meta_types::Credentials;
use common_meta_types::FileFormat;
use common_meta_types::StageParams;
use common_meta_types::UserStageInfo;

#[test]
fn test_user_stage() -> Result<()> {
let stage = UserStageInfo::new("databend", "this is a comment");
let stage = UserStageInfo::new(
"databend",
"this is a comment",
StageParams::new("test", Credentials::S3 {
access_key_id: "test".to_string(),
secret_access_key: "test".to_string(),
}),
Some(FileFormat::Parquet {
compression: Compression::None,
}),
);
let ser = serde_json::to_string(&stage)?;

let de = UserStageInfo::try_from(ser.into_bytes())?;
Expand Down
2 changes: 2 additions & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod plan_use_database;
mod plan_user_alter;
mod plan_user_create;
mod plan_user_drop;
mod plan_user_stage_create;
mod plan_visitor;

pub use plan_aggregator_final::AggregatorFinalPlan;
Expand Down Expand Up @@ -146,4 +147,5 @@ pub use plan_use_database::UseDatabasePlan;
pub use plan_user_alter::AlterUserPlan;
pub use plan_user_create::CreateUserPlan;
pub use plan_user_drop::DropUserPlan;
pub use plan_user_stage_create::CreateUserStagePlan;
pub use plan_visitor::PlanVisitor;
4 changes: 4 additions & 0 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_datavalues::DataSchemaRef;

use crate::plan_broadcast::BroadcastPlan;
use crate::plan_subqueries_set::SubQueriesSetPlan;
use crate::plan_user_stage_create::CreateUserStagePlan;
use crate::AggregatorFinalPlan;
use crate::AggregatorPartialPlan;
use crate::AlterUserPlan;
Expand Down Expand Up @@ -90,6 +91,7 @@ pub enum PlanNode {
DropUser(DropUserPlan),
GrantPrivilege(GrantPrivilegePlan),
RevokePrivilege(RevokePrivilegePlan),
CreateUserStage(CreateUserStagePlan),
}

impl PlanNode {
Expand Down Expand Up @@ -131,6 +133,7 @@ impl PlanNode {
PlanNode::RevokePrivilege(v) => v.schema(),
PlanNode::Sink(v) => v.schema(),
PlanNode::Copy(v) => v.schema(),
PlanNode::CreateUserStage(v) => v.schema(),
}
}

Expand Down Expand Up @@ -171,6 +174,7 @@ impl PlanNode {
PlanNode::RevokePrivilege(_) => "RevokePrivilegePlan",
PlanNode::Sink(_) => "SinkPlan",
PlanNode::Copy(_) => "CopyPlan",
PlanNode::CreateUserStage(_) => "CreateUserStagePlan",
}
}

Expand Down
6 changes: 6 additions & 0 deletions common/planners/src/plan_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::CopyPlan;
use crate::CreateDatabasePlan;
use crate::CreateTablePlan;
use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::DescribeTablePlan;
use crate::DropDatabasePlan;
use crate::DropTablePlan;
Expand Down Expand Up @@ -117,6 +118,7 @@ pub trait PlanRewriter {
PlanNode::DropUser(plan) => self.drop_user(plan),
PlanNode::GrantPrivilege(plan) => self.grant_privilege(plan),
PlanNode::RevokePrivilege(plan) => self.revoke_privilege(plan),
PlanNode::CreateUserStage(plan) => self.rewrite_create_stage(plan),
PlanNode::Sink(plan) => self.rewrite_sink(plan),
}
}
Expand Down Expand Up @@ -376,6 +378,10 @@ pub trait PlanRewriter {
Ok(PlanNode::RevokePrivilege(plan.clone()))
}

fn rewrite_create_stage(&mut self, plan: &CreateUserStagePlan) -> Result<PlanNode> {
Ok(PlanNode::CreateUserStage(plan.clone()))
}

fn rewrite_sink(&mut self, plan: &SinkPlan) -> Result<PlanNode> {
Ok(PlanNode::Sink(plan.clone()))
}
Expand Down
31 changes: 31 additions & 0 deletions common/planners/src/plan_user_stage_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_types::UserStageInfo;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct CreateUserStagePlan {
pub if_not_exists: bool,
pub user_stage_info: UserStageInfo,
}

impl CreateUserStagePlan {
pub fn schema(&self) -> DataSchemaRef {
Arc::new(DataSchema::empty())
}
}
6 changes: 6 additions & 0 deletions common/planners/src/plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::CopyPlan;
use crate::CreateDatabasePlan;
use crate::CreateTablePlan;
use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::DescribeTablePlan;
use crate::DropDatabasePlan;
use crate::DropTablePlan;
Expand Down Expand Up @@ -131,6 +132,7 @@ pub trait PlanVisitor {
PlanNode::GrantPrivilege(plan) => self.visit_grant_privilege(plan),
PlanNode::RevokePrivilege(plan) => self.visit_revoke_privilege(plan),
PlanNode::Sink(plan) => self.visit_append(plan),
PlanNode::CreateUserStage(plan) => self.visit_create_stage(plan),
}
}

Expand Down Expand Up @@ -308,4 +310,8 @@ pub trait PlanVisitor {
fn visit_append(&mut self, _: &SinkPlan) -> Result<()> {
Ok(())
}

fn visit_create_stage(&mut self, _: &CreateUserStagePlan) -> Result<()> {
Ok(())
}
}
2 changes: 2 additions & 0 deletions query/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_planners::PlanNode;

use crate::interpreters::AlterUserInterpreter;
use crate::interpreters::CopyInterpreter;
use crate::interpreters::CreatStageInterpreter;
use crate::interpreters::CreatUserInterpreter;
use crate::interpreters::CreateDatabaseInterpreter;
use crate::interpreters::CreateTableInterpreter;
Expand Down Expand Up @@ -66,6 +67,7 @@ impl InterpreterFactory {
PlanNode::GrantPrivilege(v) => GrantPrivilegeInterpreter::try_create(ctx_clone, v),
PlanNode::RevokePrivilege(v) => RevokePrivilegeInterpreter::try_create(ctx_clone, v),
PlanNode::Copy(v) => CopyInterpreter::try_create(ctx_clone, v),
PlanNode::CreateUserStage(v) => CreatStageInterpreter::try_create(ctx_clone, v),
_ => Result::Err(ErrorCode::UnknownTypeOfQuery(format!(
"Can't get the interpreter by plan:{}",
plan.name()
Expand Down
73 changes: 73 additions & 0 deletions query/src/interpreters/interpreter_stage_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_planners::CreateUserStagePlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::sessions::QueryContext;

#[derive(Debug)]
pub struct CreatStageInterpreter {
ctx: Arc<QueryContext>,
plan: CreateUserStagePlan,
}

impl CreatStageInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: CreateUserStagePlan) -> Result<InterpreterPtr> {
Ok(Arc::new(CreatStageInterpreter { ctx, plan }))
}
}

#[async_trait::async_trait]
impl Interpreter for CreatStageInterpreter {
fn name(&self) -> &str {
"CreatStageInterpreter"
}

#[tracing::instrument(level = "info", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))]
async fn execute(
&self,
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_stage = plan.user_stage_info;
let create_stage = user_mgr.add_stage(user_stage).await;
if plan.if_not_exists {
create_stage.or_else(|e| {
// StageAlreadyExists(4061)
if e.code() == 4061 {
Ok(u64::MIN)
} else {
Err(e)
}
})?;
} else {
create_stage?;
}

Ok(Box::pin(DataBlockStream::create(
self.plan.schema(),
None,
vec![],
)))
}
}
Loading