Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Apr 2, 2024
1 parent 5b318e3 commit a761589
Show file tree
Hide file tree
Showing 17 changed files with 1,340 additions and 34 deletions.
51 changes: 45 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bench = false
[dependencies]
async-lock = "3.3.0"
async-trait = "0.1.77"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
backoff = { version = "0.4.0", features = ["tokio"] }
bitvec = "1.0.1"
# TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved
fs2 = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ include = [
anyhow = "1.0.79"
async-lock = "3.3.0"
async-trait = "0.1.77"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
backoff = { version = "0.4.0", features = ["tokio"] }
base58 = "0.2.0"
blake2 = "0.10.6"
blake3 = { version = "1.5.0", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ include = [
[dependencies]
async-mutex = "1.4.0"
async-trait = "0.1.77"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bytes = "1.5.0"
backoff = { version = "0.4.0", features = ["tokio"] }
bytes = "1.6.0"
clap = { version = "4.4.18", features = ["color", "derive"] }
derive_more = "0.99.17"
either = "1.8.1"
Expand All @@ -34,7 +34,7 @@ memmap2 = "0.9.3"
nohash-hasher = "0.2.0"
parity-scale-codec = "3.6.9"
parking_lot = "0.12.1"
pin-project = "1.1.3"
pin-project = "1.1.5"
prometheus-client = "0.22.0"
rand = "0.8.5"
serde = { version = "1.0.195", features = ["derive"] }
Expand Down
1 change: 0 additions & 1 deletion crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub(crate) struct BehaviorConfig<RecordStore> {

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
#[behaviour(event_process = false)]
pub(crate) struct Behavior<RecordStore> {
// TODO: Connection limits must be the first protocol due to https://github.com/libp2p/rust-libp2p/issues/4773 as
// suggested in https://github.com/libp2p/rust-libp2p/issues/4898#issuecomment-1818013483
Expand Down
35 changes: 17 additions & 18 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::utils::multihash::Multihash;
use crate::utils::HandlerFn;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc::SendError;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, Stream, StreamExt};
use libp2p::gossipsub::{Sha256Topic, SubscriptionError};
Expand Down Expand Up @@ -43,7 +42,7 @@ impl Stream for TopicSubscription {

#[pin_project::pinned_drop]
impl PinnedDrop for TopicSubscription {
fn drop(mut self: std::pin::Pin<&mut Self>) {
fn drop(mut self: Pin<&mut Self>) {
let topic = self
.topic
.take()
Expand All @@ -70,7 +69,7 @@ impl PinnedDrop for TopicSubscription {
pub enum GetValueError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -87,7 +86,7 @@ impl From<oneshot::Canceled> for GetValueError {
pub enum PutValueError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -105,7 +104,7 @@ impl From<oneshot::Canceled> for PutValueError {
pub enum GetClosestPeersError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -123,7 +122,7 @@ impl From<oneshot::Canceled> for GetClosestPeersError {
pub enum SubscribeError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -143,7 +142,7 @@ impl From<oneshot::Canceled> for SubscribeError {
pub enum PublishError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -163,7 +162,7 @@ impl From<oneshot::Canceled> for PublishError {
pub enum GetProvidersError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -184,7 +183,7 @@ impl From<oneshot::Canceled> for GetProvidersError {
pub enum SendRequestError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -207,7 +206,7 @@ impl From<oneshot::Canceled> for SendRequestError {
pub enum ConnectedPeersError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand All @@ -227,7 +226,7 @@ impl From<oneshot::Canceled> for ConnectedPeersError {
pub enum BootstrapError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
SendCommand(#[from] mpsc::SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
Expand Down Expand Up @@ -471,7 +470,7 @@ impl Node {
}

/// Ban peer with specified peer ID.
pub async fn ban_peer(&self, peer_id: PeerId) -> Result<(), SendError> {
pub async fn ban_peer(&self, peer_id: PeerId) -> Result<(), mpsc::SendError> {
self.shared
.command_sender
.clone()
Expand All @@ -483,7 +482,7 @@ impl Node {
/// It could be used to test libp2p transports bypassing protocol checks for bootstrap
/// or listen-on addresses.
#[doc(hidden)]
pub async fn dial(&self, address: Multiaddr) -> Result<(), SendError> {
pub async fn dial(&self, address: Multiaddr) -> Result<(), mpsc::SendError> {
self.shared
.command_sender
.clone()
Expand Down Expand Up @@ -559,16 +558,16 @@ impl Node {
Ok(())
}

/// Callback is called when a peer is disconnected.
pub fn on_disconnected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.disconnected_peer.add(callback)
}

/// Callback is called when a peer is connected.
pub fn on_connected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.connected_peer.add(callback)
}

/// Callback is called when a peer is disconnected.
pub fn on_disconnected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.disconnected_peer.add(callback)
}

/// Callback is called when a routable or unraoutable peer is discovered.
pub fn on_discovered_peer(&self, callback: HandlerFn<PeerDiscovered>) -> HandlerId {
self.shared.handlers.peer_discovered.add(callback)
Expand Down
13 changes: 10 additions & 3 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ where
%peer_id,
%is_reserved_peer,
?endpoint,
"Connection established [{num_established} from peer]"
%num_established,
"Connection established"
);

let maybe_remote_ip =
Expand Down Expand Up @@ -524,9 +525,12 @@ where
return;
}
};

debug!(
%peer_id,
?cause,
"Connection closed with peer {peer_id} [{num_established} from peer]"
%num_established,
"Connection closed with peer"
);

if num_established == 0 {
Expand Down Expand Up @@ -1291,7 +1295,10 @@ where
}
}
Ok(false) => {
panic!("Logic error, topic subscription wasn't created, this must never happen");
panic!(
"Logic error, topic subscription wasn't created, this \
must never happen"
);
}
Err(error) => {
let _ = result_sender.send(Err(error));
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ pub(crate) enum Command {
pub(crate) struct Handlers {
pub(crate) new_listener: Handler<Multiaddr>,
pub(crate) num_established_peer_connections_change: Handler<usize>,
pub(crate) disconnected_peer: Handler<PeerId>,
pub(crate) connected_peer: Handler<PeerId>,
pub(crate) disconnected_peer: Handler<PeerId>,
pub(crate) peer_discovered: Handler<PeerDiscovered>,
}

Expand Down
29 changes: 29 additions & 0 deletions shared/subspace-cluster-networking/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "subspace-cluster-networking"
version = "0.1.0"
authors = ["Nazar Mokrynskyi <[email protected]>"]
description = "Networking functionality for cluster applications"
edition = "2021"
license = "Apache-2.0"
homepage = "https://subspace.network"
repository = "https://github.com/subspace/subspace"
include = [
"/src",
"/Cargo.toml",
]

[dependencies]
async-trait = "0.1.77"
backoff = { version = "0.4.0", features = ["tokio"] }
event-listener-primitives = "2.0.1"
futures = "0.3.29"
libp2p = { version = "0.53.2", features = ["dns", "macros", "metrics", "noise", "request-response", "tcp", "tokio", "yamux"] }
parity-scale-codec = { version = "3.6.9", features = ["derive"] }
parking_lot = "0.12.1"
pin-project = "1.1.5"
thiserror = "1.0.56"
tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt-multi-thread"] }
tracing = "0.1.40"

[dev-dependencies]
futures_ringbuf = "0.4.0"
Loading

0 comments on commit a761589

Please sign in to comment.