feat(failover): Recover rebooted compute nodes with notification and snapshot #1500

Merged · 8 commits · Apr 1, 2022
5 changes: 5 additions & 0 deletions proto/meta.proto
@@ -272,6 +272,10 @@ message MetaSnapshot {
repeated catalog.VirtualTable view = 6;
}

message SourceSnapshot {
repeated catalog.Source sources = 1;
}

message SubscribeResponse {
enum Operation {
INVALID = 0;
@@ -296,6 +300,7 @@ message SubscribeResponse {
catalog.Table table_v2 = 10;
catalog.Source source = 11;
MetaSnapshot fe_snapshot = 12;
Member:
Will it be better to name these as SnapshotForFE and SnapshotForBE?

@zbzbw (Contributor, Author) · Apr 1, 2022:
How about just calling it FrontendSnapshot and BackendSnapshot 🤔

Member:
Not sure, or just keep it as is until we need to produce more info for the backend.

SourceSnapshot be_snapshot = 13;
}
}

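For orientation, the Rust that prost generates from this proto change is shaped roughly like the sketch below. It is an approximation for readability, not the exact generated code: derive attributes and field tags are omitted, and `Source` / `MetaSnapshot` are stand-ins for the existing messages in `risingwave_pb`.

// Rough shape of the generated types (approximation, see note above).
pub struct Source;       // stand-in for risingwave_pb::catalog::Source
pub struct MetaSnapshot; // stand-in for the existing frontend snapshot message

/// Generated from `message SourceSnapshot`.
pub struct SourceSnapshot {
    pub sources: Vec<Source>, // repeated catalog.Source sources = 1;
}

/// Generated for the `info` oneof in `SubscribeResponse`.
pub enum Info {
    // ...existing variants elided...
    FeSnapshot(MetaSnapshot),   // MetaSnapshot fe_snapshot = 12;
    BeSnapshot(SourceSnapshot), // SourceSnapshot be_snapshot = 13; (new in this PR)
}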
7 changes: 6 additions & 1 deletion rust/compute/src/server.rs
@@ -35,7 +35,7 @@ use risingwave_storage::hummock::hummock_meta_client::RpcHummockMetaClient;
use risingwave_storage::monitor::{HummockMetrics, StateStoreMetrics};
use risingwave_storage::StateStoreImpl;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use risingwave_stream::task::{LocalStreamManager, ObserverManager, StreamEnvironment};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tower::make::Shared;
@@ -134,6 +134,11 @@ pub async fn compute_node_serve(
));
let source_mgr = Arc::new(MemSourceManager::new());

// Initialize observer manager and subscribe to notification service in meta.
let observer_mgr =
ObserverManager::new(meta_client.clone(), client_addr.clone(), source_mgr.clone()).await;
sub_tasks.push(observer_mgr.start().await.unwrap());

// Initialize batch environment.
let batch_config = Arc::new(config.batch.clone());
let batch_env = BatchEnvironment::new(
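The compute-side `ObserverManager` wired up here is added elsewhere in this PR and is not part of this excerpt. Conceptually it subscribes to the meta notification service, applies the initial source snapshot to the local source manager, and then handles incremental updates. The sketch below shows that flow under those assumptions; `observe` and the `SourceManagerRef` alias are hypothetical names, and the real implementation in `risingwave_stream::task` may differ.

// Hypothetical sketch of the compute-side observer loop (not the actual
// ObserverManager code from this PR).
use std::sync::Arc;

use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::SubscribeResponse;
use risingwave_source::SourceManager; // path assumed from rust/source below

type SourceManagerRef = Arc<dyn SourceManager>; // assumed alias

async fn observe(mut stream: tonic::Streaming<SubscribeResponse>, source_mgr: SourceManagerRef) {
    // For compute nodes, the meta notification service pushes a `BeSnapshot`
    // as the very first message (see notification_service.rs below), so the
    // node can rebuild its sources before it handles anything else.
    let first = stream
        .message()
        .await
        .expect("notification stream error")
        .expect("notification stream closed");
    match first.info {
        Some(Info::BeSnapshot(snapshot)) => source_mgr
            .apply_snapshot(snapshot)
            .expect("failed to apply source snapshot"),
        other => panic!("the first notify should be a source snapshot, got {:?}", other),
    }

    // Subsequent notifications (e.g. newly created sources) are applied one by one.
    while let Some(resp) = stream.message().await.expect("notification stream error") {
        if let Some(Info::Source(_source)) = resp.info {
            // Create the new source locally (omitted here).
        }
    }
}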
4 changes: 2 additions & 2 deletions rust/frontend/src/observer/observer_manager.rs
@@ -77,7 +77,7 @@ impl ObserverManager {
}
_ => {
return Err(ErrorCode::InternalError(format!(
"the first notify should be snapshot, but get {:?}",
"the first notify should be frontend snapshot, but get {:?}",
resp
))
.into())
@@ -142,7 +142,7 @@ impl ObserverManager {
resp
)
}
None => panic!("receive an unsupported notify {:?}", resp),
_ => panic!("receive an unsupported notify {:?}", resp),
}
assert!(
resp.version > catalog_guard.version(),
6 changes: 1 addition & 5 deletions rust/meta/src/barrier/mod.rs
@@ -35,7 +35,7 @@ use self::info::BarrierActorInfo;
use self::notifier::{Notifier, UnfinishedNotifiers};
use crate::cluster::ClusterManagerRef;
use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, MetaSrvEnv, INVALID_EPOCH};
use crate::manager::{MetaSrvEnv, INVALID_EPOCH};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;
@@ -141,8 +141,6 @@ pub struct GlobalBarrierManager<S: MetaStore> {

cluster_manager: ClusterManagerRef<S>,

catalog_manager: CatalogManagerRef<S>,

fragment_manager: FragmentManagerRef<S>,

hummock_manager: HummockManagerRef<S>,
@@ -164,7 +162,6 @@
pub fn new(
env: MetaSrvEnv<S>,
cluster_manager: ClusterManagerRef<S>,
catalog_manager: CatalogManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
hummock_manager: HummockManagerRef<S>,
metrics: Arc<MetaMetrics>,
@@ -183,7 +180,6 @@
Self {
interval,
cluster_manager,
catalog_manager,
fragment_manager,
scheduled_barriers: ScheduledBarriers::new(),
hummock_manager,
122 changes: 26 additions & 96 deletions rust/meta/src/barrier/recovery.rs
@@ -12,16 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use futures::future::try_join_all;
use log::debug;
use risingwave_common::error::{ErrorCode, Result, RwError, ToRwResult};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::{ActorInfo, WorkerType};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, CreateSourceRequest,
ForceStopActorsRequest, UpdateActorsRequest,
BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest,
};
use uuid::Uuid;

@@ -50,18 +45,10 @@ where

let info = self.resolve_actor_info(None).await;

// reset all compute nodes and wait for online, and create sources on failed nodes.
match self.reset_and_wait_compute_nodes(&info).await {
Ok(failed_node_ids) => {
if let Err(e) = self.create_sources(&info, failed_node_ids).await {
debug!("create_sources failed: {:?}", e);
continue;
}
}
Err(e) => {
debug!("reset_and_wait_compute_nodes failed: {:?}", e);
continue;
}
// Reset all compute nodes and wait for online.
if self.reset_and_wait_compute_nodes(&info).await.is_err() {
debug!("reset_and_wait_compute_nodes failed");
continue;
}

// update and build all actors.
@@ -108,40 +95,6 @@
}
}

/// Create all sources in compute nodes.
async fn create_sources(
&self,
info: &BarrierActorInfo,
failed_node_ids: HashSet<u32>,
) -> Result<()> {
// Attention, using catalog v2 here, it's not compatible with Java frontend.
let sources = self.catalog_manager.list_sources().await?;

for worker_node in info
.node_map
.values()
.filter(|n| failed_node_ids.contains(&n.id))
{
let client = &self.env.stream_clients().get(worker_node).await?;
let futures = sources.iter().map(|source| {
let request = CreateSourceRequest {
source: Some(source.to_owned()),
};
async move {
client
.to_owned()
.create_source(request)
.await
.to_rw_result()
}
});

let _response = try_join_all(futures).await?;
}

Ok(())
}

/// Update all actors in compute nodes.
async fn update_actors(&self, info: &BarrierActorInfo) -> Result<()> {
let mut actor_infos = vec![];
@@ -213,55 +166,32 @@ where
}

/// Reset all compute nodes and wait for them to be online again.
async fn reset_and_wait_compute_nodes(&self, info: &BarrierActorInfo) -> Result<HashSet<u32>> {
let mut failed_worker_id = HashSet::<u32>::new();
/// While we are waiting, the `NotificationManager` will send a `BeSnapshot` to rebooted nodes
/// and build sources.
async fn reset_and_wait_compute_nodes(&self, info: &BarrierActorInfo) -> Result<()> {
for worker_node in info.node_map.values() {
// force shutdown actors on running compute nodes
match self.env.stream_clients().get(worker_node).await {
Ok(client) => {
match client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: String::new(),
})
.await
{
Ok(_) => {}
Err(err) => {
// this node has down, remove it from cluster manager.
debug!("failed to stop actors on {:?}: {}", worker_node, err);
self.cluster_manager
.deactivate_worker_node(worker_node.host.clone().unwrap())
.await?;
failed_worker_id.insert(worker_node.id);
loop {
// force shutdown actors on running compute nodes
match self.env.stream_clients().get(worker_node).await {
Ok(client) => {
if client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: String::new(),
})
.await
.is_ok()
{
break;
}
}
Err(err) => {
debug!("failed to get client: {}", err);
}
}
Err(err) => {
debug!("failed to get client: {}", err);
}
}
}
debug!("currently stopped compute nodes: {:?}", failed_worker_id);
loop {
tokio::time::sleep(Self::RECOVERY_RETRY_INTERVAL).await;
let all_nodes = self
.cluster_manager
.list_worker_node(WorkerType::ComputeNode, Some(Running))
.await
.iter()
.map(|worker_node| worker_node.id)
.collect::<HashSet<_>>();
if info
.node_map
.keys()
.all(|node_id| all_nodes.contains(node_id))
{
break;
}
}

debug!("all compute nodes have been restarted.");
Ok(failed_worker_id)
Ok(())
}
}
9 changes: 4 additions & 5 deletions rust/meta/src/manager/catalog_v2.rs
@@ -109,11 +109,6 @@ where
core.get_catalog().await
}

pub async fn list_sources(&self) -> Result<Vec<Source>> {
let core = self.core.lock().await;
Source::list(core.env.meta_store()).await
}

pub async fn create_database(&self, database: &Database) -> Result<CatalogVersion> {
let mut core = self.core.lock().await;
if !core.has_database(database) {
@@ -667,6 +662,10 @@
))
}

pub async fn list_sources(&self) -> Result<Vec<Source>> {
Source::list(self.env.meta_store()).await
}

fn has_database(&self, database: &Database) -> bool {
self.databases.contains(database.get_name())
}
1 change: 0 additions & 1 deletion rust/meta/src/rpc/server.rs
@@ -149,7 +149,6 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
let barrier_manager = Arc::new(GlobalBarrierManager::new(
env.clone(),
cluster_manager.clone(),
catalog_manager_v2.clone(),
fragment_manager.clone(),
hummock_manager.clone(),
meta_metrics.clone(),
14 changes: 13 additions & 1 deletion rust/meta/src/rpc/service/notification_service.rs
@@ -17,7 +17,7 @@ use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::notification_service_server::NotificationService;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{MetaSnapshot, SubscribeRequest, SubscribeResponse};
use risingwave_pb::meta::{MetaSnapshot, SourceSnapshot, SubscribeRequest, SubscribeResponse};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status};
@@ -69,6 +69,18 @@

match worker_type {
WorkerType::ComputeNode => {
// send source snapshot to new workers.
let catalog_guard = self.catalog_manager.get_catalog_core_guard().await;
let sources = catalog_guard.list_sources().await.map_err(tonic_err)?;
let source_snapshot = SourceSnapshot { sources };

tx.send(Ok(SubscribeResponse {
status: None,
operation: Operation::Snapshot as i32,
info: Some(Info::BeSnapshot(source_snapshot)),
version: self.env.epoch_generator().generate().into_inner(),
}))
.unwrap();
self.env
.notification_manager()
.insert_compute_sender(WorkerKey(host_address), tx)
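One way to read the new compute-node branch above: meta pushes the full source snapshot to a freshly subscribed worker first, and only afterwards registers the sender for incremental notifications, so a rebooted node starts from a consistent view and then receives deltas. The snippet below is a generic, self-contained illustration of that subscribe pattern using plain tokio channels; it is not RisingWave's `NotificationManager`.

// Generic "snapshot first, then register for deltas" pattern.
use std::sync::Mutex;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

#[derive(Clone, Debug)]
enum Notification {
    Snapshot(Vec<String>), // full state, e.g. all existing sources
    Delta(String),         // incremental update, e.g. one newly created source
}

#[derive(Default)]
struct Registry {
    // Current state plus the senders of all registered subscribers.
    state: Mutex<(Vec<String>, Vec<UnboundedSender<Notification>>)>,
}

impl Registry {
    /// Send the snapshot and register the sender under the same lock, so no
    /// delta published in between can be missed by the new subscriber.
    fn subscribe(&self) -> UnboundedReceiver<Notification> {
        let (tx, rx) = unbounded_channel();
        let mut guard = self.state.lock().unwrap();
        tx.send(Notification::Snapshot(guard.0.clone())).unwrap();
        guard.1.push(tx);
        rx
    }

    /// Record a new item and fan it out to every live subscriber.
    fn publish(&self, item: String) {
        let mut guard = self.state.lock().unwrap();
        guard.0.push(item.clone());
        guard.1.retain(|tx| tx.send(Notification::Delta(item.clone())).is_ok());
    }
}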
4 changes: 1 addition & 3 deletions rust/meta/src/stream/stream_manager.rs
@@ -324,7 +324,7 @@ mod tests {
use crate::barrier::GlobalBarrierManager;
use crate::cluster::ClusterManager;
use crate::hummock::HummockManager;
use crate::manager::{CatalogManager, MetaSrvEnv};
use crate::manager::MetaSrvEnv;
use crate::model::ActorId;
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MemStore;
@@ -478,7 +478,6 @@
cluster_manager.activate_worker_node(host).await?;

let fragment_manager = Arc::new(FragmentManager::new(env.meta_store_ref()).await?);
let catalog_manager = Arc::new(CatalogManager::new(env.clone()).await?);
let meta_metrics = Arc::new(MetaMetrics::new());
let hummock_manager = Arc::new(
HummockManager::new(env.clone(), cluster_manager.clone(), meta_metrics.clone())
@@ -487,7 +486,6 @@
let barrier_manager = Arc::new(GlobalBarrierManager::new(
env.clone(),
cluster_manager.clone(),
catalog_manager,
fragment_manager.clone(),
hummock_manager,
meta_metrics.clone(),
24 changes: 24 additions & 0 deletions rust/source/src/manager.rs
@@ -17,6 +17,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use itertools::Itertools;
use parking_lot::{Mutex, MutexGuard};
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::error::ErrorCode::InternalError;
@@ -25,6 +26,8 @@ use risingwave_common::types::DataType;
use risingwave_common::{ensure, gen_error};
use risingwave_connector::base::SourceReader;
use risingwave_connector::new_connector;
use risingwave_pb::catalog::source::Info;
use risingwave_pb::meta::SourceSnapshot;

use crate::connector_source::ConnectorSource;
use crate::table_v2::TableSourceV2;
@@ -47,6 +50,9 @@ pub trait SourceManager: Debug + Sync + Send {

fn get_source(&self, source_id: &TableId) -> Result<SourceDesc>;
fn drop_source(&self, source_id: &TableId) -> Result<()>;

/// Create sources according to meta provided snapshot when compute node starts.
fn apply_snapshot(&self, snapshot: SourceSnapshot) -> Result<()>;
}

/// `SourceColumnDesc` is used to describe a column in the Source and is used as the column
@@ -176,6 +182,24 @@ impl SourceManager for MemSourceManager {
sources.remove(table_id);
Ok(())
}

fn apply_snapshot(&self, snapshot: SourceSnapshot) -> Result<()> {
for source in snapshot.sources {
match source.info.unwrap() {
Member, commenting on lines +187 to +188:
We may reuse the code from the create_source in stream manager.

Info::StreamSource(_) => todo!("support stream source"),
Info::TableSource(info) => {
let columns = info
.columns
.into_iter()
.map(|c| c.column_desc.unwrap().into())
.collect_vec();

self.create_table_source_v2(&TableId::new(source.id), columns)?
}
}
}
Ok(())
}
}

impl MemSourceManager {
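As a rough usage illustration of the new `apply_snapshot` method: a freshly started compute node rebuilds its table sources directly from the snapshot pushed by meta. The snippet below is a sketch, not a test from this PR; the crate and message paths are taken from the diffs above, while the concrete `Source` fields (beyond `id` and `info`) and the `TableSourceInfo` type name are assumptions filled with defaults.

// Hedged sketch: applying a meta-provided snapshot to a local source manager.
use risingwave_common::error::Result;
use risingwave_pb::catalog::source::Info;
use risingwave_pb::catalog::{Source, TableSourceInfo}; // TableSourceInfo name assumed
use risingwave_pb::meta::SourceSnapshot;
use risingwave_source::{MemSourceManager, SourceManager};

fn restore_sources_from_snapshot() -> Result<()> {
    let manager = MemSourceManager::new();

    // A snapshot with a single (empty) table source; in practice this is the
    // `SourceSnapshot` received as the first notification from meta.
    let snapshot = SourceSnapshot {
        sources: vec![Source {
            id: 1,
            info: Some(Info::TableSource(TableSourceInfo::default())),
            ..Default::default()
        }],
    };

    // Recreates every table source in the snapshot; stream sources are still `todo!()`.
    manager.apply_snapshot(snapshot)
}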