Skip to content

Commit

Permalink
fix(cluster): fix heartbeat self managed after network fail (#17241)
Browse files Browse the repository at this point in the history
* fix(cluster): fix heartbeat self managed after network fail

* fix(cluster): fix heartbeat self managed after network fail
  • Loading branch information
zhang2014 authored Jan 10, 2025
1 parent d75e65a commit 2e3e44f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/query/management/src/warehouse/warehouse_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,11 @@ impl WarehouseApi for WarehouseMgr {
value: Some(seq_v), ..
})),
}) => Ok(seq_v.seq),
// compatibility
// After network fail, nodes may become expired due to failed heartbeats.
// For system-managed nodes, this situation has already been handled in resolve_conflicts.
// For self-managed nodes, we need to return seq = 0 so that the next heartbeat can proceed normally.
_ if matches!(node.node_type, NodeType::SelfManaged) => Ok(0),
_ => Err(ErrorCode::MetaServiceError("Heartbeat node info failure.")),
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/query/management/tests/it/warehouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,32 @@ async fn test_drop_self_managed_warehouse() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_heartbeat_not_exists_self_managed_node() -> Result<()> {
let (_kv, warehouse_manager, _nodes) = nodes(Duration::from_mins(60), 0).await?;

let node_info = self_managed_node("test_node");

// heartbeat not exists node info
let mut heartbeat_node = node_info.clone();
let seq = warehouse_manager
.heartbeat_node(&mut heartbeat_node, 34234)
.await?;

assert_eq!(seq, 0);
assert_eq!(heartbeat_node, node_info);

let mut heartbeat_node = node_info.clone();
let seq = warehouse_manager
.heartbeat_node(&mut heartbeat_node, seq)
.await?;

assert_ne!(seq, 0);
assert_eq!(heartbeat_node, node_info);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_successfully_heartbeat_self_managed_node() -> Result<()> {
let (kv, warehouse_manager, _nodes) = nodes(Duration::from_mins(60), 0).await?;
Expand Down
11 changes: 9 additions & 2 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,15 @@ impl ClusterDiscovery {
cluster_nodes.len() as f64,
);

let res = Cluster::create(res, self.local_id.clone());
Ok(res)
// compatibility, for self-managed nodes, we allow queries to continue executing even when the heartbeat fails.
if cluster_nodes.is_empty() && !config.query.cluster_id.is_empty() {
let mut cluster = Cluster::empty();
let mut_cluster = Arc::get_mut(&mut cluster).unwrap();
mut_cluster.local_id = self.local_id.clone();
return Ok(cluster);
}

Ok(Cluster::create(res, self.local_id.clone()))
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/query/service/tests/it/clusters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ use databend_query::clusters::ClusterHelper;
use databend_query::test_kits::*;
use pretty_assertions::assert_eq;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_empty_cluster_discovery() -> Result<()> {
let _guard = TestFixture::setup().await?;

let config = ConfigBuilder::create().build();

let metastore = ClusterDiscovery::create_meta_client(&config).await?;
let cluster_discovery = ClusterDiscovery::try_create(&config, metastore.clone()).await?;

let discover_cluster = cluster_discovery.discover(&config).await?;

let discover_cluster_nodes = discover_cluster.get_nodes();
assert_eq!(discover_cluster_nodes.len(), 0);
assert!(discover_cluster.is_empty());
assert!(!discover_cluster.unassign);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_single_cluster_discovery() -> Result<()> {
let config = ConfigBuilder::create().build();
Expand Down

0 comments on commit 2e3e44f

Please sign in to comment.