Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P: Add QUIC support #13786

Merged
merged 18 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name),
StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name),
MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name),
QUICPort: cliCtx.Uint(cmd.P2PQUICPort.Name),
TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name),
UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name),
MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name),
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peerstore:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/quic:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library",
"@com_github_libp2p_go_libp2p_mplex//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
PrivateKey string
DataDir string
MetaDataDir string
QUICPort uint
TCPPort uint
UDPPort uint
MaxPeers uint
Expand Down
188 changes: 153 additions & 35 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
Expand All @@ -39,6 +40,11 @@ const (
udp6
)

type quicProtocol uint16

// quicProtocol is the "quic" key, which holds the QUIC port of the node.
func (quicProtocol) ENRKey() string { return "quic" }

// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee ids for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee ids.
Expand Down Expand Up @@ -100,31 +106,38 @@ func (s *Service) RefreshENR() {

// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
iterator := s.dv5Listener.RandomNodes()
iterator = enode.Filter(iterator, s.filterPeer)
iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer)
defer iterator.Close()

for {
// Exit if service's context is canceled
// Exit if service's context is canceled.
if s.ctx.Err() != nil {
break
}

if s.isPeerAtLimit(false /* inbound */) {
// Pause the main loop for a period to stop looking
// for new peers.
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
exists := iterator.Next()
if !exists {

if exists := iterator.Next(); !exists {
break
}

node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}

if peerInfo == nil {
continue
}

// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
go func(info *peer.AddrInfo) {
Expand Down Expand Up @@ -167,8 +180,7 @@ func (s *Service) createListener(

// Listen to all network interfaces
// for both ip protocols.
networkVersion := "udp"
conn, err := net.ListenUDP(networkVersion, udpAddr)
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return nil, errors.Wrap(err, "could not listen to UDP")
}
Expand All @@ -178,6 +190,7 @@ func (s *Service) createListener(
ipAddr,
int(s.cfg.UDPPort),
int(s.cfg.TCPPort),
int(s.cfg.QUICPort),
)
if err != nil {
return nil, errors.Wrap(err, "could not create local node")
Expand Down Expand Up @@ -209,7 +222,7 @@ func (s *Service) createListener(
func (s *Service) createLocalNode(
privKey *ecdsa.PrivateKey,
ipAddr net.IP,
udpPort, tcpPort int,
udpPort, tcpPort, quicPort int,
) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
Expand All @@ -218,11 +231,19 @@ func (s *Service) createLocalNode(
localNode := enode.NewLocalNode(db, privKey)

ipEntry := enr.IP(ipAddr)
udpEntry := enr.UDP(udpPort)
tcpEntry := enr.TCP(tcpPort)
localNode.Set(ipEntry)

udpEntry := enr.UDP(udpPort)
localNode.Set(udpEntry)

tcpEntry := enr.TCP(tcpPort)
localNode.Set(tcpEntry)

if features.Get().EnableQUIC {
quicEntry := quicProtocol(quicPort)
localNode.Set(quicEntry)
}

localNode.SetFallbackIP(ipAddr)
localNode.SetFallbackUDP(udpPort)

Expand Down Expand Up @@ -277,7 +298,7 @@ func (s *Service) startDiscoveryV5(
// filterPeer validates each node that we retrieve from our dht. We
// try to ascertain that the peer can be a valid protocol peer.
// Validity Conditions:
// 1. Peer has a valid IP and TCP port set in their enr.
// 1. Peer has a valid IP and a (QUIC and/or TCP) port set in their enr.
// 2. Peer hasn't been marked as 'bad'.
// 3. Peer is not currently active or connected.
// 4. Peer is ready to receive incoming connections.
Expand All @@ -294,17 +315,13 @@ func (s *Service) filterPeer(node *enode.Node) bool {
return false
}

// Ignore nodes with their TCP ports not set.
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
}
peerData, multiAddrs, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Debug("Could not convert to peer data")
return false
}

peerData, multiAddr, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Debug("Could not convert to peer data")
if len(multiAddrs) == 0 {
return false
}

Expand Down Expand Up @@ -337,6 +354,9 @@ func (s *Service) filterPeer(node *enode.Node) bool {
}
}

// If the peer has 2 multiaddrs, favor the QUIC address, which is in first position.
multiAddr := multiAddrs[0]

// Add peer to peer handler.
s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown)

Expand Down Expand Up @@ -380,11 +400,11 @@ func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
if err != nil {
return nil, errors.Wrapf(err, "Could not get enode from string")
}
addr, err := convertToSingleMultiAddr(enodeAddr)
nodeAddrs, err := convertToMultiAddrs(enodeAddr)
if err != nil {
return nil, errors.Wrapf(err, "Could not get multiaddr")
}
allAddrs = append(allAddrs, addr)
allAddrs = append(allAddrs, nodeAddrs...)
}
return allAddrs, nil
}
Expand Down Expand Up @@ -419,45 +439,143 @@ func parseGenericAddrs(addrs []string) (enodeString, multiAddrString []string) {
}

func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
var multiAddrs []ma.Multiaddr
// Expect each node to have a TCP and a QUIC address.
multiAddrs := make([]ma.Multiaddr, 0, 2*len(nodes))

for _, node := range nodes {
// ignore nodes with no ip address stored
// Skip nodes with no ip address stored.
if node.IP() == nil {
continue
}
multiAddr, err := convertToSingleMultiAddr(node)

// Get up to two multiaddrs (TCP and QUIC) for each node.
nodeMultiAddrs, err := convertToMultiAddrs(node)
if err != nil {
log.WithError(err).Error("Could not convert to multiAddr")
log.WithError(err).Errorf("Could not convert to multiAddr node %s", node)
continue
}
multiAddrs = append(multiAddrs, multiAddr)

multiAddrs = append(multiAddrs, nodeMultiAddrs...)
}

return multiAddrs
}

func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) {
multiAddr, err := convertToSingleMultiAddr(node)
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) {
multiAddrs, err := convertToMultiAddrs(node)
if err != nil {
return nil, nil, err
}
info, err := peer.AddrInfoFromP2pAddr(multiAddr)

if len(multiAddrs) == 0 {
return nil, nil, nil
}

infos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
if err != nil {
return nil, nil, err
return nil, nil, errors.Wrapf(err, "could not convert to peer info: %v", multiAddrs)
}

if len(infos) > 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems wrong, what happens if we have 2 multiaddrs returned from convertToMultiAddrs . This error would always be triggered.

Copy link
Contributor Author

@nalepae nalepae Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peer.AddrInfosFromP2pAddrs groups input maddrs by node.

If multiaddrs contains 5 multiaddrs, but belonging to only 2 nodes, (for example 3 multiaddrs for one node and 2 multiaddrs for the other node), then len(infos) == 2 and:

  • len(infos[0].Addrs) == 3
  • len(infos[1].Addrs) == 2

For information, this is the code of AddrInfosFromP2PAddrs:

// AddrInfosFromP2pAddrs converts a set of Multiaddrs to a set of AddrInfos.
func AddrInfosFromP2pAddrs(maddrs ...ma.Multiaddr) ([]AddrInfo, error) {
	m := make(map[ID][]ma.Multiaddr)
	for _, maddr := range maddrs {
		transport, id := SplitAddr(maddr)
		if id == "" {
			return nil, ErrInvalidAddr
		}
		if transport == nil {
			if _, ok := m[id]; !ok {
				m[id] = nil
			}
		} else {
			m[id] = append(m[id], transport)
		}
	}
	ais := make([]AddrInfo, 0, len(m))
	for id, maddrs := range m {
		ais = append(ais, AddrInfo{ID: id, Addrs: maddrs})
	}
	return ais, nil
}

In our case, all the multiaddrs come from the same node, so we expect at most 1 item in infos.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok in that case, if we do get a value not equal to 1, we should simply return an error. Otherwise the caller will assume the address info is valid and proceed to use it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added a check in func (s *Service) filterPeer(node *enode.Node) bool.

In this function, even if the only way to have peerData == nil is to have at the same time multiAddrs == nil, it's better to check peerData nilness explicitely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e9d80e2.

return nil, nil, errors.Errorf("infos contains %v elements, expected not more than 1", len(infos))
}

if len(infos) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we get no info, then we should return an error imo.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the caller will expect the addr info not to be nil and will end up panicking here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment about if len(infos) > 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e9d80e2.

return nil, multiAddrs, nil
}
return info, multiAddr, nil

return &infos[0], multiAddrs, nil
}

func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
// convertToMultiAddrs converts an enode.Node to a list of multiaddrs.
// If the node has a both a QUIC and a TCP port set in their ENR, then
// the multiaddr corresponding to the QUIC port is added first, followed
// by the multiaddr corresponding to the TCP port.
func convertToMultiAddrs(node *enode.Node) ([]ma.Multiaddr, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was the name changed here ? its easy to get this mixed up with convertToMultiAddr . The previous name emphasized the singular nature

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, I do understand why singular was removed but its easy to confuse it with convertToMultiAddr , another name would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in a0c0326.

multiaddrs := make([]ma.Multiaddr, 0, 2)

// Retrieve the node public key.
pubkey := node.Pubkey()
assertedKey, err := ecdsaprysm.ConvertToInterfacePubkey(pubkey)
if err != nil {
return nil, errors.Wrap(err, "could not get pubkey")
}

// Compute the node ID from the public key.
id, err := peer.IDFromPublicKey(assertedKey)
if err != nil {
return nil, errors.Wrap(err, "could not get peer id")
}
return multiAddressBuilderWithID(node.IP().String(), "tcp", uint(node.TCP()), id)

if features.Get().EnableQUIC {
// If the QUIC entry is present in the ENR, build the corresponding multiaddress.
port, ok, err := getPort(node, quic)
if err != nil {
return nil, errors.Wrap(err, "could not get QUIC port")
}

if ok {
addr, err := multiAddressBuilderWithID(node.IP(), quic, port, id)
if err != nil {
return nil, errors.Wrap(err, "could not build QUIC address")
}

multiaddrs = append(multiaddrs, addr)
}
}

// If the TCP entry is present in the ENR, build the corresponding multiaddress.
port, ok, err := getPort(node, tcp)
if err != nil {
return nil, errors.Wrap(err, "could not get TCP port")
}

if ok {
addr, err := multiAddressBuilderWithID(node.IP(), tcp, port, id)
if err != nil {
return nil, errors.Wrap(err, "could not build TCP address")
}

multiaddrs = append(multiaddrs, addr)
}

return multiaddrs, nil
}

// getPort retrieves the port for a given node and protocol, as well as a boolean
// indicating whether the port was found, and an error
func getPort(node *enode.Node, protocol internetProtocol) (uint, bool, error) {
var (
port uint
err error
)

switch protocol {
case tcp:
var entry enr.TCP
err = node.Load(&entry)
port = uint(entry)
case udp:
var entry enr.UDP
err = node.Load(&entry)
port = uint(entry)
case quic:
var entry quicProtocol
err = node.Load(&entry)
port = uint(entry)
default:
return 0, false, errors.Errorf("invalid protocol: %v", protocol)
}

if enr.IsNotFound(err) {
return port, false, nil
}

if err != nil {
return 0, false, errors.Wrap(err, "could not get port")
}

return port, true, nil
}

func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) {
Expand All @@ -475,14 +593,14 @@ func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) {
var ip4 enr.IPv4
var ip6 enr.IPv6
if node.Load(&ip4) == nil {
address, ipErr := multiAddressBuilderWithID(net.IP(ip4).String(), "udp", uint(node.UDP()), id)
address, ipErr := multiAddressBuilderWithID(net.IP(ip4), udp, uint(node.UDP()), id)
if ipErr != nil {
return nil, errors.Wrap(ipErr, "could not build IPv4 address")
}
addresses = append(addresses, address)
}
if node.Load(&ip6) == nil {
address, ipErr := multiAddressBuilderWithID(net.IP(ip6).String(), "udp", uint(node.UDP()), id)
address, ipErr := multiAddressBuilderWithID(net.IP(ip6), udp, uint(node.UDP()), id)
if ipErr != nil {
return nil, errors.Wrap(ipErr, "could not build IPv6 address")
}
Expand Down
Loading
Loading