Skip to content

Commit

Permalink
Errors for all RPC Requests (#5867)
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 9699902
Author: dapplion <[email protected]>
Date:   Sun Jun 2 20:07:07 2024 +0200

    fix fmt

commit 05bddb9
Author: Age Manning <[email protected]>
Date:   Sun Jun 2 15:04:02 2024 +1000

    Update beacon_node/lighthouse_network/src/service/mod.rs

    Co-authored-by: João Oliveira <[email protected]>

commit a3c8e01
Author: Age Manning <[email protected]>
Date:   Thu May 30 17:38:35 2024 +1000

    Downgrade to 1.77

commit dbfde08
Author: Age Manning <[email protected]>
Date:   Thu May 30 17:34:11 2024 +1000

    Bump rust version to 1.78

commit 2e4fe3f
Author: Age Manning <[email protected]>
Date:   Thu May 30 17:26:50 2024 +1000

    Code improvement

commit 70e5326
Author: Age Manning <[email protected]>
Date:   Thu May 30 17:05:00 2024 +1000

    Report errors for rate limited requests

commit 7b0e346
Author: Age Manning <[email protected]>
Date:   Thu May 30 13:25:22 2024 +1000

    Return and error if peer has disconnected
  • Loading branch information
jimmygchen committed Jun 3, 2024
1 parent 83605e7 commit 1711836
Show file tree
Hide file tree
Showing 13 changed files with 453 additions and 413 deletions.
608 changes: 302 additions & 306 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ sysinfo = "0.26"
tempfile = "3"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "time"] }
tracing = "0.1.40"
tracing-appender = "0.2"
tracing-core = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.75.0-bullseye AS builder
FROM rust:1.78.0-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev
COPY . lighthouse
ARG FEATURES
Expand Down
31 changes: 4 additions & 27 deletions beacon_node/beacon_processor/src/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::time::Duration;
use strum::AsRefStr;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{EthSpec, Hash256, Slot};

Expand Down Expand Up @@ -196,8 +195,6 @@ enum InboundEvent {
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A backfill batch that was queued is ready for processing.
ReadyBackfillSync(QueuedBackfillBatch),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
Expand Down Expand Up @@ -279,54 +276,42 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.gossip_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyGossipBlock(
queued_block.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}

match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}

match self.attestations_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(attestation_id))) => {
Poll::Ready(Some(attestation_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyAttestation(
attestation_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}

match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(lc_id))) => {
Poll::Ready(Some(lc_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
lc_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
Expand Down Expand Up @@ -786,14 +771,6 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,
"Failed to poll queue";
"queue" => queue_name,
"e" => ?e
)
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = { workspace = true }
[dependencies]
discv5 = { workspace = true }
gossipsub = { workspace = true }
unsigned-varint = { version = "0.6", features = ["codec"] }
unsigned-varint = { version = "0.8", features = ["codec"] }
ssz_types = { workspace = true }
types = { workspace = true }
serde = { workspace = true }
Expand Down
92 changes: 33 additions & 59 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use libp2p::swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::swarm::Stream;
use slog::{crit, debug, trace, warn};
use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
collections::{hash_map::Entry, VecDeque},
Expand Down Expand Up @@ -414,70 +414,44 @@ where
}

// purge expired inbound substreams and send an error
loop {
match self.inbound_substreams_delay.poll_expired(cx) {
Poll::Ready(Some(Ok(inbound_id))) => {
// handle a stream timeout for various states
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
// the delay has been removed
info.delay_key = None;
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::StreamTimeout,
proto: info.protocol,
id: *inbound_id.get_ref(),
}));

if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
// if the last chunk does not close the stream, append an error
info.pending_items.push_back(RPCCodedResponse::Error(
RPCResponseErrorCode::ServerError,
"Request timed out".into(),
));
}
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
// drops the peer if we cannot read the delay queue
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::InternalError(
"Could not poll inbound stream timer",
)),

while let Poll::Ready(Some(inbound_id)) = self.inbound_substreams_delay.poll_expired(cx) {
// handle a stream timeout for various states
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
// the delay has been removed
info.delay_key = None;
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::StreamTimeout,
proto: info.protocol,
id: *inbound_id.get_ref(),
}));

if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
// if the last chunk does not close the stream, append an error
info.pending_items.push_back(RPCCodedResponse::Error(
RPCResponseErrorCode::ServerError,
"Request timed out".into(),
));
}
Poll::Pending | Poll::Ready(None) => break,
}
}

// purge expired outbound substreams
loop {
match self.outbound_substreams_delay.poll_expired(cx) {
Poll::Ready(Some(Ok(outbound_id))) => {
if let Some(OutboundInfo { proto, req_id, .. }) =
self.outbound_substreams.remove(outbound_id.get_ref())
{
let outbound_err = HandlerErr::Outbound {
id: req_id,
proto,
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Err(outbound_err),
));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::InternalError(
"Could not poll outbound stream timer",
)),
));
}
Poll::Pending | Poll::Ready(None) => break,
while let Poll::Ready(Some(outbound_id)) = self.outbound_substreams_delay.poll_expired(cx) {
if let Some(OutboundInfo { proto, req_id, .. }) =
self.outbound_substreams.remove(outbound_id.get_ref())
{
let outbound_err = HandlerErr::Outbound {
id: req_id,
proto,
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Err(
outbound_err,
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
}

Expand Down
35 changes: 33 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use libp2p::swarm::{
handler::ConnectionHandler, CloseConnection, ConnectionId, NetworkBehaviour, NotifyHandler,
ToSwarm,
};
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use slog::{crit, debug, o};
Expand Down Expand Up @@ -283,9 +283,40 @@ where
Ok(handler)
}

fn on_swarm_event(&mut self, _event: FromSwarm) {
fn on_swarm_event(&mut self, event: FromSwarm) {
// NOTE: FromSwarm is a non exhaustive enum so updates should be based on release notes more
// than compiler feedback
// The self rate limiter holds on to requests and attempts to process them within our rate
// limits. If a peer disconnects whilst we are self-rate limiting, we want to terminate any
// pending requests and return an error response to the application.

if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
connection_id,
..
}) = event
{
// If there are still connections remaining, do nothing.
if remaining_established > 0 {
return;
}
// Get a list of pending requests from the self rate limiter
if let Some(limiter) = self.self_limiter.as_mut() {
for (id, proto) in limiter.peer_disconnected(peer_id) {
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id: connection_id,
event: HandlerEvent::Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
}),
});
self.events.push(error_msg);
}
}
}
}

fn on_connection_handler_event(
Expand Down
28 changes: 27 additions & 1 deletion beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,39 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
entry.remove();
}
}
// NOTE: There can be entries that have been removed due to peer disconnections, we simply
// ignore these messages here.
}

/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
self.delayed_requests
.retain(|(map_peer_id, protocol), queue| {
if map_peer_id == &peer_id {
// NOTE: Currently cannot remove entries from the DelayQueue, we will just let
// them expire and ignore them.
for message in queue {
failed_requests.push((message.request_id, *protocol))
}
// Remove the entry
false
} else {
// Keep the entry
true
}
});
failed_requests
}

pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, E>> {
// First check the requests that were self rate limited, since those might add events to
// the queue. Also do this this before rate limiter prunning to avoid removing and
// immediately adding rate limiting keys.
if let Poll::Ready(Some(Ok(expired))) = self.next_peer_request.poll_expired(cx) {
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
self.next_peer_request_ready(peer_id, protocol);
}
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/src/service/gossip_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl futures::stream::Stream for GossipCache {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.expirations.poll_expired(cx) {
Poll::Ready(Some(Ok(expired))) => {
Poll::Ready(Some(expired)) => {
let expected_key = expired.key();
let (topic, data) = expired.into_inner();
match self.topic_msgs.get_mut(&topic) {
Expand All @@ -259,7 +259,6 @@ impl futures::stream::Stream for GossipCache {
}
Poll::Ready(Some(Ok(topic)))
}
Poll::Ready(Some(Err(x))) => Poll::Ready(Some(Err(x.to_string()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
Expand Down
15 changes: 13 additions & 2 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,12 +917,23 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
/* Eth2 RPC behaviour functions */

/// Send a request to a peer over RPC.
pub fn send_request(&mut self, peer_id: PeerId, request_id: AppReqId, request: Request) {
pub fn send_request(
&mut self,
peer_id: PeerId,
request_id: AppReqId,
request: Request,
) -> Result<(), (AppReqId, RPCError)> {
// Check if the peer is connected before sending an RPC request
if !self.swarm.is_connected(&peer_id) {
return Err((request_id, RPCError::Disconnected));
}

self.eth2_rpc_mut().send_request(
peer_id,
RequestId::Application(request_id),
request.into(),
)
);
Ok(())
}

/// Send a successful response to a peer over RPC.
Expand Down
Loading

0 comments on commit 1711836

Please sign in to comment.