Skip to content

Commit

Permalink
les: switch to new discv5 (#21940)
Browse files Browse the repository at this point in the history
This PR enables running the new discv5 protocol in both LES client
and server mode. In client mode it mixes discv5 and dnsdisc iterators
(if both are enabled) and filters incoming ENRs for "les" tag and fork ID.
The old p2p/discv5 package and all references to it are removed.

Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
zsfelfoldi and fjl authored Jan 26, 2021
1 parent 9c57293 commit a72fa88
Show file tree
Hide file tree
Showing 31 changed files with 113 additions and 6,184 deletions.
15 changes: 7 additions & 8 deletions cmd/bootnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
Expand Down Expand Up @@ -121,17 +120,17 @@ func main() {

printNotice(&nodeKey.PublicKey, *realaddr)

db, _ := enode.OpenDB("")
ln := enode.NewLocalNode(db, nodeKey)
cfg := discover.Config{
PrivateKey: nodeKey,
NetRestrict: restrictList,
}
if *runv5 {
if _, err := discv5.ListenUDP(nodeKey, conn, "", restrictList); err != nil {
if _, err := discover.ListenV5(conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
} else {
db, _ := enode.OpenDB("")
ln := enode.NewLocalNode(db, nodeKey)
cfg := discover.Config{
PrivateKey: nodeKey,
NetRestrict: restrictList,
}
if _, err := discover.ListenUDP(conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/faucet/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/params"
Expand Down Expand Up @@ -154,9 +153,9 @@ func main() {
log.Crit("Failed to parse genesis block json", "err", err)
}
// Convert the bootnodes to internal enode representations
var enodes []*discv5.Node
var enodes []*enode.Node
for _, boot := range strings.Split(*bootFlag, ",") {
if url, err := discv5.ParseNode(boot); err == nil {
if url, err := enode.Parse(enode.ValidSchemes, boot); err == nil {
enodes = append(enodes, url)
} else {
log.Error("Failed to parse bootnode URL", "url", boot, "err", err)
Expand Down Expand Up @@ -228,7 +227,7 @@ type wsConn struct {
wlock sync.Mutex
}

func newFaucet(genesis *core.Genesis, port int, enodes []*discv5.Node, network uint64, stats string, ks *keystore.KeyStore, index []byte) (*faucet, error) {
func newFaucet(genesis *core.Genesis, port int, enodes []*enode.Node, network uint64, stats string, ks *keystore.KeyStore, index []byte) (*faucet, error) {
// Assemble the raw devp2p protocol stack
stack, err := node.New(&node.Config{
Name: "geth",
Expand Down
15 changes: 3 additions & 12 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
Expand Down Expand Up @@ -842,30 +841,22 @@ func setBootstrapNodes(ctx *cli.Context, cfg *p2p.Config) {
// setBootstrapNodesV5 creates a list of bootstrap nodes from the command line
// flags, reverting to pre-configured ones if none have been specified.
func setBootstrapNodesV5(ctx *cli.Context, cfg *p2p.Config) {
urls := params.MainnetBootnodes
urls := params.V5Bootnodes
switch {
case ctx.GlobalIsSet(BootnodesFlag.Name) || ctx.GlobalIsSet(LegacyBootnodesV5Flag.Name):
if ctx.GlobalIsSet(LegacyBootnodesV5Flag.Name) {
urls = SplitAndTrim(ctx.GlobalString(LegacyBootnodesV5Flag.Name))
} else {
urls = SplitAndTrim(ctx.GlobalString(BootnodesFlag.Name))
}
case ctx.GlobalBool(RopstenFlag.Name):
urls = params.RopstenBootnodes
case ctx.GlobalBool(RinkebyFlag.Name):
urls = params.RinkebyBootnodes
case ctx.GlobalBool(GoerliFlag.Name):
urls = params.GoerliBootnodes
case ctx.GlobalBool(YoloV2Flag.Name):
urls = params.YoloV2Bootnodes
case cfg.BootstrapNodesV5 != nil:
return // already set, don't apply defaults.
}

cfg.BootstrapNodesV5 = make([]*discv5.Node, 0, len(urls))
cfg.BootstrapNodesV5 = make([]*enode.Node, 0, len(urls))
for _, url := range urls {
if url != "" {
node, err := discv5.ParseNode(url)
node, err := enode.Parse(enode.ValidSchemes, url)
if err != nil {
log.Error("Bootstrap URL invalid", "enode", url, "err", err)
continue
Expand Down
13 changes: 8 additions & 5 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type LightEthereum struct {
netRPCService *ethapi.PublicNetAPI

p2pServer *p2p.Server
p2pConfig *p2p.Config
}

// New creates an instance of the light client.
Expand Down Expand Up @@ -109,14 +110,11 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) {
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
}
peers.subscribe((*vtSubscription)(leth.valueTracker))

dnsdisc, err := leth.setupDiscovery()
if err != nil {
return nil, err
}
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, dnsdisc, time.Second, nil, &mclock.System{}, config.UltraLightServers)
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator

Expand Down Expand Up @@ -299,6 +297,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
func (s *LightEthereum) Start() error {
log.Warn("Light client mode is an experimental feature")

discovery, err := s.setupDiscovery(s.p2pConfig)
if err != nil {
return err
}
s.serverPool.addSource(discovery)
s.serverPool.start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
Expand Down
12 changes: 0 additions & 12 deletions les/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
)
Expand All @@ -42,17 +41,6 @@ func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}

func lesTopic(genesisHash common.Hash, protocolVersion uint) discv5.Topic {
var name string
switch protocolVersion {
case lpv2:
name = "LES2"
default:
panic(nil)
}
return discv5.Topic(name + "@" + common.Bytes2Hex(genesisHash.Bytes()[0:8]))
}

type chainReader interface {
CurrentHeader() *types.Header
}
Expand Down
47 changes: 38 additions & 9 deletions les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package les

import (
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
Expand All @@ -25,19 +27,46 @@ import (
// lesEntry is the "les" ENR entry. This is set for LES servers only.
type lesEntry struct {
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
_ []rlp.RawValue `rlp:"tail"`
}

// ENRKey implements enr.Entry.
func (e lesEntry) ENRKey() string {
return "les"
func (lesEntry) ENRKey() string { return "les" }

// ethEntry is the "eth" ENR entry. This is redeclared here to avoid depending on package eth.
type ethEntry struct {
ForkID forkid.ID
_ []rlp.RawValue `rlp:"tail"`
}

func (ethEntry) ENRKey() string { return "eth" }

// setupDiscovery creates the node discovery source for the eth protocol.
func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
if len(eth.config.EthDiscoveryURLs) == 0 {
return nil, nil
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
it := enode.NewFairMix(0)

// Enable DNS discovery.
if len(eth.config.EthDiscoveryURLs) != 0 {
client := dnsdisc.NewClient(dnsdisc.Config{})
dns, err := client.NewIterator(eth.config.EthDiscoveryURLs...)
if err != nil {
return nil, err
}
it.AddSource(dns)
}

// Enable DHT.
if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil {
it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
}
client := dnsdisc.NewClient(dnsdisc.Config{})
return client.NewIterator(eth.config.EthDiscoveryURLs...)

forkFilter := forkid.NewFilter(eth.blockchain)
iterator := enode.Filter(it, func(n *enode.Node) bool { return nodeIsServer(forkFilter, n) })
return iterator, nil
}

// nodeIsServer checks whether n is an LES server node.
func nodeIsServer(forkFilter forkid.Filter, n *enode.Node) bool {
var les lesEntry
var eth ethEntry
return n.Load(&les) == nil && n.Load(&eth) == nil && forkFilter(eth.ForkID) == nil
}
21 changes: 0 additions & 21 deletions les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
Expand Down Expand Up @@ -58,7 +57,6 @@ type LesServer struct {
archiveMode bool // Flag whether the ethereum node runs in archive mode.
handler *serverHandler
broadcaster *broadcaster
lesTopics []discv5.Topic
privateKey *ecdsa.PrivateKey

// Flow control and capacity management
Expand All @@ -77,11 +75,6 @@ type LesServer struct {

func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
// Collect les protocol version information supported by local node.
lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
for i, pv := range AdvertiseProtocolVersions {
lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
}
// Calculate the number of threads used to service the light client
// requests based on the user-specified value.
threads := config.LightServ * 4 / 100
Expand All @@ -103,7 +96,6 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
ns: ns,
archiveMode: e.ArchiveMode(),
broadcaster: newBroadcaster(ns),
lesTopics: lesTopics,
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
threadsBusy: config.LightServ/100 + 1,
Expand Down Expand Up @@ -203,19 +195,6 @@ func (s *LesServer) Start() error {
s.wg.Add(1)
go s.capacityManagement()

if s.p2pSrv.DiscV5 != nil {
for _, topic := range s.lesTopics {
topic := topic
go func() {
logger := log.New("topic", topic)
logger.Info("Starting topic registration")
defer logger.Info("Terminated topic registration")

s.p2pSrv.DiscV5.RegisterTopic(topic, s.closeCh)
}()
}
}

return nil
}

Expand Down
12 changes: 8 additions & 4 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var (
)

// newServerPool creates a new server pool
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, discovery enode.Iterator, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
s := &serverPool{
db: db,
clock: clock,
Expand All @@ -147,9 +147,6 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
alwaysConnect := lpc.NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
s.mixSources = append(s.mixSources, knownSelector)
s.mixSources = append(s.mixSources, alwaysConnect)
if discovery != nil {
s.mixSources = append(s.mixSources, discovery)
}

iter := enode.Iterator(s.mixer)
if query != nil {
Expand All @@ -175,6 +172,13 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
return s
}

// addSource adds a node discovery source to the server pool (should be called before start)
func (s *serverPool) addSource(source enode.Iterator) {
if source != nil {
s.mixSources = append(s.mixSources, source)
}
}

// addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
// Nodes that are filtered out and does not appear on the output iterator are put back
// into redialWait state.
Expand Down
3 changes: 2 additions & 1 deletion les/serverpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func (s *serverPoolTest) start() {
}

s.vt = lpc.NewValueTracker(s.db, s.clock, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000))
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, s.input, 0, testQuery, s.clock, s.trusted)
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, 0, testQuery, s.clock, s.trusted)
s.sp.addSource(s.input)
s.sp.validSchemes = enode.ValidSchemesForTesting
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
s.disconnect = make(map[int][]int)
Expand Down
12 changes: 6 additions & 6 deletions mobile/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ package geth
import (
"errors"

"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
)

// Enode represents a host on the network.
type Enode struct {
node *discv5.Node
node *enode.Node
}

// NewEnode parses a node designator.
Expand All @@ -53,21 +53,21 @@ type Enode struct {
// and UDP discovery port 30301.
//
// enode://<hex node id>@10.3.58.6:30303?discport=30301
func NewEnode(rawurl string) (enode *Enode, _ error) {
node, err := discv5.ParseNode(rawurl)
func NewEnode(rawurl string) (*Enode, error) {
node, err := enode.Parse(enode.ValidSchemes, rawurl)
if err != nil {
return nil, err
}
return &Enode{node}, nil
}

// Enodes represents a slice of accounts.
type Enodes struct{ nodes []*discv5.Node }
type Enodes struct{ nodes []*enode.Node }

// NewEnodes creates a slice of uninitialized enodes.
func NewEnodes(size int) *Enodes {
return &Enodes{
nodes: make([]*discv5.Node, size),
nodes: make([]*enode.Node, size),
}
}

Expand Down
10 changes: 7 additions & 3 deletions mobile/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"encoding/json"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -62,9 +62,13 @@ func GoerliGenesis() string {
// FoundationBootnodes returns the enode URLs of the P2P bootstrap nodes operated
// by the foundation running the V5 discovery protocol.
func FoundationBootnodes() *Enodes {
nodes := &Enodes{nodes: make([]*discv5.Node, len(params.MainnetBootnodes))}
nodes := &Enodes{nodes: make([]*enode.Node, len(params.MainnetBootnodes))}
for i, url := range params.MainnetBootnodes {
nodes.nodes[i] = discv5.MustParseNode(url)
var err error
nodes.nodes[i], err = enode.Parse(enode.ValidSchemes, url)
if err != nil {
panic("invalid node URL: " + err.Error())
}
}
return nodes
}
Loading

0 comments on commit a72fa88

Please sign in to comment.