Skip to content

Commit

Permalink
sync: prioritize peers with higher success rate and low latency (#5143)
Browse files Browse the repository at this point in the history
closes: #5127 #5036

peers that are overwhelmed or generally will not be used for requests. there are two criteria used to select good peer:
- request success rate . success rates within 0.1 (10%) of each other are treated as equal, and in such case we will use latency
- latency. hs/1 protocol used to track latency, as it is the most used protocol and objects served in this protocol are of the same size with several exceptions (active sets, list of malfeasence proofs).

related: #4977

limits number of peers to request data for atxs. previously we were requesting data from all peers atleast once.

synced data 2 times in 90m, previous attempt on my computer was 1 week ago and took 12h
  • Loading branch information
dshulyak committed Oct 13, 2023
1 parent 6338651 commit 5403cb9
Show file tree
Hide file tree
Showing 19 changed files with 1,084 additions and 572 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

The submit proof of work should now be up to 40% faster thanks to [code optimization](https://github.com/spacemeshos/poet/pull/419).

* [#5143](https://github.com/spacemeshos/go-spacemesh/pull/5143) Select good peers for sync requests.

The change improves initial sync speed and any sync protocol requests required during consensus.

## v1.2.0

### Upgrade information
Expand Down
163 changes: 96 additions & 67 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package fetch
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/network"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
Expand All @@ -30,6 +30,8 @@ const (
OpnProtocol = "lp/2"

cacheSize = 1000

RedundantPeers = 10
)

var (
Expand Down Expand Up @@ -79,7 +81,6 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage {
// Config is the configuration file of the Fetch component.
type Config struct {
BatchTimeout time.Duration // in milliseconds
MaxRetriesForPeer int
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
MaxRetriesForRequest int
Expand All @@ -89,7 +90,6 @@ type Config struct {
func DefaultConfig() Config {
return Config{
BatchTimeout: time.Millisecond * time.Duration(50),
MaxRetriesForPeer: 2,
QueueSize: 20,
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
Expand Down Expand Up @@ -144,6 +144,7 @@ type Fetch struct {
logger log.Log
bs *datastore.BlobStore
host host
peers *peers.Peers

servers map[string]requester
validators *dataValidators
Expand All @@ -165,13 +166,20 @@ type Fetch struct {
}

// NewFetch creates a new Fetch struct.
func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, host *p2p.Host, opts ...Option) *Fetch {
func NewFetch(
cdb *datastore.CachedDB,
msh meshProvider,
b system.BeaconGetter,
host *p2p.Host,
opts ...Option,
) *Fetch {
bs := datastore.NewBlobStore(cdb.Database)
f := &Fetch{
cfg: DefaultConfig(),
logger: log.NewNop(),
bs: bs,
host: host,
peers: peers.New(),
servers: map[string]requester{},
unprocessed: make(map[types.Hash32]*request),
ongoing: make(map[types.Hash32]*request),
Expand All @@ -181,6 +189,28 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
for _, opt := range opts {
opt(f)
}
// NOTE(dshulyak) this is to avoid tests refactoring.
// there is one test that covers this part.
if host != nil {
connectedf := func(peer p2p.Peer) {
f.logger.With().Debug("add peer", log.Stringer("id", peer))
f.peers.Add(peer)
}
host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, c network.Conn) {
connectedf(c.RemotePeer())
},
DisconnectedF: func(_ network.Network, c network.Conn) {
f.logger.With().Debug("remove peer", log.Stringer("id", c.RemotePeer()))
f.peers.Delete(c.RemotePeer())
},
})
for _, peer := range host.GetPeers() {
if host.Connected(peer) {
connectedf(peer)
}
}
}

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
srvOpts := []server.Opt{
Expand All @@ -190,11 +220,23 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(host, lyrDataProtocol, h.handleLayerDataReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(
host,
lyrDataProtocol,
h.handleLayerDataReq,
srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(
host,
meshHashProtocol,
h.handleMeshHashReq,
srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[OpnProtocol] = server.New(host, OpnProtocol, h.handleLayerOpinionsReq2, srvOpts...)
f.servers[OpnProtocol] = server.New(
host,
OpnProtocol,
h.handleLayerOpinionsReq2,
srvOpts...)
}
return f
}
Expand Down Expand Up @@ -255,9 +297,6 @@ func (f *Fetch) Stop() {
f.logger.Info("stopping fetch")
f.batchTimeout.Stop()
f.cancel()
if err := f.host.Close(); err != nil {
f.logger.With().Warning("error closing host", log.Err(err))
}
f.mu.Lock()
for _, req := range f.unprocessed {
close(req.promise.completed)
Expand Down Expand Up @@ -423,7 +462,9 @@ func (f *Fetch) getUnprocessed() []RequestMessage {
var requestList []RequestMessage
// only send one request per hash
for hash, req := range f.unprocessed {
f.logger.WithContext(req.ctx).With().Debug("processing hash request", log.Stringer("hash", hash))
f.logger.WithContext(req.ctx).
With().
Debug("processing hash request", log.Stringer("hash", hash))
requestList = append(requestList, RequestMessage{Hash: hash, Hint: req.hint})
// move the processed requests to pending
f.ongoing[hash] = req
Expand All @@ -450,16 +491,17 @@ func (f *Fetch) send(requests []RequestMessage) {
peer: peer,
}
batch.setID()
_ = f.sendBatch(peer, batch)
f.sendBatch(peer, batch)
}
}
}

func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer2requests := make(map[p2p.Peer][]RequestMessage)
peers := f.host.GetPeers()
if len(peers) == 0 {

best := f.peers.SelectBest(RedundantPeers)
if len(best) == 0 {
f.logger.Info("cannot send batch: no peers found")
f.mu.Lock()
defer f.mu.Unlock()
Expand All @@ -479,16 +521,10 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
return nil
}
for _, req := range requests {
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}
target := f.peers.SelectBestFrom(hashPeers)
if target == p2p.NoPeer {
target = randomPeer(peers)
target = randomPeer(best)
}
_, ok := peer2requests[target]
if !ok {
Expand Down Expand Up @@ -519,51 +555,42 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
}

// sendBatch dispatches batched request messages to provided peer.
func (f *Fetch) sendBatch(p p2p.Peer, batch *batchInfo) error {
func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) {
if f.stopped() {
return
}
f.mu.Lock()
f.batched[batch.ID] = batch
f.mu.Unlock()

f.logger.With().Debug("sending batch request",
f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Stringer("peer", batch.peer))
// timeout function will be called if no response was received for the hashes sent
errorFunc := func(err error) {
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
)
// Request is asynchronous,
// it will return errors only if size of the bytes buffer is large
// or target peer is not connected
start := time.Now()
errf := func(err error) {
f.logger.With().Warning("failed to send batch",
log.Stringer("batch_hash", batch.ID),
log.Err(err))
log.Stringer("batch_hash", peer), log.Err(err),
)
f.peers.OnFailure(peer)
f.handleHashError(batch.ID, err)
}

bytes, err := codec.Encode(&batch.RequestBatch)
err := f.servers[hashProtocol].Request(
f.shutdownCtx,
peer,
codec.MustEncode(&batch.RequestBatch),
func(buf []byte) {
f.peers.OnLatency(peer, time.Since(start))
f.receiveResponse(buf)
},
errf,
)
if err != nil {
f.logger.With().Panic("failed to encode batch", log.Err(err))
}

// try sending batch to provided peer
retries := 0
for {
if f.stopped() {
return nil
}

f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", p))

err = f.servers[hashProtocol].Request(f.shutdownCtx, p, bytes, f.receiveResponse, errorFunc)
if err == nil {
break
}

retries++
if retries > f.cfg.MaxRetriesForPeer {
f.handleHashError(batch.ID, fmt.Errorf("batched request failed w retries: %w", err))
break
}
errf(err)
}
return err
}

// handleHashError is called when an error occurred processing batches of the following hashes.
Expand All @@ -580,7 +607,8 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {
for _, br := range batch.Requests {
req, ok := f.ongoing[br.Hash]
if !ok {
f.logger.With().Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
f.logger.With().
Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
continue
}
f.logger.WithContext(req.ctx).With().Warning("hash request failed",
Expand All @@ -596,7 +624,12 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {

// getHash is the regular buffered call to get a specific hash, using provided hash, h as hint the receiving end will
// know where to look for the hash, this function returns HashDataPromiseResult channel that will hold Data received or error.
func (f *Fetch) getHash(ctx context.Context, hash types.Hash32, h datastore.Hint, receiver dataReceiver) (*promise, error) {
func (f *Fetch) getHash(
ctx context.Context,
hash types.Hash32,
h datastore.Hint,
receiver dataReceiver,
) (*promise, error) {
if f.stopped() {
return nil, f.shutdownCtx.Err()
}
Expand Down Expand Up @@ -650,10 +683,6 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) {
f.hashToPeers.RegisterPeerHashes(peer, hashes)
}

func (f *Fetch) GetPeers() []p2p.Peer {
return f.host.GetPeers()
}

func (f *Fetch) PeerProtocols(p p2p.Peer) ([]protocol.ID, error) {
return f.host.PeerProtocols(p)
func (f *Fetch) SelectBest(n int) []p2p.Peer {
return f.peers.SelectBest(n)
}
Loading

0 comments on commit 5403cb9

Please sign in to comment.