diff --git a/Cargo.lock b/Cargo.lock index a8f596c0684ef..f37a3b41f2f0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-io" version = "1.12.0" @@ -8626,9 +8637,9 @@ version = "0.10.0-dev" dependencies = [ "array-bytes", "assert_matches", + "async-channel", "async-trait", "asynchronous-codec", - "backtrace", "bytes", "either", "fnv", diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index b6b21247128d7..5a918bebd626e 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -15,9 +15,9 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] array-bytes = "4.1" +async-channel = "1.8.0" async-trait = "0.1" asynchronous-codec = "0.6" -backtrace = "0.3.67" bytes = "1" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } either = "1.5.3" diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 57a5092ae62ea..99ac022c2d8b6 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -31,20 +31,17 @@ //! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the //! collection. -use backtrace::Backtrace; -use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream}; +use futures::{prelude::*, ready, stream::FusedStream}; use log::error; use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_network_common::protocol::event::Event; use std::{ + backtrace::Backtrace, cell::RefCell, fmt, pin::Pin, - sync::{ - atomic::{AtomicI64, Ordering}, - Arc, - }, + sync::Arc, task::{Context, Poll}, }; @@ -52,20 +49,18 @@ use std::{ /// /// The name is used in Prometheus reports, the queue size threshold is used /// to warn if there are too many unprocessed events in the channel. -pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) { - let (tx, rx) = mpsc::unbounded(); +pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { + let (tx, rx) = async_channel::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let queue_size = Arc::new(AtomicI64::new(0)); let tx = Sender { inner: tx, name, - queue_size: queue_size.clone(), queue_size_warning, warning_fired: false, - creation_backtrace: Backtrace::new_unresolved(), + creation_backtrace: Backtrace::force_capture(), metrics: metrics.clone(), }; - let rx = Receiver { inner: rx, name, queue_size, metrics }; + let rx = Receiver { inner: rx, name, metrics }; (tx, rx) } @@ -77,16 +72,11 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver /// implement the `Clone` trait e.g. in Order to not complicate the logic keeping the metrics in /// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**. pub struct Sender { - inner: mpsc::UnboundedSender, + inner: async_channel::Sender, /// Name to identify the channel (e.g., in Prometheus and logs). name: &'static str, - /// Number of events in the queue. Clone of [`Receiver::in_transit`]. - // To not bother with ordering and possible underflow errors of the unsigned counter - // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. - // It can turn < 0 though. - queue_size: Arc, /// Threshold queue size to generate an error message in the logs. - queue_size_warning: i64, + queue_size_warning: usize, /// We generate the error message only once to not spam the logs. warning_fired: bool, /// Backtrace of a place where the channel was created. @@ -113,9 +103,8 @@ impl Drop for Sender { /// Receiving side of a channel. pub struct Receiver { - inner: mpsc::UnboundedReceiver, + inner: async_channel::Receiver, name: &'static str, - queue_size: Arc, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. metrics: Arc>>>>, @@ -126,7 +115,6 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { - let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); let metrics = self.metrics.lock().clone(); match metrics.as_ref().map(|m| m.as_ref()) { Some(Some(metrics)) => metrics.event_out(&ev, self.name), @@ -191,17 +179,19 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { self.event_streams.retain_mut(|sender| { - let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed); - if queue_size == sender.queue_size_warning && !sender.warning_fired { + if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; - sender.creation_backtrace.resolve(); error!( - "The number of unprocessed events in channel `{}` reached {}.\n\ - The channel was created at:\n{:?}", - sender.name, sender.queue_size_warning, sender.creation_backtrace, + "The number of unprocessed events in channel `{}` exceeded {}.\n\ + The channel was created at:\n{:}\n + The last event was sent from:\n{:}", + sender.name, + sender.queue_size_warning, + sender.creation_backtrace, + Backtrace::force_capture(), ); } - sender.inner.unbounded_send(event.clone()).is_ok() + sender.inner.try_send(event.clone()).is_ok() }); if let Some(metrics) = &*self.metrics {