Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

or quic transport with tcp #5

Merged
merged 2 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ pub struct Config {
/// that no discovery address has been set in the CLI args.
pub enr_address: (Option<Ipv4Addr>, Option<Ipv6Addr>),

/// The udp4 port to broadcast to peers in order to reach back for discovery.
pub enr_udp4_port: Option<u16>,
/// The disc4 port to broadcast to peers in order to reach back for discovery.
pub enr_disc4_port: Option<u16>,

/// The tcp4 port to broadcast to peers in order to reach back for libp2p services.
pub enr_tcp4_port: Option<u16>,

/// The udp6 port to broadcast to peers in order to reach back for discovery.
pub enr_udp6_port: Option<u16>,
/// The disc6 port to broadcast to peers in order to reach back for discovery.
pub enr_disc6_port: Option<u16>,

/// The tcp6 port to broadcast to peers in order to reach back for libp2p services.
pub enr_tcp6_port: Option<u16>,
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Config {
self.listen_addresses = ListenAddress::V4(ListenAddr {
addr,
disc_port,
quic_port,
quic_port: Some(quic_port),
tcp_port,
});
self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), disc_port);
Expand All @@ -177,17 +177,11 @@ impl Config {
/// Sets the listening address to use an ipv6 address. The discv5 ip_mode and table filter is
/// adjusted accordingly to ensure addresses that are present in the enr are globally
/// reachable.
pub fn set_ipv6_listening_address(
&mut self,
addr: Ipv6Addr,
tcp_port: u16,
disc_port: u16,
quic_port: u16,
) {
pub fn set_ipv6_listening_address(&mut self, addr: Ipv6Addr, tcp_port: u16, disc_port: u16) {
self.listen_addresses = ListenAddress::V6(ListenAddr {
addr,
disc_port,
quic_port,
quic_port: None,
tcp_port,
});

Expand All @@ -198,6 +192,8 @@ impl Config {
/// Sets the listening address to use both an ipv4 and ipv6 address. The discv5 ip_mode and
/// table filter is adjusted accordingly to ensure addresses that are present in the enr are
/// globally reachable.
/// TODO: add Quic support IPV6 port when https://github.com/libp2p/rust-libp2p/issues/4165
/// gets addressed.
pub fn set_ipv4_ipv6_listening_addresses(
&mut self,
v4_addr: Ipv4Addr,
Expand All @@ -207,19 +203,18 @@ impl Config {
v6_addr: Ipv6Addr,
tcp6_port: u16,
disc6_port: u16,
quic6_port: u16,
) {
self.listen_addresses = ListenAddress::DualStack(
ListenAddr {
addr: v4_addr,
disc_port: disc4_port,
quic_port: quic4_port,
quic_port: Some(quic4_port),
tcp_port: tcp4_port,
},
ListenAddr {
addr: v6_addr,
disc_port: disc6_port,
quic_port: quic6_port,
quic_port: None,
tcp_port: tcp6_port,
},
);
Expand All @@ -242,13 +237,18 @@ impl Config {
disc_port,
quic_port,
tcp_port,
}) => self.set_ipv4_listening_address(addr, tcp_port, disc_port, quic_port),
}) => self.set_ipv4_listening_address(
addr,
tcp_port,
disc_port,
quic_port.expect("Quic port should exist on an IPV4 address"),
),
ListenAddress::V6(ListenAddr {
addr,
disc_port,
quic_port,
quic_port: _,
tcp_port,
}) => self.set_ipv6_listening_address(addr, tcp_port, disc_port, quic_port),
}) => self.set_ipv6_listening_address(addr, tcp_port, disc_port),
ListenAddress::DualStack(
ListenAddr {
addr: ip4addr,
Expand All @@ -259,12 +259,17 @@ impl Config {
ListenAddr {
addr: ip6addr,
disc_port: disc6_port,
quic_port: quic6_port,
quic_port: _quic6_port,
tcp_port: tcp6_port,
},
) => self.set_ipv4_ipv6_listening_addresses(
ip4addr, tcp4_port, disc4_port, quic4_port, ip6addr, tcp6_port, disc6_port,
quic6_port,
ip4addr,
tcp4_port,
disc4_port,
quic4_port.expect("Quic port should exist on an IPV4 address"),
ip6addr,
tcp6_port,
disc6_port,
),
}
}
Expand Down Expand Up @@ -304,7 +309,7 @@ impl Default for Config {
let listen_addresses = ListenAddress::V4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
disc_port: 9000,
quic_port: 9001,
quic_port: Some(9001),
tcp_port: 9000,
});

Expand Down Expand Up @@ -338,9 +343,9 @@ impl Default for Config {
listen_addresses,
enr_address: (None, None),

enr_udp4_port: None,
enr_disc4_port: None,
enr_tcp4_port: None,
enr_udp6_port: None,
enr_disc6_port: None,
enr_tcp6_port: None,
target_peers: 50,
gs_config,
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
builder.ip6(*ip);
}

if let Some(udp4_port) = config.enr_udp4_port {
if let Some(udp4_port) = config.enr_disc4_port {
builder.udp4(udp4_port);
}

if let Some(udp6_port) = config.enr_udp6_port {
if let Some(udp6_port) = config.enr_disc6_port {
builder.udp6(udp6_port);
}

Expand Down
54 changes: 41 additions & 13 deletions beacon_node/lighthouse_network/src/listen_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub struct ListenAddr<Ip> {
/// The UDP port that discovery will listen on.
pub disc_port: u16,
/// The UDP port that QUIC will listen on.
pub quic_port: u16,
/// NB: Quic port is optional as it's not yet supported with IPV6
/// See https://github.com/libp2p/rust-libp2p/issues/4165
pub quic_port: Option<u16>,
/// The TCP port that libp2p will listen on.
pub tcp_port: u16,
}
Expand All @@ -21,8 +23,20 @@ impl<Ip: Into<IpAddr> + Clone> ListenAddr<Ip> {
(self.addr.clone().into(), self.disc_port).into()
}

pub fn quic_socket_addr(&self) -> SocketAddr {
(self.addr.clone().into(), self.quic_port).into()
pub fn quic_socket_addr(&self) -> Option<SocketAddr> {
let addr: IpAddr = self.addr.clone().into();
match addr {
IpAddr::V4(ip) => {
let addr = (
ip,
self.quic_port
.expect("Quic port should exist on an IPV4 address"),
)
.into();
Some(addr)
}
IpAddr::V6(_) => None,
}
}

pub fn libp2p_socket_addr(&self) -> SocketAddr {
Expand Down Expand Up @@ -55,23 +69,32 @@ impl ListenAddress {
}
}

/// Returns the TCP addresses.
pub fn tcp_addresses(&self) -> impl Iterator<Item = Multiaddr> + '_ {
let v4_multiaddr = self
/// Returns the Listen addresses.
pub fn listen_addresses(&self) -> impl Iterator<Item = Multiaddr> {
let v4_tcp_multiaddrs = self
.v4()
.map(|v4_addr| Multiaddr::from(v4_addr.addr).with(Protocol::Tcp(v4_addr.tcp_port)));

let v4_quic_multiaddrs = self
.v4()
.map(|v4_addr| Multiaddr::from(v4_addr.addr).with(Protocol::Tcp(v4_addr.tcp_port)));
let v6_multiaddr = self

let v6_tcp_multiaddrs = self
.v6()
.map(|v6_addr| Multiaddr::from(v6_addr.addr).with(Protocol::Tcp(v6_addr.tcp_port)));
v4_multiaddr.into_iter().chain(v6_multiaddr)

v4_tcp_multiaddrs
.into_iter()
.chain(v4_quic_multiaddrs)
.chain(v6_tcp_multiaddrs)
}

#[cfg(test)]
pub fn unused_v4_ports() -> Self {
ListenAddress::V4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
disc_port: unused_port::unused_udp4_port().unwrap(),
quic_port: unused_port::unused_udp4_port().unwrap(),
quic_port: unused_port::unused_udp4_port().transpose().unwrap(),
tcp_port: unused_port::unused_tcp4_port().unwrap(),
})
}
Expand All @@ -81,7 +104,7 @@ impl ListenAddress {
ListenAddress::V6(ListenAddr {
addr: Ipv6Addr::UNSPECIFIED,
disc_port: unused_port::unused_udp6_port().unwrap(),
quic_port: unused_port::unused_udp6_port().unwrap(),
quic_port: None,
tcp_port: unused_port::unused_tcp6_port().unwrap(),
})
}
Expand All @@ -96,15 +119,20 @@ impl slog::KV for ListenAddress {
if let Some(v4_addr) = self.v4() {
serializer.emit_arguments("ip4_address", &format_args!("{}", v4_addr.addr))?;
serializer.emit_u16("disc4_port", v4_addr.disc_port)?;
serializer.emit_u16("quic4_port", v4_addr.quic_port)?;
serializer.emit_u16(
"quic4_port",
v4_addr
.quic_port
.expect("Quic port should exist on an IPV4 address"),
)?;
serializer.emit_u16("tcp4_port", v4_addr.tcp_port)?;
}
if let Some(v6_addr) = self.v6() {
serializer.emit_arguments("ip6_address", &format_args!("{}", v6_addr.addr))?;
serializer.emit_u16("disc6_port", v6_addr.disc_port)?;
serializer.emit_u16("quic6_port", v6_addr.quic_port)?;
serializer.emit_none("quic6_port")?;
serializer.emit_u16("tcp6_port", v6_addr.tcp_port)?;
}
slog::Result::Ok(())
}
}
}
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
info!(self.log, "Libp2p Starting"; "peer_id" => %enr.peer_id(), "bandwidth_config" => format!("{}-{}", config.network_load, NetworkLoad::from(config.network_load).name));
debug!(self.log, "Attempting to open listening ports"; config.listen_addrs(), "discovery_enabled" => !config.disable_discovery);

for listen_multiaddr in config.listen_addrs().tcp_addresses() {
for listen_multiaddr in config.listen_addrs().listen_addresses() {
match self.swarm.listen_on(listen_multiaddr.clone()) {
Ok(_) => {
let mut log_address = listen_multiaddr;
Expand Down Expand Up @@ -1569,4 +1569,4 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
pub async fn next_event(&mut self) -> NetworkEvent<AppReqId, TSpec> {
futures::future::poll_fn(|cx| self.poll_network(cx)).await
}
}
}
24 changes: 12 additions & 12 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::types::{
error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind,
};
use crate::{GossipTopic, NetworkConfig};
use futures::future::Either;
use libp2p::bandwidth::BandwidthSinks;
use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed};
use libp2p::gossipsub;
Expand Down Expand Up @@ -51,25 +52,24 @@ pub fn build_transport(
// yamux config
let mut yamux_config = yamux::Config::default();
yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read());
let (transport, bandwidth) = transport
let transport = transport
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(yamux_config)
.timeout(Duration::from_secs(10))
.with_bandwidth_logging();
.timeout(Duration::from_secs(10));

// Enables Quic
/*
// The default quic configuration suits us for now.
let quic_config = libp2p_quic::Config::new(&local_private_key);
let transport = transport.or_transport(libp2p_quic::tokio::Transport::new(quic_config));

// TODO: Get quick to support bandwidth measurements.
*/

let transport = transport.boxed();
let (transport, bandwidth) = transport
.or_transport(libp2p_quic::tokio::Transport::new(quic_config))
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.with_bandwidth_logging();

Ok((transport, bandwidth))
Ok((transport.boxed(), bandwidth))
}

// Useful helper functions for debugging. Currently not used in the client.
Expand Down Expand Up @@ -274,4 +274,4 @@ pub(crate) fn save_metadata_to_disk<E: EthSpec>(
);
}
}
}
}
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
.tempdir()
.unwrap();

config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port);
config.enr_udp4_port = Some(port);
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port, port + 1);
config.enr_disc4_port = Some(port);
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/network/src/nat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl UPnPConfig {
config.listen_addrs().v4().map(|v4_addr| UPnPConfig {
tcp_port: v4_addr.tcp_port,
disc_port: v4_addr.disc_port,
quic_port: v4_addr.quic_port,
quic_port: v4_addr
.quic_port
.expect("Quic port should exist on an IPV4 address"),
disable_discovery: config.disable_discovery,
disable_quic_support: config.disable_quic_support,
})
Expand Down Expand Up @@ -203,4 +205,4 @@ pub fn remove_mappings(tcp_port: Option<u16>, udp_ports: &[u16], log: &slog::Log
Err(e) => debug!(log, "UPnP failed to remove mappings"; "error" => %e),
}
}
}
}
7 changes: 4 additions & 3 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("port")
.long("port")
.value_name("PORT")
.help("The TCP/UDP ports to listen on. There are two UDP ports. The discovery UDP port will be set to this value and the Quic UDP port will be set to his value + 1. The discovery port can be modified by the \
.help("The TCP/UDP ports to listen on. There are two UDP ports. \
The discovery UDP port will be set to this value and the Quic UDP port will be set to this value + 1. The discovery port can be modified by the \
--discovery-port flag and the quic port can be modified by the --quic-port flag. If listening over both IPv4 and IPv6 the --port flag \
will apply to the IPv4 address and --port6 to the IPv6 address.")
.default_value("9000")
Expand Down Expand Up @@ -246,7 +247,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("disable-discovery")
.long("disable-discovery")
.help("Disables the discv5 discovery protocol. The node will not search for new peers or participate in the discovery protocol.")
.takes_value(false),
.takes_value(false)
.hidden(true)
)
.arg(
Expand Down Expand Up @@ -1165,4 +1166,4 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.possible_values(ProgressiveBalancesMode::VARIANTS)
)
}
}
Loading