Skip to content

Commit

Permalink
disable connection handler when queue is full
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 17, 2023
1 parent 3826831 commit 87a33d6
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
5 changes: 5 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,11 @@ impl ConfigBuilder {
self
}

pub fn handler_max_queue(&mut self, len: usize) -> &mut Self {
self.config.protocol.max_queue_len = len;
self
}

/// Published message ids time cache duration. The default is 10 seconds.
pub fn published_message_ids_cache_time(
&mut self,
Expand Down
52 changes: 33 additions & 19 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ use std::{
};
use void::Void;

/// Number of messages in the send queue after which we report the peer back
/// to the application.
pub const SEND_QUEUE_DROP_LIMIT: usize = 100;

/// The event emitted by the Handler. This informs the behaviour of various events created
/// by the handler.
#[derive(Debug)]
Expand Down Expand Up @@ -145,6 +141,9 @@ pub enum DisabledHandler {
/// The maximum number of inbound or outbound substream attempts have happened and thereby the
/// handler has been disabled.
MaxSubstreamAttempts,

/// Peer is too slow delivering messages.
TooSlow { peer_reported: bool },
}

/// State of the inbound substream, opened either by us or by the remote.
Expand Down Expand Up @@ -245,13 +244,6 @@ impl EnabledHandler {
}
}

if self.send_queue.len() > SEND_QUEUE_DROP_LIMIT {
self.send_queue.clear();
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::ReportPeer,
));
}

// determine if we need to create the outbound stream
if !self.send_queue.is_empty()
&& self.outbound_substream.is_none()
Expand Down Expand Up @@ -428,15 +420,24 @@ impl ConnectionHandler for Handler {

fn on_behaviour_event(&mut self, message: HandlerIn) {
match self {
Handler::Enabled(handler) => match message {
HandlerIn::Message(m) => handler.send_queue.push(m),
HandlerIn::JoinedMesh => {
handler.in_mesh = true;
Handler::Enabled(handler) => {
if handler.send_queue.len() > handler.listen_protocol.max_queue_len {
*self = Handler::Disabled(DisabledHandler::TooSlow {
peer_reported: false,
});
return;
}
HandlerIn::LeftMesh => {
handler.in_mesh = false;

match message {
HandlerIn::Message(m) => handler.send_queue.push(m),
HandlerIn::JoinedMesh => {
handler.in_mesh = true;
}
HandlerIn::LeftMesh => {
handler.in_mesh = false;
}
}
},
}
Handler::Disabled(_) => {
log::debug!("Handler is disabled. Dropping message {:?}", message);
}
Expand Down Expand Up @@ -487,7 +488,20 @@ impl ConnectionHandler for Handler {

Poll::Pending
}
Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending,
Handler::Disabled(DisabledHandler::TooSlow {
peer_reported: false,
}) => {
*self = Handler::Disabled(DisabledHandler::TooSlow {
peer_reported: true,
});
Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::ReportPeer,
))
}
Handler::Disabled(DisabledHandler::MaxSubstreamAttempts)
| Handler::Disabled(DisabledHandler::TooSlow {
peer_reported: true,
}) => Poll::Pending,
}
}

Expand Down
4 changes: 4 additions & 0 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub struct ProtocolConfig {
pub(crate) max_transmit_size: usize,
/// Determines the level of validation to be done on incoming messages.
pub(crate) validation_mode: ValidationMode,
/// /// Number of messages in the `ConnectionHandler` send queue after which
/// we report the peer back to the application.
pub(crate) max_queue_len: usize,
}

impl Default for ProtocolConfig {
Expand All @@ -72,6 +75,7 @@ impl Default for ProtocolConfig {
max_transmit_size: 65536,
validation_mode: ValidationMode::Strict,
protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL],
max_queue_len: 1000,
}
}
}
Expand Down

0 comments on commit 87a33d6

Please sign in to comment.