Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): enable heartbeat detection. #830

Merged
merged 7 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions rust/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub struct ComputeNodeOpts {
/// Enable reporting tracing information to jaeger
#[clap(long)]
pub enable_jaeger_tracing: bool,

/// Interval to send heartbeat in ms
#[clap(long, default_value = "1000")]
pub heartbeat_interval: u32,
}

use crate::server::compute_node_serve;
Expand Down
22 changes: 14 additions & 8 deletions rust/compute/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use hyper::{Body, Request, Response};
use prometheus::{Encoder, Registry, TextEncoder};
Expand Down Expand Up @@ -54,6 +55,12 @@ pub async fn compute_node_serve(
.await
.unwrap();

let mut sub_tasks: Vec<(JoinHandle<()>, UnboundedSender<()>)> =
vec![MetaClient::start_heartbeat_loop(
meta_client.clone(),
Duration::from_millis(opts.heartbeat_interval as u64),
)];

let registry = prometheus::Registry::new();
// Initialize state store.
let state_store_metrics = Arc::new(StateStoreMetrics::new(registry.clone()));
Expand All @@ -68,16 +75,14 @@ pub async fn compute_node_serve(
.unwrap();

// A hummock compactor is deployed along with compute node for now.
let mut compactor_handle = None;
if let StateStoreImpl::HummockStateStore(hummock) = state_store.clone() {
let (compactor_join_handle, compactor_shutdown_sender) = Compactor::start_compactor(
sub_tasks.push(Compactor::start_compactor(
hummock.inner().storage.options().clone(),
hummock.inner().storage.local_version_manager().clone(),
hummock.inner().storage.hummock_meta_client().clone(),
hummock.inner().storage.sstable_store(),
state_store_metrics,
);
compactor_handle = Some((compactor_join_handle, compactor_shutdown_sender));
));
}

let streaming_metrics = Arc::new(StreamingMetrics::new(registry.clone()));
Expand Down Expand Up @@ -121,10 +126,11 @@ pub async fn compute_node_serve(
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
_ = shutdown_recv.recv() => {
// Gracefully shutdown compactor
if let Some((compactor_join_handle, compactor_shutdown_sender)) = compactor_handle {
if compactor_shutdown_sender.send(()).is_ok() {
compactor_join_handle.await.unwrap();
for (join_handle, shutdown_sender) in sub_tasks {
if shutdown_sender.send(()).is_ok() {
if let Err(err) = join_handle.await {
tracing::warn!("shutdown err: {}", err);
}
}
}
},
Expand Down
4 changes: 4 additions & 0 deletions rust/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub struct FrontendOpts {

#[clap(long, default_value = "http://127.0.0.1:5690")]
pub meta_addr: String,

/// Interval to send heartbeat in ms
#[clap(long, default_value = "1000")]
pub heartbeat_interval: u32,
}

/// Start frontend
Expand Down
11 changes: 11 additions & 0 deletions rust/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::cell::RefCell;
use std::fmt::Formatter;
use std::rc::Rc;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use pgwire::pg_response::PgResponse;
use pgwire::pg_server::{Session, SessionManager};
use risingwave_common::error::Result;
use risingwave_pb::common::WorkerType;
use risingwave_rpc_client::MetaClient;
use risingwave_sqlparser::parser::Parser;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -190,6 +192,8 @@ impl SessionImpl {
pub struct SessionManagerImpl {
env: FrontendEnv,
observer_join_handle: JoinHandle<()>,
heartbeat_join_handle: JoinHandle<()>,
_heartbeat_shutdown_sender: UnboundedSender<()>,
}

impl SessionManager for SessionManagerImpl {
Expand All @@ -203,15 +207,22 @@ impl SessionManager for SessionManagerImpl {
impl SessionManagerImpl {
/// Build a session manager: initialize the frontend environment, then spawn the
/// background heartbeat loop towards the meta service.
///
/// The heartbeat shutdown sender is stored (never used directly) so that dropping
/// the manager closes the channel and lets the heartbeat task observe shutdown.
pub async fn new(opts: &FrontendOpts) -> Result<Self> {
    let (env, observer_join_handle) = FrontendEnv::init(opts).await?;
    // Heartbeat period comes from CLI options, expressed in milliseconds.
    let heartbeat_interval = Duration::from_millis(u64::from(opts.heartbeat_interval));
    let (heartbeat_join_handle, heartbeat_shutdown_tx) =
        MetaClient::start_heartbeat_loop(env.meta_client.clone(), heartbeat_interval);
    Ok(Self {
        env,
        observer_join_handle,
        heartbeat_join_handle,
        _heartbeat_shutdown_sender: heartbeat_shutdown_tx,
    })
}

/// Used in unit test. Called before `LocalMeta::stop`.
pub fn terminate(&self) {
    // Abort every background task owned by this session manager.
    for handle in [&self.observer_join_handle, &self.heartbeat_join_handle] {
        handle.abort();
    }
}
}

Expand Down
12 changes: 9 additions & 3 deletions rust/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ffi::OsString;
use std::sync::Arc;
use std::time::Duration;

use clap::StructOpt;
use pgwire::pg_response::PgResponse;
Expand Down Expand Up @@ -82,9 +83,14 @@ impl FrontendMockMetaClient {
);

let cluster_manager = Arc::new(
StoredClusterManager::new(env.clone(), None, notification_manager.clone())
.await
.unwrap(),
StoredClusterManager::new(
env.clone(),
None,
notification_manager.clone(),
Duration::from_secs(3600),
)
.await
.unwrap(),
);

Self {
Expand Down
148 changes: 147 additions & 1 deletion rust/meta/src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
Expand All @@ -11,13 +13,15 @@ use risingwave_common::try_match_expand;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;

use crate::hummock::HummockManager;
use crate::manager::{
HashDispatchManager, HashDispatchManagerRef, IdCategory, IdGeneratorManagerRef, MetaSrvEnv,
NotificationManagerRef,
};
use crate::model::{MetadataModel, Worker};
use crate::model::{MetadataModel, Worker, INVALID_EXPIRE_AT};
use crate::storage::MetaStore;

pub type NodeId = u32;
Expand All @@ -35,6 +39,7 @@ pub struct StoredClusterManager<S> {
dispatch_manager_ref: HashDispatchManagerRef<S>,
workers: DashMap<WorkerKey, Worker>,
nm: NotificationManagerRef,
max_heartbeat_interval: Duration,
}

pub struct WorkerKey(pub HostAddress);
Expand All @@ -61,6 +66,7 @@ where
env: MetaSrvEnv<S>,
hummock_manager_ref: Option<Arc<HummockManager<S>>>,
nm: NotificationManagerRef,
max_heartbeat_interval: Duration,
) -> Result<Self> {
let meta_store_ref = env.meta_store_ref();
let workers = try_match_expand!(
Expand Down Expand Up @@ -88,6 +94,7 @@ where
dispatch_manager_ref,
workers: worker_map,
nm,
max_heartbeat_interval,
})
}

Expand Down Expand Up @@ -234,14 +241,153 @@ where
.filter(|entry| entry.value().worker_type() == worker_type)
.count()
}

/// Record a heartbeat from worker `worker_id` by pushing its lease deadline
/// (`expire_at`, seconds since the UNIX epoch) forward by `max_heartbeat_interval`.
/// A heartbeat from an unknown worker id is silently ignored.
pub fn heartbeat(&self, worker_id: u32) -> Result<()> {
    // Step 1: resolve the worker's host-address key from its numeric id.
    // TODO: avoid this linear scan over the worker map.
    let maybe_key = self
        .workers
        .iter()
        .find(|entry| entry.worker_id() == worker_id)
        .map(|entry| entry.value().key())
        .transpose()?;
    let key = match maybe_key {
        Some(key) => key,
        // Unknown worker: nothing to refresh.
        None => return Ok(()),
    };
    // Step 2: bump the lease deadline, if the worker is still registered.
    if let Entry::Occupied(mut occupied) = self.workers.entry(WorkerKey(key)) {
        let deadline = SystemTime::now()
            .add(self.max_heartbeat_interval)
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        occupied.get_mut().set_expire_at(deadline);
    }
    Ok(())
}

/// Overwrite the lease deadline for the worker registered at `host_address`.
/// No-op when no worker with that address is currently known.
fn update_worker_ttl(&self, host_address: HostAddress, expire_at: u64) {
    if let Entry::Occupied(mut occupied) = self.workers.entry(WorkerKey(host_address)) {
        occupied.get_mut().set_expire_at(expire_at);
    }
}

/// Spawn a background task that periodically scans the worker map and evicts
/// workers whose heartbeat lease has expired.
///
/// Returns the task's `JoinHandle` and a shutdown sender; sending `()` (or
/// dropping the sender, which closes the channel) stops the checker.
pub fn start_heartbeat_checker(
    cluster_manager_ref: StoredClusterManagerRef<S>,
    check_interval: Duration,
) -> (JoinHandle<()>, UnboundedSender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
    let join_handle = tokio::spawn(async move {
        let mut min_interval = tokio::time::interval(check_interval);
        loop {
            tokio::select! {
                // Wait for interval
                _ = min_interval.tick() => {},
                // Shutdown
                _ = shutdown_rx.recv() => {
                    tracing::info!("Heartbeat checker is shutting down");
                    return;
                }
            }
            // Current wall-clock time in whole seconds since the UNIX epoch,
            // the same unit `expire_at` is stored in.
            let now = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs();
            // Snapshot the candidates first (cloned out of the DashMap) so we do
            // not hold map references across the awaits below.
            // NOTE(review): this filter also matches freshly-registered workers,
            // presumably because INVALID_EXPIRE_AT sorts below any real
            // timestamp — verify against the constant's definition.
            let workers_to_init_or_delete = cluster_manager_ref
                .workers
                .iter()
                .filter(|worker| worker.expire_at() < now)
                .map(|worker| worker.value().clone())
                .collect_vec();
            for worker in workers_to_init_or_delete {
                let key = worker.key().expect("illegal key");
                if worker.expire_at() == INVALID_EXPIRE_AT {
                    // Initialize expire_at: grant a brand-new worker one full
                    // lease instead of evicting it immediately.
                    cluster_manager_ref.update_worker_ttl(
                        key,
                        now + cluster_manager_ref.max_heartbeat_interval.as_secs(),
                    );
                } else if let Err(err) = cluster_manager_ref.delete_worker_node(key).await {
                    // Eviction is best-effort; log and keep checking the rest.
                    tracing::warn!("Failed to delete_worker_node. {}", err);
                }
            }
        }
    });
    (join_handle, shutdown_tx)
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hummock::test_utils::setup_compute_env;

    // Placeholder for cluster-manager unit tests; currently asserts nothing.
    #[tokio::test]
    async fn test_cluster_manager() -> Result<()> {
        Ok(())
    }

    // This test takes seconds because the TTL is measured in seconds.
    //
    // Scenario: register two workers, keep only worker 1 alive via periodic
    // heartbeats, then start the heartbeat checker and assert that the
    // non-heartbeating worker 2 is eventually evicted.
    #[tokio::test]
    #[ignore]
    async fn test_heartbeat() {
        let (_env, _hummock_manager, cluster_manager, worker_node) = setup_compute_env(1).await;
        let context_id_1 = worker_node.id;
        let fake_host_address_2 = HostAddress {
            host: "127.0.0.1".to_string(),
            port: 2,
        };
        let (_worker_node_2, _) = cluster_manager
            .add_worker_node(fake_host_address_2, WorkerType::ComputeNode)
            .await
            .unwrap();
        // Two live nodes
        assert_eq!(
            cluster_manager
                .list_worker_node(WorkerType::ComputeNode, None)
                .len(),
            2
        );

        let ttl = cluster_manager.max_heartbeat_interval;
        // Check frequently enough that an expiry is noticed well within one TTL.
        let check_interval = std::cmp::min(Duration::from_millis(100), ttl / 4);

        // Keep worker 1 alive: heartbeat at a third of the TTL so its lease
        // never lapses between beats.
        let cluster_manager_ref = cluster_manager.clone();
        let keep_alive_join_handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(cluster_manager_ref.max_heartbeat_interval / 3).await;
                cluster_manager_ref.heartbeat(context_id_1).unwrap();
            }
        });

        tokio::time::sleep(ttl + check_interval).await;

        // One node has actually expired but still got two, because heartbeat check is not started.
        assert_eq!(
            cluster_manager
                .list_worker_node(WorkerType::ComputeNode, None)
                .len(),
            2
        );

        let (join_handle, shutdown_sender) =
            StoredClusterManager::start_heartbeat_checker(cluster_manager.clone(), check_interval);
        // Two TTLs: one for the checker to initialize expire_at on first sight,
        // one more for the stale worker's lease to lapse and be evicted.
        tokio::time::sleep(ttl * 2 + check_interval).await;

        // One live node left.
        assert_eq!(
            cluster_manager
                .list_worker_node(WorkerType::ComputeNode, None)
                .len(),
            1
        );

        // Gracefully stop the checker, then drop the keep-alive task.
        shutdown_sender.send(()).unwrap();
        join_handle.await.unwrap();
        keep_alive_join_handle.abort();
    }
}
13 changes: 9 additions & 4 deletions rust/meta/src/hummock/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::iter::once;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use risingwave_pb::common::{HostAddress, WorkerType};
Expand All @@ -26,10 +27,14 @@ async fn get_hummock_meta_client() -> MockHummockMetaClient {
.unwrap(),
);
let notification_manager = Arc::new(NotificationManager::new());
let cluster_manager =
StoredClusterManager::new(env, Some(hummock_manager.clone()), notification_manager)
.await
.unwrap();
let cluster_manager = StoredClusterManager::new(
env,
Some(hummock_manager.clone()),
notification_manager,
Duration::from_secs(3600),
)
.await
.unwrap();
let fake_host_address = HostAddress {
host: "127.0.0.1".to_string(),
port: 80,
Expand Down
2 changes: 1 addition & 1 deletion rust/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod level_handler;
mod mock_hummock_meta_client;
mod model;
#[cfg(test)]
mod test_utils;
pub mod test_utils;
mod vacuum;

use std::sync::Arc;
Expand Down
Loading