Skip to content

Commit

Permalink
Report RPC Errors to the application on peer disconnections (#5680)
Browse files Browse the repository at this point in the history
* Report RPC Errors to the application on peer disconnections

Co-authored-by: Age Manning <[email protected]>

* Expect RPCError::Disconnect to fail ongoing requests

* Drop lookups after peer disconnect and not awaiting events

* Allow RPCError disconnect through network service

* Update beacon_node/lighthouse_network/src/service/mod.rs

Co-authored-by: Age Manning <[email protected]>

* Merge branch 'unstable' into rpc-error-on-disconnect
  • Loading branch information
dapplion authored May 6, 2024
1 parent 436d54e commit b87c36a
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 101 deletions.
25 changes: 25 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,31 @@ where
!matches!(self.state, HandlerState::Deactivated)
}

// NOTE: This function gets polled to completion upon a connection close.
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
// Inform the network behaviour of any failed requests

while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
let outbound_info = self
.outbound_substreams
.remove(&substream_id)
.expect("The value must exist for a key");
// If the state of the connection is closing, we do not need to report this case to
// the behaviour, as the connection has just closed non-gracefully
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
continue;
}

// Register this request as an RPC Error
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: outbound_info.proto,
id: outbound_info.req_id,
})));
}
Poll::Ready(None)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
24 changes: 18 additions & 6 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,12 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
.goodbye_peer(peer_id, reason, source);
}

/// Hard (ungraceful) disconnect for testing purposes only
/// Use goodbye_peer for disconnections, do not use this function.
pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) {
let _ = self.swarm.disconnect_peer_id(peer_id);
}

/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&self) -> Vec<Enr> {
self.discovery().table_entries_enr()
Expand Down Expand Up @@ -1373,12 +1379,18 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
let peer_id = event.peer_id;

if !self.peer_manager().is_connected(&peer_id) {
debug!(
self.log,
"Ignoring rpc message of disconnecting peer";
event
);
return None;
// Sync expects a RPCError::Disconnected to drop associated lookups with this peer.
// Silencing this event breaks the API contract with RPC where every request ends with
// - A stream termination event, or
// - An RPCError event
if !matches!(event.event, HandlerEvent::Err(HandlerErr::Outbound { .. })) {
debug!(
self.log,
"Ignoring rpc message of disconnecting peer";
event
);
return None;
}
}

let handler_id = event.conn_id;
Expand Down
92 changes: 91 additions & 1 deletion beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod common;

use common::Protocol;
use lighthouse_network::rpc::methods::*;
use lighthouse_network::rpc::{methods::*, RPCError};
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
use slog::{debug, warn, Level};
use ssz::Encode;
Expand Down Expand Up @@ -996,6 +996,96 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
})
}

#[test]
fn test_disconnect_triggers_rpc_error() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();

let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
// Must have at least one root for the request to create a stream
vec![Hash256::from_low_u64_be(0)],
&spec,
));

// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 42, rpc_request.clone());
}
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
RPCError::Disconnected => return,
other => panic!("received unexpected error {:?}", other),
},
other => {
warn!(log, "Ignoring other event {:?}", other);
}
}
}
};

// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut sending_peer = None;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
)
.await
{
futures::future::Either::Left((ev, _)) => match ev {
NetworkEvent::RequestReceived { peer_id, .. } => {
sending_peer = Some(peer_id);
}
other => {
warn!(log, "Ignoring other event {:?}", other);
}
},
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
}

// if we need to send messages send them here. This will happen after a delay
if let Some(peer_id) = sending_peer.take() {
warn!(log, "Receiver got request, disconnecting peer");
receiver.__hard_disconnect_testing_only(peer_id);
}
}
};

tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}

/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
/// Goodbye message.
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
Expand Down
38 changes: 2 additions & 36 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,49 +307,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
) {
return Ok(());
}

if let Some(batch_ids) = self.active_requests.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
self.active_requests.remove(peer_id);

// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
Expand Down
15 changes: 6 additions & 9 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,16 +382,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
/* Check disconnection for single lookups */
self.single_block_lookups.retain(|_, req| {
let should_drop_lookup =
req.should_drop_lookup_on_disconnected_peer(peer_id );

if should_drop_lookup {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?req.block_root());
self.single_block_lookups.retain(|_, lookup| {
if lookup.remove_peer(peer_id) {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
false
} else {
true
}

!should_drop_lookup
});
}

Expand Down
26 changes: 9 additions & 17 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
&& self.blob_request_state.state.is_processed()
}

/// Checks both the block and blob request states to see if the peer is disconnected.
///
/// Returns true if the lookup should be dropped.
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id);
self.blob_request_state.state.remove_peer(peer_id);

if self.all_available_peers().count() == 0 {
return true;
}

// Note: if the peer disconnected happens to have an on-going request associated with this
// lookup we will receive an RPCError and the lookup will fail. No need to manually retry
// now.
false
/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id)
&& self.blob_request_state.state.remove_peer(peer_id)
}
}

Expand Down Expand Up @@ -465,9 +455,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
self.available_peers.insert(*peer_id)
}

/// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
/// Remove peer from available peers. Return true if there are no more available peers and the
/// request is not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
self.available_peers.remove(disconnected_peer_id);
self.available_peers.is_empty() && self.is_awaiting_download()
}

pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
Expand Down
21 changes: 19 additions & 2 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,25 @@ impl TestRig {
})
}

fn peer_disconnected(&mut self, peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(peer_id));
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));

// Return RPCErrors for all active requests of peer
self.drain_network_rx();
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Sync(id),
..
} if *peer_id == disconnected_peer_id => Some(*id),
_ => None,
}) {
self.send_sync_message(SyncMessage::RpcError {
peer_id: disconnected_peer_id,
request_id,
error: RPCError::Disconnected,
});
}
}

fn drain_network_rx(&mut self) {
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.range_sync.peer_disconnect(&mut self.network, peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
let _ = self
.backfill_sync
.peer_disconnected(peer_id, &mut self.network);
let _ = self.backfill_sync.peer_disconnected(peer_id);
self.update_sync_state();
}

Expand Down
26 changes: 2 additions & 24 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(true)?
{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: id,
});
}
self.retry_batch_download(network, id)?;
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);

if self.peers.is_empty() {
Err(RemoveChain::EmptyPeerPool)
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/network/src/sync/range_sync/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,8 @@ where
/// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
/// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
for (removed_chain, sync_type, remove_reason) in self
.chains
.call_all(|chain| chain.remove_peer(peer_id, network))
for (removed_chain, sync_type, remove_reason) in
self.chains.call_all(|chain| chain.remove_peer(peer_id))
{
self.on_chain_removed(
removed_chain,
Expand Down

0 comments on commit b87c36a

Please sign in to comment.