[Merged by Bors] - sync: prioritize peers with higher success rate and low latency #5143

fetch/fetch.go: 163 changes (96 additions, 67 deletions)
@@ -4,17 +4,17 @@ package fetch
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/network"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
@@ -30,6 +30,8 @@ const (
OpnProtocol = "lp/2"

cacheSize = 1000

RedundantPeers = 10
)

var (
@@ -79,7 +81,6 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage {
// Config is the configuration file of the Fetch component.
type Config struct {
BatchTimeout time.Duration // in milliseconds
MaxRetriesForPeer int
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
MaxRetriesForRequest int
@@ -89,7 +90,6 @@ type Config struct {
func DefaultConfig() Config {
return Config{
BatchTimeout: time.Millisecond * time.Duration(50),
MaxRetriesForPeer: 2,
QueueSize: 20,
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
@@ -144,6 +144,7 @@ type Fetch struct {
logger log.Log
bs *datastore.BlobStore
host host
peers *peers.Peers

servers map[string]requester
validators *dataValidators
@@ -165,13 +166,20 @@ type Fetch struct {
}

// NewFetch creates a new Fetch struct.
func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, host *p2p.Host, opts ...Option) *Fetch {
func NewFetch(
cdb *datastore.CachedDB,
msh meshProvider,
b system.BeaconGetter,
host *p2p.Host,
opts ...Option,
) *Fetch {
bs := datastore.NewBlobStore(cdb.Database)
f := &Fetch{
cfg: DefaultConfig(),
logger: log.NewNop(),
bs: bs,
host: host,
peers: peers.New(),
servers: map[string]requester{},
unprocessed: make(map[types.Hash32]*request),
ongoing: make(map[types.Hash32]*request),
@@ -181,6 +189,28 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
for _, opt := range opts {
opt(f)
}
// NOTE(dshulyak) this is to avoid refactoring the tests;
// there is one test that covers this part.
if host != nil {
connectedf := func(peer p2p.Peer) {
f.logger.With().Debug("add peer", log.Stringer("id", peer))
f.peers.Add(peer)
}
host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, c network.Conn) {
connectedf(c.RemotePeer())
},
DisconnectedF: func(_ network.Network, c network.Conn) {
f.logger.With().Debug("remove peer", log.Stringer("id", c.RemotePeer()))
f.peers.Delete(c.RemotePeer())
},
})
for _, peer := range host.GetPeers() {
if host.Connected(peer) {
connectedf(peer)
}
}
}

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
srvOpts := []server.Opt{
@@ -190,11 +220,23 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(host, lyrDataProtocol, h.handleLayerDataReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(
host,
lyrDataProtocol,
h.handleLayerDataReq,
srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(
host,
meshHashProtocol,
h.handleMeshHashReq,
srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[OpnProtocol] = server.New(host, OpnProtocol, h.handleLayerOpinionsReq2, srvOpts...)
f.servers[OpnProtocol] = server.New(
host,
OpnProtocol,
h.handleLayerOpinionsReq2,
srvOpts...)
}
return f
}
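For context, the rating primitives used above (peers.New, Add, SelectBest, SelectBestFrom, OnFailure, OnLatency) live in the new fetch/peers package, which is not part of this file. The following is a minimal sketch of that surface, assuming a success-rate-first, latency-second ordering with an exponential moving average for latency; the real package may differ, and Delete (which just removes the record) is omitted for brevity:

```go
package peers

import (
	"sort"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
)

// data is an assumed per-peer record; fields and weights are illustrative.
type data struct {
	id               peer.ID
	success, failure int
	latency          float64 // exponential moving average, in seconds
}

// less ranks by success rate first (smoothed by +1), latency second.
func (d *data) less(other *data) bool {
	r1 := float64(d.success) / float64(d.success+d.failure+1)
	r2 := float64(other.success) / float64(other.success+other.failure+1)
	if r1 != r2 {
		return r1 > r2
	}
	return d.latency < other.latency
}

// Peers tracks a success rate and a latency estimate per connected peer.
type Peers struct {
	mu    sync.Mutex
	peers map[peer.ID]*data
}

func New() *Peers { return &Peers{peers: map[peer.ID]*data{}} }

// Add starts tracking a peer; in this sketch a reconnect resets its stats.
func (p *Peers) Add(id peer.ID) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers[id] = &data{id: id}
}

func (p *Peers) OnFailure(id peer.ID) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if d, ok := p.peers[id]; ok {
		d.failure++
	}
}

// OnLatency records a success and folds the round trip into the average.
func (p *Peers) OnLatency(id peer.ID, latency time.Duration) {
	p.mu.Lock()
	defer p.mu.Unlock()
	d, ok := p.peers[id]
	if !ok {
		return
	}
	d.success++
	if d.latency == 0 {
		d.latency = latency.Seconds()
	} else {
		d.latency = 0.8*d.latency + 0.2*latency.Seconds()
	}
}

// SelectBest returns up to n of the highest-rated peers.
func (p *Peers) SelectBest(n int) []peer.ID {
	p.mu.Lock()
	defer p.mu.Unlock()
	ranked := make([]*data, 0, len(p.peers))
	for _, d := range p.peers {
		ranked = append(ranked, d)
	}
	sort.Slice(ranked, func(i, j int) bool { return ranked[i].less(ranked[j]) })
	if n > len(ranked) {
		n = len(ranked)
	}
	best := make([]peer.ID, 0, n)
	for _, d := range ranked[:n] {
		best = append(best, d.id)
	}
	return best
}

// SelectBestFrom returns the best-rated peer among candidates, or the
// empty ID (p2p.NoPeer) when none of them is tracked.
func (p *Peers) SelectBestFrom(candidates []peer.ID) peer.ID {
	p.mu.Lock()
	defer p.mu.Unlock()
	var best *data
	for _, id := range candidates {
		if d, ok := p.peers[id]; ok && (best == nil || d.less(best)) {
			best = d
		}
	}
	if best == nil {
		return ""
	}
	return best.id
}
```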
@@ -255,9 +297,6 @@ func (f *Fetch) Stop() {
f.logger.Info("stopping fetch")
f.batchTimeout.Stop()
f.cancel()
if err := f.host.Close(); err != nil {
f.logger.With().Warning("error closing host", log.Err(err))
}
f.mu.Lock()
for _, req := range f.unprocessed {
close(req.promise.completed)
@@ -423,7 +462,9 @@ func (f *Fetch) getUnprocessed() []RequestMessage {
var requestList []RequestMessage
// only send one request per hash
for hash, req := range f.unprocessed {
f.logger.WithContext(req.ctx).With().Debug("processing hash request", log.Stringer("hash", hash))
f.logger.WithContext(req.ctx).
With().
Debug("processing hash request", log.Stringer("hash", hash))
requestList = append(requestList, RequestMessage{Hash: hash, Hint: req.hint})
// move the processed requests to pending
f.ongoing[hash] = req
Expand All @@ -450,16 +491,17 @@ func (f *Fetch) send(requests []RequestMessage) {
peer: peer,
}
batch.setID()
_ = f.sendBatch(peer, batch)
f.sendBatch(peer, batch)
}
}
}

func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer2requests := make(map[p2p.Peer][]RequestMessage)
peers := f.host.GetPeers()
if len(peers) == 0 {

best := f.peers.SelectBest(RedundantPeers)
if len(best) == 0 {
f.logger.Info("cannot send batch: no peers found")
f.mu.Lock()
defer f.mu.Unlock()
@@ -479,16 +521,10 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage {
return nil
}
for _, req := range requests {
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}
target := f.peers.SelectBestFrom(hashPeers)
if target == p2p.NoPeer {
target = randomPeer(peers)
target = randomPeer(best)
}
_, ok := peer2requests[target]
if !ok {
@@ -519,51 +555,42 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage {
}
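Previously a request went to the first connected peer that had advertised the hash; with the rating tracker, the best-rated advertiser wins, and only hashes with no tracked advertiser fall back to a random peer from the top-rated pool. Condensed, per request (identifiers as in this diff; not a standalone program):

```go
// Route one request: prefer the best-rated peer that advertised the hash.
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
target := f.peers.SelectBestFrom(hashPeers)
if target == p2p.NoPeer {
	// No tracked advertiser: pick any peer from the SelectBest pool.
	target = randomPeer(best)
}
peer2requests[target] = append(peer2requests[target], req)
```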

// sendBatch dispatches batched request messages to provided peer.
func (f *Fetch) sendBatch(p p2p.Peer, batch *batchInfo) error {
func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) {
if f.stopped() {
return
}
f.mu.Lock()
f.batched[batch.ID] = batch
f.mu.Unlock()

f.logger.With().Debug("sending batch request",
f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Stringer("peer", batch.peer))
// timeout function will be called if no response was received for the hashes sent
errorFunc := func(err error) {
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
)
// Request is asynchronous; it returns an error only if the
// encoded buffer is too large or the target peer is not connected.
start := time.Now()
errf := func(err error) {
f.logger.With().Warning("failed to send batch",
log.Stringer("batch_hash", batch.ID),
log.Err(err))
log.Stringer("batch_hash", peer), log.Err(err),
)
f.peers.OnFailure(peer)
f.handleHashError(batch.ID, err)
}

bytes, err := codec.Encode(&batch.RequestBatch)
err := f.servers[hashProtocol].Request(
f.shutdownCtx,
peer,
codec.MustEncode(&batch.RequestBatch),
func(buf []byte) {
f.peers.OnLatency(peer, time.Since(start))
f.receiveResponse(buf)
},
errf,
)
if err != nil {
f.logger.With().Panic("failed to encode batch", log.Err(err))
}

// try sending batch to provided peer
retries := 0
for {
if f.stopped() {
return nil
}

f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", p))

err = f.servers[hashProtocol].Request(f.shutdownCtx, p, bytes, f.receiveResponse, errorFunc)
if err == nil {
break
}

retries++
if retries > f.cfg.MaxRetriesForPeer {
f.handleHashError(batch.ID, fmt.Errorf("batched request failed w retries: %w", err))
break
}
errf(err)
}
return err
}
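With the per-peer retry loop and MaxRetriesForPeer gone, a batch is sent exactly once: a response feeds OnLatency, a failure feeds OnFailure, so misbehaving peers simply stop being selected. A usage example against the sketch tracker above (hypothetical peer IDs and timings):

```go
tracker := peers.New()
a, b := peer.ID("peerA"), peer.ID("peerB") // hypothetical IDs
tracker.Add(a)
tracker.Add(b)
// a is slow but reliable; b is fast but failed one request.
tracker.OnLatency(a, 120*time.Millisecond)
tracker.OnLatency(a, 110*time.Millisecond)
tracker.OnLatency(b, 30*time.Millisecond)
tracker.OnFailure(b)
best := tracker.SelectBest(2)
// best[0] == a: success rate (2/3 vs 1/3 after +1 smoothing) outranks raw latency.
```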

// handleHashError is called when an error occurs while processing a batch of hashes.
@@ -580,7 +607,8 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {
for _, br := range batch.Requests {
req, ok := f.ongoing[br.Hash]
if !ok {
f.logger.With().Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
f.logger.With().
Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
continue
}
f.logger.WithContext(req.ctx).With().Warning("hash request failed",
@@ -596,7 +624,12 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {

// getHash is the regular buffered call to get a specific hash. The hint h tells the receiving
// end where to look for the hash. It returns a promise that will hold the received data or an error.
func (f *Fetch) getHash(ctx context.Context, hash types.Hash32, h datastore.Hint, receiver dataReceiver) (*promise, error) {
func (f *Fetch) getHash(
ctx context.Context,
hash types.Hash32,
h datastore.Hint,
receiver dataReceiver,
) (*promise, error) {
if f.stopped() {
return nil, f.shutdownCtx.Err()
}
@@ -650,10 +683,6 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) {
f.hashToPeers.RegisterPeerHashes(peer, hashes)
}

func (f *Fetch) GetPeers() []p2p.Peer {
return f.host.GetPeers()
}

func (f *Fetch) PeerProtocols(p p2p.Peer) ([]protocol.ID, error) {
return f.host.PeerProtocols(p)
}

func (f *Fetch) SelectBest(n int) []p2p.Peer {
return f.peers.SelectBest(n)
}
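Callers that used to enumerate GetPeers now request a rated subset. A hypothetical call site (the requestFromPeer helper is illustrative, not part of this diff):

```go
// Hypothetical consumer, e.g. a sync routine fanning out to good peers.
for _, p := range fetcher.SelectBest(3) {
	if err := requestFromPeer(ctx, p); err != nil {
		continue // the failure lowers p's rating via OnFailure in sendBatch
	}
	break
}
```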