diff --git a/config.go b/config.go index e515e0432..69c716173 100644 --- a/config.go +++ b/config.go @@ -338,8 +338,15 @@ type Config struct { // offsets. This currently requires the manual use of an OffsetManager // but will eventually be automated. Offsets struct { - // How frequently to commit updated offsets. Defaults to 1s. - CommitInterval time.Duration + AutoCommit struct { + // Whether or not to auto-commit updated offsets back to the broker. + // (default enabled). + Enable bool + + // How frequently to commit updated offsets. Ineffective unless + // auto-commit is enabled (default 1s) + Interval time.Duration + } // The initial offset to use if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. @@ -423,7 +430,8 @@ func NewConfig() *Config { c.Consumer.MaxWaitTime = 250 * time.Millisecond c.Consumer.MaxProcessingTime = 100 * time.Millisecond c.Consumer.Return.Errors = false - c.Consumer.Offsets.CommitInterval = 1 * time.Second + c.Consumer.Offsets.AutoCommit.Enable = true + c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest c.Consumer.Offsets.Retry.Max = 3 @@ -650,7 +658,7 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.MaxProcessingTime must be > 0") case c.Consumer.Retry.Backoff < 0: return ConfigurationError("Consumer.Retry.Backoff must be >= 0") - case c.Consumer.Offsets.CommitInterval <= 0: + case c.Consumer.Offsets.AutoCommit.Interval <= 0: return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0") case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest: return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest") diff --git a/offset_manager.go b/offset_manager.go index 923972f26..e40f42967 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client client: client, conf: conf, group: group, - ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval), + ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval), poms: make(map[string]map[int32]*partitionOffsetManager), memberID: memberID, @@ -233,7 +233,12 @@ func (om *offsetManager) mainLoop() { } } +// flushToBroker is ignored if auto-commit offsets is disabled func (om *offsetManager) flushToBroker() { + if !om.conf.Consumer.Offsets.AutoCommit.Enable { + return + } + req := om.constructRequest() if req == nil { return diff --git a/offset_manager_test.go b/offset_manager_test.go index 5fb6b21c6..765db166d 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -7,15 +7,14 @@ import ( ) func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, - backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager, + backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { - config := NewConfig() config.Metadata.Retry.Max = 1 if backoffFunc != nil { config.Metadata.Retry.BackoffFunc = backoffFunc } - config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond + config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond config.Version = V0_9_0_0 if retention > 0 { config.Consumer.Offsets.Retention = retention @@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { - return initOffsetManagerWithBackoffFunc(t, retention, nil) + return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig()) } func initPartitionOffsetManager(t *testing.T, om OffsetManager, @@ -97,6 +96,82 @@ func TestNewOffsetManager(t *testing.T) { } } +var offsetsautocommitTestTable = []struct { + name string + set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable + enable bool +}{ + { + "AutoCommit (default)", + false, // use default + true, + }, + { + "AutoCommit Enabled", + true, + true, + }, + { + "AutoCommit Disabled", + true, + false, + }, +} + +func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { + // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` + for _, tt := range offsetsautocommitTestTable { + t.Run(tt.name, func(t *testing.T) { + + config := NewConfig() + if tt.set { + config.Consumer.Offsets.AutoCommit.Enable = tt.enable + } + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + // Wait long enough for the test not to fail.. + timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval + + called := make(chan none) + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + handler := func(req *request) (res encoder) { + close(called) + return ocResponse + } + coordinator.setHandler(handler) + + // Should force an offset commit, if auto-commit is enabled. + expected := int64(1) + pom.ResetOffset(expected, "modified_meta") + _, _ = pom.NextOffset() + + select { + case <-called: + // OffsetManager called on the wire. + if !config.Consumer.Offsets.AutoCommit.Enable { + t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) + } + case <-time.After(timeout): + // Timeout waiting for OffsetManager to call on the wire. + if config.Consumer.Offsets.AutoCommit.Enable { + t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) + } + } + + broker.Close() + coordinator.Close() + + // !! om must be closed before the pom so pom.release() is called before pom.Close() + safeClose(t, om) + safeClose(t, pom) + safeClose(t, testClient) + }) + } +} + // Test recovery from ErrNotCoordinatorForConsumer // on first fetchInitialOffset call func TestOffsetManagerFetchInitialFail(t *testing.T) { @@ -148,7 +223,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { atomic.AddInt32(&retryCount, 1) return 0 } - om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff) + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig()) // Error on first fetchInitialOffset call responseBlock := OffsetFetchResponseBlock{