From c0acb7d35269fb77f9a4cf759c9ae2863326612e Mon Sep 17 00:00:00 2001
From: kasey <489222+kasey@users.noreply.github.com>
Date: Thu, 11 Apr 2024 10:22:29 -0500
Subject: [PATCH] Backfill throttling (#13855)

* add a sleep between retries as a simple throttle

* unit test

* deepsource

---------

Co-authored-by: Kasey Kirkham
---
 beacon-chain/sync/backfill/batch.go      | 39 ++++++++++++++++++++++--
 beacon-chain/sync/backfill/batch_test.go | 22 +++++++++++++
 beacon-chain/sync/backfill/pool.go       |  5 +++
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/beacon-chain/sync/backfill/batch.go b/beacon-chain/sync/backfill/batch.go
index 0691fffafb8b..99098fd58340 100644
--- a/beacon-chain/sync/backfill/batch.go
+++ b/beacon-chain/sync/backfill/batch.go
@@ -1,6 +1,7 @@
 package backfill
 
 import (
+    "context"
     "fmt"
     "sort"
     "time"
@@ -55,6 +56,8 @@ const (
     batchEndSequence
 )
 
+var retryDelay = time.Second
+
 type batchId string
 
 type batch struct {
@@ -62,6 +65,7 @@ type batch struct {
     scheduled  time.Time
     seq        int // sequence identifier, ie how many times has the sequence() method served this batch
     retries    int
+    retryAfter time.Time
     begin      primitives.Slot
     end        primitives.Slot // half-open interval, [begin, end), ie >= start, < end.
     results    verifiedROBlocks
@@ -74,7 +78,7 @@ type batch struct {
 }
 
 func (b batch) logFields() logrus.Fields {
-    return map[string]interface{}{
+    f := map[string]interface{}{
         "batchId":   b.id(),
         "state":     b.state.String(),
         "scheduled": b.scheduled.String(),
@@ -86,6 +90,10 @@ func (b batch) logFields() logrus.Fields {
         "blockPid": b.blockPid,
         "blobPid":  b.blobPid,
     }
+    if b.retries > 0 {
+        f["retryAfter"] = b.retryAfter.String()
+    }
+    return f
 }
 
 func (b batch) replaces(r batch) bool {
@@ -153,7 +161,8 @@ func (b batch) withState(s batchState) batch {
     switch b.state {
     case batchErrRetryable:
         b.retries += 1
-        log.WithFields(b.logFields()).Info("Sequencing batch for retry")
+        b.retryAfter = time.Now().Add(retryDelay)
+        log.WithFields(b.logFields()).Info("Sequencing batch for retry after delay")
     case batchInit, batchNil:
         b.firstScheduled = b.scheduled
     }
@@ -190,8 +199,32 @@ func (b batch) availabilityStore() das.AvailabilityStore {
     return b.bs.store
 }
 
+var batchBlockUntil = func(ctx context.Context, untilRetry time.Duration, b batch) error {
+    log.WithFields(b.logFields()).WithField("untilRetry", untilRetry.String()).
+        Debug("Sleeping for retry backoff delay")
+    select {
+    case <-ctx.Done():
+        return ctx.Err()
+    case <-time.After(untilRetry):
+        return nil
+    }
+}
+
+func (b batch) waitUntilReady(ctx context.Context) error {
+    // Wait to retry a failed batch to avoid hammering peers
+    // if we've hit a state where batches will consistently fail.
+    // Avoids spamming requests and logs.
+    if b.retries > 0 {
+        untilRetry := time.Until(b.retryAfter)
+        if untilRetry > time.Millisecond {
+            return batchBlockUntil(ctx, untilRetry, b)
+        }
+    }
+    return nil
+}
+
 func sortBatchDesc(bb []batch) {
     sort.Slice(bb, func(i, j int) bool {
-        return bb[j].end < bb[i].end
+        return bb[i].end > bb[j].end
     })
 }
diff --git a/beacon-chain/sync/backfill/batch_test.go b/beacon-chain/sync/backfill/batch_test.go
index 12c1563ff588..d46dbea83855 100644
--- a/beacon-chain/sync/backfill/batch_test.go
+++ b/beacon-chain/sync/backfill/batch_test.go
@@ -1,8 +1,11 @@
 package backfill
 
 import (
+    "context"
     "testing"
+    "time"
 
+    "github.com/pkg/errors"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
     "github.com/prysmaticlabs/prysm/v5/testing/require"
 )
@@ -19,3 +22,22 @@ func TestSortBatchDesc(t *testing.T) {
         require.Equal(t, orderOut[i], batches[i].end)
     }
 }
+
+func TestWaitUntilReady(t *testing.T) {
+    b := batch{}.withState(batchErrRetryable)
+    require.Equal(t, time.Time{}, b.retryAfter)
+    var got time.Duration
+    wur := batchBlockUntil
+    var errDerp = errors.New("derp")
+    batchBlockUntil = func(_ context.Context, ur time.Duration, _ batch) error {
+        got = ur
+        return errDerp
+    }
+    // retries counter and timestamp are set when we mark the batch for sequencing, if it is in the retry state
+    b = b.withState(batchSequenced)
+    require.ErrorIs(t, b.waitUntilReady(context.Background()), errDerp)
+    require.Equal(t, true, retryDelay-time.Until(b.retryAfter) < time.Millisecond)
+    require.Equal(t, true, got < retryDelay && got > retryDelay-time.Millisecond)
+    require.Equal(t, 1, b.retries)
+    batchBlockUntil = wur
+}
diff --git a/beacon-chain/sync/backfill/pool.go b/beacon-chain/sync/backfill/pool.go
index 70f90414cd32..a6cf8fdfa5c0 100644
--- a/beacon-chain/sync/backfill/pool.go
+++ b/beacon-chain/sync/backfill/pool.go
@@ -143,6 +143,11 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
             return
         }
         for _, pid := range assigned {
+            if err := todo[0].waitUntilReady(p.ctx); err != nil {
+                log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
+                p.shutdown(p.ctx.Err())
+                return
+            }
             busy[pid] = true
             todo[0].busy = pid
             p.toWorkers <- todo[0].withPeer(pid)
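
Editor's note: the following is a minimal, self-contained sketch of the throttling pattern this patch introduces, for readers who want to see it in isolation: a per-batch retry deadline (retryAfter) plus a context-aware sleep, as in batchBlockUntil above. The batch fields and waitUntilReady mirror the patch; the rest (markRetry, main, the const retryDelay) is illustrative scaffolding, not Prysm code.

package main

import (
    "context"
    "fmt"
    "time"
)

const retryDelay = time.Second

type batch struct {
    retries    int
    retryAfter time.Time
}

// markRetry stands in for withState(batchErrRetryable) -> withState(batchSequenced):
// it bumps the retry counter and stamps the earliest time the batch may run again.
func (b batch) markRetry() batch {
    b.retries++
    b.retryAfter = time.Now().Add(retryDelay)
    return b
}

// waitUntilReady blocks until the retry deadline passes or ctx is canceled,
// using the same select-based sleep as batchBlockUntil in the patch.
func (b batch) waitUntilReady(ctx context.Context) error {
    if b.retries == 0 {
        return nil // first attempt, no throttle
    }
    untilRetry := time.Until(b.retryAfter)
    if untilRetry <= time.Millisecond {
        return nil // deadline already passed (or close enough to ignore)
    }
    select {
    case <-ctx.Done():
        return ctx.Err() // shutdown wins over the backoff sleep
    case <-time.After(untilRetry):
        return nil
    }
}

func main() {
    b := batch{}.markRetry()
    start := time.Now()
    if err := b.waitUntilReady(context.Background()); err != nil {
        fmt.Println("canceled:", err)
        return
    }
    fmt.Printf("throttled for %v before retrying\n", time.Since(start).Round(time.Millisecond))
}

The design point the patch relies on is that the sleep selects on ctx.Done(), so a node shutdown is never delayed by a pending backoff; this is why pool.go treats a non-nil error from waitUntilReady as a signal to shut the worker pool down.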