Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(security): Randomly drop connections when inbound service is overloaded #6790

Merged
merged 14 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,25 @@ pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND;
/// The number of nanoseconds in one second.
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;

/// The duration it takes for the drop probability of an overloaded connection to
/// decay back down to [`MIN_OVERLOAD_DROP_PROBABILITY`].
///
/// Peer connections that trigger multiple overloads within this interval have a higher
/// probability of being dropped.
///
/// The probability of a connection being dropped gradually decreases during this interval
/// until it reaches the default drop probability ([`MIN_OVERLOAD_DROP_PROBABILITY`]).
///
/// Increasing this number increases the rate at which connections are dropped,
/// because the elevated drop probability persists for longer after each overload.
pub const OVERLOAD_PROTECTION_INTERVAL: Duration = MIN_INBOUND_PEER_CONNECTION_INTERVAL;

/// The minimum probability of dropping a peer connection when it receives an
/// [`Overloaded`](crate::PeerError::Overloaded) error.
///
/// This is the baseline applied when the connection has no recent overload history.
pub const MIN_OVERLOAD_DROP_PROBABILITY: f32 = 0.05;

/// The maximum probability of dropping a peer connection when it receives an
/// [`Overloaded`](crate::PeerError::Overloaded) error.
///
/// This cap is applied when overloads from the same connection arrive close together.
pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.95;

lazy_static! {
/// The minimum network protocol version accepted by this crate for each network,
/// represented as a network upgrade.
Expand Down
125 changes: 105 additions & 20 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).

use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc};
use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc, time::Instant};

use futures::{
future::{self, Either},
prelude::*,
stream::Stream,
};
use rand::{thread_rng, Rng};
use tokio::time::{sleep, Sleep};
use tower::Service;
use tower::{load_shed::error::Overloaded, Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::{
Expand All @@ -25,7 +26,10 @@ use zebra_chain::{
};

use crate::{
constants,
constants::{
self, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
OVERLOAD_PROTECTION_INTERVAL,
},
meta_addr::MetaAddr,
peer::{
connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
Expand Down Expand Up @@ -508,6 +512,11 @@ pub struct Connection<S, Tx> {

/// The state for this peer, when the metrics were last updated.
pub(super) last_metrics_state: Option<Cow<'static, str>>,

/// The time of the last overload error response from the inbound
/// service to a request from this connection,
/// or None if this connection hasn't yet received an overload error.
last_overload_time: Option<Instant>,
}

impl<S, Tx> fmt::Debug for Connection<S, Tx> {
Expand Down Expand Up @@ -549,6 +558,7 @@ impl<S, Tx> Connection<S, Tx> {
connection_tracker,
metrics_label,
last_metrics_state: None,
last_overload_time: None,
}
}
}
Expand Down Expand Up @@ -1242,7 +1252,6 @@ where
/// of connected peers.
async fn drive_peer_request(&mut self, req: Request) {
trace!(?req);
use tower::{load_shed::error::Overloaded, ServiceExt};

// Add a metric for inbound requests
metrics::counter!(
Expand All @@ -1258,29 +1267,18 @@ where
tokio::task::yield_now().await;

if self.svc.ready().await.is_err() {
// Treat all service readiness errors as Overloaded
// TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655)
self.fail_with(PeerError::Overloaded);
self.fail_with(PeerError::ServiceShutdown);
return;
}

let rsp = match self.svc.call(req.clone()).await {
Err(e) => {
if e.is::<Overloaded>() {
tracing::info!(
remote_user_agent = ?self.connection_info.remote.user_agent,
negotiated_version = ?self.connection_info.negotiated_version,
peer = ?self.metrics_label,
last_peer_state = ?self.last_metrics_state,
// TODO: remove this detailed debug info once #6506 is fixed
remote_height = ?self.connection_info.remote.start_height,
cached_addrs = ?self.cached_addrs.len(),
connection_state = ?self.state,
"inbound service is overloaded, closing connection",
);
tracing::debug!("inbound service is overloaded, may close connection");

metrics::counter!("pool.closed.loadshed", 1);
self.fail_with(PeerError::Overloaded);
let now = Instant::now();

self.handle_inbound_overload(req, now).await;
} else {
// We could send a reject to the remote peer, but that might cause
// them to disconnect, and we might be using them to sync blocks.
Expand All @@ -1292,7 +1290,9 @@ where
client_receiver = ?self.client_rx,
"error processing peer request",
);
self.update_state_metrics(format!("In::Req::{}/Rsp::Error", req.command()));
}

return;
}
Ok(rsp) => rsp,
Expand All @@ -1307,6 +1307,7 @@ where
);
self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));

// TODO: split response handler into its own method
match rsp.clone() {
Response::Nil => { /* generic success, do nothing */ }
Response::Peers(addrs) => {
Expand Down Expand Up @@ -1412,6 +1413,90 @@ where
// before checking the connection for the next inbound or outbound request.
tokio::task::yield_now().await;
}

/// Handle inbound service overload error responses by randomly terminating some connections.
///
/// # Security
///
/// When the inbound service is overloaded with requests, Zebra needs to drop some connections,
/// to reduce the load on the application. But dropping every connection that receives an
/// `Overloaded` error from the inbound service could cause Zebra to drop too many peer
/// connections, and stop itself downloading blocks or transactions.
///
/// Malicious or misbehaving peers can also overload the inbound service, and make Zebra drop
/// its connections to other peers.
///
/// So instead, Zebra drops some overloaded connections at random. If a connection has recently
/// overloaded the inbound service, it is more likely to be dropped. This makes it harder for a
/// single peer (or multiple peers) to perform a denial of service attack.
///
/// The inbound connection rate-limit also makes it hard for multiple peers to perform this
/// attack, because each inbound connection can only send one inbound request before its
/// probability of being disconnected increases.
async fn handle_inbound_overload(&mut self, req: Request, now: Instant) {
let prev = self.last_overload_time.replace(now);
let drop_connection_probability = overload_drop_connection_probability(now, prev);

if thread_rng().gen::<f32>() < drop_connection_probability {
metrics::counter!("pool.closed.loadshed", 1);

tracing::info!(
drop_connection_probability,
remote_user_agent = ?self.connection_info.remote.user_agent,
negotiated_version = ?self.connection_info.negotiated_version,
peer = ?self.metrics_label,
last_peer_state = ?self.last_metrics_state,
// TODO: remove this detailed debug info once #6506 is fixed
remote_height = ?self.connection_info.remote.start_height,
cached_addrs = ?self.cached_addrs.len(),
connection_state = ?self.state,
"inbound service is overloaded, closing connection",
);

self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Error", req.command()));
self.fail_with(PeerError::Overloaded);
} else {
self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Ignored", req.command()));
metrics::counter!("pool.ignored.loadshed", 1);
}
}
}

/// Returns the probability of dropping a connection, given the time of the
/// previous overload (`prev`, if any) and the time of the current one (`now`).
///
/// # Security
///
/// Connections whose last overload error was more than OVERLOAD_PROTECTION_INTERVAL ago
/// (or that have never overloaded) only have a small chance of being closed
/// (MIN_OVERLOAD_DROP_PROBABILITY).
///
/// Connections with a more recent previous overload error have a higher chance of
/// being dropped, up to MAX_OVERLOAD_DROP_PROBABILITY. The probability decays
/// quadratically with the time since the previous overload, so peers that send lots
/// of inbound requests are more likely to be dropped.
///
/// ## Examples
///
/// If a connection sends multiple overloads close together, it is very likely to be
/// disconnected. If a connection has two overloads multiple seconds apart, it is unlikely
/// to be disconnected.
fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) -> f32 {
    match prev {
        // No overload history: use the baseline probability.
        None => MIN_OVERLOAD_DROP_PROBABILITY,
        Some(last_overload) => {
            // How far through the protection interval the previous overload is,
            // as a fraction: 0.0 = just now, 1.0 = a full interval ago.
            let elapsed_fraction = (now - last_overload).as_secs_f32()
                / OVERLOAD_PROTECTION_INTERVAL.as_secs_f32();

            // Square the fraction so very recent overloads keep the probability
            // near the maximum, then interpolate from MAX down towards MIN.
            let decay = elapsed_fraction * elapsed_fraction;
            let probability_span =
                MAX_OVERLOAD_DROP_PROBABILITY - MIN_OVERLOAD_DROP_PROBABILITY;
            let raw = MAX_OVERLOAD_DROP_PROBABILITY - decay * probability_span;

            // Clamping also discards any negative values from long elapsed times.
            raw.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
        }
    }
}

impl<S, Tx> Connection<S, Tx> {
Expand Down
Loading