From df8cb0273909002e570eb80e07e1ca2072b98abe Mon Sep 17 00:00:00 2001 From: yjh Date: Mon, 27 Mar 2023 06:07:17 +0800 Subject: [PATCH] chore(sc-cli): improve runner and signals (#13688) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore(sc-cli): improve runner and signals * Update client/cli/src/runner.rs * fmt --------- Co-authored-by: Bastian Köcher --- client/cli/src/lib.rs | 2 + client/cli/src/runner.rs | 101 +++++--------------------------------- client/cli/src/signals.rs | 92 ++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 89 deletions(-) create mode 100644 client/cli/src/signals.rs diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index e73321ecce5b3..5d451bbed6562 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -31,6 +31,7 @@ mod config; mod error; mod params; mod runner; +mod signals; pub use arg_enums::*; pub use clap; @@ -41,6 +42,7 @@ pub use params::*; pub use runner::*; pub use sc_service::{ChainSpec, Role}; pub use sc_tracing::logging::LoggerBuilder; +pub use signals::Signals; pub use sp_version::RuntimeVersion; /// Substrate client CLI diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 3d216aef4a75c..a8b75f2665aea 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -16,80 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{error::Error as CliError, Result, SubstrateCli}; +use crate::{error::Error as CliError, Result, Signals, SubstrateCli}; use chrono::prelude::*; -use futures::{ - future::{self, BoxFuture, FutureExt}, - pin_mut, select, Future, -}; +use futures::{future::FutureExt, Future}; use log::info; use sc_service::{Configuration, Error as ServiceError, TaskManager}; use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; use std::{marker::PhantomData, time::Duration}; -/// Abstraction over OS signals to handle the shutdown of the node smoothly. -/// -/// On `unix` this represents `SigInt` and `SigTerm`. -pub struct Signals(BoxFuture<'static, ()>); - -impl Signals { - /// Capture the relevant signals to handle shutdown of the node smoothly. - /// - /// Needs to be called in a Tokio context to have access to the tokio reactor. - #[cfg(target_family = "unix")] - pub fn capture() -> std::result::Result { - use tokio::signal::unix::{signal, SignalKind}; - - let mut stream_int = signal(SignalKind::interrupt()).map_err(ServiceError::Io)?; - let mut stream_term = signal(SignalKind::terminate()).map_err(ServiceError::Io)?; - - Ok(Signals( - async move { - future::select(stream_int.recv().boxed(), stream_term.recv().boxed()).await; - } - .boxed(), - )) - } - - /// Capture the relevant signals to handle shutdown of the node smoothly. - /// - /// Needs to be called in a Tokio context to have access to the tokio reactor. - #[cfg(not(unix))] - pub fn capture() -> std::result::Result { - use tokio::signal::ctrl_c; - - Ok(Signals( - async move { - let _ = ctrl_c().await; - } - .boxed(), - )) - } - - /// A dummy signal that never returns. - pub fn dummy() -> Self { - Self(future::pending().boxed()) - } -} - -async fn main(func: F, signals: impl Future) -> std::result::Result<(), E> -where - F: Future> + future::FusedFuture, - E: std::error::Error + Send + Sync + 'static, -{ - let signals = signals.fuse(); - - pin_mut!(func, signals); - - select! { - _ = signals => {}, - res = func => res?, - } - - Ok(()) -} - -/// Build a tokio runtime with all features +/// Build a tokio runtime with all features. pub fn build_runtime() -> std::result::Result { tokio::runtime::Builder::new_multi_thread() .on_thread_start(|| { @@ -103,25 +38,6 @@ pub fn build_runtime() -> std::result::Result( - tokio_runtime: tokio::runtime::Runtime, - future: F, - task_manager: TaskManager, - signals: impl Future, -) -> std::result::Result<(), E> -where - F: Future> + future::Future, - E: std::error::Error + Send + Sync + 'static + From, -{ - let f = future.fuse(); - pin_mut!(f); - - tokio_runtime.block_on(main(f, signals))?; - drop(task_manager); - - Ok(()) -} - /// A Substrate CLI runtime that can be used to run a node or a command pub struct Runner { config: Configuration, @@ -171,7 +87,10 @@ impl Runner { self.print_node_infos(); let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?; - let res = self.tokio_runtime.block_on(main(task_manager.future().fuse(), self.signals.0)); + + let res = self + .tokio_runtime + .block_on(self.signals.run_until_signal(task_manager.future().fuse())); // We need to drop the task manager here to inform all tasks that they should shut down. // // This is important to be done before we instruct the tokio runtime to shutdown. Otherwise @@ -234,7 +153,11 @@ impl Runner { E: std::error::Error + Send + Sync + 'static + From + From, { let (future, task_manager) = runner(self.config)?; - run_until_exit::<_, E>(self.tokio_runtime, future, task_manager, self.signals.0) + self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?; + // Drop the task manager before dropping the rest, to ensure that all futures were informed + // about the shut down. + drop(task_manager); + Ok(()) } /// Get an immutable reference to the node Configuration diff --git a/client/cli/src/signals.rs b/client/cli/src/signals.rs new file mode 100644 index 0000000000000..4b6a6f957a766 --- /dev/null +++ b/client/cli/src/signals.rs @@ -0,0 +1,92 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use futures::{ + future::{self, BoxFuture, FutureExt}, + pin_mut, select, Future, +}; + +use sc_service::Error as ServiceError; + +/// Abstraction over OS signals to handle the shutdown of the node smoothly. +/// +/// On `unix` this represents `SigInt` and `SigTerm`. +pub struct Signals(BoxFuture<'static, ()>); + +impl Signals { + /// Return the signals future. + pub fn future(self) -> BoxFuture<'static, ()> { + self.0 + } + + /// Capture the relevant signals to handle shutdown of the node smoothly. + /// + /// Needs to be called in a Tokio context to have access to the tokio reactor. + #[cfg(target_family = "unix")] + pub fn capture() -> std::result::Result { + use tokio::signal::unix::{signal, SignalKind}; + + let mut stream_int = signal(SignalKind::interrupt()).map_err(ServiceError::Io)?; + let mut stream_term = signal(SignalKind::terminate()).map_err(ServiceError::Io)?; + + Ok(Signals( + async move { + future::select(stream_int.recv().boxed(), stream_term.recv().boxed()).await; + } + .boxed(), + )) + } + + /// Capture the relevant signals to handle shutdown of the node smoothly. + /// + /// Needs to be called in a Tokio context to have access to the tokio reactor. + #[cfg(not(unix))] + pub fn capture() -> Result { + use tokio::signal::ctrl_c; + + Ok(Signals( + async move { + let _ = ctrl_c().await; + } + .boxed(), + )) + } + + /// A dummy signal that never returns. + pub fn dummy() -> Self { + Self(future::pending().boxed()) + } + + /// Run a future task until receive a signal. + pub async fn run_until_signal(self, func: F) -> Result<(), E> + where + F: Future> + future::FusedFuture, + E: std::error::Error + Send + Sync + 'static, + { + let signals = self.future().fuse(); + + pin_mut!(func, signals); + + select! { + _ = signals => {}, + res = func => res?, + } + + Ok(()) + } +}