diff --git a/client/network/src/service.rs b/client/network/src/service.rs
index f943a03f50b38..b760b8715c9d0 100644
--- a/client/network/src/service.rs
+++ b/client/network/src/service.rs
@@ -87,7 +87,6 @@ use std::{
         atomic::{AtomicBool, AtomicUsize, Ordering},
         Arc,
     },
-    task::Poll,
 };
 
 pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
@@ -1277,600 +1276,598 @@ where
     _marker: PhantomData<H>,
 }
 
-impl<B, H, Client> Future for NetworkWorker<B, H, Client>
+impl<B, H, Client> NetworkWorker<B, H, Client>
 where
     B: BlockT + 'static,
     H: ExHashT,
     Client: HeaderBackend<B> + 'static,
 {
-    type Output = ();
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
-        let this = &mut *self;
-
-        // At the time of writing of this comment, due to a high volume of messages, the network
-        // worker sometimes takes a long time to process the loop below. When that happens, the
-        // rest of the polling is frozen. In order to avoid negative side-effects caused by this
-        // freeze, a limit to the number of iterations is enforced below. If the limit is reached,
-        // the task is interrupted then scheduled again.
-        //
-        // This allows for a more even distribution in the time taken by each sub-part of the
-        // polling.
-        let mut num_iterations = 0;
-        loop {
-            num_iterations += 1;
-            if num_iterations >= 100 {
-                cx.waker().wake_by_ref();
-                break
-            }
-
-            // Process the next message coming from the `NetworkService`.
-            let msg = match this.from_service.poll_next_unpin(cx) {
-                Poll::Ready(Some(msg)) => msg,
-                Poll::Ready(None) => return Poll::Ready(()),
-                Poll::Pending => break,
-            };
-
-            match msg {
-                ServiceToWorkerMsg::AnnounceBlock(hash, data) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .announce_block(hash, data),
-                ServiceToWorkerMsg::GetValue(key) =>
-                    this.network_service.behaviour_mut().get_value(key),
-                ServiceToWorkerMsg::PutValue(key, value) =>
-                    this.network_service.behaviour_mut().put_value(key, value),
-                ServiceToWorkerMsg::SetReservedOnly(reserved_only) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .set_reserved_only(reserved_only),
-                ServiceToWorkerMsg::SetReserved(peers) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .set_reserved_peers(peers),
-                ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .set_reserved_peerset_peers(protocol, peers),
-                ServiceToWorkerMsg::AddReserved(peer_id) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .add_reserved_peer(peer_id),
-                ServiceToWorkerMsg::RemoveReserved(peer_id) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .remove_reserved_peer(peer_id),
-                ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .add_set_reserved_peer(protocol, peer_id),
-                ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .remove_set_reserved_peer(protocol, peer_id),
-                ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
-                    this.network_service.behaviour_mut().add_known_address(peer_id, addr),
-                ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .add_to_peers_set(protocol, peer_id),
-                ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .remove_from_peers_set(protocol, peer_id),
-                ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender),
-                ServiceToWorkerMsg::Request {
-                    target,
-                    protocol,
-                    request,
-                    pending_response,
-                    connect,
-                } => {
-                    this.network_service.behaviour_mut().send_request(
-                        &target,
-                        &protocol,
-                        request,
-                        pending_response,
-                        connect,
-                    );
-                },
-                ServiceToWorkerMsg::NetworkStatus { pending_response } => {
-                    let _ = pending_response.send(Ok(this.status()));
-                },
-                ServiceToWorkerMsg::NetworkState { pending_response } => {
-                    let _ = pending_response.send(Ok(this.network_state()));
-                },
-                ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .disconnect_peer(&who, protocol_name),
-                ServiceToWorkerMsg::NewBestBlockImported(hash, number) => this
-                    .network_service
-                    .behaviour_mut()
-                    .user_protocol_mut()
-                    .new_best_block_imported(hash, number),
-            }
-        }
-
-        // `num_iterations` serves the same purpose as in the previous loop.
-        // See the previous loop for explanations.
-        let mut num_iterations = 0;
-        loop {
-            num_iterations += 1;
-            if num_iterations >= 1000 {
-                cx.waker().wake_by_ref();
-                break
-            }
-
-            // Process the next action coming from the network.
-            let next_event = this.network_service.select_next_some();
-            futures::pin_mut!(next_event);
-            let poll_value = next_event.poll_unpin(cx);
-
-            match poll_value {
-                Poll::Pending => break,
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest {
-                    protocol,
-                    result,
-                    ..
-                })) => {
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        match result {
-                            Ok(serve_time) => {
-                                metrics
-                                    .requests_in_success_total
-                                    .with_label_values(&[&protocol])
-                                    .observe(serve_time.as_secs_f64());
-                            },
-                            Err(err) => {
-                                let reason = match err {
-                                    ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
-                                    ResponseFailure::Network(
-                                        InboundFailure::UnsupportedProtocols,
-                                    ) =>
-                                    // `UnsupportedProtocols` is reported for every single
-                                    // inbound request whenever a request with an unsupported
-                                    // protocol is received. This is not reported in order to
-                                    // avoid confusions.
-                                        continue,
-                                    ResponseFailure::Network(InboundFailure::ResponseOmission) =>
-                                        "busy-omitted",
-                                    ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
-                                        "connection-closed",
-                                };
-
-                                metrics
-                                    .requests_in_failure_total
-                                    .with_label_values(&[&protocol, reason])
-                                    .inc();
-                            },
-                        }
-                    }
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
-                    protocol,
-                    duration,
-                    result,
-                    ..
-                })) =>
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        match result {
-                            Ok(_) => {
-                                metrics
-                                    .requests_out_success_total
-                                    .with_label_values(&[&protocol])
-                                    .observe(duration.as_secs_f64());
-                            },
-                            Err(err) => {
-                                let reason = match err {
-                                    RequestFailure::NotConnected => "not-connected",
-                                    RequestFailure::UnknownProtocol => "unknown-protocol",
-                                    RequestFailure::Refused => "refused",
-                                    RequestFailure::Obsolete => "obsolete",
-                                    RequestFailure::Network(OutboundFailure::DialFailure) =>
-                                        "dial-failure",
-                                    RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
-                                    RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
-                                        "connection-closed",
-                                    RequestFailure::Network(
-                                        OutboundFailure::UnsupportedProtocols,
-                                    ) => "unsupported",
-                                };
-
-                                metrics
-                                    .requests_out_failure_total
-                                    .with_label_values(&[&protocol, reason])
-                                    .inc();
-                            },
-                        }
-                    },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::ReputationChanges {
-                    peer,
-                    changes,
-                })) =>
-                    for change in changes {
-                        this.network_service.behaviour().user_protocol().report_peer(peer, change);
-                    },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
-                    peer_id,
-                    info:
-                        IdentifyInfo {
-                            protocol_version,
-                            agent_version,
-                            mut listen_addrs,
-                            protocols,
-                            ..
-                        },
-                })) => {
-                    if listen_addrs.len() > 30 {
-                        debug!(
-                            target: "sub-libp2p",
-                            "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
-                            peer_id, protocol_version, agent_version
-                        );
-                        listen_addrs.truncate(30);
-                    }
-                    for addr in listen_addrs {
-                        this.network_service
-                            .behaviour_mut()
-                            .add_self_reported_address_to_dht(&peer_id, &protocols, addr);
-                    }
-                    this.network_service
-                        .behaviour_mut()
-                        .user_protocol_mut()
-                        .add_default_set_discovered_nodes(iter::once(peer_id));
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id))) => {
-                    this.network_service
-                        .behaviour_mut()
-                        .user_protocol_mut()
-                        .add_default_set_discovered_nodes(iter::once(peer_id));
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) =>
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics.kademlia_random_queries_total.inc();
-                    },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
-                    remote,
-                    protocol,
-                    negotiated_fallback,
-                    notifications_sink,
-                    role,
-                })) => {
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics
-                            .notifications_streams_opened_total
-                            .with_label_values(&[&protocol])
-                            .inc();
-                    }
-                    {
-                        let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
-                        let _previous_value = peers_notifications_sinks
-                            .insert((remote, protocol.clone()), notifications_sink);
-                        debug_assert!(_previous_value.is_none());
-                    }
-                    this.event_streams.send(Event::NotificationStreamOpened {
-                        remote,
-                        protocol,
-                        negotiated_fallback,
-                        role,
-                    });
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
-                    remote,
-                    protocol,
-                    notifications_sink,
-                })) => {
-                    let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
-                    if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
-                        *s = notifications_sink;
-                    } else {
-                        error!(
-                            target: "sub-libp2p",
-                            "NotificationStreamReplaced for non-existing substream"
-                        );
-                        debug_assert!(false);
-                    }
-
-                    // TODO: Notifications might have been lost as a result of the previous
-                    // connection being dropped, and as a result it would be preferable to notify
-                    // the users of this fact by simulating the substream being closed then
-                    // reopened.
-                    // The code below doesn't compile because `role` is unknown. Propagating the
-                    // handshake of the secondary connections is quite an invasive change and
-                    // would conflict with https://github.com/paritytech/substrate/issues/6403.
-                    // Considering that dropping notifications is generally regarded as
-                    // acceptable, this bug is at the moment intentionally left there and is
-                    // intended to be fixed at the same time as
-                    // https://github.com/paritytech/substrate/issues/6403.
-                    // this.event_streams.send(Event::NotificationStreamClosed {
-                    //     remote,
-                    //     protocol,
-                    // });
-                    // this.event_streams.send(Event::NotificationStreamOpened {
-                    //     remote,
-                    //     protocol,
-                    //     role,
-                    // });
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed {
-                    remote,
-                    protocol,
-                })) => {
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics
-                            .notifications_streams_closed_total
-                            .with_label_values(&[&protocol[..]])
-                            .inc();
-                    }
-                    this.event_streams.send(Event::NotificationStreamClosed {
-                        remote,
-                        protocol: protocol.clone(),
-                    });
-                    {
-                        let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
-                        let _previous_value = peers_notifications_sinks.remove(&(remote, protocol));
-                        debug_assert!(_previous_value.is_some());
-                    }
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
-                    remote,
-                    messages,
-                })) => {
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        for (protocol, message) in &messages {
-                            metrics
-                                .notifications_sizes
-                                .with_label_values(&["in", protocol])
-                                .observe(message.len() as f64);
-                        }
-                    }
-                    this.event_streams.send(Event::NotificationsReceived { remote, messages });
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote))) => {
-                    this.event_streams.send(Event::SyncConnected { remote });
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote))) => {
-                    this.event_streams.send(Event::SyncDisconnected { remote });
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => {
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        let query_type = match event {
-                            DhtEvent::ValueFound(_) => "value-found",
-                            DhtEvent::ValueNotFound(_) => "value-not-found",
-                            DhtEvent::ValuePut(_) => "value-put",
-                            DhtEvent::ValuePutFailed(_) => "value-put-failed",
-                        };
-                        metrics
-                            .kademlia_query_duration
-                            .with_label_values(&[query_type])
-                            .observe(duration.as_secs_f64());
-                    }
-
-                    this.event_streams.send(Event::Dht(event));
-                },
-                Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::None)) => {
-                    // Ignored event from lower layers.
-                },
-                Poll::Ready(SwarmEvent::ConnectionEstablished {
-                    peer_id,
-                    endpoint,
-                    num_established,
-                    concurrent_dial_errors,
-                }) => {
-                    if let Some(errors) = concurrent_dial_errors {
-                        debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
-                    } else {
-                        debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
-                    }
-
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        let direction = match endpoint {
-                            ConnectedPoint::Dialer { .. } => "out",
-                            ConnectedPoint::Listener { .. } => "in",
-                        };
-                        metrics.connections_opened_total.with_label_values(&[direction]).inc();
-
-                        if num_established.get() == 1 {
-                            metrics.distinct_peers_connections_opened_total.inc();
-                        }
-                    }
-                },
-                Poll::Ready(SwarmEvent::ConnectionClosed {
-                    peer_id,
-                    cause,
-                    endpoint,
-                    num_established,
-                }) => {
-                    debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        let direction = match endpoint {
-                            ConnectedPoint::Dialer { .. } => "out",
-                            ConnectedPoint::Listener { .. } => "in",
-                        };
-                        let reason = match cause {
-                            Some(ConnectionError::IO(_)) => "transport-error",
-                            Some(ConnectionError::Handler(EitherError::A(EitherError::A(
-                                EitherError::B(EitherError::A(PingFailure::Timeout)),
-                            )))) => "ping-timeout",
-                            Some(ConnectionError::Handler(EitherError::A(EitherError::A(
-                                EitherError::A(NotifsHandlerError::SyncNotificationsClogged),
-                            )))) => "sync-notifications-clogged",
-                            Some(ConnectionError::Handler(_)) => "protocol-error",
-                            Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
-                            None => "actively-closed",
-                        };
-                        metrics
-                            .connections_closed_total
-                            .with_label_values(&[direction, reason])
-                            .inc();
-
-                        // `num_established` represents the number of *remaining* connections.
-                        if num_established == 0 {
-                            metrics.distinct_peers_connections_closed_total.inc();
-                        }
-                    }
-                },
-                Poll::Ready(SwarmEvent::NewListenAddr { address, .. }) => {
-                    trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics.listeners_local_addresses.inc();
-                    }
-                },
-                Poll::Ready(SwarmEvent::ExpiredListenAddr { address, .. }) => {
-                    info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics.listeners_local_addresses.dec();
-                    }
-                },
-                Poll::Ready(SwarmEvent::OutgoingConnectionError { peer_id, error }) => {
-                    if let Some(peer_id) = peer_id {
-                        trace!(
-                            target: "sub-libp2p",
-                            "Libp2p => Failed to reach {:?}: {}",
-                            peer_id, error,
-                        );
-
-                        if this.boot_node_ids.contains(&peer_id) {
-                            if let DialError::WrongPeerId { obtained, endpoint } = &error {
-                                if let ConnectedPoint::Dialer { address, role_override: _ } =
-                                    endpoint
-                                {
-                                    warn!(
-                                        "💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.",
-                                        address,
-                                        obtained,
-                                        peer_id,
-                                    );
-                                }
-                            }
-                        }
-                    }
-
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        let reason = match error {
-                            DialError::ConnectionLimit(_) => Some("limit-reached"),
-                            DialError::InvalidPeerId(_) => Some("invalid-peer-id"),
-                            DialError::Transport(_) | DialError::ConnectionIo(_) =>
-                                Some("transport-error"),
-                            DialError::Banned |
-                            DialError::LocalPeerId |
-                            DialError::NoAddresses |
-                            DialError::DialPeerConditionFalse(_) |
-                            DialError::WrongPeerId { .. } |
-                            DialError::Aborted => None, // ignore them
-                        };
-                        if let Some(reason) = reason {
-                            metrics
-                                .pending_connections_errors_total
-                                .with_label_values(&[reason])
-                                .inc();
-                        }
-                    }
-                },
-                Poll::Ready(SwarmEvent::Dialing(peer_id)) => {
-                    trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id)
-                },
-                Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => {
-                    trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))",
-                        local_addr, send_back_addr);
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics.incoming_connections_total.inc();
-                    }
-                },
-                Poll::Ready(SwarmEvent::IncomingConnectionError {
-                    local_addr,
-                    send_back_addr,
-                    error,
-                }) => {
-                    debug!(
-                        target: "sub-libp2p",
-                        "Libp2p => IncomingConnectionError({},{}): {}",
-                        local_addr, send_back_addr, error,
-                    );
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        let reason = match error {
-                            PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
-                            PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"),
-                            PendingConnectionError::Transport(_) |
-                            PendingConnectionError::IO(_) => Some("transport-error"),
-                            PendingConnectionError::Aborted => None, // ignore it
-                        };
-
-                        if let Some(reason) = reason {
-                            metrics
-                                .incoming_connections_errors_total
-                                .with_label_values(&[reason])
-                                .inc();
-                        }
-                    }
-                },
-                Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
-                    debug!(
-                        target: "sub-libp2p",
-                        "Libp2p => BannedPeer({}). Connected via {:?}.",
-                        peer_id, endpoint,
-                    );
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics
-                            .incoming_connections_errors_total
-                            .with_label_values(&["banned"])
-                            .inc();
-                    }
-                },
-                Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses, .. }) => {
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics.listeners_local_addresses.sub(addresses.len() as u64);
-                    }
-                    let addrs =
-                        addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
-                    match reason {
-                        Ok(()) => error!(
-                            target: "sub-libp2p",
-                            "📪 Libp2p listener ({}) closed gracefully",
-                            addrs
-                        ),
-                        Err(e) => error!(
-                            target: "sub-libp2p",
-                            "📪 Libp2p listener ({}) closed: {}",
-                            addrs, e
-                        ),
-                    }
-                },
-                Poll::Ready(SwarmEvent::ListenerError { error, .. }) => {
-                    debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
-                    if let Some(metrics) = this.metrics.as_ref() {
-                        metrics.listeners_errors_total.inc();
-                    }
-                },
-            };
-        }
+    /// Perform one action on the network.
+    pub async fn next_action(&mut self) {
+        // Next message from the service, or `future::pending()` if the stream has terminated.
+        let next_worker_msg = {
+            let from_service = &mut self.from_service;
+
+            async move {
+                if let Some(msg) = from_service.next().await {
+                    msg
+                } else {
+                    loop {
+                        futures::pending!();
+                    }
+                }
+            }
+        };
+
+        // Next swarm event, or `future::pending()` if the stream has terminated
+        // (guaranteed to never happen with `Swarm`).
+        let next_swarm_event = {
+            let swarm = &mut self.network_service;
+
+            async move {
+                if let Some(event) = swarm.next().await {
+                    event
+                } else {
+                    loop {
+                        futures::pending!();
+                    }
+                }
+            }
+        };
+
+        futures::select! {
+            msg = next_worker_msg.fuse() => {
+                // Process the next message coming from the `NetworkService`.
+                match msg {
+                    ServiceToWorkerMsg::AnnounceBlock(hash, data) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .announce_block(hash, data),
+                    ServiceToWorkerMsg::GetValue(key) =>
+                        self.network_service.behaviour_mut().get_value(key),
+                    ServiceToWorkerMsg::PutValue(key, value) =>
+                        self.network_service.behaviour_mut().put_value(key, value),
+                    ServiceToWorkerMsg::SetReservedOnly(reserved_only) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .set_reserved_only(reserved_only),
+                    ServiceToWorkerMsg::SetReserved(peers) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .set_reserved_peers(peers),
+                    ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .set_reserved_peerset_peers(protocol, peers),
+                    ServiceToWorkerMsg::AddReserved(peer_id) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .add_reserved_peer(peer_id),
+                    ServiceToWorkerMsg::RemoveReserved(peer_id) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .remove_reserved_peer(peer_id),
+                    ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .add_set_reserved_peer(protocol, peer_id),
+                    ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .remove_set_reserved_peer(protocol, peer_id),
+                    ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
+                        self.network_service.behaviour_mut().add_known_address(peer_id, addr),
+                    ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .add_to_peers_set(protocol, peer_id),
+                    ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .remove_from_peers_set(protocol, peer_id),
+                    ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
+                    ServiceToWorkerMsg::Request {
+                        target,
+                        protocol,
+                        request,
+                        pending_response,
+                        connect,
+                    } => {
+                        self.network_service.behaviour_mut().send_request(
+                            &target,
+                            &protocol,
+                            request,
+                            pending_response,
+                            connect,
+                        );
+                    },
+                    ServiceToWorkerMsg::NetworkStatus { pending_response } => {
+                        let _ = pending_response.send(Ok(self.status()));
+                    },
+                    ServiceToWorkerMsg::NetworkState { pending_response } => {
+                        let _ = pending_response.send(Ok(self.network_state()));
+                    },
+                    ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .disconnect_peer(&who, protocol_name),
+                    ServiceToWorkerMsg::NewBestBlockImported(hash, number) => self
+                        .network_service
+                        .behaviour_mut()
+                        .user_protocol_mut()
+                        .new_best_block_imported(hash, number),
+                }
+            },
+            next_event = next_swarm_event.fuse() => {
+                // Process the next event coming from `Swarm`.
+                match next_event {
+                    SwarmEvent::Behaviour(BehaviourOut::InboundRequest {
+                        protocol,
+                        result,
+                        ..
+                    }) => {
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            match result {
+                                Ok(serve_time) => {
+                                    metrics
+                                        .requests_in_success_total
+                                        .with_label_values(&[&protocol])
+                                        .observe(serve_time.as_secs_f64());
+                                },
+                                Err(err) => {
+                                    let reason = match err {
+                                        ResponseFailure::Network(InboundFailure::Timeout) =>
+                                            Some("timeout"),
+                                        ResponseFailure::Network(
+                                            InboundFailure::UnsupportedProtocols,
+                                        ) =>
+                                        // `UnsupportedProtocols` is reported for every single
+                                        // inbound request whenever a request with an unsupported
+                                        // protocol is received. This is not reported in order to
+                                        // avoid confusions.
+                                            None,
+                                        ResponseFailure::Network(InboundFailure::ResponseOmission) =>
+                                            Some("busy-omitted"),
+                                        ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
+                                            Some("connection-closed"),
+                                    };
+
+                                    if let Some(reason) = reason {
+                                        metrics
+                                            .requests_in_failure_total
+                                            .with_label_values(&[&protocol, reason])
+                                            .inc();
+                                    }
+                                },
+                            }
+                        }
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
+                        protocol,
+                        duration,
+                        result,
+                        ..
+                    }) =>
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            match result {
+                                Ok(_) => {
+                                    metrics
+                                        .requests_out_success_total
+                                        .with_label_values(&[&protocol])
+                                        .observe(duration.as_secs_f64());
+                                },
+                                Err(err) => {
+                                    let reason = match err {
+                                        RequestFailure::NotConnected => "not-connected",
+                                        RequestFailure::UnknownProtocol => "unknown-protocol",
+                                        RequestFailure::Refused => "refused",
+                                        RequestFailure::Obsolete => "obsolete",
+                                        RequestFailure::Network(OutboundFailure::DialFailure) =>
+                                            "dial-failure",
+                                        RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
+                                        RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
+                                            "connection-closed",
+                                        RequestFailure::Network(
+                                            OutboundFailure::UnsupportedProtocols,
+                                        ) => "unsupported",
+                                    };
+
+                                    metrics
+                                        .requests_out_failure_total
+                                        .with_label_values(&[&protocol, reason])
+                                        .inc();
+                                },
+                            }
+                        },
+                    SwarmEvent::Behaviour(BehaviourOut::ReputationChanges {
+                        peer,
+                        changes,
+                    }) =>
+                        for change in changes {
+                            self.network_service.behaviour().user_protocol().report_peer(peer, change);
+                        },
+                    SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
+                        peer_id,
+                        info:
+                            IdentifyInfo {
+                                protocol_version,
+                                agent_version,
+                                mut listen_addrs,
+                                protocols,
+                                ..
+                            },
+                    }) => {
+                        if listen_addrs.len() > 30 {
+                            debug!(
+                                target: "sub-libp2p",
+                                "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
+                                peer_id, protocol_version, agent_version
+                            );
+                            listen_addrs.truncate(30);
+                        }
+                        for addr in listen_addrs {
+                            self.network_service
+                                .behaviour_mut()
+                                .add_self_reported_address_to_dht(&peer_id, &protocols, addr);
+                        }
+                        self.network_service
+                            .behaviour_mut()
+                            .user_protocol_mut()
+                            .add_default_set_discovered_nodes(iter::once(peer_id));
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
+                        self.network_service
+                            .behaviour_mut()
+                            .user_protocol_mut()
+                            .add_default_set_discovered_nodes(iter::once(peer_id));
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) =>
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics.kademlia_random_queries_total.inc();
+                        },
+                    SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
+                        remote,
+                        protocol,
+                        negotiated_fallback,
+                        notifications_sink,
+                        role,
+                    }) => {
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics
+                                .notifications_streams_opened_total
+                                .with_label_values(&[&protocol])
+                                .inc();
+                        }
+                        {
+                            let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
+                            let _previous_value = peers_notifications_sinks
+                                .insert((remote, protocol.clone()), notifications_sink);
+                            debug_assert!(_previous_value.is_none());
+                        }
+                        self.event_streams.send(Event::NotificationStreamOpened {
+                            remote,
+                            protocol,
+                            negotiated_fallback,
+                            role,
+                        });
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
+                        remote,
+                        protocol,
+                        notifications_sink,
+                    }) => {
+                        let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
+                        if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
+                            *s = notifications_sink;
+                        } else {
+                            error!(
+                                target: "sub-libp2p",
+                                "NotificationStreamReplaced for non-existing substream"
+                            );
+                            debug_assert!(false);
+                        }
+
+                        // TODO: Notifications might have been lost as a result of the previous
+                        // connection being dropped, and as a result it would be preferable to notify
+                        // the users of this fact by simulating the substream being closed then
+                        // reopened.
+                        // The code below doesn't compile because `role` is unknown. Propagating the
+                        // handshake of the secondary connections is quite an invasive change and
+                        // would conflict with https://github.com/paritytech/substrate/issues/6403.
+                        // Considering that dropping notifications is generally regarded as
+                        // acceptable, this bug is at the moment intentionally left there and is
+                        // intended to be fixed at the same time as
+                        // https://github.com/paritytech/substrate/issues/6403.
+                        // self.event_streams.send(Event::NotificationStreamClosed {
+                        //     remote,
+                        //     protocol,
+                        // });
+                        // self.event_streams.send(Event::NotificationStreamOpened {
+                        //     remote,
+                        //     protocol,
+                        //     role,
+                        // });
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed {
+                        remote,
+                        protocol,
+                    }) => {
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics
+                                .notifications_streams_closed_total
+                                .with_label_values(&[&protocol[..]])
+                                .inc();
+                        }
+                        self.event_streams.send(Event::NotificationStreamClosed {
+                            remote,
+                            protocol: protocol.clone(),
+                        });
+                        {
+                            let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
+                            let _previous_value = peers_notifications_sinks.remove(&(remote, protocol));
+                            debug_assert!(_previous_value.is_some());
+                        }
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
+                        remote,
+                        messages,
+                    }) => {
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            for (protocol, message) in &messages {
+                                metrics
+                                    .notifications_sizes
+                                    .with_label_values(&["in", protocol])
+                                    .observe(message.len() as f64);
+                            }
+                        }
+                        self.event_streams.send(Event::NotificationsReceived { remote, messages });
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote)) => {
+                        self.event_streams.send(Event::SyncConnected { remote });
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote)) => {
+                        self.event_streams.send(Event::SyncDisconnected { remote });
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            let query_type = match event {
+                                DhtEvent::ValueFound(_) => "value-found",
+                                DhtEvent::ValueNotFound(_) => "value-not-found",
+                                DhtEvent::ValuePut(_) => "value-put",
+                                DhtEvent::ValuePutFailed(_) => "value-put-failed",
+                            };
+                            metrics
+                                .kademlia_query_duration
+                                .with_label_values(&[query_type])
+                                .observe(duration.as_secs_f64());
+                        }
+
+                        self.event_streams.send(Event::Dht(event));
+                    },
+                    SwarmEvent::Behaviour(BehaviourOut::None) => {
+                        // Ignored event from lower layers.
+                    },
+                    SwarmEvent::ConnectionEstablished {
+                        peer_id,
+                        endpoint,
+                        num_established,
+                        concurrent_dial_errors,
+                    } => {
+                        if let Some(errors) = concurrent_dial_errors {
+                            debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
+                        } else {
+                            debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
+                        }
+
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            let direction = match endpoint {
+                                ConnectedPoint::Dialer { .. } => "out",
+                                ConnectedPoint::Listener { .. } => "in",
+                            };
+                            metrics.connections_opened_total.with_label_values(&[direction]).inc();
+
+                            if num_established.get() == 1 {
+                                metrics.distinct_peers_connections_opened_total.inc();
+                            }
+                        }
+                    },
+                    SwarmEvent::ConnectionClosed {
+                        peer_id,
+                        cause,
+                        endpoint,
+                        num_established,
+                    } => {
+                        debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            let direction = match endpoint {
+                                ConnectedPoint::Dialer { .. } => "out",
+                                ConnectedPoint::Listener { .. } => "in",
+                            };
+                            let reason = match cause {
+                                Some(ConnectionError::IO(_)) => "transport-error",
+                                Some(ConnectionError::Handler(EitherError::A(EitherError::A(
+                                    EitherError::B(EitherError::A(PingFailure::Timeout)),
+                                )))) => "ping-timeout",
+                                Some(ConnectionError::Handler(EitherError::A(EitherError::A(
+                                    EitherError::A(NotifsHandlerError::SyncNotificationsClogged),
+                                )))) => "sync-notifications-clogged",
+                                Some(ConnectionError::Handler(_)) => "protocol-error",
+                                Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
+                                None => "actively-closed",
+                            };
+                            metrics
+                                .connections_closed_total
+                                .with_label_values(&[direction, reason])
+                                .inc();
+
+                            // `num_established` represents the number of *remaining* connections.
+                            if num_established == 0 {
+                                metrics.distinct_peers_connections_closed_total.inc();
+                            }
+                        }
+                    },
+                    SwarmEvent::NewListenAddr { address, .. } => {
+                        trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics.listeners_local_addresses.inc();
+                        }
+                    },
+                    SwarmEvent::ExpiredListenAddr { address, .. } => {
+                        info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics.listeners_local_addresses.dec();
+                        }
+                    },
+                    SwarmEvent::OutgoingConnectionError { peer_id, error } => {
+                        if let Some(peer_id) = peer_id {
+                            trace!(
+                                target: "sub-libp2p",
+                                "Libp2p => Failed to reach {:?}: {}",
+                                peer_id, error,
+                            );
+
+                            if self.boot_node_ids.contains(&peer_id) {
+                                if let DialError::WrongPeerId { obtained, endpoint } = &error {
+                                    if let ConnectedPoint::Dialer { address, role_override: _ } =
+                                        endpoint
+                                    {
+                                        warn!(
+                                            "💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.",
+                                            address,
+                                            obtained,
+                                            peer_id,
+                                        );
+                                    }
+                                }
+                            }
+                        }
+
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            let reason = match error {
+                                DialError::ConnectionLimit(_) => Some("limit-reached"),
+                                DialError::InvalidPeerId(_) => Some("invalid-peer-id"),
+                                DialError::Transport(_) | DialError::ConnectionIo(_) =>
+                                    Some("transport-error"),
+                                DialError::Banned |
+                                DialError::LocalPeerId |
+                                DialError::NoAddresses |
+                                DialError::DialPeerConditionFalse(_) |
+                                DialError::WrongPeerId { .. } |
+                                DialError::Aborted => None, // ignore them
+                            };
+                            if let Some(reason) = reason {
+                                metrics
+                                    .pending_connections_errors_total
+                                    .with_label_values(&[reason])
+                                    .inc();
+                            }
+                        }
+                    },
+                    SwarmEvent::Dialing(peer_id) => {
+                        trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id)
+                    },
+                    SwarmEvent::IncomingConnection { local_addr, send_back_addr } => {
+                        trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))",
+                            local_addr, send_back_addr);
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics.incoming_connections_total.inc();
+                        }
+                    },
+                    SwarmEvent::IncomingConnectionError {
+                        local_addr,
+                        send_back_addr,
+                        error,
+                    } => {
+                        debug!(
+                            target: "sub-libp2p",
+                            "Libp2p => IncomingConnectionError({},{}): {}",
+                            local_addr, send_back_addr, error,
+                        );
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            let reason = match error {
+                                PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
+                                PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"),
+                                PendingConnectionError::Transport(_) |
+                                PendingConnectionError::IO(_) => Some("transport-error"),
+                                PendingConnectionError::Aborted => None, // ignore it
+                            };
+
+                            if let Some(reason) = reason {
+                                metrics
+                                    .incoming_connections_errors_total
+                                    .with_label_values(&[reason])
+                                    .inc();
+                            }
+                        }
+                    },
+                    SwarmEvent::BannedPeer { peer_id, endpoint } => {
+                        debug!(
+                            target: "sub-libp2p",
+                            "Libp2p => BannedPeer({}). Connected via {:?}.",
+                            peer_id, endpoint,
+                        );
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics
+                                .incoming_connections_errors_total
+                                .with_label_values(&["banned"])
+                                .inc();
+                        }
+                    },
+                    SwarmEvent::ListenerClosed { reason, addresses, .. } => {
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics.listeners_local_addresses.sub(addresses.len() as u64);
+                        }
+                        let addrs =
+                            addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
+                        match reason {
+                            Ok(()) => error!(
+                                target: "sub-libp2p",
+                                "📪 Libp2p listener ({}) closed gracefully",
+                                addrs
+                            ),
+                            Err(e) => error!(
+                                target: "sub-libp2p",
+                                "📪 Libp2p listener ({}) closed: {}",
+                                addrs, e
+                            ),
+                        }
+                    },
+                    SwarmEvent::ListenerError { error, .. } => {
+                        debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
+                        if let Some(metrics) = self.metrics.as_ref() {
+                            metrics.listeners_errors_total.inc();
+                        }
+                    },
+                }
+            }
+        };
 
         let num_connected_peers =
-            this.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
+            self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
 
         // Update the variables shared with the `NetworkService`.
-        this.num_connected.store(num_connected_peers, Ordering::Relaxed);
+        self.num_connected.store(num_connected_peers, Ordering::Relaxed);
         {
             let external_addresses =
-                Swarm::<Behaviour<B, Client>>::external_addresses(&this.network_service)
+                Swarm::<Behaviour<B, Client>>::external_addresses(&self.network_service)
                     .map(|r| &r.addr)
                     .cloned()
                     .collect();
-            *this.external_addresses.lock() = external_addresses;
+            *self.external_addresses.lock() = external_addresses;
         }
 
-        let is_major_syncing = this
+        let is_major_syncing = self
             .network_service
             .behaviour_mut()
             .user_protocol_mut()
@@ -1878,10 +1875,10 @@ where
             .state
             .is_major_syncing();
 
-        this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
+        self.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
 
-        if let Some(metrics) = this.metrics.as_ref() {
-            if let Some(buckets) = this.network_service.behaviour_mut().num_entries_per_kbucket() {
+        if let Some(metrics) = self.metrics.as_ref() {
+            if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
                 for (lower_ilog2_bucket_bound, num_entries) in buckets {
                     metrics
                         .kbuckets_num_nodes
@@ -1889,25 +1886,23 @@ where
                        .set(num_entries as u64);
                 }
             }
-            if let Some(num_entries) = this.network_service.behaviour_mut().num_kademlia_records() {
+            if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
                 metrics.kademlia_records_count.set(num_entries as u64);
             }
             if let Some(num_entries) =
-                this.network_service.behaviour_mut().kademlia_records_total_size()
+                self.network_service.behaviour_mut().kademlia_records_total_size()
             {
                 metrics.kademlia_records_sizes_total.set(num_entries as u64);
             }
             metrics
                 .peerset_num_discovered
-                .set(this.network_service.behaviour_mut().user_protocol().num_discovered_peers()
+                .set(self.network_service.behaviour_mut().user_protocol().num_discovered_peers()
                     as u64);
             metrics.pending_connections.set(
-                Swarm::network_info(&this.network_service).connection_counters().num_pending()
+                Swarm::network_info(&self.network_service).connection_counters().num_pending()
                     as u64,
             );
         }
-
-        Poll::Pending
     }
 }
diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs
index 1529b822ade32..e3b8416808b8c 100644
--- a/client/service/src/lib.rs
+++ b/client/service/src/lib.rs
@@ -289,7 +289,7 @@ async fn build_network_future<
             // The network worker has done something. Nothing special to do, but could be
             // used in the future to perform actions in response of things that happened on
             // the network.
-            _ = (&mut network).fuse() => {}
+            _ = network.next_action().fuse() => {}
         }
     }
 }
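The pattern this diff introduces is easier to see outside of Substrate. Below is a minimal, self-contained sketch (Rust, using only the `futures` crate) of the same technique: each input stream is wrapped in an async block that parks forever via `loop { futures::pending!(); }` once the stream terminates, and `futures::select!` then services exactly one message or one event per call. `Worker`, `Command`, and the mpsc channels standing in for the service queue and the libp2p `Swarm` are hypothetical stand-ins, not Substrate or libp2p APIs.

use futures::{channel::mpsc, stream::StreamExt, FutureExt};

enum Command {
    Announce(u64),
}

struct Worker {
    from_service: mpsc::UnboundedReceiver<Command>,
    swarm_events: mpsc::UnboundedReceiver<String>,
}

impl Worker {
    /// Perform one action: handle a single service command or a single
    /// swarm event, whichever becomes ready first.
    async fn next_action(&mut self) {
        // Borrow each field separately so both async blocks can coexist.
        let next_cmd = {
            let from_service = &mut self.from_service;
            async move {
                if let Some(cmd) = from_service.next().await {
                    cmd
                } else {
                    // Stream terminated: park this arm forever instead of
                    // returning, so the other arm can still make progress.
                    loop {
                        futures::pending!();
                    }
                }
            }
        };
        let next_event = {
            let swarm = &mut self.swarm_events;
            async move {
                if let Some(event) = swarm.next().await {
                    event
                } else {
                    loop {
                        futures::pending!();
                    }
                }
            }
        };

        futures::select! {
            cmd = next_cmd.fuse() => match cmd {
                Command::Announce(n) => println!("announcing block {n}"),
            },
            event = next_event.fuse() => println!("swarm event: {event}"),
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let (cmd_tx, cmd_rx) = mpsc::unbounded();
        let (ev_tx, ev_rx) = mpsc::unbounded();
        let mut worker = Worker { from_service: cmd_rx, swarm_events: ev_rx };

        cmd_tx.unbounded_send(Command::Announce(42)).unwrap();
        ev_tx.unbounded_send("connection established".to_string()).unwrap();

        // The caller drives the worker one action at a time, just as
        // `build_network_future` awaits `network.next_action().fuse()`
        // inside its own `select!` loop.
        worker.next_action().await;
        worker.next_action().await;
    });
}

Parking a terminated stream rather than returning keeps the other `select!` arm serviceable on subsequent calls, which is why the diff wraps each source in its own async block instead of selecting on the streams directly; per the diff's comment, the `Swarm` stream is guaranteed never to terminate, so in practice the parked branch matters only for the service channel.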