Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit global uTP payload event in uTP listener when an uTP stream is closed/reset #325

Merged
merged 6 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions newsfragments/325.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- Rename `UtpSocket` to `UtpStream`.
- Refactor the way we are storing the received payload (DATA packets) in the uTP stream.
- Add a new AddActiveConnection UtpListener request and move the initialization of a uTP stream inside UtpListener.
- Add UtpStream -> UtpListener event channel and emit event inside UtpStream when stream state changes to Closed or Reset.
- Emit a global uTP listener event containing a uTP payload when a stream is closed.
- Remove redundant and dead code.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn run_trin(
};

// Initialize and spawn UTP listener
let (utp_events_tx, utp_listener_tx, mut utp_listener) =
let (utp_events_tx, utp_listener_tx, _, mut utp_listener) =
UtpListener::new(Arc::clone(&discovery));
tokio::spawn(async move { utp_listener.start().await });

Expand Down
1 change: 1 addition & 0 deletions trin-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ features = ["bundled"]

[dev-dependencies]
quickcheck = "1.0.3"
ntest = "0.8.0"
4 changes: 2 additions & 2 deletions trin-core/src/jsonrpc/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use validator::{Validate, ValidationError};
use crate::{
jsonrpc::endpoints::{HistoryEndpoint, StateEndpoint, TrinEndpoint},
portalnet::types::{
content_key::OverlayContentKey,
content_key::{OverlayContentKey, RawContentKey},
messages::{ByteList, CustomPayload, SszEnr},
},
utils::bytes::hex_decode,
Expand Down Expand Up @@ -275,7 +275,7 @@ impl TryFrom<[&Value; 2]> for OfferParams {
.collect();

if let Ok(content_keys) = content_keys {
let content_keys: Result<Vec<Vec<u8>>, _> = content_keys
let content_keys: Result<Vec<RawContentKey>, _> = content_keys
.iter()
.map(|s| hex_decode(s.as_str()))
.collect();
Expand Down
13 changes: 13 additions & 0 deletions trin-core/src/portalnet/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rand::seq::SliceRandom;
use serde_json::{json, Value};
use std::{
convert::TryFrom,
fmt,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -54,6 +55,18 @@ pub struct Discovery {
pub listen_socket: SocketAddr,
}

impl fmt::Debug for Discovery {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Discovery: ( enr: {}, started: {}, listen_socket: {} )",
self.discv5.local_enr(),
self.started,
self.listen_socket
)
}
}

impl Discovery {
pub fn new(portal_config: PortalnetConfig) -> Result<Self, String> {
let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port);
Expand Down
154 changes: 73 additions & 81 deletions trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ use crate::portalnet::{
},
};

use crate::utp::{
stream::{UtpListenerRequest, UtpSocket, BUF_SIZE},
trin_helpers::{UtpAccept, UtpMessage},
use crate::{
portalnet::types::content_key::RawContentKey,
utp::{
stream::{UtpListenerRequest, UtpStream, BUF_SIZE},
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
},
};
use discv5::{
enr::NodeId,
Expand Down Expand Up @@ -277,48 +280,42 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
enr: Enr,
conn_id: u16,
) -> Result<Content, OverlayRequestError> {
let utp_request = UtpListenerRequest::FindContentStream(conn_id);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP FindContent stream request: {err}"
)));
}

// initiate the connection to the acceptor
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();

let _ = self
.utp_listener_tx
.send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx));
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
let utp_request = UtpListenerRequest::Connect(
conn_id,
enr,
self.protocol.clone(),
UtpStreamId::FindContentStream,
tx,
);

self.utp_listener_tx.send(utp_request).map_err(|err| {
OverlayRequestError::UtpError(format!(
"Unable to send Connect request with FindContent stream to UtpListener: {err}"
))
})?;

match rx.await {
Ok(conn) => {
match conn {
Ok(mut conn) => {
let mut result = Vec::new();
// Loop and receive all DATA packets, similar to `read_to_end`
loop {
let mut buf = [0; BUF_SIZE];
match conn.recv_from(&mut buf).await {
Ok((0, _)) => {
break;
}
Ok((bytes, _)) => {
result.extend_from_slice(&mut buf[..bytes]);
}
Err(err) => {
warn!("Unable to receive content via uTP: {err}");
return Err(OverlayRequestError::UtpError(err.to_string()));
}
}
Ok(mut conn) => {
let mut result = Vec::new();
// Loop and receive all DATA packets, similar to `read_to_end`
loop {
let mut buf = [0; BUF_SIZE];
match conn.recv_from(&mut buf).await {
Ok((0, _)) => {
break;
}
Ok((bytes, _)) => {
result.extend_from_slice(&mut buf[..bytes]);
}
Err(err) => {
warn!("Unable to receive content via uTP: {err}");
return Err(OverlayRequestError::UtpError(err.to_string()));
}
Ok(Content::Content(VariableList::from(result)))
}
Err(err) => {
warn!("Unable to initiate uTP stream with remote node. Error initializing uTP socket: {err}");
Err(OverlayRequestError::UtpError(err.to_string()))
}
}
Ok(Content::Content(VariableList::from(result)))
}
Err(err) => {
warn!("Unable to receive from uTP listener channel: {err}");
Expand All @@ -331,7 +328,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
/// Offer is also sent to nodes after FindContent (POKE)
pub async fn send_offer(
&self,
content_keys: Vec<Vec<u8>>,
content_keys: Vec<RawContentKey>,
enr: Enr,
) -> Result<Accept, OverlayRequestError> {
// Construct the request.
Expand Down Expand Up @@ -384,49 +381,44 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
return Ok(response);
}

let utp_request = UtpListenerRequest::OfferStream(conn_id);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(anyhow!("Unable to send uTP Offer stream request: {err}"));
}

// initiate the connection to the acceptor
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();

let _ = self
.utp_listener_tx
.send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx));
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
let utp_request = UtpListenerRequest::Connect(
conn_id,
enr,
self.protocol.clone(),
UtpStreamId::OfferStream,
tx,
);

self.utp_listener_tx
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;

let mut conn = rx.await?;
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await?;

let content_items = self.provide_requested_content(&response, content_keys_offered);

let content_message = UtpAccept {
message: content_items,
};

match rx.await? {
Ok(mut conn) => {
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await?;

let content_items = self.provide_requested_content(&response, content_keys_offered);

let content_message = UtpAccept {
message: content_items,
};

tokio::spawn(async move {
// send the content to the acceptor over a uTP stream
if let Err(err) = conn
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
.await
{
warn!("Error sending content {err}");
};
// Close uTP connection
if let Err(err) = conn.close().await {
warn!("Unable to close uTP connection!: {err}")
};
});
Ok(response)
}
Err(err) => Err(anyhow!(
"Unable to initialize Offer uTP stream with remote node: {err}"
)),
}
tokio::spawn(async move {
// send the content to the acceptor over a uTP stream
if let Err(err) = conn
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
.await
{
warn!("Error sending content {err}");
};
// Close uTP connection
if let Err(err) = conn.close().await {
warn!("Unable to close uTP connection!: {err}")
};
});
Ok(response)
}

/// Provide the requested content key and content value for the acceptor
Expand Down
Loading