Skip to content

Commit

Permalink
Remove redundant listening hashmap from uTP listener and clear dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
ogenev committed May 25, 2022
1 parent 5c04aa2 commit 2e8a7f2
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 231 deletions.
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn run_trin(
};

// Initialize and spawn UTP listener
let (utp_events_tx, utp_listener_tx, utp_listener_rx, mut utp_listener) =
let (utp_events_tx, utp_listener_tx, _, mut utp_listener) =
UtpListener::new(Arc::clone(&discovery));
tokio::spawn(async move { utp_listener.start().await });

Expand All @@ -67,7 +67,6 @@ pub async fn run_trin(
initialize_state_network(
&discovery,
utp_listener_tx.clone(),
utp_listener_rx.clone(),
portalnet_config.clone(),
storage_config.clone(),
)
Expand All @@ -86,7 +85,6 @@ pub async fn run_trin(
initialize_history_network(
&discovery,
utp_listener_tx,
utp_listener_rx,
portalnet_config.clone(),
storage_config.clone(),
)
Expand Down
21 changes: 2 additions & 19 deletions trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::portalnet::{
use crate::{
portalnet::types::content_key::RawContentKey,
utp::{
stream::{UtpListenerEvent, UtpListenerRequest, UtpSocket, BUF_SIZE},
stream::{UtpListenerRequest, UtpSocket, BUF_SIZE},
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
},
};
Expand All @@ -32,10 +32,7 @@ use futures::channel::oneshot;
use parking_lot::RwLock;
use ssz::Encode;
use ssz_types::VariableList;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock as TRwLock,
};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, warn};

pub use super::overlay_service::{OverlayRequestError, RequestDirection};
Expand Down Expand Up @@ -101,7 +98,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
config: OverlayConfig,
discovery: Arc<Discovery>,
utp_listener_tx: UnboundedSender<UtpListenerRequest>,
utp_listener_rx: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
storage: Arc<RwLock<PortalStorage>>,
data_radius: U256,
protocol: ProtocolId,
Expand All @@ -124,7 +120,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
Arc::clone(&data_radius),
protocol.clone(),
utp_listener_tx.clone(),
utp_listener_rx,
config.enable_metrics,
)
.await
Expand Down Expand Up @@ -285,13 +280,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
enr: Enr,
conn_id: u16,
) -> Result<Content, OverlayRequestError> {
let utp_request = UtpListenerRequest::FindContentStream(conn_id);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP FindContent stream request: {err}"
)));
}

// initiate the connection to the acceptor
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
self.utp_listener_tx
Expand Down Expand Up @@ -401,11 +389,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
return Ok(response);
}

let utp_request = UtpListenerRequest::OfferStream(conn_id);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(anyhow!("Unable to send uTP Offer stream request: {err}"));
}

// initiate the connection to the acceptor
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();

Expand Down
89 changes: 6 additions & 83 deletions trin-core/src/portalnet/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,7 @@ use crate::{
utp::stream::UtpListenerRequest,
};

use crate::{
portalnet::types::content_key::RawContentKey,
utp::{
stream::{UtpListenerEvent, UtpPayload},
trin_helpers::UtpStreamId,
},
};
use crate::utp::trin_helpers::UtpStreamId;
use delay_map::HashSetDelay;
use discv5::{
enr::NodeId,
Expand All @@ -43,10 +37,7 @@ use rand::seq::SliceRandom;
use ssz::Encode;
use ssz_types::{BitList, VariableList};
use thiserror::Error;
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
RwLock as TRwLock,
};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

/// Maximum number of ENRs in response to FindNodes.
pub const FIND_NODES_MAX_NODES: usize = 32;
Expand Down Expand Up @@ -265,8 +256,6 @@ pub struct OverlayService<TContentKey, TMetric> {
response_tx: UnboundedSender<OverlayResponse>,
/// The sender half of a channel to send requests to uTP listener
utp_listener_tx: UnboundedSender<UtpListenerRequest>,
/// Receiver for UtpListener emitted events
utp_listener_rx: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
/// Phantom content key.
phantom_content_key: PhantomData<TContentKey>,
/// Phantom metric (distance function).
Expand All @@ -292,7 +281,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
data_radius: Arc<U256>,
protocol: ProtocolId,
utp_listener_sender: UnboundedSender<UtpListenerRequest>,
utp_listener_receiver: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
enable_metrics: bool,
) -> Result<UnboundedSender<OverlayRequest>, String> {
let (request_tx, request_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -325,7 +313,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
response_rx,
response_tx,
utp_listener_tx: utp_listener_sender,
utp_listener_rx: utp_listener_receiver,
phantom_content_key: PhantomData,
phantom_metric: PhantomData,
metrics,
Expand Down Expand Up @@ -397,9 +384,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL));

loop {
// let utp_listener_rx = self.utp_listener_rx.clone();
// let mut utp_listener_lock = utp_listener_rx.write_with_warn().await;

tokio::select! {
Some(request) = self.request_rx.recv() => self.process_request(request),
Some(response) = self.response_rx.recv() => {
Expand Down Expand Up @@ -428,14 +412,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
self.peers_to_ping.insert(node_id);
}
}
// Some(utp_event) = utp_listener_lock.recv() => {
// match utp_event {
// UtpListenerEvent::ProcessedClosedStreams(processed_streams) =>
// {
// self.handle_utp_payload(processed_streams);
// }
// }
// }
_ = OverlayService::<TContentKey, TMetric>::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {}
_ = bucket_refresh_interval.tick() => {
debug!("[{:?}] Overlay bucket refresh lookup", self.protocol);
Expand Down Expand Up @@ -666,26 +642,11 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
} else {
let conn_id: u16 = crate::utp::stream::rand();

// listen for incoming connection request on conn_id, as part of utp handshake and
// temporarily storing content data, so we can send it right after we receive
// Listen for incoming uTP connection request on as part of uTP handshake and
// storing content data, so we can send it inside UtpListener right after we receive
// SYN packet from the requester
let utp_request = UtpListenerRequest::FindContentData(conn_id, content.clone());
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP FindContentData stream request: {err}"
)));
}

// also listen on conn_id + 1 because this is the receive path
let conn_id_recv = conn_id.wrapping_add(1);

let utp_request = UtpListenerRequest::FindContentStream(conn_id_recv);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP FindContent stream request: {err}"
)));
}

self.add_utp_connection(
source,
conn_id_recv,
Expand All @@ -704,8 +665,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
}
}
Err(msg) => Err(OverlayRequestError::Failure(format!(
"Unable to respond to FindContent: {}",
msg
"Unable to respond to FindContent: {msg}",
))),
}
}
Expand Down Expand Up @@ -748,24 +708,8 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
})?;
}

// listen for incoming connection request on conn_id, as part of utp handshake
// Listen for incoming connection request on conn_id, as part of utp handshake
let conn_id: u16 = crate::utp::stream::rand();
let utp_request = UtpListenerRequest::OfferStream(conn_id);
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP Offer stream request: {err}"
)));
}

// also listen on conn_id + 1 because this is the actual receive path for acceptor
let conn_id_recv = conn_id.wrapping_add(1);

let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys.clone());
if let Err(err) = self.utp_listener_tx.send(utp_request) {
return Err(OverlayRequestError::UtpError(format!(
"Unable to send uTP Accept stream request: {err}"
)));
}

self.add_utp_connection(source, conn_id, UtpStreamId::AcceptStream(accept_keys))?;

Expand All @@ -777,19 +721,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
Ok(accept)
}

/// Handle all closed uTP streams, currently we process only AcceptStream here.
/// FindContent payload is processed explicitly when we send FindContent request.
fn handle_utp_payload(&self, streams: Vec<(UtpPayload, UtpStreamId)>) {
for stream in streams {
match stream {
(payload, UtpStreamId::AcceptStream(content_keys)) => {
self.process_accept_utp_payload(content_keys, payload);
}
_ => {}
}
}
}

/// Sends a TALK request via Discovery v5 to some destination node.
fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) {
let discovery = Arc::clone(&self.discovery);
Expand Down Expand Up @@ -821,12 +752,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
});
}

/// Process accepted uTP payload of the OFFER?ACCEPT stream
fn process_accept_utp_payload(&self, content_keys: Vec<RawContentKey>, payload: UtpPayload) {
// TODO: Verify the payload, store the content and propagate gossip.
debug!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}, protocol: {:?}", self.protocol);
}

/// Processes an incoming request from some source node.
fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) {
// Look up the node in the routing table.
Expand Down Expand Up @@ -1354,7 +1279,6 @@ mod tests {
let discovery = Arc::new(Discovery::new(portal_config).unwrap());

let (utp_listener_tx, _) = unbounded_channel::<UtpListenerRequest>();
let (_, utp_listener_rx) = unbounded_channel::<UtpListenerEvent>();

// Initialize DB config
let storage_capacity: u32 = DEFAULT_STORAGE_CAPACITY.parse().unwrap();
Expand Down Expand Up @@ -1392,7 +1316,6 @@ mod tests {
response_tx,
response_rx,
utp_listener_tx,
utp_listener_rx: Arc::new(TRwLock::new(utp_listener_rx)),
phantom_content_key: PhantomData,
phantom_metric: PhantomData,
metrics,
Expand Down
Loading

0 comments on commit 2e8a7f2

Please sign in to comment.