Skip to content

Commit

Permalink
Move UtpStreamId inside UtpSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
ogenev committed May 25, 2022
1 parent f3bfeab commit 5c04aa2
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 20 deletions.
4 changes: 3 additions & 1 deletion trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
portalnet::types::content_key::RawContentKey,
utp::{
stream::{UtpListenerEvent, UtpListenerRequest, UtpSocket, BUF_SIZE},
trin_helpers::{UtpAccept, UtpMessage},
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
},
};
use discv5::{
Expand Down Expand Up @@ -299,6 +299,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
conn_id,
enr.node_id(),
self.protocol.clone(),
UtpStreamId::FindContentStream,
tx,
))
.map_err(|err| {
Expand Down Expand Up @@ -412,6 +413,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
conn_id,
enr.node_id(),
self.protocol.clone(),
UtpStreamId::OfferStream,
tx,
)).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;

Expand Down
21 changes: 15 additions & 6 deletions trin-core/src/portalnet/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,16 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
&self,
source: &NodeId,
conn_id_recv: u16,
stream_id: UtpStreamId,
) -> Result<(), OverlayRequestError> {
if let Some(enr) = self.discovery.discv5.find_enr(source) {
// Initialize active uTP stream with requested note
let utp_request =
UtpListenerRequest::AddActiveConnection(enr, self.protocol.clone(), conn_id_recv);
let utp_request = UtpListenerRequest::AddActiveConnection(
enr,
self.protocol.clone(),
stream_id,
conn_id_recv,
);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP AddActiveConnection request: {err}"
Expand Down Expand Up @@ -664,7 +669,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
// listen for incoming connection request on conn_id, as part of utp handshake and
// temporarily storing content data, so we can send it right after we receive
// SYN packet from the requester
let utp_request = UtpListenerRequest::FindContentData(conn_id, content);
let utp_request = UtpListenerRequest::FindContentData(conn_id, content.clone());
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP FindContentData stream request: {err}"
Expand All @@ -681,7 +686,11 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
)));
}

self.add_utp_connection(source, conn_id_recv)?;
self.add_utp_connection(
source,
conn_id_recv,
UtpStreamId::FindContentData(content),
)?;

// Connection id is send as BE because uTP header values are stored also as BE
Ok(Content::ConnectionId(conn_id.to_be()))
Expand Down Expand Up @@ -751,14 +760,14 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
// also listen on conn_id + 1 because this is the actual receive path for acceptor
let conn_id_recv = conn_id.wrapping_add(1);

let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys);
let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys.clone());
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP Accept stream request: {err}"
)));
}

self.add_utp_connection(source, conn_id)?;
self.add_utp_connection(source, conn_id, UtpStreamId::AcceptStream(accept_keys))?;

let accept = Accept {
connection_id: conn_id.to_be(),
Expand Down
46 changes: 36 additions & 10 deletions trin-core/src/utp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
locks::RwLoggingExt,
portalnet::types::{
content_key::RawContentKey,
messages::{ByteList, Content::Content, ProtocolId},
messages::{ByteList, ProtocolId},
},
utp::{
packets::{ExtensionType, Packet, PacketType, HEADER_SIZE},
Expand Down Expand Up @@ -121,10 +121,11 @@ pub enum UtpListenerRequest {
ConnId,
NodeId,
ProtocolId,
UtpStreamId,
oneshot::Sender<anyhow::Result<UtpSocket>>,
),
/// Request to add uTP stream to the active connections
AddActiveConnection(Enr, ProtocolId, ConnId),
AddActiveConnection(Enr, ProtocolId, UtpStreamId, ConnId),
/// Request to listen for FindCOntent stream and send content data
FindContentData(ConnId, ByteList),
/// Request to listen for FindContent stream
Expand Down Expand Up @@ -277,8 +278,7 @@ impl UtpListener {
self.listening
.insert(conn.sender_connection_id, UtpStreamId::FindContentStream);

if let Some(UtpStreamId::FindContentData(Content(content_data))) =
utp_message_id
if let Some(UtpStreamId::FindContentData(content_data)) = utp_message_id
{
// We want to send uTP data only if the content is Content(ByteList)
debug!(
Expand Down Expand Up @@ -374,11 +374,17 @@ impl UtpListener {
/// Process overlay uTP requests
async fn process_overlay_request(&mut self, request: UtpListenerRequest) {
match request {
UtpListenerRequest::AddActiveConnection(connected_to, protocol_id, conn_id_recv) => {
UtpListenerRequest::AddActiveConnection(
connected_to,
protocol_id,
stream_id,
conn_id_recv,
) => {
let conn = UtpSocket::new(
Arc::clone(&self.discovery),
connected_to.clone(),
protocol_id,
stream_id,
Some(self.stream_tx.clone()),
);
let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv);
Expand All @@ -388,8 +394,8 @@ impl UtpListener {
self.listening
.insert(conn_id, UtpStreamId::FindContentStream);
}
UtpListenerRequest::Connect(conn_id, node_id, protocol_id, tx) => {
let conn = self.connect(conn_id, node_id, protocol_id).await;
UtpListenerRequest::Connect(conn_id, node_id, protocol_id, stream_id, tx) => {
let conn = self.connect(conn_id, node_id, protocol_id, stream_id).await;
if tx.send(conn).is_err() {
error!("Unable to send uTP socket to requester")
};
Expand All @@ -399,7 +405,7 @@ impl UtpListener {
}
UtpListenerRequest::FindContentData(conn_id, content) => {
self.listening
.insert(conn_id, UtpStreamId::FindContentData(Content(content)));
.insert(conn_id, UtpStreamId::FindContentData(content));
}
UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => {
self.listening
Expand All @@ -414,12 +420,14 @@ impl UtpListener {
connection_id: ConnId,
node_id: NodeId,
protocol_id: ProtocolId,
stream_id: UtpStreamId,
) -> anyhow::Result<UtpSocket> {
if let Some(enr) = self.discovery.discv5.find_enr(&node_id) {
let mut conn = UtpSocket::new(
Arc::clone(&self.discovery),
enr,
protocol_id,
stream_id,
Some(self.stream_tx.clone()),
);
conn.make_connection(connection_id).await;
Expand Down Expand Up @@ -478,6 +486,9 @@ pub struct UtpSocket {
/// Overlay protocol identifier
protocol_id: ProtocolId,

/// Overlay uTP stream id
stream_id: UtpStreamId,

/// Sequence number for the next packet
seq_nr: u16,

Expand Down Expand Up @@ -565,6 +576,7 @@ impl UtpSocket {
socket: Arc<Discovery>,
connected_to: Enr,
protocol_id: ProtocolId,
stream_id: UtpStreamId,
utp_listener_tx: Option<UnboundedSender<UtpStreamEvent>>,
) -> Self {
let (receiver_id, sender_id) = generate_sequential_identifiers();
Expand All @@ -574,6 +586,7 @@ impl UtpSocket {
Self {
state: SocketState::Uninitialized,
protocol_id,
stream_id,
seq_nr: 1,
ack_nr: 0,
receiver_connection_id: receiver_id,
Expand Down Expand Up @@ -1482,6 +1495,7 @@ mod tests {
packets::{Packet, PacketType},
stream::{SocketState, UtpSocket, BUF_SIZE},
time::now_microseconds,
trin_helpers::UtpStreamId,
},
};
use discv5::Discv5Event;
Expand Down Expand Up @@ -1514,7 +1528,13 @@ mod tests {

let discv5 = Arc::new(discv5);

let conn = UtpSocket::new(Arc::clone(&discv5), enr, ProtocolId::History, None);
let conn = UtpSocket::new(
Arc::clone(&discv5),
enr,
ProtocolId::History,
UtpStreamId::OfferStream,
None,
);
// TODO: Create `Discv5Socket` struct to encapsulate all socket logic
spawn_socket_recv(Arc::clone(&discv5), conn.clone());

Expand All @@ -1534,7 +1554,13 @@ mod tests {

let discv5 = Arc::new(discv5);

let conn = UtpSocket::new(Arc::clone(&discv5), connected_to, ProtocolId::History, None);
let conn = UtpSocket::new(
Arc::clone(&discv5),
connected_to,
ProtocolId::History,
UtpStreamId::OfferStream,
None,
);
spawn_socket_recv(Arc::clone(&discv5), conn.clone());

(discv5.local_enr(), conn)
Expand Down
8 changes: 6 additions & 2 deletions trin-core/src/utp/trin_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]
// These are just some Trin helper functions

use crate::portalnet::types::{content_key::RawContentKey, messages::Content};
use crate::portalnet::types::{content_key::RawContentKey, messages::ByteList};
use ssz_derive::{Decode, Encode};

// These Utp impl are related to sending messages over uTP not the implementation itself or stream
Expand Down Expand Up @@ -54,9 +54,13 @@ pub struct UtpAccept {
/// Used to track which stream to which overlay request correspond
#[derive(Debug, Clone, PartialEq)]
pub enum UtpStreamId {
/// Stream id to initialize FindContent uTP connection and to listen for content payload
FindContentStream,
FindContentData(Content),
/// Stream id to listen for incoming FindContent connection and to send back the content data to the requester
FindContentData(ByteList),
/// Stream id to send requested content from received ACCEPT response
OfferStream,
/// Stream id to listen for OFFER uTP payload. Contains requested content keys.
AcceptStream(Vec<RawContentKey>),
}

Expand Down
4 changes: 3 additions & 1 deletion utp-testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use trin_core::{
socket,
utp::{
stream::{UtpListener, UtpListenerRequest, UtpListenerUnboundedReceiver, UtpSocket},
trin_helpers::UtpMessage,
trin_helpers::{UtpMessage, UtpStreamId},
},
};

Expand All @@ -34,6 +34,7 @@ impl TestApp {
conn_id,
enr.node_id(),
ProtocolId::History,
UtpStreamId::OfferStream,
tx,
));

Expand Down Expand Up @@ -92,6 +93,7 @@ impl TestApp {
.send(UtpListenerRequest::AddActiveConnection(
source,
ProtocolId::History,
UtpStreamId::OfferStream,
conn_id,
));
}
Expand Down

0 comments on commit 5c04aa2

Please sign in to comment.