Skip to content

Commit

Permalink
feat: hilbert clustering (#17045)
Browse files Browse the repository at this point in the history
* hilbert_clustering

* fix test

* vacuum temp files after recluster

* fix

* add test

* fix

* fix

* add hilbert_clustering_information

* update

* update

* add comments
  • Loading branch information
zhyass authored Jan 11, 2025
1 parent 4470ee5 commit be6657d
Show file tree
Hide file tree
Showing 80 changed files with 2,136 additions and 644 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ databend-enterprise-attach-table = { path = "src/query/ee_features/attach_table"
databend-enterprise-background-service = { path = "src/query/ee_features/background_service" }
databend-enterprise-data-mask-feature = { path = "src/query/ee_features/data_mask" }
databend-enterprise-fail-safe = { path = "src/query/ee_features/fail_safe" }
databend-enterprise-hilbert-clustering = { path = "src/query/ee_features/hilbert_clustering" }
databend-enterprise-inverted-index = { path = "src/query/ee_features/inverted_index" }
databend-enterprise-meta = { path = "src/meta/ee" }
databend-enterprise-query = { path = "src/query/ee" }
Expand Down
3 changes: 2 additions & 1 deletion src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,9 @@ build_exceptions! {
// recluster error codes
NoNeedToRecluster(4011),
NoNeedToCompact(4012),
UnsupportedClusterType(4013),

RefreshTableInfoFailure(4012),
RefreshTableInfoFailure(4021),
}

// Service errors [5001,6000].
Expand Down
14 changes: 12 additions & 2 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub enum Feature {
AmendTable,
#[serde(alias = "system_management", alias = "SYSTEM_MANAGEMENT")]
SystemManagement,
#[serde(alias = "hilbert_clustering", alias = "HILBERT_CLUSTERING")]
HilbertClustering,
#[serde(other)]
Unknown,
}
Expand Down Expand Up @@ -122,6 +124,7 @@ impl fmt::Display for Feature {
}
Feature::AmendTable => write!(f, "amend_table"),
Feature::SystemManagement => write!(f, "system_management"),
Feature::HilbertClustering => write!(f, "hilbert_clustering"),
Feature::Unknown => write!(f, "unknown"),
}
}
Expand Down Expand Up @@ -169,7 +172,8 @@ impl Feature {
| (Feature::InvertedIndex, Feature::InvertedIndex)
| (Feature::VirtualColumn, Feature::VirtualColumn)
| (Feature::AttacheTable, Feature::AttacheTable)
| (Feature::StorageEncryption, Feature::StorageEncryption) => Ok(true),
| (Feature::StorageEncryption, Feature::StorageEncryption)
| (Feature::HilbertClustering, Feature::HilbertClustering) => Ok(true),
(_, _) => Ok(false),
}
}
Expand Down Expand Up @@ -337,6 +341,11 @@ mod tests {
serde_json::from_str::<Feature>("\"amend_table\"").unwrap()
);

assert_eq!(
Feature::HilbertClustering,
serde_json::from_str::<Feature>("\"hilbert_clustering\"").unwrap()
);

assert_eq!(
Feature::Unknown,
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
Expand Down Expand Up @@ -370,11 +379,12 @@ mod tests {
storage_usage: Some(1),
}),
Feature::AmendTable,
Feature::HilbertClustering,
]),
};

assert_eq!(
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }",
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }",
license_info.to_string()
);
}
Expand Down
7 changes: 7 additions & 0 deletions src/query/catalog/src/plan/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,10 @@ impl ReclusterParts {
}
}
}

/// Serializable, cloneable bundle of recluster bookkeeping data.
///
/// Derives `Default`, so an empty value can be created with
/// `ReclusterInfoSideCar::default()`.
///
/// NOTE(review): judging by the field names, this describes the outcome of a
/// recluster run (blocks produced, segments dropped) — confirm against the
/// code that builds and consumes it.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
pub struct ReclusterInfoSideCar {
/// Shared (`Arc`) handles to the metadata of the merged blocks.
pub merged_blocks: Vec<Arc<BlockMeta>>,
/// Indexes of the removed segments.
// NOTE(review): presumably positions in the snapshot's segment list — confirm.
pub removed_segment_indexes: Vec<usize>,
/// Statistics for what was removed.
// NOTE(review): presumably the summary to subtract from the table's totals — confirm.
pub removed_statistics: Statistics,
}
25 changes: 25 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use databend_common_ast::ast::Expr;
use databend_common_ast::parser::parse_comma_separated_exprs;
use databend_common_ast::parser::tokenize_sql;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
Expand Down Expand Up @@ -136,6 +139,28 @@ pub trait Table: Sync + Send {
Some(cluster_type)
}

/// Parses this table's cluster key definition into a list of AST expressions.
///
/// Returns `None` when `cluster_key_meta()` yields nothing. Otherwise the
/// cluster key string is tokenized and parsed using the SQL dialect read from
/// `ctx`'s settings (the dialect's default value is used if that lookup
/// fails). A composite key, which parses as a single tuple expression, is
/// flattened into its member expressions; a single key is returned as a
/// one-element list.
///
/// # Panics
///
/// Panics if the stored cluster key string fails to tokenize or parse, or if
/// parsing produces anything other than exactly one top-level expression.
fn resolve_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Option<Vec<Expr>> {
    let meta = self.cluster_key_meta();
    let (_, cluster_key_str) = meta.as_ref()?;

    let tokens = tokenize_sql(cluster_key_str).unwrap();
    let dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
    let parsed = parse_comma_separated_exprs(&tokens, dialect).unwrap();

    // Defensive check: the parsed list must hold exactly one element, which is
    // either a tuple of composite cluster keys or a single cluster key.
    if parsed.len() != 1 {
        unreachable!("invalid cluster key ast expression, {:?}", parsed);
    }

    // Unwrap the tuple so callers always see a flat list of key expressions.
    if let Expr::Tuple { exprs, .. } = &parsed[0] {
        return Some(exprs.clone());
    }
    Some(parsed)
}

fn change_tracking_enabled(&self) -> bool {
false
}
Expand Down
18 changes: 15 additions & 3 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,23 @@ pub trait TableContext: Send + Sync {
max_files: Option<usize>,
) -> Result<FilteredCopyFiles>;

fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
fn add_written_segment_location(&self, segment_loc: Location) -> Result<()>;

fn clear_segment_locations(&self) -> Result<()>;
fn clear_written_segment_locations(&self) -> Result<()>;

fn get_segment_locations(&self) -> Result<Vec<Location>>;
fn get_written_segment_locations(&self) -> Result<Vec<Location>>;

/// Adds a selected segment location to this context.
///
/// # Panics
///
/// This default body is a stub: it always panics via `unimplemented!()`.
/// Implementors that need this method must override it.
fn add_selected_segment_location(&self, _segment_loc: Location) {
unimplemented!()
}

/// Returns the selected segment locations held by this context.
///
/// # Panics
///
/// This default body is a stub: it always panics via `unimplemented!()`.
/// Implementors that need this method must override it.
fn get_selected_segment_locations(&self) -> Vec<Location> {
unimplemented!()
}

/// Clears the selected segment locations held by this context.
///
/// # Panics
///
/// This default body is a stub: it always panics via `unimplemented!()`.
/// Implementors that need this method must override it.
fn clear_selected_segment_locations(&self) {
unimplemented!()
}

fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>;

Expand Down
1 change: 1 addition & 0 deletions src/query/ee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ databend-enterprise-attach-table = { workspace = true }
databend-enterprise-background-service = { workspace = true }
databend-enterprise-data-mask-feature = { workspace = true }
databend-enterprise-fail-safe = { workspace = true }
databend-enterprise-hilbert-clustering = { workspace = true }
databend-enterprise-inverted-index = { workspace = true }
databend-enterprise-resources-management = { workspace = true }
databend-enterprise-storage-encryption = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions src/query/ee/src/enterprise_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::attach_table::RealAttachTableHandler;
use crate::background_service::RealBackgroundService;
use crate::data_mask::RealDatamaskHandler;
use crate::fail_safe::RealFailSafeHandler;
use crate::hilbert_clustering::RealHilbertClusteringHandler;
use crate::inverted_index::RealInvertedIndexHandler;
use crate::license::license_mgr::RealLicenseManager;
use crate::resource_management::init_resources_management;
Expand All @@ -47,6 +48,7 @@ impl EnterpriseServices {
RealStorageQuotaHandler::init(&cfg)?;
RealFailSafeHandler::init()?;
init_resources_management(&cfg).await?;
RealHilbertClusteringHandler::init()?;
Ok(())
}
}
Loading

0 comments on commit be6657d

Please sign in to comment.