From fd31d790de57887da6376f28571de928d6224724 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 7 Jan 2025 16:04:24 +0800 Subject: [PATCH 1/3] fix(cluster): fix unassign warehouse nodes failure --- .../management/src/warehouse/warehouse_mgr.rs | 48 +++++-- src/query/management/tests/it/warehouse.rs | 128 ++++++++++++++++++ 2 files changed, 162 insertions(+), 14 deletions(-) diff --git a/src/query/management/src/warehouse/warehouse_mgr.rs b/src/query/management/src/warehouse/warehouse_mgr.rs index 6b93ead41bc23..e7e370d63ef85 100644 --- a/src/query/management/src/warehouse/warehouse_mgr.rs +++ b/src/query/management/src/warehouse/warehouse_mgr.rs @@ -1937,6 +1937,20 @@ impl WarehouseApi for WarehouseMgr { return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty.")); } + if nodes.is_empty() { + return Err(ErrorCode::BadArguments("Unassign list is empty.")); + } + + if nodes.iter().any(|(name, _)| name.is_empty()) { + return Err(ErrorCode::BadArguments("Unassign cluster name is empty.")); + } + + if nodes.iter().any(|(_, list)| list.is_empty()) { + return Err(ErrorCode::BadArguments( + "Unassign cluster nodes list is empty.", + )); + } + for _idx in 0..10 { let mut nodes = nodes.clone(); let mut drop_cluster_node_txn = TxnRequest::default(); @@ -2011,23 +2025,29 @@ impl WarehouseApi for WarehouseMgr { )); if let Some(v) = nodes.get_mut(&node_snapshot.node_info.cluster_id) { - if let Some(remove_node) = v.pop() { - let SelectedNode::Random(node_group) = remove_node; - if node_snapshot.node_info.runtime_node_group == node_group { - let node = node_snapshot.node_info.leave_warehouse(); - - drop_cluster_node_txn - .if_then - .push(TxnOp::delete(cluster_node_key)); - drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl( - node_key, - serde_json::to_vec(&node)?, - Some(self.lift_time * 4), - )) - } + let runtime_node_group = node_snapshot.node_info.runtime_node_group.clone(); + if v.remove_first(&SelectedNode::Random(runtime_node_group)) + .is_some() + { + let 
node = node_snapshot.node_info.leave_warehouse(); + + drop_cluster_node_txn + .if_then + .push(TxnOp::delete(cluster_node_key)); + drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl( + node_key, + serde_json::to_vec(&node)?, + Some(self.lift_time * 4), + )); } } } + + let txn_reply = self.metastore.transaction(drop_cluster_node_txn).await?; + + if txn_reply.success { + return Ok(()); + } } Err(ErrorCode::WarehouseOperateConflict( diff --git a/src/query/management/tests/it/warehouse.rs b/src/query/management/tests/it/warehouse.rs index af87241bb101d..aa383b8bd417b 100644 --- a/src/query/management/tests/it/warehouse.rs +++ b/src/query/management/tests/it/warehouse.rs @@ -613,6 +613,134 @@ async fn test_recovery_create_warehouse() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_unassign_nodes_for_invalid_warehouse() -> Result<()> { + let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?; + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes("", HashMap::new()); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2403); + + let unassign_warehouse_nodes = + warehouse_manager.unassign_warehouse_nodes("test_warehouse", HashMap::new()); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006); + + let unassign_warehouse_nodes = warehouse_manager + .unassign_warehouse_nodes("test_warehouse", HashMap::from([(String::new(), vec![])])); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006); + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes( + "test_warehouse", + HashMap::from([(String::from("test"), vec![])]), + ); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006); + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes( + "test_warehouse", + HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]), + ); + + 
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2406); + + warehouse_manager + .create_warehouse(String::from("test_warehouse"), vec![ + SelectedNode::Random(None), + SelectedNode::Random(None), + ]) + .await?; + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes( + "test_warehouse", + HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]), + ); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2410); + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes( + "test_warehouse", + HashMap::from([(String::from("default"), vec![SelectedNode::Random(Some( + String::from("unknown"), + ))])]), + ); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2401); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_unassign_all_nodes_for_warehouse() -> Result<()> { + let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?; + warehouse_manager + .create_warehouse(String::from("test_warehouse"), vec![SelectedNode::Random( + None, + )]) + .await?; + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes( + "test_warehouse", + HashMap::from([(String::from("default"), vec![ + SelectedNode::Random(None), + SelectedNode::Random(Some(String::from("test_node_group"))), + ])]), + ); + + assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2401); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_unassign_nodes_for_warehouse() -> Result<()> { + let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?; + let create_warehouse = warehouse_manager + .create_warehouse(String::from("test_warehouse"), vec![SelectedNode::Random( + None, + )]); + + create_warehouse.await?; + + let mut node_1 = system_managed_node(&GlobalUniqName::unique()); + node_1.node_group = Some(String::from("test_node_group")); + 
warehouse_manager.start_node(node_1.clone()).await?; + + let mut node_2 = system_managed_node(&GlobalUniqName::unique()); + node_2.node_group = Some(String::from("test_node_group")); + warehouse_manager.start_node(node_2.clone()).await?; + + let add_warehouse_cluster = warehouse_manager.add_warehouse_cluster( + String::from("test_warehouse"), + String::from("cluster_name"), + vec![ + SelectedNode::Random(Some(String::from("test_node_group"))), + SelectedNode::Random(None), + SelectedNode::Random(Some(String::from("test_node_group"))), + ], + ); + + add_warehouse_cluster.await?; + + let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes( + "test_warehouse", + HashMap::from([(String::from("cluster_name"), vec![ + SelectedNode::Random(None), + SelectedNode::Random(Some(String::from("test_node_group"))), + ])]), + ); + + unassign_warehouse_nodes.await?; + + let nodes = warehouse_manager + .list_warehouse_cluster_nodes("test_warehouse", "cluster_name") + .await?; + + assert_eq!(nodes.len(), 1); + assert!(nodes[0].id == node_1.id || nodes[0].id == node_2.id); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_concurrent_recovery_create_warehouse() -> Result<()> { let (_, warehouse_manager, nodes) = nodes(Duration::from_mins(30), 2).await?; From bcb2c16f67c024f6fc5de8d534e350f65f6dea59 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 7 Jan 2025 16:15:29 +0800 Subject: [PATCH 2/3] fix(cluster): fix unassign warehouse nodes failure --- .../management/src/warehouse/warehouse_mgr.rs | 10 +++++ src/query/management/tests/it/warehouse.rs | 38 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/query/management/src/warehouse/warehouse_mgr.rs b/src/query/management/src/warehouse/warehouse_mgr.rs index e7e370d63ef85..4c59e8e474996 100644 --- a/src/query/management/src/warehouse/warehouse_mgr.rs +++ b/src/query/management/src/warehouse/warehouse_mgr.rs @@ -1829,6 +1829,16 @@ impl WarehouseApi for 
WarehouseMgr { )); } + if nodes.iter().any(|(name, _)| name.is_empty()) { + return Err(ErrorCode::BadArguments("Assign cluster name is empty.")); + } + + if nodes.iter().any(|(_, list)| list.is_empty()) { + return Err(ErrorCode::BadArguments( + "Assign cluster nodes list is empty.", + )); + } + for _idx in 0..10 { let selected_nodes = self.pick_assign_warehouse_node(&warehouse, &nodes).await?; diff --git a/src/query/management/tests/it/warehouse.rs b/src/query/management/tests/it/warehouse.rs index aa383b8bd417b..2b5ebb1e385b0 100644 --- a/src/query/management/tests/it/warehouse.rs +++ b/src/query/management/tests/it/warehouse.rs @@ -613,6 +613,44 @@ async fn test_recovery_create_warehouse() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_assign_nodes_for_invalid_warehouse() -> Result<()> { + let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?; + + let assign_warehouse_nodes = + warehouse_manager.assign_warehouse_nodes(String::from(""), HashMap::new()); + + assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2403); + + let assign_warehouse_nodes = + warehouse_manager.assign_warehouse_nodes(String::from("test_warehouse"), HashMap::new()); + + assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2408); + + let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes( + String::from("test_warehouse"), + HashMap::from([(String::new(), vec![])]), + ); + + assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 1006); + + let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes( + String::from("test_warehouse"), + HashMap::from([(String::from("test"), vec![])]), + ); + + assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 1006); + + let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes( + String::from("test_warehouse"), + HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]), + ); + + 
assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2406); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_unassign_nodes_for_invalid_warehouse() -> Result<()> { let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?; From d2f853027c0e8b3899d44ade57e15847d9fa4ee3 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 7 Jan 2025 16:42:48 +0800 Subject: [PATCH 3/3] chore(cluster): use millis for forward request seed --- src/query/service/src/clusters/cluster.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 563dfa98fb092..acfbe065fec72 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -364,10 +364,10 @@ impl ClusterDiscovery { .duration_since(std::time::UNIX_EPOCH) .expect("expect time"); - let nanos = system_timestamp.as_nanos(); - let cluster_idx = (nanos % warehouse_clusters_nodes_index.len() as u128) as usize; + let millis = system_timestamp.as_millis(); + let cluster_idx = (millis % warehouse_clusters_nodes_index.len() as u128) as usize; let pick_cluster_nodes = &warehouse_clusters_nodes[cluster_idx]; - let nodes_idx = (nanos % pick_cluster_nodes.len() as u128) as usize; + let nodes_idx = (millis % pick_cluster_nodes.len() as u128) as usize; Ok(Some(pick_cluster_nodes[nodes_idx].clone())) }