Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(request-response): Add support for custom dial options when making request to disconnected peer #5692

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ libp2p-pnet = { version = "0.25.0", path = "transports/pnet" }
libp2p-quic = { version = "0.11.2", path = "transports/quic" }
libp2p-relay = { version = "0.18.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.27.1", path = "protocols/request-response" }
libp2p-request-response = { version = "0.28.0", path = "protocols/request-response" }
libp2p-server = { version = "0.12.8", path = "misc/server" }
libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" }
libp2p-swarm = { version = "0.45.2", path = "swarm" }
Expand Down
21 changes: 15 additions & 6 deletions protocols/autonat/src/v1/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,20 @@ impl HandleInnerEvent for AsClient<'_> {
error,
request_id,
} => {
tracing::debug!(
%peer,
"Outbound Failure {} when on dial-back request to peer.",
error,
);
if let Some(peer) = peer {
tracing::debug!(
%peer,
%request_id,
"Outbound Failure {} when on dial-back request to peer.",
error,
);
} else {
tracing::debug!(
%request_id,
"Outbound Failure {} when on dial-back request to peer.",
error,
);
}
let probe_id = self
.ongoing_outbound
.remove(&request_id)
Expand All @@ -169,7 +178,7 @@ impl HandleInnerEvent for AsClient<'_> {
VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe(
OutboundProbeEvent::Error {
probe_id,
peer: Some(peer),
peer,
error: OutboundProbeError::OutboundRequest(error),
},
))])
Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.28.0

- `send_request` Add support for custom dial options when making request to disconnected peer.
See [PR 5692](https://github.com/libp2p/rust-libp2p/pull/5692).

## 0.27.1

- Deprecate `void` crate.
Expand Down
4 changes: 2 additions & 2 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.27.1"
version = "0.28.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -19,7 +19,7 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
serde = { version = "1.0", optional = true}
serde = { version = "1.0", optional = true }
serde_json = { version = "1.0.117", optional = true }
smallvec = "1.13.2"
tracing = { workspace = true }
Expand Down
125 changes: 86 additions & 39 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
/// An outbound request failed.
OutboundFailure {
/// The peer to whom the request was sent.
peer: PeerId,
peer: Option<PeerId>,
/// The (local) ID of the failed request.
request_id: OutboundRequestId,
/// The error that occurred.
Expand Down Expand Up @@ -333,6 +333,24 @@ impl Config {
}
}

#[derive(Debug, Eq, PartialEq, Hash)]
enum PendingOutgoingRequest {
PeerId(PeerId),
ConnectionId(ConnectionId),
}

impl From<PeerId> for PendingOutgoingRequest {
fn from(peer_id: PeerId) -> Self {
Self::PeerId(peer_id)
}
}

impl From<ConnectionId> for PendingOutgoingRequest {
fn from(connection_id: ConnectionId) -> Self {
Self::ConnectionId(connection_id)
}
}

/// A request/response protocol for some message codec.
pub struct Behaviour<TCodec>
where
Expand Down Expand Up @@ -360,7 +378,8 @@ where
addresses: PeerAddresses,
/// Requests that have not yet been sent and are waiting for a connection
/// to be established.
pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
pending_outbound_requests:
HashMap<PendingOutgoingRequest, SmallVec<[OutboundMessage<TCodec>; 10]>>,
}

impl<TCodec> Behaviour<TCodec>
Expand Down Expand Up @@ -417,28 +436,45 @@ where
/// connection is established.
///
/// > **Note**: In order for such a dialing attempt to succeed,
/// > the `RequestResonse` protocol must either be embedded
/// > in another `NetworkBehaviour` that provides peer and
/// > address discovery, or known addresses of peers must be
/// > managed via [`Behaviour::add_address`] and
/// > the `peer` must be [`DialOpts`] with multiaddresses or
/// > in case of simple [`PeerId`] `RequestResponse` protocol
/// > must either be embedded in another `NetworkBehaviour`
/// > that provides peer and address discovery, or known addresses of
/// > peers must be managed via [`Behaviour::add_address`] and
/// > [`Behaviour::remove_address`].
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
pub fn send_request<Peer>(&mut self, peer: Peer, request: TCodec::Request) -> OutboundRequestId
where
DialOpts: From<Peer>,
{
Comment on lines 438 to +448
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current description and usage of peer is a bit confusing IMO as it can be a peer id,multiaddress, or DialOpts.

What do you think of adding instead a separate function:
pub fn send_request_with_dial_ops(&mut self, dial_ops: DialOpts, ...).

That way no From<&PeerId> for DialOpts or breaking change on send_request would be needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed various options during community call and my understanding was that it is desirable to not have multiple methods and it is also desirable to keep it backwards compatible in terms of API. I do not have strong opinion though and happy to change if necessary.

let request_id = self.next_outbound_request_id();
let request = OutboundMessage {
request_id,
request,
protocols: self.outbound_protocols.clone(),
};

if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer).build(),
});
self.pending_outbound_requests
.entry(*peer)
.or_default()
.push(request);
}
let opts = DialOpts::from(peer);
let maybe_peer_id = opts.get_peer_id();
let request = if let Some(peer_id) = &maybe_peer_id {
if let Some(request) = self.try_send_request(peer_id, request) {
request
} else {
// Sent successfully
return request_id;
}
} else {
request
};

self.pending_outbound_requests
.entry(if let Some(peer_id) = maybe_peer_id {
peer_id.into()
} else {
opts.connection_id().into()
})
.or_default()
.push(request);
self.pending_events.push_back(ToSwarm::Dial { opts });

request_id
}
Expand Down Expand Up @@ -506,7 +542,7 @@ where
// Check if request is still pending to be sent.
let pen_conn = self
.pending_outbound_requests
.get(peer)
.get(&PendingOutgoingRequest::from(*peer))
.map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
.unwrap_or(false);

Expand Down Expand Up @@ -665,30 +701,41 @@ where
for request_id in connection.pending_outbound_responses {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer: peer_id,
peer: Some(peer_id),
request_id,
error: OutboundFailure::ConnectionClosed,
}));
}
}

fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) {
if let Some(peer) = peer_id {
// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
// only created when a peer is not connected when a request is made.
// Thus these requests must be considered failed, even if there is
// another, concurrent dialing attempt ongoing.
if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
for request in pending {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
request_id: request.request_id,
error: OutboundFailure::DialFailure,
}));
}
fn on_dial_failure(
&mut self,
DialFailure {
peer_id,
connection_id,
..
}: DialFailure,
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
) {
let key = if let Some(peer_id) = peer_id {
peer_id.into()
} else {
connection_id.into()
};

// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
// only created when a peer is not connected when a request is made.
// Thus, these requests must be considered failed, even if there is
// another, concurrent dialing attempt ongoing.
if let Some(pending) = self.pending_outbound_requests.remove(&key) {
for request in pending {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer: peer_id,
request_id: request.request_id,
error: OutboundFailure::DialFailure,
}));
}
}
}
Expand All @@ -703,7 +750,7 @@ where
) {
let mut connection = Connection::new(connection_id, remote_address);

if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer.into()) {
for request in pending_requests {
connection
.pending_outbound_responses
Expand Down Expand Up @@ -887,7 +934,7 @@ where

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
peer: Some(peer),
request_id,
error: OutboundFailure::Timeout,
}));
Expand All @@ -901,7 +948,7 @@ where

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
peer: Some(peer),
request_id,
error: OutboundFailure::UnsupportedProtocols,
}));
Expand All @@ -912,7 +959,7 @@ where

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
peer: Some(peer),
request_id,
error: OutboundFailure::Io(error),
}))
Expand Down
16 changes: 8 additions & 8 deletions protocols/request-response/tests/error_reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn report_outbound_failure_on_read_response() {
.send_request(&peer1_id, Action::FailOnReadResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

let error = match error {
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn report_outbound_failure_on_write_request() {
.send_request(&peer1_id, Action::FailOnWriteRequest);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

let error = match error {
Expand Down Expand Up @@ -151,7 +151,7 @@ async fn report_outbound_timeout_on_read_response() {
.send_request(&peer1_id, Action::TimeoutOnReadResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);
assert!(matches!(error, OutboundFailure::Timeout));
};
Expand Down Expand Up @@ -203,7 +203,7 @@ async fn report_outbound_failure_on_max_streams() {
.send_request(&peer1_id, Action::FailOnMaxStreams);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, outbound_req_id);
assert!(matches!(error, OutboundFailure::Io(_)));
};
Expand Down Expand Up @@ -236,7 +236,7 @@ async fn report_inbound_failure_on_read_request() {
.send_request(&peer1_id, Action::FailOnReadRequest);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

match error {
Expand Down Expand Up @@ -295,7 +295,7 @@ async fn report_inbound_failure_on_write_response() {
.send_request(&peer1_id, Action::FailOnWriteResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

match error {
Expand Down Expand Up @@ -352,7 +352,7 @@ async fn report_inbound_timeout_on_write_response() {
.send_request(&peer1_id, Action::TimeoutOnWriteResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

match error {
Expand Down Expand Up @@ -612,7 +612,7 @@ async fn wait_inbound_failure(

async fn wait_outbound_failure(
swarm: &mut Swarm<request_response::Behaviour<TestCodec>>,
) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> {
) -> Result<(Option<PeerId>, OutboundRequestId, OutboundFailure)> {
loop {
match swarm.select_next_some().await.try_into_behaviour_event() {
Ok(request_response::Event::OutboundFailure {
Expand Down
Loading
Loading