Skip to content

Commit

Permalink
Handle uTP Accept payload in overlay service
Browse files Browse the repository at this point in the history
  • Loading branch information
ogenev committed May 11, 2022
1 parent 376ff7d commit a0a2b33
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 40 deletions.
33 changes: 33 additions & 0 deletions trin-core/src/portalnet/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
utp::stream::UtpListenerRequest,
};

use crate::utp::{stream::UtpSocket, trin_helpers::UtpStreamId};
use delay_map::HashSetDelay;
use discv5::{
enr::NodeId,
Expand Down Expand Up @@ -378,6 +379,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
async fn start(&mut self) {
// Construct bucket refresh interval
let mut bucket_refresh_interval = tokio::time::interval(Duration::from_secs(60));
let mut utp_stream_handle_interval = tokio::time::interval(Duration::from_millis(10));

loop {
tokio::select! {
Expand Down Expand Up @@ -408,6 +410,20 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
self.peers_to_ping.insert(node_id);
}
}
_ = utp_stream_handle_interval.tick() => {
let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(UtpSocket, UtpStreamId)>>();

// FIXME: Handle this possible error and spawn tokio thread
// Send request to uTP listener to process all closed uTP streams and wait for response
let _ = self.utp_listener_tx.send(UtpListenerRequest::ProcessClosedStreams(tx));

match rx.await {
Ok(streams) => {
self.handle_utp_streams(streams);
}
Err(err) => error!("Unable to receive ProcessClosedStreams response from uTP listener: {err}")
}
}
_ = OverlayService::<TContentKey, TMetric>::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {}
_ = bucket_refresh_interval.tick() => {
debug!("[{:?}] Overlay bucket refresh lookup", self.protocol);
Expand Down Expand Up @@ -700,6 +716,19 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
Ok(accept)
}

/// Handle all closed uTP streams, currently we process only AcceptStream here.
/// FindContent payload is processed explicitly when we send FindContent request.
fn handle_utp_streams(&self, streams: Vec<(UtpSocket, UtpStreamId)>) {
for stream in streams {
match stream {
(socket, UtpStreamId::AcceptStream(content_keys)) => {
self.process_accept_utp_payload(content_keys, socket.recv_data_stream);
}
_ => {}
}
}
}

/// Sends a TALK request via Discovery v5 to some destination node.
fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) {
let discovery = Arc::clone(&self.discovery);
Expand Down Expand Up @@ -731,6 +760,10 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
});
}

fn process_accept_utp_payload(&self, content_keys: Vec<Vec<u8>>, payload: Vec<u8>) {
warn!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}");
}

/// Processes an incoming request from some source node.
fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) {
// Look up the node in the routing table.
Expand Down
81 changes: 45 additions & 36 deletions trin-core/src/utp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
utp::{
packets::{ExtensionType, Packet, PacketType, HEADER_SIZE},
time::{now_microseconds, Delay, Timestamp},
trin_helpers::{UtpMessage, UtpMessageId},
trin_helpers::{UtpMessage, UtpStreamId},
util::{abs_diff, ewma, generate_sequential_identifiers},
},
};
Expand Down Expand Up @@ -111,6 +111,8 @@ pub enum UtpListenerRequest {
FindContentData(ConnId, ByteList),
/// Request to listen for FindContent stream
FindContentStream(ConnId),
/// Process all streams where uTP socket state is "Closed"
ProcessClosedStreams(oneshot::Sender<Vec<(UtpSocket, UtpStreamId)>>),
/// Request to listen for Offer stream
OfferStream(ConnId),
}
Expand All @@ -122,7 +124,7 @@ pub struct UtpListener {
/// Store all active connections
utp_connections: HashMap<ConnectionKey, UtpSocket>,
/// uTP connection ids to listen for
listening: HashMap<ConnId, UtpMessageId>,
listening: HashMap<ConnId, UtpStreamId>,
/// Receiver for uTP events sent from the main portal event handler
utp_event_rx: UnboundedReceiver<TalkRequest>,
/// Receiver for uTP requests sent from the overlay layer
Expand Down Expand Up @@ -221,9 +223,9 @@ impl UtpListener {
// TODO: Probably there is a better way with lifetimes to pass the HashMap value to a
// different thread without removing the key and re-adding it.
self.listening
.insert(conn.sender_connection_id, UtpMessageId::FindContentStream);
.insert(conn.sender_connection_id, UtpStreamId::FindContentStream);

if let Some(UtpMessageId::FindContentData(Content(content_data))) =
if let Some(UtpStreamId::FindContentData(Content(content_data))) =
utp_message_id
{
// We want to send uTP data only if the content is Content(ByteList)
Expand Down Expand Up @@ -264,12 +266,17 @@ impl UtpListener {
return;
}

let mut result = Vec::new();

let mut buf = [0; BUF_SIZE];
if let Err(msg) = conn.recv(&mut buf).await {
error!("Unable to receive uTP DATA packet: {msg}")
} else {
conn.recv_data_stream
.append(&mut Vec::from(packet.payload()));
match conn.recv(&mut buf).await {
Ok(bytes_read) => {
if let Some(bytes) = bytes_read {
result.extend_from_slice(&buf[..bytes]);
conn.recv_data_stream.append(&mut result);
}
}
Err(err) => error!("Unable to receive uTP DATA packet: {err}"),
}
}
}
Expand Down Expand Up @@ -314,24 +321,29 @@ impl UtpListener {
match request {
UtpListenerRequest::FindContentStream(conn_id) => {
self.listening
.insert(conn_id, UtpMessageId::FindContentStream);
.insert(conn_id, UtpStreamId::FindContentStream);
}
UtpListenerRequest::Connect(conn_id, node_id, tx) => {
let conn = self.connect(conn_id, node_id).await;
if tx.send(conn).is_err() {
warn!("Unable to send uTP socket to requester")
error!("Unable to send uTP socket to requester")
};
}
UtpListenerRequest::OfferStream(conn_id) => {
self.listening.insert(conn_id, UtpMessageId::OfferStream);
self.listening.insert(conn_id, UtpStreamId::OfferStream);
}
UtpListenerRequest::FindContentData(conn_id, content) => {
self.listening
.insert(conn_id, UtpMessageId::FindContentData(Content(content)));
.insert(conn_id, UtpStreamId::FindContentData(Content(content)));
}
UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => {
self.listening
.insert(conn_id, UtpMessageId::AcceptStream(accepted_keys));
.insert(conn_id, UtpStreamId::AcceptStream(accepted_keys));
}
UtpListenerRequest::ProcessClosedStreams(tx) => {
if tx.send(self.process_closed_streams()).is_err() {
error!("Unable to send closed uTP streams to requester")
};
}
}
}
Expand All @@ -355,28 +367,25 @@ impl UtpListener {
}
}

// https://github.com/ethereum/portal-network-specs/pull/98\
// Currently the way to handle data over uTP isn't finalized yet, so we are going to use the
// handle data on connection closed method, as that seems to be the accepted method for now.
pub async fn process_utp_byte_stream(&mut self) {
let mut utp_connections = self.utp_connections.clone();
for (conn_key, conn) in self.utp_connections.iter_mut() {
if conn.state == SocketState::Closed {
let received_stream = conn.recv_data_stream.clone();
debug!("Received data: with len: {}", received_stream.len());

match self.listening.get(&conn.receiver_connection_id) {
Some(message_type) => {
if let UtpMessageId::AcceptStream(content_keys) = message_type {
// TODO: Implement this with overlay store and decode receiver stream if multiple content values are send
debug!("Store {content_keys:?}, {received_stream:?}");
}
}
_ => warn!("uTP listening HashMap doesn't have uTP stream message type"),
}
utp_connections.remove(conn_key);
}
}
/// Return all active uTP streams where socket state is "Closed" and UtpStreamId match recv_connection_id
pub fn process_closed_streams(&mut self) -> Vec<(UtpSocket, UtpStreamId)> {
// FIXME: Deal with cleaning all closed active connections
self.utp_connections
.clone()
.into_values()
.filter(|stream| stream.state == SocketState::Closed)
.map(|stream| {
// Cleanup all sender listening connections
self.listening.remove(&stream.sender_connection_id);

(
stream.clone(),
self.listening.remove(&stream.receiver_connection_id),
)
})
.filter(|result| result.1.is_some())
.map(|result| (result.0, result.1.unwrap()))
.collect()
}
}

Expand Down
7 changes: 3 additions & 4 deletions trin-core/src/utp/trin_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ pub struct UtpAccept {
pub message: Vec<(Vec<u8>, Vec<u8>)>,
}

// This is not in a spec, this is just for internally tracking for what portal message
// negotiated the uTP stream
#[derive(Debug, Clone)]
pub enum UtpMessageId {
/// Used to track which stream to which overlay request correspond
#[derive(Debug, Clone, PartialEq)]
pub enum UtpStreamId {
FindContentStream,
FindContentData(Content),
OfferStream,
Expand Down

0 comments on commit a0a2b33

Please sign in to comment.