Skip to content

Commit

Permalink
replaced faulty mapset.Set with lrucache
Browse files Browse the repository at this point in the history
  • Loading branch information
sadoci committed Sep 23, 2019
1 parent b8d83b1 commit f76d5a5
Showing 1 changed file with 15 additions and 21 deletions.
36 changes: 15 additions & 21 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync"
"time"

mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/batch"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core/types"
metaapi "github.com/ethereum/go-ethereum/metadium/api"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -87,8 +87,8 @@ type peer struct {
td *big.Int
lock sync.RWMutex

knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownTxs *lru.LruCache // Set of transaction hashes known to be known by this peer
knownBlocks *lru.LruCache // Set of block hashes known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
Expand All @@ -101,8 +101,8 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
rw: rw,
version: version,
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownTxs: lru.NewLruCache(maxKnownTxs, true),
knownBlocks: lru.NewLruCache(maxKnownBlocks, true),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
Expand Down Expand Up @@ -194,27 +194,21 @@ func (p *peer) SetHead(hash common.Hash, td *big.Int) {
// never be propagated to this particular peer.
func (p *peer) MarkBlock(hash common.Hash) {
// If we reached the memory allowance, drop a previously known block hash
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(hash)
p.knownBlocks.Put(hash, true)
}

// MarkTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkTransaction(hash common.Hash) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownTxs.Cardinality() >= maxKnownTxs {
p.knownTxs.Pop()
}
p.knownTxs.Add(hash)
p.knownTxs.Put(hash, true)
}

// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *peer) SendTransactions(txs types.Transactions) error {
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
p.knownTxs.Put(tx.Hash(), true)
}
return p2p.Send(p.rw, TxMsg, txs)
}
Expand All @@ -225,7 +219,7 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
select {
case p.queuedTxs <- txs:
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
p.knownTxs.Put(tx.Hash(), true)
}
default:
p.Log().Debug("Dropping transaction propagation", "count", len(txs))
Expand All @@ -243,7 +237,7 @@ func (p *peer) resendPendingTxs(txs map[common.Address]types.Transactions) {
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
for _, hash := range hashes {
p.knownBlocks.Add(hash)
p.knownBlocks.Put(hash, true)
}
request := make(newBlockHashesData, len(hashes))
for i := 0; i < len(hashes); i++ {
Expand All @@ -259,15 +253,15 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
select {
case p.queuedAnns <- block:
p.knownBlocks.Add(block.Hash())
p.knownBlocks.Put(block.Hash(), true)
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
}
}

// SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash())
p.knownBlocks.Put(block.Hash(), true)
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}

Expand All @@ -276,7 +270,7 @@ func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
case p.queuedProps <- &propEvent{block: block, td: td}:
p.knownBlocks.Add(block.Hash())
p.knownBlocks.Put(block.Hash(), true)
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
}
Expand Down Expand Up @@ -522,7 +516,7 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {

list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownBlocks.Contains(hash) {
if !p.knownBlocks.Exists(hash) {
list = append(list, p)
}
}
Expand All @@ -537,7 +531,7 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {

list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownTxs.Contains(hash) {
if !p.knownTxs.Exists(hash) {
list = append(list, p)
}
}
Expand Down

0 comments on commit f76d5a5

Please sign in to comment.