Skip to content

Commit

Permalink
Try #5143:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored Oct 12, 2023
2 parents c699075 + e0e3cfa commit 1b3ddf7
Show file tree
Hide file tree
Showing 18 changed files with 1,032 additions and 572 deletions.
163 changes: 96 additions & 67 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package fetch
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/network"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
Expand All @@ -30,6 +30,8 @@ const (
OpnProtocol = "lp/2"

cacheSize = 1000

RedundantPeers = 10
)

var (
Expand Down Expand Up @@ -79,7 +81,6 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage {
// Config is the configuration file of the Fetch component.
type Config struct {
BatchTimeout time.Duration // in milliseconds
MaxRetriesForPeer int
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
MaxRetriesForRequest int
Expand All @@ -89,7 +90,6 @@ type Config struct {
func DefaultConfig() Config {
return Config{
BatchTimeout: time.Millisecond * time.Duration(50),
MaxRetriesForPeer: 2,
QueueSize: 20,
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
Expand Down Expand Up @@ -144,6 +144,7 @@ type Fetch struct {
logger log.Log
bs *datastore.BlobStore
host host
peers *peers.Peers

servers map[string]requester
validators *dataValidators
Expand All @@ -165,13 +166,20 @@ type Fetch struct {
}

// NewFetch creates a new Fetch struct.
func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, host *p2p.Host, opts ...Option) *Fetch {
func NewFetch(
cdb *datastore.CachedDB,
msh meshProvider,
b system.BeaconGetter,
host *p2p.Host,
opts ...Option,
) *Fetch {
bs := datastore.NewBlobStore(cdb.Database)
f := &Fetch{
cfg: DefaultConfig(),
logger: log.NewNop(),
bs: bs,
host: host,
peers: peers.New(),
servers: map[string]requester{},
unprocessed: make(map[types.Hash32]*request),
ongoing: make(map[types.Hash32]*request),
Expand All @@ -181,6 +189,28 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
for _, opt := range opts {
opt(f)
}
// NOTE(dshulyak) this is to avoid tests refactoring.
// there is one test that covers this part.
if host != nil {
connectedf := func(peer p2p.Peer) {
f.logger.With().Debug("add peer", log.Stringer("id", peer))
f.peers.Add(peer)
}
host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, c network.Conn) {
connectedf(c.RemotePeer())
},
DisconnectedF: func(_ network.Network, c network.Conn) {
f.logger.With().Debug("remove peer", log.Stringer("id", c.RemotePeer()))
f.peers.Delete(c.RemotePeer())
},
})
for _, peer := range host.GetPeers() {
if host.Connected(peer) {
connectedf(peer)
}
}
}

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
srvOpts := []server.Opt{
Expand All @@ -190,11 +220,23 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(host, lyrDataProtocol, h.handleLayerDataReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(
host,
lyrDataProtocol,
h.handleLayerDataReq,
srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(
host,
meshHashProtocol,
h.handleMeshHashReq,
srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[OpnProtocol] = server.New(host, OpnProtocol, h.handleLayerOpinionsReq2, srvOpts...)
f.servers[OpnProtocol] = server.New(
host,
OpnProtocol,
h.handleLayerOpinionsReq2,
srvOpts...)
}
return f
}
Expand Down Expand Up @@ -255,9 +297,6 @@ func (f *Fetch) Stop() {
f.logger.Info("stopping fetch")
f.batchTimeout.Stop()
f.cancel()
if err := f.host.Close(); err != nil {
f.logger.With().Warning("error closing host", log.Err(err))
}
f.mu.Lock()
for _, req := range f.unprocessed {
close(req.promise.completed)
Expand Down Expand Up @@ -423,7 +462,9 @@ func (f *Fetch) getUnprocessed() []RequestMessage {
var requestList []RequestMessage
// only send one request per hash
for hash, req := range f.unprocessed {
f.logger.WithContext(req.ctx).With().Debug("processing hash request", log.Stringer("hash", hash))
f.logger.WithContext(req.ctx).
With().
Debug("processing hash request", log.Stringer("hash", hash))
requestList = append(requestList, RequestMessage{Hash: hash, Hint: req.hint})
// move the processed requests to pending
f.ongoing[hash] = req
Expand All @@ -450,16 +491,17 @@ func (f *Fetch) send(requests []RequestMessage) {
peer: peer,
}
batch.setID()
_ = f.sendBatch(peer, batch)
f.sendBatch(peer, batch)
}
}
}

func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer2requests := make(map[p2p.Peer][]RequestMessage)
peers := f.host.GetPeers()
if len(peers) == 0 {

best := f.peers.SelectBest(RedundantPeers)
if len(best) == 0 {
f.logger.Info("cannot send batch: no peers found")
f.mu.Lock()
defer f.mu.Unlock()
Expand All @@ -479,16 +521,10 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
return nil
}
for _, req := range requests {
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}
target := f.peers.SelectBestFrom(hashPeers)
if target == p2p.NoPeer {
target = randomPeer(peers)
target = randomPeer(best)
}
_, ok := peer2requests[target]
if !ok {
Expand Down Expand Up @@ -519,51 +555,42 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
}

// sendBatch dispatches batched request messages to provided peer.
func (f *Fetch) sendBatch(p p2p.Peer, batch *batchInfo) error {
func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) {
if f.stopped() {
return
}
f.mu.Lock()
f.batched[batch.ID] = batch
f.mu.Unlock()

f.logger.With().Debug("sending batch request",
f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Stringer("peer", batch.peer))
// timeout function will be called if no response was received for the hashes sent
errorFunc := func(err error) {
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
)
// Request is asynchronous,
// it will return errors only if size of the bytes buffer is large
// or target peer is not connected
start := time.Now()
errf := func(err error) {
f.logger.With().Warning("failed to send batch",
log.Stringer("batch_hash", batch.ID),
log.Err(err))
log.Stringer("batch_hash", peer), log.Err(err),
)
f.peers.OnFailure(peer)
f.handleHashError(batch.ID, err)
}

bytes, err := codec.Encode(&batch.RequestBatch)
err := f.servers[hashProtocol].Request(
f.shutdownCtx,
peer,
codec.MustEncode(&batch.RequestBatch),
func(buf []byte) {
f.peers.OnLatency(peer, time.Since(start))
f.receiveResponse(buf)
},
errf,
)
if err != nil {
f.logger.With().Panic("failed to encode batch", log.Err(err))
}

// try sending batch to provided peer
retries := 0
for {
if f.stopped() {
return nil
}

f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", p))

err = f.servers[hashProtocol].Request(f.shutdownCtx, p, bytes, f.receiveResponse, errorFunc)
if err == nil {
break
}

retries++
if retries > f.cfg.MaxRetriesForPeer {
f.handleHashError(batch.ID, fmt.Errorf("batched request failed w retries: %w", err))
break
}
errf(err)
}
return err
}

// handleHashError is called when an error occurred processing batches of the following hashes.
Expand All @@ -580,7 +607,8 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {
for _, br := range batch.Requests {
req, ok := f.ongoing[br.Hash]
if !ok {
f.logger.With().Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
f.logger.With().
Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
continue
}
f.logger.WithContext(req.ctx).With().Warning("hash request failed",
Expand All @@ -596,7 +624,12 @@ func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {

// getHash is the regular buffered call to get a specific hash, using provided hash, h as hint the receiving end will
// know where to look for the hash, this function returns HashDataPromiseResult channel that will hold Data received or error.
func (f *Fetch) getHash(ctx context.Context, hash types.Hash32, h datastore.Hint, receiver dataReceiver) (*promise, error) {
func (f *Fetch) getHash(
ctx context.Context,
hash types.Hash32,
h datastore.Hint,
receiver dataReceiver,
) (*promise, error) {
if f.stopped() {
return nil, f.shutdownCtx.Err()
}
Expand Down Expand Up @@ -650,10 +683,6 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) {
f.hashToPeers.RegisterPeerHashes(peer, hashes)
}

func (f *Fetch) GetPeers() []p2p.Peer {
return f.host.GetPeers()
}

func (f *Fetch) PeerProtocols(p p2p.Peer) ([]protocol.ID, error) {
return f.host.PeerProtocols(p)
func (f *Fetch) SelectBest(n int) []p2p.Peer {
return f.peers.SelectBest(n)
}
Loading

0 comments on commit 1b3ddf7

Please sign in to comment.