feat: apply retry strategy when recovery #1768

Merged · 2 commits · Apr 11, 2022
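This change replaces the fixed 500 ms sleep-and-retry loop in barrier recovery with a `tokio_retry` exponential backoff capped at 10 s and randomized with jitter. A minimal standalone sketch of that retry pattern, assuming only the `tokio` and `tokio-retry` crates; the attempt counter and fake operation below are illustrative, not part of this PR:

```rust
// Sketch only: assumes `tokio` (with the "macros"/"rt" features) and `tokio-retry`
// as dependencies; the counter and fake operation are illustrative.
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

#[tokio::main]
async fn main() {
    // Same shape as the PR's strategy: exponential backoff from a 100 ms base,
    // capped at 10 s, with jitter. tokio_retry multiplies the delay by the base
    // after every attempt, so max_delay is what keeps the wait bounded.
    let retry_strategy = ExponentialBackoff::from_millis(100)
        .max_delay(Duration::from_secs(10))
        .map(jitter);

    // Fake flaky operation: fails twice, then succeeds.
    let attempts = AtomicU32::new(0);
    let attempts_ref = &attempts;

    let result: Result<&str, &str> = Retry::spawn(retry_strategy, move || async move {
        let n = attempts_ref.fetch_add(1, Ordering::SeqCst) + 1;
        if n < 3 {
            Err("transient failure")
        } else {
            Ok("recovered")
        }
    })
    .await;

    println!("{result:?} after {} attempts", attempts.load(Ordering::SeqCst));
}
```

`Retry::spawn` re-runs the closure until it returns `Ok` or the strategy iterator is exhausted, which is the same mechanism the diff below relies on for recovery.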
2 changes: 0 additions & 2 deletions src/meta/src/barrier/mod.rs
@@ -162,8 +162,6 @@ impl<S> GlobalBarrierManager<S>
where
S: MetaStore,
{
const RECOVERY_RETRY_INTERVAL: Duration = Duration::from_millis(500);

/// Create a new [`crate::barrier::GlobalBarrierManager`].
pub fn new(
env: MetaSrvEnv<S>,
151 changes: 81 additions & 70 deletions src/meta/src/barrier/recovery.rs
@@ -13,6 +13,8 @@
// limitations under the License.

use std::collections::HashSet;
use std::iter::Map;
use std::time::Duration;

use futures::future::try_join_all;
use log::{debug, error};
@@ -24,6 +26,7 @@ use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, SyncSourcesRequest,
UpdateActorsRequest,
};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use uuid::Uuid;

use crate::barrier::command::CommandContext;
@@ -39,13 +42,25 @@ impl<S> GlobalBarrierManager<S>
where
S: MetaStore,
{
// Retry base interval in milliseconds.
const RECOVERY_RETRY_BASE_INTERVAL: u64 = 100;
// Retry max interval.
const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(10);

/// Initialize a retry strategy for operations during recovery.
#[inline(always)]
fn get_retry_strategy() -> Map<ExponentialBackoff, fn(Duration) -> Duration> {
ExponentialBackoff::from_millis(Self::RECOVERY_RETRY_BASE_INTERVAL)
.max_delay(Self::RECOVERY_RETRY_MAX_INTERVAL)
.map(jitter)
}

/// Recover the whole cluster from the latest epoch.
pub(crate) async fn recovery(
&self,
prev_epoch: u64,
prev_command: Option<Command>,
) -> RecoveryResult {
let mut new_epoch = self.env.epoch_generator().generate();
// Abort buffered schedules, they might be dirty already.
self.scheduled_barriers.abort().await;

@@ -55,35 +70,29 @@ where
}

debug!("recovery start!");
loop {
tokio::time::sleep(Self::RECOVERY_RETRY_INTERVAL).await;

let retry_category = Self::get_retry_strategy();
Review comment (Member) · Suggested change:
- let retry_category = Self::get_retry_strategy();
+ let retry_strategy = Self::get_retry_strategy();

let (new_epoch, responses) = tokio_retry::Retry::spawn(retry_category, || async {
let info = self.resolve_actor_info(None).await;
let mut new_epoch = self.env.epoch_generator().generate();

// Reset all compute nodes, stop and drop existing actors.
if self
.reset_compute_nodes(&info, prev_epoch, new_epoch.into_inner())
.await
.is_err()
{
error!("reset_and_wait_compute_nodes failed");
continue;
}
self.reset_compute_nodes(&info, prev_epoch, new_epoch.into_inner())
.await;

// Refresh sources in local source manager of compute node.
if let Err(err) = self.sync_sources(&info).await {
error!("sync_sources failed: {}", err);
continue;
return Err(err);
}

// update and build all actors.
if let Err(err) = self.update_actors(&info).await {
error!("update_actors failed: {}", err);
continue;
return Err(err);
}
if let Err(err) = self.build_actors(&info).await {
error!("build_actors failed: {}", err);
continue;
return Err(err);
}

let prev_epoch = new_epoch.into_inner();
@@ -98,23 +107,32 @@ where
Command::checkpoint(),
);

let responses = self.inject_barrier(&command_ctx).await;

if responses.is_err() || command_ctx.post_collect().await.is_err() {
continue;
match self.inject_barrier(&command_ctx).await {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
error!("post_collect failed: {}", err);
return Err(err);
}
Ok((new_epoch, response))
}
Err(err) => {
error!("inject_barrier failed: {}", err);
Err(err)
}
}
})
.await
.expect("Retry until recovery success.");
debug!("recovery success");

debug!("recovery success");
return (
new_epoch,
self.fragment_manager.all_chain_actor_ids().await,
responses
.unwrap()
.into_iter()
.flat_map(|r| r.finished_create_mviews)
.collect(),
);
}
return (
new_epoch,
self.fragment_manager.all_chain_actor_ids().await,
responses
.into_iter()
.flat_map(|r| r.finished_create_mviews)
.collect(),
);
}

/// Clean up previous command dirty data. Currently, we only need to handle table fragments info
@@ -123,14 +141,12 @@
/// it.
async fn clean_up(&self, prev_command: Command) {
if let Some(table_id) = prev_command.creating_table_id() {
while self
.fragment_manager
.drop_table_fragments(&table_id)
.await
.is_err()
{
tokio::time::sleep(Self::RECOVERY_RETRY_INTERVAL).await;
}
let retry_category = Self::get_retry_strategy();
tokio_retry::Retry::spawn(retry_category, || async {
self.fragment_manager.drop_table_fragments(&table_id).await
})
.await
.expect("Retry clean up until success");
}
}

@@ -233,39 +249,34 @@
}

/// Reset all compute nodes by calling `force_stop_actors`.
async fn reset_compute_nodes(
&self,
info: &BarrierActorInfo,
prev_epoch: u64,
new_epoch: u64,
) -> Result<()> {
for worker_node in info.node_map.values() {
loop {
// force shutdown actors on running compute nodes
match self.env.stream_clients().get(worker_node).await {
Ok(client) => {
if client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
epoch: Some(ProstEpoch {
curr: new_epoch,
prev: prev_epoch,
}),
})
.await
.is_ok()
{
break;
}
}
Err(err) => {
debug!("failed to get client: {}", err);
}
}
async fn reset_compute_nodes(&self, info: &BarrierActorInfo, prev_epoch: u64, new_epoch: u64) {
let futures = info.node_map.iter().map(|(_, worker_node)| {
let retry_strategy = Self::get_retry_strategy();

async move {
tokio_retry::Retry::spawn(retry_strategy, || async {
let client = self.env.stream_clients().get(worker_node).await?;
debug!("force stop actors: {}", worker_node.id);
client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
epoch: Some(ProstEpoch {
curr: new_epoch,
prev: prev_epoch,
}),
})
.await
.to_rw_result()
})
.await
.expect("Force stop actors until success");

Ok::<_, RwError>(())
}
}
});

try_join_all(futures).await.unwrap();
debug!("all compute nodes have been reset.");
Ok(())
}
}
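The rewritten `reset_compute_nodes` above pairs a per-worker retry with `try_join_all`, so every compute node is reset concurrently while each node keeps retrying independently. A standalone sketch of that composition, with a hypothetical `ping_worker` call standing in for the `force_stop_actors` RPC:

```rust
// Sketch only: `ping_worker` is a made-up stand-in for the real `force_stop_actors`
// RPC; dependencies are `tokio`, `tokio-retry`, and `futures`.
use std::time::Duration;

use futures::future::try_join_all;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// Hypothetical per-worker operation; worker 0 never recovers so the sketch terminates.
async fn ping_worker(worker_id: u32) -> Result<(), String> {
    if worker_id == 0 {
        Err("worker 0 is unreachable".to_string())
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let worker_ids = [0u32, 1, 2];

    // One retried future per worker; try_join_all drives them concurrently and
    // fails fast once any worker's retries are exhausted.
    let futures = worker_ids.iter().map(|&worker_id| {
        let retry_strategy = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(10))
            .map(jitter)
            .take(3); // bound the retries so this example finishes

        async move {
            Retry::spawn(retry_strategy, move || async move { ping_worker(worker_id).await })
                .await
        }
    });

    match try_join_all(futures).await {
        Ok(_) => println!("all workers reset"),
        Err(err) => println!("a worker kept failing: {err}"),
    }
}
```

Bounding the strategy with `.take(3)` is only for the sketch; the PR itself retries indefinitely and treats exhaustion as unreachable via `expect`, on the assumption that recovery eventually succeeds.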