Skip to content

Commit

Permalink
rework constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Sep 9, 2024
1 parent 844ad57 commit ea8cef0
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 104 deletions.
22 changes: 4 additions & 18 deletions nodebuilder/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package p2p

import (
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

const defaultRoutingRefreshPeriod = time.Minute

// Config combines all configuration fields for P2P subsystem.
type Config struct {
// ListenAddresses - Addresses to listen to on local NIC.
Expand All @@ -29,8 +26,7 @@ type Config struct {
// This is enabled by default for Bootstrappers.
PeerExchange bool
// ConnManager is a configuration tuple for ConnectionManager.
ConnManager connManagerConfig
RoutingTableRefreshPeriod time.Duration
ConnManager connManagerConfig

// Allowlist for IPColocation PubSub parameter, a list of string CIDRs
IPColocationWhitelist []string
Expand Down Expand Up @@ -64,10 +60,9 @@ func DefaultConfig(tp node.Type) Config {
"/ip4/127.0.0.1/tcp/2121",
"/ip6/::/tcp/2121",
},
MutualPeers: []string{},
PeerExchange: tp == node.Bridge || tp == node.Full,
ConnManager: defaultConnManagerConfig(tp),
RoutingTableRefreshPeriod: defaultRoutingRefreshPeriod,
MutualPeers: []string{},
PeerExchange: tp == node.Bridge || tp == node.Full,
ConnManager: defaultConnManagerConfig(tp),
}
}

Expand All @@ -83,15 +78,6 @@ func (cfg *Config) mutualPeers() (_ []peer.AddrInfo, err error) {
return peer.AddrInfosFromP2pAddrs(maddrs...)
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
if cfg.RoutingTableRefreshPeriod <= 0 {
cfg.RoutingTableRefreshPeriod = defaultRoutingRefreshPeriod
log.Warnf("routingTableRefreshPeriod is not valid. restoring to default value: %d", cfg.RoutingTableRefreshPeriod)
}
return nil
}

// Upgrade updates the `ListenAddresses` and `NoAnnounceAddresses` to
// include support for websocket connections.
func (cfg *Config) Upgrade() {
Expand Down
4 changes: 1 addition & 3 deletions nodebuilder/p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ var log = logging.Logger("module/p2p")
// ConstructModule collects all the components and services related to p2p.
func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()
baseComponents := fx.Options(
fx.Error(cfgErr),
fx.Supply(cfg),
fx.Provide(Key),
fx.Provide(id),
Expand All @@ -28,7 +26,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(pubSub),
fx.Provide(ipld.NewBlockservice),
fx.Provide(peerRouting),
fx.Provide(contentRouting),
fx.Provide(newDHT),
fx.Provide(addrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)),
fx.Provide(metrics.NewBandwidthCounter),
fx.Provide(newModule),
Expand Down
70 changes: 23 additions & 47 deletions nodebuilder/p2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,46 @@ import (

"github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)

// contentRouting constructs nil content routing,
// as for our use-case existing ContentRouting mechanisms, e.g DHT, are unsuitable
func contentRouting(r routing.PeerRouting) routing.ContentRouting {
return r.(*dht.IpfsDHT)
}

// peerRouting provides constructor for PeerRouting over DHT.
// Basically, this provides a way to discover peer addresses by respecting public keys.
func peerRouting(cfg *Config, tp node.Type, params routingParams) (routing.PeerRouting, error) {
opts := []dht.Option{
dht.BootstrapPeers(params.Peers...),
dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", params.Net))),
dht.Datastore(params.DataStore),
dht.RoutingTableRefreshPeriod(cfg.RoutingTableRefreshPeriod),
}

if isBootstrapper() {
opts = append(opts,
dht.BootstrapPeers(), // no bootstrappers for a bootstrapper ¯\_(ツ)_/¯
)
}

func newDHT(
ctx context.Context,
lc fx.Lifecycle,
tp node.Type,
network Network,
bootsrappers Bootstrappers,
host HostBase,
dataStore datastore.Batching,
) (*dht.IpfsDHT, error) {
var mode dht.ModeOpt
switch tp {
case node.Light:
opts = append(opts,
dht.Mode(dht.ModeClient),
)
mode = dht.ModeClient
case node.Bridge, node.Full:
opts = append(opts,
dht.Mode(dht.ModeServer),
)
mode = dht.ModeServer
default:
return nil, fmt.Errorf("unsupported node type: %s", tp)
}

d, err := dht.New(params.Ctx, params.Host, opts...)
dht, err := discovery.NewDHT(ctx, network.String(), bootsrappers, host, dataStore, mode)
if err != nil {
return nil, err
}
params.Lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return d.Bootstrap(ctx)
},
OnStop: func(context.Context) error {
return d.Close()
},
stopFn := func(context.Context) error {
return dht.Close()
}
lc.Append(fx.Hook{
OnStart: dht.Bootstrap,
OnStop: stopFn,
})
return d, nil
return dht, nil
}

type routingParams struct {
fx.In

Ctx context.Context
Net Network
Peers Bootstrappers
Lc fx.Lifecycle
Host HostBase
DataStore datastore.Batching
func peerRouting(dht *dht.IpfsDHT) routing.PeerRouting {
return dht
}
24 changes: 15 additions & 9 deletions nodebuilder/share/p2p_constructors.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package share

import (
dht "github.com/libp2p/go-libp2p-kad-dht"
p2pdisc "github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"
Expand All @@ -29,6 +30,7 @@ const (
// TODO @renaynay: rename
func peerComponents(tp node.Type, cfg *Config) fx.Option {
return fx.Options(
fx.Provide(routingDiscovery),
fullDiscoveryAndPeerManager(tp, cfg),
archivalDiscoveryAndPeerManager(tp, cfg),
)
Expand All @@ -42,8 +44,8 @@ func fullDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
func(
lc fx.Lifecycle,
host host.Host,
r routing.ContentRouting,
connGater *conngater.BasicConnectionGater,
disc p2pdisc.Discovery,
shrexSub *shrexsub.PubSub,
headerSub libhead.Subscriber[*header.ExtendedHeader],
// we must ensure Syncer is started before PeerManager
Expand Down Expand Up @@ -78,7 +80,7 @@ func fullDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
fullDisc, err := discovery.NewDiscovery(
cfg.Discovery,
host,
routingdisc.NewRoutingDiscovery(r),
disc,
fullNodesTag,
discOpts...,
)
Expand All @@ -100,10 +102,10 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
func(
lc fx.Lifecycle,
pruneCfg *modprune.Config,
d *discovery.Discovery,
manager *peers.Manager,
fullDisc *discovery.Discovery,
fullManager *peers.Manager,
h host.Host,
r routing.ContentRouting,
disc p2pdisc.Discovery,
gater *conngater.BasicConnectionGater,
) (map[string]*peers.Manager, []*discovery.Discovery, error) {
archivalPeerManager, err := peers.NewManager(
Expand All @@ -125,7 +127,7 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
archivalDisc, err := discovery.NewDiscovery(
cfg.Discovery,
h,
routingdisc.NewRoutingDiscovery(r),
disc,
archivalNodesTag,
discOpts...,
)
Expand All @@ -137,7 +139,11 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option {
OnStop: archivalDisc.Stop,
})

managers := map[string]*peers.Manager{fullNodesTag: manager, archivalNodesTag: archivalPeerManager}
return managers, []*discovery.Discovery{d, archivalDisc}, nil
managers := map[string]*peers.Manager{fullNodesTag: fullManager, archivalNodesTag: archivalPeerManager}
return managers, []*discovery.Discovery{fullDisc, archivalDisc}, nil
})
}

func routingDiscovery(dht *dht.IpfsDHT) p2pdisc.Discovery {
return routingdisc.NewRoutingDiscovery(dht)
}
1 change: 0 additions & 1 deletion nodebuilder/tests/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ func getAdminClient(ctx context.Context, nd *nodebuilder.Node, t *testing.T) *cl
}

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
cfg.P2P.RoutingTableRefreshPeriod = interval
cfg.Share.Discovery.AdvertiseInterval = interval
}
Loading

0 comments on commit ea8cef0

Please sign in to comment.