Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize TPeerId into TConnInfo #1045

Merged
merged 3 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 137 additions & 87 deletions core/src/nodes/collection.rs

Large diffs are not rendered by default.

73 changes: 37 additions & 36 deletions core/src/nodes/handled_node_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod tests;
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.

/// Implementation of `Stream` that handles a collection of nodes.
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> {
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
/// the task. It is possible that we receive messages from tasks that used to be in this list
/// but no longer are, in which case we should ignore them.
Expand All @@ -73,13 +73,13 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THand
local_spawns: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,

/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)>,
/// Receiver side for the events.
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)>,
}

impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo> fmt::Debug for
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
where
TUserData: fmt::Debug
{
Expand Down Expand Up @@ -126,30 +126,31 @@ where
}

/// Prototype for a `NodeHandler`.
pub trait IntoNodeHandler<TPeerId = PeerId> {
pub trait IntoNodeHandler<TConnInfo = PeerId> {
/// The node handler.
type Handler: NodeHandler;

/// Builds the node handler.
///
/// The `TPeerId` is the id of the node the handler is going to handle.
fn into_handler(self, remote_peer_id: &TPeerId) -> Self::Handler;
/// The `TConnInfo` is the information about the connection that the handler is going to handle.
/// This is generated by the `Transport` and typically implements the `ConnectionInfo` trait.
fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler;
}

impl<T, TPeerId> IntoNodeHandler<TPeerId> for T
impl<T, TConnInfo> IntoNodeHandler<TConnInfo> for T
where T: NodeHandler
{
type Handler = Self;

#[inline]
fn into_handler(self, _: &TPeerId) -> Self {
fn into_handler(self, _: &TConnInfo) -> Self {
self
}
}

/// Event that can happen on the `HandledNodesTasks`.
#[derive(Debug)]
pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> {
/// A task has been closed.
///
/// This happens once the node handler closes or an error happens.
Expand All @@ -169,7 +170,7 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa
/// The task that succeeded.
task: Task<'a, TInEvent, TUserData>,
/// Identifier of the node.
peer_id: TPeerId,
conn_info: TConnInfo,
},

/// A task has produced an event.
Expand All @@ -185,8 +186,8 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(usize);

impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
{
/// Creates a new empty collection.
#[inline]
Expand All @@ -209,8 +210,8 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
/// events.
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId
where
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
TIntoHandler: IntoNodeHandler<TPeerId> + Send + 'static,
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
TIntoHandler: IntoNodeHandler<TConnInfo> + Send + 'static,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
Expand All @@ -219,7 +220,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
TPeerId: Send + 'static,
TConnInfo: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
Expand Down Expand Up @@ -273,7 +274,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
}

/// Provides an API similar to `Stream`, except that it cannot produce an error.
pub fn poll(&mut self) -> Async<HandledNodesEvent<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>> {
pub fn poll(&mut self) -> Async<HandledNodesEvent<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>> {
let (message, task_id) = match self.poll_inner() {
Async::Ready(r) => r,
Async::NotReady => return Async::NotReady,
Expand All @@ -289,13 +290,13 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
event
}
},
InToExtMessage::NodeReached(peer_id) => {
InToExtMessage::NodeReached(conn_info) => {
HandledNodesEvent::NodeReached {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => Task { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
peer_id
conn_info
}
},
InToExtMessage::TaskClosed(result, handler) => {
Expand All @@ -318,7 +319,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
/// split `poll()` in two. This method returns an `InToExtMessage` that is guaranteed to come
/// from an alive task.
// TODO: look into merging with `poll()`
fn poll_inner(&mut self) -> Async<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)> {
fn poll_inner(&mut self) -> Async<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)> {
for to_spawn in self.to_spawn.drain() {
// We try to use the default executor, but fall back to polling the task manually if
// no executor is available. This makes it possible to use the core in environments
Expand Down Expand Up @@ -490,9 +491,9 @@ enum ExtToInMessage<TInEvent> {

/// Message to transmit from a task to the public API.
#[derive(Debug)]
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo> {
/// A connection to a node has succeeded.
NodeReached(TPeerId),
NodeReached(TConnInfo),
/// The task closed.
TaskClosed(TaskClosedEvent<TReachErr, THandlerErr>, Option<TIntoHandler>),
/// An event from the node.
Expand All @@ -501,28 +502,28 @@ enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {

/// Implementation of `Future` that handles a single node, and all the communications between
/// the various components of the `HandledNodesTasks`.
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo>
where
TMuxer: StreamMuxer,
TIntoHandler: IntoNodeHandler<TPeerId>,
TIntoHandler: IntoNodeHandler<TConnInfo>,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Sender to transmit events to the outside.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TPeerId>, TaskId)>,
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TConnInfo>, TaskId)>,
/// Receiving end for events sent from the main `HandledNodesTasks`.
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<ExtToInMessage<TInEvent>>>,
/// Inner state of the `NodeTask`.
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>,
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TConnInfo>,
/// Identifier of the attempt.
id: TaskId,
/// Channels to keep alive for as long as we don't have an acknowledgment from the remote.
taken_over: SmallVec<[mpsc::UnboundedSender<ExtToInMessage<TInEvent>>; 1]>,
}

enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TConnInfo>
where
TMuxer: StreamMuxer,
TIntoHandler: IntoNodeHandler<TPeerId>,
TIntoHandler: IntoNodeHandler<TConnInfo>,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Future to resolve to connect to the node.
Expand All @@ -547,12 +548,12 @@ where
Poisoned,
}

impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId> Future for
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo> Future for
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo>
where
TMuxer: StreamMuxer,
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr>,
TIntoHandler: IntoNodeHandler<TPeerId>,
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr>,
TIntoHandler: IntoNodeHandler<TConnInfo>,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
{
type Item = ();
Expand All @@ -579,9 +580,9 @@ where
}
// Check whether dialing succeeded.
match future.poll() {
Ok(Async::Ready((peer_id, muxer))) => {
let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id));
let event = InToExtMessage::NodeReached(peer_id);
Ok(Async::Ready((conn_info, muxer))) => {
let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info));
let event = InToExtMessage::NodeReached(conn_info);
for event in events_buffer {
node.inject_event(event);
}
Expand Down
1 change: 1 addition & 0 deletions core/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod listeners;
pub mod node;
pub mod raw_swarm;

pub use self::collection::ConnectionInfo;
pub use self::node::Substream;
pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint};
pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent};
Loading