Skip to content

Commit

Permalink
feat:add retry for offset commit
Browse files Browse the repository at this point in the history
  • Loading branch information
HobbyBear authored and dnwe committed Jul 26, 2023
1 parent ecf43f4 commit d826d75
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 15 deletions.
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,11 @@ type Config struct {
Retry struct {
// The total number of times to retry failing commit
// requests during OffsetManager shutdown (default 3).
Max int
Max int
Factor float64
Jitter bool
MinDelay time.Duration
MaxDelay time.Duration
}
}

Expand Down Expand Up @@ -536,6 +540,10 @@ func NewConfig() *Config {
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
c.Consumer.Offsets.Retry.Jitter = true
c.Consumer.Offsets.Retry.Factor = 2
c.Consumer.Offsets.Retry.MaxDelay = 2 * time.Second
c.Consumer.Offsets.Retry.MinDelay = 200 * time.Millisecond

c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
Expand Down
75 changes: 61 additions & 14 deletions offset_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sarama

import (
"math"
"math/rand"
"sync"
"time"
)
Expand Down Expand Up @@ -254,26 +256,71 @@ func (om *offsetManager) Commit() {
}

func (om *offsetManager) flushToBroker() {
req := om.constructRequest()
if req == nil {
return
}

broker, err := om.coordinator()
if err != nil {
om.handleError(err)
var (
attempt int
)

for {
req := om.constructRequest()
if req == nil {
return
}

broker, err := om.coordinator()
if err != nil {
om.handleError(err)
return
}

resp, err := broker.CommitOffset(req)
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}
if om.shouldRetry(resp, attempt) {
time.Sleep(om.backOff(attempt))
attempt++
continue
}
om.handleResponse(broker, req, resp)
return
}

resp, err := broker.CommitOffset(req)
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}

func (om *offsetManager) shouldRetry(resp *OffsetCommitResponse, attempt int) bool {
if attempt > om.conf.Consumer.Offsets.Retry.Max {
return false
}
if len(resp.Errors) == 0 {
return false
}
for _, topicOffsetCommitErr := range resp.Errors {
for _, err := range topicOffsetCommitErr {
switch err {
// can add some else err code in future
case ErrRequestTimedOut:
return true
default:
return false
}
}
}
return true
}

om.handleResponse(broker, req, resp)
func (om *offsetManager) backOff(attempt int) time.Duration {
dur := float64(om.conf.Consumer.Offsets.Retry.MinDelay) * math.Pow(om.conf.Consumer.Offsets.Retry.Factor, float64(attempt))
if om.conf.Consumer.Offsets.Retry.Jitter == true {
dur = rand.Float64()*(dur-float64(om.conf.Consumer.Offsets.Retry.MinDelay)) + float64(om.conf.Consumer.Offsets.Retry.MinDelay)
}
if dur > float64(om.conf.Consumer.Offsets.Retry.MaxDelay) {
return om.conf.Consumer.Offsets.Retry.MaxDelay
}
return time.Duration(dur)
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
Expand Down

0 comments on commit d826d75

Please sign in to comment.