Skip to content

Commit

Permalink
Merge pull request #1255 from drmingdrmer/cmn-stop
Browse files Browse the repository at this point in the history
[common/stoppable] feature: borrow codes from fuse-query to make a shared graceful shutdown impl
  • Loading branch information
databend-bot authored Aug 1, 2021
2 parents 4bf2bed + e54d122 commit 38c1c70
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 0 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"common/tracing",
"common/profiling",
"common/store-api",
"common/stoppable",
"common/management",
"common/io",

Expand Down
10 changes: 10 additions & 0 deletions common/exception/src/exception.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,16 @@ build_exceptions! {
// kv-api error codes
UnknownKey(6000),

}
// General errors
build_exceptions! {

// A task that already stopped and can not stop twice.
AlreadyStarted(7101),

// A task that already started and can not start twice.
AlreadyStopped(7102),

}

pub type Result<T> = std::result::Result<T, ErrorCode>;
Expand Down
23 changes: 23 additions & 0 deletions common/stoppable/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "common-stoppable"
version = "0.1.0"
authors = ["Datafuse Authors <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies] # In alphabetical order
# Workspace dependencies
common-exception = {path = "../exception"}
common-runtime = {path = "../runtime"}
common-tracing = {path = "../tracing"}

# Github dependencies

# Crates.io dependencies
anyhow = "1.0.42"
async-trait = "0.1"
ctrlc = { version = "3.1.9", features = ["termination"] }
futures = "0.3"

[dev-dependencies]
12 changes: 12 additions & 0 deletions common/stoppable/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2020-2021 The Datafuse Authors.
//
// SPDX-License-Identifier: Apache-2.0.

mod stop_handle;
mod stoppable;

pub use stop_handle::StopHandle;
pub use stoppable::Stoppable;

#[cfg(test)]
mod stoppable_test;
113 changes: 113 additions & 0 deletions common/stoppable/src/stop_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2020-2021 The Datafuse Authors.
//
// SPDX-License-Identifier: Apache-2.0.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_runtime::tokio::sync::broadcast;
use common_tracing::tracing;
use futures::Future;

use crate::Stoppable;

/// Handle a group of `Stoppable` tasks.
/// When a user press ctrl-c, it calls the `stop()` method on every task to close them.
/// If a second ctrl-c is pressed, it sends a `()` through the `force` channel to notify tasks to shutdown at once.
///
/// Once `StopHandle` is dropped, it triggers a force stop on every tasks in it.
pub struct StopHandle {
stopping: Arc<AtomicBool>,
pub(crate) stoppable_tasks: Vec<Box<dyn Stoppable + Send>>,
}

impl StopHandle {
pub fn create() -> StopHandle {
StopHandle {
stopping: Arc::new(AtomicBool::new(false)),
stoppable_tasks: vec![],
}
}

pub fn stop_all(
&mut self,
force_tx: Option<broadcast::Sender<()>>,
) -> Result<impl Future<Output = ()> + Send + '_, ErrorCode> {
if self
.stopping
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Err(ErrorCode::AlreadyStopped("StopHandle is shutting down"));
}

let mut handles = vec![];
for s in &mut self.stoppable_tasks {
let rx = force_tx.as_ref().map(|x| x.subscribe());
handles.push(s.stop(rx));
}

let join_all = futures::future::join_all(handles);
Ok(async move {
let _ = join_all.await;
})
}

pub fn wait_to_terminate(
mut self,
signal: broadcast::Sender<()>,
) -> impl Future<Output = ()> + 'static {
let mut rx = signal.subscribe();

async move {
// The first termination signal triggers graceful shutdown
// Ignore the result
let _ = rx.recv().await;

tracing::info!("Received termination signal.");
tracing::info!("Press Ctrl + C again to force shutdown.");

// A second signal indicates a force shutdown.
// It is the task's responsibility to decide whether to deal with it.
let fut = self.stop_all(Some(signal));
if let Ok(f) = fut {
f.await;
}
}
}

pub fn install_termination_handle() -> broadcast::Sender<()> {
let (tx, _rx) = broadcast::channel(16);

let t = tx.clone();
ctrlc::set_handler(move || {
if let Err(error) = t.send(()) {
tracing::error!("Could not send signal on channel {}", error);
std::process::exit(1);
}
})
.expect("Error setting Ctrl-C handler");

tx
}

pub fn push(&mut self, s: Box<dyn Stoppable + Send>) {
self.stoppable_tasks.push(s);
}
}

impl Drop for StopHandle {
fn drop(&mut self) {
let (tx, _rx) = broadcast::channel::<()>(16);

// let every task subscribe the channel, then send a force stop signal `()`
let fut = self.stop_all(Some(tx.clone()));

if let Ok(fut) = fut {
let _ = tx.send(());
futures::executor::block_on(fut);
}
}
}
26 changes: 26 additions & 0 deletions common/stoppable/src/stoppable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2020-2021 The Datafuse Authors.
//
// SPDX-License-Identifier: Apache-2.0.

use common_exception::ErrorCode;
use common_runtime::tokio::sync::broadcast;

/// A task that can be started and stopped.
#[async_trait::async_trait]
pub trait Stoppable {
/// Start working without blocking the calling thread.
/// When returned, it should have been successfully started.
/// Otherwise an Err() should be returned.
///
/// Calling `start()` on a started task should get an error.
async fn start(&mut self) -> Result<(), ErrorCode>;

/// Blocking stop. It should not return until everything is cleaned up.
///
/// In case a graceful `stop()` had blocked for too long,
/// the caller submit a FORCE stop by sending a `()` to `force`.
/// An impl should either close everything at once, or just ignore the `force` signal if it does not support force stop.
///
/// Calling `stop()` twice should get an error.
async fn stop(&mut self, mut force: Option<broadcast::Receiver<()>>) -> Result<(), ErrorCode>;
}
157 changes: 157 additions & 0 deletions common/stoppable/src/stoppable_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2020-2021 The Datafuse Authors.
//
// SPDX-License-Identifier: Apache-2.0.

use common_exception::ErrorCode;
use common_runtime::tokio;
use common_runtime::tokio::sync::broadcast;
use common_runtime::tokio::sync::oneshot;
use common_runtime::tokio::sync::oneshot::error::TryRecvError;
use common_runtime::tokio::time::Duration;
use common_tracing::tracing;

use crate::stop_handle::StopHandle;
use crate::Stoppable;

/// A task that takes 100 years to gracefully stop.
#[derive(Default)]
struct FooTask {}

#[async_trait::async_trait]
impl Stoppable for FooTask {
async fn start(&mut self) -> Result<(), ErrorCode> {
Ok(())
}

async fn stop(&mut self, force: Option<broadcast::Receiver<()>>) -> Result<(), ErrorCode> {
tracing::info!("--- FooTask stop, force: {:?}", force);

// block the stop until force stop.

if let Some(mut force) = force {
tracing::info!("--- waiting for force");
let _ = force.recv().await;
}
Ok(())
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_stoppable() -> anyhow::Result<()> {
// - Create a task and start it.
// - Stop but the task would block.
// - Signal the task to force stop.

let (stop_tx, rx) = broadcast::channel::<()>(1024);
let (fin_tx, mut fin_rx) = oneshot::channel::<()>();

let mut t = FooTask::default();

// Start the task

t.start().await?;

// Gracefully stop blocks.

tokio::spawn(async move {
let _ = t.stop(Some(rx)).await;
fin_tx.send(()).expect("fail to send fin signal");
});

// `stop` should not return.

tokio::time::sleep(Duration::from_millis(100)).await;

let res = fin_rx.try_recv();
match res {
Err(TryRecvError::Empty) => { /* good */ }
_ => {
panic!("should not ready");
}
};

// Send force stop

stop_tx.send(()).expect("fail to send force stop");

fin_rx.await?;

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_stop_handle() -> anyhow::Result<()> {
// - Create 2 tasks and start them.
// - Stop but the task would block.
// - Signal the task to force stop.

common_tracing::init_default_tracing();

let (stop_tx, _) = broadcast::channel::<()>(1024);

let mut t1 = FooTask::default();
let mut t2 = FooTask::default();

// Start the task

t1.start().await?;
t2.start().await?;

let (fin_tx, mut fin_rx) = oneshot::channel::<()>();

let mut h = StopHandle::create();
h.push(Box::new(t1));
h.push(Box::new(t2));

// Block on waiting for the handle to finish.

let fut = h.wait_to_terminate(stop_tx.clone());
tokio::spawn(async move {
fut.await;
fin_tx.send(()).expect("fail to send fin signal");
});

tracing::info!("--- send graceful stop");
stop_tx.send(()).expect("fail to set graceful stop");

// Broadcasting receiver can not receive the message sent before subscribing the sender.
// Wait for a while until the `stop()` method is called for every task.
tokio::time::sleep(Duration::from_millis(100)).await;

tracing::info!("--- fin_rx should receive nothing");
let res = fin_rx.try_recv();
match res {
Err(TryRecvError::Empty) => { /* good */ }
_ => {
panic!("should not ready");
}
};

tracing::info!("--- send force stop");
stop_tx.send(()).expect("fail to set force stop");

fin_rx.await?;

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_stop_handle_drop() -> anyhow::Result<()> {
// - Create a task and start it.
// - Then quit and the Drop should forcibly stop it and the test should not block.

common_tracing::init_default_tracing();

let (tx, _rx) = broadcast::channel::<()>(1024);

let mut t1 = FooTask::default();

// Start the task

t1.start().await?;

let mut h = StopHandle::create();
h.push(Box::new(t1));

Ok(())
}

0 comments on commit 38c1c70

Please sign in to comment.