From c0b74d714d11348d94dbb7a6467171ee2b737778 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 13 Oct 2023 05:06:24 +0000 Subject: [PATCH] sync: prioritize peers with higher success rate and low latency (#5143) closes: https://github.com/spacemeshos/go-spacemesh/issues/5127 https://github.com/spacemeshos/go-spacemesh/issues/5036 peers that are overwhelmed or generally slow will not be used for requests. there are two criteria used to select a good peer: - request success rate. success rates within 0.1 (10%) of each other are treated as equal, and in that case we will use latency - latency. the hs/1 protocol is used to track latency, as it is the most used protocol and objects served by this protocol are of the same size, with several exceptions (active sets, list of malfeasance proofs). related: https://github.com/spacemeshos/go-spacemesh/issues/4977 limits the number of peers used to request data for atxs. previously we were requesting data from all peers at least once. synced data 2 times in 90m; the previous attempt on my computer was 1 week ago and took 12h --- CHANGELOG.md | 4 + fetch/fetch.go | 163 ++++++++++++++---- fetch/fetch_test.go | 180 +++++++++++++-------------- fetch/interface.go | 13 +- fetch/mesh_data_test.go | 241 ++++++++++++++++++++++-------------- fetch/mocks/mocks.go | 154 ----------------------- fetch/peers/peers.go | 156 +++++++++++++++++++++++ fetch/peers/peers_test.go | 237 +++++++++++++++++++++++++++++++++++ node/node_test.go | 95 +++++++++++--- syncer/data_fetch.go | 61 +++++++-- syncer/data_fetch_test.go | 10 +- syncer/find_fork.go | 23 +++- syncer/find_fork_test.go | 4 +- syncer/interface.go | 32 ++++- syncer/mocks/mocks.go | 152 +++++++++++------------ syncer/state_syncer.go | 80 +++++++----- syncer/state_syncer_test.go | 14 +-- syncer/syncer.go | 33 ++++- syncer/syncer_test.go | 4 +- 19 files changed, 1084 insertions(+), 572 deletions(-) create mode 100644 fetch/peers/peers.go create mode 100644 fetch/peers/peers_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ea06b36d50..6b3406e47e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ this is hotfix for a bug introduced in v1.2.0. in rare conditions node may loop The submit proof of work should now be up to 40% faster thanks to [code optimization](https://github.com/spacemeshos/poet/pull/419). +* [#5143](https://github.com/spacemeshos/go-spacemesh/pull/5143) Select good peers for sync requests. + + The change improves initial sync speed and any sync protocol requests required during consensus. + ## v1.2.0 ### Upgrade information diff --git a/fetch/fetch.go b/fetch/fetch.go index b5faf770e4..a6be410497 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -4,17 +4,17 @@ package fetch import ( "context" "errors" - "fmt" "math/rand" "sync" "time" - "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/core/network" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" + "github.com/spacemeshos/go-spacemesh/fetch/peers" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/server" @@ -30,6 +30,8 @@ const ( OpnProtocol = "lp/2" cacheSize = 1000 + + RedundantPeers = 10 ) var ( @@ -79,7 +81,6 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage { // Config is the configuration file of the Fetch component.
type Config struct { BatchTimeout time.Duration // in milliseconds - MaxRetriesForPeer int BatchSize, QueueSize int RequestTimeout time.Duration // in seconds MaxRetriesForRequest int @@ -89,7 +90,6 @@ type Config struct { func DefaultConfig() Config { return Config{ BatchTimeout: time.Millisecond * time.Duration(50), - MaxRetriesForPeer: 2, QueueSize: 20, BatchSize: 20, RequestTimeout: time.Second * time.Duration(10), @@ -144,6 +144,7 @@ type Fetch struct { logger log.Log bs *datastore.BlobStore host host + peers *peers.Peers servers map[string]requester validators *dataValidators @@ -165,13 +166,20 @@ type Fetch struct { } // NewFetch creates a new Fetch struct. -func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, host *p2p.Host, opts ...Option) *Fetch { +func NewFetch( + cdb *datastore.CachedDB, + msh meshProvider, + b system.BeaconGetter, + host *p2p.Host, + opts ...Option, +) *Fetch { bs := datastore.NewBlobStore(cdb.Database) f := &Fetch{ cfg: DefaultConfig(), logger: log.NewNop(), bs: bs, host: host, + peers: peers.New(), servers: map[string]requester{}, unprocessed: make(map[types.Hash32]*request), ongoing: make(map[types.Hash32]*request), @@ -181,6 +189,28 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, for _, opt := range opts { opt(f) } + // NOTE(dshulyak) this is to avoid tests refactoring. + // there is one test that covers this part. + if host != nil { + connectedf := func(peer p2p.Peer) { + f.logger.With().Debug("add peer", log.Stringer("id", peer)) + f.peers.Add(peer) + } + host.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(_ network.Network, c network.Conn) { + connectedf(c.RemotePeer()) + }, + DisconnectedF: func(_ network.Network, c network.Conn) { + f.logger.With().Debug("remove peer", log.Stringer("id", c.RemotePeer())) + f.peers.Delete(c.RemotePeer()) + }, + }) + for _, peer := range host.GetPeers() { + if host.Connected(peer) { + connectedf(peer) + } + } + } f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout) srvOpts := []server.Opt{ @@ -190,11 +220,23 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, if len(f.servers) == 0 { h := newHandler(cdb, bs, msh, b, f.logger) f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...) - f.servers[lyrDataProtocol] = server.New(host, lyrDataProtocol, h.handleLayerDataReq, srvOpts...) + f.servers[lyrDataProtocol] = server.New( + host, + lyrDataProtocol, + h.handleLayerDataReq, + srvOpts...) f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...) - f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...) + f.servers[meshHashProtocol] = server.New( + host, + meshHashProtocol, + h.handleMeshHashReq, + srvOpts...) f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...) - f.servers[OpnProtocol] = server.New(host, OpnProtocol, h.handleLayerOpinionsReq2, srvOpts...) + f.servers[OpnProtocol] = server.New( + host, + OpnProtocol, + h.handleLayerOpinionsReq2, + srvOpts...) 
} return f } @@ -255,9 +297,6 @@ func (f *Fetch) Stop() { f.logger.Info("stopping fetch") f.batchTimeout.Stop() f.cancel() - if err := f.host.Close(); err != nil { - f.logger.With().Warning("error closing host", log.Err(err)) - } f.mu.Lock() for _, req := range f.unprocessed { close(req.promise.completed) @@ -423,7 +462,9 @@ func (f *Fetch) getUnprocessed() []RequestMessage { var requestList []RequestMessage // only send one request per hash for hash, req := range f.unprocessed { - f.logger.WithContext(req.ctx).With().Debug("processing hash request", log.Stringer("hash", hash)) + f.logger.WithContext(req.ctx). + With(). + Debug("processing hash request", log.Stringer("hash", hash)) requestList = append(requestList, RequestMessage{Hash: hash, Hint: req.hint}) // move the processed requests to pending f.ongoing[hash] = req @@ -450,7 +491,7 @@ func (f *Fetch) send(requests []RequestMessage) { peer: peer, } batch.setID() - _ = f.sendBatch(peer, batch) + f.sendBatch(peer, batch) } } } @@ -458,8 +499,9 @@ func (f *Fetch) send(requests []RequestMessage) { func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage { rng := rand.New(rand.NewSource(time.Now().UnixNano())) peer2requests := make(map[p2p.Peer][]RequestMessage) - peers := f.host.GetPeers() - if len(peers) == 0 { + + best := f.peers.SelectBest(RedundantPeers) + if len(best) == 0 { f.logger.Info("cannot send batch: no peers found") f.mu.Lock() defer f.mu.Unlock() @@ -479,16 +521,10 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req return nil } for _, req := range requests { - target := p2p.NoPeer hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng) - for _, p := range hashPeers { - if f.host.Connected(p) { - target = p - break - } - } + target := f.peers.SelectBestFrom(hashPeers) if target == p2p.NoPeer { - target = randomPeer(peers) + target = randomPeer(best) } _, ok := peer2requests[target] if !ok { @@ -519,51 +555,42 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req } // sendBatch dispatches batched request messages to provided peer. 
-func (f *Fetch) sendBatch(p p2p.Peer, batch *batchInfo) error { +func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) { + if f.stopped() { + return + } f.mu.Lock() f.batched[batch.ID] = batch f.mu.Unlock() - - f.logger.With().Debug("sending batch request", + f.logger.With().Debug("sending batched request to peer", log.Stringer("batch_hash", batch.ID), - log.Stringer("peer", batch.peer)) - // timeout function will be called if no response was received for the hashes sent - errorFunc := func(err error) { + log.Int("num_requests", len(batch.Requests)), + log.Stringer("peer", peer), + ) + // Request is asynchronous, + // it will return errors only if size of the bytes buffer is large + // or target peer is not connected + start := time.Now() + errf := func(err error) { f.logger.With().Warning("failed to send batch", - log.Stringer("batch_hash", batch.ID), - log.Err(err)) + log.Stringer("batch_hash", peer), log.Err(err), + ) + f.peers.OnFailure(peer) f.handleHashError(batch.ID, err) } - - bytes, err := codec.Encode(&batch.RequestBatch) + err := f.servers[hashProtocol].Request( + f.shutdownCtx, + peer, + codec.MustEncode(&batch.RequestBatch), + func(buf []byte) { + f.peers.OnLatency(peer, time.Since(start)) + f.receiveResponse(buf) + }, + errf, + ) if err != nil { - f.logger.With().Panic("failed to encode batch", log.Err(err)) - } - - // try sending batch to provided peer - retries := 0 - for { - if f.stopped() { - return nil - } - - f.logger.With().Debug("sending batched request to peer", - log.Stringer("batch_hash", batch.ID), - log.Int("num_requests", len(batch.Requests)), - log.Stringer("peer", p)) - - err = f.servers[hashProtocol].Request(f.shutdownCtx, p, bytes, f.receiveResponse, errorFunc) - if err == nil { - break - } - - retries++ - if retries > f.cfg.MaxRetriesForPeer { - f.handleHashError(batch.ID, fmt.Errorf("batched request failed w retries: %w", err)) - break - } + errf(err) } - return err } // handleHashError is called when an error occurred processing batches of the following hashes. @@ -580,7 +607,8 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) { for _, br := range batch.Requests { req, ok := f.ongoing[br.Hash] if !ok { - f.logger.With().Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash)) + f.logger.With(). + Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash)) continue } f.logger.WithContext(req.ctx).With().Warning("hash request failed", @@ -596,7 +624,12 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) { // getHash is the regular buffered call to get a specific hash, using provided hash, h as hint the receiving end will // know where to look for the hash, this function returns HashDataPromiseResult channel that will hold Data received or error. 
-func (f *Fetch) getHash(ctx context.Context, hash types.Hash32, h datastore.Hint, receiver dataReceiver) (*promise, error) { +func (f *Fetch) getHash( + ctx context.Context, + hash types.Hash32, + h datastore.Hint, + receiver dataReceiver, +) (*promise, error) { if f.stopped() { return nil, f.shutdownCtx.Err() } @@ -650,10 +683,6 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) { f.hashToPeers.RegisterPeerHashes(peer, hashes) } -func (f *Fetch) GetPeers() []p2p.Peer { - return f.host.GetPeers() -} - -func (f *Fetch) PeerProtocols(p p2p.Peer) ([]protocol.ID, error) { - return f.host.PeerProtocols(p) +func (f *Fetch) SelectBest(n int) []p2p.Peer { + return f.peers.SelectBest(n) } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 7287432b95..6a74cd4078 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -66,14 +66,14 @@ func createFetch(tb testing.TB) *testFetch { mPoetH: mocks.NewMockSyncValidator(ctrl), } cfg := Config{ - BatchTimeout: time.Millisecond * time.Duration(2000), // make sure we never hit the batch timeout - MaxRetriesForPeer: 3, + BatchTimeout: 2 * time.Second, // make sure we never hit the batch timeout BatchSize: 3, QueueSize: 1000, - RequestTimeout: time.Second * time.Duration(3), + RequestTimeout: 3 * time.Second, MaxRetriesForRequest: 3, } lg := logtest.New(tb) + tf.Fetch = NewFetch(datastore.NewCachedDB(sql.InMemory(), lg), tf.mMesh, nil, nil, WithContext(context.TODO()), WithConfig(cfg), @@ -87,7 +87,17 @@ func createFetch(tb testing.TB) *testFetch { OpnProtocol: tf.mOpn2S, }), withHost(tf.mh)) - tf.Fetch.SetValidators(tf.mAtxH, tf.mPoetH, tf.mBallotH, tf.mActiveSetH, tf.mBlocksH, tf.mProposalH, tf.mTxBlocksH, tf.mTxProposalH, tf.mMalH) + tf.Fetch.SetValidators( + tf.mAtxH, + tf.mPoetH, + tf.mBallotH, + tf.mActiveSetH, + tf.mBlocksH, + tf.mProposalH, + tf.mTxBlocksH, + tf.mTxProposalH, + tf.mMalH, + ) return tf } @@ -114,7 +124,6 @@ func TestFetch_Start(t *testing.T) { func TestFetch_GetHash(t *testing.T) { f := createFetch(t) - f.mh.EXPECT().Close() require.NoError(t, f.Start()) defer f.Stop() h1 := types.RandomHash() @@ -134,43 +143,6 @@ func TestFetch_GetHash(t *testing.T) { require.NotEqual(t, p1.completed, p2.completed) } -func TestFetch_GetHashPeerNotConnected(t *testing.T) { - f := createFetch(t) - f.cfg.MaxRetriesForRequest = 0 - f.cfg.MaxRetriesForPeer = 0 - peer := p2p.Peer("buddy") - awol := p2p.Peer("notConnected") - f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer}) - f.mh.EXPECT().ID().Return(p2p.Peer("self")) - f.mh.EXPECT().Connected(awol).Return(false) - hsh := types.RandomHash() - f.RegisterPeerHashes(awol, []types.Hash32{hsh}) - - res := ResponseMessage{ - Hash: hsh, - Data: []byte("a"), - } - f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { - var rb RequestBatch - err := codec.Decode(req, &rb) - require.NoError(t, err) - resBatch := ResponseBatch{ - ID: rb.ID, - Responses: []ResponseMessage{res}, - } - bts, err := codec.Encode(&resBatch) - require.NoError(t, err) - okFunc(bts) - return nil - }) - - p, err := f.getHash(context.TODO(), hsh, datastore.BlockDB, goodReceiver) - require.NoError(t, err) - f.requestHashBatchFromPeers() - <-p.completed -} - func TestFetch_RequestHashBatchFromPeers(t *testing.T) { tt := []struct { name string @@ -196,10 +168,8 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) { f := createFetch(t) f.cfg.MaxRetriesForRequest = 0 - 
f.cfg.MaxRetriesForPeer = 0 peer := p2p.Peer("buddy") - f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer}) - f.mh.EXPECT().Connected(peer).Return(true).AnyTimes() + f.peers.Add(peer) hsh0 := types.RandomHash() res0 := ResponseMessage{ @@ -211,23 +181,25 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) { Hash: hsh1, Data: []byte("b"), } - f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { - if tc.nErr != nil { - return tc.nErr - } - var rb RequestBatch - err := codec.Decode(req, &rb) - require.NoError(t, err) - resBatch := ResponseBatch{ - ID: rb.ID, - Responses: []ResponseMessage{res0, res1}, - } - bts, err := codec.Encode(&resBatch) - require.NoError(t, err) - okFunc(bts) - return nil - }) + f.mHashS.EXPECT(). + Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { + if tc.nErr != nil { + return tc.nErr + } + var rb RequestBatch + err := codec.Decode(req, &rb) + require.NoError(t, err) + resBatch := ResponseBatch{ + ID: rb.ID, + Responses: []ResponseMessage{res0, res1}, + } + bts, err := codec.Encode(&resBatch) + require.NoError(t, err) + okFunc(bts) + return nil + }) var p0, p1 []*promise // query each hash twice @@ -262,7 +234,6 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) { func TestFetch_GetHash_StartStopSanity(t *testing.T) { f := createFetch(t) - f.mh.EXPECT().Close() require.NoError(t, f.Start()) f.Stop() } @@ -270,39 +241,41 @@ func TestFetch_GetHash_StartStopSanity(t *testing.T) { func TestFetch_Loop_BatchRequestMax(t *testing.T) { f := createFetch(t) f.cfg.BatchTimeout = 1 - f.cfg.MaxRetriesForPeer = 2 f.cfg.BatchSize = 2 peer := p2p.Peer("buddy") - f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer}) + f.peers.Add(peer) h1 := types.RandomHash() h2 := types.RandomHash() h3 := types.RandomHash() - f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { - var rb RequestBatch - err := codec.Decode(req, &rb) - require.NoError(t, err) - resps := make([]ResponseMessage, 0, len(rb.Requests)) - for _, r := range rb.Requests { - resps = append(resps, ResponseMessage{ - Hash: r.Hash, - Data: []byte("a"), - }) - } - resBatch := ResponseBatch{ - ID: rb.ID, - Responses: resps, - } - bts, err := codec.Encode(&resBatch) - require.NoError(t, err) - okFunc(bts) - return nil - }).Times(2) // 3 requests with batch size 2 -> 2 sends + f.mHashS.EXPECT(). + Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { + var rb RequestBatch + err := codec.Decode(req, &rb) + require.NoError(t, err) + resps := make([]ResponseMessage, 0, len(rb.Requests)) + for _, r := range rb.Requests { + resps = append(resps, ResponseMessage{ + Hash: r.Hash, + Data: []byte("a"), + }) + } + resBatch := ResponseBatch{ + ID: rb.ID, + Responses: resps, + } + bts, err := codec.Encode(&resBatch) + require.NoError(t, err) + okFunc(bts) + return nil + }). 
+ Times(2) + // 3 requests with batch size 2 -> 2 sends hint := datastore.POETDB - f.mh.EXPECT().Close() defer f.Stop() require.NoError(t, f.Start()) p1, err := f.getHash(context.TODO(), h1, hint, goodReceiver) @@ -355,8 +328,7 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() cfg := Config{ - BatchTimeout: time.Minute * time.Duration(2000), // make sure we never hit the batch timeout - MaxRetriesForPeer: 3, + BatchTimeout: 2000 * time.Minute, // make sure we never hit the batch timeout BatchSize: 3, QueueSize: 1000, RequestTimeout: time.Second * time.Duration(3), @@ -411,11 +383,18 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) { t.Cleanup(fetcher.Stop) // We set a validatior just for atxs, this validator does not drop connections - vf := ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return pubsub.ErrValidationReject }) + vf := ValidatorFunc( + func(context.Context, types.Hash32, peer.ID, []byte) error { return pubsub.ErrValidationReject }, + ) fetcher.SetValidators(vf, nil, nil, nil, nil, nil, nil, nil, nil) // Request an atx by hash - _, err = fetcher.getHash(ctx, types.Hash32{}, datastore.ATXDB, fetcher.validators.atx.HandleMessage) + _, err = fetcher.getHash( + ctx, + types.Hash32{}, + datastore.ATXDB, + fetcher.validators.atx.HandleMessage, + ) require.NoError(t, err) fetcher.requestHashBatchFromPeers() @@ -427,10 +406,25 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) { } // Now wrap the atx validator with DropPeerOnValidationReject and set it again - fetcher.SetValidators(ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(vf, h, lg)), nil, nil, nil, nil, nil, nil, nil, nil) + fetcher.SetValidators( + ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(vf, h, lg)), + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + ) // Request an atx by hash - _, err = fetcher.getHash(ctx, types.Hash32{}, datastore.ATXDB, fetcher.validators.atx.HandleMessage) + _, err = fetcher.getHash( + ctx, + types.Hash32{}, + datastore.ATXDB, + fetcher.validators.atx.HandleMessage, + ) require.NoError(t, err) fetcher.requestHashBatchFromPeers() diff --git a/fetch/interface.go b/fetch/interface.go index 0aaa289081..759397b4b6 100644 --- a/fetch/interface.go +++ b/fetch/interface.go @@ -3,8 +3,6 @@ package fetch import ( "context" - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" @@ -22,7 +20,12 @@ type requester interface { // directly and do away with both ValidatorFunc and SyncValidator. 
type ValidatorFunc pubsub.SyncHandler -func (f ValidatorFunc) HandleMessage(ctx context.Context, hash types.Hash32, peer p2p.Peer, msg []byte) error { +func (f ValidatorFunc) HandleMessage( + ctx context.Context, + hash types.Hash32, + peer p2p.Peer, + msg []byte, +) error { return f(ctx, hash, peer, msg) } @@ -42,8 +45,4 @@ type meshProvider interface { type host interface { ID() p2p.Peer - GetPeers() []p2p.Peer - Connected(p2p.Peer) bool - PeerProtocols(p2p.Peer) ([]protocol.ID, error) - Close() error } diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index f945eae9be..44df40ff76 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -41,7 +41,9 @@ func (f *testFetch) withMethod(method int) *testFetch { func (f *testFetch) expectTransactionCall(times int) *gomock.Call { if f.method == txsForBlock { - return f.mTxBlocksH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(times) + return f.mTxBlocksH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Times(times) } else if f.method == txsForProposal { return f.mTxProposalH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(times) } @@ -145,10 +147,10 @@ func TestFetch_getHashes(t *testing.T) { f.cfg.QueueSize = 3 f.cfg.BatchSize = 2 f.cfg.MaxRetriesForRequest = 0 - f.cfg.MaxRetriesForPeer = 0 - f.mh.EXPECT().Connected(gomock.Any()).Return(true).AnyTimes() peers := []p2p.Peer{p2p.Peer("buddy 0"), p2p.Peer("buddy 1")} - f.mh.EXPECT().GetPeers().Return(peers) + for _, peer := range peers { + f.peers.Add(peer) + } f.mh.EXPECT().ID().Return(p2p.Peer("self")).AnyTimes() f.RegisterPeerHashes(peers[0], hashes[:2]) f.RegisterPeerHashes(peers[1], hashes[2:]) @@ -161,30 +163,40 @@ func TestFetch_getHashes(t *testing.T) { } responses[h] = res } - f.mHashS.EXPECT().Request(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, p p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { - var rb RequestBatch - err := codec.Decode(req, &rb) - require.NoError(t, err) - - resBatch := ResponseBatch{ - ID: rb.ID, - } - for _, r := range rb.Requests { - if _, ok := tc.fetchErrs[r.Hash]; ok { - continue + f.mHashS.EXPECT(). + Request(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, p p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { + var rb RequestBatch + err := codec.Decode(req, &rb) + require.NoError(t, err) + + resBatch := ResponseBatch{ + ID: rb.ID, } - res := responses[r.Hash] - resBatch.Responses = append(resBatch.Responses, res) - f.mBlocksH.EXPECT().HandleMessage(gomock.Any(), res.Hash, p, res.Data).Return(tc.hdlrErr) - } - bts, err := codec.Encode(&resBatch) - require.NoError(t, err) - okFunc(bts) - return nil - }).Times(len(peers)) - - got := f.getHashes(context.Background(), hashes, datastore.BlockDB, f.validators.block.HandleMessage) + for _, r := range rb.Requests { + if _, ok := tc.fetchErrs[r.Hash]; ok { + continue + } + res := responses[r.Hash] + resBatch.Responses = append(resBatch.Responses, res) + f.mBlocksH.EXPECT(). + HandleMessage(gomock.Any(), res.Hash, p, res.Data). + Return(tc.hdlrErr) + } + bts, err := codec.Encode(&resBatch) + require.NoError(t, err) + okFunc(bts) + return nil + }). 
+ Times(len(peers)) + + got := f.getHashes( + context.Background(), + hashes, + datastore.BlockDB, + f.validators.block.HandleMessage, + ) if len(tc.fetchErrs) > 0 || tc.hdlrErr != nil { require.NotEmpty(t, got) } else { @@ -197,7 +209,10 @@ func TestFetch_getHashes(t *testing.T) { func TestFetch_GetMalfeasanceProofs(t *testing.T) { nodeIDs := []types.NodeID{{1}, {2}, {3}} f := createFetch(t) - f.mMalH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(len(nodeIDs)) + f.mMalH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). + Times(len(nodeIDs)) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -215,7 +230,10 @@ func TestFetch_GetBlocks(t *testing.T) { } blockIDs := types.ToBlockIDs(blks) f := createFetch(t) - f.mBlocksH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(len(blockIDs)) + f.mBlocksH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). + Times(len(blockIDs)) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -233,7 +251,10 @@ func TestFetch_GetBallots(t *testing.T) { } ballotIDs := types.ToBallotIDs(blts) f := createFetch(t) - f.mBallotH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(len(ballotIDs)) + f.mBallotH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). + Times(len(ballotIDs)) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -244,7 +265,11 @@ func TestFetch_GetBallots(t *testing.T) { require.NoError(t, eg.Wait()) } -func genLayerProposal(tb testing.TB, layerID types.LayerID, txs []types.TransactionID) *types.Proposal { +func genLayerProposal( + tb testing.TB, + layerID types.LayerID, + txs []types.TransactionID, +) *types.Proposal { tb.Helper() p := &types.Proposal{ InnerProposal: types.InnerProposal{ @@ -298,7 +323,10 @@ func TestFetch_GetProposals(t *testing.T) { } proposalIDs := types.ToProposalIDs(proposals) f := createFetch(t) - f.mProposalH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(len(proposalIDs)) + f.mProposalH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). 
+ Times(len(proposalIDs)) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -309,7 +337,12 @@ func TestFetch_GetProposals(t *testing.T) { require.NoError(t, eg.Wait()) } -func genTx(tb testing.TB, signer *signing.EdSigner, dest types.Address, amount, nonce, price uint64) types.Transaction { +func genTx( + tb testing.TB, + signer *signing.EdSigner, + dest types.Address, + amount, nonce, price uint64, +) types.Transaction { tb.Helper() raw := wallet.Spend(signer.PrivateKey(), dest, amount, nonce, @@ -375,7 +408,13 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx { require.NoError(tb, err) atxs := make([]*types.ActivationTx, 0, num) for i := uint32(0); i < num; i++ { - atx := types.NewActivationTx(types.NIPostChallenge{}, types.Address{1, 2, 3}, &types.NIPost{}, i, nil) + atx := types.NewActivationTx( + types.NIPostChallenge{}, + types.Address{1, 2, 3}, + &types.NIPost{}, + i, + nil, + ) require.NoError(tb, activation.SignAndFinalizeAtx(sig, atx)) atxs = append(atxs, atx) } @@ -385,7 +424,10 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx { func TestGetATXs(t *testing.T) { atxs := genATXs(t, 2) f := createFetch(t) - f.mAtxH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(len(atxs)) + f.mAtxH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). + Times(len(atxs)) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -399,7 +441,9 @@ func TestGetATXs(t *testing.T) { func TestGetActiveSet(t *testing.T) { f := createFetch(t) - f.mActiveSetH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + f.mActiveSetH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -413,7 +457,9 @@ func TestGetActiveSet(t *testing.T) { func TestGetPoetProof(t *testing.T) { f := createFetch(t) h := types.RandomHash() - f.mPoetH.EXPECT().HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + f.mPoetH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) stop := make(chan struct{}, 1) var eg errgroup.Group @@ -468,15 +514,17 @@ func TestFetch_GetMaliciousIDs(t *testing.T) { expErr++ } idx := i - f.mMalS.EXPECT().Request(gomock.Any(), p, []byte{}, gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, _ []byte, okCB func([]byte), errCB func(error)) error { - if tc.errs[idx] == nil { - go okCB(generateMaliciousIDs(t)) - } else { - go errCB(tc.errs[idx]) - } - return nil - }) + f.mMalS.EXPECT(). + Request(gomock.Any(), p, []byte{}, gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ p2p.Peer, _ []byte, okCB func([]byte), errCB func(error)) error { + if tc.errs[idx] == nil { + go okCB(generateMaliciousIDs(t)) + } else { + go errCB(tc.errs[idx]) + } + return nil + }) } require.NoError(t, f.GetMaliciousIDs(context.Background(), peers, okFunc, errFunc)) wg.Wait() @@ -530,17 +578,22 @@ func TestFetch_GetLayerData(t *testing.T) { expErr++ } idx := i - f.mLyrS.EXPECT().Request(gomock.Any(), p, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, _ []byte, okCB func([]byte), errCB func(error)) error { - if tc.errs[idx] == nil { - go okCB(generateLayerContent(t)) - } else { - go errCB(tc.errs[idx]) - } - return nil - }) + f.mLyrS.EXPECT(). + Request(gomock.Any(), p, gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn( + func(_ context.Context, _ p2p.Peer, _ []byte, okCB func([]byte), errCB func(error)) error { + if tc.errs[idx] == nil { + go okCB(generateLayerContent(t)) + } else { + go errCB(tc.errs[idx]) + } + return nil + }) } - require.NoError(t, f.GetLayerData(context.Background(), peers, types.LayerID(111), okFunc, errFunc)) + require.NoError( + t, + f.GetLayerData(context.Background(), peers, types.LayerID(111), okFunc, errFunc), + ) wg.Wait() require.Len(t, oks, expOk) require.Len(t, errs, expErr) @@ -582,17 +635,19 @@ func Test_PeerEpochInfo(t *testing.T) { f := createFetch(t) f.mh.EXPECT().ID().Return(p2p.Peer("self")).AnyTimes() var expected *EpochData - f.mAtxS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, req []byte, okCB func([]byte), errCB func(error)) error { - if tc.err == nil { - var data []byte - expected, data = generateEpochData(t) - okCB(data) - } else { - errCB(tc.err) - } - return nil - }) + f.mAtxS.EXPECT(). + Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ p2p.Peer, req []byte, okCB func([]byte), errCB func(error)) error { + if tc.err == nil { + var data []byte + expected, data = generateEpochData(t) + okCB(data) + } else { + errCB(tc.err) + } + return nil + }) got, err := f.PeerEpochInfo(context.Background(), peer, types.EpochID(111)) require.ErrorIs(t, err, tc.err) if tc.err == nil { @@ -644,18 +699,20 @@ func TestFetch_GetMeshHashes(t *testing.T) { } reqData, err := codec.Encode(req) require.NoError(t, err) - f.mMHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, gotReq []byte, okCB func([]byte), errCB func(error)) error { - require.Equal(t, reqData, gotReq) - if tc.err == nil { - data, err := codec.EncodeSlice(expected.Hashes) - require.NoError(t, err) - okCB(data) - } else { - errCB(tc.err) - } - return nil - }) + f.mMHashS.EXPECT(). + Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ p2p.Peer, gotReq []byte, okCB func([]byte), errCB func(error)) error { + require.Equal(t, reqData, gotReq) + if tc.err == nil { + data, err := codec.EncodeSlice(expected.Hashes) + require.NoError(t, err) + okCB(data) + } else { + errCB(tc.err) + } + return nil + }) got, err := f.PeerMeshHashes(context.Background(), peer, req) if tc.err == nil { require.NoError(t, err) @@ -706,18 +763,20 @@ func TestFetch_GetCert(t *testing.T) { require.NoError(t, err) for i, peer := range peers { ith := i - f.mOpn2S.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ p2p.Peer, gotReq []byte, okCB func([]byte), errCB func(error)) error { - require.Equal(t, reqData, gotReq) - if tc.results[ith] == nil { - data, err := codec.Encode(&expected) - require.NoError(t, err) - okCB(data) - } else { - errCB(tc.results[ith]) - } - return nil - }) + f.mOpn2S.EXPECT(). + Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn( + func(_ context.Context, _ p2p.Peer, gotReq []byte, okCB func([]byte), errCB func(error)) error { + require.Equal(t, reqData, gotReq) + if tc.results[ith] == nil { + data, err := codec.Encode(&expected) + require.NoError(t, err) + okCB(data) + } else { + errCB(tc.results[ith]) + } + return nil + }) if tc.stop > 0 && tc.stop == i { break } diff --git a/fetch/mocks/mocks.go b/fetch/mocks/mocks.go index f65a0649e2..4b2245ca59 100644 --- a/fetch/mocks/mocks.go +++ b/fetch/mocks/mocks.go @@ -12,7 +12,6 @@ import ( context "context" reflect "reflect" - protocol "github.com/libp2p/go-libp2p/core/protocol" types "github.com/spacemeshos/go-spacemesh/common/types" p2p "github.com/spacemeshos/go-spacemesh/p2p" gomock "go.uber.org/mock/gomock" @@ -285,120 +284,6 @@ func (m *Mockhost) EXPECT() *MockhostMockRecorder { return m.recorder } -// Close mocks base method. -func (m *Mockhost) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockhostMockRecorder) Close() *hostCloseCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*Mockhost)(nil).Close)) - return &hostCloseCall{Call: call} -} - -// hostCloseCall wrap *gomock.Call -type hostCloseCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *hostCloseCall) Return(arg0 error) *hostCloseCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *hostCloseCall) Do(f func() error) *hostCloseCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *hostCloseCall) DoAndReturn(f func() error) *hostCloseCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - -// Connected mocks base method. -func (m *Mockhost) Connected(arg0 p2p.Peer) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Connected", arg0) - ret0, _ := ret[0].(bool) - return ret0 -} - -// Connected indicates an expected call of Connected. -func (mr *MockhostMockRecorder) Connected(arg0 any) *hostConnectedCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connected", reflect.TypeOf((*Mockhost)(nil).Connected), arg0) - return &hostConnectedCall{Call: call} -} - -// hostConnectedCall wrap *gomock.Call -type hostConnectedCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *hostConnectedCall) Return(arg0 bool) *hostConnectedCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *hostConnectedCall) Do(f func(p2p.Peer) bool) *hostConnectedCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *hostConnectedCall) DoAndReturn(f func(p2p.Peer) bool) *hostConnectedCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - -// GetPeers mocks base method. -func (m *Mockhost) GetPeers() []p2p.Peer { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPeers") - ret0, _ := ret[0].([]p2p.Peer) - return ret0 -} - -// GetPeers indicates an expected call of GetPeers. 
-func (mr *MockhostMockRecorder) GetPeers() *hostGetPeersCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*Mockhost)(nil).GetPeers)) - return &hostGetPeersCall{Call: call} -} - -// hostGetPeersCall wrap *gomock.Call -type hostGetPeersCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *hostGetPeersCall) Return(arg0 []p2p.Peer) *hostGetPeersCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *hostGetPeersCall) Do(f func() []p2p.Peer) *hostGetPeersCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *hostGetPeersCall) DoAndReturn(f func() []p2p.Peer) *hostGetPeersCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // ID mocks base method. func (m *Mockhost) ID() p2p.Peer { m.ctrl.T.Helper() @@ -436,42 +321,3 @@ func (c *hostIDCall) DoAndReturn(f func() p2p.Peer) *hostIDCall { c.Call = c.Call.DoAndReturn(f) return c } - -// PeerProtocols mocks base method. -func (m *Mockhost) PeerProtocols(arg0 p2p.Peer) ([]protocol.ID, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeerProtocols", arg0) - ret0, _ := ret[0].([]protocol.ID) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// PeerProtocols indicates an expected call of PeerProtocols. -func (mr *MockhostMockRecorder) PeerProtocols(arg0 any) *hostPeerProtocolsCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerProtocols", reflect.TypeOf((*Mockhost)(nil).PeerProtocols), arg0) - return &hostPeerProtocolsCall{Call: call} -} - -// hostPeerProtocolsCall wrap *gomock.Call -type hostPeerProtocolsCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *hostPeerProtocolsCall) Return(arg0 []protocol.ID, arg1 error) *hostPeerProtocolsCall { - c.Call = c.Call.Return(arg0, arg1) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *hostPeerProtocolsCall) Do(f func(p2p.Peer) ([]protocol.ID, error)) *hostPeerProtocolsCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *hostPeerProtocolsCall) DoAndReturn(f func(p2p.Peer) ([]protocol.ID, error)) *hostPeerProtocolsCall { - c.Call = c.Call.DoAndReturn(f) - return c -} diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go new file mode 100644 index 0000000000..e57d8ac712 --- /dev/null +++ b/fetch/peers/peers.go @@ -0,0 +1,156 @@ +package peers + +import ( + "strings" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/spacemeshos/go-spacemesh/p2p" +) + +type data struct { + id peer.ID + success, failures int + rate float64 + averageLatency float64 +} + +func (p *data) successRate() float64 { + return float64(p.success) / float64(p.success+p.failures) +} + +func (p *data) cmp(other *data) int { + if p == nil && other != nil { + return -1 + } + const rateThreshold = 0.1 + switch { + case p.rate-other.rate > rateThreshold: + return 1 + case other.rate-p.rate > rateThreshold: + return -1 + } + switch { + case p.averageLatency < other.averageLatency: + return 1 + case p.averageLatency > other.averageLatency: + return -1 + } + return strings.Compare(string(p.id), string(other.id)) +} + +func New() *Peers { + return &Peers{peers: map[peer.ID]*data{}} +} + +type Peers struct { + mu sync.Mutex + peers map[peer.ID]*data +} + +func (p *Peers) Add(id peer.ID) { + p.mu.Lock() + defer p.mu.Unlock() + _, exist := p.peers[id] + if exist { + return + } + peer := &data{id: id} 
+ p.peers[id] = peer +} + +func (p *Peers) Delete(id peer.ID) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.peers, id) +} + +func (p *Peers) OnFailure(id peer.ID) { + p.mu.Lock() + defer p.mu.Unlock() + peer, exist := p.peers[id] + if !exist { + return + } + peer.failures++ + peer.rate = peer.successRate() +} + +// OnLatency records success and latency. Latency is not reported with every success +// as some requests has different amount of work and data, and we want to measure something +// comparable. +func (p *Peers) OnLatency(id peer.ID, latency time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + peer, exist := p.peers[id] + if !exist { + return + } + peer.success++ + peer.rate = peer.successRate() + if peer.averageLatency != 0 { + peer.averageLatency = 0.8*peer.averageLatency + 0.2*float64(latency) + } else { + peer.averageLatency = float64(latency) + } +} + +// SelectBest peer with preferences. +func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID { + p.mu.Lock() + defer p.mu.Unlock() + var best *data + for _, peer := range peers { + pdata, exist := p.peers[peer] + if !exist { + continue + } + if best.cmp(pdata) == -1 { + best = pdata + } + } + if best != nil { + return best.id + } + return p2p.NoPeer +} + +// SelectBest selects at most n peers sorted by responsiveness and latency. +// +// SelectBest parametrized by N because sync protocol relies on receiving data from redundant +// connections to guarantee that it will get complete data set. +// If it doesn't get complete data set it will have to fallback into hash resolution, which is +// generally more expensive. +func (p *Peers) SelectBest(n int) []peer.ID { + p.mu.Lock() + defer p.mu.Unlock() + lth := min(len(p.peers), n) + if lth == 0 { + return nil + } + cache := make([]*data, 0, lth) + for _, peer := range p.peers { + worst := peer + for i := range cache { + if cache[i].cmp(worst) == -1 { + cache[i], worst = worst, cache[i] + } + } + if len(cache) < cap(cache) { + cache = append(cache, worst) + } + } + rst := make([]peer.ID, len(cache)) + for i := range rst { + rst[i] = cache[i].id + } + return rst +} + +func (p *Peers) Total() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.peers) +} diff --git a/fetch/peers/peers_test.go b/fetch/peers/peers_test.go new file mode 100644 index 0000000000..2217e9e55d --- /dev/null +++ b/fetch/peers/peers_test.go @@ -0,0 +1,237 @@ +package peers + +import ( + "math/rand" + "strconv" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +type event struct { + id peer.ID + add, delete bool + success int + failure int + latency time.Duration +} + +func withEvents(events []event) *Peers { + tracker := New() + for _, ev := range events { + if ev.delete { + tracker.Delete(ev.id) + } else if ev.add { + tracker.Add(ev.id) + } + for i := 0; i < ev.failure; i++ { + tracker.OnFailure(ev.id) + } + for i := 0; i < ev.success; i++ { + tracker.OnLatency(ev.id, ev.latency) + } + } + return tracker +} + +func TestSelect(t *testing.T) { + for _, tc := range []struct { + desc string + events []event + + n int + expect []peer.ID + + from []peer.ID + best peer.ID + }{ + { + desc: "ordered by rate", + events: []event{ + {id: "b", success: 100, failure: 30, add: true}, + {id: "c", success: 100, failure: 0, add: true}, + {id: "a", success: 80, failure: 80, add: true}, + }, + n: 5, + expect: []peer.ID{"c", "b", "a"}, + from: []peer.ID{"a", "b"}, + best: peer.ID("b"), + }, + { + desc: "ordered by rate no best", + events: []event{ + {id: "b", 
success: 100, failure: 30, add: true}, + {id: "c", success: 100, failure: 0, add: true}, + {id: "a", success: 80, failure: 80, add: true}, + }, + n: 5, + expect: []peer.ID{"c", "b", "a"}, + from: []peer.ID{"d", "e"}, + best: "", + }, + { + desc: "ordered by latency within threshold", + events: []event{ + {id: "b", success: 100, latency: 10, add: true}, + {id: "c", success: 95, latency: 5, add: true}, + {id: "a", success: 90, latency: 4, add: true}, + }, + n: 5, + expect: []peer.ID{"a", "c", "b"}, + from: []peer.ID{"c", "a"}, + best: peer.ID("a"), + }, + { + desc: "latency computed with moving average", + events: []event{ + {id: "a", success: 100, latency: 8, add: true}, + {id: "b", success: 100, latency: 9, add: true}, + {id: "a", success: 1, latency: 10, add: true}, + }, + n: 5, + expect: []peer.ID{"a", "b"}, + from: []peer.ID{"a", "b"}, + best: peer.ID("a"), + }, + { + desc: "latency computed with moving average", + events: []event{ + {id: "a", success: 100, latency: 8, add: true}, + {id: "b", success: 100, latency: 9, add: true}, + {id: "a", success: 1, latency: 14}, + }, + n: 5, + expect: []peer.ID{"b", "a"}, + from: []peer.ID{"a", "b"}, + best: peer.ID("b"), + }, + { + desc: "total number is larger then capacity", + events: []event{ + {id: "a", success: 100, add: true}, + {id: "b", success: 80, failure: 20, add: true}, + {id: "c", success: 60, failure: 40, add: true}, + {id: "d", success: 40, failure: 60, add: true}, + }, + n: 2, + expect: []peer.ID{"a", "b"}, + }, + { + desc: "total number is larger then capacity", + events: []event{ + {id: "a", success: 100, add: true}, + {id: "b", success: 80, failure: 20, add: true}, + {id: "c", success: 60, failure: 40, add: true}, + {id: "d", success: 40, failure: 60, add: true}, + }, + n: 2, + expect: []peer.ID{"a", "b"}, + }, + { + desc: "deleted are not in the list", + events: []event{ + {id: "a", success: 100, add: true}, + {id: "b", success: 80, failure: 20, add: true}, + {id: "c", success: 60, failure: 40, add: true}, + {id: "d", success: 40, failure: 60, add: true}, + {id: "b", delete: true}, + {id: "a", delete: true}, + }, + n: 4, + expect: []peer.ID{"c", "d"}, + from: []peer.ID{"a", "b", "c", "d"}, + best: peer.ID("c"), + }, + { + desc: "empty", + n: 4, + }, + { + desc: "request empty", + events: []event{ + {id: "a", success: 100, add: true}, + }, + n: 0, + }, + { + desc: "no success rate", + events: []event{ + {id: "a", add: true}, + {id: "b", add: true}, + }, + n: 2, + expect: []peer.ID{"b", "a"}, + }, + { + desc: "events for nonexisting", + events: []event{ + {id: "a", success: 100, failure: 100}, + }, + n: 2, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal( + t, + tc.expect, + withEvents(tc.events).SelectBest(tc.n), + "select best %d", + tc.n, + ) + if tc.from != nil { + require.Equal( + t, + tc.best, + withEvents(tc.events).SelectBestFrom(tc.from), + "select best (%v) from %v", + tc.best, + tc.from, + ) + } + }) + } +} + +func TestTotal(t *testing.T) { + const total = 100 + events := []event{} + for i := 0; i < total; i++ { + events = append( + events, event{id: peer.ID(strconv.Itoa(i)), add: true}, + ) + } + require.Equal(t, total, withEvents(events).Total()) +} + +func BenchmarkSelectBest(b *testing.B) { + const ( + total = 10000 + target = 10 + ) + events := []event{} + rng := rand.New(rand.NewSource(10001)) + + for i := 0; i < total; i++ { + events = append( + events, + event{ + id: peer.ID(strconv.Itoa(i)), + success: rng.Intn(100), + failure: rng.Intn(100), + add: true, + }, + ) + } + tracker := 
withEvents(events) + require.Equal(b, total, tracker.Total()) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + best := tracker.SelectBest(target) + if len(best) != target { + b.Fail() + } + } +} diff --git a/node/node_test.go b/node/node_test.go index e1d2a42591..8331d49a0d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -157,7 +157,10 @@ func TestSpacemeshApp_SetLoggers(t *testing.T) { app.log = app.addLogger(mylogger, myLog) msg1 := "hi there" app.log.Info(msg1) - r.Equal(fmt.Sprintf("INFO\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, msg1, mylogger), buf1.String()) + r.Equal( + fmt.Sprintf("INFO\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, msg1, mylogger), + buf1.String(), + ) r.NoError(app.SetLogLevel(mylogger, "warn")) r.Equal("warn", app.loggers[mylogger].String()) buf1.Reset() @@ -170,7 +173,10 @@ func TestSpacemeshApp_SetLoggers(t *testing.T) { app.log.Info(msg2) // This one should be printed app.log.Warning(msg3) - r.Equal(fmt.Sprintf("WARN\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, msg3, mylogger), buf1.String()) + r.Equal( + fmt.Sprintf("WARN\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, msg3, mylogger), + buf1.String(), + ) r.Equal(fmt.Sprintf("INFO\t%s\n", msg1), buf2.String()) buf1.Reset() @@ -179,7 +185,10 @@ func TestSpacemeshApp_SetLoggers(t *testing.T) { msg4 := "nihao" app.log.Info(msg4) r.Equal("info", app.loggers[mylogger].String()) - r.Equal(fmt.Sprintf("INFO\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, msg4, mylogger), buf1.String()) + r.Equal( + fmt.Sprintf("INFO\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, msg4, mylogger), + buf1.String(), + ) // test bad logger name r.Error(app.SetLogLevel("anton3", "warn")) @@ -201,7 +210,10 @@ func TestSpacemeshApp_AddLogger(t *testing.T) { subLogger.Debug("should not get printed") teststr := "should get printed" subLogger.Info(teststr) - r.Equal(fmt.Sprintf("INFO\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, teststr, mylogger), buf.String()) + r.Equal( + fmt.Sprintf("INFO\t%s\t%s\t{\"module\": \"%s\"}\n", mylogger, teststr, mylogger), + buf.String(), + ) } func testArgs(ctx context.Context, root *cobra.Command, args ...string) (string, error) { @@ -309,7 +321,14 @@ func TestSpacemeshApp_GrpcService(t *testing.T) { events.CloseEventReporter() // Test starting the server from the command line - str, err = testArgs(context.Background(), cmdWithRun(run), "--grpc-public-listener", listener, "--grpc-public-services", "node") + str, err = testArgs( + context.Background(), + cmdWithRun(run), + "--grpc-public-listener", + listener, + "--grpc-public-services", + "node", + ) r.Empty(str) r.NoError(err) r.Equal(listener, app.Config.API.PublicListener) @@ -384,7 +403,14 @@ func TestSpacemeshApp_JsonService(t *testing.T) { // Test starting the JSON server from the commandline // uses Cmd.Run from above listener := "127.0.0.1:1234" - str, err := testArgs(context.Background(), cmdWithRun(run), "--grpc-public-services", "node", "--grpc-json-listener", listener) + str, err := testArgs( + context.Background(), + cmdWithRun(run), + "--grpc-public-services", + "node", + "--grpc-json-listener", + listener, + ) r.Empty(str) r.NoError(err) defer app.stopServices(context.Background()) @@ -396,7 +422,11 @@ func TestSpacemeshApp_JsonService(t *testing.T) { respStatus int ) require.Eventually(t, func() bool { - respBody, respStatus = callEndpoint(t, fmt.Sprintf("http://%s/v1/node/echo", app.Config.API.JSONListener), payload) + respBody, respStatus = callEndpoint( + t, + fmt.Sprintf("http://%s/v1/node/echo", app.Config.API.JSONListener), + payload, + ) 
return respStatus == http.StatusOK }, 2*time.Second, 100*time.Millisecond) var msg pb.EchoResponse @@ -410,7 +440,10 @@ func TestSpacemeshApp_JsonService(t *testing.T) { // E2E app test of the stream endpoints in the NodeService. func TestSpacemeshApp_NodeService(t *testing.T) { logger := logtest.New(t) - errlog := log.RegisterHooks(logtest.New(t, zap.ErrorLevel), events.EventHook()) // errlog is used to simulate errors in the app + errlog := log.RegisterHooks( + logtest.New(t, zap.ErrorLevel), + events.EventHook(), + ) // errlog is used to simulate errors in the app // Use a unique port port := 1240 @@ -455,7 +488,16 @@ func TestSpacemeshApp_NodeService(t *testing.T) { // If there's an error in the args, it will return immediately. var eg errgroup.Group eg.Go(func() error { - str, err := testArgs(ctx, cmdWithRun(run), "--grpc-private-listener", fmt.Sprintf("localhost:%d", port), "--grpc-private-services", "node", "--grpc-public-services", "debug") + str, err := testArgs( + ctx, + cmdWithRun(run), + "--grpc-private-listener", + fmt.Sprintf("localhost:%d", port), + "--grpc-private-services", + "node", + "--grpc-public-services", + "debug", + ) assert.Empty(t, str) assert.NoError(t, err) return nil @@ -593,7 +635,12 @@ func TestSpacemeshApp_TransactionService(t *testing.T) { }() <-app.Started() - require.Eventually(t, func() bool { return app.syncer.IsSynced(ctx) }, 4*time.Second, 10*time.Millisecond) + require.Eventually( + t, + func() bool { return app.syncer.IsSynced(ctx) }, + 4*time.Second, + 10*time.Millisecond, + ) ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -607,7 +654,9 @@ func TestSpacemeshApp_TransactionService(t *testing.T) { t.Cleanup(func() { r.NoError(conn.Close()) }) c := pb.NewTransactionServiceClient(conn) - tx1 := types.NewRawTx(wallet.SelfSpawn(signer.PrivateKey(), 0, sdk.WithGenesisID(cfg.Genesis.GenesisID()))) + tx1 := types.NewRawTx( + wallet.SelfSpawn(signer.PrivateKey(), 0, sdk.WithGenesisID(cfg.Genesis.GenesisID())), + ) stream, err := c.TransactionsStateStream(ctx, &pb.TransactionsStateStreamRequest{ TransactionId: []*pb.TransactionId{{Id: tx1.ID.Bytes()}}, @@ -794,7 +843,9 @@ func TestConfig_CustomTypes(t *testing.T) { cli: "--post-pow-difficulty=00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff", config: `{"post": {"post-pow-difficulty": "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"}}`, updatePreset: func(t *testing.T, c *config.Config) { - diff, err := hex.DecodeString("00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff") + diff, err := hex.DecodeString( + "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff", + ) require.NoError(t, err) copy(c.POST.PowDifficulty[:], diff) }, @@ -921,7 +972,19 @@ func TestConfig_PostProviderID_InvalidValues(t *testing.T) { cmd.AddCommands(c) path := filepath.Join(t.TempDir(), "config.json") - require.NoError(t, os.WriteFile(path, []byte(fmt.Sprintf(`{"smeshing": {"smeshing-opts": {"smeshing-opts-provider": %s}}}`, tc.configValue)), 0o600)) + require.NoError( + t, + os.WriteFile( + path, + []byte( + fmt.Sprintf( + `{"smeshing": {"smeshing-opts": {"smeshing-opts-provider": %s}}}`, + tc.configValue, + ), + ), + 0o600, + ), + ) require.NoError(t, c.ParseFlags([]string{"--config=" + path})) t.Cleanup(cmd.ResetConfig) @@ -1008,7 +1071,10 @@ func TestGenesisConfig(t *testing.T) { t.Cleanup(func() { app.Cleanup(context.Background()) }) var existing config.GenesisConfig - require.NoError(t, 
existing.LoadFromFile(filepath.Join(app.Config.DataDir(), genesisFileName))) + require.NoError( + t, + existing.LoadFromFile(filepath.Join(app.Config.DataDir(), genesisFileName)), + ) require.Empty(t, existing.Diff(app.Config.Genesis)) }) @@ -1195,7 +1261,6 @@ func getTestDefaultConfig(tb testing.TB) *config.Config { cfg.FileLock = filepath.Join(tmp, "LOCK") cfg.FETCH.RequestTimeout = 10 - cfg.FETCH.MaxRetriesForPeer = 5 cfg.FETCH.BatchSize = 5 cfg.FETCH.BatchTimeout = 5 diff --git a/syncer/data_fetch.go b/syncer/data_fetch.go index 6cd3649dca..1fb034d05e 100644 --- a/syncer/data_fetch.go +++ b/syncer/data_fetch.go @@ -64,7 +64,13 @@ type DataFetch struct { } // NewDataFetch creates a new DataFetch instance. -func NewDataFetch(msh meshProvider, fetch fetcher, ids idProvider, cache activeSetCache, lg log.Log) *DataFetch { +func NewDataFetch( + msh meshProvider, + fetch fetcher, + ids idProvider, + cache activeSetCache, + lg log.Log, +) *DataFetch { return &DataFetch{ fetcher: fetch, logger: lg, @@ -77,7 +83,7 @@ func NewDataFetch(msh meshProvider, fetch fetcher, ids idProvider, cache activeS // PollMaliciousProofs polls all peers for malicious NodeIDs. func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error { - peers := d.fetcher.GetPeers() + peers := d.fetcher.SelectBest(fetch.RedundantPeers) logger := d.logger.WithContext(ctx) req := &maliciousIDRequest{ peers: peers, @@ -131,7 +137,7 @@ func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error { // PollLayerData polls all peers for data in the specified layer. func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers ...p2p.Peer) error { if len(peers) == 0 { - peers = d.fetcher.GetPeers() + peers = d.fetcher.SelectBest(fetch.RedundantPeers) } if len(peers) == 0 { return errNoPeers @@ -190,7 +196,13 @@ func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers } } -func (d *DataFetch) receiveMaliciousIDs(ctx context.Context, req *maliciousIDRequest, peer p2p.Peer, data []byte, peerErr error) { +func (d *DataFetch) receiveMaliciousIDs( + ctx context.Context, + req *maliciousIDRequest, + peer p2p.Peer, + data []byte, + peerErr error, +) { logger := d.logger.WithContext(ctx).WithFields(log.Stringer("peer", peer)) logger.Debug("received malicious id from peer") var ( @@ -211,7 +223,13 @@ func (d *DataFetch) receiveMaliciousIDs(ctx context.Context, req *maliciousIDReq } } -func (d *DataFetch) receiveData(ctx context.Context, req *dataRequest, peer p2p.Peer, data []byte, peerErr error) { +func (d *DataFetch) receiveData( + ctx context.Context, + req *dataRequest, + peer p2p.Peer, + data []byte, + peerErr error, +) { logger := d.logger.WithContext(ctx).WithFields(req.lid, log.Stringer("peer", peer)) logger.Debug("received layer data from peer") var ( @@ -248,7 +266,14 @@ func registerLayerHashes(fetcher fetcher, peer p2p.Peer, data *fetch.LayerData) fetcher.RegisterPeerHashes(peer, layerHashes) } -func fetchMalfeasanceProof(ctx context.Context, logger log.Log, ids idProvider, fetcher fetcher, req *maliciousIDRequest, data *fetch.MaliciousIDs) { +func fetchMalfeasanceProof( + ctx context.Context, + logger log.Log, + ids idProvider, + fetcher fetcher, + req *maliciousIDRequest, + data *fetch.MaliciousIDs, +) { var idsToFetch []types.NodeID for _, nodeID := range data.NodeIDs { if _, ok := req.response.ids[nodeID]; !ok { @@ -281,7 +306,13 @@ func fetchMalfeasanceProof(ctx context.Context, logger log.Log, ids idProvider, } } -func fetchLayerData(ctx context.Context, logger log.Log, 
fetcher fetcher, req *dataRequest, data *fetch.LayerData) { +func fetchLayerData( + ctx context.Context, + logger log.Log, + fetcher fetcher, + req *dataRequest, + data *fetch.LayerData, +) { var ballotsToFetch []types.BallotID for _, ballotID := range data.Ballots { if _, ok := req.response.ballots[ballotID]; !ok { @@ -302,6 +333,7 @@ func fetchLayerData(ctx context.Context, logger log.Log, fetcher fetcher, req *d return nil })), log.Err(err)) + // syntactically invalid ballots are expected from malicious peers } } @@ -364,7 +396,10 @@ func (d *DataFetch) PollLayerOpinions( // note that we want to fetch block certificate for types.EmptyBlockID as well // but we don't need to register hash for the actual block fetching if *opns.Certified != types.EmptyBlockID { - d.fetcher.RegisterPeerHashes(opns.Peer(), []types.Hash32{opns.Certified.AsHash32()}) + d.fetcher.RegisterPeerHashes( + opns.Peer(), + []types.Hash32{opns.Certified.AsHash32()}, + ) } } for bid, bidPeers := range peerCerts { @@ -384,7 +419,13 @@ func (d *DataFetch) PollLayerOpinions( } } -func (d *DataFetch) receiveOpinions(ctx context.Context, req *opinionRequest, peer p2p.Peer, data []byte, peerErr error) { +func (d *DataFetch) receiveOpinions( + ctx context.Context, + req *opinionRequest, + peer p2p.Peer, + data []byte, + peerErr error, +) { logger := d.logger.WithContext(ctx).WithFields(req.lid, log.Stringer("peer", peer)) logger.Debug("received layer opinions from peer") @@ -430,7 +471,7 @@ func (d *DataFetch) updateAtxPeer(epoch types.EpochID, peer p2p.Peer) { // GetEpochATXs fetches all ATXs published in the specified epoch from a peer. func (d *DataFetch) GetEpochATXs(ctx context.Context, epoch types.EpochID) error { - peers := d.fetcher.GetPeers() + peers := d.fetcher.SelectBest(fetch.RedundantPeers) if len(peers) == 0 { return errNoPeers } diff --git a/syncer/data_fetch_test.go b/syncer/data_fetch_test.go index 1b420e4719..25c9ebd909 100644 --- a/syncer/data_fetch_test.go +++ b/syncer/data_fetch_test.go @@ -103,7 +103,7 @@ func TestDataFetch_PollMaliciousIDs(t *testing.T) { errUnknown := errors.New("unknown") newTestDataFetchWithMocks := func(_ *testing.T, exits bool) *testDataFetch { td := newTestDataFetch(t) - td.mFetcher.EXPECT().GetPeers().Return(peers) + td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetMaliciousIDs(gomock.Any(), peers, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ []p2p.Peer, okCB func([]byte, p2p.Peer), errCB func(error, p2p.Peer)) error { for _, peer := range peers { @@ -144,7 +144,7 @@ func TestDataFetch_PollLayerData(t *testing.T) { errUnknown := errors.New("unknown") newTestDataFetchWithMocks := func(*testing.T) *testDataFetch { td := newTestDataFetch(t) - td.mFetcher.EXPECT().GetPeers().Return(peers) + td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ []p2p.Peer, _ types.LayerID, okCB func([]byte, p2p.Peer), errCB func(error, p2p.Peer)) error { for _, peer := range peers { @@ -183,7 +183,7 @@ func TestDataFetch_PollLayerData_PeerErrors(t *testing.T) { t.Run("only one peer has data", func(t *testing.T) { t.Parallel() td := newTestDataFetch(t) - td.mFetcher.EXPECT().GetPeers().Return(peers) + td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ []p2p.Peer, _ 
types.LayerID, okCB func([]byte, p2p.Peer), errCB func(error, p2p.Peer)) error { td.mFetcher.EXPECT().RegisterPeerHashes(peers[0], gomock.Any()) @@ -200,7 +200,7 @@ func TestDataFetch_PollLayerData_PeerErrors(t *testing.T) { t.Run("only one peer has empty layer", func(t *testing.T) { t.Parallel() td := newTestDataFetch(t) - td.mFetcher.EXPECT().GetPeers().Return(peers) + td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ []p2p.Peer, _ types.LayerID, okCB func([]byte, p2p.Peer), errCB func(error, p2p.Peer)) error { okCB(generateEmptyLayer(t), peers[0]) @@ -353,7 +353,7 @@ func TestDataFetch_GetEpochATXs(t *testing.T) { ed := &fetch.EpochData{ AtxIDs: types.RandomActiveSet(11), } - td.mFetcher.EXPECT().GetPeers().Return(peers) + td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) if tc.getErr == nil { td.mAtxCache.EXPECT().GetMissingActiveSet(epoch+1, ed.AtxIDs).Return(ed.AtxIDs[1:]) } diff --git a/syncer/find_fork.go b/syncer/find_fork.go index 9dde773ba6..48c15f1fba 100644 --- a/syncer/find_fork.go +++ b/syncer/find_fork.go @@ -82,7 +82,7 @@ func (ff *ForkFinder) Purge(all bool, toPurge ...p2p.Peer) { return } - peers := ff.fetcher.GetPeers() + peers := ff.fetcher.SelectBest(fetch.RedundantPeers) uniquePeers := make(map[p2p.Peer]struct{}) for _, p := range peers { uniquePeers[p] = struct{}{} @@ -134,7 +134,12 @@ func (ff *ForkFinder) NeedResync(lid types.LayerID, hash types.Hash32) bool { // FindFork finds the point of divergence in layer opinions between the node and the specified peer // from a given disagreed layer. -func (ff *ForkFinder) FindFork(ctx context.Context, peer p2p.Peer, diffLid types.LayerID, diffHash types.Hash32) (types.LayerID, error) { +func (ff *ForkFinder) FindFork( + ctx context.Context, + peer p2p.Peer, + diffLid types.LayerID, + diffHash types.Hash32, +) (types.LayerID, error) { logger := ff.logger.WithContext(ctx).WithFields( log.Stringer("diff_layer", diffLid), log.Stringer("diff_hash", diffHash), @@ -213,7 +218,12 @@ func (ff *ForkFinder) FindFork(ctx context.Context, peer p2p.Peer, diffLid types } // UpdateAgreement updates the layer at which the peer agreed with the node. -func (ff *ForkFinder) UpdateAgreement(peer p2p.Peer, lid types.LayerID, hash types.Hash32, created time.Time) { +func (ff *ForkFinder) UpdateAgreement( + peer p2p.Peer, + lid types.LayerID, + hash types.Hash32, + created time.Time, +) { ff.updateAgreement(peer, &layerHash{layer: lid, hash: hash}, created) } @@ -273,7 +283,12 @@ func (ff *ForkFinder) setupBoundary(peer p2p.Peer, oldestDiff *layerHash) (*boun // if the number of hashes is less than maxHashesInReq, then request every hash. // otherwise, set appropriate params such that the number of hashes requested is maxHashesInReq // while ensuring hashes for the boundary layers are requested. 
-func (ff *ForkFinder) sendRequest(ctx context.Context, logger log.Log, peer p2p.Peer, bnd *boundary) (*fetch.MeshHashes, error) { +func (ff *ForkFinder) sendRequest( + ctx context.Context, + logger log.Log, + peer p2p.Peer, + bnd *boundary, +) (*fetch.MeshHashes, error) { if bnd == nil { logger.Fatal("invalid args") } else if bnd.from == nil || bnd.to == nil || !bnd.to.layer.After(bnd.from.layer) { diff --git a/syncer/find_fork_test.go b/syncer/find_fork_test.go index e997f9f22c..ed4534548c 100644 --- a/syncer/find_fork_test.go +++ b/syncer/find_fork_test.go @@ -48,7 +48,7 @@ func TestResynced(t *testing.T) { tf.AddResynced(lid, hash) require.False(t, tf.NeedResync(lid, hash)) - tf.mFetcher.EXPECT().GetPeers().Return([]p2p.Peer{}) + tf.mFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{}) tf.Purge(false) require.True(t, tf.NeedResync(lid, hash)) } @@ -60,7 +60,7 @@ func TestForkFinder_Purge(t *testing.T) { for i := 1; i < numCached; i++ { tf.UpdateAgreement(p2p.Peer(strconv.Itoa(i)), types.LayerID(uint32(i+1)), types.RandomHash(), time.Now()) } - tf.mFetcher.EXPECT().GetPeers().Return([]p2p.Peer{}) + tf.mFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{}) require.Equal(t, numCached, tf.NumPeersCached()) tf.Purge(false) require.Equal(t, 9, tf.NumPeersCached()) diff --git a/syncer/interface.go b/syncer/interface.go index d171dcd529..5f34ad8cb6 100644 --- a/syncer/interface.go +++ b/syncer/interface.go @@ -30,15 +30,37 @@ type fetchLogic interface { PollMaliciousProofs(ctx context.Context) error PollLayerData(context.Context, types.LayerID, ...p2p.Peer) error - PollLayerOpinions(context.Context, types.LayerID, bool, []p2p.Peer) ([]*fetch.LayerOpinion, []*types.Certificate, error) + PollLayerOpinions( + context.Context, + types.LayerID, + bool, + []p2p.Peer, + ) ([]*fetch.LayerOpinion, []*types.Certificate, error) GetEpochATXs(context.Context, types.EpochID) error } // fetcher is the interface to the low-level fetching. type fetcher interface { - GetMaliciousIDs(context.Context, []p2p.Peer, func([]byte, p2p.Peer), func(error, p2p.Peer)) error - GetLayerData(context.Context, []p2p.Peer, types.LayerID, func([]byte, p2p.Peer), func(error, p2p.Peer)) error - GetLayerOpinions(context.Context, []p2p.Peer, types.LayerID, func([]byte, p2p.Peer), func(error, p2p.Peer)) error + GetMaliciousIDs( + context.Context, + []p2p.Peer, + func([]byte, p2p.Peer), + func(error, p2p.Peer), + ) error + GetLayerData( + context.Context, + []p2p.Peer, + types.LayerID, + func([]byte, p2p.Peer), + func(error, p2p.Peer), + ) error + GetLayerOpinions( + context.Context, + []p2p.Peer, + types.LayerID, + func([]byte, p2p.Peer), + func(error, p2p.Peer), + ) error GetCert(context.Context, types.LayerID, types.BlockID, []p2p.Peer) (*types.Certificate, error) GetMalfeasanceProofs(context.Context, []types.NodeID) error @@ -47,7 +69,7 @@ type fetcher interface { GetBlocks(context.Context, []types.BlockID) error RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) - GetPeers() []p2p.Peer + SelectBest(int) []p2p.Peer PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error) PeerMeshHashes(context.Context, p2p.Peer, *fetch.MeshHashRequest) (*fetch.MeshHashes, error) } diff --git a/syncer/mocks/mocks.go b/syncer/mocks/mocks.go index 53eb33263c..38b28faa62 100644 --- a/syncer/mocks/mocks.go +++ b/syncer/mocks/mocks.go @@ -566,44 +566,6 @@ func (c *fetchLogicGetMaliciousIDsCall) DoAndReturn(f func(context.Context, []p2 return c } -// GetPeers mocks base method. 
-func (m *MockfetchLogic) GetPeers() []p2p.Peer { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPeers") - ret0, _ := ret[0].([]p2p.Peer) - return ret0 -} - -// GetPeers indicates an expected call of GetPeers. -func (mr *MockfetchLogicMockRecorder) GetPeers() *fetchLogicGetPeersCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*MockfetchLogic)(nil).GetPeers)) - return &fetchLogicGetPeersCall{Call: call} -} - -// fetchLogicGetPeersCall wrap *gomock.Call -type fetchLogicGetPeersCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *fetchLogicGetPeersCall) Return(arg0 []p2p.Peer) *fetchLogicGetPeersCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *fetchLogicGetPeersCall) Do(f func() []p2p.Peer) *fetchLogicGetPeersCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *fetchLogicGetPeersCall) DoAndReturn(f func() []p2p.Peer) *fetchLogicGetPeersCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // PeerEpochInfo mocks base method. func (m *MockfetchLogic) PeerEpochInfo(arg0 context.Context, arg1 p2p.Peer, arg2 types.EpochID) (*fetch.EpochData, error) { m.ctrl.T.Helper() @@ -839,6 +801,44 @@ func (c *fetchLogicRegisterPeerHashesCall) DoAndReturn(f func(p2p.Peer, []types. return c } +// SelectBest mocks base method. +func (m *MockfetchLogic) SelectBest(arg0 int) []p2p.Peer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SelectBest", arg0) + ret0, _ := ret[0].([]p2p.Peer) + return ret0 +} + +// SelectBest indicates an expected call of SelectBest. +func (mr *MockfetchLogicMockRecorder) SelectBest(arg0 any) *fetchLogicSelectBestCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBest", reflect.TypeOf((*MockfetchLogic)(nil).SelectBest), arg0) + return &fetchLogicSelectBestCall{Call: call} +} + +// fetchLogicSelectBestCall wrap *gomock.Call +type fetchLogicSelectBestCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *fetchLogicSelectBestCall) Return(arg0 []p2p.Peer) *fetchLogicSelectBestCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *fetchLogicSelectBestCall) Do(f func(int) []p2p.Peer) *fetchLogicSelectBestCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *fetchLogicSelectBestCall) DoAndReturn(f func(int) []p2p.Peer) *fetchLogicSelectBestCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Mockfetcher is a mock of fetcher interface. type Mockfetcher struct { ctrl *gomock.Controller @@ -1167,44 +1167,6 @@ func (c *fetcherGetMaliciousIDsCall) DoAndReturn(f func(context.Context, []p2p.P return c } -// GetPeers mocks base method. -func (m *Mockfetcher) GetPeers() []p2p.Peer { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPeers") - ret0, _ := ret[0].([]p2p.Peer) - return ret0 -} - -// GetPeers indicates an expected call of GetPeers. 
-func (mr *MockfetcherMockRecorder) GetPeers() *fetcherGetPeersCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*Mockfetcher)(nil).GetPeers)) - return &fetcherGetPeersCall{Call: call} -} - -// fetcherGetPeersCall wrap *gomock.Call -type fetcherGetPeersCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *fetcherGetPeersCall) Return(arg0 []p2p.Peer) *fetcherGetPeersCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *fetcherGetPeersCall) Do(f func() []p2p.Peer) *fetcherGetPeersCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *fetcherGetPeersCall) DoAndReturn(f func() []p2p.Peer) *fetcherGetPeersCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // PeerEpochInfo mocks base method. func (m *Mockfetcher) PeerEpochInfo(arg0 context.Context, arg1 p2p.Peer, arg2 types.EpochID) (*fetch.EpochData, error) { m.ctrl.T.Helper() @@ -1319,6 +1281,44 @@ func (c *fetcherRegisterPeerHashesCall) DoAndReturn(f func(p2p.Peer, []types.Has return c } +// SelectBest mocks base method. +func (m *Mockfetcher) SelectBest(arg0 int) []p2p.Peer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SelectBest", arg0) + ret0, _ := ret[0].([]p2p.Peer) + return ret0 +} + +// SelectBest indicates an expected call of SelectBest. +func (mr *MockfetcherMockRecorder) SelectBest(arg0 any) *fetcherSelectBestCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBest", reflect.TypeOf((*Mockfetcher)(nil).SelectBest), arg0) + return &fetcherSelectBestCall{Call: call} +} + +// fetcherSelectBestCall wrap *gomock.Call +type fetcherSelectBestCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *fetcherSelectBestCall) Return(arg0 []p2p.Peer) *fetcherSelectBestCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *fetcherSelectBestCall) Do(f func(int) []p2p.Peer) *fetcherSelectBestCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *fetcherSelectBestCall) DoAndReturn(f func(int) []p2p.Peer) *fetcherSelectBestCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // MocklayerPatrol is a mock of layerPatrol interface. type MocklayerPatrol struct { ctrl *gomock.Controller diff --git a/syncer/state_syncer.go b/syncer/state_syncer.go index 2d11c1d996..7a8029d779 100644 --- a/syncer/state_syncer.go +++ b/syncer/state_syncer.go @@ -80,20 +80,27 @@ func (s *Syncer) processLayers(ctx context.Context) error { if opinions, certs, err := s.layerOpinions(ctx, lid); err == nil { if len(certs) > 0 { if err = s.adopt(ctx, lid, certs); err != nil { - s.logger.WithContext(ctx).With().Warning("failed to adopt peer opinions", lid, log.Err(err)) + s.logger.WithContext(ctx). + With(). + Warning("failed to adopt peer opinions", lid, log.Err(err)) } } if s.IsSynced(ctx) { - if err = s.checkMeshAgreement(ctx, lid, opinions); err != nil && errors.Is(err, errMeshHashDiverged) { - s.logger.WithContext(ctx).With().Debug("mesh hash diverged, trying to reach agreement", - lid, - log.Stringer("diverged", lid.Sub(1)), - ) - if err = s.ensureMeshAgreement(ctx, lid, opinions, resyncPeers); err != nil { - s.logger.WithContext(ctx).With().Debug("failed to reach mesh agreement with peers", + if err = s.checkMeshAgreement(ctx, lid, opinions); err != nil && + errors.Is(err, errMeshHashDiverged) { + s.logger.WithContext(ctx). 
+ With(). + Debug("mesh hash diverged, trying to reach agreement", lid, - log.Err(err), + log.Stringer("diverged", lid.Sub(1)), ) + if err = s.ensureMeshAgreement(ctx, lid, opinions, resyncPeers); err != nil { + s.logger.WithContext(ctx). + With(). + Debug("failed to reach mesh agreement with peers", + lid, + log.Err(err), + ) hashResolve.Inc() } else { hashResolveFail.Inc() @@ -105,7 +112,9 @@ func (s *Syncer) processLayers(ctx context.Context) error { // has a chance to count ballots and form its own opinions if err := s.mesh.ProcessLayer(ctx, lid); err != nil { if !errors.Is(err, mesh.ErrMissingBlock) { - s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err)) + s.logger.WithContext(ctx). + With(). + Warning("mesh failed to process layer from sync", lid, log.Err(err)) } s.stateErr.Store(true) } else { @@ -135,8 +144,11 @@ func (s *Syncer) needCert(ctx context.Context, lid types.LayerID) (bool, error) return errors.Is(err, sql.ErrNotFound), nil } -func (s *Syncer) layerOpinions(ctx context.Context, lid types.LayerID) ([]*peerOpinion, []*types.Certificate, error) { - peers := s.dataFetcher.GetPeers() +func (s *Syncer) layerOpinions( + ctx context.Context, + lid types.LayerID, +) ([]*peerOpinion, []*types.Certificate, error) { + peers := s.dataFetcher.SelectBest(fetch.RedundantPeers) if len(peers) == 0 { return nil, nil, errNoPeers } @@ -168,7 +180,11 @@ func (s *Syncer) layerOpinions(ctx context.Context, lid types.LayerID) ([]*peerO return result, certs, nil } -func (s *Syncer) checkMeshAgreement(ctx context.Context, lid types.LayerID, opinions []*peerOpinion) error { +func (s *Syncer) checkMeshAgreement( + ctx context.Context, + lid types.LayerID, + opinions []*peerOpinion, +) error { prevHash, err := layers.GetAggregatedHash(s.cdb, lid.Sub(1)) if err != nil { s.logger.WithContext(ctx).With().Error("failed to get prev agg hash", lid, log.Err(err)) @@ -193,7 +209,9 @@ func (s *Syncer) adopt(ctx context.Context, lid types.LayerID, certs []*types.Ce } for _, cert := range certs { if err = s.adoptCert(ctx, lid, cert); err != nil { - s.logger.WithContext(ctx).With().Warning("failed to adopt cert", lid, cert.BlockID, log.Err(err)) + s.logger.WithContext(ctx). + With(). + Warning("failed to adopt cert", lid, cert.BlockID, log.Err(err)) } else { s.logger.WithContext(ctx).With().Info("adopted cert from peer", lid, cert.BlockID) break @@ -265,12 +283,14 @@ func (s *Syncer) ensureMeshAgreement( ) if !s.forkFinder.NeedResync(prevLid, opn.prevAggHash) { - s.logger.WithContext(ctx).With().Debug("already resynced based on the same diverged hash", - log.Stringer("node_prev_hash", prevHash), - log.Stringer("peer", peer), - log.Stringer("disagreed", prevLid), - log.Stringer("peer_hash", opn.prevAggHash), - ) + s.logger.WithContext(ctx). + With(). + Debug("already resynced based on the same diverged hash", + log.Stringer("node_prev_hash", prevHash), + log.Stringer("peer", peer), + log.Stringer("disagreed", prevLid), + log.Stringer("peer_hash", opn.prevAggHash), + ) continue } @@ -297,15 +317,17 @@ func (s *Syncer) ensureMeshAgreement( // node and peer has different state. 
check if peer has valid ATXs to back up its opinions if err = s.dataFetcher.GetAtxs(ctx, missing); err != nil { // if the node cannot download the ATXs claimed by this peer, it does not trust this peer's mesh - s.logger.WithContext(ctx).With().Warning("failed to download missing ATX claimed by peer", - log.Stringer("peer", peer), - log.Array("missing_atxs", log.ArrayMarshalerFunc(func(encoder zapcore.ArrayEncoder) error { - for _, id := range missing { - encoder.AppendString(id.ShortString()) - } - return nil - })), - log.Err(err)) + s.logger.WithContext(ctx). + With(). + Warning("failed to download missing ATX claimed by peer", + log.Stringer("peer", peer), + log.Array("missing_atxs", log.ArrayMarshalerFunc(func(encoder zapcore.ArrayEncoder) error { + for _, id := range missing { + encoder.AppendString(id.ShortString()) + } + return nil + })), + log.Err(err)) continue } } else { diff --git a/syncer/state_syncer_test.go b/syncer/state_syncer_test.go index 6fb52c1c3a..04edcbbd74 100644 --- a/syncer/state_syncer_test.go +++ b/syncer/state_syncer_test.go @@ -44,7 +44,7 @@ func TestProcessLayers_MultiLayers(t *testing.T) { ts.mTicker.advanceToLayer(current) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers).AnyTimes() ts.mForkFinder.EXPECT().UpdateAgreement(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() adopted := make(map[types.LayerID]types.BlockID) for lid := gLid.Add(1); lid.Before(current); lid = lid.Add(1) { @@ -145,7 +145,7 @@ func TestProcessLayers_OpinionsNotAdopted(t *testing.T) { ts.syncer.setLastSyncedLayer(current.Sub(1)) ts.mTicker.advanceToLayer(current) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers).AnyTimes() hasCert := false for _, opn := range tc.opns { @@ -231,7 +231,7 @@ func TestProcessLayers_HareIsStillWorking(t *testing.T) { ts.mLyrPatrol.EXPECT().IsHareInCharge(lastSynced).Return(false) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers) + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), lastSynced, true, peers).Return(nil, nil, nil) ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), lastSynced) ts.mTortoise.EXPECT().Updates().Return(fixture.RLayers(fixture.RLayer(lastSynced))) @@ -257,7 +257,7 @@ func TestProcessLayers_HareTakesTooLong(t *testing.T) { ts.mLyrPatrol.EXPECT().IsHareInCharge(lid).Return(false) } peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers) + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), lid, gomock.Any(), peers).Return(nil, nil, nil) ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), lid) ts.mTortoise.EXPECT().Updates().Return(fixture.RLayers(fixture.RLayer(lid))) @@ -277,7 +277,7 @@ func TestProcessLayers_OpinionsOptional(t *testing.T) { ts.mTicker.advanceToLayer(lastSynced.Add(1)) ts.mLyrPatrol.EXPECT().IsHareInCharge(lastSynced).Return(false) peers := test.GeneratePeerIDs(5) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers) + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), lastSynced, true, peers).Return(nil, nil, errors.New("meh")) ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), lastSynced) 
ts.mTortoise.EXPECT().Updates().Return(fixture.RLayers(fixture.RLayer(lastSynced))) @@ -334,7 +334,7 @@ func TestProcessLayers_MeshHashDiverged(t *testing.T) { ts.mLyrPatrol.EXPECT().IsHareInCharge(instate).Return(false) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers) + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), instate, false, peers).Return(opns, nil, nil) ts.mForkFinder.EXPECT().UpdateAgreement(opns[1].Peer(), instate.Sub(1), prevHash, gomock.Any()) for i := 0; i < numPeers; i++ { @@ -434,7 +434,7 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) { for lid := instate; lid <= current; lid++ { ts.mLyrPatrol.EXPECT().IsHareInCharge(lid) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().GetPeers().Return(peers) + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), lid, gomock.Any(), peers).Return(opns, nil, nil) ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), lid) ts.mTortoise.EXPECT().Updates().Return(fixture.RLayers(fixture.ROpinion(lid.Sub(1), opns[2].PrevAggHash))) diff --git a/syncer/syncer.go b/syncer/syncer.go index 69fbc7d54e..6c5ff694ee 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -371,7 +371,8 @@ func (s *Syncer) synchronize(ctx context.Context) bool { s.setATXSynced() return true } - if len(s.dataFetcher.GetPeers()) == 0 { + // check that we have any peers + if len(s.dataFetcher.SelectBest(1)) == 0 { return false } @@ -443,7 +444,11 @@ func (s *Syncer) syncAtx(ctx context.Context) error { // steady state atx syncing curr := s.ticker.CurrentLayer() - if float64((curr - curr.GetEpoch().FirstLayer()).Uint32()) >= float64(types.GetLayersPerEpoch())*s.cfg.EpochEndFraction { + if float64( + (curr - curr.GetEpoch().FirstLayer()).Uint32(), + ) >= float64( + types.GetLayersPerEpoch(), + )*s.cfg.EpochEndFraction { s.logger.WithContext(ctx).With().Debug("at end of epoch, syncing atx", curr.GetEpoch()) if err := s.fetchATXsForEpoch(ctx, curr.GetEpoch()); err != nil { return err @@ -452,7 +457,12 @@ func (s *Syncer) syncAtx(ctx context.Context) error { return nil } -func isTooFarBehind(ctx context.Context, logger log.Log, current, lastSynced types.LayerID, outOfSyncThreshold uint32) bool { +func isTooFarBehind( + ctx context.Context, + logger log.Log, + current, lastSynced types.LayerID, + outOfSyncThreshold uint32, +) bool { if current.After(lastSynced) && current.Difference(lastSynced) >= outOfSyncThreshold { logger.WithContext(ctx).With().Info("node is too far behind", log.Stringer("current", current), @@ -472,7 +482,13 @@ func (s *Syncer) setStateBeforeSync(ctx context.Context) { } return } - if isTooFarBehind(ctx, s.logger, current, s.getLastSyncedLayer(), s.cfg.OutOfSyncThresholdLayers) { + if isTooFarBehind( + ctx, + s.logger, + current, + s.getLastSyncedLayer(), + s.cfg.OutOfSyncThresholdLayers, + ) { s.setSyncState(ctx, notSynced) } } @@ -492,7 +508,14 @@ func (s *Syncer) setStateAfterSync(ctx context.Context, success bool) { // network outage. 
switch currSyncState { case synced: - if !success && isTooFarBehind(ctx, s.logger, current, s.getLastSyncedLayer(), s.cfg.OutOfSyncThresholdLayers) { + if !success && + isTooFarBehind( + ctx, + s.logger, + current, + s.getLastSyncedLayer(), + s.cfg.OutOfSyncThresholdLayers, + ) { s.setSyncState(ctx, notSynced) } case gossipSync: diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 771d52f69d..a80f8b40de 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -114,7 +114,7 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer { func newSyncerWithoutPeriodicRuns(t *testing.T) *testSyncer { ts := newTestSyncer(t, never) - ts.mDataFetcher.EXPECT().GetPeers().Return([]p2p.Peer{"non-empty"}).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{"non-empty"}).AnyTimes() return ts } @@ -136,7 +136,7 @@ func TestStartAndShutdown(t *testing.T) { ts.syncer.Start() ts.mForkFinder.EXPECT().Purge(false).AnyTimes() - ts.mDataFetcher.EXPECT().GetPeers().Return(nil).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(nil).AnyTimes() require.Eventually(t, func() bool { return ts.syncer.ListenToATXGossip() && ts.syncer.ListenToGossip() && ts.syncer.IsSynced(ctx) }, time.Second, 10*time.Millisecond)
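
Note: below is a minimal sketch of the call-site pattern introduced by the hunks above, where the syncer asks the fetcher for a bounded set of peers via SelectBest and bails out with errNoPeers when none are available. The names Peer, stubFetcher, and pollLayerData are illustrative stand-ins, not code from this patch, and the trivial stub does no real ranking; the actual selection logic lives in fetch/peers and is not reproduced here.

package main

import (
	"errors"
	"fmt"
)

// Peer stands in for p2p.Peer (a libp2p peer ID) in this sketch.
type Peer string

// fetcher mirrors the narrowed syncer interface: callers ask for the
// n best peers instead of fetching the full peer list.
type fetcher interface {
	SelectBest(n int) []Peer
}

var errNoPeers = errors.New("no peers")

// pollLayerData shows the guard used throughout the syncer: return early
// when SelectBest yields nothing, otherwise fan the request out to the
// bounded set of selected peers (fetch.RedundantPeers in the real code).
func pollLayerData(f fetcher, redundantPeers int) error {
	peers := f.SelectBest(redundantPeers)
	if len(peers) == 0 {
		return errNoPeers
	}
	for _, p := range peers {
		fmt.Println("requesting layer data from", p)
	}
	return nil
}

// stubFetcher is a hypothetical stand-in that simply truncates its peer
// list; it only exists so the sketch compiles and runs.
type stubFetcher struct{ peers []Peer }

func (s *stubFetcher) SelectBest(n int) []Peer {
	if n > len(s.peers) {
		n = len(s.peers)
	}
	return s.peers[:n]
}

func main() {
	f := &stubFetcher{peers: []Peer{"peerA", "peerB", "peerC"}}
	_ = pollLayerData(f, 2)
	// Checking for connectivity only needs a single peer, which is why
	// Syncer.synchronize above uses SelectBest(1) instead of a full listing.
	fmt.Println("have peers:", len(f.SelectBest(1)) > 0)
}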