Skip to content

Commit

Permalink
Backfill throttling (#13855)
Browse files Browse the repository at this point in the history
* add a sleep between retries as a simple throttle

* unit test

* deepsource

---------

Co-authored-by: Kasey Kirkham <[email protected]>
  • Loading branch information
kasey and kasey authored Apr 11, 2024
1 parent 0d6070e commit c0acb7d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 3 deletions.
39 changes: 36 additions & 3 deletions beacon-chain/sync/backfill/batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backfill

import (
"context"
"fmt"
"sort"
"time"
Expand Down Expand Up @@ -55,13 +56,16 @@ const (
batchEndSequence
)

var retryDelay = time.Second

type batchId string

type batch struct {
firstScheduled time.Time
scheduled time.Time
seq int // sequence identifier, ie how many times has the sequence() method served this batch
retries int
retryAfter time.Time
begin primitives.Slot
end primitives.Slot // half-open interval, [begin, end), ie >= start, < end.
results verifiedROBlocks
Expand All @@ -74,7 +78,7 @@ type batch struct {
}

func (b batch) logFields() logrus.Fields {
return map[string]interface{}{
f := map[string]interface{}{
"batchId": b.id(),
"state": b.state.String(),
"scheduled": b.scheduled.String(),
Expand All @@ -86,6 +90,10 @@ func (b batch) logFields() logrus.Fields {
"blockPid": b.blockPid,
"blobPid": b.blobPid,
}
if b.retries > 0 {
f["retryAfter"] = b.retryAfter.String()
}
return f
}

func (b batch) replaces(r batch) bool {
Expand Down Expand Up @@ -153,7 +161,8 @@ func (b batch) withState(s batchState) batch {
switch b.state {
case batchErrRetryable:
b.retries += 1
log.WithFields(b.logFields()).Info("Sequencing batch for retry")
b.retryAfter = time.Now().Add(retryDelay)
log.WithFields(b.logFields()).Info("Sequencing batch for retry after delay")
case batchInit, batchNil:
b.firstScheduled = b.scheduled
}
Expand Down Expand Up @@ -190,8 +199,32 @@ func (b batch) availabilityStore() das.AvailabilityStore {
return b.bs.store
}

var batchBlockUntil = func(ctx context.Context, untilRetry time.Duration, b batch) error {
log.WithFields(b.logFields()).WithField("untilRetry", untilRetry.String()).
Debug("Sleeping for retry backoff delay")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(untilRetry):
return nil
}
}

func (b batch) waitUntilReady(ctx context.Context) error {
// Wait to retry a failed batch to avoid hammering peers
// if we've hit a state where batches will consistently fail.
// Avoids spamming requests and logs.
if b.retries > 0 {
untilRetry := time.Until(b.retryAfter)
if untilRetry > time.Millisecond {
return batchBlockUntil(ctx, untilRetry, b)
}
}
return nil
}

func sortBatchDesc(bb []batch) {
sort.Slice(bb, func(i, j int) bool {
return bb[j].end < bb[i].end
return bb[i].end > bb[j].end
})
}
22 changes: 22 additions & 0 deletions beacon-chain/sync/backfill/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package backfill

import (
"context"
"testing"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)
Expand All @@ -19,3 +22,22 @@ func TestSortBatchDesc(t *testing.T) {
require.Equal(t, orderOut[i], batches[i].end)
}
}

func TestWaitUntilReady(t *testing.T) {
b := batch{}.withState(batchErrRetryable)
require.Equal(t, time.Time{}, b.retryAfter)
var got time.Duration
wur := batchBlockUntil
var errDerp = errors.New("derp")
batchBlockUntil = func(_ context.Context, ur time.Duration, _ batch) error {
got = ur
return errDerp
}
// retries counter and timestamp are set when we mark the batch for sequencing, if it is in the retry state
b = b.withState(batchSequenced)
require.ErrorIs(t, b.waitUntilReady(context.Background()), errDerp)
require.Equal(t, true, retryDelay-time.Until(b.retryAfter) < time.Millisecond)
require.Equal(t, true, got < retryDelay && got > retryDelay-time.Millisecond)
require.Equal(t, 1, b.retries)
batchBlockUntil = wur
}
5 changes: 5 additions & 0 deletions beacon-chain/sync/backfill/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
return
}
for _, pid := range assigned {
if err := todo[0].waitUntilReady(p.ctx); err != nil {
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
p.shutdown(p.ctx.Err())
return
}
busy[pid] = true
todo[0].busy = pid
p.toWorkers <- todo[0].withPeer(pid)
Expand Down

0 comments on commit c0acb7d

Please sign in to comment.