Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Add discovery version #3616

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions nodebuilder/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package p2p

import (
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

const defaultRoutingRefreshPeriod = time.Minute

// Config combines all configuration fields for P2P subsystem.
type Config struct {
// ListenAddresses - Addresses to listen to on local NIC.
Expand All @@ -29,8 +26,7 @@ type Config struct {
// This is enabled by default for Bootstrappers.
PeerExchange bool
// ConnManager is a configuration tuple for ConnectionManager.
ConnManager connManagerConfig
RoutingTableRefreshPeriod time.Duration
ConnManager connManagerConfig

// Allowlist for IPColocation PubSub parameter, a list of string CIDRs
IPColocationWhitelist []string
Expand Down Expand Up @@ -64,10 +60,9 @@ func DefaultConfig(tp node.Type) Config {
"/ip4/127.0.0.1/tcp/2121",
"/ip6/::/tcp/2121",
},
MutualPeers: []string{},
PeerExchange: tp == node.Bridge || tp == node.Full,
ConnManager: defaultConnManagerConfig(tp),
RoutingTableRefreshPeriod: defaultRoutingRefreshPeriod,
MutualPeers: []string{},
PeerExchange: tp == node.Bridge || tp == node.Full,
ConnManager: defaultConnManagerConfig(tp),
}
}

Expand All @@ -83,15 +78,6 @@ func (cfg *Config) mutualPeers() (_ []peer.AddrInfo, err error) {
return peer.AddrInfosFromP2pAddrs(maddrs...)
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
if cfg.RoutingTableRefreshPeriod <= 0 {
cfg.RoutingTableRefreshPeriod = defaultRoutingRefreshPeriod
log.Warnf("routingTableRefreshPeriod is not valid. restoring to default value: %d", cfg.RoutingTableRefreshPeriod)
}
return nil
}

// Upgrade updates the `ListenAddresses` and `NoAnnounceAddresses` to
// include support for websocket connections.
func (cfg *Config) Upgrade() {
Expand Down
4 changes: 1 addition & 3 deletions nodebuilder/p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ var log = logging.Logger("module/p2p")
// ConstructModule collects all the components and services related to p2p.
func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()
baseComponents := fx.Options(
fx.Error(cfgErr),
fx.Supply(cfg),
fx.Provide(Key),
fx.Provide(id),
Expand All @@ -28,7 +26,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(pubSub),
fx.Provide(ipld.NewBlockservice),
fx.Provide(peerRouting),
fx.Provide(contentRouting),
fx.Provide(newDHT),
fx.Provide(addrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)),
fx.Provide(metrics.NewBandwidthCounter),
fx.Provide(newModule),
Expand Down
76 changes: 29 additions & 47 deletions nodebuilder/p2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,52 @@ import (

"github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)

// contentRouting constructs nil content routing,
// as for our use-case existing ContentRouting mechanisms, e.g DHT, are unsuitable
func contentRouting(r routing.PeerRouting) routing.ContentRouting {
return r.(*dht.IpfsDHT)
}

// peerRouting provides constructor for PeerRouting over DHT.
// Basically, this provides a way to discover peer addresses by respecting public keys.
func peerRouting(cfg *Config, tp node.Type, params routingParams) (routing.PeerRouting, error) {
opts := []dht.Option{
dht.BootstrapPeers(params.Peers...),
dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", params.Net))),
dht.Datastore(params.DataStore),
dht.RoutingTableRefreshPeriod(cfg.RoutingTableRefreshPeriod),
}

if isBootstrapper() {
opts = append(opts,
dht.BootstrapPeers(), // no bootstrappers for a bootstrapper ¯\_(ツ)_/¯
)
}

Wondertan marked this conversation as resolved.
Show resolved Hide resolved
func newDHT(
ctx context.Context,
lc fx.Lifecycle,
tp node.Type,
network Network,
bootsrappers Bootstrappers,
host HostBase,
dataStore datastore.Batching,
) (*dht.IpfsDHT, error) {
var mode dht.ModeOpt
switch tp {
case node.Light:
opts = append(opts,
dht.Mode(dht.ModeClient),
)
mode = dht.ModeClient
case node.Bridge, node.Full:
opts = append(opts,
dht.Mode(dht.ModeServer),
)
mode = dht.ModeServer
default:
return nil, fmt.Errorf("unsupported node type: %s", tp)
}

d, err := dht.New(params.Ctx, params.Host, opts...)
// no bootstrappers for a bootstrapper ¯\_(ツ)_/¯
// otherwise dht.Bootstrap(OnStart hook) will deadlock
if isBootstrapper() {
bootsrappers = nil
}

dht, err := discovery.NewDHT(ctx, network.String(), bootsrappers, host, dataStore, mode)
if err != nil {
return nil, err
}
params.Lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return d.Bootstrap(ctx)
},
OnStop: func(context.Context) error {
return d.Close()
},
stopFn := func(context.Context) error {
return dht.Close()
}
lc.Append(fx.Hook{
OnStart: dht.Bootstrap,
OnStop: stopFn,
})
return d, nil
return dht, nil
}

type routingParams struct {
fx.In

Ctx context.Context
Net Network
Peers Bootstrappers
Lc fx.Lifecycle
Host HostBase
DataStore datastore.Batching
func peerRouting(dht *dht.IpfsDHT) routing.PeerRouting {
return dht
}
30 changes: 21 additions & 9 deletions nodebuilder/share/p2p_constructors.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package share

import (
dht "github.com/libp2p/go-libp2p-kad-dht"
p2pdisc "github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"
Expand All @@ -24,11 +25,16 @@ const (
// archivalNodesTag is the tag used to identify archival nodes in the
// discovery service.
archivalNodesTag = "archival"

// discovery version is a prefix for all tags used in discovery. It is bumped when
// there are protocol breaking changes.
version = "v1"
)

// TODO @renaynay: rename
func peerComponents(tp node.Type, cfg *Config) fx.Option {
return fx.Options(
fx.Provide(routingDiscovery),
fullDiscoveryAndPeerManager(tp, cfg),
archivalDiscoveryAndPeerManager(tp, cfg),
)
Expand All @@ -42,8 +48,8 @@ func fullDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
func(
lc fx.Lifecycle,
host host.Host,
r routing.ContentRouting,
connGater *conngater.BasicConnectionGater,
disc p2pdisc.Discovery,
shrexSub *shrexsub.PubSub,
headerSub libhead.Subscriber[*header.ExtendedHeader],
// we must ensure Syncer is started before PeerManager
Expand Down Expand Up @@ -78,7 +84,8 @@ func fullDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
fullDisc, err := discovery.NewDiscovery(
cfg.Discovery,
host,
routingdisc.NewRoutingDiscovery(r),
disc,
version,
fullNodesTag,
discOpts...,
)
Expand All @@ -100,10 +107,10 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
func(
lc fx.Lifecycle,
pruneCfg *modprune.Config,
d *discovery.Discovery,
manager *peers.Manager,
fullDisc *discovery.Discovery,
fullManager *peers.Manager,
h host.Host,
r routing.ContentRouting,
disc p2pdisc.Discovery,
gater *conngater.BasicConnectionGater,
) (map[string]*peers.Manager, []*discovery.Discovery, error) {
archivalPeerManager, err := peers.NewManager(
Expand All @@ -125,7 +132,8 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
archivalDisc, err := discovery.NewDiscovery(
cfg.Discovery,
h,
routingdisc.NewRoutingDiscovery(r),
disc,
version,
archivalNodesTag,
discOpts...,
)
Expand All @@ -137,7 +145,11 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
OnStop: archivalDisc.Stop,
})

managers := map[string]*peers.Manager{fullNodesTag: manager, archivalNodesTag: archivalPeerManager}
return managers, []*discovery.Discovery{d, archivalDisc}, nil
managers := map[string]*peers.Manager{fullNodesTag: fullManager, archivalNodesTag: archivalPeerManager}
return managers, []*discovery.Discovery{fullDisc, archivalDisc}, nil
})
}

func routingDiscovery(dht *dht.IpfsDHT) p2pdisc.Discovery {
return routingdisc.NewRoutingDiscovery(dht)
}
1 change: 0 additions & 1 deletion nodebuilder/tests/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ func getAdminClient(ctx context.Context, nd *nodebuilder.Node, t *testing.T) *cl
}

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
cfg.P2P.RoutingTableRefreshPeriod = interval
cfg.Share.Discovery.AdvertiseInterval = interval
}
38 changes: 38 additions & 0 deletions share/shwap/p2p/discovery/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package discovery

import (
"context"
"fmt"
"time"

"github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

const (
defaultRoutingRefreshPeriod = time.Minute
)

// PeerRouting provides constructor for PeerRouting over DHT.
// Basically, this provides a way to discover peer addresses by respecting public keys.
func NewDHT(
ctx context.Context,
prefix string,
bootsrappers []peer.AddrInfo,
host host.Host,
dataStore datastore.Batching,
mode dht.ModeOpt,
) (*dht.IpfsDHT, error) {
opts := []dht.Option{
dht.BootstrapPeers(bootsrappers...),
dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", prefix))),
dht.Datastore(dataStore),
dht.RoutingTableRefreshPeriod(defaultRoutingRefreshPeriod),
dht.Mode(mode),
}

return dht.New(ctx, host, opts...)
}
Loading
Loading