Skip to content

Commit

Permalink
Merge branch 'devel' into eip-4788-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
yperbasis committed Aug 23, 2023
2 parents 2b89ac5 + 83d767c commit 8015ff5
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 59 deletions.
3 changes: 3 additions & 0 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,8 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
blockReader, blockWriter := blocksIO(db, logger)
engine, heimdallClient := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db, blockReader, logger)

maxBlockBroadcastPeers := func(header *types.Header) uint { return 0 }

sentryControlServer, err := sentry.NewMultiClient(
db,
"",
Expand All @@ -1520,6 +1522,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
blockBufferSize,
false,
nil,
maxBlockBroadcastPeers,
ethconfig.Defaults.DropUselessPeers,
logger,
)
Expand Down
69 changes: 26 additions & 43 deletions cmd/sentry/sentry/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sentry
import (
"context"
"errors"
"math"
"math/big"
"strings"
"syscall"
Expand All @@ -19,55 +18,43 @@ import (
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)

// Methods of sentry called by Core

const (
// This is the target size for the packs of transactions or announcements. A
// pack can get larger than this if a single transactions exceeds this size.
maxTxPacketSize = 100 * 1024
)

func (cs *MultiClient) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) {
cs.lock.RLock()
defer cs.lock.RUnlock()

typedRequest := make(eth.NewBlockHashesPacket, len(announces))
for i := range announces {
typedRequest[i].Hash = announces[i].Hash
typedRequest[i].Number = announces[i].Number
}

data, err := rlp.EncodeToBytes(&typedRequest)
if err != nil {
log.Error("propagateNewBlockHashes", "err", err)
return
}
var req66 *proto_sentry.OutboundMessageData
// Send the block to a subset of our peers
sendToAmount := int(math.Sqrt(float64(len(cs.sentries))))
for i, sentry := range cs.sentries {

req66 := proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_BLOCK_HASHES_66,
Data: data,
}

for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
if i > sendToAmount { //TODO: send to random sentries, not just to fi
break
}

if req66 == nil {
req66 = &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_BLOCK_HASHES_66,
Data: data,
}

_, err = sentry.SendMessageToAll(ctx, req66, &grpc.EmptyCallOption{})
if err != nil {
log.Error("propagateNewBlockHashes", "err", err)
}
_, err = sentry.SendMessageToAll(ctx, &req66, &grpc.EmptyCallOption{})
if err != nil {
log.Error("propagateNewBlockHashes", "err", err)
}
}
}

func (cs *MultiClient) BroadcastNewBlock(ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int) {
cs.lock.RLock()
defer cs.lock.RUnlock()

txs := make([]types.Transaction, len(body.Transactions))
for i, tx := range body.Transactions {
var err error
Expand All @@ -76,34 +63,30 @@ func (cs *MultiClient) BroadcastNewBlock(ctx context.Context, header *types.Head
return
}
}

data, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Block: types.NewBlock(header, txs, body.Uncles, nil, body.Withdrawals),
TD: td,
})
if err != nil {
log.Error("broadcastNewBlock", "err", err)
}
var req66 *proto_sentry.SendMessageToRandomPeersRequest
// Send the block to a subset of our peers
sendToAmount := int(math.Sqrt(float64(len(cs.sentries))))
for i, sentry := range cs.sentries {

req66 := proto_sentry.SendMessageToRandomPeersRequest{
MaxPeers: uint64(cs.maxBlockBroadcastPeers(header)),
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_BLOCK_66,
Data: data,
},
}

for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
if i > sendToAmount { //TODO: send to random sentries, not just to fi
break
}

if req66 == nil {
req66 = &proto_sentry.SendMessageToRandomPeersRequest{
MaxPeers: 1024,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_BLOCK_66,
Data: data,
},
}
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
_, err = sentry.SendMessageToRandomPeers(ctx, &req66, &grpc.EmptyCallOption{})
if err != nil {
if isPeerNotFoundErr(err) || networkTemporaryErr(err) {
log.Debug("broadcastNewBlock", "err", err)
continue
Expand Down
17 changes: 9 additions & 8 deletions cmd/sentry/sentry/sentry_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,28 +911,29 @@ func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_s
return reply, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id)
}

peerInfos := make([]*PeerInfo, 0, 32) // 32 gives capacity for 1024 peers, well beyond default
peerInfos := make([]*PeerInfo, 0, 100)
ss.rangePeers(func(peerInfo *PeerInfo) bool {
peerInfos = append(peerInfos, peerInfo)
return true
})
rand.Shuffle(len(peerInfos), func(i int, j int) {
peerInfos[i], peerInfos[j] = peerInfos[j], peerInfos[i]
})
peersToSendCount := len(peerInfos)
if peersToSendCount > 0 {
peerCountConstrained := math.Min(float64(len(peerInfos)), float64(req.MaxPeers))
// Ensure we have at least 1 peer during our sqrt operation
peersToSendCount = int(math.Max(math.Sqrt(peerCountConstrained), 1.0))

var peersToSendCount int
if req.MaxPeers > 0 {
peersToSendCount = int(math.Min(float64(req.MaxPeers), float64(len(peerInfos))))
} else {
// MaxPeers == 0 means send to all
peersToSendCount = len(peerInfos)
}

var lastErr error
// Send the block to a subset of our peers at random
for _, peerInfo := range peerInfos[:peersToSendCount] {
ss.writePeer("[sentry] sendMessageToRandomPeers", peerInfo, msgcode, req.Data.Data, 0)
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH512(peerInfo.ID()))
}
return reply, lastErr
return reply, nil
}

func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
Expand Down
3 changes: 3 additions & 0 deletions cmd/sentry/sentry/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ type MultiClient struct {
blockReader services.FullBlockReader
logPeerInfo bool
sendHeaderRequestsToMultiplePeers bool
maxBlockBroadcastPeers func(*types.Header) uint

historyV3 bool
dropUselessPeers bool
Expand All @@ -287,6 +288,7 @@ func NewMultiClient(
blockBufferSize int,
logPeerInfo bool,
forkValidator *engine_helpers.ForkValidator,
maxBlockBroadcastPeers func(*types.Header) uint,
dropUselessPeers bool,
logger log.Logger,
) (*MultiClient, error) {
Expand Down Expand Up @@ -319,6 +321,7 @@ func NewMultiClient(
logPeerInfo: logPeerInfo,
historyV3: historyV3,
sendHeaderRequestsToMultiplePeers: chainConfig.TerminalTotalDifficultyPassed,
maxBlockBroadcastPeers: maxBlockBroadcastPeers,
dropUselessPeers: dropUselessPeers,
logger: logger,
}
Expand Down
32 changes: 26 additions & 6 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,10 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
parents = parents[:len(parents)-1]
} else {
// No explicit parents (or no more left), reach out to the database
header = chain.GetHeader(hash, number)
if chain != nil {
header = chain.GetHeader(hash, number)
}

if header == nil {
return nil, consensus.ErrUnknownAncestor
}
Expand All @@ -686,7 +689,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
headers = append(headers, header)
number, hash = number-1, header.ParentHash

if number < chain.FrozenBlocks() {
if chain != nil && number < chain.FrozenBlocks() {
break
}

Expand All @@ -696,7 +699,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
default:
}
}
if snap == nil && number <= chain.FrozenBlocks() {
if snap == nil && chain != nil && number <= chain.FrozenBlocks() {
// Special handling of the headers in the snapshot
zeroHeader := chain.GetHeaderByNumber(0)
if zeroHeader != nil {
Expand Down Expand Up @@ -1141,16 +1144,33 @@ func (c *Bor) Seal(chain consensus.ChainHeaderReader, block *types.Block, result
return nil
}

// IsValidator returns true if this instance is the validator for this block
func (c *Bor) IsValidator(header *types.Header) (bool, error) {
number := header.Number.Uint64()

if number == 0 {
return false, nil
}

snap, err := c.snapshot(nil, number-1, header.ParentHash, nil)
if err != nil {
return false, err
}

currentSigner := c.authorizedSigner.Load()

return snap.ValidatorSet.HasAddress(currentSigner.signer), nil
}

// IsProposer returns true if this instance is the proposer for this block
func (c *Bor) IsProposer(chain consensus.ChainHeaderReader, block *types.Block) (bool, error) {
header := block.Header()
func (c *Bor) IsProposer(header *types.Header) (bool, error) {
number := header.Number.Uint64()

if number == 0 {
return false, nil
}

snap, err := c.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := c.snapshot(nil, number-1, header.ParentHash, nil)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/bor/bor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (v validator) generateChain(length int) (*core.ChainPack, error) {
}

func (v validator) IsProposer(block *types.Block) (bool, error) {
return v.Engine.(*bor.Bor).IsProposer(headerReader{v}, block)
return v.Engine.(*bor.Bor).IsProposer(block.Header())
}

func (v validator) sealBlocks(blocks []*types.Block) ([]*types.Block, error) {
Expand Down
1 change: 0 additions & 1 deletion consensus/misc/eip4788.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/ledgerwatch/log/v3"

libcommon "github.com/ledgerwatch/erigon-lib/common"

"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/params"
)
Expand Down
21 changes: 21 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,26 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
}
backend.forkValidator = engine_helpers.NewForkValidator(ctx, currentBlockNumber, inMemoryExecution, tmpdir, backend.blockReader)

// limit "new block" broadcasts to at most 10 random peers at time
maxBlockBroadcastPeers := func(header *types.Header) uint { return 10 }

// unlimited "new block" broadcasts to all peers for blocks announced by Bor validators
if borEngine, ok := backend.engine.(*bor.Bor); ok {
defaultValue := maxBlockBroadcastPeers(nil)
maxBlockBroadcastPeers = func(header *types.Header) uint {
isValidator, err := borEngine.IsValidator(header)
if err != nil {
logger.Error("maxBlockBroadcastPeers: borEngine.IsValidator has failed", "err", err)
return defaultValue
}
if isValidator {
// 0 means send to all
return 0
}
return defaultValue
}
}

backend.sentriesClient, err = sentry.NewMultiClient(
chainKv,
stack.Config().NodeName(),
Expand All @@ -502,6 +522,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
blockBufferSize,
stack.Config().SentryLogPeerInfo,
backend.forkValidator,
maxBlockBroadcastPeers,
config.DropUselessPeers,
logger,
)
Expand Down
3 changes: 3 additions & 0 deletions turbo/stages/mock/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
}
forkValidator := engine_helpers.NewForkValidator(ctx, 1, inMemoryExecution, dirs.Tmp, mock.BlockReader)
networkID := uint64(1)
maxBlockBroadcastPeers := func(header *types.Header) uint { return 0 }

mock.sentriesClient, err = sentry.NewMultiClient(
mock.DB,
"mock",
Expand All @@ -378,6 +380,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
blockBufferSize,
false,
forkValidator,
maxBlockBroadcastPeers,
cfg.DropUselessPeers,
logger,
)
Expand Down

0 comments on commit 8015ff5

Please sign in to comment.