Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the Consumer to disable auto-commit offsets #1164

Merged
merged 9 commits into from
Nov 22, 2019
16 changes: 12 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,15 @@ type Config struct {
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// How frequently to commit updated offsets. Defaults to 1s.
CommitInterval time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh shit, this was a breaking change, i.e: it breaks people using sarama-cluster with the latest version of sarama.

../../../../go/pkg/mod/github.com/bsm/[email protected]+incompatible/consumer.go:452:59: c.client.config.Config.Consumer.Offsets.CommitInterval undefined (type struct { AutoCommit struct { Enable bool; Interval time.Duration }; Initial int64; Retention time.Duration; Retry struct { Max int } } has no field or method CommitInterval)

AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool

// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}

// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Expand Down Expand Up @@ -423,7 +430,8 @@ func NewConfig() *Config {
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3

Expand Down Expand Up @@ -650,7 +658,7 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
Expand Down
7 changes: 6 additions & 1 deletion offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
client: client,
conf: conf,
group: group,
ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
poms: make(map[string]map[int32]*partitionOffsetManager),

memberID: memberID,
Expand Down Expand Up @@ -233,7 +233,12 @@ func (om *offsetManager) mainLoop() {
}
}

// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad this line is fixed, it actually blocks all calls to Commit(), however sometimes we do wish to commit manually.

return
}

req := om.constructRequest()
if req == nil {
return
Expand Down
85 changes: 80 additions & 5 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
)

func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {

config := NewConfig()
config.Metadata.Retry.Max = 1
if backoffFunc != nil {
config.Metadata.Retry.BackoffFunc = backoffFunc
}
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
config.Version = V0_9_0_0
if retention > 0 {
config.Consumer.Offsets.Retention = retention
Expand Down Expand Up @@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
return initOffsetManagerWithBackoffFunc(t, retention, nil)
return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
Expand Down Expand Up @@ -97,6 +96,82 @@ func TestNewOffsetManager(t *testing.T) {
}
}

var offsetsautocommitTestTable = []struct {
name string
set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
enable bool
}{
{
"AutoCommit (default)",
false, // use default
true,
},
{
"AutoCommit Enabled",
true,
true,
},
{
"AutoCommit Disabled",
true,
false,
},
}

func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
for _, tt := range offsetsautocommitTestTable {
t.Run(tt.name, func(t *testing.T) {

config := NewConfig()
if tt.set {
config.Consumer.Offsets.AutoCommit.Enable = tt.enable
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

called := make(chan none)

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
close(called)
return ocResponse
}
coordinator.setHandler(handler)

// Should force an offset commit, if auto-commit is enabled.
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
_, _ = pom.NextOffset()

select {
case <-called:
// OffsetManager called on the wire.
if !config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
}
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
if config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
}
}

broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
safeClose(t, testClient)
})
}
}

// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
Expand Down Expand Up @@ -148,7 +223,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
atomic.AddInt32(&retryCount, 1)
return 0
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand Down