From 933ad98e4a512cfbf724141fe4734ce3ba9a96c4 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 22 Mar 2024 23:28:18 +0100 Subject: [PATCH 01/18] (Unrelated) DoppelGanger: Improve message. --- config/features/flags.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/features/flags.go b/config/features/flags.go index d64fb49f8bee..fec1b987ac00 100644 --- a/config/features/flags.go +++ b/config/features/flags.go @@ -109,8 +109,8 @@ var ( } enableDoppelGangerProtection = &cli.BoolFlag{ Name: "enable-doppelganger", - Usage: `Enables the validator to perform a doppelganger check. - This is not "a foolproof method to find duplicate instances in the network. + Usage: `Enables the validator to perform a doppelganger check. + This is not a foolproof method to find duplicate instances in the network. Your validator will still be vulnerable if it is being run in unsafe configurations.`, } disableStakinContractCheck = &cli.BoolFlag{ From 628633ccd93d1d99de8c6fd28ed59c5833086033 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 29 Mar 2024 11:24:28 +0100 Subject: [PATCH 02/18] `beacon-blocks-by-range`: Add `--network` option. --- cmd/prysmctl/p2p/request_blocks.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/cmd/prysmctl/p2p/request_blocks.go b/cmd/prysmctl/p2p/request_blocks.go index 47d6ae417a93..fa71eef93a94 100644 --- a/cmd/prysmctl/p2p/request_blocks.go +++ b/cmd/prysmctl/p2p/request_blocks.go @@ -23,6 +23,7 @@ import ( ) var requestBlocksFlags = struct { + Network string Peers string ClientPort uint APIEndpoints string @@ -42,6 +43,12 @@ var requestBlocksCmd = &cli.Command{ }, Flags: []cli.Flag{ cmd.ChainConfigFileFlag, + &cli.StringFlag{ + Name: "network", + Usage: "network to run on (mainnet, sepolia, holesky)", + Destination: &requestBlocksFlags.Network, + Value: "mainnet", + }, &cli.StringFlag{ Name: "peer-multiaddrs", Usage: "comma-separated, peer multiaddr(s) to connect to for p2p requests", @@ -82,6 +89,21 @@ var requestBlocksCmd = &cli.Command{ } func cliActionRequestBlocks(cliCtx *cli.Context) error { + switch requestBlocksFlags.Network { + case params.SepoliaName: + if err := params.SetActive(params.SepoliaConfig()); err != nil { + log.Fatal(err) + } + case params.HoleskyName: + if err := params.SetActive(params.HoleskyConfig()); err != nil { + log.Fatal(err) + } + case params.MainnetName: + // Do nothing + default: + log.Fatalf("Unknown network provided: %s", requestBlocksFlags.Network) + } + if cliCtx.IsSet(cmd.ChainConfigFileFlag.Name) { chainConfigFileName := cliCtx.String(cmd.ChainConfigFileFlag.Name) if err := params.LoadChainConfigFile(chainConfigFileName, nil); err != nil { From 9a365f9fb9e120dd91b9e5c87f8344be7b09ee5e Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 25 Mar 2024 11:46:15 +0100 Subject: [PATCH 03/18] `ensurePeerConnections`: Remove capital letter in error message. --- beacon-chain/p2p/watch_peers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/p2p/watch_peers.go b/beacon-chain/p2p/watch_peers.go index 59141b177d8f..0b493570e97a 100644 --- a/beacon-chain/p2p/watch_peers.go +++ b/beacon-chain/p2p/watch_peers.go @@ -50,7 +50,7 @@ func ensurePeerConnections(ctx context.Context, h host.Host, peers *peers.Status c := h.Network().ConnsToPeer(p.ID) if len(c) == 0 { if err := connectWithTimeout(ctx, h, p); err != nil { - log.WithField("peer", p.ID).WithField("addrs", p.Addrs).WithError(err).Errorf("Failed to reconnect to peer") + log.WithField("peer", p.ID).WithField("addrs", p.Addrs).WithError(err).Errorf("failed to reconnect to peer") continue } } From 90b5cea857b5cf56bb5efa4087e0db7d370c486b Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 20 Mar 2024 15:33:23 +0100 Subject: [PATCH 04/18] `MultiAddressBuilder{WithID}`: Refactor. --- beacon-chain/p2p/discovery.go | 6 ++-- beacon-chain/p2p/log.go | 3 +- beacon-chain/p2p/options.go | 62 ++++++++++++++++++++++------------- cmd/prysmctl/p2p/client.go | 2 +- 4 files changed, 45 insertions(+), 28 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 5a08101a28ed..3953ba345e72 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -457,7 +457,7 @@ func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) { if err != nil { return nil, errors.Wrap(err, "could not get peer id") } - return multiAddressBuilderWithID(node.IP().String(), "tcp", uint(node.TCP()), id) + return multiAddressBuilderWithID(node.IP(), "tcp", uint(node.TCP()), id) } func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) { @@ -475,14 +475,14 @@ func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) { var ip4 enr.IPv4 var ip6 enr.IPv6 if node.Load(&ip4) == nil { - address, ipErr := multiAddressBuilderWithID(net.IP(ip4).String(), "udp", uint(node.UDP()), id) + address, ipErr := multiAddressBuilderWithID(net.IP(ip4), "udp", uint(node.UDP()), id) if ipErr != nil { return nil, errors.Wrap(ipErr, "could not build IPv4 address") } addresses = append(addresses, address) } if node.Load(&ip6) == nil { - address, ipErr := multiAddressBuilderWithID(net.IP(ip6).String(), "udp", uint(node.UDP()), id) + address, ipErr := multiAddressBuilderWithID(net.IP(ip6), "udp", uint(node.UDP()), id) if ipErr != nil { return nil, errors.Wrap(ipErr, "could not build IPv6 address") } diff --git a/beacon-chain/p2p/log.go b/beacon-chain/p2p/log.go index 7d395f805dcb..d83e1f8b37a0 100644 --- a/beacon-chain/p2p/log.go +++ b/beacon-chain/p2p/log.go @@ -1,6 +1,7 @@ package p2p import ( + "net" "strconv" "strings" @@ -29,7 +30,7 @@ func logIPAddr(id peer.ID, addrs ...ma.Multiaddr) { func logExternalIPAddr(id peer.ID, addr string, port uint) { if addr != "" { - multiAddr, err := MultiAddressBuilder(addr, port) + multiAddr, err := MultiAddressBuilder(net.ParseIP(addr), port) if err != nil { log.WithError(err).Error("Could not create multiaddress") return diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index 5d4b8fa31b99..f363276ca256 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -21,30 +21,32 @@ import ( ) // MultiAddressBuilder takes in an ip address string and port to produce a go multiaddr format. -func MultiAddressBuilder(ipAddr string, port uint) (ma.Multiaddr, error) { - parsedIP := net.ParseIP(ipAddr) - if parsedIP.To4() == nil && parsedIP.To16() == nil { - return nil, errors.Errorf("invalid ip address provided: %s", ipAddr) - } - if parsedIP.To4() != nil { - return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) +func MultiAddressBuilder(ip net.IP, port uint) (ma.Multiaddr, error) { + ipType, err := extractIpType(ip) + if err != nil { + return nil, errors.Wrap(err, "unable to determine IP type") } - return ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/%d", ipAddr, port)) + + // Example: /ip4/1.2.3.4./tcp/5678 + multiaddrStr := fmt.Sprintf("/%s/%s/tcp/%d", ipType, ip, port) + + return ma.NewMultiaddr(multiaddrStr) } // buildOptions for the libp2p host. func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Option, error) { cfg := s.cfg - listen, err := MultiAddressBuilder(ip.String(), cfg.TCPPort) + listen, err := MultiAddressBuilder(ip, cfg.TCPPort) if err != nil { - return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", ip.String(), cfg.TCPPort) + return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", ip, cfg.TCPPort) } if cfg.LocalIP != "" { - if net.ParseIP(cfg.LocalIP) == nil { + localIP := net.ParseIP(cfg.LocalIP) + if localIP == nil { return nil, errors.Wrapf(err, "invalid local ip provided: %s:%d", cfg.LocalIP, cfg.TCPPort) } - listen, err = MultiAddressBuilder(cfg.LocalIP, cfg.TCPPort) + listen, err = MultiAddressBuilder(localIP, cfg.TCPPort) if err != nil { return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", cfg.LocalIP, cfg.TCPPort) } @@ -58,7 +60,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op return nil, errors.Wrapf(err, "cannot get ID from public key: %s", ifaceKey.GetPublic().Type().String()) } - log.Infof("Running node with peer id of %s ", id.String()) + log.Infof("Running node with peer id of %s ", id) options := []libp2p.Option{ privKeyOption(priKey), @@ -83,7 +85,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op } if cfg.HostAddress != "" { options = append(options, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - external, err := MultiAddressBuilder(cfg.HostAddress, cfg.TCPPort) + external, err := MultiAddressBuilder(net.ParseIP(cfg.HostAddress), cfg.TCPPort) if err != nil { log.WithError(err).Error("Unable to create external multiaddress") } else { @@ -110,18 +112,32 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op return options, nil } -func multiAddressBuilderWithID(ipAddr, protocol string, port uint, id peer.ID) (ma.Multiaddr, error) { - parsedIP := net.ParseIP(ipAddr) - if parsedIP.To4() == nil && parsedIP.To16() == nil { - return nil, errors.Errorf("invalid ip address provided: %s", ipAddr) +func extractIpType(ip net.IP) (string, error) { + if ip.To4() != nil { + return "ip4", nil } - if id.String() == "" { - return nil, errors.New("empty peer id given") + + if ip.To16() != nil { + return "ip6", nil } - if parsedIP.To4() != nil { - return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/%s/%d/p2p/%s", ipAddr, protocol, port, id.String())) + + return "", errors.Errorf("provided IP address is neither IPv4 nor IPv6: %s", ip) +} + +func multiAddressBuilderWithID(ip net.IP, protocol string, port uint, id peer.ID) (ma.Multiaddr, error) { + if id == "" { + return nil, errors.Errorf("empty peer id given: %s", id) + } + + ipType, err := extractIpType(ip) + if err != nil { + return nil, errors.Wrap(err, "unable to determine IP type") } - return ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/%s/%d/p2p/%s", ipAddr, protocol, port, id.String())) + + // Example: /ip4/1.2.3.4/tcp/5678/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs + multiaddrStr := fmt.Sprintf("/%s/%s/%s/%d/p2p/%s", ipType, ip, protocol, port, id) + + return ma.NewMultiaddr(multiaddrStr) } // Adds a private key to the libp2p option if the option was provided. diff --git a/cmd/prysmctl/p2p/client.go b/cmd/prysmctl/p2p/client.go index ede67c8595e4..eae2962b5f05 100644 --- a/cmd/prysmctl/p2p/client.go +++ b/cmd/prysmctl/p2p/client.go @@ -53,7 +53,7 @@ func newClient(beaconEndpoints []string, clientPort uint) (*client, error) { if err != nil { return nil, errors.Wrap(err, "could not set up p2p metadata") } - listen, err := p2p.MultiAddressBuilder(ipAdd.String(), clientPort) + listen, err := p2p.MultiAddressBuilder(ipAdd, clientPort) if err != nil { return nil, errors.Wrap(err, "could not set up listening multiaddr") } From 8d4eaa23a7117fc9754e2f6c10d3872db73e2d86 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 20 Mar 2024 15:42:22 +0100 Subject: [PATCH 05/18] `buildOptions`: Improve log. --- beacon-chain/p2p/options.go | 2 +- testing/endtoend/components/beacon_node.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index f363276ca256..38fb3316183d 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -60,7 +60,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op return nil, errors.Wrapf(err, "cannot get ID from public key: %s", ifaceKey.GetPublic().Type().String()) } - log.Infof("Running node with peer id of %s ", id) + log.WithField("peerId", id).Info("Running node with") options := []libp2p.Option{ privKeyOption(priKey), diff --git a/testing/endtoend/components/beacon_node.go b/testing/endtoend/components/beacon_node.go index 5eb9273114c4..9e90eb314c10 100644 --- a/testing/endtoend/components/beacon_node.go +++ b/testing/endtoend/components/beacon_node.go @@ -313,7 +313,7 @@ func (node *BeaconNode) Start(ctx context.Context) error { } if config.UseFixedPeerIDs { - peerId, err := helpers.FindFollowingTextInFile(stdOutFile, "Running node with peer id of ") + peerId, err := helpers.FindFollowingTextInFile(stdOutFile, "Running node with peerId=") if err != nil { return fmt.Errorf("could not find peer id: %w", err) } From 841f3d71ef1fc87822f9a4c01bb4b8153a6436b4 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 20 Mar 2024 15:55:15 +0100 Subject: [PATCH 06/18] `NewService`: Bubbles up errors. --- beacon-chain/p2p/service.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index f87a99aa1f07..c6a58c928086 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -124,31 +124,34 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { if err != nil { return nil, errors.Wrapf(err, "failed to build p2p options") } + // Sets mplex timeouts configureMplex() h, err := libp2p.New(opts...) if err != nil { - log.WithError(err).Error("Failed to create p2p host") - return nil, err + return nil, errors.Wrapf(err, "failed to create p2p host") } s.host = h + // Gossipsub registration is done before we add in any new peers // due to libp2p's gossipsub implementation not taking into // account previously added peers when creating the gossipsub // object. psOpts := s.pubsubOptions() + // Set the pubsub global parameters that we require. setPubSubParameters() + // Reinitialize them in the event we are running a custom config. attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount gs, err := pubsub.NewGossipSub(s.ctx, s.host, psOpts...) if err != nil { - log.WithError(err).Error("Failed to start pubsub") - return nil, err + return nil, errors.Wrapf(err, "failed to create p2p pubsub") } + s.pubsub = gs s.peers = peers.NewStatus(ctx, &peers.StatusConfig{ From feb2b6f0a17a6909456ea1ff7ff4857b3b974e32 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 25 Mar 2024 13:17:10 +0100 Subject: [PATCH 07/18] `tcp` ==> `libp2ptcp` --- beacon-chain/p2p/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index 38fb3316183d..a923148f71d0 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -11,7 +11,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/security/noise" - "github.com/libp2p/go-libp2p/p2p/transport/tcp" + libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp" gomplex "github.com/libp2p/go-mplex" ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" @@ -67,7 +67,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op libp2p.ListenAddrs(listen), libp2p.UserAgent(version.BuildData()), libp2p.ConnectionGater(s), - libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(libp2ptcp.NewTCPTransport), libp2p.DefaultMuxers, libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), libp2p.Security(noise.ID, noise.New), From b4bd51adbb4c6e58b68536b4270a4462ce390336 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 25 Mar 2024 15:43:42 +0100 Subject: [PATCH 08/18] `multiAddressBuilderWithID`: Add the ability to build QUIC multiaddr --- beacon-chain/p2p/discovery.go | 6 ++-- beacon-chain/p2p/options.go | 25 +++++++++++++-- beacon-chain/p2p/options_test.go | 53 ++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 6 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 3953ba345e72..d66dc1f83f03 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -457,7 +457,7 @@ func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) { if err != nil { return nil, errors.Wrap(err, "could not get peer id") } - return multiAddressBuilderWithID(node.IP(), "tcp", uint(node.TCP()), id) + return multiAddressBuilderWithID(node.IP(), tcp, uint(node.TCP()), id) } func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) { @@ -475,14 +475,14 @@ func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) { var ip4 enr.IPv4 var ip6 enr.IPv6 if node.Load(&ip4) == nil { - address, ipErr := multiAddressBuilderWithID(net.IP(ip4), "udp", uint(node.UDP()), id) + address, ipErr := multiAddressBuilderWithID(net.IP(ip4), udp, uint(node.UDP()), id) if ipErr != nil { return nil, errors.Wrap(ipErr, "could not build IPv4 address") } addresses = append(addresses, address) } if node.Load(&ip6) == nil { - address, ipErr := multiAddressBuilderWithID(net.IP(ip6), "udp", uint(node.UDP()), id) + address, ipErr := multiAddressBuilderWithID(net.IP(ip6), udp, uint(node.UDP()), id) if ipErr != nil { return nil, errors.Wrap(ipErr, "could not build IPv6 address") } diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index a923148f71d0..e767fb1c42ae 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -20,6 +20,14 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/version" ) +type internetProtocol string + +const ( + udp = "udp" + tcp = "tcp" + quic = "quic" +) + // MultiAddressBuilder takes in an ip address string and port to produce a go multiaddr format. func MultiAddressBuilder(ip net.IP, port uint) (ma.Multiaddr, error) { ipType, err := extractIpType(ip) @@ -124,7 +132,9 @@ func extractIpType(ip net.IP) (string, error) { return "", errors.Errorf("provided IP address is neither IPv4 nor IPv6: %s", ip) } -func multiAddressBuilderWithID(ip net.IP, protocol string, port uint, id peer.ID) (ma.Multiaddr, error) { +func multiAddressBuilderWithID(ip net.IP, protocol internetProtocol, port uint, id peer.ID) (ma.Multiaddr, error) { + var multiaddrStr string + if id == "" { return nil, errors.Errorf("empty peer id given: %s", id) } @@ -134,8 +144,17 @@ func multiAddressBuilderWithID(ip net.IP, protocol string, port uint, id peer.ID return nil, errors.Wrap(err, "unable to determine IP type") } - // Example: /ip4/1.2.3.4/tcp/5678/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs - multiaddrStr := fmt.Sprintf("/%s/%s/%s/%d/p2p/%s", ipType, ip, protocol, port, id) + switch protocol { + case udp, tcp: + // Example with UDP: /ip4/1.2.3.4/udp/5678/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs + // Example with TCP: /ip6/1.2.3.4/tcp/5678/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs + multiaddrStr = fmt.Sprintf("/%s/%s/%s/%d/p2p/%s", ipType, ip, protocol, port, id) + case quic: + // Example: /ip4/1.2.3.4/udp/5678/quic-v1/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs + multiaddrStr = fmt.Sprintf("/%s/%s/udp/%d/quic-v1/p2p/%s", ipType, ip, port, id) + default: + return nil, errors.Errorf("unsupported protocol: %s", protocol) + } return ma.NewMultiaddr(multiaddrStr) } diff --git a/beacon-chain/p2p/options_test.go b/beacon-chain/p2p/options_test.go index 07b80e2b6897..4107d611cc55 100644 --- a/beacon-chain/p2p/options_test.go +++ b/beacon-chain/p2p/options_test.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/config/params" @@ -127,5 +128,57 @@ func TestDefaultMultiplexers(t *testing.T) { assert.Equal(t, protocol.ID("/yamux/1.0.0"), cfg.Muxers[0].ID) assert.Equal(t, protocol.ID("/mplex/6.7.0"), cfg.Muxers[1].ID) +} + +func TestMultiAddressBuilderWithID(t *testing.T) { + testCases := []struct { + name string + ip net.IP + protocol internetProtocol + port uint + id string + + expectedMultiaddrStr string + }{ + { + name: "UDP", + ip: net.IPv4(192, 168, 0, 1), + protocol: udp, + port: 5678, + id: "0025080212210204fb1ebb1aa467527d34306a4794a5171d6516405e720b909b7f816d63aef96a", + + expectedMultiaddrStr: "/ip4/192.168.0.1/udp/5678/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs", + }, + { + name: "TCP", + ip: net.IPv4(192, 168, 0, 1), + protocol: tcp, + port: 5678, + id: "0025080212210204fb1ebb1aa467527d34306a4794a5171d6516405e720b909b7f816d63aef96a", + + expectedMultiaddrStr: "/ip4/192.168.0.1/tcp/5678/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs", + }, + { + name: "QUIC", + ip: net.IPv4(192, 168, 0, 1), + protocol: quic, + port: 5678, + id: "0025080212210204fb1ebb1aa467527d34306a4794a5171d6516405e720b909b7f816d63aef96a", + expectedMultiaddrStr: "/ip4/192.168.0.1/udp/5678/quic-v1/p2p/16Uiu2HAkum7hhuMpWqFj3yNLcmQBGmThmqw2ohaCRThXQuKU9ohs", + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + id, err := hex.DecodeString(tt.id) + require.NoError(t, err) + + actualMultiaddr, err := multiAddressBuilderWithID(tt.ip, tt.protocol, tt.port, peer.ID(id)) + require.NoError(t, err) + + actualMultiaddrStr := actualMultiaddr.String() + require.Equal(t, tt.expectedMultiaddrStr, actualMultiaddrStr) + }) + } } From 924db9870dd6f675a87545a0960ffc66d560bb91 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 25 Mar 2024 16:06:01 +0100 Subject: [PATCH 09/18] `p2p Start`: Fix error message. --- beacon-chain/p2p/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index c6a58c928086..fb33fb8ee2c9 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -216,7 +216,7 @@ func (s *Service) Start() { if len(s.cfg.StaticPeers) > 0 { addrs, err := PeersFromStringAddrs(s.cfg.StaticPeers) if err != nil { - log.WithError(err).Error("Could not connect to static peer") + log.WithError(err).Error("could not convert ENR to multiaddr") } // Set trusted peers for those that are provided as static addresses. pids := peerIdsFromMultiAddrs(addrs) From b4ae4fb52469b1fe5cf7b192364f627df5e8ca9c Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 21 Mar 2024 18:11:57 +0100 Subject: [PATCH 10/18] `p2p`: Add QUIC support. --- beacon-chain/node/node.go | 1 + beacon-chain/p2p/BUILD.bazel | 1 + beacon-chain/p2p/config.go | 1 + beacon-chain/p2p/discovery.go | 173 +++++++++++++++++---- beacon-chain/p2p/discovery_test.go | 14 +- beacon-chain/p2p/fork_test.go | 31 ++-- beacon-chain/p2p/log.go | 26 ++-- beacon-chain/p2p/options.go | 33 +++- beacon-chain/p2p/options_test.go | 26 ++-- beacon-chain/p2p/service.go | 3 +- beacon-chain/p2p/service_test.go | 6 +- beacon-chain/p2p/subnets.go | 5 + beacon-chain/p2p/subnets_test.go | 12 +- beacon-chain/sync/rpc.go | 6 + cmd/beacon-chain/main.go | 1 + cmd/beacon-chain/usage.go | 1 + cmd/flags.go | 12 +- cmd/prysmctl/p2p/BUILD.bazel | 1 + cmd/prysmctl/p2p/client.go | 12 +- cmd/prysmctl/p2p/request_blobs.go | 30 ++-- cmd/prysmctl/p2p/request_blocks.go | 30 ++-- testing/endtoend/components/beacon_node.go | 1 + testing/endtoend/params/params.go | 15 +- testing/endtoend/params/params_test.go | 2 +- 24 files changed, 317 insertions(+), 126 deletions(-) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e87d68cf0ce9..64bef6d46363 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -707,6 +707,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error { PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name), StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name), MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name), + QUICPort: cliCtx.Uint(cmd.P2PQUICPort.Name), TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name), diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index af7562399143..ddbbf7b5b105 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -90,6 +90,7 @@ go_library( "@com_github_libp2p_go_libp2p//core/peerstore:go_default_library", "@com_github_libp2p_go_libp2p//core/protocol:go_default_library", "@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/transport/quic:go_default_library", "@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library", "@com_github_libp2p_go_libp2p_mplex//:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", diff --git a/beacon-chain/p2p/config.go b/beacon-chain/p2p/config.go index ca5dbfa54c89..3da7d055cbb2 100644 --- a/beacon-chain/p2p/config.go +++ b/beacon-chain/p2p/config.go @@ -24,6 +24,7 @@ type Config struct { PrivateKey string DataDir string MetaDataDir string + QUICPort uint TCPPort uint UDPPort uint MaxPeers uint diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index d66dc1f83f03..9ee0ebd03274 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -39,6 +39,11 @@ const ( udp6 ) +type quicProtocol uint16 + +// quicProtocol is the "quic" key, which holds the QUIC port of the node. +func (quicProtocol) ENRKey() string { return "quic" } + // RefreshENR uses an epoch to refresh the enr entry for our node // with the tracked committee ids for the epoch, allowing our node // to be dynamically discoverable by others given our tracked committee ids. @@ -100,14 +105,15 @@ func (s *Service) RefreshENR() { // listen for new nodes watches for new nodes in the network and adds them to the peerstore. func (s *Service) listenForNewNodes() { - iterator := s.dv5Listener.RandomNodes() - iterator = enode.Filter(iterator, s.filterPeer) + iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer) defer iterator.Close() + for { - // Exit if service's context is canceled + // Exit if service's context is canceled. if s.ctx.Err() != nil { break } + if s.isPeerAtLimit(false /* inbound */) { // Pause the main loop for a period to stop looking // for new peers. @@ -115,16 +121,22 @@ func (s *Service) listenForNewNodes() { time.Sleep(pollingPeriod) continue } - exists := iterator.Next() - if !exists { + + if exists := iterator.Next(); !exists { break } + node := iterator.Node() peerInfo, _, err := convertToAddrInfo(node) if err != nil { log.WithError(err).Error("Could not convert to peer info") continue } + + if peerInfo == nil { + continue + } + // Make sure that peer is not dialed too often, for each connection attempt there's a backoff period. s.Peers().RandomizeBackOff(peerInfo.ID) go func(info *peer.AddrInfo) { @@ -167,8 +179,7 @@ func (s *Service) createListener( // Listen to all network interfaces // for both ip protocols. - networkVersion := "udp" - conn, err := net.ListenUDP(networkVersion, udpAddr) + conn, err := net.ListenUDP("udp", udpAddr) if err != nil { return nil, errors.Wrap(err, "could not listen to UDP") } @@ -178,6 +189,7 @@ func (s *Service) createListener( ipAddr, int(s.cfg.UDPPort), int(s.cfg.TCPPort), + int(s.cfg.QUICPort), ) if err != nil { return nil, errors.Wrap(err, "could not create local node") @@ -209,7 +221,7 @@ func (s *Service) createListener( func (s *Service) createLocalNode( privKey *ecdsa.PrivateKey, ipAddr net.IP, - udpPort, tcpPort int, + udpPort, tcpPort, quicPort int, ) (*enode.LocalNode, error) { db, err := enode.OpenDB("") if err != nil { @@ -220,9 +232,13 @@ func (s *Service) createLocalNode( ipEntry := enr.IP(ipAddr) udpEntry := enr.UDP(udpPort) tcpEntry := enr.TCP(tcpPort) + quicEntry := quicProtocol(quicPort) + localNode.Set(ipEntry) localNode.Set(udpEntry) localNode.Set(tcpEntry) + localNode.Set(quicEntry) + localNode.SetFallbackIP(ipAddr) localNode.SetFallbackUDP(udpPort) @@ -277,7 +293,7 @@ func (s *Service) startDiscoveryV5( // filterPeer validates each node that we retrieve from our dht. We // try to ascertain that the peer can be a valid protocol peer. // Validity Conditions: -// 1. Peer has a valid IP and TCP port set in their enr. +// 1. Peer has a valid IP and a (QUIC and/or TCP) port set in their enr. // 2. Peer hasn't been marked as 'bad'. // 3. Peer is not currently active or connected. // 4. Peer is ready to receive incoming connections. @@ -294,17 +310,13 @@ func (s *Service) filterPeer(node *enode.Node) bool { return false } - // Ignore nodes with their TCP ports not set. - if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil { - if !enr.IsNotFound(err) { - log.WithError(err).Debug("Could not retrieve tcp port") - } + peerData, multiAddrs, err := convertToAddrInfo(node) + if err != nil { + log.WithError(err).Debug("Could not convert to peer data") return false } - peerData, multiAddr, err := convertToAddrInfo(node) - if err != nil { - log.WithError(err).Debug("Could not convert to peer data") + if len(multiAddrs) == 0 { return false } @@ -337,6 +349,9 @@ func (s *Service) filterPeer(node *enode.Node) bool { } } + // If the peer has 2 multiaddrs, favor the QUIC address, which is in first position. + multiAddr := multiAddrs[0] + // Add peer to peer handler. s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown) @@ -380,11 +395,11 @@ func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { if err != nil { return nil, errors.Wrapf(err, "Could not get enode from string") } - addr, err := convertToSingleMultiAddr(enodeAddr) + nodeAddrs, err := convertToMultiAddrs(enodeAddr) if err != nil { return nil, errors.Wrapf(err, "Could not get multiaddr") } - allAddrs = append(allAddrs, addr) + allAddrs = append(allAddrs, nodeAddrs...) } return allAddrs, nil } @@ -419,45 +434,141 @@ func parseGenericAddrs(addrs []string) (enodeString, multiAddrString []string) { } func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr { - var multiAddrs []ma.Multiaddr + // Expect each node to have a TCP and a QUIC address. + multiAddrs := make([]ma.Multiaddr, 0, 2*len(nodes)) + for _, node := range nodes { - // ignore nodes with no ip address stored + // Skip nodes with no ip address stored. if node.IP() == nil { continue } - multiAddr, err := convertToSingleMultiAddr(node) + + // Get up to two multiaddrs (TCP and QUIC) for each node. + nodeMultiAddrs, err := convertToMultiAddrs(node) if err != nil { - log.WithError(err).Error("Could not convert to multiAddr") + log.WithError(err).Errorf("Could not convert to multiAddr node %s", node) continue } - multiAddrs = append(multiAddrs, multiAddr) + + multiAddrs = append(multiAddrs, nodeMultiAddrs...) } + return multiAddrs } -func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) { - multiAddr, err := convertToSingleMultiAddr(node) +func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) { + multiAddrs, err := convertToMultiAddrs(node) if err != nil { return nil, nil, err } - info, err := peer.AddrInfoFromP2pAddr(multiAddr) + + if len(multiAddrs) == 0 { + return nil, nil, nil + } + + infos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrapf(err, "could not convert to peer info: %v", multiAddrs) + } + + if len(infos) > 1 { + return nil, nil, errors.Errorf("infos contains %v elements, expected not more than 1", len(infos)) + } + + if len(infos) == 0 { + return nil, multiAddrs, nil } - return info, multiAddr, nil + + return &infos[0], multiAddrs, nil } -func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) { +// convertToMultiAddrs converts an enode.Node to a list of multiaddrs. +// If the node has a both a QUIC and a TCP port set in their ENR, then +// the multiaddr corresponding to the QUIC port is added first, followed +// by the multiaddr corresponding to the TCP port. +func convertToMultiAddrs(node *enode.Node) ([]ma.Multiaddr, error) { + multiaddrs := make([]ma.Multiaddr, 0, 2) + + // Retrieve the node public key. pubkey := node.Pubkey() assertedKey, err := ecdsaprysm.ConvertToInterfacePubkey(pubkey) if err != nil { return nil, errors.Wrap(err, "could not get pubkey") } + + // Compute the node ID from the public key. id, err := peer.IDFromPublicKey(assertedKey) if err != nil { return nil, errors.Wrap(err, "could not get peer id") } - return multiAddressBuilderWithID(node.IP(), tcp, uint(node.TCP()), id) + + // If the QUIC entry is present in the ENR, build the corresponding multiaddress. + port, ok, err := getPort(node, quic) + if err != nil { + return nil, errors.Wrap(err, "could not get QUIC port") + } + + if ok { + addr, err := multiAddressBuilderWithID(node.IP(), quic, port, id) + if err != nil { + return nil, errors.Wrap(err, "could not build QUIC address") + } + + multiaddrs = append(multiaddrs, addr) + } + + // If the TCP entry is present in the ENR, build the corresponding multiaddress. + port, ok, err = getPort(node, tcp) + if err != nil { + return nil, errors.Wrap(err, "could not get TCP port") + } + + if ok { + addr, err := multiAddressBuilderWithID(node.IP(), tcp, port, id) + if err != nil { + return nil, errors.Wrap(err, "could not build TCP address") + } + + multiaddrs = append(multiaddrs, addr) + } + + return multiaddrs, nil +} + +// getPort retrieves the port for a given node and protocol, as well as a boolean +// indicating whether the port was found, and an error +func getPort(node *enode.Node, protocol internetProtocol) (uint, bool, error) { + var ( + port uint + err error + ) + + switch protocol { + case tcp: + var entry enr.TCP + err = node.Load(&entry) + port = uint(entry) + case udp: + var entry enr.UDP + err = node.Load(&entry) + port = uint(entry) + case quic: + var entry quicProtocol + err = node.Load(&entry) + port = uint(entry) + default: + return 0, false, errors.Errorf("invalid protocol: %v", protocol) + } + + if enr.IsNotFound(err) { + return port, false, nil + } + + if err != nil { + return 0, false, errors.Wrap(err, "could not get port") + } + + return port, true, nil } func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) { diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 31d8a23e501c..e9b8a8368870 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -166,8 +166,9 @@ func TestCreateLocalNode(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Define ports. const ( - udpPort = 2000 - tcpPort = 3000 + udpPort = 2000 + tcpPort = 3000 + quicPort = 3000 ) // Create a private key. @@ -180,7 +181,7 @@ func TestCreateLocalNode(t *testing.T) { cfg: tt.cfg, } - localNode, err := service.createLocalNode(privKey, address, udpPort, tcpPort) + localNode, err := service.createLocalNode(privKey, address, udpPort, tcpPort, quicPort) if tt.expectedError { require.NotNil(t, err) return @@ -237,7 +238,7 @@ func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) { genesisTime: time.Now(), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), } - node, err := s.createLocalNode(pkey, addr, 0, 0) + node, err := s.createLocalNode(pkey, addr, 0, 0, 0) require.NoError(t, err) multiAddr := convertToMultiAddr([]*enode.Node{node.Node()}) assert.Equal(t, 0, len(multiAddr), "Invalid ip address converted successfully") @@ -248,8 +249,9 @@ func TestMultiAddrConversion_OK(t *testing.T) { ipAddr, pkey := createAddrAndPrivKey(t) s := &Service{ cfg: &Config{ - TCPPort: 0, - UDPPort: 0, + UDPPort: 2000, + TCPPort: 3000, + QUICPort: 3000, }, genesisTime: time.Now(), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), diff --git a/beacon-chain/p2p/fork_test.go b/beacon-chain/p2p/fork_test.go index 86541c894b2c..2c2705716e58 100644 --- a/beacon-chain/p2p/fork_test.go +++ b/beacon-chain/p2p/fork_test.go @@ -28,7 +28,8 @@ import ( ) func TestStartDiscv5_DifferentForkDigests(t *testing.T) { - port := 2000 + const port = 2000 + ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() genesisValidatorsRoot := make([]byte, fieldparams.RootLength) @@ -53,7 +54,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { var listeners []*discover.UDPv5 for i := 1; i <= 5; i++ { - port = 3000 + i + port := 3000 + i cfg.UDPPort = uint(port) ipAddr, pkey := createAddrAndPrivKey(t) @@ -98,13 +99,14 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { s.genesisTime = genesisTime s.genesisValidatorsRoot = make([]byte, 32) s.dv5Listener = lastListener - var addrs []ma.Multiaddr - for _, n := range nodes { - if s.filterPeer(n) { - addr, err := convertToSingleMultiAddr(n) + addrs := make([]ma.Multiaddr, 0) + + for _, node := range nodes { + if s.filterPeer(node) { + nodeAddrs, err := convertToMultiAddrs(node) require.NoError(t, err) - addrs = append(addrs, addr) + addrs = append(addrs, nodeAddrs...) } } @@ -114,10 +116,11 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { } func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { + const port = 2000 + params.SetupTestConfigCleanup(t) hook := logTest.NewGlobal() logrus.SetLevel(logrus.TraceLevel) - port := 2000 ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() genesisValidatorsRoot := make([]byte, 32) @@ -138,7 +141,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { var listeners []*discover.UDPv5 for i := 1; i <= 5; i++ { - port = 3000 + i + port := 3000 + i cfg.UDPPort = uint(port) ipAddr, pkey := createAddrAndPrivKey(t) @@ -188,13 +191,13 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { s.genesisTime = genesisTime s.genesisValidatorsRoot = make([]byte, 32) s.dv5Listener = lastListener - var addrs []ma.Multiaddr + addrs := make([]ma.Multiaddr, 0, len(nodes)) - for _, n := range nodes { - if s.filterPeer(n) { - addr, err := convertToSingleMultiAddr(n) + for _, node := range nodes { + if s.filterPeer(node) { + nodeAddrs, err := convertToMultiAddrs(node) require.NoError(t, err) - addrs = append(addrs, addr) + addrs = append(addrs, nodeAddrs...) } } if len(addrs) == 0 { diff --git a/beacon-chain/p2p/log.go b/beacon-chain/p2p/log.go index d83e1f8b37a0..bc848a1dbbd4 100644 --- a/beacon-chain/p2p/log.go +++ b/beacon-chain/p2p/log.go @@ -13,32 +13,32 @@ import ( var log = logrus.WithField("prefix", "p2p") func logIPAddr(id peer.ID, addrs ...ma.Multiaddr) { - var correctAddr ma.Multiaddr for _, addr := range addrs { - if strings.Contains(addr.String(), "/ip4/") || strings.Contains(addr.String(), "/ip6/") { - correctAddr = addr - break + if !(strings.Contains(addr.String(), "/ip4/") || strings.Contains(addr.String(), "/ip6/")) { + continue } - } - if correctAddr != nil { + log.WithField( "multiAddr", - correctAddr.String()+"/p2p/"+id.String(), + addr.String()+"/p2p/"+id.String(), ).Info("Node started p2p server") } } -func logExternalIPAddr(id peer.ID, addr string, port uint) { +func logExternalIPAddr(id peer.ID, addr string, tcpPort, quicPort uint) { if addr != "" { - multiAddr, err := MultiAddressBuilder(net.ParseIP(addr), port) + multiAddrs, err := MultiAddressBuilder(net.ParseIP(addr), tcpPort, quicPort) if err != nil { log.WithError(err).Error("Could not create multiaddress") return } - log.WithField( - "multiAddr", - multiAddr.String()+"/p2p/"+id.String(), - ).Info("Node started external p2p server") + + for _, multiAddr := range multiAddrs { + log.WithField( + "multiAddr", + multiAddr.String()+"/p2p/"+id.String(), + ).Info("Node started external p2p server") + } } } diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index e767fb1c42ae..391e74c486c6 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -11,12 +11,14 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/security/noise" + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp" gomplex "github.com/libp2p/go-mplex" ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/config/features" ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa" + "github.com/prysmaticlabs/prysm/v5/runtime/version" ) @@ -29,22 +31,32 @@ const ( ) // MultiAddressBuilder takes in an ip address string and port to produce a go multiaddr format. -func MultiAddressBuilder(ip net.IP, port uint) (ma.Multiaddr, error) { +func MultiAddressBuilder(ip net.IP, tcpPort, quicPort uint) ([]ma.Multiaddr, error) { ipType, err := extractIpType(ip) if err != nil { return nil, errors.Wrap(err, "unable to determine IP type") } + // Example: /ip4/1.2.3.4/udp/5678/quic-v1 + multiAddrQUIC, err := ma.NewMultiaddr(fmt.Sprintf("/%s/%s/udp/%d/quic-v1", ipType, ip, quicPort)) + if err != nil { + return nil, errors.Wrapf(err, "cannot produce QUIC multiaddr format from %s:%d", ip, tcpPort) + } + // Example: /ip4/1.2.3.4./tcp/5678 - multiaddrStr := fmt.Sprintf("/%s/%s/tcp/%d", ipType, ip, port) + multiaddrStr := fmt.Sprintf("/%s/%s/tcp/%d", ipType, ip, tcpPort) + multiAddrTCP, err := ma.NewMultiaddr(multiaddrStr) + if err != nil { + return nil, errors.Wrapf(err, "cannot produce TCP multiaddr format from %s:%d", ip, tcpPort) + } - return ma.NewMultiaddr(multiaddrStr) + return []ma.Multiaddr{multiAddrTCP, multiAddrQUIC}, nil } // buildOptions for the libp2p host. func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Option, error) { cfg := s.cfg - listen, err := MultiAddressBuilder(ip, cfg.TCPPort) + multiaddrs, err := MultiAddressBuilder(ip, cfg.TCPPort, cfg.QUICPort) if err != nil { return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", ip, cfg.TCPPort) } @@ -54,7 +66,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op return nil, errors.Wrapf(err, "invalid local ip provided: %s:%d", cfg.LocalIP, cfg.TCPPort) } - listen, err = MultiAddressBuilder(localIP, cfg.TCPPort) + multiaddrs, err = MultiAddressBuilder(localIP, cfg.TCPPort, cfg.QUICPort) if err != nil { return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", cfg.LocalIP, cfg.TCPPort) } @@ -72,9 +84,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op options := []libp2p.Option{ privKeyOption(priKey), - libp2p.ListenAddrs(listen), + libp2p.ListenAddrs(multiaddrs...), libp2p.UserAgent(version.BuildData()), libp2p.ConnectionGater(s), + libp2p.Transport(libp2pquic.NewTransport), libp2p.Transport(libp2ptcp.NewTCPTransport), libp2p.DefaultMuxers, libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), @@ -85,23 +98,26 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op if cfg.EnableUPnP { options = append(options, libp2p.NATPortMap()) // Allow to use UPnP } + if cfg.RelayNodeAddr != "" { options = append(options, libp2p.AddrsFactory(withRelayAddrs(cfg.RelayNodeAddr))) } else { // Disable relay if it has not been set. options = append(options, libp2p.DisableRelay()) } + if cfg.HostAddress != "" { options = append(options, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - external, err := MultiAddressBuilder(net.ParseIP(cfg.HostAddress), cfg.TCPPort) + externalMultiaddrs, err := MultiAddressBuilder(net.ParseIP(cfg.HostAddress), cfg.TCPPort, cfg.QUICPort) if err != nil { log.WithError(err).Error("Unable to create external multiaddress") } else { - addrs = append(addrs, external) + addrs = append(addrs, externalMultiaddrs...) } return addrs })) } + if cfg.HostDNS != "" { options = append(options, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { external, err := ma.NewMultiaddr(fmt.Sprintf("/dns4/%s/tcp/%d", cfg.HostDNS, cfg.TCPPort)) @@ -117,6 +133,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op if features.Get().DisableResourceManager { options = append(options, libp2p.ResourceManager(&network.NullResourceManager{})) } + return options, nil } diff --git a/beacon-chain/p2p/options_test.go b/beacon-chain/p2p/options_test.go index 4107d611cc55..b7f29788e342 100644 --- a/beacon-chain/p2p/options_test.go +++ b/beacon-chain/p2p/options_test.go @@ -89,30 +89,34 @@ func TestIPV6Support(t *testing.T) { lNode := enode.NewLocalNode(db, key) mockIPV6 := net.IP{0xff, 0x02, 0xAA, 0, 0x1F, 0, 0x2E, 0, 0, 0x36, 0x45, 0, 0, 0, 0, 0x02} lNode.Set(enr.IP(mockIPV6)) - ma, err := convertToSingleMultiAddr(lNode.Node()) + mas, err := convertToMultiAddrs(lNode.Node()) if err != nil { t.Fatal(err) } - ipv6Exists := false - for _, p := range ma.Protocols() { - if p.Name == "ip4" { - t.Error("Got ip4 address instead of ip6") + + for _, ma := range mas { + ipv6Exists := false + for _, p := range ma.Protocols() { + if p.Name == "ip4" { + t.Error("Got ip4 address instead of ip6") + } + if p.Name == "ip6" { + ipv6Exists = true + } } - if p.Name == "ip6" { - ipv6Exists = true + if !ipv6Exists { + t.Error("Multiaddress did not have ipv6 protocol") } } - if !ipv6Exists { - t.Error("Multiaddress did not have ipv6 protocol") - } } func TestDefaultMultiplexers(t *testing.T) { var cfg libp2p.Config _ = cfg p2pCfg := &Config{ - TCPPort: 2000, UDPPort: 2000, + TCPPort: 3000, + QUICPort: 3000, StateNotifier: &mock.MockStateNotifier{}, } svc := &Service{cfg: p2pCfg} diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index fb33fb8ee2c9..103bdcd617ba 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -247,9 +247,10 @@ func (s *Service) Start() { p2pHostAddress := s.cfg.HostAddress p2pTCPPort := s.cfg.TCPPort + p2pQUICPort := s.cfg.QUICPort if p2pHostAddress != "" { - logExternalIPAddr(s.host.ID(), p2pHostAddress, p2pTCPPort) + logExternalIPAddr(s.host.ID(), p2pHostAddress, p2pTCPPort, p2pQUICPort) verifyConnectivity(p2pHostAddress, p2pTCPPort, "tcp") } diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index 7eabd44a3767..c09ad1db6407 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -102,8 +102,9 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) { cs := startup.NewClockSynchronizer() cfg := &Config{ - TCPPort: 2000, UDPPort: 2000, + TCPPort: 3000, + QUICPort: 3000, ClockWaiter: cs, } s, err := NewService(context.Background(), cfg) @@ -147,8 +148,9 @@ func TestService_Start_NoDiscoverFlag(t *testing.T) { cs := startup.NewClockSynchronizer() cfg := &Config{ - TCPPort: 2000, UDPPort: 2000, + TCPPort: 3000, + QUICPort: 3000, StateNotifier: &mock.MockStateNotifier{}, NoDiscovery: true, // <-- no s.dv5Listener is created ClockWaiter: cs, diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 8d313db8f558..2c6262232a00 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -93,6 +93,11 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, if err != nil { continue } + + if info == nil { + continue + } + wg.Add(1) go func() { if err := s.connectWithPeer(ctx, *info); err != nil { diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index 92f0b38107e0..2b270f731557 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -66,7 +66,7 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { genesisTime := time.Now() bootNodeService := &Service{ - cfg: &Config{TCPPort: 2000, UDPPort: 3000}, + cfg: &Config{UDPPort: 2000, TCPPort: 3000, QUICPort: 3000}, genesisTime: genesisTime, genesisValidatorsRoot: genesisValidatorsRoot, } @@ -89,8 +89,9 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { service, err := NewService(ctx, &Config{ Discv5BootStrapAddrs: []string{bootNodeENR}, MaxPeers: 30, - TCPPort: uint(2000 + i), - UDPPort: uint(3000 + i), + UDPPort: uint(2000 + i), + TCPPort: uint(3000 + i), + QUICPort: uint(3000 + i), }) require.NoError(t, err) @@ -133,8 +134,9 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { cfg := &Config{ Discv5BootStrapAddrs: []string{bootNodeENR}, MaxPeers: 30, - TCPPort: 2010, - UDPPort: 3010, + UDPPort: 2010, + TCPPort: 3010, + QUICPort: 3010, } service, err := NewService(ctx, cfg) diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 9f3bd6ef3786..fb6e0530c909 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -142,7 +142,13 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { // it successfully writes a response. We don't blindly call // Close here because we may have only written a partial // response. + // About the special case for quic-v1, please see: + // https://github.com/quic-go/quic-go/issues/3291 defer func() { + if strings.Contains(stream.Conn().RemoteMultiaddr().String(), "quic-v1") { + time.Sleep(2 * time.Second) + } + _err := stream.Reset() _ = _err }() diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index 56a10b5903df..c7dec5e72431 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -90,6 +90,7 @@ var appFlags = []cli.Flag{ cmd.StaticPeers, cmd.RelayNode, cmd.P2PUDPPort, + cmd.P2PQUICPort, cmd.P2PTCPPort, cmd.P2PIP, cmd.P2PHost, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index c07e49380b41..adc4c87a3085 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -55,6 +55,7 @@ var appHelpFlagGroups = []flagGroup{ cmd.BootstrapNode, cmd.RelayNode, cmd.P2PUDPPort, + cmd.P2PQUICPort, cmd.P2PTCPPort, cmd.DataDirFlag, cmd.VerbosityFlag, diff --git a/cmd/flags.go b/cmd/flags.go index ccd907a3a99d..bd7d135f6a0f 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -113,13 +113,19 @@ var ( // P2PUDPPort defines the port to be used by discv5. P2PUDPPort = &cli.IntFlag{ Name: "p2p-udp-port", - Usage: "The port used by discv5.", + Usage: "The UDP port used by the discovery service discv5.", Value: 12000, } - // P2PTCPPort defines the port to be used by libp2p. + // P2PQUICPort defines the QUIC port to be used by libp2p. + P2PQUICPort = &cli.IntFlag{ + Name: "p2p-quic-port", + Usage: "The QUIC port used by libp2p.", + Value: 13000, + } + // P2PTCPPort defines the TCP port to be used by libp2p. P2PTCPPort = &cli.IntFlag{ Name: "p2p-tcp-port", - Usage: "The port used by libp2p.", + Usage: "The TCP port used by libp2p.", Value: 13000, } // P2PIP defines the local IP to be used by libp2p. diff --git a/cmd/prysmctl/p2p/BUILD.bazel b/cmd/prysmctl/p2p/BUILD.bazel index ec19ed30c0d9..bb04938b454f 100644 --- a/cmd/prysmctl/p2p/BUILD.bazel +++ b/cmd/prysmctl/p2p/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_libp2p_go_libp2p//core/protocol:go_default_library", "@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/transport/quic:go_default_library", "@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_fastssz//:go_default_library", diff --git a/cmd/prysmctl/p2p/client.go b/cmd/prysmctl/p2p/client.go index eae2962b5f05..146e7f0ef365 100644 --- a/cmd/prysmctl/p2p/client.go +++ b/cmd/prysmctl/p2p/client.go @@ -14,7 +14,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/security/noise" - "github.com/libp2p/go-libp2p/p2p/transport/tcp" + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" + libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/pkg/errors" ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/go-bitfield" @@ -43,7 +44,7 @@ type client struct { nodeClient pb.NodeClient } -func newClient(beaconEndpoints []string, clientPort uint) (*client, error) { +func newClient(beaconEndpoints []string, tcpPort, quicPort uint) (*client, error) { ipAdd := ipAddr() priv, err := privKey() if err != nil { @@ -53,15 +54,16 @@ func newClient(beaconEndpoints []string, clientPort uint) (*client, error) { if err != nil { return nil, errors.Wrap(err, "could not set up p2p metadata") } - listen, err := p2p.MultiAddressBuilder(ipAdd, clientPort) + multiaddrs, err := p2p.MultiAddressBuilder(ipAdd, tcpPort, quicPort) if err != nil { return nil, errors.Wrap(err, "could not set up listening multiaddr") } options := []libp2p.Option{ privKeyOption(priv), - libp2p.ListenAddrs(listen), + libp2p.ListenAddrs(multiaddrs...), libp2p.UserAgent(version.BuildData()), - libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(libp2pquic.NewTransport), + libp2p.Transport(libp2ptcp.NewTCPTransport), } options = append(options, libp2p.Security(noise.ID, noise.New)) options = append(options, libp2p.Ping(false)) diff --git a/cmd/prysmctl/p2p/request_blobs.go b/cmd/prysmctl/p2p/request_blobs.go index 95eae8a6907f..50c9efda259e 100644 --- a/cmd/prysmctl/p2p/request_blobs.go +++ b/cmd/prysmctl/p2p/request_blobs.go @@ -22,11 +22,12 @@ import ( ) var requestBlobsFlags = struct { - Peers string - ClientPort uint - APIEndpoints string - StartSlot uint64 - Count uint64 + Peers string + ClientPortTCP uint + ClientPortQUIC uint + APIEndpoints string + StartSlot uint64 + Count uint64 }{} var requestBlobsCmd = &cli.Command{ @@ -47,9 +48,16 @@ var requestBlobsCmd = &cli.Command{ Value: "", }, &cli.UintFlag{ - Name: "client-port", - Usage: "port to use for the client as a libp2p host", - Destination: &requestBlobsFlags.ClientPort, + Name: "client-port-tcp", + Aliases: []string{"client-port"}, + Usage: "TCP port to use for the client as a libp2p host", + Destination: &requestBlobsFlags.ClientPortTCP, + Value: 13001, + }, + &cli.UintFlag{ + Name: "client-port-quic", + Usage: "QUIC port to use for the client as a libp2p host", + Destination: &requestBlobsFlags.ClientPortQUIC, Value: 13001, }, &cli.StringFlag{ @@ -60,13 +68,13 @@ var requestBlobsCmd = &cli.Command{ }, &cli.Uint64Flag{ Name: "start-slot", - Usage: "start slot for blocks by range request. If unset, will use start_slot(current_epoch-1)", + Usage: "start slot for blobs by range request. If unset, will use start_slot(current_epoch-1)", Destination: &requestBlobsFlags.StartSlot, Value: 0, }, &cli.Uint64Flag{ Name: "count", - Usage: "number of blocks to request, (default 32)", + Usage: "number of blobs to request, (default 32)", Destination: &requestBlobsFlags.Count, Value: 32, }, @@ -90,7 +98,7 @@ func cliActionRequestBlobs(cliCtx *cli.Context) error { allAPIEndpoints = strings.Split(requestBlobsFlags.APIEndpoints, ",") } var err error - c, err := newClient(allAPIEndpoints, requestBlobsFlags.ClientPort) + c, err := newClient(allAPIEndpoints, requestBlobsFlags.ClientPortTCP, requestBlobsFlags.ClientPortQUIC) if err != nil { return err } diff --git a/cmd/prysmctl/p2p/request_blocks.go b/cmd/prysmctl/p2p/request_blocks.go index fa71eef93a94..de14d33facdf 100644 --- a/cmd/prysmctl/p2p/request_blocks.go +++ b/cmd/prysmctl/p2p/request_blocks.go @@ -23,13 +23,14 @@ import ( ) var requestBlocksFlags = struct { - Network string - Peers string - ClientPort uint - APIEndpoints string - StartSlot uint64 - Count uint64 - Step uint64 + Network string + Peers string + ClientPortTCP uint + ClientPortQUIC uint + APIEndpoints string + StartSlot uint64 + Count uint64 + Step uint64 }{} var requestBlocksCmd = &cli.Command{ @@ -56,9 +57,16 @@ var requestBlocksCmd = &cli.Command{ Value: "", }, &cli.UintFlag{ - Name: "client-port", - Usage: "port to use for the client as a libp2p host", - Destination: &requestBlocksFlags.ClientPort, + Name: "client-port-tcp", + Aliases: []string{"client-port"}, + Usage: "TCP port to use for the client as a libp2p host", + Destination: &requestBlocksFlags.ClientPortTCP, + Value: 13001, + }, + &cli.UintFlag{ + Name: "client-port-quic", + Usage: "QUIC port to use for the client as a libp2p host", + Destination: &requestBlocksFlags.ClientPortQUIC, Value: 13001, }, &cli.StringFlag{ @@ -120,7 +128,7 @@ func cliActionRequestBlocks(cliCtx *cli.Context) error { allAPIEndpoints = strings.Split(requestBlocksFlags.APIEndpoints, ",") } var err error - c, err := newClient(allAPIEndpoints, requestBlocksFlags.ClientPort) + c, err := newClient(allAPIEndpoints, requestBlocksFlags.ClientPortTCP, requestBlocksFlags.ClientPortQUIC) if err != nil { return err } diff --git a/testing/endtoend/components/beacon_node.go b/testing/endtoend/components/beacon_node.go index 9e90eb314c10..9c93188862b4 100644 --- a/testing/endtoend/components/beacon_node.go +++ b/testing/endtoend/components/beacon_node.go @@ -257,6 +257,7 @@ func (node *BeaconNode) Start(ctx context.Context) error { fmt.Sprintf("--%s=%s", flags.ExecutionJWTSecretFlag.Name, jwtPath), fmt.Sprintf("--%s=%d", flags.MinSyncPeers.Name, 1), fmt.Sprintf("--%s=%d", cmdshared.P2PUDPPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeUDPPort+index), + fmt.Sprintf("--%s=%d", cmdshared.P2PQUICPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeQUICPort+index), fmt.Sprintf("--%s=%d", cmdshared.P2PTCPPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeTCPPort+index), fmt.Sprintf("--%s=%d", cmdshared.P2PMaxPeers.Name, expectedNumOfPeers), fmt.Sprintf("--%s=%d", flags.MonitoringPortFlag.Name, e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+index), diff --git a/testing/endtoend/params/params.go b/testing/endtoend/params/params.go index ac63cc9f1376..ded1c9735b53 100644 --- a/testing/endtoend/params/params.go +++ b/testing/endtoend/params/params.go @@ -46,6 +46,7 @@ type ports struct { Eth1ProxyPort int PrysmBeaconNodeRPCPort int PrysmBeaconNodeUDPPort int + PrysmBeaconNodeQUICPort int PrysmBeaconNodeTCPPort int PrysmBeaconNodeGatewayPort int PrysmBeaconNodeMetricsPort int @@ -144,10 +145,11 @@ const ( PrysmBeaconNodeRPCPort = 4150 PrysmBeaconNodeUDPPort = PrysmBeaconNodeRPCPort + portSpan - PrysmBeaconNodeTCPPort = PrysmBeaconNodeRPCPort + 2*portSpan - PrysmBeaconNodeGatewayPort = PrysmBeaconNodeRPCPort + 3*portSpan - PrysmBeaconNodeMetricsPort = PrysmBeaconNodeRPCPort + 4*portSpan - PrysmBeaconNodePprofPort = PrysmBeaconNodeRPCPort + 5*portSpan + PrysmBeaconNodeQUICPort = PrysmBeaconNodeRPCPort + 2*portSpan + PrysmBeaconNodeTCPPort = PrysmBeaconNodeRPCPort + 3*portSpan + PrysmBeaconNodeGatewayPort = PrysmBeaconNodeRPCPort + 4*portSpan + PrysmBeaconNodeMetricsPort = PrysmBeaconNodeRPCPort + 5*portSpan + PrysmBeaconNodePprofPort = PrysmBeaconNodeRPCPort + 6*portSpan LighthouseBeaconNodeP2PPort = 5150 LighthouseBeaconNodeHTTPPort = LighthouseBeaconNodeP2PPort + portSpan @@ -330,6 +332,10 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR if err != nil { return err } + beaconNodeQUICPort, err := port(PrysmBeaconNodeQUICPort, shardCount, shardIndex, existingRegistrations) + if err != nil { + return err + } beaconNodeTCPPort, err := port(PrysmBeaconNodeTCPPort, shardCount, shardIndex, existingRegistrations) if err != nil { return err @@ -367,6 +373,7 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR ports.Eth1ProxyPort = eth1ProxyPort ports.PrysmBeaconNodeRPCPort = beaconNodeRPCPort ports.PrysmBeaconNodeUDPPort = beaconNodeUDPPort + ports.PrysmBeaconNodeQUICPort = beaconNodeQUICPort ports.PrysmBeaconNodeTCPPort = beaconNodeTCPPort ports.PrysmBeaconNodeGatewayPort = beaconNodeGatewayPort ports.PrysmBeaconNodeMetricsPort = beaconNodeMetricsPort diff --git a/testing/endtoend/params/params_test.go b/testing/endtoend/params/params_test.go index a0b24dab6fc4..e0f795984b37 100644 --- a/testing/endtoend/params/params_test.go +++ b/testing/endtoend/params/params_test.go @@ -30,7 +30,7 @@ func TestStandardPorts(t *testing.T) { var existingRegistrations []int testPorts := &ports{} assert.NoError(t, initializeStandardPorts(2, 0, testPorts, &existingRegistrations)) - assert.Equal(t, 16, len(existingRegistrations)) + assert.Equal(t, 17, len(existingRegistrations)) assert.NotEqual(t, 0, testPorts.PrysmBeaconNodeGatewayPort) assert.NotEqual(t, 0, testPorts.PrysmBeaconNodeTCPPort) assert.NotEqual(t, 0, testPorts.JaegerTracingPort) From 236579d748ae7743e98d3dd3276cc01e411a12a9 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 26 Mar 2024 19:22:09 +0100 Subject: [PATCH 11/18] Status: Implement `{Inbound,Outbound}Connected{TCP,QUIC}`. --- beacon-chain/p2p/peers/status.go | 57 ++++++++++- beacon-chain/p2p/peers/status_test.go | 136 ++++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 2 deletions(-) diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 7e487fa2a790..757125463b31 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -26,6 +26,7 @@ import ( "context" "math" "sort" + "strings" "time" "github.com/ethereum/go-ethereum/p2p/enr" @@ -449,6 +450,32 @@ func (p *Status) InboundConnected() []peer.ID { return peers } +// InboundConnectedTCP returns the current batch of inbound peers that are connected using TCP. +func (p *Status) InboundConnectedTCP() []peer.ID { + p.store.RLock() + defer p.store.RUnlock() + peers := make([]peer.ID, 0) + for pid, peerData := range p.store.Peers() { + if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), "tcp") { + peers = append(peers, pid) + } + } + return peers +} + +// InboundConnectedTCP returns the current batch of inbound peers that are connected using QUIC. +func (p *Status) InboundConnectedQUIC() []peer.ID { + p.store.RLock() + defer p.store.RUnlock() + peers := make([]peer.ID, 0) + for pid, peerData := range p.store.Peers() { + if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), "quic") { + peers = append(peers, pid) + } + } + return peers +} + // Outbound returns the current batch of outbound peers. func (p *Status) Outbound() []peer.ID { p.store.RLock() @@ -475,7 +502,33 @@ func (p *Status) OutboundConnected() []peer.ID { return peers } -// Active returns the peers that are connecting or connected. +// OutboundConnected returns the current batch of outbound peers that are connected using TCP. +func (p *Status) OutboundConnectedTCP() []peer.ID { + p.store.RLock() + defer p.store.RUnlock() + peers := make([]peer.ID, 0) + for pid, peerData := range p.store.Peers() { + if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), "tcp") { + peers = append(peers, pid) + } + } + return peers +} + +// OutboundConnected returns the current batch of outbound peers that are connected using QUIC. +func (p *Status) OutboundConnectedQUIC() []peer.ID { + p.store.RLock() + defer p.store.RUnlock() + peers := make([]peer.ID, 0) + for pid, peerData := range p.store.Peers() { + if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), "quic") { + peers = append(peers, pid) + } + } + return peers +} + +// Active returns the peers that are active (connecting or connected). func (p *Status) Active() []peer.ID { p.store.RLock() defer p.store.RUnlock() @@ -514,7 +567,7 @@ func (p *Status) Disconnected() []peer.ID { return peers } -// Inactive returns the peers that are disconnecting or disconnected. +// Inactive returns the peers that are inactive (disconnecting or disconnected). func (p *Status) Inactive() []peer.ID { p.store.RLock() defer p.store.RUnlock() diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index 2a19b9644728..c2b5ffa9f7fa 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -1111,6 +1111,74 @@ func TestInbound(t *testing.T) { assert.Equal(t, inbound.String(), result[0].String()) } +func TestInboundConnected(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 0, + }, + }, + }) + + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") + require.NoError(t, err) + inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected) + createPeer(t, p, addr, network.DirInbound, peers.PeerConnecting) + + result := p.InboundConnected() + require.Equal(t, 1, len(result)) + assert.Equal(t, inbound.String(), result[0].String()) +} + +func TestInboundConnectedTCP(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 0, + }, + }, + }) + + addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") + require.NoError(t, err) + + addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") + require.NoError(t, err) + + inboundTCP := createPeer(t, p, addrTCP, network.DirInbound, peers.PeerConnected) + createPeer(t, p, addrQUIC, network.DirInbound, peers.PeerConnected) + + result := p.InboundConnectedTCP() + require.Equal(t, 1, len(result)) + assert.Equal(t, inboundTCP.String(), result[0].String()) +} + +func TestInboundConnectedQUIC(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 0, + }, + }, + }) + + addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") + require.NoError(t, err) + + addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") + require.NoError(t, err) + + inboundQUIC := createPeer(t, p, addrQUIC, network.DirInbound, peers.PeerConnected) + createPeer(t, p, addrTCP, network.DirInbound, peers.PeerConnected) + + result := p.InboundConnectedQUIC() + require.Equal(t, 1, len(result)) + assert.Equal(t, inboundQUIC.String(), result[0].String()) +} + func TestOutbound(t *testing.T) { p := peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 30, @@ -1130,6 +1198,74 @@ func TestOutbound(t *testing.T) { assert.Equal(t, outbound.String(), result[0].String()) } +func TestOutboundConnected(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 0, + }, + }, + }) + + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") + require.NoError(t, err) + inbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected) + createPeer(t, p, addr, network.DirOutbound, peers.PeerConnecting) + + result := p.OutboundConnected() + require.Equal(t, 1, len(result)) + assert.Equal(t, inbound.String(), result[0].String()) +} + +func TestOutbondConnectedTCP(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 0, + }, + }, + }) + + addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") + require.NoError(t, err) + + addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") + require.NoError(t, err) + + outboundTCP := createPeer(t, p, addrTCP, network.DirOutbound, peers.PeerConnected) + createPeer(t, p, addrQUIC, network.DirOutbound, peers.PeerConnected) + + result := p.OutboundConnectedTCP() + require.Equal(t, 1, len(result)) + assert.Equal(t, outboundTCP.String(), result[0].String()) +} + +func TestOutboundConnectedQUIC(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 0, + }, + }, + }) + + addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") + require.NoError(t, err) + + addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") + require.NoError(t, err) + + outboundQUIC := createPeer(t, p, addrQUIC, network.DirOutbound, peers.PeerConnected) + createPeer(t, p, addrTCP, network.DirOutbound, peers.PeerConnected) + + result := p.OutboundConnectedQUIC() + require.Equal(t, 1, len(result)) + assert.Equal(t, outboundQUIC.String(), result[0].String()) +} + // addPeer is a helper to add a peer with a given connection state) func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID { // Set up some peers with different states From ea9dcc89cff99fa6ec17cc54c6af36a0dbdf2b75 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 26 Mar 2024 19:33:33 +0100 Subject: [PATCH 12/18] Logging: Display the number of TCP/QUIC connected peers. --- beacon-chain/p2p/service.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 103bdcd617ba..8bc5b45fad2c 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -235,11 +235,19 @@ func (s *Service) Start() { async.RunEvery(s.ctx, time.Duration(params.BeaconConfig().RespTimeout)*time.Second, s.updateMetrics) async.RunEvery(s.ctx, refreshRate, s.RefreshENR) async.RunEvery(s.ctx, 1*time.Minute, func() { + inboundQUICCount := len(s.peers.InboundConnectedQUIC()) + inboundTCPCount := len(s.peers.InboundConnectedTCP()) + outboundQUICCount := len(s.peers.OutboundConnectedQUIC()) + outboundTCPCount := len(s.peers.OutboundConnectedTCP()) + total := inboundQUICCount + inboundTCPCount + outboundQUICCount + outboundTCPCount + log.WithFields(logrus.Fields{ - "inbound": len(s.peers.InboundConnected()), - "outbound": len(s.peers.OutboundConnected()), - "activePeers": len(s.peers.Active()), - }).Info("Peer summary") + "inboundQUIC": inboundQUICCount, + "inboundTCP": inboundTCPCount, + "outboundQUIC": outboundQUICCount, + "outboundTCP": outboundTCPCount, + "total": total, + }).Info("Connected peers") }) multiAddrs := s.host.Network().ListenAddresses() From f42455b6acf2323e38729bd3979cf60cd283ea0f Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 3 Apr 2024 11:00:33 +0200 Subject: [PATCH 13/18] P2P: Implement `{Inbound,Outbound}ConnectedWithProtocol`. --- beacon-chain/p2p/peers/status.go | 45 +++----- beacon-chain/p2p/peers/status_test.go | 142 +++++++++++++++----------- beacon-chain/p2p/service.go | 8 +- 3 files changed, 101 insertions(+), 94 deletions(-) diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 757125463b31..3dda2df28815 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -77,6 +77,13 @@ const ( MaxBackOffDuration = 5000 ) +type InternetProtocol string + +const ( + TCP = "tcp" + QUIC = "quic" +) + // Status is the structure holding the peer status information. type Status struct { ctx context.Context @@ -450,26 +457,13 @@ func (p *Status) InboundConnected() []peer.ID { return peers } -// InboundConnectedTCP returns the current batch of inbound peers that are connected using TCP. -func (p *Status) InboundConnectedTCP() []peer.ID { - p.store.RLock() - defer p.store.RUnlock() - peers := make([]peer.ID, 0) - for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), "tcp") { - peers = append(peers, pid) - } - } - return peers -} - -// InboundConnectedTCP returns the current batch of inbound peers that are connected using QUIC. -func (p *Status) InboundConnectedQUIC() []peer.ID { +// InboundConnectedWithProtocol returns the current batch of inbound peers that are connected with a given protocol. +func (p *Status) InboundConnectedWithProtocol(protocol InternetProtocol) []peer.ID { p.store.RLock() defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), "quic") { + if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), string(protocol)) { peers = append(peers, pid) } } @@ -502,26 +496,13 @@ func (p *Status) OutboundConnected() []peer.ID { return peers } -// OutboundConnected returns the current batch of outbound peers that are connected using TCP. -func (p *Status) OutboundConnectedTCP() []peer.ID { - p.store.RLock() - defer p.store.RUnlock() - peers := make([]peer.ID, 0) - for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), "tcp") { - peers = append(peers, pid) - } - } - return peers -} - -// OutboundConnected returns the current batch of outbound peers that are connected using QUIC. -func (p *Status) OutboundConnectedQUIC() []peer.ID { +// OutboundConnectedWithProtocol returns the current batch of outbound peers that are connected with a given protocol. +func (p *Status) OutboundConnectedWithProtocol(protocol InternetProtocol) []peer.ID { p.store.RLock() defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), "quic") { + if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), string(protocol)) { peers = append(peers, pid) } } diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index c2b5ffa9f7fa..ae57af71f107 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -1131,7 +1131,7 @@ func TestInboundConnected(t *testing.T) { assert.Equal(t, inbound.String(), result[0].String()) } -func TestInboundConnectedTCP(t *testing.T) { +func TestInboundConnectedWithProtocol(t *testing.T) { p := peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 30, ScorerParams: &scorers.Config{ @@ -1141,42 +1141,55 @@ func TestInboundConnectedTCP(t *testing.T) { }, }) - addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") - require.NoError(t, err) + addrsTCP := []string{ + "/ip4/127.0.0.1/tcp/33333", + "/ip4/127.0.0.2/tcp/44444", + } - addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") - require.NoError(t, err) + addrsQUIC := []string{ + "/ip4/192.168.1.3/udp/13000/quic-v1", + "/ip4/192.168.1.4/udp/14000/quic-v1", + "/ip4/192.168.1.5/udp/14000/quic-v1", + } - inboundTCP := createPeer(t, p, addrTCP, network.DirInbound, peers.PeerConnected) - createPeer(t, p, addrQUIC, network.DirInbound, peers.PeerConnected) + expectedTCP := make(map[string]bool, len(addrsTCP)) + for _, addr := range addrsTCP { + multiaddr, err := ma.NewMultiaddr(addr) + require.NoError(t, err) - result := p.InboundConnectedTCP() - require.Equal(t, 1, len(result)) - assert.Equal(t, inboundTCP.String(), result[0].String()) -} + peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected) + expectedTCP[peer.String()] = true + } -func TestInboundConnectedQUIC(t *testing.T) { - p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - ScorerParams: &scorers.Config{ - BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ - Threshold: 0, - }, - }, - }) + expectedQUIC := make(map[string]bool, len(addrsQUIC)) + for _, addr := range addrsQUIC { + multiaddr, err := ma.NewMultiaddr(addr) + require.NoError(t, err) - addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") - require.NoError(t, err) + peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected) + expectedQUIC[peer.String()] = true + } - addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") - require.NoError(t, err) + // TCP + // --- - inboundQUIC := createPeer(t, p, addrQUIC, network.DirInbound, peers.PeerConnected) - createPeer(t, p, addrTCP, network.DirInbound, peers.PeerConnected) + actualTCP := p.InboundConnectedWithProtocol(peers.TCP) + require.Equal(t, len(expectedTCP), len(actualTCP)) - result := p.InboundConnectedQUIC() - require.Equal(t, 1, len(result)) - assert.Equal(t, inboundQUIC.String(), result[0].String()) + for _, actualPeer := range actualTCP { + _, ok := expectedTCP[actualPeer.String()] + require.Equal(t, true, ok) + } + + // QUIC + // ---- + actualQUIC := p.InboundConnectedWithProtocol(peers.QUIC) + require.Equal(t, len(expectedQUIC), len(actualQUIC)) + + for _, actualPeer := range actualQUIC { + _, ok := expectedQUIC[actualPeer.String()] + require.Equal(t, true, ok) + } } func TestOutbound(t *testing.T) { @@ -1218,7 +1231,7 @@ func TestOutboundConnected(t *testing.T) { assert.Equal(t, inbound.String(), result[0].String()) } -func TestOutbondConnectedTCP(t *testing.T) { +func TestOutboundConnectedWithProtocol(t *testing.T) { p := peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 30, ScorerParams: &scorers.Config{ @@ -1228,42 +1241,55 @@ func TestOutbondConnectedTCP(t *testing.T) { }, }) - addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") - require.NoError(t, err) + addrsTCP := []string{ + "/ip4/127.0.0.1/tcp/33333", + "/ip4/127.0.0.2/tcp/44444", + } - addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") - require.NoError(t, err) + addrsQUIC := []string{ + "/ip4/192.168.1.3/udp/13000/quic-v1", + "/ip4/192.168.1.4/udp/14000/quic-v1", + "/ip4/192.168.1.5/udp/14000/quic-v1", + } - outboundTCP := createPeer(t, p, addrTCP, network.DirOutbound, peers.PeerConnected) - createPeer(t, p, addrQUIC, network.DirOutbound, peers.PeerConnected) + expectedTCP := make(map[string]bool, len(addrsTCP)) + for _, addr := range addrsTCP { + multiaddr, err := ma.NewMultiaddr(addr) + require.NoError(t, err) - result := p.OutboundConnectedTCP() - require.Equal(t, 1, len(result)) - assert.Equal(t, outboundTCP.String(), result[0].String()) -} + peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected) + expectedTCP[peer.String()] = true + } -func TestOutboundConnectedQUIC(t *testing.T) { - p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - ScorerParams: &scorers.Config{ - BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ - Threshold: 0, - }, - }, - }) + expectedQUIC := make(map[string]bool, len(addrsQUIC)) + for _, addr := range addrsQUIC { + multiaddr, err := ma.NewMultiaddr(addr) + require.NoError(t, err) - addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1") - require.NoError(t, err) + peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected) + expectedQUIC[peer.String()] = true + } - addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") - require.NoError(t, err) + // TCP + // --- - outboundQUIC := createPeer(t, p, addrQUIC, network.DirOutbound, peers.PeerConnected) - createPeer(t, p, addrTCP, network.DirOutbound, peers.PeerConnected) + actualTCP := p.OutboundConnectedWithProtocol(peers.TCP) + require.Equal(t, len(expectedTCP), len(actualTCP)) - result := p.OutboundConnectedQUIC() - require.Equal(t, 1, len(result)) - assert.Equal(t, outboundQUIC.String(), result[0].String()) + for _, actualPeer := range actualTCP { + _, ok := expectedTCP[actualPeer.String()] + require.Equal(t, true, ok) + } + + // QUIC + // ---- + actualQUIC := p.OutboundConnectedWithProtocol(peers.QUIC) + require.Equal(t, len(expectedQUIC), len(actualQUIC)) + + for _, actualPeer := range actualQUIC { + _, ok := expectedQUIC[actualPeer.String()] + require.Equal(t, true, ok) + } } // addPeer is a helper to add a peer with a given connection state) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 8bc5b45fad2c..5be7f9951a23 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -235,10 +235,10 @@ func (s *Service) Start() { async.RunEvery(s.ctx, time.Duration(params.BeaconConfig().RespTimeout)*time.Second, s.updateMetrics) async.RunEvery(s.ctx, refreshRate, s.RefreshENR) async.RunEvery(s.ctx, 1*time.Minute, func() { - inboundQUICCount := len(s.peers.InboundConnectedQUIC()) - inboundTCPCount := len(s.peers.InboundConnectedTCP()) - outboundQUICCount := len(s.peers.OutboundConnectedQUIC()) - outboundTCPCount := len(s.peers.OutboundConnectedTCP()) + inboundQUICCount := len(s.peers.InboundConnectedWithProtocol(peers.QUIC)) + inboundTCPCount := len(s.peers.InboundConnectedWithProtocol(peers.TCP)) + outboundQUICCount := len(s.peers.OutboundConnectedWithProtocol(peers.QUIC)) + outboundTCPCount := len(s.peers.OutboundConnectedWithProtocol(peers.TCP)) total := inboundQUICCount + inboundTCPCount + outboundQUICCount + outboundTCPCount log.WithFields(logrus.Fields{ From c69efdb8d9b2da87ce224b0f71f50929a4cdd71c Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 3 Apr 2024 13:27:03 +0200 Subject: [PATCH 14/18] Hide QUIC protocol behind the `--enable-quic` feature flag. --- beacon-chain/p2p/discovery.go | 39 +++++++++++++++++++++-------------- beacon-chain/p2p/options.go | 25 +++++++++++++++------- beacon-chain/p2p/service.go | 20 +++++++++++------- config/features/config.go | 5 +++++ config/features/flags.go | 6 ++++++ 5 files changed, 64 insertions(+), 31 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 9ee0ebd03274..37a81c1ea1d0 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/params" ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa" "github.com/prysmaticlabs/prysm/v5/runtime/version" @@ -230,14 +231,18 @@ func (s *Service) createLocalNode( localNode := enode.NewLocalNode(db, privKey) ipEntry := enr.IP(ipAddr) - udpEntry := enr.UDP(udpPort) - tcpEntry := enr.TCP(tcpPort) - quicEntry := quicProtocol(quicPort) - localNode.Set(ipEntry) + + udpEntry := enr.UDP(udpPort) localNode.Set(udpEntry) + + tcpEntry := enr.TCP(tcpPort) localNode.Set(tcpEntry) - localNode.Set(quicEntry) + + if features.Get().EnableQUIC { + quicEntry := quicProtocol(quicPort) + localNode.Set(quicEntry) + } localNode.SetFallbackIP(ipAddr) localNode.SetFallbackUDP(udpPort) @@ -502,23 +507,25 @@ func convertToMultiAddrs(node *enode.Node) ([]ma.Multiaddr, error) { return nil, errors.Wrap(err, "could not get peer id") } - // If the QUIC entry is present in the ENR, build the corresponding multiaddress. - port, ok, err := getPort(node, quic) - if err != nil { - return nil, errors.Wrap(err, "could not get QUIC port") - } - - if ok { - addr, err := multiAddressBuilderWithID(node.IP(), quic, port, id) + if features.Get().EnableQUIC { + // If the QUIC entry is present in the ENR, build the corresponding multiaddress. + port, ok, err := getPort(node, quic) if err != nil { - return nil, errors.Wrap(err, "could not build QUIC address") + return nil, errors.Wrap(err, "could not get QUIC port") } - multiaddrs = append(multiaddrs, addr) + if ok { + addr, err := multiAddressBuilderWithID(node.IP(), quic, port, id) + if err != nil { + return nil, errors.Wrap(err, "could not build QUIC address") + } + + multiaddrs = append(multiaddrs, addr) + } } // If the TCP entry is present in the ENR, build the corresponding multiaddress. - port, ok, err = getPort(node, tcp) + port, ok, err := getPort(node, tcp) if err != nil { return nil, errors.Wrap(err, "could not get TCP port") } diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index 391e74c486c6..9935e8e0aef4 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -37,12 +37,6 @@ func MultiAddressBuilder(ip net.IP, tcpPort, quicPort uint) ([]ma.Multiaddr, err return nil, errors.Wrap(err, "unable to determine IP type") } - // Example: /ip4/1.2.3.4/udp/5678/quic-v1 - multiAddrQUIC, err := ma.NewMultiaddr(fmt.Sprintf("/%s/%s/udp/%d/quic-v1", ipType, ip, quicPort)) - if err != nil { - return nil, errors.Wrapf(err, "cannot produce QUIC multiaddr format from %s:%d", ip, tcpPort) - } - // Example: /ip4/1.2.3.4./tcp/5678 multiaddrStr := fmt.Sprintf("/%s/%s/tcp/%d", ipType, ip, tcpPort) multiAddrTCP, err := ma.NewMultiaddr(multiaddrStr) @@ -50,7 +44,19 @@ func MultiAddressBuilder(ip net.IP, tcpPort, quicPort uint) ([]ma.Multiaddr, err return nil, errors.Wrapf(err, "cannot produce TCP multiaddr format from %s:%d", ip, tcpPort) } - return []ma.Multiaddr{multiAddrTCP, multiAddrQUIC}, nil + multiaddrs := []ma.Multiaddr{multiAddrTCP} + + if features.Get().EnableQUIC { + // Example: /ip4/1.2.3.4/udp/5678/quic-v1 + multiAddrQUIC, err := ma.NewMultiaddr(fmt.Sprintf("/%s/%s/udp/%d/quic-v1", ipType, ip, quicPort)) + if err != nil { + return nil, errors.Wrapf(err, "cannot produce QUIC multiaddr format from %s:%d", ip, tcpPort) + } + + multiaddrs = append(multiaddrs, multiAddrQUIC) + } + + return multiaddrs, nil } // buildOptions for the libp2p host. @@ -87,7 +93,6 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op libp2p.ListenAddrs(multiaddrs...), libp2p.UserAgent(version.BuildData()), libp2p.ConnectionGater(s), - libp2p.Transport(libp2pquic.NewTransport), libp2p.Transport(libp2ptcp.NewTCPTransport), libp2p.DefaultMuxers, libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), @@ -95,6 +100,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op libp2p.Ping(false), // Disable Ping Service. } + if features.Get().EnableQUIC { + options = append(options, libp2p.Transport(libp2pquic.NewTransport)) + } + if cfg.EnableUPnP { options = append(options, libp2p.NATPortMap()) // Allow to use UPnP } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 5be7f9951a23..8192d9b0b67f 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -24,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/params" leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket" prysmnetwork "github.com/prysmaticlabs/prysm/v5/network" @@ -241,13 +242,18 @@ func (s *Service) Start() { outboundTCPCount := len(s.peers.OutboundConnectedWithProtocol(peers.TCP)) total := inboundQUICCount + inboundTCPCount + outboundQUICCount + outboundTCPCount - log.WithFields(logrus.Fields{ - "inboundQUIC": inboundQUICCount, - "inboundTCP": inboundTCPCount, - "outboundQUIC": outboundQUICCount, - "outboundTCP": outboundTCPCount, - "total": total, - }).Info("Connected peers") + fields := logrus.Fields{ + "inboundTCP": inboundTCPCount, + "outboundTCP": outboundTCPCount, + "total": total, + } + + if features.Get().EnableQUIC { + fields["inboundQUIC"] = inboundQUICCount + fields["outboundQUIC"] = outboundQUICCount + } + + log.WithFields(fields).Info("Connected peers") }) multiAddrs := s.host.Network().ListenAddresses() diff --git a/config/features/config.go b/config/features/config.go index 43f1c3d10a67..aa606ce9a8d6 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -42,6 +42,7 @@ type Flags struct { WriteSSZStateTransitions bool // WriteSSZStateTransitions to tmp directory. EnablePeerScorer bool // EnablePeerScorer enables experimental peer scoring in p2p. EnableLightClient bool // EnableLightClient enables light client APIs. + EnableQUIC bool // EnableQUIC specifies whether to enable QUIC transport for libp2p. WriteWalletPasswordOnWebOnboarding bool // WriteWalletPasswordOnWebOnboarding writes the password to disk after Prysm web signup. EnableDoppelGanger bool // EnableDoppelGanger enables doppelganger protection on startup for the validator. EnableHistoricalSpaceRepresentation bool // EnableHistoricalSpaceRepresentation enables the saving of registry validators in separate buckets to save space @@ -265,6 +266,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error { logEnabled(BlobSaveFsync) cfg.BlobSaveFsync = true } + if ctx.IsSet(EnableQUIC.Name) { + logEnabled(EnableQUIC) + cfg.EnableQUIC = true + } cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value} Init(cfg) diff --git a/config/features/flags.go b/config/features/flags.go index fec1b987ac00..06572e7d3afe 100644 --- a/config/features/flags.go +++ b/config/features/flags.go @@ -171,6 +171,11 @@ var ( Name: "blob-save-fsync", Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.", } + // EnableQUIC enables connection using the QUIC protocol for peers which support it. + EnableQUIC = &cli.BoolFlag{ + Name: "enable-quic", + Usage: "Enables connection using the QUIC protocol for peers which support it.", + } ) // devModeFlags holds list of flags that are set when development mode is on. @@ -229,6 +234,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c DisableRegistrationCache, EnableLightClient, BlobSaveFsync, + EnableQUIC, }...)...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E. From 1f0d84d0094f5dd58743e4316fa4be4043679614 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 4 Apr 2024 10:38:41 +0200 Subject: [PATCH 15/18] `e2e`: Add `--enable-quic` flag. --- testing/endtoend/components/beacon_node.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/endtoend/components/beacon_node.go b/testing/endtoend/components/beacon_node.go index 9c93188862b4..54638d9c5042 100644 --- a/testing/endtoend/components/beacon_node.go +++ b/testing/endtoend/components/beacon_node.go @@ -276,6 +276,7 @@ func (node *BeaconNode) Start(ctx context.Context) error { "--" + cmdshared.ForceClearDB.Name, "--" + cmdshared.AcceptTosFlag.Name, "--" + flags.EnableDebugRPCEndpoints.Name, + "--" + features.EnableQUIC.Name, } if config.UsePprof { args = append(args, "--pprof", fmt.Sprintf("--pprofport=%d", e2e.TestParams.Ports.PrysmBeaconNodePprofPort+index)) From a26fb88aea4fb2aa1c27a271c194e8c987d523b6 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 4 Apr 2024 10:41:04 +0200 Subject: [PATCH 16/18] Add `--enable-quic` in `devModeFlag`. --- config/features/flags.go | 1 + 1 file changed, 1 insertion(+) diff --git a/config/features/flags.go b/config/features/flags.go index 06572e7d3afe..1c1bbf22fe83 100644 --- a/config/features/flags.go +++ b/config/features/flags.go @@ -182,6 +182,7 @@ var ( var devModeFlags = []cli.Flag{ enableExperimentalState, backfill.EnableExperimentalBackfill, + EnableQUIC, } // ValidatorFlags contains a list of all the feature flags that apply to the validator client. From a0c03267b1aa87eaa8fd18455fa7d0e6e7c46988 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 4 Apr 2024 10:45:14 +0200 Subject: [PATCH 17/18] `convertToMultiAddrs` ==> `retrieveMultiAddrsFromNode`. --- beacon-chain/p2p/discovery.go | 10 +++++----- beacon-chain/p2p/fork_test.go | 4 ++-- beacon-chain/p2p/options_test.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 37a81c1ea1d0..bcde73702bb0 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -400,7 +400,7 @@ func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { if err != nil { return nil, errors.Wrapf(err, "Could not get enode from string") } - nodeAddrs, err := convertToMultiAddrs(enodeAddr) + nodeAddrs, err := retrieveMultiAddrsFromNode(enodeAddr) if err != nil { return nil, errors.Wrapf(err, "Could not get multiaddr") } @@ -449,7 +449,7 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr { } // Get up to two multiaddrs (TCP and QUIC) for each node. - nodeMultiAddrs, err := convertToMultiAddrs(node) + nodeMultiAddrs, err := retrieveMultiAddrsFromNode(node) if err != nil { log.WithError(err).Errorf("Could not convert to multiAddr node %s", node) continue @@ -462,7 +462,7 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr { } func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) { - multiAddrs, err := convertToMultiAddrs(node) + multiAddrs, err := retrieveMultiAddrsFromNode(node) if err != nil { return nil, nil, err } @@ -487,11 +487,11 @@ func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) return &infos[0], multiAddrs, nil } -// convertToMultiAddrs converts an enode.Node to a list of multiaddrs. +// retrieveMultiAddrsFromNode converts an enode.Node to a list of multiaddrs. // If the node has a both a QUIC and a TCP port set in their ENR, then // the multiaddr corresponding to the QUIC port is added first, followed // by the multiaddr corresponding to the TCP port. -func convertToMultiAddrs(node *enode.Node) ([]ma.Multiaddr, error) { +func retrieveMultiAddrsFromNode(node *enode.Node) ([]ma.Multiaddr, error) { multiaddrs := make([]ma.Multiaddr, 0, 2) // Retrieve the node public key. diff --git a/beacon-chain/p2p/fork_test.go b/beacon-chain/p2p/fork_test.go index 2c2705716e58..d5780f719901 100644 --- a/beacon-chain/p2p/fork_test.go +++ b/beacon-chain/p2p/fork_test.go @@ -104,7 +104,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { for _, node := range nodes { if s.filterPeer(node) { - nodeAddrs, err := convertToMultiAddrs(node) + nodeAddrs, err := retrieveMultiAddrsFromNode(node) require.NoError(t, err) addrs = append(addrs, nodeAddrs...) } @@ -195,7 +195,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { for _, node := range nodes { if s.filterPeer(node) { - nodeAddrs, err := convertToMultiAddrs(node) + nodeAddrs, err := retrieveMultiAddrsFromNode(node) require.NoError(t, err) addrs = append(addrs, nodeAddrs...) } diff --git a/beacon-chain/p2p/options_test.go b/beacon-chain/p2p/options_test.go index b7f29788e342..89a7fd854bf6 100644 --- a/beacon-chain/p2p/options_test.go +++ b/beacon-chain/p2p/options_test.go @@ -89,7 +89,7 @@ func TestIPV6Support(t *testing.T) { lNode := enode.NewLocalNode(db, key) mockIPV6 := net.IP{0xff, 0x02, 0xAA, 0, 0x1F, 0, 0x2E, 0, 0, 0x36, 0x45, 0, 0, 0, 0, 0x02} lNode.Set(enr.IP(mockIPV6)) - mas, err := convertToMultiAddrs(lNode.Node()) + mas, err := retrieveMultiAddrsFromNode(lNode.Node()) if err != nil { t.Fatal(err) } From e9d80e2dfa2f629e8629ce0f44667f70e11b3648 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 4 Apr 2024 11:24:28 +0200 Subject: [PATCH 18/18] `convertToAddrInfo`: Ensure `len(infos) == 1`. --- beacon-chain/p2p/discovery.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index bcde73702bb0..7fc63927ca88 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -321,7 +321,7 @@ func (s *Service) filterPeer(node *enode.Node) bool { return false } - if len(multiAddrs) == 0 { + if peerData == nil || len(multiAddrs) == 0 { return false } @@ -476,12 +476,8 @@ func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) return nil, nil, errors.Wrapf(err, "could not convert to peer info: %v", multiAddrs) } - if len(infos) > 1 { - return nil, nil, errors.Errorf("infos contains %v elements, expected not more than 1", len(infos)) - } - - if len(infos) == 0 { - return nil, multiAddrs, nil + if len(infos) != 1 { + return nil, nil, errors.Errorf("infos contains %v elements, expected exactly 1", len(infos)) } return &infos[0], multiAddrs, nil