
Commit

Merge pull request #563 from gzliudan/fix-issue-377
fix issue #377 and a bug in queue
gzliudan authored Jun 27, 2024
2 parents 7330889 + 76ab0b5 commit bf289e8
Showing 2 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
@@ -1347,7 +1347,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
 	defer tester.terminate()
 
 	for i, tt := range tests {
-		// Register a new peer and ensure it's presence
+		// Register a new peer and ensure its presence
 		id := fmt.Sprintf("test %d", i)
 		if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil {
 			t.Fatalf("test %d: failed to register new peer: %v", i, err)
15 changes: 8 additions & 7 deletions eth/downloader/queue.go
@@ -235,8 +235,7 @@ func (q *queue) ShouldThrottleReceipts() bool {
 }
 
 // resultSlots calculates the number of results slots available for requests
-// whilst adhering to both the item and the memory limit too of the results
-// cache.
+// whilst adhering to both the item and the memory limits of the result cache.
 func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
 	// Calculate the maximum length capped by the memory limit
 	limit := len(q.resultCache)
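
As the amended comment describes, resultSlots caps the number of schedulable results by both an item count and a memory budget for the result cache. A minimal, self-contained sketch of that kind of calculation is shown below; the function and parameter names (slots, itemCap, memBudget, avgSize, pending, done) and the exact bookkeeping are illustrative assumptions, not the downloader's actual implementation.

package main

import "fmt"

// slots returns how many new result slots remain, given an item cap, a memory
// budget, an estimated per-result size, and the work already queued.
func slots(itemCap int, memBudget, avgSize uint64, pending, done int) int {
	limit := itemCap
	// Cap by memory: how many average-sized results fit in the budget.
	if avgSize > 0 {
		if memLimit := int(memBudget / avgSize); memLimit < limit {
			limit = memLimit
		}
	}
	// Results already pending or completed still occupy cache slots.
	if free := limit - pending - done; free > 0 {
		return free
	}
	return 0
}

func main() {
	// 8192-item cache, 64 MiB budget, ~32 KiB per result, 100 pending, 50 done.
	fmt.Println(slots(8192, 64<<20, 32<<10, 100, 50)) // 1898
}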
@@ -349,7 +348,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 }
 
 // Results retrieves and permanently removes a batch of fetch results from
-// the cache. the result slice will be empty if the queue has been closed.
+// the cache. The result slice will be empty if the queue has been closed.
 func (q *queue) Results(block bool) []*fetchResult {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -511,7 +510,6 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 		index := int(header.Number.Int64() - int64(q.resultOffset))
 		if index >= len(q.resultCache) || index < 0 {
 			log.Error("index allocation went beyond available resultCache space", "index", index, "len.resultCache", len(q.resultCache), "blockNum", header.Number.Int64(), "resultOffset", q.resultOffset)
-			common.Report("index allocation went beyond available resultCache space")
 			return nil, false, errInvalidChain
 		}
 		if q.resultCache[index] == nil {
@@ -566,26 +564,29 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 
 // CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
 func (q *queue) CancelHeaders(request *fetchRequest) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
 }
 
 // CancelBodies aborts a body fetch request, returning all pending headers to the
 // task queue.
 func (q *queue) CancelBodies(request *fetchRequest) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
 }
 
 // CancelReceipts aborts a body fetch request, returning all pending headers to
 // the task queue.
 func (q *queue) CancelReceipts(request *fetchRequest) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
 }
 
 // Cancel aborts a fetch request, returning all pending hashes to the task queue.
 func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
 	if request.From > 0 {
 		taskQueue.Push(request.From, -int64(request.From))
 	}
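The queue change appears to be about lock ownership around cancellation: after this commit the exported CancelHeaders, CancelBodies and CancelReceipts take q.lock themselves, while the unexported cancel assumes its caller already holds it. Below is a minimal sketch of that convention under a simple stand-in type; taskSet, Cancel, CancelAll and cancelLocked are hypothetical names for illustration, not part of the downloader.

package main

import (
	"fmt"
	"sync"
)

// taskSet is a stand-in for the downloader's queue: exported methods own the
// mutex, unexported helpers assume the caller already holds it.
type taskSet struct {
	lock    sync.Mutex
	pending map[string]int
}

// Cancel is an exported entry point: it takes the lock and delegates.
func (t *taskSet) Cancel(id string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.cancelLocked(id)
}

// CancelAll holds the lock for the whole sweep and calls the helper many
// times; if cancelLocked locked the mutex itself, this would deadlock,
// because Go's sync.Mutex is not reentrant.
func (t *taskSet) CancelAll() {
	t.lock.Lock()
	defer t.lock.Unlock()
	for id := range t.pending {
		t.cancelLocked(id)
	}
}

// cancelLocked does the actual work; it must only be called with t.lock held.
func (t *taskSet) cancelLocked(id string) {
	delete(t.pending, id)
}

func main() {
	t := &taskSet{pending: map[string]int{"headers": 1, "bodies": 2}}
	t.Cancel("headers")
	t.CancelAll()
	fmt.Println(len(t.pending)) // 0
}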
