Skip to content

Commit

Permalink
eth/fetcher: fix blob transaction propagation (ethereum#30125)
Browse files Browse the repository at this point in the history
This PR fixes an issue with blob transaction propagation due to the blob
transation txpool rejecting transactions with gapped nonces. The
specific changes are:

- fetch transactions from a peer in the order they were announced to
minimize nonce-gaps (which cause blob txs to be rejected

- don't wait on fetching blob transactions after announcement is
received, since they are not broadcast

Testing:
- unit tests updated to reflect that fetch order should always match tx
announcement order
- unit test added to confirm blob transactions are scheduled immediately
for fetching
  - running the PR on an eth mainnet full node without incident so far

---------

Signed-off-by: Roberto Bayardo <[email protected]>
Co-authored-by: Gary Rong <[email protected]>
  • Loading branch information
2 people authored and zfy0701 committed Dec 3, 2024
1 parent 4b236d6 commit 2a247aa
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 83 deletions.
11 changes: 10 additions & 1 deletion cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,16 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
if code, _, err := conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err: %v", err)
} else if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg {
// sometimes we'll get a blob transaction hashes announcement before the disconnect
// because blob transactions are scheduled to be fetched right away.
if code, _, err = conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err on second read: %v", err)
}
}
if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
}
}
conn.Close()
}
Expand Down
177 changes: 104 additions & 73 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fetcher

import (
"bytes"
"errors"
"fmt"
"math"
Expand All @@ -35,7 +34,7 @@ import (
)

const (
// maxTxAnnounces is the maximum number of unique transaction a peer
// maxTxAnnounces is the maximum number of unique transactions a peer
// can announce in a short time.
maxTxAnnounces = 4096

Expand Down Expand Up @@ -114,16 +113,23 @@ var errTerminated = errors.New("terminated")
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadata associated with the hashes
metas []txMetadata // Batch of metadata associated with the hashes
}

// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
// txMetadata provides the extra data transmitted along with the announcement
// for better fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
// tracking the transaction sequence number.
type txMetadataWithSeq struct {
txMetadata
seq uint64
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peers.
type txRequest struct {
Expand Down Expand Up @@ -159,7 +165,7 @@ type txDrop struct {
// The invariants of the fetcher are:
// - Each tracked transaction (hash) must only be present in one of the
// three stages. This ensures that the fetcher operates akin to a finite
// state automata and there's do data leak.
// state automata and there's no data leak.
// - Each peer that announced transactions may be scheduled retrievals, but
// only ever one concurrently. This ensures we can immediately know what is
// missing from a reply and reschedule it.
Expand All @@ -169,18 +175,19 @@ type TxFetcher struct {
drop chan *txDrop
quit chan struct{}

txSeq uint64 // Unique transaction sequence number
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand Down Expand Up @@ -218,8 +225,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand Down Expand Up @@ -247,7 +254,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// loop, so anything caught here is time saved internally.
var (
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))
unknownMetas = make([]txMetadata, 0, len(hashes))

duplicate int64
underpriced int64
Expand All @@ -264,7 +271,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// Transaction metadata has been available since eth68, and all
// legacy eth protocols (prior to eth68) have been deprecated.
// Therefore, metadata is always expected in the announcement.
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
}
}
txAnnounceKnownMeter.Mark(duplicate)
Expand Down Expand Up @@ -431,9 +438,19 @@ func (f *TxFetcher) loop() {
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]

var (
idleWait = len(f.waittime) == 0
_, oldPeer = f.announces[ann.origin]
hasBlob bool

// nextSeq returns the next available sequence number for tagging
// transaction announcement and also bump it internally.
nextSeq = func() uint64 {
seq := f.txSeq
f.txSeq++
return seq
}
)
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
Expand All @@ -443,9 +460,17 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
Expand All @@ -456,9 +481,17 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
Expand All @@ -475,24 +508,47 @@ func (f *TxFetcher) loop() {
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
// Transaction unknown to the fetcher, insert it into the waiting list
f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
f.waittime[hash] = f.clock.Now()

// Assign the current timestamp as the wait time, but for blob transactions,
// skip the wait time since they are only announced.
if ann.metas[i].kind != types.BlobTxType {
f.waittime[hash] = f.clock.Now()
} else {
hasBlob = true
f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
}
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
if idleWait && len(f.waittime) > 0 {
if hasBlob || (idleWait && len(f.waittime) > 0) {
f.rescheduleWait(waitTimer, waitTrigger)
}
// If this peer is new and announced something already queued, maybe
Expand All @@ -516,7 +572,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -873,7 +929,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
hashes = make([]common.Hash, 0, maxTxRetrievals)
bytes uint64
)
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool {
// If the transaction is already fetching, skip to the next one
if _, ok := f.fetching[hash]; ok {
return true
Expand Down Expand Up @@ -938,28 +994,26 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
}
}

// forEachAnnounce does a range loop over a map of announcements in production,
// but during testing it does a deterministic sorted random to allow reproducing
// issues.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash, meta := range announces {
if !do(hash, meta) {
return
}
}
return
// forEachAnnounce loops over the given announcements in arrival order, invoking
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of transaction nonce-gaps, which result in
// transactions being rejected by the txpool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
type announcement struct {
hash common.Hash
meta txMetadata
seq uint64
}
// We're running the test suite, make iteration deterministic
list := make([]common.Hash, 0, len(announces))
for hash := range announces {
list = append(list, hash)
// Process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, entry := range announces {
list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
}
sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list {
if !do(hash, announces[hash]) {
sort.Slice(list, func(i, j int) bool {
return list[i].seq < list[j].seq
})
for i := range list {
if !do(list[i].hash, list[i].meta) {
return
}
}
Expand All @@ -975,26 +1029,3 @@ func rotateStrings(slice []string, n int) {
slice[i] = orig[(i+n)%len(orig)]
}
}

// sortHashes sorts a slice of hashes. This method is only used in tests in order
// to simulate random map iteration but keep it deterministic.
func sortHashes(slice []common.Hash) {
for i := 0; i < len(slice); i++ {
for j := i + 1; j < len(slice); j++ {
if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
slice[i], slice[j] = slice[j], slice[i]
}
}
}
}

// rotateHashes rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
orig := make([]common.Hash, len(slice))
copy(orig, slice)

for i := 0; i < len(orig); i++ {
slice[i] = orig[(i+n)%len(orig)]
}
}
Loading

0 comments on commit 2a247aa

Please sign in to comment.