Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cluster): fix unassign warehouse nodes failure #17195

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 44 additions & 14 deletions src/query/management/src/warehouse/warehouse_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,16 @@ impl WarehouseApi for WarehouseMgr {
));
}

if nodes.iter().any(|(name, _)| name.is_empty()) {
return Err(ErrorCode::BadArguments("Assign cluster name is empty."));
}

if nodes.iter().any(|(_, list)| list.is_empty()) {
return Err(ErrorCode::BadArguments(
"Assign cluster nodes list is empty.",
));
}

for _idx in 0..10 {
let selected_nodes = self.pick_assign_warehouse_node(&warehouse, &nodes).await?;

Expand Down Expand Up @@ -1937,6 +1947,20 @@ impl WarehouseApi for WarehouseMgr {
return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty."));
}

if nodes.is_empty() {
return Err(ErrorCode::BadArguments("Unassign list is empty."));
}

if nodes.iter().any(|(name, _)| name.is_empty()) {
return Err(ErrorCode::BadArguments("Unassign cluster name is empty."));
}

if nodes.iter().any(|(_, list)| list.is_empty()) {
return Err(ErrorCode::BadArguments(
"Unassign cluster nodes list is empty.",
));
}

for _idx in 0..10 {
let mut nodes = nodes.clone();
let mut drop_cluster_node_txn = TxnRequest::default();
Expand Down Expand Up @@ -2011,23 +2035,29 @@ impl WarehouseApi for WarehouseMgr {
));

if let Some(v) = nodes.get_mut(&node_snapshot.node_info.cluster_id) {
if let Some(remove_node) = v.pop() {
let SelectedNode::Random(node_group) = remove_node;
if node_snapshot.node_info.runtime_node_group == node_group {
let node = node_snapshot.node_info.leave_warehouse();

drop_cluster_node_txn
.if_then
.push(TxnOp::delete(cluster_node_key));
drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl(
node_key,
serde_json::to_vec(&node)?,
Some(self.lift_time * 4),
))
}
let runtime_node_group = node_snapshot.node_info.runtime_node_group.clone();
if v.remove_first(&SelectedNode::Random(runtime_node_group))
.is_some()
{
let node = node_snapshot.node_info.leave_warehouse();

drop_cluster_node_txn
.if_then
.push(TxnOp::delete(cluster_node_key));
drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl(
node_key,
serde_json::to_vec(&node)?,
Some(self.lift_time * 4),
));
}
}
}

let txn_reply = self.metastore.transaction(drop_cluster_node_txn).await?;

if txn_reply.success {
return Ok(());
}
}

Err(ErrorCode::WarehouseOperateConflict(
Expand Down
166 changes: 166 additions & 0 deletions src/query/management/tests/it/warehouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,172 @@ async fn test_recovery_create_warehouse() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_assign_nodes_for_invalid_warehouse() -> Result<()> {
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;

let assign_warehouse_nodes =
warehouse_manager.assign_warehouse_nodes(String::from(""), HashMap::new());

assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2403);

let assign_warehouse_nodes =
warehouse_manager.assign_warehouse_nodes(String::from("test_warehouse"), HashMap::new());

assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2408);

let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes(
String::from("test_warehouse"),
HashMap::from([(String::new(), vec![])]),
);

assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 1006);

let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes(
String::from("test_warehouse"),
HashMap::from([(String::from("test"), vec![])]),
);

assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 1006);

let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes(
String::from("test_warehouse"),
HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]),
);

assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2406);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_unassign_nodes_for_invalid_warehouse() -> Result<()> {
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes("", HashMap::new());

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2403);

let unassign_warehouse_nodes =
warehouse_manager.unassign_warehouse_nodes("test_warehouse", HashMap::new());

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006);

let unassign_warehouse_nodes = warehouse_manager
.unassign_warehouse_nodes("test_warehouse", HashMap::from([(String::new(), vec![])]));

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006);

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
"test_warehouse",
HashMap::from([(String::from("test"), vec![])]),
);

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006);

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
"test_warehouse",
HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]),
);

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2406);

warehouse_manager
.create_warehouse(String::from("test_warehouse"), vec![
SelectedNode::Random(None),
SelectedNode::Random(None),
])
.await?;

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
"test_warehouse",
HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]),
);

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2410);

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
"test_warehouse",
HashMap::from([(String::from("default"), vec![SelectedNode::Random(Some(
String::from("unknown"),
))])]),
);

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2401);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_unassign_all_nodes_for_warehouse() -> Result<()> {
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;
warehouse_manager
.create_warehouse(String::from("test_warehouse"), vec![SelectedNode::Random(
None,
)])
.await?;

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
"test_warehouse",
HashMap::from([(String::from("default"), vec![
SelectedNode::Random(None),
SelectedNode::Random(Some(String::from("test_node_group"))),
])]),
);

assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2401);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_unassign_nodes_for_warehouse() -> Result<()> {
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;
let create_warehouse = warehouse_manager
.create_warehouse(String::from("test_warehouse"), vec![SelectedNode::Random(
None,
)]);

create_warehouse.await?;

let mut node_1 = system_managed_node(&GlobalUniqName::unique());
node_1.node_group = Some(String::from("test_node_group"));
warehouse_manager.start_node(node_1.clone()).await?;

let mut node_2 = system_managed_node(&GlobalUniqName::unique());
node_2.node_group = Some(String::from("test_node_group"));
warehouse_manager.start_node(node_2.clone()).await?;

let add_warehouse_cluster = warehouse_manager.add_warehouse_cluster(
String::from("test_warehouse"),
String::from("cluster_name"),
vec![
SelectedNode::Random(Some(String::from("test_node_group"))),
SelectedNode::Random(None),
SelectedNode::Random(Some(String::from("test_node_group"))),
],
);

add_warehouse_cluster.await?;

let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
"test_warehouse",
HashMap::from([(String::from("cluster_name"), vec![
SelectedNode::Random(None),
SelectedNode::Random(Some(String::from("test_node_group"))),
])]),
);

unassign_warehouse_nodes.await?;

let nodes = warehouse_manager
.list_warehouse_cluster_nodes("test_warehouse", "cluster_name")
.await?;

assert_eq!(nodes.len(), 1);
assert!(nodes[0].id == node_1.id || nodes[0].id == node_2.id);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_concurrent_recovery_create_warehouse() -> Result<()> {
let (_, warehouse_manager, nodes) = nodes(Duration::from_mins(30), 2).await?;
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,10 @@ impl ClusterDiscovery {
.duration_since(std::time::UNIX_EPOCH)
.expect("expect time");

let nanos = system_timestamp.as_nanos();
let cluster_idx = (nanos % warehouse_clusters_nodes_index.len() as u128) as usize;
let millis = system_timestamp.as_millis();
let cluster_idx = (millis % warehouse_clusters_nodes_index.len() as u128) as usize;
let pick_cluster_nodes = &warehouse_clusters_nodes[cluster_idx];
let nodes_idx = (nanos % pick_cluster_nodes.len() as u128) as usize;
let nodes_idx = (millis % pick_cluster_nodes.len() as u128) as usize;
Ok(Some(pick_cluster_nodes[nodes_idx].clone()))
}

Expand Down
Loading