Skip to content

Commit

Permalink
announce message will be sent at beginning of epoch and caching of it…
Browse files Browse the repository at this point in the history
… will check if the destAddresses has changed (ethereum#578)
  • Loading branch information
kevjue authored Nov 14, 2019
1 parent 2200168 commit c76d62e
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 4 deletions.
13 changes: 11 additions & 2 deletions consensus/istanbul/backend/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
mrand "math/rand"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -149,6 +150,8 @@ func (sb *Backend) sendAnnounceMsgs() {

for {
select {
case <-sb.newEpochCh:
go sb.sendIstAnnounce()
case <-ticker.C:
// output the valEnodeTable for debugging purposes
log.Trace("ValidatorEnodeDB dump", "ValidatorEnodeDB", sb.valEnodeTable.String())
Expand Down Expand Up @@ -304,7 +307,9 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {
nodeKey := ecies.ImportECDSA(sb.GetNodeKey())

encryptedEndpoint := []byte("")
destAddresses := make([]string, 0, len(msg.EncryptedEndpointData))
for _, entry := range msg.EncryptedEndpointData {
destAddresses = append(destAddresses, common.BytesToAddress(entry[0]).String())
if bytes.Equal(entry[0], sb.Address().Bytes()) {
encryptedEndpoint = entry[1]
}
Expand Down Expand Up @@ -339,10 +344,14 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {
}
}

// Generate the destAddresses hash
sort.Strings(destAddresses)
destAddressesHash := istanbul.RLPHash(destAddresses)

// If we gossiped this address/enodeURL within the last 60 seconds, then don't regossip
sb.lastAnnounceGossipedMu.RLock()
if lastGossipTs, ok := sb.lastAnnounceGossiped[msg.Address]; ok {
if lastGossipTs.enodeURL == enodeURL && time.Since(lastGossipTs.timestamp) < time.Minute {
if lastGossipTs.enodeURL == enodeURL && bytes.Equal(lastGossipTs.destAddressesHash.Bytes(), destAddressesHash.Bytes()) && time.Since(lastGossipTs.timestamp) < time.Minute {
sb.logger.Trace("Already regossiped the msg within the last minute, so not regossiping.", "AnnounceMsg", msg)
sb.lastAnnounceGossipedMu.RUnlock()
return nil
Expand All @@ -355,7 +364,7 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {

sb.lastAnnounceGossipedMu.Lock()
defer sb.lastAnnounceGossipedMu.Unlock()
sb.lastAnnounceGossiped[msg.Address] = &AnnounceGossipTimestamp{enodeURL: enodeURL, timestamp: time.Now()}
sb.lastAnnounceGossiped[msg.Address] = &AnnounceGossipTimestamp{enodeURL: enodeURL, timestamp: time.Now(), destAddressesHash: destAddressesHash}

// prune non registered validator entries in the valEnodeTable, reverseValEnodeTable, and lastAnnounceGossiped tables about 5% of the times that an announce msg is handled
if (mrand.Int() % 100) <= 5 {
Expand Down
6 changes: 4 additions & 2 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ var (

// Entries for the recent announce messages
type AnnounceGossipTimestamp struct {
enodeURL string
timestamp time.Time
enodeURL string
destAddressesHash common.Hash
timestamp time.Time
}

// New creates an Ethereum backend for Istanbul core engine.
Expand Down Expand Up @@ -153,6 +154,7 @@ type Backend struct {
announceWg *sync.WaitGroup
announceQuit chan struct{}
dataDir string // A read-write data dir to persist files across restarts
newEpochCh chan struct{}
}

// Authorize implements istanbul.Backend.Authorize
Expand Down
5 changes: 5 additions & 0 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ func (sb *Backend) Start(hasBadBlock func(common.Hash) bool,
}
sb.commitCh = make(chan *types.Block, 1)

if sb.newEpochCh != nil {
close(sb.newEpochCh)
}
sb.newEpochCh = make(chan struct{})

sb.hasBadBlock = hasBadBlock
sb.stateAt = stateAt
sb.processBlock = processBlock
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func (sb *Backend) NewChainHead() error {
}
// Establish connections to new peers and tear down connections to old ones.
go sb.RefreshValPeers(valset)

sb.newEpochCh <- struct{}{}
}

go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{})
Expand Down

0 comments on commit c76d62e

Please sign in to comment.