diff --git a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs index 9a26b5342cca6..b598c1ef6a9bd 100644 --- a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs @@ -402,6 +402,7 @@ impl FromToProto for mt::StageType { pb::user_stage_info::StageType::LegacyInternal => Ok(mt::StageType::LegacyInternal), pb::user_stage_info::StageType::External => Ok(mt::StageType::External), pb::user_stage_info::StageType::Internal => Ok(mt::StageType::Internal), + pb::user_stage_info::StageType::User => Ok(mt::StageType::User), } } @@ -410,6 +411,7 @@ impl FromToProto for mt::StageType { mt::StageType::LegacyInternal => Ok(pb::user_stage_info::StageType::LegacyInternal), mt::StageType::External => Ok(pb::user_stage_info::StageType::External), mt::StageType::Internal => Ok(pb::user_stage_info::StageType::Internal), + mt::StageType::User => Ok(pb::user_stage_info::StageType::User), } } } diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index d6ccd77f84def..2faa147f1a2b8 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -65,6 +65,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (16, "2022-09-29: Add: CopyOptions::split_size"), (17, "2022-10-28: Add: StageType::LegacyInternal"), (18, "2022-10-28: Add: FILEFormatOptions::escape"), + (19, "2022-10-31: Add: StageType::UserStage"), ]; pub const VER: u64 = META_CHANGE_LOG.last().unwrap().0; diff --git a/src/meta/proto-conv/tests/it/user_proto_conv.rs b/src/meta/proto-conv/tests/it/user_proto_conv.rs index 472297c69edd4..0b0bfefd923a9 100644 --- a/src/meta/proto-conv/tests/it/user_proto_conv.rs +++ b/src/meta/proto-conv/tests/it/user_proto_conv.rs @@ -679,3 +679,33 @@ pub(crate) fn test_internal_stage_info_v17() -> mt::UserStageInfo { ..Default::default() } } + +pub(crate) fn test_user_stage_info_v18() -> mt::UserStageInfo { + mt::UserStageInfo { + stage_name: "root".to_string(), + stage_type: mt::StageType::User, + stage_params: mt::StageParams { + storage: StorageParams::Fs(StorageFsConfig { + root: "/dir/to/files".to_string(), + }), + }, + file_format_options: mt::FileFormatOptions { + format: mt::StageFileFormatType::Json, + skip_header: 1024, + field_delimiter: "|".to_string(), + record_delimiter: "//".to_string(), + escape: "".to_string(), + compression: mt::StageFileCompression::Bz2, + }, + copy_options: mt::CopyOptions { + on_error: mt::OnErrorMode::SkipFileNum(666), + size_limit: 1038, + split_size: 0, + purge: true, + single: false, + max_file_size: 0, + }, + comment: "test".to_string(), + ..Default::default() + } +} diff --git a/src/meta/proto-conv/tests/it/user_stage.rs b/src/meta/proto-conv/tests/it/user_stage.rs index 7dd46f6e44252..be8e7ca8f93f7 100644 --- a/src/meta/proto-conv/tests/it/user_stage.rs +++ b/src/meta/proto-conv/tests/it/user_stage.rs @@ -27,6 +27,7 @@ use crate::user_proto_conv::test_gcs_stage_info; use crate::user_proto_conv::test_internal_stage_info_v17; use crate::user_proto_conv::test_oss_stage_info; use crate::user_proto_conv::test_s3_stage_info; +use crate::user_proto_conv::test_user_stage_info_v18; #[test] fn test_user_stage_fs_latest() -> anyhow::Result<()> { @@ -851,3 +852,48 @@ fn test_internal_stage_v17() -> anyhow::Result<()> { common::test_load_old(func_name!(), internal_stage_v17.as_slice(), want)?; Ok(()) } + +#[test] +fn test_user_stage_v18() -> anyhow::Result<()> { + common::test_pb_from_to("user_stage_v18", test_user_stage_info_v18())?; + + // Encoded data of version v18 of user_stage: + // It is generated with common::test_pb_from_to. + let user_stage_v18 = vec![ + 10, 4, 114, 111, 111, 116, 16, 3, 26, 25, 10, 23, 18, 21, 10, 13, 47, 100, 105, 114, 47, + 116, 111, 47, 102, 105, 108, 101, 115, 160, 6, 19, 168, 6, 1, 34, 20, 8, 1, 16, 128, 8, 26, + 1, 124, 34, 2, 47, 47, 40, 2, 160, 6, 19, 168, 6, 1, 42, 10, 10, 3, 32, 154, 5, 16, 142, 8, + 24, 1, 50, 4, 116, 101, 115, 116, 160, 6, 19, 168, 6, 1, + ]; + + let want = mt::UserStageInfo { + stage_name: "root".to_string(), + stage_type: mt::StageType::User, + stage_params: mt::StageParams { + storage: StorageParams::Fs(StorageFsConfig { + root: "/dir/to/files".to_string(), + }), + }, + file_format_options: mt::FileFormatOptions { + format: mt::StageFileFormatType::Json, + skip_header: 1024, + field_delimiter: "|".to_string(), + record_delimiter: "//".to_string(), + escape: "".to_string(), + compression: mt::StageFileCompression::Bz2, + }, + copy_options: mt::CopyOptions { + on_error: mt::OnErrorMode::SkipFileNum(666), + size_limit: 1038, + split_size: 0, + purge: true, + single: false, + max_file_size: 0, + }, + comment: "test".to_string(), + ..Default::default() + }; + + common::test_load_old(func_name!(), user_stage_v18.as_slice(), want)?; + Ok(()) +} diff --git a/src/meta/protos/proto/user.proto b/src/meta/protos/proto/user.proto index df1e36d451321..344518f9171fa 100644 --- a/src/meta/protos/proto/user.proto +++ b/src/meta/protos/proto/user.proto @@ -127,6 +127,7 @@ message UserStageInfo { LegacyInternal = 0; External = 1; Internal = 2; + User = 3; } message StageStorage { diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs index 708957120ce64..8e8fcc9177583 100644 --- a/src/meta/types/src/user_stage.rs +++ b/src/meta/types/src/user_stage.rs @@ -59,6 +59,10 @@ pub enum StageType { LegacyInternal, External, Internal, + /// User Stage is the stage for every sql user. + /// + /// This is a stage that just in memory. We will not persist in metasrv + User, } impl fmt::Display for StageType { @@ -68,6 +72,7 @@ impl fmt::Display for StageType { StageType::LegacyInternal => "Internal", StageType::External => "External", StageType::Internal => "Internal", + StageType::User => "User", }; write!(f, "{}", name) } @@ -245,6 +250,7 @@ pub struct UserStageInfo { pub file_format_options: FileFormatOptions, pub copy_options: CopyOptions, pub comment: String, + /// TODO(xuanwo): stage doesn't have this info anymore, remove it. pub number_of_files: u64, pub creator: Option, } @@ -273,6 +279,7 @@ impl UserStageInfo { unreachable!("stage_prefix should never be called on external stage, must be a bug") } StageType::Internal => format!("/stage/internal/{}/", self.stage_name), + StageType::User => format!("/stage/user/{}/", self.stage_name), } } } diff --git a/src/query/storages/preludes/src/system/stages_table.rs b/src/query/storages/preludes/src/system/stages_table.rs index 15f493b4ca007..babbec9681529 100644 --- a/src/query/storages/preludes/src/system/stages_table.rs +++ b/src/query/storages/preludes/src/system/stages_table.rs @@ -59,8 +59,9 @@ impl AsyncSystemTable for StagesTable { stage_params.push(format!("{:?}", stage.stage_params).into_bytes()); copy_options.push(format!("{:?}", stage.copy_options).into_bytes()); file_format_options.push(format!("{:?}", stage.file_format_options).into_bytes()); + // TODO(xuanwo): we will remove this line. match stage.stage_type { - StageType::LegacyInternal | StageType::Internal => { + StageType::LegacyInternal | StageType::Internal | StageType::User => { number_of_files.push(Some(stage.number_of_files)); } StageType::External => {