Skip to content

Commit

Permalink
fix(network): Limit number of peer connections per IP address, Ignore…
Browse files Browse the repository at this point in the history
… new peer connections from the same IP and port (#6980)

* Limits num peer conns per ip

* Update zebra-network/src/peer_set/set.rs

* Update zebra-network/src/constants.rs

* Apply suggestions from code review

Co-authored-by: teor <[email protected]>

* Keep old peer connections, rather than replacing them with new connections

* Adds max_conns_per_ip field

Configures the max to usize::MAX for some tests.

* Adds a test to check that max_conns_per_ip is enforced

---------

Co-authored-by: teor <[email protected]>
  • Loading branch information
arya2 and teor2345 authored Jun 19, 2023
1 parent 0da2dcb commit 73ce8fb
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 1 deletion.
4 changes: 4 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub const INBOUND_PEER_LIMIT_MULTIPLIER: usize = 5;
/// See [`INBOUND_PEER_LIMIT_MULTIPLIER`] for details.
pub const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3;

/// The maximum number of peer connections Zebra will keep for a given IP address
/// before it drops any additional peer connections with that IP.
pub const MAX_CONNS_PER_IP: usize = 3;

/// The buffer size for the peer set.
///
/// This should be greater than 1 to avoid sender contention, but also reasonably
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ where
inv_receiver,
address_metrics,
MinimumPeerVersion::new(latest_chain_tip, config.network),
None,
);
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

Expand Down
51 changes: 50 additions & 1 deletion zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ where

/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,

/// The configured maximum number of peers that can be in the
/// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`]
max_conns_per_ip: usize,
}

impl<D, C> Drop for PeerSet<D, C>
Expand All @@ -270,6 +274,7 @@ where
D::Error: Into<BoxError>,
C: ChainTip,
{
#[allow(clippy::too_many_arguments)]
/// Construct a peerset which uses `discover` to manage peer connections.
///
/// Arguments:
Expand All @@ -282,6 +287,9 @@ where
/// - `inv_stream`: receives inventory changes from peers,
/// allowing the peer set to direct inventory requests;
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
/// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
/// - `max_conns_per_ip`: configured maximum number of peers that can be in the
/// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`].
pub fn new(
config: &Config,
discover: D,
Expand All @@ -290,6 +298,7 @@ where
inv_stream: broadcast::Receiver<InventoryChange>,
address_metrics: watch::Receiver<AddressMetrics>,
minimum_peer_version: MinimumPeerVersion<C>,
max_conns_per_ip: Option<usize>,
) -> Self {
Self {
// New peers
Expand Down Expand Up @@ -317,6 +326,8 @@ where
// Metrics
last_peer_log: None,
address_metrics,

max_conns_per_ip: max_conns_per_ip.unwrap_or(crate::constants::MAX_CONNS_PER_IP),
}
}

Expand Down Expand Up @@ -476,6 +487,26 @@ where
}
}

/// Returns the number of peer connections Zebra already has with
/// the provided IP address
///
/// # Performance
///
/// This method is `O(connected peers)`, so it should not be called from a loop
/// that is already iterating through the peer set.
fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
self.ready_services
.keys()
.chain(self.cancel_handles.keys())
.filter(|addr| addr.ip() == ip)
.count()
}

/// Returns `true` if Zebra is already connected to the IP and port in `addr`.
fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
}

/// Checks for newly inserted or removed services.
///
/// Puts inserted services in the unready list.
Expand All @@ -496,7 +527,25 @@ where
// - always do the same checks on every ready peer, and
// - check for any errors that happened right after the handshake
trace!(?key, "got Change::Insert from Discover");
self.remove(&key);

// # Security
//
// Drop the new peer if we are already connected to it.
// Preferring old connections avoids connection thrashing.
if self.has_peer_with_addr(key) {
std::mem::drop(svc);
continue;
}

// # Security
//
// drop the new peer if there are already `MAX_CONNS_PER_IP` peers with
// the same IP address in the peer set.
if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip {
std::mem::drop(svc);
continue;
}

self.push_unready(key, svc);
}
}
Expand Down
24 changes: 24 additions & 0 deletions zebra-network/src/peer_set/set/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct PeerSetBuilder<D, C> {
inv_stream: Option<broadcast::Receiver<InventoryChange>>,
address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
minimum_peer_version: Option<MinimumPeerVersion<C>>,
max_conns_per_ip: Option<usize>,
}

impl PeerSetBuilder<(), ()> {
Expand All @@ -137,6 +138,7 @@ impl<D, C> PeerSetBuilder<D, C> {
inv_stream: self.inv_stream,
address_book: self.address_book,
minimum_peer_version: self.minimum_peer_version,
max_conns_per_ip: self.max_conns_per_ip,
}
}

Expand All @@ -146,13 +148,33 @@ impl<D, C> PeerSetBuilder<D, C> {
minimum_peer_version: MinimumPeerVersion<NewC>,
) -> PeerSetBuilder<D, NewC> {
PeerSetBuilder {
config: self.config,
discover: self.discover,
demand_signal: self.demand_signal,
handle_rx: self.handle_rx,
inv_stream: self.inv_stream,
address_book: self.address_book,
minimum_peer_version: Some(minimum_peer_version),
max_conns_per_ip: self.max_conns_per_ip,
}
}

/// Use the provided [`MinimumPeerVersion`] instance when constructing the [`PeerSet`] instance.
pub fn max_conns_per_ip(self, max_conns_per_ip: usize) -> PeerSetBuilder<D, C> {
assert!(
max_conns_per_ip > 0,
"max_conns_per_ip must be greater than zero"
);

PeerSetBuilder {
config: self.config,
discover: self.discover,
demand_signal: self.demand_signal,
handle_rx: self.handle_rx,
inv_stream: self.inv_stream,
address_book: self.address_book,
minimum_peer_version: self.minimum_peer_version,
max_conns_per_ip: Some(max_conns_per_ip),
}
}
}
Expand All @@ -175,6 +197,7 @@ where
let minimum_peer_version = self
.minimum_peer_version
.expect("`minimum_peer_version` must be set");
let max_conns_per_ip = self.max_conns_per_ip;

let demand_signal = self
.demand_signal
Expand All @@ -196,6 +219,7 @@ where
inv_stream,
address_metrics,
minimum_peer_version,
max_conns_per_ip,
);

(peer_set, guard)
Expand Down
5 changes: 5 additions & 0 deletions zebra-network/src/peer_set/set/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version)
.max_conns_per_ip(usize::MAX)
.build();

check_if_only_up_to_date_peers_are_live(
Expand Down Expand Up @@ -72,6 +73,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

check_if_only_up_to_date_peers_are_live(
Expand Down Expand Up @@ -122,6 +124,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

// Get the total number of active peers
Expand Down Expand Up @@ -197,6 +200,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

// Remove peers, test broadcast until there is only 1 peer left in the peerset
Expand Down Expand Up @@ -267,6 +271,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

// Remove peers
Expand Down
49 changes: 49 additions & 0 deletions zebra-network/src/peer_set/set/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,55 @@ fn peer_set_ready_multiple_connections() {
});
}

#[test]
fn peer_set_rejects_connections_past_per_ip_limit() {
const NUM_PEER_VERSIONS: usize = crate::constants::MAX_CONNS_PER_IP + 1;

// Use three peers with the same version
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Nu5);
let peer_versions = PeerVersions {
peer_versions: [peer_version; NUM_PEER_VERSIONS].into_iter().collect(),
};

// Start the runtime
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();

// Pause the runtime's timer so that it advances automatically.
//
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
// real network connections.
tokio::time::pause();

// Get peers and client handles of them
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);

// Make sure we have the right number of peers
assert_eq!(handles.len(), NUM_PEER_VERSIONS);

runtime.block_on(async move {
// Build a peerset
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();

// Get peerset ready
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");

// Check we have the right amount of ready services
assert_eq!(
peer_ready.ready_services.len(),
crate::constants::MAX_CONNS_PER_IP
);
});
}

/// Check that a peer set with an empty inventory registry routes requests to a random ready peer.
#[test]
fn peer_set_route_inv_empty_registry() {
Expand Down

0 comments on commit 73ce8fb

Please sign in to comment.