Skip to content

Commit

Permalink
Imported eth.handler from [email protected] (#78)
Browse files Browse the repository at this point in the history
* Imported stop process of eth.handler from [email protected]

* Imported `chainFinalizedHeightFn` of fetcher.BlockFetcher from [email protected]

* remove stop chan from eth/handler (#82)

---------

Co-authored-by: tak <[email protected]>
  • Loading branch information
ironbeer and tak1827 authored Oct 18, 2024
1 parent fb4a70c commit aeff5e9
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 38 deletions.
90 changes: 56 additions & 34 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type blockBroadcasterFn func(block *types.Block, propagate bool)
// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64

// chainFinalizedHeightFn is a callback type to retrieve the current chain finalized height.
type chainFinalizedHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

Expand Down Expand Up @@ -180,14 +183,15 @@ type BlockFetcher struct {
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving

// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
Expand All @@ -198,31 +202,34 @@ type BlockFetcher struct {
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn,
broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn,
insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
chainFinalizedHeight: chainFinalizedHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
}

Expand Down Expand Up @@ -366,7 +373,8 @@ func (f *BlockFetcher) loop() {
break
}
// Otherwise if fresh and still unknown, try and import
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
finalizedHeight := f.chainFinalizedHeight()
if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
f.forgetBlock(hash)
continue
}
Expand Down Expand Up @@ -397,7 +405,13 @@ func (f *BlockFetcher) loop() {
}
// If we have a valid block number, check that it's potentially useful
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
log.Debug("Peer discarded announcement by distance", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
blockAnnounceDropMeter.Mark(1)
break
}
finalized := f.chainFinalizedHeight()
if notification.number <= finalized {
log.Debug("Peer discarded announcement by finality", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "finalized", finalized)
blockAnnounceDropMeter.Mark(1)
break
}
Expand Down Expand Up @@ -782,6 +796,14 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
f.forgetHash(hash)
return
}
// Discard any block that is below the current finalized height
finalizedHeight := f.chainFinalizedHeight()
if number <= finalizedHeight {
log.Debug("Discarded delivered header or block, below or equal to finalized", "peer", peer, "number", number, "hash", hash, "finalized", finalizedHeight)
blockBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
// Schedule the block for future importing
if _, ok := f.queued[hash]; !ok {
op := &blockOrHeaderInject{origin: peer}
Expand Down
78 changes: 76 additions & 2 deletions eth/fetcher/block_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func newTester(light bool) *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader,
tester.broadcastBlock, tester.chainHeight, tester.chainFinalizedHeight, tester.insertHeaders,
tester.insertChain, tester.dropPeer)
tester.fetcher.Start()

return tester
Expand Down Expand Up @@ -143,6 +145,18 @@ func (f *fetcherTester) chainHeight() uint64 {
return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
}

func (f *fetcherTester) chainFinalizedHeight() uint64 {
f.lock.RLock()
defer f.lock.RUnlock()
if len(f.hashes) < 3 {
return 0
}
if f.fetcher.light {
return f.headers[f.hashes[len(f.hashes)-3]].Number.Uint64()
}
return f.blocks[f.hashes[len(f.hashes)-3]].NumberU64()
}

// insertChain injects a new headers into the simulated chain.
func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
f.lock.Lock()
Expand Down Expand Up @@ -735,6 +749,67 @@ func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
}
}

// Tests that announcements with numbers much lower or equal to the current finalized block
// head get discarded to prevent wasting resources on useless blocks from faulty peers.
func TestFullFinalizedAnnouncementDiscarding(t *testing.T) {
testFinalizedAnnouncementDiscarding(t, false)
}
func TestLightFinalizedAnnouncementDiscarding(t *testing.T) {
testFinalizedAnnouncementDiscarding(t, true)
}

func testFinalizedAnnouncementDiscarding(t *testing.T, light bool) {
// Create a long chain to import and define the discard boundaries
hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)

head := hashes[len(hashes)/2]
justified := hashes[len(hashes)/2+1]
finalized := hashes[len(hashes)/2+2]
beforeFinalized := hashes[len(hashes)/2+3]

low, equal := len(hashes)/2+3, len(hashes)/2+2

// Create a tester and simulate a head block being the middle of the above chain
tester := newTester(light)

tester.lock.Lock()
tester.hashes = []common.Hash{beforeFinalized, finalized, justified, head}
tester.headers = map[common.Hash]*types.Header{
beforeFinalized: blocks[beforeFinalized].Header(),
finalized: blocks[finalized].Header(),
justified: blocks[justified].Header(),
head: blocks[head].Header(),
}
tester.blocks = map[common.Hash]*types.Block{
beforeFinalized: blocks[beforeFinalized],
finalized: blocks[finalized],
justified: blocks[justified],
head: blocks[head],
}
tester.lock.Unlock()

headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)

fetching := make(chan struct{}, 2)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }

// Ensure that a block with a lower number than the finalized height is discarded
tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select {
case <-time.After(50 * time.Millisecond):
case <-fetching:
t.Fatalf("fetcher requested stale header")
}
// Ensure that a block with a same number of the finalized height is discarded
tester.fetcher.Notify("equal", hashes[equal], blocks[hashes[equal]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select {
case <-time.After(50 * time.Millisecond):
case <-fetching:
t.Fatalf("fetcher requested future header")
}
}

// Tests that peers announcing blocks with invalid numbers (i.e. not matching
// the headers provided afterwards) get dropped as malicious.
func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
Expand Down Expand Up @@ -775,7 +850,6 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) {
continue
case <-time.After(1 * time.Second):
t.Fatal("announce timeout")
return
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ func newHandler(config *handlerConfig) (*handler, error) {
heighter := func() uint64 {
return h.chain.CurrentBlock().Number.Uint64()
}
finalizeHeighter := func() uint64 {
fblock := h.chain.CurrentFinalBlock()
if fblock == nil {
return 0
}
return fblock.Number.Uint64()
}
inserter := func(blocks types.Blocks) (int, error) {
// All the block fetcher activities should be disabled
// after the transition. Print the warning log.
Expand Down Expand Up @@ -301,7 +308,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return h.chain.InsertChain(blocks)
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock,
heighter, finalizeHeighter, nil, inserter, h.removePeer)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
Expand Down Expand Up @@ -620,7 +628,6 @@ func (h *handler) Stop() {
h.voteMonitorSub.Unsubscribe()
}
}

// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
close(h.quitSync)
Expand Down

0 comments on commit aeff5e9

Please sign in to comment.