Skip to content

Commit

Permalink
feat: added a write option that imposes a time limit on retry sleeping
Browse files Browse the repository at this point in the history
This limit is enforced both for "come back later" sleeping informed by the
write error, and for "backoff" sleeping configured in the writer
library.
  • Loading branch information
adrian-thurston committed Nov 16, 2020
1 parent 0d1c625 commit eeee581
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 7 deletions.
59 changes: 52 additions & 7 deletions writer/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@ type RetryWriter struct {
MetricsWriter

sleep func(time.Duration)
now func() time.Time
backoff BackoffFunc

maxAttempts int

// Number of seconds to limit retry sleeping to. If zero there is no limit
// imposed. If nonzero we will not sleep past this number of seconds since
// the write attempt started. At the end of the limit there will be one
// more write attempt.
retrySleepLimit int
}

// NewRetryWriter returns a configured *RetryWriter which decorates
Expand All @@ -31,6 +38,7 @@ func NewRetryWriter(w MetricsWriter, opts ...RetryOption) *RetryWriter {
r := &RetryWriter{
MetricsWriter: w,
sleep: time.Sleep,
now: time.Now,
backoff: func(int) time.Duration { return 0 },
maxAttempts: defaultMaxAttempts,
}
Expand All @@ -42,9 +50,33 @@ func NewRetryWriter(w MetricsWriter, opts ...RetryOption) *RetryWriter {
return r
}

// sleepWithLimit sleeps for duration, clamped by the retry sleep limit when
// one is configured. If stopAt has already passed it does not sleep at all
// and returns false (abandon retrying). Otherwise it sleeps for the shorter
// of duration and the time remaining until stopAt, then returns true
// (continue retrying). When retrySleepLimit is zero, no limit is applied.
func (r *RetryWriter) sleepWithLimit(duration time.Duration, stopAt time.Time) bool {
	if r.retrySleepLimit > 0 {
		remaining := stopAt.Sub(r.now())
		if remaining <= 0 {
			// Already past the stop time: skip the sleep and stop retrying.
			return false
		}
		if duration > remaining {
			// Clamp so we wake exactly at the stop time.
			duration = remaining
		}
	}

	r.sleep(duration)
	return true
}

// Write delegates to underlying MetricsWriter and then
// automatically retries when errors occur
func (r *RetryWriter) Write(m ...influxdb.Metric) (n int, err error) {
var stopAt time.Time
if r.retrySleepLimit > 0 {
stopAt = r.now().Add(time.Second * time.Duration(r.retrySleepLimit))
}

for i := 0; i < r.maxAttempts; i++ {
n, err = r.MetricsWriter.Write(m...)
if err == nil {
Expand All @@ -62,14 +94,19 @@ func (r *RetryWriter) Write(m ...influxdb.Metric) (n int, err error) {
if ierr.RetryAfter != nil {
// given retry-after is configured attempt to sleep
// for retry-after seconds
r.sleep(time.Duration(*ierr.RetryAfter) * time.Second)
continue
}

// given a backoff duration > 0
if duration := r.backoff(i + 1); duration > 0 {
// println( "-> retry after sleeping for", *ierr.RetryAfter, "seconds" )
cont := r.sleepWithLimit(time.Duration(*ierr.RetryAfter)*time.Second, stopAt)
if !cont {
return
}
} else if backoffDuration := r.backoff(i + 1); backoffDuration > 0 {
// given a backoff duration > 0
// call sleep with backoff duration
r.sleep(duration)
// println( "-> backoff sleeping for", duration )
cont := r.sleepWithLimit(backoffDuration, stopAt)
if !cont {
return
}
}
default:
return
Expand Down Expand Up @@ -106,3 +143,11 @@ func WithBackoff(fn BackoffFunc) RetryOption {
r.backoff = fn
}
}

// WithRetrySleepLimit sets the retry sleep limit in seconds. This option
// allows us to abort retry sleeps once that many seconds have elapsed since
// the write attempt started. A value of zero (the default) imposes no limit.
func WithRetrySleepLimit(retrySleepLimit int) RetryOption {
	return func(r *RetryWriter) {
		r.retrySleepLimit = retrySleepLimit
	}
}
59 changes: 59 additions & 0 deletions writer/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,58 @@ func Test_RetryWriter_Write(t *testing.T) {
5 * time.Second,
},
},
{
name: `three "too many requests" errors (max attempts 3) with retry-after and retry limit`,
options: []RetryOption{
WithMaxAttempts(3),
WithRetrySleepLimit(4),
},
metrics: createTestRowMetrics(t, 3),
errors: []error{
errTooMany(&three),
errTooMany(&three),
errTooMany(&three),
},
count: 0,
err: errTooMany(&three),
writes: [][]influxdb.Metric{
// three writes all error
createTestRowMetrics(t, 3),
createTestRowMetrics(t, 3),
createTestRowMetrics(t, 3),
},
sleeps: []time.Duration{
3 * time.Second,
1 * time.Second,
},
},
{
name: `three "too many requests" errors (max attempts 3) with backoff and retry limit`,
options: []RetryOption{
WithMaxAttempts(3),
WithBackoff(LinearBackoff(time.Second)),
WithRetrySleepLimit(4),
},
metrics: createTestRowMetrics(t, 3),
errors: []error{
errTooMany(nil),
errTooMany(nil),
errTooMany(nil),
},
count: 0,
err: errTooMany(nil),
writes: [][]influxdb.Metric{
// three writes all error
createTestRowMetrics(t, 3),
createTestRowMetrics(t, 3),
createTestRowMetrics(t, 3),
},
sleeps: []time.Duration{
1 * time.Second,
2 * time.Second,
1 * time.Second,
},
},
} {
t.Run(test.name, test.Run)
}
Expand Down Expand Up @@ -203,8 +255,15 @@ func (test *retryWriteCase) Run(t *testing.T) {
sleeps []time.Duration
)

now := time.Now()

retry.sleep = func(d time.Duration) {
sleeps = append(sleeps, d)
now = now.Add(d)
}

retry.now = func() time.Time {
return now
}

count, err := retry.Write(test.metrics...)
Expand Down

0 comments on commit eeee581

Please sign in to comment.