Add escape for management key #4363

Merged
3 changes: 3 additions & 0 deletions common/base/src/lib.rs
@@ -23,6 +23,7 @@ mod runtime_tracker;
mod shutdown_signal;
mod stop_handle;
mod stoppable;
mod string_func;
mod thread;
mod uniq_id;

@@ -43,6 +44,8 @@ pub use shutdown_signal::SignalStream;
pub use shutdown_signal::SignalType;
pub use stop_handle::StopHandle;
pub use stoppable::Stoppable;
pub use string_func::escape_for_key;
pub use string_func::unescape_for_key;
pub use thread::Thread;
pub use tokio;
pub use uniq_id::GlobalSequence;
73 changes: 73 additions & 0 deletions common/base/src/string_func.rs
@@ -0,0 +1,73 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::string::FromUtf8Error;

pub fn escape_for_key(key: &str) -> Result<String, FromUtf8Error> {
Member

Could you add some comments about this escape? For example, what the raw string looks like and what it is escaped to.
Thanks.

Contributor

Also, some unit test cases for this string_func module would be preferred.

Contributor Author

I have added comments and unit tests.

let mut new_key = Vec::with_capacity(key.len());

fn hex(num: u8) -> u8 {
match num {
0..=9 => b'0' + num,
10..=15 => b'a' + (num - 10),
unreachable => unreachable!("Unreachable branch num = {}", unreachable),
}
}

for char in key.as_bytes() {
match char {
b'0'..=b'9' => new_key.push(*char),
b'_' | b'a'..=b'z' | b'A'..=b'Z' => new_key.push(*char),
_other => {
new_key.push(b'%');
new_key.push(hex(*char / 16));
new_key.push(hex(*char % 16));
}
}
}

String::from_utf8(new_key)
}
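
To make the escaping rule asked about above concrete, here is a minimal standalone sketch of the same scheme, not the PR's code. It assumes the rule visible in the diff: ASCII letters, digits and `_` pass through unchanged, and every other byte becomes `%` followed by two lowercase hex digits. The name `escape_sketch` is hypothetical, and it returns a `String` directly because the output is always ASCII.

```rust
/// Hypothetical stand-in for `escape_for_key`: keeps [0-9A-Za-z_] as is and
/// percent-encodes every other byte as `%` plus two lowercase hex digits.
pub fn escape_sketch(key: &str) -> String {
    let mut out = String::with_capacity(key.len());
    for &byte in key.as_bytes() {
        match byte {
            b'0'..=b'9' | b'a'..=b'z' | b'A'..=b'Z' | b'_' => out.push(byte as char),
            // `{:02x}` formats the byte as exactly two lowercase hex digits.
            other => out.push_str(&format!("%{:02x}", other)),
        }
    }
    out
}
```

Under this rule `tenant_1` is left untouched, `a/b` becomes `a%2fb`, and a literal `%` becomes `%25`, so the escaped form never contains a raw `/` or an ambiguous `%`.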

pub fn unescape_for_key(key: &str) -> Result<String, FromUtf8Error> {
let mut new_key = Vec::with_capacity(key.len());

fn unhex(num: u8) -> u8 {
match num {
b'0'..=b'9' => num - b'0',
b'a'..=b'f' => num - b'a' + 10,
unreachable => unreachable!("Unreachable branch num = {}", unreachable),
}
}

let bytes = key.as_bytes();

let mut index = 0;
while index < bytes.len() {
match bytes[index] {
b'%' => {
let mut num = unhex(bytes[index + 1]) * 16;
Contributor

@ariesdevil ariesdevil Mar 15, 2022

Should we check the index out of range if the index is already the last byte? If we can guarantee % does not appear in the last position, we should leave a comment here.

Contributor

I would prefer to use an iterator here, so that we can just call next() and peek(); that is the idiomatic Rust way.

Contributor Author

@zenmiao7 zenmiao7 Mar 16, 2022

> Should we check the index out of range if the index is already the last byte? If we can guarantee % does not appear in the last position, we should leave a comment here.

This function is only used as the inverse of escape_for_key, so I think there is no need to check the input. I have left a comment in the code.

> I would prefer to use an iterator here, so that we can just call next() and peek(); that is the idiomatic Rust way.

In this function, the iterator sometimes needs to advance three steps, so using an index is easier.

num += unhex(bytes[index + 2]);
new_key.push(num);
index += 3;
}
other => {
new_key.push(other);
index += 1;
}
}
}

String::from_utf8(new_key)
}
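
The two review points above (a trailing `%` and the iterator style) can be combined in one sketch. This is an illustration under assumptions, not the code that was merged: the name `unescape_checked` and the choice to return `Option<String>` are hypothetical, and only lowercase hex digits are accepted because that is what the escaping side emits.

```rust
/// Hypothetical checked variant of `unescape_for_key`.
/// Returns `None` for a truncated escape, a non-hex digit, or invalid UTF-8.
pub fn unescape_checked(key: &str) -> Option<String> {
    // Map one lowercase ASCII hex digit to its value in 0..=15.
    fn unhex(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            _ => None,
        }
    }

    let mut out = Vec::with_capacity(key.len());
    let mut bytes = key.bytes();
    while let Some(byte) = bytes.next() {
        if byte == b'%' {
            // `?` turns a missing or invalid digit into `None` instead of a panic.
            let high = unhex(bytes.next()?)?;
            let low = unhex(bytes.next()?)?;
            out.push(high * 16 + low);
        } else {
            out.push(byte);
        }
    }
    String::from_utf8(out).ok()
}
```

Calling `next()` twice after a `%` covers the "three steps" case without manual index arithmetic, and malformed input such as `abc%` or `%zz` yields `None` rather than an out-of-bounds panic.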
84 changes: 8 additions & 76 deletions common/management/src/cluster/cluster_mgr.rs
@@ -17,6 +17,8 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use common_base::escape_for_key;
use common_base::unescape_for_key;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_api::KVApi;
@@ -58,70 +60,12 @@ impl ClusterMgr {
cluster_prefix: format!(
"{}/{}/{}/databend_query",
CLUSTER_API_KEY_PREFIX,
Self::escape_for_key(tenant)?,
Self::escape_for_key(cluster_id)?
escape_for_key(tenant)?,
escape_for_key(cluster_id)?
),
})
}

fn escape_for_key(key: &str) -> Result<String> {
let mut new_key = Vec::with_capacity(key.len());

fn hex(num: u8) -> u8 {
match num {
0..=9 => b'0' + num,
10..=15 => b'a' + (num - 10),
unreachable => unreachable!("Unreachable branch num = {}", unreachable),
}
}

for char in key.as_bytes() {
match char {
b'0'..=b'9' => new_key.push(*char),
b'_' | b'a'..=b'z' | b'A'..=b'Z' => new_key.push(*char),
_other => {
new_key.push(b'%');
new_key.push(hex(*char / 16));
new_key.push(hex(*char % 16));
}
}
}

Ok(String::from_utf8(new_key)?)
}

fn unescape_for_key(key: &str) -> Result<String> {
let mut new_key = Vec::with_capacity(key.len());

fn unhex(num: u8) -> u8 {
match num {
b'0'..=b'9' => num - b'0',
b'a'..=b'f' => num - b'a',
unreachable => unreachable!("Unreachable branch num = {}", unreachable),
}
}

let bytes = key.as_bytes();

let mut index = 0;
while index < bytes.len() {
match bytes[index] {
b'%' => {
let mut num = unhex(bytes[index + 1]) * 16;
num += unhex(bytes[index + 2]);
new_key.push(num);
index += 3;
}
other => {
new_key.push(other);
index += 1;
}
}
}

Ok(String::from_utf8(new_key)?)
}

fn new_lift_time(&self) -> KVMeta {
let now = std::time::SystemTime::now();
let expire_at = now
@@ -142,11 +86,7 @@ impl ClusterApi for ClusterMgr {
let seq = MatchSeq::Exact(0);
let meta = Some(self.new_lift_time());
let value = Operation::Update(serde_json::to_vec(&node)?);
let node_key = format!(
"{}/{}",
self.cluster_prefix,
Self::escape_for_key(&node.id)?
);
let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node.id)?);
let upsert_node = self
.kv_api
.upsert_kv(UpsertKVAction::new(&node_key, seq, value, meta));
@@ -169,7 +109,7 @@ impl ClusterApi for ClusterMgr {
for (node_key, value) in values {
let mut node_info = serde_json::from_slice::<NodeInfo>(&value.data)?;

let node_key = Self::unescape_for_key(&node_key)?;
let node_key = unescape_for_key(&node_key)?;
node_info.id = node_key[self.cluster_prefix.len() + 1..].to_string();
nodes_info.push(node_info);
}
@@ -178,11 +118,7 @@ impl ClusterApi for ClusterMgr {
}

async fn drop_node(&self, node_id: String, seq: Option<u64>) -> Result<()> {
let node_key = format!(
"{}/{}",
self.cluster_prefix,
Self::escape_for_key(&node_id)?
);
let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node_id)?);
let upsert_node = self.kv_api.upsert_kv(UpsertKVAction::new(
&node_key,
seq.into(),
@@ -205,11 +141,7 @@ impl ClusterApi for ClusterMgr {

async fn heartbeat(&self, node: &NodeInfo, seq: Option<u64>) -> Result<u64> {
let meta = Some(self.new_lift_time());
let node_key = format!(
"{}/{}",
self.cluster_prefix,
Self::escape_for_key(&node.id)?
);
let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node.id)?);
let seq = match seq {
None => MatchSeq::GE(1),
Some(exact) => MatchSeq::Exact(exact),
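
One detail in the node-listing hunk may be worth double-checking: the stored key is unescaped as a whole and then sliced at `self.cluster_prefix.len() + 1`, but `cluster_prefix` holds the escaped tenant and cluster id, so the two lengths can differ once a tenant or cluster id contains an escaped byte. A possible alternative, sketched under the assumption that every stored key starts with the escaped prefix followed by `/`, is to strip the prefix first and unescape only the remainder. The helper name `node_id_from_key` and its closure parameter are hypothetical.

```rust
/// Hypothetical helper: recover the node id from a stored key.
/// `prefix` is the already-escaped cluster prefix; `unescape` is any function
/// that behaves like `unescape_for_key` and reports failure as `None`.
pub fn node_id_from_key(
    stored_key: &str,
    prefix: &str,
    unescape: impl Fn(&str) -> Option<String>,
) -> Option<String> {
    // Remove the escaped prefix and the separating '/' before unescaping,
    // so the offset never depends on how many bytes were escaped.
    let escaped_id = stored_key.strip_prefix(prefix)?.strip_prefix('/')?;
    unescape(escaped_id)
}
```

Because the prefix is compared in its escaped form, the result does not depend on the length of the unescaped tenant, and a key that does not belong to the cluster yields `None` instead of a wrong slice.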
13 changes: 9 additions & 4 deletions common/management/src/stage/stage_mgr.rs
@@ -14,6 +14,7 @@

use std::sync::Arc;

use common_base::escape_for_key;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_api::KVApi;
@@ -45,7 +46,7 @@ impl StageMgr {

Ok(StageMgr {
kv_api,
stage_prefix: format!("{}/{}", USER_STAGE_API_KEY_PREFIX, tenant),
stage_prefix: format!("{}/{}", USER_STAGE_API_KEY_PREFIX, escape_for_key(tenant)?),
})
}
}
@@ -55,7 +56,11 @@ impl StageApi for StageMgr {
async fn add_stage(&self, info: UserStageInfo) -> Result<u64> {
let seq = MatchSeq::Exact(0);
let val = Operation::Update(serde_json::to_vec(&info)?);
let key = format!("{}/{}", self.stage_prefix, info.stage_name);
let key = format!(
"{}/{}",
self.stage_prefix,
escape_for_key(&info.stage_name)?
);
let upsert_info = self
.kv_api
.upsert_kv(UpsertKVAction::new(&key, seq, val, None));
@@ -72,7 +77,7 @@ impl StageApi for StageMgr {
}

async fn get_stage(&self, name: &str, seq: Option<u64>) -> Result<SeqV<UserStageInfo>> {
let key = format!("{}/{}", self.stage_prefix, name);
let key = format!("{}/{}", self.stage_prefix, escape_for_key(name)?);
let kv_api = self.kv_api.clone();
let get_kv = async move { kv_api.get_kv(&key).await };
let res = get_kv.await?;
@@ -97,7 +102,7 @@ impl StageApi for StageMgr {
}

async fn drop_stage(&self, name: &str, seq: Option<u64>) -> Result<()> {
let key = format!("{}/{}", self.stage_prefix, name);
let key = format!("{}/{}", self.stage_prefix, escape_for_key(name)?);
let kv_api = self.kv_api.clone();
let upsert_kv = async move {
kv_api
11 changes: 6 additions & 5 deletions common/management/src/udf/udf_mgr.rs
@@ -15,6 +15,7 @@
use std::sync::Arc;

use common_ast::udfs::UDFParser;
use common_base::escape_for_key;
use common_exception::ErrorCode;
use common_exception::Result;
use common_functions::is_builtin_function;
@@ -47,7 +48,7 @@ impl UdfMgr {

Ok(UdfMgr {
kv_api,
udf_prefix: format!("{}/{}", UDF_API_KEY_PREFIX, tenant),
udf_prefix: format!("{}/{}", UDF_API_KEY_PREFIX, escape_for_key(tenant)?),
})
}
}
@@ -69,7 +70,7 @@ impl UdfApi for UdfMgr {

let seq = MatchSeq::Exact(0);
let val = Operation::Update(serde_json::to_vec(&info)?);
let key = format!("{}/{}", self.udf_prefix, info.name);
let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?);
let upsert_info = self
.kv_api
.upsert_kv(UpsertKVAction::new(&key, seq, val, None));
@@ -97,7 +98,7 @@ impl UdfApi for UdfMgr {
let _ = self.get_udf(info.name.as_str(), seq).await?;

let val = Operation::Update(serde_json::to_vec(&info)?);
let key = format!("{}/{}", self.udf_prefix, info.name);
let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?);
let upsert_info =
self.kv_api
.upsert_kv(UpsertKVAction::new(&key, MatchSeq::from(seq), val, None));
@@ -113,7 +114,7 @@ impl UdfApi for UdfMgr {
}

async fn get_udf(&self, udf_name: &str, seq: Option<u64>) -> Result<SeqV<UserDefinedFunction>> {
let key = format!("{}/{}", self.udf_prefix, udf_name);
let key = format!("{}/{}", self.udf_prefix, escape_for_key(udf_name)?);
let kv_api = self.kv_api.clone();
let get_kv = async move { kv_api.get_kv(&key).await };
let res = get_kv.await?;
@@ -138,7 +139,7 @@ impl UdfApi for UdfMgr {
}

async fn drop_udf(&self, udf_name: &str, seq: Option<u64>) -> Result<()> {
let key = format!("{}/{}", self.udf_prefix, udf_name);
let key = format!("{}/{}", self.udf_prefix, escape_for_key(udf_name)?);
let kv_api = self.kv_api.clone();
let upsert_kv = async move {
kv_api
13 changes: 7 additions & 6 deletions common/management/src/user/user_mgr.rs
@@ -14,6 +14,7 @@

use std::sync::Arc;

use common_base::escape_for_key;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
@@ -49,7 +50,7 @@ impl UserMgr {

Ok(UserMgr {
kv_api,
user_prefix: format!("{}/{}", USER_API_KEY_PREFIX, tenant),
user_prefix: format!("{}/{}", USER_API_KEY_PREFIX, escape_for_key(tenant)?),
})
}

@@ -59,7 +60,7 @@ impl UserMgr {
seq: Option<u64>,
) -> common_exception::Result<u64> {
let user_key = format_user_key(&user_info.name, &user_info.hostname);
let key = format!("{}/{}", self.user_prefix, user_key);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let value = serde_json::to_vec(&user_info)?;

let match_seq = match seq {
@@ -91,7 +92,7 @@ impl UserApi for UserMgr {
async fn add_user(&self, user_info: UserInfo) -> common_exception::Result<u64> {
let match_seq = MatchSeq::Exact(0);
let user_key = format_user_key(&user_info.name, &user_info.hostname);
let key = format!("{}/{}", self.user_prefix, user_key);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let value = serde_json::to_vec(&user_info)?;

let kv_api = self.kv_api.clone();
@@ -118,7 +119,7 @@ impl UserApi for UserMgr {
seq: Option<u64>,
) -> Result<SeqV<UserInfo>> {
let user_key = format_user_key(&username, &hostname);
let key = format!("{}/{}", self.user_prefix, user_key);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let res = self.kv_api.get_kv(&key).await?;
let seq_value =
res.ok_or_else(|| ErrorCode::UnknownUser(format!("unknown user {}", user_key)))?;
@@ -158,7 +159,7 @@ impl UserApi for UserMgr {
new_user_info.grants = user_info.grants;

let user_key = format_user_key(&new_user_info.name, &new_user_info.hostname);
let key = format!("{}/{}", self.user_prefix, user_key);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let value = serde_json::to_vec(&new_user_info)?;

let match_seq = match seq {
@@ -218,7 +219,7 @@ impl UserApi for UserMgr {

async fn drop_user(&self, username: String, hostname: String, seq: Option<u64>) -> Result<()> {
let user_key = format_user_key(&username, &hostname);
let key = format!("{}/{}", self.user_prefix, user_key);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let res = self
.kv_api
.upsert_kv(UpsertKVAction::new(