From d9d9c351b4eac022967e5a1ec44ef472d3703829 Mon Sep 17 00:00:00 2001 From: Ognyan Genev Date: Tue, 10 May 2022 15:48:26 +0300 Subject: [PATCH 1/6] Add uTP listener -> Overlay service event channel --- newsfragments/325.added.md | 1 + src/lib.rs | 4 +- trin-core/src/jsonrpc/types.rs | 4 +- trin-core/src/portalnet/overlay.rs | 18 ++- trin-core/src/portalnet/overlay_service.rs | 54 +++++++- trin-core/src/portalnet/types/content_key.rs | 3 + trin-core/src/portalnet/types/messages.rs | 4 +- trin-core/src/utp/stream.rs | 133 +++++++++++++------ trin-core/src/utp/trin_helpers.rs | 11 +- trin-core/tests/overlay.rs | 6 +- trin-history/src/lib.rs | 13 +- trin-history/src/network.rs | 9 +- trin-state/src/lib.rs | 13 +- trin-state/src/network.rs | 9 +- utp-testing/src/main.rs | 7 +- 15 files changed, 214 insertions(+), 75 deletions(-) create mode 100644 newsfragments/325.added.md diff --git a/newsfragments/325.added.md b/newsfragments/325.added.md new file mode 100644 index 000000000..db45ca6eb --- /dev/null +++ b/newsfragments/325.added.md @@ -0,0 +1 @@ +Process all closed uTP streams in UtpListener and pass the payload to overlay service. 
diff --git a/src/lib.rs b/src/lib.rs index d4e8c1513..2b41dc2f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,7 +52,7 @@ pub async fn run_trin( }; // Initialize and spawn UTP listener - let (utp_events_tx, utp_listener_tx, mut utp_listener) = + let (utp_events_tx, utp_listener_tx, utp_listener_rx, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); @@ -67,6 +67,7 @@ pub async fn run_trin( initialize_state_network( &discovery, utp_listener_tx.clone(), + utp_listener_rx.clone(), portalnet_config.clone(), storage_config.clone(), ) @@ -85,6 +86,7 @@ pub async fn run_trin( initialize_history_network( &discovery, utp_listener_tx, + utp_listener_rx, portalnet_config.clone(), storage_config.clone(), ) diff --git a/trin-core/src/jsonrpc/types.rs b/trin-core/src/jsonrpc/types.rs index e2841d6ae..fbccde635 100644 --- a/trin-core/src/jsonrpc/types.rs +++ b/trin-core/src/jsonrpc/types.rs @@ -10,7 +10,7 @@ use validator::{Validate, ValidationError}; use crate::{ jsonrpc::endpoints::{HistoryEndpoint, StateEndpoint, TrinEndpoint}, portalnet::types::{ - content_key::OverlayContentKey, + content_key::{OverlayContentKey, RawContentKey}, messages::{ByteList, CustomPayload, SszEnr}, }, utils::bytes::hex_decode, @@ -275,7 +275,7 @@ impl TryFrom<[&Value; 2]> for OfferParams { .collect(); if let Ok(content_keys) = content_keys { - let content_keys: Result>, _> = content_keys + let content_keys: Result, _> = content_keys .iter() .map(|s| hex_decode(s.as_str())) .collect(); diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 082a7c040..7920289bd 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -15,9 +15,12 @@ use crate::portalnet::{ }, }; -use crate::utp::{ - stream::{UtpListenerRequest, UtpSocket, BUF_SIZE}, - trin_helpers::{UtpAccept, UtpMessage}, +use crate::{ + portalnet::types::content_key::RawContentKey, + utp::{ + 
stream::{UtpListenerEvent, UtpListenerRequest, UtpSocket, BUF_SIZE}, + trin_helpers::{UtpAccept, UtpMessage}, + }, }; use discv5::{ enr::NodeId, @@ -29,7 +32,10 @@ use futures::channel::oneshot; use parking_lot::RwLock; use ssz::Encode; use ssz_types::VariableList; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock as TRwLock, +}; use tracing::{debug, warn}; pub use super::overlay_service::{OverlayRequestError, RequestDirection}; @@ -95,6 +101,7 @@ impl config: OverlayConfig, discovery: Arc, utp_listener_tx: UnboundedSender, + utp_listener_rx: Arc>>, storage: Arc>, data_radius: U256, protocol: ProtocolId, @@ -117,6 +124,7 @@ impl Arc::clone(&data_radius), protocol.clone(), utp_listener_tx.clone(), + utp_listener_rx, config.enable_metrics, ) .await @@ -331,7 +339,7 @@ impl /// Offer is also sent to nodes after FindContent (POKE) pub async fn send_offer( &self, - content_keys: Vec>, + content_keys: Vec, enr: Enr, ) -> Result { // Construct the request. diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index aa9b3e443..661ded26a 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -19,6 +19,14 @@ use crate::{ utp::stream::UtpListenerRequest, }; +use crate::{ + locks::RwLoggingExt, + portalnet::types::content_key::RawContentKey, + utp::{ + stream::{UtpListenerEvent, UtpPayload}, + trin_helpers::UtpStreamId, + }, +}; use delay_map::HashSetDelay; use discv5::{ enr::NodeId, @@ -36,7 +44,10 @@ use rand::seq::SliceRandom; use ssz::Encode; use ssz_types::{BitList, VariableList}; use thiserror::Error; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + RwLock as TRwLock, +}; /// Maximum number of ENRs in response to FindNodes. 
pub const FIND_NODES_MAX_NODES: usize = 32; @@ -45,6 +56,8 @@ pub const FIND_CONTENT_MAX_NODES: usize = 32; /// With even distribution assumptions, 2**17 is enough to put each node (estimating 100k nodes, /// which is more than 10x the ethereum mainnet node count) into a unique bucket by the 17th bucket index. const EXPECTED_NON_EMPTY_BUCKETS: usize = 17; +/// Bucket refresh lookup interval in seconds +const BUCKET_REFRESH_INTERVAL: u64 = 60; /// An overlay request error. #[derive(Clone, Error, Debug)] @@ -253,6 +266,8 @@ pub struct OverlayService { response_tx: UnboundedSender, /// The sender half of a channel to send requests to uTP listener utp_listener_tx: UnboundedSender, + /// Receiver for UtpListener emitted events + utp_listener_rx: Arc>>, /// Phantom content key. phantom_content_key: PhantomData, /// Phantom metric (distance function). @@ -278,6 +293,7 @@ impl data_radius: Arc, protocol: ProtocolId, utp_listener_sender: UnboundedSender, + utp_listener_receiver: Arc>>, enable_metrics: bool, ) -> Result, String> { let (request_tx, request_rx) = mpsc::unbounded_channel(); @@ -310,6 +326,7 @@ impl response_rx, response_tx, utp_listener_tx: utp_listener_sender, + utp_listener_rx: utp_listener_receiver, phantom_content_key: PhantomData, phantom_metric: PhantomData, metrics, @@ -377,9 +394,13 @@ impl /// Bucket maintenance: Maintain the routing table (more info documented above function). async fn start(&mut self) { // Construct bucket refresh interval - let mut bucket_refresh_interval = tokio::time::interval(Duration::from_secs(60)); + let mut bucket_refresh_interval = + tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL)); loop { + let utp_listener_rx = self.utp_listener_rx.clone(); + let mut utp_listener_lock = utp_listener_rx.write_with_warn().await; + tokio::select! 
{ Some(request) = self.request_rx.recv() => self.process_request(request), Some(response) = self.response_rx.recv() => { @@ -408,6 +429,14 @@ impl self.peers_to_ping.insert(node_id); } } + Some(utp_event) = utp_listener_lock.recv() => { + match utp_event { + UtpListenerEvent::ProcessedClosedStreams(processed_streams) => + { + self.handle_utp_payload(processed_streams); + } + } + } _ = OverlayService::::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {} _ = bucket_refresh_interval.tick() => { debug!("[{:?}] Overlay bucket refresh lookup", self.protocol); @@ -707,6 +736,19 @@ impl Ok(accept) } + /// Handle all closed uTP streams, currently we process only AcceptStream here. + /// FindContent payload is processed explicitly when we send FindContent request. + fn handle_utp_payload(&self, streams: Vec<(UtpPayload, UtpStreamId)>) { + for stream in streams { + match stream { + (payload, UtpStreamId::AcceptStream(content_keys)) => { + self.process_accept_utp_payload(content_keys, payload); + } + _ => {} + } + } + } + /// Sends a TALK request via Discovery v5 to some destination node. fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) { let discovery = Arc::clone(&self.discovery); @@ -738,6 +780,12 @@ impl }); } + /// Process accepted uTP payload of the OFFER/ACCEPT stream + fn process_accept_utp_payload(&self, content_keys: Vec, payload: UtpPayload) { + // TODO: Verify the payload, store the content and propagate gossip. + debug!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}, protocol: {:?}", self.protocol); + } + /// Processes an incoming request from some source node. fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) { // Look up the node in the routing table. 
@@ -1265,6 +1313,7 @@ mod tests { let discovery = Arc::new(Discovery::new(portal_config).unwrap()); let (utp_listener_tx, _) = unbounded_channel::(); + let (_, utp_listener_rx) = unbounded_channel::(); // Initialize DB config let storage_capacity: u32 = DEFAULT_STORAGE_CAPACITY.parse().unwrap(); @@ -1302,6 +1351,7 @@ mod tests { response_tx, response_rx, utp_listener_tx, + utp_listener_rx: Arc::new(TRwLock::new(utp_listener_rx)), phantom_content_key: PhantomData, phantom_metric: PhantomData, metrics, diff --git a/trin-core/src/portalnet/types/content_key.rs b/trin-core/src/portalnet/types/content_key.rs index 0cccbaf33..0646b3353 100644 --- a/trin-core/src/portalnet/types/content_key.rs +++ b/trin-core/src/portalnet/types/content_key.rs @@ -6,6 +6,9 @@ use ssz::{self, Decode, Encode}; use ssz_derive::{Decode, Encode}; use ssz_types::{typenum, FixedVector, VariableList}; +/// SSZ encoded overlay content key as bytes +pub type RawContentKey = Vec; + /// Types whose values represent keys to lookup content items in an overlay network. /// Keys are serializable. 
pub trait OverlayContentKey: Into> + TryFrom> + Clone { diff --git a/trin-core/src/portalnet/types/messages.rs b/trin-core/src/portalnet/types/messages.rs index b66274d62..5a2de62e9 100644 --- a/trin-core/src/portalnet/types/messages.rs +++ b/trin-core/src/portalnet/types/messages.rs @@ -17,7 +17,7 @@ use ssz_types::{typenum, BitList, VariableList}; use thiserror::Error; use validator::ValidationError; -use crate::portalnet::Enr; +use crate::portalnet::{types::content_key::RawContentKey, Enr}; pub type ByteList = VariableList; @@ -492,7 +492,7 @@ impl TryInto for Content { #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct Offer { - pub content_keys: Vec>, + pub content_keys: Vec, } #[derive(Debug, PartialEq, Clone, Encode, Decode)] diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index 280ccbf67..c3e205c2a 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -21,11 +21,14 @@ use tokio::{ use crate::{ locks::RwLoggingExt, - portalnet::types::messages::{ByteList, Content::Content, ProtocolId}, + portalnet::types::{ + content_key::RawContentKey, + messages::{ByteList, Content::Content, ProtocolId}, + }, utp::{ packets::{ExtensionType, Packet, PacketType, HEADER_SIZE}, time::{now_microseconds, Delay, Timestamp}, - trin_helpers::{UtpMessage, UtpMessageId}, + trin_helpers::{UtpMessage, UtpStreamId}, util::{abs_diff, ewma, generate_sequential_identifiers}, }, }; @@ -58,10 +61,18 @@ const BASE_HISTORY: usize = 10; // base delays history size const MAX_BASE_DELAY_AGE: Delay = Delay(60_000_000); // Discv5 socket timeout in milliseconds const DISCV5_SOCKET_TIMEOUT: u64 = 25; +/// Process uTP streams interval in milliseconds +const PROCESS_UTP_STREAMS_INTERVAL: u64 = 20; /// uTP connection id type ConnId = u16; +/// uTP payload data +pub type UtpPayload = Vec; + +/// UtpListener unbounded receiver for emitted events +pub type UtpListenerUnboundedReceiver = Arc>>; + pub fn rand() -> u16 { rand::thread_rng().gen() } @@ 
-104,7 +115,7 @@ struct DelayDifferenceSample { /// and uTP listener pub enum UtpListenerRequest { /// Request to listen for Accept stream - AcceptStream(ConnId, Vec>), + AcceptStream(ConnId, Vec), /// Request to initialize uTP streram with remote node Connect(ConnId, NodeId, oneshot::Sender>), /// Request to listen for FindCOntent stream and send content data @@ -115,16 +126,26 @@ pub enum UtpListenerRequest { OfferStream(ConnId), } -// Basically the same idea as in the official Bit Torrent library we will store all of the active connections data here +/// Result from processing all closed uTP streams. Includes a tuple with the payload and the stream id. +type ProcessedClosedStreams = Vec<(UtpPayload, UtpStreamId)>; + +/// Emitted event with all processed uTP streams. Used to handle the uTP payload in overlay service +pub enum UtpListenerEvent { + ProcessedClosedStreams(ProcessedClosedStreams), +} + +/// Main uTP service used to listen and handle all uTP connections and streams pub struct UtpListener { /// Base discv5 layer discovery: Arc, /// Store all active connections utp_connections: HashMap, /// uTP connection ids to listen for - listening: HashMap, + listening: HashMap, /// Receiver for uTP events sent from the main portal event handler utp_event_rx: UnboundedReceiver, + /// Sender to overlay layer with processed uTP stream + overlay_tx: UnboundedSender, /// Receiver for uTP requests sent from the overlay layer overlay_rx: UnboundedReceiver, } @@ -135,21 +156,26 @@ impl UtpListener { ) -> ( UnboundedSender, UnboundedSender, + UtpListenerUnboundedReceiver, Self, ) { // Channel to process uTP TalkReq packets from main portal event handler let (utp_event_tx, utp_event_rx) = unbounded_channel::(); // Channel to process portal overlay requests let (utp_listener_tx, utp_listener_rx) = unbounded_channel::(); + // Channel to emit processed uTP payload to overlay service + let (overlay_tx, overlay_rx) = unbounded_channel::(); ( utp_event_tx, utp_listener_tx, + 
Arc::new(RwLock::new(overlay_rx)), UtpListener { discovery, utp_connections: HashMap::new(), listening: HashMap::new(), utp_event_rx, + overlay_tx, overlay_rx: utp_listener_rx, }, ) @@ -157,13 +183,23 @@ impl UtpListener { /// The main execution loop of the UtpListener service. pub async fn start(&mut self) { + let mut process_utp_streams_interval = + tokio::time::interval(Duration::from_millis(PROCESS_UTP_STREAMS_INTERVAL)); loop { tokio::select! { - Some(utp_request) = self.utp_event_rx.recv() => { - self.process_utp_request(utp_request).await - }, - Some(overlay_request) = self.overlay_rx.recv() => { - self.process_overlay_request(overlay_request).await + Some(utp_request) = self.utp_event_rx.recv() => { + self.process_utp_request(utp_request).await + }, + Some(overlay_request) = self.overlay_rx.recv() => { + self.process_overlay_request(overlay_request).await + }, + _ = process_utp_streams_interval.tick() => { + let processed_streams = self.process_closed_streams(); + + if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ProcessedClosedStreams(processed_streams)) { + error!("Unable to send ProcessClosedStreams event to overlay layer: {err}"); + continue + } } } } @@ -221,9 +257,9 @@ impl UtpListener { // TODO: Probably there is a better way with lifetimes to pass the HashMap value to a // different thread without removing the key and re-adding it. 
self.listening - .insert(conn.sender_connection_id, UtpMessageId::FindContentStream); + .insert(conn.sender_connection_id, UtpStreamId::FindContentStream); - if let Some(UtpMessageId::FindContentData(Content(content_data))) = + if let Some(UtpStreamId::FindContentData(Content(content_data))) = utp_message_id { // We want to send uTP data only if the content is Content(ByteList) @@ -264,12 +300,17 @@ impl UtpListener { return; } + let mut result = Vec::new(); + let mut buf = [0; BUF_SIZE]; - if let Err(msg) = conn.recv(&mut buf).await { - error!("Unable to receive uTP DATA packet: {msg}") - } else { - conn.recv_data_stream - .append(&mut Vec::from(packet.payload())); + match conn.recv(&mut buf).await { + Ok(bytes_read) => { + if let Some(bytes) = bytes_read { + result.extend_from_slice(&buf[..bytes]); + conn.recv_data_stream.append(&mut result); + } + } + Err(err) => error!("Unable to receive uTP DATA packet: {err}"), } } } @@ -314,24 +355,24 @@ impl UtpListener { match request { UtpListenerRequest::FindContentStream(conn_id) => { self.listening - .insert(conn_id, UtpMessageId::FindContentStream); + .insert(conn_id, UtpStreamId::FindContentStream); } UtpListenerRequest::Connect(conn_id, node_id, tx) => { let conn = self.connect(conn_id, node_id).await; if tx.send(conn).is_err() { - warn!("Unable to send uTP socket to requester") + error!("Unable to send uTP socket to requester") }; } UtpListenerRequest::OfferStream(conn_id) => { - self.listening.insert(conn_id, UtpMessageId::OfferStream); + self.listening.insert(conn_id, UtpStreamId::OfferStream); } UtpListenerRequest::FindContentData(conn_id, content) => { self.listening - .insert(conn_id, UtpMessageId::FindContentData(Content(content))); + .insert(conn_id, UtpStreamId::FindContentData(Content(content))); } UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => { self.listening - .insert(conn_id, UtpMessageId::AcceptStream(accepted_keys)); + .insert(conn_id, UtpStreamId::AcceptStream(accepted_keys)); } } } @@ 
-355,28 +396,32 @@ impl UtpListener { } } - // https://github.com/ethereum/portal-network-specs/pull/98\ - // Currently the way to handle data over uTP isn't finalized yet, so we are going to use the - // handle data on connection closed method, as that seems to be the accepted method for now. - pub async fn process_utp_byte_stream(&mut self) { - let mut utp_connections = self.utp_connections.clone(); - for (conn_key, conn) in self.utp_connections.iter_mut() { - if conn.state == SocketState::Closed { - let received_stream = conn.recv_data_stream.clone(); - debug!("Received data: with len: {}", received_stream.len()); - - match self.listening.get(&conn.receiver_connection_id) { - Some(message_type) => { - if let UtpMessageId::AcceptStream(content_keys) = message_type { - // TODO: Implement this with overlay store and decode receiver stream if multiple content values are send - debug!("Store {content_keys:?}, {received_stream:?}"); - } - } - _ => warn!("uTP listening HashMap doesn't have uTP stream message type"), - } - utp_connections.remove(conn_key); - } - } + /// Return and cleanup all active uTP streams where socket state is "Closed" + pub fn process_closed_streams(&mut self) -> Vec<(UtpPayload, UtpStreamId)> { + // This seems to be a hot loop, we may need to optimise it and find a better way to filter by closed + // connections without cloning all records. One reasonable way is to use some data-oriented + // design principles like Struct of Arrays vs. Array of Structs. 
+ self.utp_connections + .clone() + .iter() + .filter(|conn| conn.1.state == SocketState::Closed) + .map(|conn| { + // Remove the closed connections from active connections + let receiver_stream_id = self + .listening + .remove(&conn.1.receiver_connection_id) + .expect("Receiver connection id should match active listening connections."); + self.listening + .remove(&conn.1.sender_connection_id) + .expect("Sender connection id should match active listening connections."); + let utp_socket = self + .utp_connections + .remove(conn.0) + .expect("uTP socket should match active utp connections."); + + (utp_socket.recv_data_stream, receiver_stream_id) + }) + .collect() } } diff --git a/trin-core/src/utp/trin_helpers.rs b/trin-core/src/utp/trin_helpers.rs index 9835b6fab..3b434cdaf 100644 --- a/trin-core/src/utp/trin_helpers.rs +++ b/trin-core/src/utp/trin_helpers.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] // These are just some Trin helper functions -use crate::portalnet::types::messages::Content; +use crate::portalnet::types::{content_key::RawContentKey, messages::Content}; use ssz_derive::{Decode, Encode}; // These Utp impl are related to sending messages over uTP not the implementation itself or stream @@ -51,14 +51,13 @@ pub struct UtpAccept { pub message: Vec<(Vec, Vec)>, } -// This is not in a spec, this is just for internally tracking for what portal message -// negotiated the uTP stream -#[derive(Debug, Clone)] -pub enum UtpMessageId { +/// Used to track which overlay request corresponds to which uTP stream +#[derive(Debug, Clone, PartialEq)] +pub enum UtpStreamId { FindContentStream, FindContentData(Content), OfferStream, - AcceptStream(Vec>), + AcceptStream(Vec), } #[cfg(test)] diff --git a/trin-core/tests/overlay.rs b/trin-core/tests/overlay.rs index 09f7fee4e..93c4181f5 100644 --- a/trin-core/tests/overlay.rs +++ b/trin-core/tests/overlay.rs @@ -23,7 +23,9 @@ use tokio::{ sync::{mpsc, mpsc::unbounded_channel}, time::{self, Duration}, }; -use 
trin_core::utp::stream::UtpListenerRequest; + +use tokio::sync::RwLock as TRwLock; +use trin_core::utp::stream::{UtpListenerEvent, UtpListenerRequest}; async fn init_overlay( discovery: Arc, @@ -38,11 +40,13 @@ async fn init_overlay( let overlay_config = OverlayConfig::default(); // Ignore all uTP events let (utp_listener_tx, _) = unbounded_channel::(); + let (_, utp_listener_rx) = unbounded_channel::(); OverlayProtocol::new( overlay_config, discovery, utp_listener_tx, + Arc::new(TRwLock::new(utp_listener_rx)), db, U256::MAX, protocol, diff --git a/trin-history/src/lib.rs b/trin-history/src/lib.rs index 401327bd8..8086e7578 100644 --- a/trin-history/src/lib.rs +++ b/trin-history/src/lib.rs @@ -9,7 +9,10 @@ use crate::{events::HistoryEvents, jsonrpc::HistoryRequestHandler}; use discv5::TalkRequest; use network::HistoryNetwork; use std::sync::Arc; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock, +}; use trin_core::{ cli::TrinConfig, jsonrpc::types::HistoryJsonRpcRequest, @@ -20,7 +23,7 @@ use trin_core::{ types::messages::PortalnetConfig, }, utils::bootnodes::parse_bootnodes, - utp::stream::{UtpListener, UtpListenerRequest}, + utp::stream::{UtpListener, UtpListenerEvent, UtpListenerRequest}, }; pub async fn main() -> Result<(), Box> { @@ -51,7 +54,8 @@ pub async fn main() -> Result<(), Box> { tokio::spawn(Arc::clone(&discovery).bucket_refresh_lookup()); // Initialize and spawn UTP listener - let (utp_sender, overlay_sender, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); + let (utp_sender, overlay_sender, overlay_receiver, mut utp_listener) = + UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); let (history_event_tx, history_event_rx) = mpsc::unbounded_channel::(); @@ -76,6 +80,7 @@ pub async fn main() -> Result<(), Box> { let history_network = HistoryNetwork::new( discovery.clone(), overlay_sender, + overlay_receiver, storage_config, 
portalnet_config.clone(), ) @@ -96,6 +101,7 @@ type HistoryJsonRpcTx = Option>; pub async fn initialize_history_network( discovery: &Arc, utp_listener_tx: UnboundedSender, + utp_listener_rx: Arc>>, portalnet_config: PortalnetConfig, storage_config: PortalStorageConfig, ) -> ( @@ -110,6 +116,7 @@ pub async fn initialize_history_network( let history_network = HistoryNetwork::new( Arc::clone(discovery), utp_listener_tx, + utp_listener_rx, storage_config, portalnet_config.clone(), ) diff --git a/trin-history/src/network.rs b/trin-history/src/network.rs index 7541b37e5..0082d6f11 100644 --- a/trin-history/src/network.rs +++ b/trin-history/src/network.rs @@ -3,7 +3,10 @@ use log::debug; use std::sync::Arc; use parking_lot::RwLock; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock as TRwLock, +}; use trin_core::{ portalnet::{ @@ -16,7 +19,7 @@ use trin_core::{ metric::XorMetric, }, }, - utp::stream::UtpListenerRequest, + utp::stream::{UtpListenerEvent, UtpListenerRequest}, }; /// History network layer on top of the overlay protocol. Encapsulates history network specific data and logic. 
@@ -29,6 +32,7 @@ impl HistoryNetwork { pub async fn new( discovery: Arc, utp_listener_tx: UnboundedSender, + utp_listener_rx: Arc>>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, ) -> Self { @@ -42,6 +46,7 @@ impl HistoryNetwork { config, discovery, utp_listener_tx, + utp_listener_rx, storage, portal_config.data_radius, ProtocolId::History, diff --git a/trin-state/src/lib.rs b/trin-state/src/lib.rs index 87cf67b8b..f4bdf7c14 100644 --- a/trin-state/src/lib.rs +++ b/trin-state/src/lib.rs @@ -5,7 +5,10 @@ use tokio::{sync::mpsc, task::JoinHandle}; use crate::{events::StateEvents, jsonrpc::StateRequestHandler}; use discv5::TalkRequest; use network::StateNetwork; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock, +}; use trin_core::{ cli::TrinConfig, jsonrpc::types::StateJsonRpcRequest, @@ -16,7 +19,7 @@ use trin_core::{ types::messages::PortalnetConfig, }, utils::bootnodes::parse_bootnodes, - utp::stream::{UtpListener, UtpListenerRequest}, + utp::stream::{UtpListener, UtpListenerEvent, UtpListenerRequest}, }; pub mod events; @@ -49,7 +52,8 @@ pub async fn main() -> Result<(), Box> { tokio::spawn(Arc::clone(&discovery).bucket_refresh_lookup()); // Initialize and spawn UTP listener - let (utp_sender, overlay_sender, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); + let (utp_sender, overlay_sender, overlay_receiver, mut utp_listener) = + UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); let (state_event_tx, state_event_rx) = mpsc::unbounded_channel::(); @@ -74,6 +78,7 @@ pub async fn main() -> Result<(), Box> { let state_network = StateNetwork::new( discovery.clone(), overlay_sender, + overlay_receiver, storage_config, portalnet_config.clone(), ) @@ -94,6 +99,7 @@ type StateJsonRpcTx = Option>; pub async fn initialize_state_network( discovery: &Arc, utp_listener_tx: UnboundedSender, + utp_listener_rx: Arc>>, 
portalnet_config: PortalnetConfig, storage_config: PortalStorageConfig, ) -> (StateHandler, StateNetworkTask, StateEventTx, StateJsonRpcTx) { @@ -102,6 +108,7 @@ pub async fn initialize_state_network( let state_network = StateNetwork::new( Arc::clone(discovery), utp_listener_tx, + utp_listener_rx, storage_config, portalnet_config.clone(), ) diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index f5fadc925..bfcb46cf6 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -5,7 +5,10 @@ use discv5::enr::NodeId; use eth_trie::EthTrie; use log::debug; use parking_lot::RwLock; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock as TRwLock, +}; use trin_core::{ portalnet::{ discovery::Discovery, @@ -17,7 +20,7 @@ use trin_core::{ metric::XorMetric, }, }, - utp::stream::UtpListenerRequest, + utp::stream::{UtpListenerEvent, UtpListenerRequest}, }; use crate::trie::TrieDB; @@ -33,6 +36,7 @@ impl StateNetwork { pub async fn new( discovery: Arc, utp_listener_tx: UnboundedSender, + utp_listener_rx: Arc>>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, ) -> Self { @@ -51,6 +55,7 @@ impl StateNetwork { config, discovery, utp_listener_tx, + utp_listener_rx, storage, portal_config.data_radius, ProtocolId::State, diff --git a/utp-testing/src/main.rs b/utp-testing/src/main.rs index 51444491e..2c53a4cc8 100644 --- a/utp-testing/src/main.rs +++ b/utp-testing/src/main.rs @@ -10,14 +10,16 @@ use trin_core::{ }, socket, utp::{ - stream::{UtpListener, UtpListenerRequest, UtpSocket}, + stream::{UtpListener, UtpListenerRequest, UtpListenerUnboundedReceiver, UtpSocket}, trin_helpers::UtpMessage, }, }; +#[allow(dead_code)] pub struct TestApp { discovery: Arc, utp_listener_tx: UnboundedSender, + utp_listener_rx: UtpListenerUnboundedReceiver, utp_event_tx: UnboundedSender, } @@ -134,13 +136,14 @@ async fn run_test_app(discv5_port: u16, socket_addr: SocketAddr) -> TestApp { 
discovery.start().await.unwrap(); let discovery = Arc::new(discovery); - let (utp_event_sender, utp_listener_tx, mut utp_listener) = + let (utp_event_sender, utp_listener_tx, utp_listener_rx, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); let test_app = TestApp { discovery, utp_listener_tx, + utp_listener_rx, utp_event_tx: utp_event_sender, }; From fd45399cc762222bbc063c5b9c6882848bec9f64 Mon Sep 17 00:00:00 2001 From: Ognyan Genev Date: Tue, 24 May 2022 10:48:41 +0300 Subject: [PATCH 2/6] Add protocol id when creating uTP socket and move uTP socket creation outside UtpListener --- trin-core/src/portalnet/overlay.rs | 25 +++-- trin-core/src/portalnet/overlay_service.rs | 78 +++++++++---- trin-core/src/utp/stream.rs | 123 ++++++++++++--------- utp-testing/src/main.rs | 23 +++- 4 files changed, 162 insertions(+), 87 deletions(-) diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 7920289bd..d1f0eba80 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -294,10 +294,18 @@ impl // initiate the connection to the acceptor let (tx, rx) = tokio::sync::oneshot::channel::>(); - - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx)); + self.utp_listener_tx + .send(UtpListenerRequest::Connect( + conn_id, + enr.node_id(), + self.protocol.clone(), + tx, + )) + .map_err(|err| { + OverlayRequestError::UtpError(format!( + "Unable to send Connect request with FindContent stream to UtpListener: {err}" + )) + })?; match rx.await { Ok(conn) => { @@ -400,9 +408,12 @@ impl // initiate the connection to the acceptor let (tx, rx) = tokio::sync::oneshot::channel::>(); - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx)); + self.utp_listener_tx.send(UtpListenerRequest::Connect( + conn_id, + enr.node_id(), + self.protocol.clone(), + tx, + )).map_err(|err| 
anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; match rx.await? { Ok(mut conn) => { diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 661ded26a..64f7bccfa 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -20,7 +20,6 @@ use crate::{ }; use crate::{ - locks::RwLoggingExt, portalnet::types::content_key::RawContentKey, utp::{ stream::{UtpListenerEvent, UtpPayload}, @@ -398,8 +397,8 @@ impl tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL)); loop { - let utp_listener_rx = self.utp_listener_rx.clone(); - let mut utp_listener_lock = utp_listener_rx.write_with_warn().await; + // let utp_listener_rx = self.utp_listener_rx.clone(); + // let mut utp_listener_lock = utp_listener_rx.write_with_warn().await; tokio::select! { Some(request) = self.request_rx.recv() => self.process_request(request), @@ -429,14 +428,14 @@ impl self.peers_to_ping.insert(node_id); } } - Some(utp_event) = utp_listener_lock.recv() => { - match utp_event { - UtpListenerEvent::ProcessedClosedStreams(processed_streams) => - { - self.handle_utp_payload(processed_streams); - } - } - } + // Some(utp_event) = utp_listener_lock.recv() => { + // match utp_event { + // UtpListenerEvent::ProcessedClosedStreams(processed_streams) => + // { + // self.handle_utp_payload(processed_streams); + // } + // } + // } _ = OverlayService::::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {} _ = bucket_refresh_interval.tick() => { debug!("[{:?}] Overlay bucket refresh lookup", self.protocol); @@ -446,6 +445,25 @@ impl } } + /// Send request to UtpListener to add an uTP stream to the active connections + fn add_utp_connection( + &self, + source: &NodeId, + conn_id_recv: u16, + ) -> Result<(), OverlayRequestError> { if let Some(enr) = self.discovery.discv5.find_enr(source) { // Initialize active uTP stream with requested node + 
let utp_request = + UtpListenerRequest::AddActiveConnection(enr, self.protocol.clone(), conn_id_recv); + if let Err(err) = self.utp_listener_tx.send(utp_request) { + return Err(OverlayRequestError::UtpError(format!( + "Unable to send uTP AddActiveConnection request: {err}" + ))); + } + } + Ok(()) + } + /// Main bucket refresh lookup logic fn bucket_refresh_lookup(&self) { // Look at local routing table and select the largest 17 buckets. @@ -518,8 +536,7 @@ impl // channel if the request was initiated internally (e.g. for maintenance). match request.direction { RequestDirection::Incoming { id, source } => { - let response = - self.handle_request(request.request.clone(), id.clone(), source.clone()); + let response = self.handle_request(request.request.clone(), id.clone(), &source); // Send response to responder if present. if let Some(responder) = request.responder { let _ = responder.send(response); @@ -552,7 +569,12 @@ impl debug!("[{:?}] Initializing request", self.protocol); match request { Request::FindContent(find_content) => { - Ok(Response::Content(self.handle_find_content(find_content)?)) + // TODO: Creating random NodeID here just to satisfy handle_find_content signature + // is a hack on top of the history RecursiveFindContent temporally hack. 
+ let node_id = NodeId::random(); + Ok(Response::Content( + self.handle_find_content(find_content, &node_id)?, + )) } _ => Err(OverlayRequestError::InvalidRequest( "Initializing this overlay service request is not yet supported.".to_string(), @@ -565,23 +587,23 @@ impl &mut self, request: Request, id: RequestId, - source: NodeId, + source: &NodeId, ) -> Result { debug!("[{:?}] Handling request {}", self.protocol, id); match request { - Request::Ping(ping) => Ok(Response::Pong(self.handle_ping(ping, source))), + Request::Ping(ping) => Ok(Response::Pong(self.handle_ping(ping, &source))), Request::FindNodes(find_nodes) => { Ok(Response::Nodes(self.handle_find_nodes(find_nodes))) } - Request::FindContent(find_content) => { - Ok(Response::Content(self.handle_find_content(find_content)?)) - } - Request::Offer(offer) => Ok(Response::Accept(self.handle_offer(offer)?)), + Request::FindContent(find_content) => Ok(Response::Content( + self.handle_find_content(find_content, &source)?, + )), + Request::Offer(offer) => Ok(Response::Accept(self.handle_offer(offer, source)?)), } } /// Builds a `Pong` response for a `Ping` request. - fn handle_ping(&self, request: Ping, source: NodeId) -> Pong { + fn handle_ping(&self, request: Ping, source: &NodeId) -> Pong { debug!( "[{:?}] Handling ping request from node={}. Ping={:?}", self.protocol, source, request @@ -611,7 +633,11 @@ impl } /// Attempts to build a `Content` response for a `FindContent` request. 
- fn handle_find_content(&self, request: FindContent) -> Result { + fn handle_find_content( + &self, + request: FindContent, + source: &NodeId, + ) -> Result { self.metrics .as_ref() .and_then(|m| Some(m.report_inbound_find_content())); @@ -647,6 +673,7 @@ impl // also listen on conn_id + 1 because this is the receive path let conn_id_recv = conn_id.wrapping_add(1); + let utp_request = UtpListenerRequest::FindContentStream(conn_id_recv); if let Err(err) = self.utp_listener_tx.send(utp_request) { return Err(OverlayRequestError::UtpError(format!( @@ -654,6 +681,8 @@ impl ))); } + self.add_utp_connection(source, conn_id_recv)?; + // Connection id is send as BE because uTP header values are stored also as BE Ok(Content::ConnectionId(conn_id.to_be())) } @@ -673,7 +702,7 @@ impl } /// Attempts to build an `Accept` response for an `Offer` request. - fn handle_offer(&self, request: Offer) -> Result { + fn handle_offer(&self, request: Offer, source: &NodeId) -> Result { self.metrics .as_ref() .and_then(|m| Some(m.report_inbound_offer())); @@ -721,6 +750,7 @@ impl // also listen on conn_id + 1 because this is the actual receive path for acceptor let conn_id_recv = conn_id.wrapping_add(1); + let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys); if let Err(err) = self.utp_listener_tx.send(utp_request) { return Err(OverlayRequestError::UtpError(format!( @@ -728,6 +758,8 @@ impl ))); } + self.add_utp_connection(source, conn_id)?; + let accept = Accept { connection_id: conn_id.to_be(), content_keys: requested_keys, diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index c3e205c2a..2e2094b91 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -62,7 +62,7 @@ const MAX_BASE_DELAY_AGE: Delay = Delay(60_000_000); // Discv5 socket timeout in milliseconds const DISCV5_SOCKET_TIMEOUT: u64 = 25; /// Process uTP streams interval in milliseconds -const PROCESS_UTP_STREAMS_INTERVAL: u64 = 20; +// const 
PROCESS_UTP_STREAMS_INTERVAL: u64 = 20; /// uTP connection id type ConnId = u16; @@ -116,8 +116,15 @@ struct DelayDifferenceSample { pub enum UtpListenerRequest { /// Request to listen for Accept stream AcceptStream(ConnId, Vec), - /// Request to initialize uTP streram with remote node - Connect(ConnId, NodeId, oneshot::Sender>), + /// Request to initialize uTP stream with remote node + Connect( + ConnId, + NodeId, + ProtocolId, + oneshot::Sender>, + ), + /// Request to add uTP stream to the active connections + AddActiveConnection(Enr, ProtocolId, ConnId), /// Request to listen for FindCOntent stream and send content data FindContentData(ConnId, ByteList), /// Request to listen for FindContent stream @@ -183,8 +190,8 @@ impl UtpListener { /// The main execution loop of the UtpListener service. pub async fn start(&mut self) { - let mut process_utp_streams_interval = - tokio::time::interval(Duration::from_millis(PROCESS_UTP_STREAMS_INTERVAL)); + // let mut process_utp_streams_interval = + // tokio::time::interval(Duration::from_millis(PROCESS_UTP_STREAMS_INTERVAL)); loop { tokio::select! 
{ Some(utp_request) = self.utp_event_rx.recv() => { @@ -193,14 +200,14 @@ impl UtpListener { Some(overlay_request) = self.overlay_rx.recv() => { self.process_overlay_request(overlay_request).await }, - _ = process_utp_streams_interval.tick() => { - let processed_streams = self.process_closed_streams(); - - if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ProcessedClosedStreams(processed_streams)) { - error!("Unable to send ProcessClosedStreams event to overlay layer: {err}"); - continue - } - } + // _ = process_utp_streams_interval.tick() => { + // let processed_streams = self.process_closed_streams(); + // + // if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ProcessedClosedStreams(processed_streams)) { + // error!("Unable to send ProcessClosedStreams event to overlay layer: {err}"); + // continue + // } + // } } } } @@ -231,9 +238,8 @@ impl UtpListener { } } PacketType::Syn => { - if let Some(enr) = self.discovery.discv5.find_enr(node_id) { - // If neither of those cases happened handle this is a new request - let mut conn = UtpSocket::new(Arc::clone(&self.discovery), enr.clone()); + let conn_key = ConnectionKey::new(*node_id, connection_id); + if let Some(conn) = self.utp_connections.get_mut(&conn_key) { if conn.discv5_tx.send(packet).is_err() { error!("Unable to send SYN packet to uTP stream handler"); return; @@ -246,11 +252,6 @@ impl UtpListener { return; } - self.utp_connections.insert( - ConnectionKey::new(*node_id, conn.receiver_connection_id), - conn.clone(), - ); - // Get ownership of FindContentData and re-add the receiver connection let utp_message_id = self.listening.remove(&conn.sender_connection_id); @@ -263,37 +264,34 @@ impl UtpListener { utp_message_id { // We want to send uTP data only if the content is Content(ByteList) - tokio::spawn(async move { - debug!( - "Sending content data via uTP with len: {}", - content_data.len() - ); - // send the content to the requestor over a uTP stream - if let Err(msg) = conn - .send_to( - 
&UtpMessage::new(content_data.as_ssz_bytes()).encode() - [..], - ) - .await - { - error!("Error sending content {msg}"); - } else { - // Close uTP connection - if let Err(msg) = conn.close().await { - error!("Unable to close uTP connection!: {msg}") - } - }; - }); + debug!( + "Sending content data via uTP with len: {}", + content_data.len() + ); + // send the content to the requestor over a uTP stream + if let Err(msg) = conn + .send_to( + &UtpMessage::new(content_data.as_ssz_bytes()).encode()[..], + ) + .await + { + error!("Error sending content {msg}"); + } else { + // Close uTP connection + if let Err(msg) = conn.close().await { + error!("Unable to close uTP connection!: {msg}") + } + }; } } else { - warn!("Query requested an unknown ENR"); + warn!("Received SYN packet for an unknown active uTP stream"); } } // Receive DATA and FIN packets PacketType::Data => { if let Some(conn) = self .utp_connections - .get_mut(&ConnectionKey::new(*node_id, connection_id)) + .get_mut(&ConnectionKey::new(*node_id, connection_id - 1)) { if conn.discv5_tx.send(packet.clone()).is_err() { error!("Unable to send DATA packet to uTP stream handler"); @@ -312,12 +310,14 @@ impl UtpListener { } Err(err) => error!("Unable to receive uTP DATA packet: {err}"), } + } else { + warn!("Received DATA packet for an unknown active uTP stream") } } PacketType::Fin => { if let Some(conn) = self .utp_connections - .get_mut(&ConnectionKey::new(*node_id, connection_id)) + .get_mut(&ConnectionKey::new(*node_id, connection_id - 1)) { if conn.discv5_tx.send(packet).is_err() { error!("Unable to send FIN packet to uTP stream handler"); @@ -328,6 +328,8 @@ impl UtpListener { if let Err(msg) = conn.recv(&mut buf).await { error!("Unable to receive uTP FIN packet: {msg}") } + } else { + warn!("Received FIN packet for an unknown active uTP stream") } } PacketType::State => { @@ -340,6 +342,8 @@ impl UtpListener { } // We don't handle STATE packets here, because the uTP client is handling them // implicitly in 
the background when sending FIN packet with conn.close() + } else { + warn!("Received STATE packet for an unknown active uTP stream"); } } } @@ -353,12 +357,21 @@ impl UtpListener { /// Process overlay uTP requests async fn process_overlay_request(&mut self, request: UtpListenerRequest) { match request { + UtpListenerRequest::AddActiveConnection(connected_to, protocol_id, conn_id_recv) => { + let conn = UtpSocket::new( + Arc::clone(&self.discovery), + connected_to.clone(), + protocol_id, + ); + let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv); + self.utp_connections.insert(conn_key, conn); + } UtpListenerRequest::FindContentStream(conn_id) => { self.listening .insert(conn_id, UtpStreamId::FindContentStream); } - UtpListenerRequest::Connect(conn_id, node_id, tx) => { - let conn = self.connect(conn_id, node_id).await; + UtpListenerRequest::Connect(conn_id, node_id, protocol_id, tx) => { + let conn = self.connect(conn_id, node_id, protocol_id).await; if tx.send(conn).is_err() { error!("Unable to send uTP socket to requester") }; @@ -382,9 +395,10 @@ impl UtpListener { &mut self, connection_id: ConnId, node_id: NodeId, + protocol_id: ProtocolId, ) -> anyhow::Result { if let Some(enr) = self.discovery.discv5.find_enr(&node_id) { - let mut conn = UtpSocket::new(Arc::clone(&self.discovery), enr); + let mut conn = UtpSocket::new(Arc::clone(&self.discovery), enr, protocol_id); conn.make_connection(connection_id).await; self.utp_connections.insert( ConnectionKey::new(node_id, conn.receiver_connection_id), @@ -435,9 +449,12 @@ pub struct UtpSocket { /// Socket state pub state: SocketState, - // Remote peer + /// ENR of the connected remote peer connected_to: Enr, + /// Overlay protocol identifier + protocol_id: ProtocolId, + /// Sequence number for the next packet seq_nr: u16, @@ -517,13 +534,14 @@ pub struct UtpSocket { } impl UtpSocket { - pub fn new(socket: Arc, connected_to: Enr) -> Self { + pub fn new(socket: Arc, connected_to: Enr, protocol_id: 
ProtocolId) -> Self { let (receiver_id, sender_id) = generate_sequential_identifiers(); let (discv5_tx, discv5_rx) = unbounded_channel::(); Self { state: SocketState::Uninitialized, + protocol_id, seq_nr: 1, ack_nr: 0, receiver_connection_id: receiver_id, @@ -800,6 +818,7 @@ impl UtpSocket { } // Reset connection if connection id doesn't match and this isn't a SYN + if packet.get_type() != PacketType::Syn && self.state != SocketState::SynSent && !(packet.connection_id() == self.sender_connection_id @@ -1437,7 +1456,7 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), enr); + let conn = UtpSocket::new(Arc::clone(&discv5), enr, ProtocolId::History); // TODO: Create `Discv5Socket` struct to encapsulate all socket logic spawn_socket_recv(Arc::clone(&discv5), conn.clone()); @@ -1457,7 +1476,7 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), connected_to); + let conn = UtpSocket::new(Arc::clone(&discv5), connected_to, ProtocolId::History); spawn_socket_recv(Arc::clone(&discv5), conn.clone()); (discv5.local_enr(), conn) diff --git a/utp-testing/src/main.rs b/utp-testing/src/main.rs index 2c53a4cc8..54d8e27ec 100644 --- a/utp-testing/src/main.rs +++ b/utp-testing/src/main.rs @@ -30,9 +30,12 @@ impl TestApp { .send(UtpListenerRequest::OfferStream(conn_id)); let (tx, rx) = tokio::sync::oneshot::channel::>(); - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx)); + let _ = self.utp_listener_tx.send(UtpListenerRequest::Connect( + conn_id, + enr.node_id(), + ProtocolId::History, + tx, + )); let mut conn = rx.await.unwrap().unwrap(); @@ -72,7 +75,7 @@ impl TestApp { }); } - async fn prepare_to_receive(&self, conn_id: u16) { + async fn prepare_to_receive(&self, source: Enr, conn_id: u16) { // listen for incoming connection request on conn_id, as part of utp handshake let _ = self .utp_listener_tx @@ -83,6 +86,14 @@ impl TestApp { let _ = self 
.utp_listener_tx .send(UtpListenerRequest::OfferStream(conn_id_recv)); + + let _ = self + .utp_listener_tx + .send(UtpListenerRequest::AddActiveConnection( + source, + ProtocolId::History, + conn_id, + )); } } @@ -114,7 +125,9 @@ async fn main() { .await .unwrap(); - server.prepare_to_receive(connection_id).await; + server + .prepare_to_receive(client.discovery.discv5.local_enr(), connection_id) + .await; client .send_utp_request(connection_id, payload, server_enr) From 29703a1df94653af3d46fe76dc19aa679389e9ea Mon Sep 17 00:00:00 2001 From: Ognyan Genev Date: Tue, 24 May 2022 16:20:10 +0300 Subject: [PATCH 3/6] Add UtpSocket -> UtpListener event channel and emit an event upon uTP socket closing --- trin-core/src/utp/stream.rs | 66 ++++++++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index 2e2094b91..a473962c3 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -141,6 +141,15 @@ pub enum UtpListenerEvent { ProcessedClosedStreams(ProcessedClosedStreams), } +/// uTP stream state events emitted from UtpSocket +#[derive(Clone, Debug)] +pub enum UtpStreamEvent { + /// Event containing received uTP payload, protocol id and receive connection id + Closed(UtpPayload, ProtocolId, ConnId), + /// Event containing protocol id and receive connection id + Reset(ProtocolId, ConnId), +} + /// Main uTP service used to listen and handle all uTP connections and streams pub struct UtpListener { /// Base discv5 layer @@ -155,6 +164,10 @@ pub struct UtpListener { overlay_tx: UnboundedSender, /// Receiver for uTP requests sent from the overlay layer overlay_rx: UnboundedReceiver, + /// Sender used in UtpSocket to emit stream state events + stream_tx: UnboundedSender, + /// Receiver for uTP stream state events + stream_rx: UnboundedReceiver, } impl UtpListener { @@ -172,6 +185,8 @@ impl UtpListener { let (utp_listener_tx, utp_listener_rx) = 
unbounded_channel::(); // Channel to emit processed uTP payload to overlay service let (overlay_tx, overlay_rx) = unbounded_channel::(); + // Channel to emit stream events from UtpSocket + let (stream_tx, stream_rx) = unbounded_channel::(); ( utp_event_tx, @@ -184,6 +199,8 @@ impl UtpListener { utp_event_rx, overlay_tx, overlay_rx: utp_listener_rx, + stream_tx, + stream_rx, }, ) } @@ -362,6 +379,7 @@ impl UtpListener { Arc::clone(&self.discovery), connected_to.clone(), protocol_id, + Some(self.stream_tx.clone()), ); let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv); self.utp_connections.insert(conn_key, conn); @@ -398,7 +416,12 @@ impl UtpListener { protocol_id: ProtocolId, ) -> anyhow::Result { if let Some(enr) = self.discovery.discv5.find_enr(&node_id) { - let mut conn = UtpSocket::new(Arc::clone(&self.discovery), enr, protocol_id); + let mut conn = UtpSocket::new( + Arc::clone(&self.discovery), + enr, + protocol_id, + Some(self.stream_tx.clone()), + ); conn.make_connection(connection_id).await; self.utp_connections.insert( ConnectionKey::new(node_id, conn.receiver_connection_id), @@ -530,11 +553,20 @@ pub struct UtpSocket { /// Receive channel for discv5 socket discv5_rx: Arc>>, + /// Sender to emit stream events to UtpListener + listener_tx: Option>, + + /// Store received uTP payload data over the stream pub recv_data_stream: Vec, } impl UtpSocket { - pub fn new(socket: Arc, connected_to: Enr, protocol_id: ProtocolId) -> Self { + pub fn new( + socket: Arc, + connected_to: Enr, + protocol_id: ProtocolId, + utp_listener_tx: Option>, + ) -> Self { let (receiver_id, sender_id) = generate_sequential_identifiers(); let (discv5_tx, discv5_rx) = unbounded_channel::(); @@ -570,6 +602,7 @@ impl UtpSocket { max_retransmission_retries: MAX_RETRANSMISSION_RETRIES, discv5_tx, discv5_rx: Arc::new(RwLock::new(discv5_rx)), + listener_tx: utp_listener_tx, } } @@ -901,6 +934,8 @@ impl UtpSocket { // Give up, the remote peer might not care about our missing 
packets self.state = SocketState::Closed; + self.emit_close_event(); + Ok(Some(reply)) } // Confirm with STATE packet when socket state is `Closed` and we receive FIN packet @@ -910,6 +945,7 @@ impl UtpSocket { (SocketState::FinSent, PacketType::State) => { if packet.ack_nr() == self.seq_nr { self.state = SocketState::Closed; + self.emit_close_event(); } else { self.handle_state_packet(packet).await; } @@ -918,6 +954,15 @@ impl UtpSocket { // Reset connection when receiving RESET packet (_, PacketType::Reset) => { self.state = SocketState::ResetReceived; + // Emit socket state event to UtpListener. Panic if error. + if let Some(listener_tx) = self.listener_tx.clone() { + listener_tx + .send(UtpStreamEvent::Reset( + self.protocol_id.clone(), + self.receiver_connection_id, + )) + .unwrap(); + } Err(anyhow!("Connection reset by remote peer")) } (state, ty) => { @@ -928,6 +973,19 @@ impl UtpSocket { } } + /// Emit socket state event to UtpListener. Panic if error. + fn emit_close_event(&mut self) { + if let Some(listener_tx) = self.listener_tx.clone() { + listener_tx + .send(UtpStreamEvent::Closed( + self.recv_data_stream.clone(), + self.protocol_id.clone(), + self.receiver_connection_id, + )) + .unwrap(); + } + } + fn prepare_reply(&self, original: &Packet, t: PacketType) -> Packet { let mut resp = Packet::new(); resp.set_type(t); @@ -1456,7 +1514,7 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), enr, ProtocolId::History); + let conn = UtpSocket::new(Arc::clone(&discv5), enr, ProtocolId::History, None); // TODO: Create `Discv5Socket` struct to encapsulate all socket logic spawn_socket_recv(Arc::clone(&discv5), conn.clone()); @@ -1476,7 +1534,7 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), connected_to, ProtocolId::History); + let conn = UtpSocket::new(Arc::clone(&discv5), connected_to, ProtocolId::History, None); spawn_socket_recv(Arc::clone(&discv5), conn.clone()); 
(discv5.local_enr(), conn) From 37a56fa5c79a21d7bcc0bd8cafd1052df60b7d89 Mon Sep 17 00:00:00 2001 From: Ognyan Genev Date: Wed, 25 May 2022 12:52:17 +0300 Subject: [PATCH 4/6] Move UtpStreamId inside UtpSocket --- trin-core/src/portalnet/overlay.rs | 4 +- trin-core/src/portalnet/overlay_service.rs | 21 +++++++--- trin-core/src/utp/stream.rs | 46 +++++++++++++++++----- trin-core/src/utp/trin_helpers.rs | 8 +++- utp-testing/src/main.rs | 4 +- 5 files changed, 63 insertions(+), 20 deletions(-) diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index d1f0eba80..7c472b101 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -19,7 +19,7 @@ use crate::{ portalnet::types::content_key::RawContentKey, utp::{ stream::{UtpListenerEvent, UtpListenerRequest, UtpSocket, BUF_SIZE}, - trin_helpers::{UtpAccept, UtpMessage}, + trin_helpers::{UtpAccept, UtpMessage, UtpStreamId}, }, }; use discv5::{ @@ -299,6 +299,7 @@ impl conn_id, enr.node_id(), self.protocol.clone(), + UtpStreamId::FindContentStream, tx, )) .map_err(|err| { @@ -412,6 +413,7 @@ impl conn_id, enr.node_id(), self.protocol.clone(), + UtpStreamId::OfferStream, tx, )).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 64f7bccfa..2b0a08e29 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -450,11 +450,16 @@ impl &self, source: &NodeId, conn_id_recv: u16, + stream_id: UtpStreamId, ) -> Result<(), OverlayRequestError> { if let Some(enr) = self.discovery.discv5.find_enr(source) { // Initialize active uTP stream with requested note - let utp_request = - UtpListenerRequest::AddActiveConnection(enr, self.protocol.clone(), conn_id_recv); + let utp_request = UtpListenerRequest::AddActiveConnection( + enr, + self.protocol.clone(), + 
stream_id, + conn_id_recv, + ); if let Err(err) = self.utp_listener_tx.send(utp_request) { return Err(OverlayRequestError::UtpError(format!( "Unable to send uTP AddActiveConnection request: {err}" @@ -664,7 +669,7 @@ impl // listen for incoming connection request on conn_id, as part of utp handshake and // temporarily storing content data, so we can send it right after we receive // SYN packet from the requester - let utp_request = UtpListenerRequest::FindContentData(conn_id, content); + let utp_request = UtpListenerRequest::FindContentData(conn_id, content.clone()); if let Err(err) = self.utp_listener_tx.send(utp_request) { return Err(OverlayRequestError::UtpError(format!( "Unable to send uTP FindContentData stream request: {err}" @@ -681,7 +686,11 @@ impl ))); } - self.add_utp_connection(source, conn_id_recv)?; + self.add_utp_connection( + source, + conn_id_recv, + UtpStreamId::FindContentData(content), + )?; // Connection id is send as BE because uTP header values are stored also as BE Ok(Content::ConnectionId(conn_id.to_be())) @@ -751,14 +760,14 @@ impl // also listen on conn_id + 1 because this is the actual receive path for acceptor let conn_id_recv = conn_id.wrapping_add(1); - let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys); + let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys.clone()); if let Err(err) = self.utp_listener_tx.send(utp_request) { return Err(OverlayRequestError::UtpError(format!( "Unable to send uTP Accept stream request: {err}" ))); } - self.add_utp_connection(source, conn_id)?; + self.add_utp_connection(source, conn_id, UtpStreamId::AcceptStream(accept_keys))?; let accept = Accept { connection_id: conn_id.to_be(), diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index a473962c3..dda63edac 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -23,7 +23,7 @@ use crate::{ locks::RwLoggingExt, portalnet::types::{ content_key::RawContentKey, - 
messages::{ByteList, Content::Content, ProtocolId}, + messages::{ByteList, ProtocolId}, }, utp::{ packets::{ExtensionType, Packet, PacketType, HEADER_SIZE}, @@ -121,10 +121,11 @@ pub enum UtpListenerRequest { ConnId, NodeId, ProtocolId, + UtpStreamId, oneshot::Sender>, ), /// Request to add uTP stream to the active connections - AddActiveConnection(Enr, ProtocolId, ConnId), + AddActiveConnection(Enr, ProtocolId, UtpStreamId, ConnId), /// Request to listen for FindCOntent stream and send content data FindContentData(ConnId, ByteList), /// Request to listen for FindContent stream @@ -277,8 +278,7 @@ impl UtpListener { self.listening .insert(conn.sender_connection_id, UtpStreamId::FindContentStream); - if let Some(UtpStreamId::FindContentData(Content(content_data))) = - utp_message_id + if let Some(UtpStreamId::FindContentData(content_data)) = utp_message_id { // We want to send uTP data only if the content is Content(ByteList) debug!( @@ -374,11 +374,17 @@ impl UtpListener { /// Process overlay uTP requests async fn process_overlay_request(&mut self, request: UtpListenerRequest) { match request { - UtpListenerRequest::AddActiveConnection(connected_to, protocol_id, conn_id_recv) => { + UtpListenerRequest::AddActiveConnection( + connected_to, + protocol_id, + stream_id, + conn_id_recv, + ) => { let conn = UtpSocket::new( Arc::clone(&self.discovery), connected_to.clone(), protocol_id, + stream_id, Some(self.stream_tx.clone()), ); let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv); @@ -388,8 +394,8 @@ impl UtpListener { self.listening .insert(conn_id, UtpStreamId::FindContentStream); } - UtpListenerRequest::Connect(conn_id, node_id, protocol_id, tx) => { - let conn = self.connect(conn_id, node_id, protocol_id).await; + UtpListenerRequest::Connect(conn_id, node_id, protocol_id, stream_id, tx) => { + let conn = self.connect(conn_id, node_id, protocol_id, stream_id).await; if tx.send(conn).is_err() { error!("Unable to send uTP socket to requester") }; 
@@ -399,7 +405,7 @@ impl UtpListener { } UtpListenerRequest::FindContentData(conn_id, content) => { self.listening - .insert(conn_id, UtpStreamId::FindContentData(Content(content))); + .insert(conn_id, UtpStreamId::FindContentData(content)); } UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => { self.listening @@ -414,12 +420,14 @@ impl UtpListener { connection_id: ConnId, node_id: NodeId, protocol_id: ProtocolId, + stream_id: UtpStreamId, ) -> anyhow::Result { if let Some(enr) = self.discovery.discv5.find_enr(&node_id) { let mut conn = UtpSocket::new( Arc::clone(&self.discovery), enr, protocol_id, + stream_id, Some(self.stream_tx.clone()), ); conn.make_connection(connection_id).await; @@ -478,6 +486,9 @@ pub struct UtpSocket { /// Overlay protocol identifier protocol_id: ProtocolId, + /// Overlay uTP stream id + stream_id: UtpStreamId, + /// Sequence number for the next packet seq_nr: u16, @@ -565,6 +576,7 @@ impl UtpSocket { socket: Arc, connected_to: Enr, protocol_id: ProtocolId, + stream_id: UtpStreamId, utp_listener_tx: Option>, ) -> Self { let (receiver_id, sender_id) = generate_sequential_identifiers(); @@ -574,6 +586,7 @@ impl UtpSocket { Self { state: SocketState::Uninitialized, protocol_id, + stream_id, seq_nr: 1, ack_nr: 0, receiver_connection_id: receiver_id, @@ -1482,6 +1495,7 @@ mod tests { packets::{Packet, PacketType}, stream::{SocketState, UtpSocket, BUF_SIZE}, time::now_microseconds, + trin_helpers::UtpStreamId, }, }; use discv5::Discv5Event; @@ -1514,7 +1528,13 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new(Arc::clone(&discv5), enr, ProtocolId::History, None); + let conn = UtpSocket::new( + Arc::clone(&discv5), + enr, + ProtocolId::History, + UtpStreamId::OfferStream, + None, + ); // TODO: Create `Discv5Socket` struct to encapsulate all socket logic spawn_socket_recv(Arc::clone(&discv5), conn.clone()); @@ -1534,7 +1554,13 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = 
UtpSocket::new(Arc::clone(&discv5), connected_to, ProtocolId::History, None); + let conn = UtpSocket::new( + Arc::clone(&discv5), + connected_to, + ProtocolId::History, + UtpStreamId::OfferStream, + None, + ); spawn_socket_recv(Arc::clone(&discv5), conn.clone()); (discv5.local_enr(), conn) diff --git a/trin-core/src/utp/trin_helpers.rs b/trin-core/src/utp/trin_helpers.rs index 3b434cdaf..f7c747b93 100644 --- a/trin-core/src/utp/trin_helpers.rs +++ b/trin-core/src/utp/trin_helpers.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] // These are just some Trin helper functions -use crate::portalnet::types::{content_key::RawContentKey, messages::Content}; +use crate::portalnet::types::{content_key::RawContentKey, messages::ByteList}; use ssz_derive::{Decode, Encode}; // These Utp impl are related to sending messages over uTP not the implementation itself or stream @@ -54,9 +54,13 @@ pub struct UtpAccept { /// Used to track which stream to which overlay request correspond #[derive(Debug, Clone, PartialEq)] pub enum UtpStreamId { + /// Stream id to initialize FindContent uTP connection and to listen for content payload FindContentStream, - FindContentData(Content), + /// Stream id to listen for incoming FindContent connection and to send back the content data to the requester + FindContentData(ByteList), + /// Stream id to send requested content from received ACCEPT response OfferStream, + /// Stream id to listen for OFFER uTP payload. Contains requested content keys. 
AcceptStream(Vec), } diff --git a/utp-testing/src/main.rs b/utp-testing/src/main.rs index 54d8e27ec..cfe020eb6 100644 --- a/utp-testing/src/main.rs +++ b/utp-testing/src/main.rs @@ -11,7 +11,7 @@ use trin_core::{ socket, utp::{ stream::{UtpListener, UtpListenerRequest, UtpListenerUnboundedReceiver, UtpSocket}, - trin_helpers::UtpMessage, + trin_helpers::{UtpMessage, UtpStreamId}, }, }; @@ -34,6 +34,7 @@ impl TestApp { conn_id, enr.node_id(), ProtocolId::History, + UtpStreamId::OfferStream, tx, )); @@ -92,6 +93,7 @@ impl TestApp { .send(UtpListenerRequest::AddActiveConnection( source, ProtocolId::History, + UtpStreamId::OfferStream, conn_id, )); } From 357a5ebf4ec6e9fd42f73ba29e3b27433154a65a Mon Sep 17 00:00:00 2001 From: Ognyan Genev Date: Wed, 25 May 2022 14:17:32 +0300 Subject: [PATCH 5/6] Remove redundant listening hashmap from uTP listener and clear dead code --- src/lib.rs | 4 +- trin-core/src/portalnet/overlay.rs | 21 +---- trin-core/src/portalnet/overlay_service.rs | 89 ++-------------------- trin-core/src/utp/stream.rs | 81 ++------------------ trin-core/tests/overlay.rs | 5 +- trin-history/src/lib.rs | 12 +-- trin-history/src/network.rs | 9 +-- trin-state/src/lib.rs | 12 +-- trin-state/src/network.rs | 9 +-- utp-testing/src/main.rs | 18 +---- 10 files changed, 28 insertions(+), 232 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2b41dc2f9..108b9e334 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,7 +52,7 @@ pub async fn run_trin( }; // Initialize and spawn UTP listener - let (utp_events_tx, utp_listener_tx, utp_listener_rx, mut utp_listener) = + let (utp_events_tx, utp_listener_tx, _, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); @@ -67,7 +67,6 @@ pub async fn run_trin( initialize_state_network( &discovery, utp_listener_tx.clone(), - utp_listener_rx.clone(), portalnet_config.clone(), storage_config.clone(), ) @@ -86,7 +85,6 @@ pub async fn run_trin( 
initialize_history_network( &discovery, utp_listener_tx, - utp_listener_rx, portalnet_config.clone(), storage_config.clone(), ) diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 7c472b101..00fbd5fbf 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -18,7 +18,7 @@ use crate::portalnet::{ use crate::{ portalnet::types::content_key::RawContentKey, utp::{ - stream::{UtpListenerEvent, UtpListenerRequest, UtpSocket, BUF_SIZE}, + stream::{UtpListenerRequest, UtpSocket, BUF_SIZE}, trin_helpers::{UtpAccept, UtpMessage, UtpStreamId}, }, }; @@ -32,10 +32,7 @@ use futures::channel::oneshot; use parking_lot::RwLock; use ssz::Encode; use ssz_types::VariableList; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock as TRwLock, -}; +use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, warn}; pub use super::overlay_service::{OverlayRequestError, RequestDirection}; @@ -101,7 +98,6 @@ impl config: OverlayConfig, discovery: Arc, utp_listener_tx: UnboundedSender, - utp_listener_rx: Arc>>, storage: Arc>, data_radius: U256, protocol: ProtocolId, @@ -124,7 +120,6 @@ impl Arc::clone(&data_radius), protocol.clone(), utp_listener_tx.clone(), - utp_listener_rx, config.enable_metrics, ) .await @@ -285,13 +280,6 @@ impl enr: Enr, conn_id: u16, ) -> Result { - let utp_request = UtpListenerRequest::FindContentStream(conn_id); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP FindContent stream request: {err}" - ))); - } - // initiate the connection to the acceptor let (tx, rx) = tokio::sync::oneshot::channel::>(); self.utp_listener_tx @@ -401,11 +389,6 @@ impl return Ok(response); } - let utp_request = UtpListenerRequest::OfferStream(conn_id); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(anyhow!("Unable to send uTP Offer stream request: {err}")); - } - // initiate the 
connection to the acceptor let (tx, rx) = tokio::sync::oneshot::channel::>(); diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 2b0a08e29..50fc10e04 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -19,13 +19,7 @@ use crate::{ utp::stream::UtpListenerRequest, }; -use crate::{ - portalnet::types::content_key::RawContentKey, - utp::{ - stream::{UtpListenerEvent, UtpPayload}, - trin_helpers::UtpStreamId, - }, -}; +use crate::utp::trin_helpers::UtpStreamId; use delay_map::HashSetDelay; use discv5::{ enr::NodeId, @@ -43,10 +37,7 @@ use rand::seq::SliceRandom; use ssz::Encode; use ssz_types::{BitList, VariableList}; use thiserror::Error; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, - RwLock as TRwLock, -}; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; /// Maximum number of ENRs in response to FindNodes. pub const FIND_NODES_MAX_NODES: usize = 32; @@ -265,8 +256,6 @@ pub struct OverlayService { response_tx: UnboundedSender, /// The sender half of a channel to send requests to uTP listener utp_listener_tx: UnboundedSender, - /// Receiver for UtpListener emitted events - utp_listener_rx: Arc>>, /// Phantom content key. phantom_content_key: PhantomData, /// Phantom metric (distance function). 
@@ -292,7 +281,6 @@ impl data_radius: Arc, protocol: ProtocolId, utp_listener_sender: UnboundedSender, - utp_listener_receiver: Arc>>, enable_metrics: bool, ) -> Result, String> { let (request_tx, request_rx) = mpsc::unbounded_channel(); @@ -325,7 +313,6 @@ impl response_rx, response_tx, utp_listener_tx: utp_listener_sender, - utp_listener_rx: utp_listener_receiver, phantom_content_key: PhantomData, phantom_metric: PhantomData, metrics, @@ -397,9 +384,6 @@ impl tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL)); loop { - // let utp_listener_rx = self.utp_listener_rx.clone(); - // let mut utp_listener_lock = utp_listener_rx.write_with_warn().await; - tokio::select! { Some(request) = self.request_rx.recv() => self.process_request(request), Some(response) = self.response_rx.recv() => { @@ -428,14 +412,6 @@ impl self.peers_to_ping.insert(node_id); } } - // Some(utp_event) = utp_listener_lock.recv() => { - // match utp_event { - // UtpListenerEvent::ProcessedClosedStreams(processed_streams) => - // { - // self.handle_utp_payload(processed_streams); - // } - // } - // } _ = OverlayService::::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {} _ = bucket_refresh_interval.tick() => { debug!("[{:?}] Overlay bucket refresh lookup", self.protocol); @@ -666,26 +642,11 @@ impl } else { let conn_id: u16 = crate::utp::stream::rand(); - // listen for incoming connection request on conn_id, as part of utp handshake and - // temporarily storing content data, so we can send it right after we receive + // Listen for incoming uTP connection request on as part of uTP handshake and + // storing content data, so we can send it inside UtpListener right after we receive // SYN packet from the requester - let utp_request = UtpListenerRequest::FindContentData(conn_id, content.clone()); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP FindContentData stream request: {err}" - 
))); - } - - // also listen on conn_id + 1 because this is the receive path let conn_id_recv = conn_id.wrapping_add(1); - let utp_request = UtpListenerRequest::FindContentStream(conn_id_recv); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP FindContent stream request: {err}" - ))); - } - self.add_utp_connection( source, conn_id_recv, @@ -704,8 +665,7 @@ impl } } Err(msg) => Err(OverlayRequestError::Failure(format!( - "Unable to respond to FindContent: {}", - msg + "Unable to respond to FindContent: {msg}", ))), } } @@ -748,24 +708,8 @@ impl })?; } - // listen for incoming connection request on conn_id, as part of utp handshake + // Listen for incoming connection request on conn_id, as part of utp handshake let conn_id: u16 = crate::utp::stream::rand(); - let utp_request = UtpListenerRequest::OfferStream(conn_id); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP Offer stream request: {err}" - ))); - } - - // also listen on conn_id + 1 because this is the actual receive path for acceptor - let conn_id_recv = conn_id.wrapping_add(1); - - let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys.clone()); - if let Err(err) = self.utp_listener_tx.send(utp_request) { - return Err(OverlayRequestError::UtpError(format!( - "Unable to send uTP Accept stream request: {err}" - ))); - } self.add_utp_connection(source, conn_id, UtpStreamId::AcceptStream(accept_keys))?; @@ -777,19 +721,6 @@ impl Ok(accept) } - /// Handle all closed uTP streams, currently we process only AcceptStream here. - /// FindContent payload is processed explicitly when we send FindContent request. 
- fn handle_utp_payload(&self, streams: Vec<(UtpPayload, UtpStreamId)>) { - for stream in streams { - match stream { - (payload, UtpStreamId::AcceptStream(content_keys)) => { - self.process_accept_utp_payload(content_keys, payload); - } - _ => {} - } - } - } - /// Sends a TALK request via Discovery v5 to some destination node. fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) { let discovery = Arc::clone(&self.discovery); @@ -821,12 +752,6 @@ impl }); } - /// Process accepted uTP payload of the OFFER?ACCEPT stream - fn process_accept_utp_payload(&self, content_keys: Vec, payload: UtpPayload) { - // TODO: Verify the payload, store the content and propagate gossip. - debug!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}, protocol: {:?}", self.protocol); - } - /// Processes an incoming request from some source node. fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) { // Look up the node in the routing table. 
@@ -1354,7 +1279,6 @@ mod tests { let discovery = Arc::new(Discovery::new(portal_config).unwrap()); let (utp_listener_tx, _) = unbounded_channel::(); - let (_, utp_listener_rx) = unbounded_channel::(); // Initialize DB config let storage_capacity: u32 = DEFAULT_STORAGE_CAPACITY.parse().unwrap(); @@ -1392,7 +1316,6 @@ mod tests { response_tx, response_rx, utp_listener_tx, - utp_listener_rx: Arc::new(TRwLock::new(utp_listener_rx)), phantom_content_key: PhantomData, phantom_metric: PhantomData, metrics, diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index dda63edac..78e431be0 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -21,10 +21,7 @@ use tokio::{ use crate::{ locks::RwLoggingExt, - portalnet::types::{ - content_key::RawContentKey, - messages::{ByteList, ProtocolId}, - }, + portalnet::types::messages::ProtocolId, utp::{ packets::{ExtensionType, Packet, PacketType, HEADER_SIZE}, time::{now_microseconds, Delay, Timestamp}, @@ -114,8 +111,6 @@ struct DelayDifferenceSample { /// Represent overlay to uTP listener request. It is used as a way to communicate between the overlay protocol /// and uTP listener pub enum UtpListenerRequest { - /// Request to listen for Accept stream - AcceptStream(ConnId, Vec), /// Request to initialize uTP stream with remote node Connect( ConnId, @@ -126,12 +121,6 @@ pub enum UtpListenerRequest { ), /// Request to add uTP stream to the active connections AddActiveConnection(Enr, ProtocolId, UtpStreamId, ConnId), - /// Request to listen for FindCOntent stream and send content data - FindContentData(ConnId, ByteList), - /// Request to listen for FindContent stream - FindContentStream(ConnId), - /// Request to listen for Offer stream - OfferStream(ConnId), } /// Result from processing all closed uTP streams. Includes a tuple with the payload and the stream id. 
@@ -152,13 +141,13 @@ pub enum UtpStreamEvent { } /// Main uTP service used to listen and handle all uTP connections and streams +// FIXME: Deny dead_code +#[allow(dead_code)] pub struct UtpListener { /// Base discv5 layer discovery: Arc, /// Store all active connections utp_connections: HashMap, - /// uTP connection ids to listen for - listening: HashMap, /// Receiver for uTP events sent from the main portal event handler utp_event_rx: UnboundedReceiver, /// Sender to overlay layer with processed uTP stream @@ -196,7 +185,6 @@ impl UtpListener { UtpListener { discovery, utp_connections: HashMap::new(), - listening: HashMap::new(), utp_event_rx, overlay_tx, overlay_rx: utp_listener_rx, @@ -218,14 +206,6 @@ impl UtpListener { Some(overlay_request) = self.overlay_rx.recv() => { self.process_overlay_request(overlay_request).await }, - // _ = process_utp_streams_interval.tick() => { - // let processed_streams = self.process_closed_streams(); - // - // if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ProcessedClosedStreams(processed_streams)) { - // error!("Unable to send ProcessClosedStreams event to overlay layer: {err}"); - // continue - // } - // } } } } @@ -270,15 +250,9 @@ impl UtpListener { return; } - // Get ownership of FindContentData and re-add the receiver connection - let utp_message_id = self.listening.remove(&conn.sender_connection_id); - - // TODO: Probably there is a better way with lifetimes to pass the HashMap value to a - // different thread without removing the key and re-adding it. 
- self.listening - .insert(conn.sender_connection_id, UtpStreamId::FindContentStream); - - if let Some(UtpStreamId::FindContentData(content_data)) = utp_message_id + // Send content data if the stream is listening for FindContent SYN packet + if let UtpStreamId::FindContentData(content_data) = + conn.stream_id.clone() { // We want to send uTP data only if the content is Content(ByteList) debug!( @@ -390,27 +364,12 @@ impl UtpListener { let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv); self.utp_connections.insert(conn_key, conn); } - UtpListenerRequest::FindContentStream(conn_id) => { - self.listening - .insert(conn_id, UtpStreamId::FindContentStream); - } UtpListenerRequest::Connect(conn_id, node_id, protocol_id, stream_id, tx) => { let conn = self.connect(conn_id, node_id, protocol_id, stream_id).await; if tx.send(conn).is_err() { error!("Unable to send uTP socket to requester") }; } - UtpListenerRequest::OfferStream(conn_id) => { - self.listening.insert(conn_id, UtpStreamId::OfferStream); - } - UtpListenerRequest::FindContentData(conn_id, content) => { - self.listening - .insert(conn_id, UtpStreamId::FindContentData(content)); - } - UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => { - self.listening - .insert(conn_id, UtpStreamId::AcceptStream(accepted_keys)); - } } } @@ -440,34 +399,6 @@ impl UtpListener { Err(anyhow!("Trying to connect to unknow Enr")) } } - - /// Return and cleanup all active uTP streams where socket state is "Closed" - pub fn process_closed_streams(&mut self) -> Vec<(UtpPayload, UtpStreamId)> { - // This seems to be a hot loop, we may need to optimise it and find a better way to filter by closed - // connections without cloning all records. One reasonable way is to use some data-oriented - // design principles like Struct of Arrays vs. Array of Structs. 
- self.utp_connections - .clone() - .iter() - .filter(|conn| conn.1.state == SocketState::Closed) - .map(|conn| { - // Remove the closed connections from active connections - let receiver_stream_id = self - .listening - .remove(&conn.1.receiver_connection_id) - .expect("Receiver connection id should match active listening connections."); - self.listening - .remove(&conn.1.sender_connection_id) - .expect("Sender connection id should match active listening connections."); - let utp_socket = self - .utp_connections - .remove(conn.0) - .expect("uTP socket should match asctive utp connections."); - - (utp_socket.recv_data_stream, receiver_stream_id) - }) - .collect() - } } // Used to be MicroTransportProtocol impl but it is basically just called UtpStream compared to the diff --git a/trin-core/tests/overlay.rs b/trin-core/tests/overlay.rs index 93c4181f5..de726fee7 100644 --- a/trin-core/tests/overlay.rs +++ b/trin-core/tests/overlay.rs @@ -24,8 +24,7 @@ use tokio::{ time::{self, Duration}, }; -use tokio::sync::RwLock as TRwLock; -use trin_core::utp::stream::{UtpListenerEvent, UtpListenerRequest}; +use trin_core::utp::stream::UtpListenerRequest; async fn init_overlay( discovery: Arc, @@ -40,13 +39,11 @@ async fn init_overlay( let overlay_config = OverlayConfig::default(); // Ignore all uTP events let (utp_listener_tx, _) = unbounded_channel::(); - let (_, utp_listener_rx) = unbounded_channel::(); OverlayProtocol::new( overlay_config, discovery, utp_listener_tx, - Arc::new(TRwLock::new(utp_listener_rx)), db, U256::MAX, protocol, diff --git a/trin-history/src/lib.rs b/trin-history/src/lib.rs index 8086e7578..09cf3c2b4 100644 --- a/trin-history/src/lib.rs +++ b/trin-history/src/lib.rs @@ -9,10 +9,7 @@ use crate::{events::HistoryEvents, jsonrpc::HistoryRequestHandler}; use discv5::TalkRequest; use network::HistoryNetwork; use std::sync::Arc; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock, -}; +use tokio::sync::mpsc::UnboundedSender; use 
trin_core::{ cli::TrinConfig, jsonrpc::types::HistoryJsonRpcRequest, @@ -23,7 +20,7 @@ use trin_core::{ types::messages::PortalnetConfig, }, utils::bootnodes::parse_bootnodes, - utp::stream::{UtpListener, UtpListenerEvent, UtpListenerRequest}, + utp::stream::{UtpListener, UtpListenerRequest}, }; pub async fn main() -> Result<(), Box> { @@ -54,7 +51,7 @@ pub async fn main() -> Result<(), Box> { tokio::spawn(Arc::clone(&discovery).bucket_refresh_lookup()); // Initialize and spawn UTP listener - let (utp_sender, overlay_sender, overlay_receiver, mut utp_listener) = + let (utp_sender, overlay_sender, _, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); @@ -80,7 +77,6 @@ pub async fn main() -> Result<(), Box> { let history_network = HistoryNetwork::new( discovery.clone(), overlay_sender, - overlay_receiver, storage_config, portalnet_config.clone(), ) @@ -101,7 +97,6 @@ type HistoryJsonRpcTx = Option>; pub async fn initialize_history_network( discovery: &Arc, utp_listener_tx: UnboundedSender, - utp_listener_rx: Arc>>, portalnet_config: PortalnetConfig, storage_config: PortalStorageConfig, ) -> ( @@ -116,7 +111,6 @@ pub async fn initialize_history_network( let history_network = HistoryNetwork::new( Arc::clone(discovery), utp_listener_tx, - utp_listener_rx, storage_config, portalnet_config.clone(), ) diff --git a/trin-history/src/network.rs b/trin-history/src/network.rs index 0082d6f11..7541b37e5 100644 --- a/trin-history/src/network.rs +++ b/trin-history/src/network.rs @@ -3,10 +3,7 @@ use log::debug; use std::sync::Arc; use parking_lot::RwLock; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock as TRwLock, -}; +use tokio::sync::mpsc::UnboundedSender; use trin_core::{ portalnet::{ @@ -19,7 +16,7 @@ use trin_core::{ metric::XorMetric, }, }, - utp::stream::{UtpListenerEvent, UtpListenerRequest}, + utp::stream::UtpListenerRequest, }; /// History network layer on top of the overlay 
protocol. Encapsulates history network specific data and logic. @@ -32,7 +29,6 @@ impl HistoryNetwork { pub async fn new( discovery: Arc, utp_listener_tx: UnboundedSender, - utp_listener_rx: Arc>>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, ) -> Self { @@ -46,7 +42,6 @@ impl HistoryNetwork { config, discovery, utp_listener_tx, - utp_listener_rx, storage, portal_config.data_radius, ProtocolId::History, diff --git a/trin-state/src/lib.rs b/trin-state/src/lib.rs index f4bdf7c14..747c4a113 100644 --- a/trin-state/src/lib.rs +++ b/trin-state/src/lib.rs @@ -5,10 +5,7 @@ use tokio::{sync::mpsc, task::JoinHandle}; use crate::{events::StateEvents, jsonrpc::StateRequestHandler}; use discv5::TalkRequest; use network::StateNetwork; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock, -}; +use tokio::sync::mpsc::UnboundedSender; use trin_core::{ cli::TrinConfig, jsonrpc::types::StateJsonRpcRequest, @@ -19,7 +16,7 @@ use trin_core::{ types::messages::PortalnetConfig, }, utils::bootnodes::parse_bootnodes, - utp::stream::{UtpListener, UtpListenerEvent, UtpListenerRequest}, + utp::stream::{UtpListener, UtpListenerRequest}, }; pub mod events; @@ -52,7 +49,7 @@ pub async fn main() -> Result<(), Box> { tokio::spawn(Arc::clone(&discovery).bucket_refresh_lookup()); // Initialize and spawn UTP listener - let (utp_sender, overlay_sender, overlay_receiver, mut utp_listener) = + let (utp_sender, overlay_sender, _, mut utp_listener) = UtpListener::new(Arc::clone(&discovery)); tokio::spawn(async move { utp_listener.start().await }); @@ -78,7 +75,6 @@ pub async fn main() -> Result<(), Box> { let state_network = StateNetwork::new( discovery.clone(), overlay_sender, - overlay_receiver, storage_config, portalnet_config.clone(), ) @@ -99,7 +95,6 @@ type StateJsonRpcTx = Option>; pub async fn initialize_state_network( discovery: &Arc, utp_listener_tx: UnboundedSender, - utp_listener_rx: Arc>>, portalnet_config: PortalnetConfig, storage_config: 
PortalStorageConfig, ) -> (StateHandler, StateNetworkTask, StateEventTx, StateJsonRpcTx) { @@ -108,7 +103,6 @@ pub async fn initialize_state_network( let state_network = StateNetwork::new( Arc::clone(discovery), utp_listener_tx, - utp_listener_rx, storage_config, portalnet_config.clone(), ) diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index bfcb46cf6..f5fadc925 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -5,10 +5,7 @@ use discv5::enr::NodeId; use eth_trie::EthTrie; use log::debug; use parking_lot::RwLock; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock as TRwLock, -}; +use tokio::sync::mpsc::UnboundedSender; use trin_core::{ portalnet::{ discovery::Discovery, @@ -20,7 +17,7 @@ use trin_core::{ metric::XorMetric, }, }, - utp::stream::{UtpListenerEvent, UtpListenerRequest}, + utp::stream::UtpListenerRequest, }; use crate::trie::TrieDB; @@ -36,7 +33,6 @@ impl StateNetwork { pub async fn new( discovery: Arc, utp_listener_tx: UnboundedSender, - utp_listener_rx: Arc>>, storage_config: PortalStorageConfig, portal_config: PortalnetConfig, ) -> Self { @@ -55,7 +51,6 @@ impl StateNetwork { config, discovery, utp_listener_tx, - utp_listener_rx, storage, portal_config.data_radius, ProtocolId::State, diff --git a/utp-testing/src/main.rs b/utp-testing/src/main.rs index cfe020eb6..202c5877f 100644 --- a/utp-testing/src/main.rs +++ b/utp-testing/src/main.rs @@ -25,10 +25,6 @@ pub struct TestApp { impl TestApp { async fn send_utp_request(&mut self, conn_id: u16, payload: Vec, enr: Enr) { - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::OfferStream(conn_id)); - let (tx, rx) = tokio::sync::oneshot::channel::>(); let _ = self.utp_listener_tx.send(UtpListenerRequest::Connect( conn_id, @@ -77,23 +73,13 @@ impl TestApp { } async fn prepare_to_receive(&self, source: Enr, conn_id: u16) { - // listen for incoming connection request on conn_id, as part of utp handshake - let _ = self - 
.utp_listener_tx - .send(UtpListenerRequest::OfferStream(conn_id)); - - // also listen on conn_id + 1 because this is the actual receive path for acceptor - let conn_id_recv = conn_id.wrapping_add(1); - let _ = self - .utp_listener_tx - .send(UtpListenerRequest::OfferStream(conn_id_recv)); - + // Listen for incoming connection request on conn_id, as part of uTP handshake let _ = self .utp_listener_tx .send(UtpListenerRequest::AddActiveConnection( source, ProtocolId::History, - UtpStreamId::OfferStream, + UtpStreamId::AcceptStream(vec![vec![]]), conn_id, )); } From 92f7a2ed5ba94f19559ed128a0bb9d90df94a196 Mon Sep 17 00:00:00 2001 From: Ognyan Genev Date: Wed, 25 May 2022 17:16:32 +0300 Subject: [PATCH 6/6] Emit global uTP listener events when uTP stream is closed or reset --- Cargo.lock | 48 +++ newsfragments/325.added.md | 7 +- trin-core/Cargo.toml | 1 + trin-core/src/portalnet/discovery.rs | 13 + trin-core/src/portalnet/overlay.rs | 136 ++++--- trin-core/src/portalnet/overlay_service.rs | 82 ++-- trin-core/src/portalnet/types/messages.rs | 2 +- trin-core/src/utp/stream.rs | 412 ++++++++++++--------- trin-core/src/utp/trin_helpers.rs | 4 +- trin-core/tests/utp_listener.rs | 152 ++++++++ utp-testing/src/main.rs | 14 +- 11 files changed, 571 insertions(+), 300 deletions(-) create mode 100644 trin-core/tests/utp_listener.rs diff --git a/Cargo.lock b/Cargo.lock index 0cab694ea..b30d9392c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1818,6 +1818,53 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ntest" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "241de7455530a09d0d91dacd07165fbf78c422d2c06ff5c77791988ee1f6bf13" +dependencies = [ + "ntest_proc_macro_helper 0.8.0", + "ntest_test_cases", + "ntest_timeout", +] + +[[package]] +name = "ntest_proc_macro_helper" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8f52e34b414605b77efc95c3f0ecef01df0c324bcc7f68d9a9cb7a7552777e52" + +[[package]] +name = "ntest_proc_macro_helper" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0e328d267a679d683b55222b3d06c2fb7358220857945bfc4e65a6b531e9994" + +[[package]] +name = "ntest_test_cases" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f7caf063242bb66721e74515dc01a915901063fa1f994bee7a2b9136f13370e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "ntest_timeout" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8efc215375b8c392c77dc45dbe9d7f4802e36b7936808ccd71047a9b03443e6" +dependencies = [ + "ntest_proc_macro_helper 0.7.5", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num" version = "0.4.0" @@ -3535,6 +3582,7 @@ dependencies = [ "keccak-hash", "lazy_static 1.4.0", "log 0.4.14", + "ntest", "num", "parking_lot 0.11.2", "prometheus_exporter", diff --git a/newsfragments/325.added.md b/newsfragments/325.added.md index db45ca6eb..f137e04d5 100644 --- a/newsfragments/325.added.md +++ b/newsfragments/325.added.md @@ -1 +1,6 @@ -Process all closed uTP streams in UtpListener and pass the payload to overlay service. +- Rename `UtpSocket` to `UtpStream`. +- Refactor the way we are storing the received payload (DATA packets) in the uTP stream. +- Add a new AddActiveConnection UtpListener request and move the initialization of a uTP stream inside UtpListener. +- Add UtpStream -> UtpListener event channel and emit event inside UtpStream when stream state changes to Closed or Reset. +- Emit a global uTP listener event containing a uTP payload when a stream is closed. +- Remove redundant and dead code. 
diff --git a/trin-core/Cargo.toml b/trin-core/Cargo.toml index b89d90bae..1321852ff 100644 --- a/trin-core/Cargo.toml +++ b/trin-core/Cargo.toml @@ -69,3 +69,4 @@ features = ["bundled"] [dev-dependencies] quickcheck = "1.0.3" +ntest = "0.8.0" diff --git a/trin-core/src/portalnet/discovery.rs b/trin-core/src/portalnet/discovery.rs index a7a415848..30d2d7bf8 100644 --- a/trin-core/src/portalnet/discovery.rs +++ b/trin-core/src/portalnet/discovery.rs @@ -14,6 +14,7 @@ use rand::seq::SliceRandom; use serde_json::{json, Value}; use std::{ convert::TryFrom, + fmt, net::{IpAddr, SocketAddr}, sync::Arc, time::Duration, @@ -54,6 +55,18 @@ pub struct Discovery { pub listen_socket: SocketAddr, } +impl fmt::Debug for Discovery { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Discovery: ( enr: {}, started: {}, listen_socket: {} )", + self.discv5.local_enr(), + self.started, + self.listen_socket + ) + } +} + impl Discovery { pub fn new(portal_config: PortalnetConfig) -> Result { let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port); diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 00fbd5fbf..5a2b60d72 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -18,7 +18,7 @@ use crate::portalnet::{ use crate::{ portalnet::types::content_key::RawContentKey, utp::{ - stream::{UtpListenerRequest, UtpSocket, BUF_SIZE}, + stream::{UtpListenerRequest, UtpStream, BUF_SIZE}, trin_helpers::{UtpAccept, UtpMessage, UtpStreamId}, }, }; @@ -281,49 +281,41 @@ impl conn_id: u16, ) -> Result { // initiate the connection to the acceptor - let (tx, rx) = tokio::sync::oneshot::channel::>(); - self.utp_listener_tx - .send(UtpListenerRequest::Connect( - conn_id, - enr.node_id(), - self.protocol.clone(), - UtpStreamId::FindContentStream, - tx, + let (tx, rx) = tokio::sync::oneshot::channel::(); + let utp_request = UtpListenerRequest::Connect( + conn_id, + enr, + 
self.protocol.clone(), + UtpStreamId::FindContentStream, + tx, + ); + + self.utp_listener_tx.send(utp_request).map_err(|err| { + OverlayRequestError::UtpError(format!( + "Unable to send Connect request with FindContent stream to UtpListener: {err}" )) - .map_err(|err| { - OverlayRequestError::UtpError(format!( - "Unable to send Connect request with FindContent stream to UtpListener: {err}" - )) - })?; + })?; match rx.await { - Ok(conn) => { - match conn { - Ok(mut conn) => { - let mut result = Vec::new(); - // Loop and receive all DATA packets, similar to `read_to_end` - loop { - let mut buf = [0; BUF_SIZE]; - match conn.recv_from(&mut buf).await { - Ok((0, _)) => { - break; - } - Ok((bytes, _)) => { - result.extend_from_slice(&mut buf[..bytes]); - } - Err(err) => { - warn!("Unable to receive content via uTP: {err}"); - return Err(OverlayRequestError::UtpError(err.to_string())); - } - } + Ok(mut conn) => { + let mut result = Vec::new(); + // Loop and receive all DATA packets, similar to `read_to_end` + loop { + let mut buf = [0; BUF_SIZE]; + match conn.recv_from(&mut buf).await { + Ok((0, _)) => { + break; + } + Ok((bytes, _)) => { + result.extend_from_slice(&mut buf[..bytes]); + } + Err(err) => { + warn!("Unable to receive content via uTP: {err}"); + return Err(OverlayRequestError::UtpError(err.to_string())); } - Ok(Content::Content(VariableList::from(result))) - } - Err(err) => { - warn!("Unable to initiate uTP stream with remote node. 
Error initializing uTP socket: {err}"); - Err(OverlayRequestError::UtpError(err.to_string())) } } + Ok(Content::Content(VariableList::from(result))) } Err(err) => { warn!("Unable to receive from uTP listener channel: {err}"); @@ -390,47 +382,43 @@ impl } // initiate the connection to the acceptor - let (tx, rx) = tokio::sync::oneshot::channel::>(); - - self.utp_listener_tx.send(UtpListenerRequest::Connect( + let (tx, rx) = tokio::sync::oneshot::channel::(); + let utp_request = UtpListenerRequest::Connect( conn_id, - enr.node_id(), + enr, self.protocol.clone(), UtpStreamId::OfferStream, tx, - )).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; + ); - match rx.await? { - Ok(mut conn) => { - // Handle STATE packet for SYN - let mut buf = [0; BUF_SIZE]; - conn.recv(&mut buf).await?; - - let content_items = self.provide_requested_content(&response, content_keys_offered); - - let content_message = UtpAccept { - message: content_items, - }; - - tokio::spawn(async move { - // send the content to the acceptor over a uTP stream - if let Err(err) = conn - .send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..]) - .await - { - warn!("Error sending content {err}"); - }; - // Close uTP connection - if let Err(err) = conn.close().await { - warn!("Unable to close uTP connection!: {err}") - }; - }); - Ok(response) - } - Err(err) => Err(anyhow!( - "Unable to initialize Offer uTP stream with remote node: {err}" - )), - } + self.utp_listener_tx + .send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; + + let mut conn = rx.await?; + // Handle STATE packet for SYN + let mut buf = [0; BUF_SIZE]; + conn.recv(&mut buf).await?; + + let content_items = self.provide_requested_content(&response, content_keys_offered); + + let content_message = UtpAccept { + message: content_items, + }; + + tokio::spawn(async move { + // send the content to 
the acceptor over a uTP stream + if let Err(err) = conn + .send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..]) + .await + { + warn!("Error sending content {err}"); + }; + // Close uTP connection + if let Err(err) = conn.close().await { + warn!("Unable to close uTP connection!: {err}") + }; + }); + Ok(response) } /// Provide the requested content key and content value for the acceptor diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 50fc10e04..91e706f8e 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -47,7 +47,7 @@ pub const FIND_CONTENT_MAX_NODES: usize = 32; /// which is more than 10x the ethereum mainnet node count) into a unique bucket by the 17th bucket index. const EXPECTED_NON_EMPTY_BUCKETS: usize = 17; /// Bucket refresh lookup interval in seconds -const BUCKET_REFRESH_INTERVAL: u64 = 60; +const BUCKET_REFRESH_INTERVAL_SECS: u64 = 60; /// An overlay request error. #[derive(Clone, Error, Debug)] @@ -381,7 +381,7 @@ impl async fn start(&mut self) { // Construct bucket refresh interval let mut bucket_refresh_interval = - tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL)); + tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL_SECS)); loop { tokio::select! 
{ @@ -421,16 +421,16 @@ impl } } - /// Send request to UtpLister to add an uTP stream to the active connections + /// Send request to UtpListener to add a uTP stream to the active connections fn add_utp_connection( &self, source: &NodeId, conn_id_recv: u16, stream_id: UtpStreamId, ) -> Result<(), OverlayRequestError> { - if let Some(enr) = self.discovery.discv5.find_enr(source) { - // Initialize active uTP stream with requested note - let utp_request = UtpListenerRequest::AddActiveConnection( + if let Some(enr) = self.find_enr(source) { + // Initialize active uTP stream with requesting node + let utp_request = UtpListenerRequest::InitiateConnection( enr, self.protocol.clone(), stream_id, @@ -441,8 +441,12 @@ impl "Unable to send uTP AddActiveConnection request: {err}" ))); } + Ok(()) + } else { + Err(OverlayRequestError::UtpError( + "Can't find ENR in overlay routing table matching remote NodeId".to_string(), + )) } - Ok(()) } /// Main bucket refresh lookup logic @@ -507,6 +511,16 @@ impl .await } + /// Returns an ENR if one is known for the given NodeId in overlay routing table + pub fn find_enr(&self, node_id: &NodeId) -> Option { + // check if we know this node id in our routing table + let key = kbucket::Key::from(*node_id); + if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&key) { + return Some(entry.value().enr.clone()); + } + None + } + /// Processes an overlay request. fn process_request(&mut self, request: OverlayRequest) { // For incoming requests, handle the request, possibly send the response over the channel, @@ -549,14 +563,9 @@ impl fn initialize_request(&mut self, request: Request) -> Result { debug!("[{:?}] Initializing request", self.protocol); match request { - Request::FindContent(find_content) => { - // TODO: Creating random NodeID here just to satisfy handle_find_content signature - // is a hack on top of the history RecursiveFindContent temporally hack. 
- let node_id = NodeId::random(); - Ok(Response::Content( - self.handle_find_content(find_content, &node_id)?, - )) - } + Request::FindContent(find_content) => Ok(Response::Content( + self.handle_find_content(find_content, None)?, + )), _ => Err(OverlayRequestError::InvalidRequest( "Initializing this overlay service request is not yet supported.".to_string(), )), @@ -577,7 +586,7 @@ impl Ok(Response::Nodes(self.handle_find_nodes(find_nodes))) } Request::FindContent(find_content) => Ok(Response::Content( - self.handle_find_content(find_content, &source)?, + self.handle_find_content(find_content, Some(&source))?, )), Request::Offer(offer) => Ok(Response::Accept(self.handle_offer(offer, source)?)), } @@ -617,7 +626,7 @@ impl fn handle_find_content( &self, request: FindContent, - source: &NodeId, + source: Option<&NodeId>, ) -> Result { self.metrics .as_ref() @@ -640,21 +649,30 @@ impl if content.len() < 1000 { Ok(Content::Content(content)) } else { - let conn_id: u16 = crate::utp::stream::rand(); - - // Listen for incoming uTP connection request on as part of uTP handshake and - // storing content data, so we can send it inside UtpListener right after we receive - // SYN packet from the requester - let conn_id_recv = conn_id.wrapping_add(1); - - self.add_utp_connection( - source, - conn_id_recv, - UtpStreamId::FindContentData(content), - )?; - - // Connection id is send as BE because uTP header values are stored also as BE - Ok(Content::ConnectionId(conn_id.to_be())) + match source { + Some(source) => { + let conn_id: u16 = crate::utp::stream::rand(); + + // Listen for incoming uTP connection request on as part of uTP handshake and + // storing content data, so we can send it inside UtpListener right after we receive + // SYN packet from the requester + let conn_id_recv = conn_id.wrapping_add(1); + + self.add_utp_connection( + source, + conn_id_recv, + UtpStreamId::ContentStream(content), + )?; + + // Connection id is send as BE because uTP header values are stored 
also as BE + Ok(Content::ConnectionId(conn_id.to_be())) + + }, + None => { + return Err(OverlayRequestError::UtpError( + "Unable to start listening for uTP stream because source NodeID is not provided".to_string())) + } + } } } Ok(None) => { diff --git a/trin-core/src/portalnet/types/messages.rs b/trin-core/src/portalnet/types/messages.rs index 5a2de62e9..748baf128 100644 --- a/trin-core/src/portalnet/types/messages.rs +++ b/trin-core/src/portalnet/types/messages.rs @@ -143,7 +143,7 @@ pub enum ProtocolIdError { } /// Protocol identifiers -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum ProtocolId { State, History, diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index 78e431be0..e66e48caa 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -58,8 +58,6 @@ const BASE_HISTORY: usize = 10; // base delays history size const MAX_BASE_DELAY_AGE: Delay = Delay(60_000_000); // Discv5 socket timeout in milliseconds const DISCV5_SOCKET_TIMEOUT: u64 = 25; -/// Process uTP streams interval in milliseconds -// const PROCESS_UTP_STREAMS_INTERVAL: u64 = 20; /// uTP connection id type ConnId = u16; @@ -67,9 +65,6 @@ type ConnId = u16; /// uTP payload data pub type UtpPayload = Vec; -/// UtpListener unbounded receiver for emitted events -pub type UtpListenerUnboundedReceiver = Arc>>; - pub fn rand() -> u16 { rand::thread_rng().gen() } @@ -90,9 +85,9 @@ impl ConnectionKey { } } -/// uTP socket connection state +/// uTP stream connection state #[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum SocketState { +pub enum StreamState { Uninitialized, SynSent, SynRecv, @@ -110,51 +105,53 @@ struct DelayDifferenceSample { /// Represent overlay to uTP listener request. 
It is used as a way to communicate between the overlay protocol /// and uTP listener +#[derive(Debug)] pub enum UtpListenerRequest { - /// Request to initialize uTP stream with remote node + /// Request to create and connect to a uTP stream initiated by a remote node Connect( ConnId, - NodeId, + Enr, ProtocolId, UtpStreamId, - oneshot::Sender>, + oneshot::Sender, ), - /// Request to add uTP stream to the active connections - AddActiveConnection(Enr, ProtocolId, UtpStreamId, ConnId), + /// Request to initiate and add uTP stream to the connections hash map + InitiateConnection(Enr, ProtocolId, UtpStreamId, ConnId), } -/// Result from processing all closed uTP streams. Includes a tuple with the payload and the stream id. -type ProcessedClosedStreams = Vec<(UtpPayload, UtpStreamId)>; - -/// Emitted event with all processed uTP streams. Used to handle the uTP payload in overlay service +/// Emit global event to overlay handler +#[derive(Debug, PartialEq)] pub enum UtpListenerEvent { - ProcessedClosedStreams(ProcessedClosedStreams), + /// uTP stream is closed + ClosedStream(UtpPayload, ProtocolId, UtpStreamId), + /// uTP stream is reset + ResetStream(ProtocolId, UtpStreamId), } -/// uTP stream state events emitted from UtpSocket +/// uTP stream state events emitted from `UtpStream` #[derive(Clone, Debug)] pub enum UtpStreamEvent { - /// Event containing received uTP payload, protocol id and receive connection id - Closed(UtpPayload, ProtocolId, ConnId), - /// Event containing protocol id and receive connection id - Reset(ProtocolId, ConnId), + /// Event signaling that a UtpStream has completed, containing received uTP payload, protocol id, + /// receive connection id and node id of the remote peer + Closed(UtpPayload, ProtocolId, UtpStreamId, ConnectionKey), + /// Event signaling that a UtpStream has been reset, containing protocol id, receive connection id + /// and node id of the remote peer + Reset(ProtocolId, UtpStreamId, ConnectionKey), } /// Main uTP service used 
to listen and handle all uTP connections and streams -// FIXME: Deny dead_code -#[allow(dead_code)] pub struct UtpListener { /// Base discv5 layer discovery: Arc, /// Store all active connections - utp_connections: HashMap, + utp_connections: HashMap, /// Receiver for uTP events sent from the main portal event handler utp_event_rx: UnboundedReceiver, /// Sender to overlay layer with processed uTP stream overlay_tx: UnboundedSender, /// Receiver for uTP requests sent from the overlay layer overlay_rx: UnboundedReceiver, - /// Sender used in UtpSocket to emit stream state events + /// Sender used in UtpStream to emit stream state events stream_tx: UnboundedSender, /// Receiver for uTP stream state events stream_rx: UnboundedReceiver, @@ -166,7 +163,7 @@ impl UtpListener { ) -> ( UnboundedSender, UnboundedSender, - UtpListenerUnboundedReceiver, + UnboundedReceiver, Self, ) { // Channel to process uTP TalkReq packets from main portal event handler @@ -175,13 +172,13 @@ impl UtpListener { let (utp_listener_tx, utp_listener_rx) = unbounded_channel::(); // Channel to emit processed uTP payload to overlay service let (overlay_tx, overlay_rx) = unbounded_channel::(); - // Channel to emit stream events from UtpSocket + // Channel to emit stream events from UtpStream let (stream_tx, stream_rx) = unbounded_channel::(); ( utp_event_tx, utp_listener_tx, - Arc::new(RwLock::new(overlay_rx)), + overlay_rx, UtpListener { discovery, utp_connections: HashMap::new(), @@ -196,16 +193,17 @@ impl UtpListener { /// The main execution loop of the UtpListener service. pub async fn start(&mut self) { - // let mut process_utp_streams_interval = - // tokio::time::interval(Duration::from_millis(PROCESS_UTP_STREAMS_INTERVAL)); loop { tokio::select! 
{ - Some(utp_request) = self.utp_event_rx.recv() => { - self.process_utp_request(utp_request).await - }, - Some(overlay_request) = self.overlay_rx.recv() => { - self.process_overlay_request(overlay_request).await - }, + Some(utp_request) = self.utp_event_rx.recv() => { + self.process_utp_request(utp_request).await + }, + Some(overlay_request) = self.overlay_rx.recv() => { + self.process_overlay_request(overlay_request).await + }, + Some(stream_event) = self.stream_rx.recv() => { + self.process_stream_event(stream_event) + } } } } @@ -251,8 +249,8 @@ impl UtpListener { } // Send content data if the stream is listening for FindContent SYN packet - if let UtpStreamId::FindContentData(content_data) = - conn.stream_id.clone() + if let UtpStreamId::ContentStream(content_data) = conn.stream_id.clone() + // TODO: Change this `clone` to borrow after rust 1.62 { // We want to send uTP data only if the content is Content(ByteList) debug!( @@ -282,7 +280,7 @@ impl UtpListener { PacketType::Data => { if let Some(conn) = self .utp_connections - .get_mut(&ConnectionKey::new(*node_id, connection_id - 1)) + .get_mut(&ConnectionKey::new(*node_id, connection_id.wrapping_sub(1))) { if conn.discv5_tx.send(packet.clone()).is_err() { error!("Unable to send DATA packet to uTP stream handler"); @@ -308,7 +306,7 @@ impl UtpListener { PacketType::Fin => { if let Some(conn) = self .utp_connections - .get_mut(&ConnectionKey::new(*node_id, connection_id - 1)) + .get_mut(&ConnectionKey::new(*node_id, connection_id.wrapping_sub(1))) { if conn.discv5_tx.send(packet).is_err() { error!("Unable to send FIN packet to uTP stream handler"); @@ -348,13 +346,13 @@ impl UtpListener { /// Process overlay uTP requests async fn process_overlay_request(&mut self, request: UtpListenerRequest) { match request { - UtpListenerRequest::AddActiveConnection( + UtpListenerRequest::InitiateConnection( connected_to, protocol_id, stream_id, conn_id_recv, ) => { - let conn = UtpSocket::new( + let conn = UtpStream::new( 
Arc::clone(&self.discovery), connected_to.clone(), protocol_id, @@ -364,55 +362,86 @@ impl UtpListener { let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv); self.utp_connections.insert(conn_key, conn); } - UtpListenerRequest::Connect(conn_id, node_id, protocol_id, stream_id, tx) => { - let conn = self.connect(conn_id, node_id, protocol_id, stream_id).await; + UtpListenerRequest::Connect(conn_id, enr, protocol_id, stream_id, tx) => { + let conn = self.connect(conn_id, enr, protocol_id, stream_id).await; if tx.send(conn).is_err() { - error!("Unable to send uTP socket to requester") + error!("Unable to send the uTP stream to requester") }; } } } + /// Emit global uTP listener event upon processing uTP stream event + fn process_stream_event(&mut self, event: UtpStreamEvent) { + match event { + UtpStreamEvent::Closed(utp_payload, protocol_id, stream_id, conn_key) => { + // Remove closed stream from active connections + if self.utp_connections.remove(&conn_key).is_none() { + error!("Unable to remove closed uTP stream from active connections, STREAM_CONN_ID_RECV: {}, CONNECTED_TO: {}", conn_key.conn_id_recv, conn_key.node_id); + } + + // Emit global event to overlay handler + if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ClosedStream( + utp_payload, + protocol_id, + stream_id, + )) { + error!("Unable to send ClosedStream event to overlay handler: {err}"); + } + } + UtpStreamEvent::Reset(protocol_id, stream_id, conn_key) => { + // Remove reset stream from active connections + if self.utp_connections.remove(&conn_key).is_none() { + error!("Unable to remove reset uTP stream from active connections, STREAM_CONN_ID_RECV: {}, CONNECTED_TO: {}", conn_key.conn_id_recv, conn_key.node_id); + } + + if let Err(err) = self + .overlay_tx + .send(UtpListenerEvent::ResetStream(protocol_id, stream_id)) + { + error!("Unable to send ResetStream event to overlay handler: {err}"); + } + } + } + } + /// Initialize uTP stream with remote node async fn connect( 
&mut self, connection_id: ConnId, - node_id: NodeId, + enr: Enr, protocol_id: ProtocolId, stream_id: UtpStreamId, - ) -> anyhow::Result { - if let Some(enr) = self.discovery.discv5.find_enr(&node_id) { - let mut conn = UtpSocket::new( - Arc::clone(&self.discovery), - enr, - protocol_id, - stream_id, - Some(self.stream_tx.clone()), - ); - conn.make_connection(connection_id).await; - self.utp_connections.insert( - ConnectionKey::new(node_id, conn.receiver_connection_id), - conn.clone(), - ); - Ok(conn) - } else { - Err(anyhow!("Trying to connect to unknow Enr")) - } + ) -> UtpStream { + let mut conn = UtpStream::new( + Arc::clone(&self.discovery), + enr.clone(), + protocol_id, + stream_id, + Some(self.stream_tx.clone()), + ); + conn.make_connection(connection_id).await; + self.utp_connections.insert( + ConnectionKey::new(enr.node_id(), conn.receiver_connection_id), + conn.clone(), + ); + + conn } } // Used to be MicroTransportProtocol impl but it is basically just called UtpStream compared to the // Rust Tcp Lib so I changed it -#[derive(Clone)] -pub struct UtpSocket { +#[derive(Debug, Clone)] +pub struct UtpStream { /// The wrapped discv5 protocol socket: Arc, - /// Socket state - pub state: SocketState, + /// uTP stream state + pub state: StreamState, /// ENR of the connected remote peer - connected_to: Enr, + pub connected_to: Enr, /// Overlay protocol identifier protocol_id: ProtocolId, @@ -450,7 +479,7 @@ pub struct UtpSocket { /// Sent but not yet acknowledged packets send_window: Vec, - /// How many ACKs did the socket receive for packet with sequence number equal to `ack_nr` + /// How many ACKs did the stream receive for packet with sequence number equal to `ack_nr` duplicate_ack_count: u8, /// Sequence number of the latest packet the remote peer acknowledged @@ -496,13 +525,13 @@ pub struct UtpSocket { discv5_rx: Arc>>, /// Sender to emit stream events to UtpListener - listener_tx: Option>, + event_tx: Option>, /// Store received uTP payload data over the 
stream pub recv_data_stream: Vec, } -impl UtpSocket { +impl UtpStream { pub fn new( socket: Arc, connected_to: Enr, @@ -515,7 +544,7 @@ impl UtpSocket { let (discv5_tx, discv5_rx) = unbounded_channel::(); Self { - state: SocketState::Uninitialized, + state: StreamState::Uninitialized, protocol_id, stream_id, seq_nr: 1, @@ -546,7 +575,7 @@ impl UtpSocket { max_retransmission_retries: MAX_RETRANSMISSION_RETRIES, discv5_tx, discv5_rx: Arc::new(RwLock::new(discv5_rx)), - listener_tx: utp_listener_tx, + event_tx: utp_listener_tx, } } @@ -562,8 +591,8 @@ impl UtpSocket { // Note that the buffer passed to `send_to` might exceed the maximum packet // size, which will result in the data being split over several packets. pub async fn send_to(&mut self, buf: &[u8]) -> anyhow::Result { - if self.state == SocketState::Closed { - return Err(anyhow!("The socket is closed")); + if self.state == StreamState::Closed { + return Err(anyhow!("The stream is closed")); } let total_length = buf.len(); @@ -725,7 +754,7 @@ impl UtpSocket { } async fn make_connection(&mut self, connection_id: ConnId) { - if self.state == SocketState::Uninitialized { + if self.state == StreamState::Uninitialized { self.receiver_connection_id = connection_id; self.sender_connection_id = self.receiver_connection_id + 1; @@ -735,7 +764,7 @@ impl UtpSocket { packet.set_seq_nr(self.seq_nr); self.send_packet(&mut packet).await; - self.state = SocketState::SynSent; + self.state = StreamState::SynSent; } } @@ -770,7 +799,7 @@ impl UtpSocket { packet.set_ack_nr(self.ack_nr); self.send_packet(&mut packet).await; - self.state = SocketState::FinSent; + self.state = StreamState::FinSent; } #[async_recursion] @@ -782,7 +811,7 @@ impl UtpSocket { ); // To make uTP connection bidirectional, we want to always acknowledge the received packet - if self.state == SocketState::SynSent { + if self.state == StreamState::SynSent { self.ack_nr = packet.seq_nr(); } else { // Only acknowledge this if this follows the last one, else do 
it when we advance the send @@ -797,7 +826,7 @@ impl UtpSocket { // Reset connection if connection id doesn't match and this isn't a SYN if packet.get_type() != PacketType::Syn - && self.state != SocketState::SynSent + && self.state != StreamState::SynSent && !(packet.connection_id() == self.sender_connection_id || packet.connection_id() == self.receiver_connection_id) { @@ -813,13 +842,13 @@ impl UtpSocket { match (self.state, packet.get_type()) { // New connection, when we receive SYN packet, respond with STATE packet - (SocketState::Uninitialized, PacketType::Syn) => { + (StreamState::Uninitialized, PacketType::Syn) => { self.connected_to = src; self.ack_nr = packet.seq_nr(); self.seq_nr = rand::random(); self.receiver_connection_id = packet.connection_id() + 1; self.sender_connection_id = packet.connection_id(); - self.state = SocketState::Connected; + self.state = StreamState::Connected; self.last_dropped = self.ack_nr; let reply = self.prepare_reply(packet, PacketType::State); @@ -833,29 +862,29 @@ impl UtpSocket { // we want to forcibly terminate the connection (_, PacketType::Syn) => Ok(Some(self.prepare_reply(packet, PacketType::Reset))), // When SYN is send and we receive STATE, do not reply - (SocketState::SynSent, PacketType::State) => { + (StreamState::SynSent, PacketType::State) => { self.connected_to = src; self.ack_nr = packet.seq_nr() - 1; self.seq_nr += 1; - self.state = SocketState::Connected; + self.state = StreamState::Connected; self.last_acked = packet.ack_nr(); self.last_acked_timestamp = now_microseconds(); Ok(None) } // To make uTP connection bidirectional, we also can expect DATA packet if state is SynSent - (SocketState::SynSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), - // Handle data packet if socket state is `Connected` or `FinSent` and packet type is DATA - (SocketState::Connected, PacketType::Data) - | (SocketState::FinSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), - // Handle state packet if 
socket state is `Connected` and packet type is STATE - (SocketState::Connected, PacketType::State) => { + (StreamState::SynSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), + // Handle data packet if stream state is `Connected` or `FinSent` and packet type is DATA + (StreamState::Connected, PacketType::Data) + | (StreamState::FinSent, PacketType::Data) => Ok(self.handle_data_packet(packet)), + // Handle state packet if stream state is `Connected` and packet type is STATE + (StreamState::Connected, PacketType::State) => { self.handle_state_packet(packet).await; Ok(None) } // Handle FIN packet. Check if all send packets are acknowledged. - (SocketState::Connected, PacketType::Fin) - | (SocketState::FinSent, PacketType::Fin) - | (SocketState::SynSent, PacketType::Fin) => { + (StreamState::Connected, PacketType::Fin) + | (StreamState::FinSent, PacketType::Fin) + | (StreamState::SynSent, PacketType::Fin) => { if packet.ack_nr() < self.seq_nr { debug!("FIN received but there are missing acknowledgements for sent packets"); } @@ -877,18 +906,18 @@ impl UtpSocket { } // Give up, the remote peer might not care about our missing packets - self.state = SocketState::Closed; + self.state = StreamState::Closed; self.emit_close_event(); Ok(Some(reply)) } - // Confirm with STATE packet when socket state is `Closed` and we receive FIN packet - (SocketState::Closed, PacketType::Fin) => { + // Confirm with STATE packet when stream state is `Closed` and we receive FIN packet + (StreamState::Closed, PacketType::Fin) => { Ok(Some(self.prepare_reply(packet, PacketType::State))) } - (SocketState::FinSent, PacketType::State) => { + (StreamState::FinSent, PacketType::State) => { if packet.ack_nr() == self.seq_nr { - self.state = SocketState::Closed; + self.state = StreamState::Closed; self.emit_close_event(); } else { self.handle_state_packet(packet).await; @@ -897,15 +926,18 @@ impl UtpSocket { } // Reset connection when receiving RESET packet (_, PacketType::Reset) => { - 
self.state = SocketState::ResetReceived; - // Emit socket state event to UtpListener. Panic if error. - if let Some(listener_tx) = self.listener_tx.clone() { - listener_tx - .send(UtpStreamEvent::Reset( - self.protocol_id.clone(), - self.receiver_connection_id, - )) - .unwrap(); + self.state = StreamState::ResetReceived; + // Emit stream state event to UtpListener + if let Some(listener_tx) = self.event_tx.clone() { + let conn_key = self.get_conn_key(); + + if let Err(err) = listener_tx.send(UtpStreamEvent::Reset( + self.protocol_id.clone(), + self.stream_id.clone(), + conn_key, + )) { + error!("Unable to send uTP RESET event to uTP listener: {err}"); + } } Err(anyhow!("Connection reset by remote peer")) } @@ -917,19 +949,33 @@ impl UtpSocket { } } - /// Emit socket state event to UtpListener. Panic if error. + /// Emit stream state event to UtpListener fn emit_close_event(&mut self) { - if let Some(listener_tx) = self.listener_tx.clone() { - listener_tx - .send(UtpStreamEvent::Closed( - self.recv_data_stream.clone(), - self.protocol_id.clone(), - self.receiver_connection_id, - )) - .unwrap(); + if let Some(listener_tx) = self.event_tx.clone() { + let conn_key = self.get_conn_key(); + + if let Err(err) = listener_tx.send(UtpStreamEvent::Closed( + self.recv_data_stream.clone(), + self.protocol_id.clone(), + self.stream_id.clone(), + conn_key, + )) { + error!("Unable to send uTP CLOSED event to uTP listener: {err}"); + } } } + /// Get connection key used in uTP listener to store active uTP connections + fn get_conn_key(&self) -> ConnectionKey { + let conn_id = match self.stream_id { + UtpStreamId::FindContentStream => self.receiver_connection_id, + UtpStreamId::ContentStream(_) => self.sender_connection_id, + UtpStreamId::OfferStream => self.receiver_connection_id, + UtpStreamId::AcceptStream(_) => self.sender_connection_id, + }; + ConnectionKey::new(self.connected_to.node_id(), conn_id) + } + fn prepare_reply(&self, original: &Packet, t: PacketType) -> Packet { let 
mut resp = Packet::new(); resp.set_type(t); @@ -947,14 +993,14 @@ impl UtpSocket { fn handle_data_packet(&mut self, packet: &Packet) -> Option { // We increase packet seq_nr if we are going to send DATA packet right after SYN-ACK. - if self.state == SocketState::SynSent { + if self.state == StreamState::SynSent { self.seq_nr += 1; - self.state = SocketState::Connected + self.state = StreamState::Connected } // If a FIN was previously sent, reply with a FIN packet acknowledging the received packet. let packet_type = match self.state { - SocketState::FinSent => PacketType::Fin, + StreamState::FinSent => PacketType::Fin, _ => PacketType::State, }; @@ -1172,15 +1218,15 @@ impl UtpSocket { return Ok((read, self.connected_to.clone())); } - // If the socket received a reset packet and all data has been flushed, then it can't + // If the stream received a reset packet and all data has been flushed, then it can't // receive anything else - if self.state == SocketState::ResetReceived { + if self.state == StreamState::ResetReceived { return Err(anyhow!("Connection reset by remote peer")); } loop { - // A closed socket with no pending data can only "read" 0 new bytes. - if self.state == SocketState::Closed { + // A closed stream with no pending data can only "read" 0 new bytes. + if self.state == StreamState::Closed { return Ok((0, self.connected_to.clone())); } @@ -1316,7 +1362,7 @@ impl UtpSocket { } } - /// Inserts a packet into the socket's buffer. + /// Inserts a packet into the stream's buffer. /// /// The packet is inserted in such a way that the packets in the buffer are sorted according to /// their sequence number in ascending order. This allows storing packets that were received out @@ -1356,10 +1402,10 @@ impl UtpSocket { /// This method allows both peers to receive all packets still in /// flight. 
pub async fn close(&mut self) -> anyhow::Result<()> { - // Nothing to do if the socket's already closed or not connected - if self.state == SocketState::Closed - || self.state == SocketState::Uninitialized - || self.state == SocketState::SynSent + // Nothing to do if the stream's already closed or not connected + if self.state == StreamState::Closed + || self.state == StreamState::Uninitialized + || self.state == StreamState::SynSent { return Ok(()); } @@ -1390,11 +1436,11 @@ impl UtpSocket { } debug!("CLosing connection, sent {:?}", packet); - self.state = SocketState::FinSent; + self.state = StreamState::FinSent; // Receive JAKE let mut buf = [0; BUF_SIZE]; - while self.state != SocketState::Closed { + while self.state != StreamState::Closed { self.recv(&mut buf).await?; } @@ -1424,7 +1470,7 @@ mod tests { utils::node_id::generate_random_remote_enr, utp::{ packets::{Packet, PacketType}, - stream::{SocketState, UtpSocket, BUF_SIZE}, + stream::{StreamState, UtpStream, BUF_SIZE}, time::now_microseconds, trin_helpers::UtpStreamId, }, @@ -1444,7 +1490,7 @@ mod tests { BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16 } - async fn server_setup() -> UtpSocket { + async fn server_setup() -> UtpStream { let ip_addr = socket::find_assigned_ip().expect("Could not find an IP for local connections"); let port = next_test_port(); @@ -1459,7 +1505,7 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new( + let conn = UtpStream::new( Arc::clone(&discv5), enr, ProtocolId::History, @@ -1472,7 +1518,7 @@ mod tests { conn } - async fn client_setup(connected_to: Enr) -> (Enr, UtpSocket) { + async fn client_setup(connected_to: Enr) -> (Enr, UtpStream) { let port = next_test_port(); let matching_ip = connected_to.ip().unwrap(); let config = PortalnetConfig { @@ -1485,7 +1531,7 @@ mod tests { let discv5 = Arc::new(discv5); - let conn = UtpSocket::new( + let conn = UtpStream::new( Arc::clone(&discv5), connected_to, ProtocolId::History, @@ -1497,7 +1543,7 
@@ mod tests { (discv5.local_enr(), conn) } - fn spawn_socket_recv(discv5: Arc, conn: UtpSocket) { + fn spawn_socket_recv(discv5: Arc, conn: UtpStream) { tokio::spawn(async move { let mut receiver = discv5.discv5.event_stream().await.unwrap(); while let Some(event) = receiver.recv().await { @@ -1534,7 +1580,7 @@ mod tests { let initial_connection_id: u16 = rand::random(); let sender_connection_id = initial_connection_id + 1; let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // --------------------------------- // Test connection setup - SYN packet @@ -1545,7 +1591,7 @@ mod tests { packet.set_connection_id(initial_connection_id); // Do we have a response? - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1575,7 +1621,7 @@ mod tests { packet.set_seq_nr(old_packet.seq_nr() + 1); packet.set_ack_nr(old_response.seq_nr()); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1608,7 +1654,7 @@ mod tests { packet.set_seq_nr(old_packet.seq_nr() + 1); packet.set_ack_nr(old_response.seq_nr()); - let response = socket.handle_packet(&packet, client_enr).await; + let response = stream.handle_packet(&packet, client_enr).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1632,7 +1678,7 @@ mod tests { // Boilerplate test setup let initial_connection_id: u16 = rand::random(); let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // Establish connection let mut packet = Packet::new(); @@ -1640,7 
+1686,7 @@ mod tests { packet.set_type(PacketType::Syn); packet.set_connection_id(initial_connection_id); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1658,19 +1704,19 @@ mod tests { packet.set_seq_nr(old_packet.seq_nr() + 1); packet.set_ack_nr(old_response.seq_nr()); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_none()); // Send a second keepalive packet, identical to the previous one - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_none()); - // Mark socket as closed - socket.state = SocketState::Closed; + // Mark stream as closed + stream.state = StreamState::Closed; } #[tokio::test] @@ -1678,7 +1724,7 @@ mod tests { // Boilerplate test setup let initial_connection_id: u16 = rand::random(); let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // Establish connection let mut packet = Packet::new(); @@ -1686,7 +1732,7 @@ mod tests { packet.set_type(PacketType::Syn); packet.set_connection_id(initial_connection_id); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1700,7 +1746,7 @@ mod tests { packet.set_type(PacketType::State); packet.set_connection_id(new_connection_id); - let response = socket.handle_packet(&packet, client_enr).await; + let 
response = stream.handle_packet(&packet, client_enr).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1709,8 +1755,8 @@ mod tests { assert_eq!(response.get_type(), PacketType::Reset); assert_eq!(response.ack_nr(), packet.seq_nr()); - // Mark socket as closed - socket.state = SocketState::Closed; + // Mark stream as closed + stream.state = StreamState::Closed; } #[tokio::test] @@ -1718,7 +1764,7 @@ mod tests { // Boilerplate test setup let initial_connection_id: u16 = rand::random(); let (_, client_enr) = generate_random_remote_enr(); - let mut socket = server_setup().await; + let mut stream = server_setup().await; // Establish connection let mut packet = Packet::new(); @@ -1726,7 +1772,7 @@ mod tests { packet.set_type(PacketType::Syn); packet.set_connection_id(initial_connection_id); - let response = socket.handle_packet(&packet, client_enr.clone()).await; + let response = stream.handle_packet(&packet, client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); @@ -1754,20 +1800,20 @@ mod tests { window.push(packet); // Send packets in reverse order - let response = socket.handle_packet(&window[1], client_enr.clone()).await; + let response = stream.handle_packet(&window[1], client_enr.clone()).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); let response = response.unwrap(); assert!(response.ack_nr() != window[1].seq_nr()); - let response = socket.handle_packet(&window[0], client_enr).await; + let response = stream.handle_packet(&window[0], client_enr).await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.is_some()); - // Mark socket as closed - socket.state = SocketState::Closed; + // Mark stream as closed + stream.state = StreamState::Closed; } #[tokio::test] @@ -1782,20 +1828,20 @@ mod tests { (minute_in_microseconds + 2, 19), (minute_in_microseconds + 3, 9), ]; - let mut 
socket = server_setup().await; + let mut stream = server_setup().await; for (timestamp, delay) in samples { - socket.update_base_delay(delay.into(), ((timestamp + delay) as u32).into()); + stream.update_base_delay(delay.into(), ((timestamp + delay) as u32).into()); } let expected = vec![7i64, 9i64] .into_iter() .map(Into::into) .collect::>(); - let actual = socket.base_delays.iter().cloned().collect::>(); + let actual = stream.base_delays.iter().cloned().collect::>(); assert_eq!(expected, actual); assert_eq!( - socket.min_base_delay(), + stream.min_base_delay(), expected.iter().min().cloned().unwrap_or_default() ); } @@ -1893,40 +1939,40 @@ mod tests { #[tokio::test] async fn test_sorted_buffer_insertion() { - let mut socket = server_setup().await; + let mut stream = server_setup().await; let mut packet = Packet::new(); packet.set_seq_nr(1); - assert!(socket.incoming_buffer.is_empty()); + assert!(stream.incoming_buffer.is_empty()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 1); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 1); packet.set_seq_nr(2); packet.set_timestamp(128.into()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 2); - assert_eq!(socket.incoming_buffer[1].seq_nr(), 2); - assert_eq!(socket.incoming_buffer[1].timestamp(), 128.into()); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 2); + assert_eq!(stream.incoming_buffer[1].seq_nr(), 2); + assert_eq!(stream.incoming_buffer[1].timestamp(), 128.into()); packet.set_seq_nr(3); packet.set_timestamp(256.into()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 3); - assert_eq!(socket.incoming_buffer[2].seq_nr(), 3); - assert_eq!(socket.incoming_buffer[2].timestamp(), 256.into()); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 3); + 
assert_eq!(stream.incoming_buffer[2].seq_nr(), 3); + assert_eq!(stream.incoming_buffer[2].timestamp(), 256.into()); // Replacing a packet with a more recent version doesn't work packet.set_seq_nr(2); packet.set_timestamp(456.into()); - socket.insert_into_buffer(packet.clone()); - assert_eq!(socket.incoming_buffer.len(), 3); - assert_eq!(socket.incoming_buffer[1].seq_nr(), 2); - assert_eq!(socket.incoming_buffer[1].timestamp(), 128.into()); + stream.insert_into_buffer(packet.clone()); + assert_eq!(stream.incoming_buffer.len(), 3); + assert_eq!(stream.incoming_buffer[1].seq_nr(), 2); + assert_eq!(stream.incoming_buffer[1].timestamp(), 128.into()); } #[tokio::test] @@ -1935,8 +1981,8 @@ mod tests { let mut server = server_setup().await; let (enr, mut client) = client_setup(server.connected_to.clone()).await; - assert_eq!(server.state, SocketState::Uninitialized); - assert_eq!(client.state, SocketState::Uninitialized); + assert_eq!(server.state, StreamState::Uninitialized); + assert_eq!(client.state, StreamState::Uninitialized); // Check proper difference in client's send connection id and receive connection id assert_eq!( @@ -1951,14 +1997,14 @@ mod tests { client.recv_from(&mut buf).await.unwrap(); // Expect SYN packet - assert_eq!(client.state, SocketState::Connected); + assert_eq!(client.state, StreamState::Connected); // After establishing a new connection, the server's ids are a mirror of the client's. 
assert_eq!( server.receiver_connection_id, server.sender_connection_id + 1 ); - assert_eq!(server.state, SocketState::Connected); + assert_eq!(server.state, StreamState::Connected); let mut packet = Packet::with_payload(&[1, 2, 3]); packet.set_wnd_size(BUF_SIZE as u32); diff --git a/trin-core/src/utp/trin_helpers.rs b/trin-core/src/utp/trin_helpers.rs index f7c747b93..7341ec14b 100644 --- a/trin-core/src/utp/trin_helpers.rs +++ b/trin-core/src/utp/trin_helpers.rs @@ -51,13 +51,13 @@ pub struct UtpAccept { pub message: Vec<(Vec, Vec)>, } -/// Used to track which stream to which overlay request correspond +/// Used to track which stream an overlay request corresponds with #[derive(Debug, Clone, PartialEq)] pub enum UtpStreamId { /// Stream id to initialize FindContent uTP connection and to listen for content payload FindContentStream, /// Stream id to listen for incoming FindContent connection and to send back the content data to the requester - FindContentData(ByteList), + ContentStream(ByteList), /// Stream id to send requested content from received ACCEPT response OfferStream, /// Stream id to listen for OFFER uTP payload. Contains requested content keys. 
diff --git a/trin-core/tests/utp_listener.rs b/trin-core/tests/utp_listener.rs new file mode 100644 index 000000000..138642e8b --- /dev/null +++ b/trin-core/tests/utp_listener.rs @@ -0,0 +1,152 @@ +use discv5::Discv5Event; +use ntest::timeout; +use ssz::Encode; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + str::FromStr, + sync::Arc, +}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use trin_core::{ + portalnet::{ + discovery::Discovery, + types::messages::{PortalnetConfig, ProtocolId}, + Enr, + }, + utp::{ + stream::{UtpListener, UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE}, + trin_helpers::{ + UtpAccept, UtpMessage, + UtpStreamId::{AcceptStream, OfferStream}, + }, + }, +}; + +fn next_test_port() -> u16 { + use std::sync::atomic::{AtomicUsize, Ordering}; + // static here allows us to modify the global value, and AtomicUsize can be shared safely between threads + static NEXT_OFFSET: AtomicUsize = AtomicUsize::new(0); + const BASE_PORT: u16 = 11600; + BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16 +} + +/// Spawn uTP listener instance and start discv5 event handler +async fn spawn_utp_listener() -> ( + Enr, + UnboundedSender, + UnboundedReceiver, +) { + let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + let port = next_test_port(); + let config = PortalnetConfig { + listen_port: port, + external_addr: Some(SocketAddr::new(ip_addr, port)), + ..Default::default() + }; + let mut discv5 = Discovery::new(config).unwrap(); + let enr = discv5.discv5.local_enr(); + discv5.start().await.unwrap(); + + let discv5 = Arc::new(discv5); + + let (utp_event_tx, utp_listener_tx, utp_listener_rx, mut utp_listener) = + UtpListener::new(Arc::clone(&discv5)); + + tokio::spawn(async move { + let mut receiver = discv5.discv5.event_stream().await.unwrap(); + while let Some(event) = receiver.recv().await { + match event { + Discv5Event::TalkRequest(request) => { + let protocol_id = 
ProtocolId::from_str(&hex::encode_upper(request.protocol())).unwrap(); + + match protocol_id { + ProtocolId::Utp => utp_event_tx.send(request).unwrap(), + _ => continue, + } + } + _ => continue, + } + } + }); + tokio::spawn(async move { utp_listener.start().await }); + + (enr, utp_listener_tx, utp_listener_rx) +} + +#[tokio::test] +#[timeout(100)] +/// Simulate simple OFFER -> ACCEPT uTP payload transfer +async fn utp_listener_events() { + let protocol_id = ProtocolId::History; + + // Initialize offer uTP listener + let (enr_offer, listener_tx_offer, mut listener_rx_offer) = spawn_utp_listener().await; + // Initialize acceptor uTP listener + let (enr_accept, listener_tx_accept, mut listener_rx_accept) = spawn_utp_listener().await; + + // Prepare to receive uTP stream from the offer node + let (requested_content_key, requested_content_value) = (vec![1], vec![1, 1, 1, 1]); + let stream_id = AcceptStream(vec![requested_content_key.clone()]); + let conn_id = 1234; + let request = UtpListenerRequest::InitiateConnection( + enr_offer.clone(), + protocol_id.clone(), + stream_id, + conn_id, + ); + listener_tx_accept.send(request).unwrap(); + + // Initialise an OFFER stream and send handshake uTP packet to the acceptor node + let stream_id = OfferStream; + let (tx, rx) = tokio::sync::oneshot::channel::(); + let offer_request = UtpListenerRequest::Connect( + conn_id, + enr_accept.clone(), + protocol_id.clone(), + stream_id, + tx, + ); + listener_tx_offer.send(offer_request).unwrap(); + + // Handle STATE packet for SYN handshake in the offer node + let mut conn = rx.await.unwrap(); + assert_eq!(conn.connected_to, enr_accept); + + let mut buf = [0; BUF_SIZE]; + conn.recv(&mut buf).await.unwrap(); + + // Send content key with content value to the acceptor node + let content_items = vec![( + requested_content_key.clone(), + requested_content_value.clone(), + )]; + + let content_message = UtpAccept { + message: content_items, + }; + + let utp_payload = 
UtpMessage::new(content_message.as_ssz_bytes()).encode(); + let expected_utp_payload = utp_payload.clone(); + + tokio::spawn(async move { + // Send the content to the acceptor over a uTP stream + conn.send_to(&utp_payload).await.unwrap(); + // Close uTP connection + conn.close().await.unwrap(); + }); + + // Check if the expected uTP listener events match the events in offer and accept nodes + let offer_event = listener_rx_offer.recv().await.unwrap(); + let expected_offer_event = + UtpListenerEvent::ClosedStream(vec![], protocol_id.clone(), OfferStream); + assert_eq!(offer_event, expected_offer_event); + + let accept_event = listener_rx_accept.recv().await.unwrap(); + let expected_accept_event = UtpListenerEvent::ClosedStream( + expected_utp_payload, + protocol_id.clone(), + AcceptStream(vec![requested_content_key]), + ); + assert_eq!(accept_event, expected_accept_event); +} diff --git a/utp-testing/src/main.rs b/utp-testing/src/main.rs index 202c5877f..1bdf18551 100644 --- a/utp-testing/src/main.rs +++ b/utp-testing/src/main.rs @@ -1,7 +1,7 @@ use discv5::{Discv5Event, TalkRequest}; use log::debug; use std::{net::SocketAddr, str::FromStr, sync::Arc}; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use trin_core::{ portalnet::{ discovery::Discovery, @@ -10,7 +10,7 @@ use trin_core::{ }, socket, utp::{ - stream::{UtpListener, UtpListenerRequest, UtpListenerUnboundedReceiver, UtpSocket}, + stream::{UtpListener, UtpListenerEvent, UtpListenerRequest, UtpStream}, trin_helpers::{UtpMessage, UtpStreamId}, }, }; @@ -19,22 +19,22 @@ use trin_core::{ pub struct TestApp { discovery: Arc, utp_listener_tx: UnboundedSender, - utp_listener_rx: UtpListenerUnboundedReceiver, + utp_listener_rx: UnboundedReceiver, utp_event_tx: UnboundedSender, } impl TestApp { async fn send_utp_request(&mut self, conn_id: u16, payload: Vec, enr: Enr) { - let (tx, rx) = tokio::sync::oneshot::channel::>(); + let (tx, rx) = 
tokio::sync::oneshot::channel::(); let _ = self.utp_listener_tx.send(UtpListenerRequest::Connect( conn_id, - enr.node_id(), + enr, ProtocolId::History, UtpStreamId::OfferStream, tx, )); - let mut conn = rx.await.unwrap().unwrap(); + let mut conn = rx.await.unwrap(); let mut buf = [0; 1500]; conn.recv(&mut buf).await.unwrap(); @@ -76,7 +76,7 @@ impl TestApp { // Listen for incoming connection request on conn_id, as part of uTP handshake let _ = self .utp_listener_tx - .send(UtpListenerRequest::AddActiveConnection( + .send(UtpListenerRequest::InitiateConnection( source, ProtocolId::History, UtpStreamId::AcceptStream(vec![vec![]]),