Skip to content

Commit

Permalink
eth/fetcher: polish the code
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Aug 15, 2024
1 parent 927509a commit 54336c2
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 103 deletions.
134 changes: 83 additions & 51 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
mrand "math/rand"
"sort"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -117,13 +118,17 @@ type txAnnounce struct {
}

// txMetadata provides the extra data transmitted along with the announcement
// for better fetch scheduling ('kind' & 'size'), plus an extra field
// ('arrival') to keep track of its order of arrival. 'size==0' can be used to
// test for 0 pre-eth/68 announcements. In this case, kind will also be 0.
// for better fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes, or 0 if the announcement didn't include metadata
arrival uint64 // Value that can be used to sort announcements by order of arrival
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
// tracking the transaction sequence number.
type txMetadataWithSeq struct {
txMetadata
seq uint64
}

// txRequest represents an in-flight transaction retrieval request destined to
Expand Down Expand Up @@ -171,18 +176,19 @@ type TxFetcher struct {
drop chan *txDrop
quit chan struct{}

txSeq atomic.Uint64 // Unique transaction sequence number
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand Down Expand Up @@ -220,8 +226,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand Down Expand Up @@ -433,9 +439,11 @@ func (f *TxFetcher) loop() {
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]

var (
idleWait = len(f.waittime) == 0
_, oldPeer = f.announces[ann.origin]
hasBlob bool
)
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
Expand All @@ -445,9 +453,17 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
continue
}
Expand All @@ -458,26 +474,18 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
continue
}
// If this is a blob tx, schedule it to fetch without being
// waitlisted since blob txs should not be broadcast. If its
// hash is already on the waitlist, it was previously announced
// as a non-blob (or unknown) tx type. In this case we'll just
// eat the delay and continue handling it as a waitlisted tx to
// keep things simple.
if ann.metas[i].kind == types.BlobTxType && f.waitlist[hash] == nil {
f.announced[hash] = map[string]struct{}{ann.origin: {}}
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
continue
}
// If the transaction is already known to the fetcher, but not
Expand All @@ -493,24 +501,47 @@ func (f *TxFetcher) loop() {
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = &ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
continue
}
// Transaction unknown to the fetcher, insert it into the waiting list
f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
f.waittime[hash] = f.clock.Now()

// Assign the current timestamp as the wait time, but for blob transactions,
// skip the wait time since they are only announced.
if ann.metas[i].kind != types.BlobTxType {
f.waittime[hash] = f.clock.Now()
} else {
hasBlob = true
f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
}
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = &ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
if idleWait && len(f.waittime) > 0 {
if hasBlob || (idleWait && len(f.waittime) > 0) {
f.rescheduleWait(waitTimer, waitTrigger)
}
// If this peer is new and announced something already queued, maybe
Expand All @@ -534,7 +565,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -608,7 +639,7 @@ func (f *TxFetcher) loop() {
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta, ok := txset[hash]; ok && meta.size != 0 {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
Expand All @@ -634,7 +665,7 @@ func (f *TxFetcher) loop() {
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta, ok := txset[hash]; ok && meta.size != 0 {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
Expand Down Expand Up @@ -958,20 +989,21 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))

// forEachAnnounce loops over the given announcements in arrival order, invoking
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of mempool nonce-gaps, which result in blob
// transactions being rejected by the mempool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta txMetadata) bool) {
// ordering to minimize the chances of transaction nonce-gaps, which result in
// transactions being rejected by the txpool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
type announcement struct {
hash common.Hash
meta txMetadata
seq uint64
}
// process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, metadata := range announces {
list = append(list, announcement{hash: hash, meta: *metadata})
for hash, entry := range announces {
list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
}
sort.Slice(list, func(i, j int) bool {
return list[i].meta.arrival < list[j].meta.arrival
return list[i].seq < list[j].seq
})
for i := range list {
if !do(list[i].hash, list[i].meta) {
Expand Down
Loading

0 comments on commit 54336c2

Please sign in to comment.