Skip to content

Commit

Permalink
feat(query): support compute quota feature (#15336)
Browse files Browse the repository at this point in the history
* feat(query): support compute quota feature

* feature(query): support compute quota feature

* feature(query): support compute quota feature
  • Loading branch information
zhang2014 authored Apr 27, 2024
1 parent fb31f82 commit 7edfa1f
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 65 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/common/license/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,4 @@ databend-common-base = { path = "../base" }
databend-common-exception = { path = "../exception" }
jwt-simple = "0.11.0"
serde = { workspace = true }
strum = "0.24.1"
strum_macros = "0.24.3"
serde_json = { workspace = true }
207 changes: 165 additions & 42 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,107 @@ use std::fmt::Formatter;

use serde::Deserialize;
use serde::Serialize;
use strum::IntoEnumIterator;
use strum_macros::EnumIter;

#[derive(Debug, Clone, Eq, Ord, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ComputeQuota {
threads_num: Option<usize>,
memory_usage: Option<usize>,
}

// All enterprise features are defined here.
#[derive(Debug, PartialEq, EnumIter)]
#[derive(Debug, Clone, Eq, Ord, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Feature {
#[serde(alias = "license_info", alias = "LICENSE_INFO")]
LicenseInfo,
#[serde(alias = "vacuum", alias = "VACUUM")]
Vacuum,
#[serde(alias = "test", alias = "TEST")]
Test,
#[serde(alias = "virtual_column", alias = "VIRTUAL_COLUMN")]
VirtualColumn,
#[serde(alias = "background_service", alias = "BACKGROUND_SERVICE")]
BackgroundService,
#[serde(alias = "data_mask", alias = "DATA_MASK")]
DataMask,
#[serde(alias = "aggregate_index", alias = "AGGREGATE_INDEX")]
AggregateIndex,
#[serde(alias = "inverted_index", alias = "INVERTED_INDEX")]
InvertedIndex,
#[serde(alias = "computed_column", alias = "COMPUTED_COLUMN")]
ComputedColumn,
#[serde(alias = "storage_encryption", alias = "STORAGE_ENCRYPTION")]
StorageEncryption,
#[serde(alias = "stream", alias = "STREAM")]
Stream,
#[serde(alias = "compute_quota", alias = "COMPUTE_QUOTA")]
ComputeQuota(ComputeQuota),

#[serde(other)]
Unknown,
}

impl Display for Feature {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Feature::VirtualColumn => {
write!(f, "virtual_column")
}
Feature::LicenseInfo => {
write!(f, "license_info")
}
Feature::Vacuum => {
write!(f, "vacuum")
}
Feature::Test => {
write!(f, "test")
}
Feature::BackgroundService => {
write!(f, "background_service")
}
Feature::DataMask => {
write!(f, "data_mask")
}
Feature::AggregateIndex => {
write!(f, "aggregate_index")
}
Feature::InvertedIndex => {
write!(f, "inverted_index")
}
Feature::ComputedColumn => {
write!(f, "computed_column")
}
Feature::StorageEncryption => {
write!(f, "storage_encryption")
Feature::LicenseInfo => write!(f, "license_info"),
Feature::Vacuum => write!(f, "vacuum"),
Feature::Test => write!(f, "test"),
Feature::VirtualColumn => write!(f, "virtual_column"),
Feature::BackgroundService => write!(f, "background_service"),
Feature::DataMask => write!(f, "data_mask"),
Feature::AggregateIndex => write!(f, "aggregate_index"),
Feature::InvertedIndex => write!(f, "inverted_index"),
Feature::ComputedColumn => write!(f, "computed_column"),
Feature::StorageEncryption => write!(f, "storage_encryption"),
Feature::Stream => write!(f, "stream"),
Feature::ComputeQuota(v) => {
write!(f, "compute_quota(")?;

match &v.threads_num {
None => write!(f, "threads_num: unlimited,")?,
Some(threads_num) => write!(f, "threads_num: {}", *threads_num)?,
};

match v.memory_usage {
None => write!(f, "memory_usage: unlimited,"),
Some(memory_usage) => write!(f, "memory_usage: {}", memory_usage),
}
}
Feature::Stream => {
write!(f, "stream")
Feature::Unknown => write!(f, "unknown"),
}
}
}

impl Feature {
pub fn verify(&self, feature: &Feature) -> bool {
match (self, feature) {
(Feature::ComputeQuota(c), Feature::ComputeQuota(v)) => {
if let Some(thread_num) = c.threads_num {
if thread_num <= v.threads_num.unwrap_or(usize::MAX) {
return false;
}
}

if let Some(max_memory_usage) = c.memory_usage {
if max_memory_usage <= v.memory_usage.unwrap_or(usize::MAX) {
return false;
}
}

true
}
(Feature::Test, Feature::Test)
| (Feature::AggregateIndex, Feature::AggregateIndex)
| (Feature::ComputedColumn, Feature::ComputedColumn)
| (Feature::Vacuum, Feature::Vacuum)
| (Feature::LicenseInfo, Feature::LicenseInfo)
| (Feature::Stream, Feature::Stream)
| (Feature::BackgroundService, Feature::BackgroundService)
| (Feature::DataMask, Feature::DataMask)
| (Feature::InvertedIndex, Feature::InvertedIndex)
| (Feature::VirtualColumn, Feature::VirtualColumn)
| (Feature::StorageEncryption, Feature::StorageEncryption) => true,
(_, _) => false,
}
}
}
Expand All @@ -82,25 +128,102 @@ pub struct LicenseInfo {
pub r#type: Option<String>,
pub org: Option<String>,
pub tenants: Option<Vec<String>>,
pub features: Option<Vec<String>>,
pub features: Option<Vec<Feature>>,
}

impl LicenseInfo {
pub fn display_features(&self) -> String {
// sort all features in alphabet order and ignore test feature
let mut binding = self.features.clone().unwrap_or_default();
if binding.is_empty() {
binding = Feature::iter().map(|f| f.to_string()).collect::<Vec<_>>();
let mut features = self.features.clone().unwrap_or_default();

if features.is_empty() {
return String::from("Unlimited");
}
let mut features = binding
.iter()
.filter(|f| *f != &Feature::Test.to_string())
.collect::<Vec<_>>();

features.sort();

features
.iter()
.filter(|f| **f != Feature::Test)
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",")
}
}

#[cfg(test)]
mod tests {
use crate::license::ComputeQuota;
use crate::license::Feature;

#[test]
fn test_deserialize_feature_from_string() {
assert_eq!(
Feature::LicenseInfo,
serde_json::from_str::<Feature>("\"license_info\"").unwrap()
);
assert_eq!(
Feature::Vacuum,
serde_json::from_str::<Feature>("\"Vacuum\"").unwrap()
);
assert_eq!(
Feature::Test,
serde_json::from_str::<Feature>("\"Test\"").unwrap()
);
assert_eq!(
Feature::VirtualColumn,
serde_json::from_str::<Feature>("\"VIRTUAL_COLUMN\"").unwrap()
);
assert_eq!(
Feature::BackgroundService,
serde_json::from_str::<Feature>("\"BackgroundService\"").unwrap()
);
assert_eq!(
Feature::DataMask,
serde_json::from_str::<Feature>("\"DataMask\"").unwrap()
);
assert_eq!(
Feature::AggregateIndex,
serde_json::from_str::<Feature>("\"AggregateIndex\"").unwrap()
);
assert_eq!(
Feature::InvertedIndex,
serde_json::from_str::<Feature>("\"InvertedIndex\"").unwrap()
);
assert_eq!(
Feature::ComputedColumn,
serde_json::from_str::<Feature>("\"ComputedColumn\"").unwrap()
);
assert_eq!(
Feature::StorageEncryption,
serde_json::from_str::<Feature>("\"StorageEncryption\"").unwrap()
);
assert_eq!(
Feature::Stream,
serde_json::from_str::<Feature>("\"Stream\"").unwrap()
);
assert_eq!(
Feature::ComputeQuota(ComputeQuota {
threads_num: Some(1),
memory_usage: Some(1),
}),
serde_json::from_str::<Feature>(
"{\"ComputeQuota\":{\"threads_num\":1, \"memory_usage\":1}}"
)
.unwrap()
);

assert_eq!(
Feature::ComputeQuota(ComputeQuota {
threads_num: None,
memory_usage: Some(1),
}),
serde_json::from_str::<Feature>("{\"ComputeQuota\":{\"memory_usage\":1}}").unwrap()
);

assert_eq!(
Feature::Unknown,
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
);
}
}
20 changes: 12 additions & 8 deletions src/query/ee/src/license/license_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,18 @@ impl RealLicenseManager {
if l.custom.features.is_none() {
return Ok(());
}
let features = l.custom.features.as_ref().unwrap();
if !features.contains(&feature.to_string()) {
return Err(ErrorCode::LicenseKeyInvalid(format!(
"license key does not support feature {}, supported features: {}",
feature,
l.custom.display_features()
)));

let verify_features = l.custom.features.as_ref().unwrap();
for verify_feature in verify_features {
if verify_feature.verify(&feature) {
return Ok(());
}
}
Ok(())

Err(ErrorCode::LicenseKeyInvalid(format!(
"license key does not support feature {}, supported features: {}",
feature,
l.custom.display_features()
)))
}
}
16 changes: 6 additions & 10 deletions src/query/ee/tests/it/license/license_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use jwt_simple::prelude::UnixTimeStamp;
fn build_custom_claims(
license_type: String,
org: String,
features: Option<Vec<String>>,
features: Option<Vec<Feature>>,
) -> LicenseInfo {
LicenseInfo {
r#type: Some(license_type),
Expand Down Expand Up @@ -99,10 +99,10 @@ async fn test_license_features() -> databend_common_exception::Result<()> {
"trial".to_string(),
"expired".to_string(),
Some(vec![
"test".to_string(),
"license_info".to_string(),
"vacuum".to_string(),
"stream".to_string(),
Feature::Test,
Feature::LicenseInfo,
Feature::Vacuum,
Feature::Stream,
]),
),
Duration::from_hours(2),
Expand Down Expand Up @@ -153,11 +153,7 @@ async fn test_license_features() -> databend_common_exception::Result<()> {
build_custom_claims(
"trial".to_string(),
"expired".to_string(),
Some(vec![
"test".to_string(),
"license_info".to_string(),
"vacuum".to_string(),
]),
Some(vec![Feature::Test, Feature::LicenseInfo, Feature::Vacuum]),
),
Duration::from_hours(0),
);
Expand Down
2 changes: 1 addition & 1 deletion tests/suites/5_ee/00_check/00_0014_license_info.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[]
aggregate_index,background_service,computed_column,data_mask,inverted_index,license_info,storage_encryption,stream,vacuum,virtual_column
Unlimited
[(0,)]

0 comments on commit 7edfa1f

Please sign in to comment.