From e5ec6a8f8b24a56f2f2a7b94b29b5b0a8d4be010 Mon Sep 17 00:00:00 2001 From: Andrew Chiw Date: Tue, 17 Jan 2023 08:37:49 +0100 Subject: [PATCH] Connection Gater (#610) Peer blacklisting and whitelisting implemented on libp2p level with Connection Gater. Resolves #386 --- config/p2p.go | 2 ++ node/node.go | 11 ++++--- p2p/client.go | 82 +++++++++++++++++++++++++++++++--------------- p2p/client_test.go | 47 ++++++++++++++++++++------ p2p/utils_test.go | 8 ++--- 5 files changed, 105 insertions(+), 45 deletions(-) diff --git a/config/p2p.go b/config/p2p.go index 5a4e703988c..0509f7f95d1 100644 --- a/config/p2p.go +++ b/config/p2p.go @@ -4,4 +4,6 @@ package config type P2PConfig struct { ListenAddress string // Address to listen for incoming connections Seeds string // Comma separated list of seed nodes to connect to + BlockedPeers string // Comma separated list of nodes to ignore + AllowedPeers string // Comma separated list of nodes to whitelist } diff --git a/node/node.go b/node/node.go index 22b54950713..0cd8592bf1b 100644 --- a/node/node.go +++ b/node/node.go @@ -96,13 +96,9 @@ func NewNode( return nil, err } - client, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, logger.With("module", "p2p")) - if err != nil { - return nil, err - } - var baseKV ds.TxnDatastore + var err error if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing logger.Info("WARNING: working in in-memory mode") baseKV, err = store.NewDefaultInMemoryKVStore() @@ -117,6 +113,11 @@ func NewNode( dalcKV := newPrefixKV(baseKV, dalcPrefix) indexerKV := newPrefixKV(baseKV, indexerPrefix) + client, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p")) + if err != nil { + return nil, err + } + s := store.New(ctx, mainKV) dalc := registry.GetClient(conf.DALayer) diff --git a/p2p/client.go b/p2p/client.go index ed64827762f..46e8b375446 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -17,6 +18,7 @@ import ( discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing" discutil "github.com/libp2p/go-libp2p/p2p/discovery/util" routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" + "github.com/libp2p/go-libp2p/p2p/net/conngater" "github.com/multiformats/go-multiaddr" "github.com/tendermint/tendermint/p2p" "go.uber.org/multierr" @@ -55,9 +57,10 @@ type Client struct { chainID string privKey crypto.PrivKey - host host.Host - dht *dht.IpfsDHT - disc *discovery.RoutingDiscovery + host host.Host + dht *dht.IpfsDHT + disc *discovery.RoutingDiscovery + gater *conngater.BasicConnectionGater txGossiper *Gossiper txValidator GossipValidator @@ -82,15 +85,22 @@ type Client struct { // // Basic checks on parameters are done, and default parameters are provided for unset-configuration // TODO(tzdybal): consider passing entire config, not just P2P config, to reduce number of arguments -func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, logger log.Logger) (*Client, error) { +func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, ds datastore.Datastore, logger log.Logger) (*Client, error) { if privKey == nil { return nil, errNoPrivKey } if conf.ListenAddress == "" { conf.ListenAddress = config.DefaultListenAddress } + + gater, err := conngater.NewBasicConnectionGater(ds) + if err != nil { + return nil, fmt.Errorf("failed to create connection gater: %w", err) + } + return &Client{ conf: conf, + gater: gater, privKey: privKey, chainID: chainID, logger: logger, @@ -121,21 +131,28 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.logger.Info("listening on", "address", fmt.Sprintf("%s/p2p/%s", a, c.host.ID())) } + c.logger.Debug("blocking blacklisted peers", "blacklist", c.conf.BlockedPeers) + if err := c.setupBlockedPeers(c.parseAddrInfoList(c.conf.BlockedPeers)); err != nil { + return err + } + + c.logger.Debug("allowing whitelisted peers", "whitelist", c.conf.AllowedPeers) + if err := c.setupAllowedPeers(c.parseAddrInfoList(c.conf.AllowedPeers)); err != nil { + return err + } + c.logger.Debug("setting up gossiping") - err := c.setupGossiping(ctx) - if err != nil { + if err := c.setupGossiping(ctx); err != nil { return err } c.logger.Debug("setting up DHT") - err = c.setupDHT(ctx) - if err != nil { + if err := c.setupDHT(ctx); err != nil { return err } c.logger.Debug("setting up active peer discovery") - err = c.peerDiscovery(ctx) - if err != nil { + if err := c.peerDiscovery(ctx); err != nil { return err } @@ -243,22 +260,16 @@ func (c *Client) Peers() []PeerConnection { } func (c *Client) listen(ctx context.Context) (host.Host, error) { - var err error maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) if err != nil { return nil, err } - host, err := libp2p.New(libp2p.ListenAddrs(maddr), libp2p.Identity(c.privKey)) - if err != nil { - return nil, err - } - - return host, nil + return libp2p.New(libp2p.ListenAddrs(maddr), libp2p.Identity(c.privKey), libp2p.ConnectionGater(c.gater)) } func (c *Client) setupDHT(ctx context.Context) error { - seedNodes := c.getSeedAddrInfo(c.conf.Seeds) + seedNodes := c.parseAddrInfoList(c.conf.Seeds) if len(seedNodes) == 0 { c.logger.Info("no seed nodes - only listening for connections") } @@ -313,6 +324,24 @@ func (c *Client) setupPeerDiscovery(ctx context.Context) error { return nil } +func (c *Client) setupBlockedPeers(peers []peer.AddrInfo) error { + for _, p := range peers { + if err := c.gater.BlockPeer(p.ID); err != nil { + return err + } + } + return nil +} + +func (c *Client) setupAllowedPeers(peers []peer.AddrInfo) error { + for _, p := range peers { + if err := c.gater.UnblockPeer(p.ID); err != nil { + return err + } + } + return nil +} + func (c *Client) advertise(ctx context.Context) error { discutil.Advertise(ctx, c.disc, c.getNamespace(), cdiscovery.TTL(reAdvertisePeriod)) return nil @@ -377,21 +406,22 @@ func (c *Client) setupGossiping(ctx context.Context) error { return nil } -func (c *Client) getSeedAddrInfo(seedStr string) []peer.AddrInfo { - if len(seedStr) == 0 { +// parseAddrInfoList parses a comma separated string of multiaddrs into a list of peer.AddrInfo structs +func (c *Client) parseAddrInfoList(addrInfoStr string) []peer.AddrInfo { + if len(addrInfoStr) == 0 { return []peer.AddrInfo{} } - seeds := strings.Split(seedStr, ",") - addrs := make([]peer.AddrInfo, 0, len(seeds)) - for _, s := range seeds { - maddr, err := multiaddr.NewMultiaddr(s) + peers := strings.Split(addrInfoStr, ",") + addrs := make([]peer.AddrInfo, 0, len(peers)) + for _, p := range peers { + maddr, err := multiaddr.NewMultiaddr(p) if err != nil { - c.logger.Error("failed to parse seed node", "address", s, "error", err) + c.logger.Error("failed to parse peer", "address", p, "error", err) continue } addrInfo, err := peer.AddrInfoFromP2pAddr(maddr) if err != nil { - c.logger.Error("failed to create addr info for seed", "address", maddr, "error", err) + c.logger.Error("failed to create addr info for peer", "address", maddr, "error", err) continue } addrs = append(addrs, *addrInfo) diff --git a/p2p/client_test.go b/p2p/client_test.go index bca94f9aca9..7b8e97b8688 100644 --- a/p2p/client_test.go +++ b/p2p/client_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" @@ -20,16 +22,40 @@ import ( func TestClientStartup(t *testing.T) { privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - client, err := NewClient(config.P2PConfig{}, privKey, "TestChain", test.NewLogger(t)) assert := assert.New(t) - assert.NoError(err) - assert.NotNil(client) + testCases := []struct { + desc string + p2pconf config.P2PConfig + }{ + {"blank config", config.P2PConfig{}}, + {"peer whitelisting", config.P2PConfig{ + ListenAddress: "", + Seeds: "", + BlockedPeers: "", + AllowedPeers: "/ip4/127.0.0.1/tcp/7676/p2p/12D3KooWM1NFkZozoatQi3JvFE57eBaX56mNgBA68Lk5MTPxBE4U", + }}, + {"peer blacklisting", config.P2PConfig{ + ListenAddress: "", + Seeds: "", + BlockedPeers: "/ip4/127.0.0.1/tcp/7676/p2p/12D3KooWM1NFkZozoatQi3JvFE57eBaX56mNgBA68Lk5MTPxBE4U", + AllowedPeers: "", + }}, + } - err = client.Start(context.Background()) - defer func() { - _ = client.Close() - }() - assert.NoError(err) + for _, testCase := range testCases { + t.Run(testCase.desc, func(t *testing.T) { + client, err := NewClient(testCase.p2pconf, privKey, "TestChain", + dssync.MutexWrap(datastore.NewMapDatastore()), test.NewLogger(t)) + assert.NoError(err) + assert.NotNil(client) + + err = client.Start(context.Background()) + defer func() { + _ = client.Close() + }() + assert.NoError(err) + }) + } } func TestBootstrapping(t *testing.T) { @@ -167,10 +193,11 @@ func TestSeedStringParsing(t *testing.T) { assert := assert.New(t) require := require.New(t) logger := &test.MockLogger{} - client, err := NewClient(config.P2PConfig{}, privKey, "TestNetwork", logger) + client, err := NewClient(config.P2PConfig{}, privKey, "TestNetwork", + dssync.MutexWrap(datastore.NewMapDatastore()), logger) require.NoError(err) require.NotNil(client) - actual := client.getSeedAddrInfo(c.input) + actual := client.parseAddrInfoList(c.input) assert.NotNil(actual) assert.Equal(c.expected, actual) // ensure that errors are logged diff --git a/p2p/utils_test.go b/p2p/utils_test.go index 6ae97b073d9..5617a6879a3 100644 --- a/p2p/utils_test.go +++ b/p2p/utils_test.go @@ -8,6 +8,8 @@ import ( "strings" "testing" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -101,11 +103,9 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos clients := make([]*Client, n) for i := 0; i < n; i++ { - client, err := NewClient(config.P2PConfig{ - Seeds: seeds[i]}, + client, err := NewClient(config.P2PConfig{Seeds: seeds[i]}, mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()), - conf[i].chainID, - logger) + conf[i].chainID, sync.MutexWrap(datastore.NewMapDatastore()), logger) require.NoError(err) require.NotNil(client)