From 31312684701160f59511961b50ba7ff398bf204f Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Thu, 6 Sep 2018 22:59:44 +0200 Subject: [PATCH 1/7] Shopify/sarama#1158 Allow the Consumer to disable auto-commit offsets --- config.go | 5 +++++ offset_manager.go | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/config.go b/config.go index 08f533084..b594a9e19 100644 --- a/config.go +++ b/config.go @@ -248,6 +248,10 @@ type Config struct { // offsets. This currently requires the manual use of an OffsetManager // but will eventually be automated. Offsets struct { + // Whether or not to auto-commit marked offsets back to the broker + // (default enabled). + Enable bool + // How frequently to commit updated offsets. Defaults to 1s. CommitInterval time.Duration @@ -327,6 +331,7 @@ func NewConfig() *Config { c.Consumer.MaxWaitTime = 250 * time.Millisecond c.Consumer.MaxProcessingTime = 100 * time.Millisecond c.Consumer.Return.Errors = false + c.Consumer.Offsets.Enable = true c.Consumer.Offsets.CommitInterval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest c.Consumer.Offsets.Retry.Max = 3 diff --git a/offset_manager.go b/offset_manager.go index 2a1a89f3a..4dea0e979 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -215,6 +215,10 @@ func (om *offsetManager) mainLoop() { } func (om *offsetManager) flushToBroker() { + if om.conf.Consumer.Offsets.Enable == false { + return + } + req := om.constructRequest() if req == nil { return From 37e0cdd34b69c327fa4c1f88ceebe3124ddafdd0 Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Sat, 24 Aug 2019 14:02:46 +0200 Subject: [PATCH 2/7] Ignoring Close if commit offsets are disabled. Avoit starting mainloop if commit offsets are disabled --- offset_manager.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/offset_manager.go b/offset_manager.go index 5a0db8773..4471c0627 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -58,7 +58,6 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client client: client, conf: conf, group: group, - ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval), poms: make(map[string]map[int32]*partitionOffsetManager), memberID: memberID, @@ -67,7 +66,10 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client closing: make(chan none), closed: make(chan none), } - go withRecover(om.mainLoop) + if conf.Consumer.Offsets.Enable { + om.ticker = time.NewTicker(conf.Consumer.Offsets.CommitInterval) + go withRecover(om.mainLoop) + } return om, nil } @@ -96,6 +98,10 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti } func (om *offsetManager) Close() error { + if !om.conf.Consumer.Offsets.Enable { + return nil + } + om.closeOnce.Do(func() { // exit the mainLoop close(om.closing) From 103232a483fbfa7720b3f1a07354668c035967d9 Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Fri, 15 Nov 2019 14:46:25 +0100 Subject: [PATCH 3/7] Renamed Consumer.Offsets.AutoCommitEnable to Consumer.Offsets.AutoCommit.Enable. Renamed Consumer.Offsets.CommitInteval to Consumer.Offsets.AutoCommit.Inteval --- config.go | 19 +++++++++++-------- offset_manager.go | 10 +++------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/config.go b/config.go index 5aab1511b..69c716173 100644 --- a/config.go +++ b/config.go @@ -338,12 +338,15 @@ type Config struct { // offsets. This currently requires the manual use of an OffsetManager // but will eventually be automated. Offsets struct { - // Whether or not to auto-commit marked offsets back to the broker - // (default enabled). - Enable bool + AutoCommit struct { + // Whether or not to auto-commit updated offsets back to the broker. + // (default enabled). + Enable bool - // How frequently to commit updated offsets. Defaults to 1s. - CommitInterval time.Duration + // How frequently to commit updated offsets. Ineffective unless + // auto-commit is enabled (default 1s) + Interval time.Duration + } // The initial offset to use if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. @@ -427,8 +430,8 @@ func NewConfig() *Config { c.Consumer.MaxWaitTime = 250 * time.Millisecond c.Consumer.MaxProcessingTime = 100 * time.Millisecond c.Consumer.Return.Errors = false - c.Consumer.Offsets.Enable = true - c.Consumer.Offsets.CommitInterval = 1 * time.Second + c.Consumer.Offsets.AutoCommit.Enable = true + c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest c.Consumer.Offsets.Retry.Max = 3 @@ -655,7 +658,7 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.MaxProcessingTime must be > 0") case c.Consumer.Retry.Backoff < 0: return ConfigurationError("Consumer.Retry.Backoff must be >= 0") - case c.Consumer.Offsets.CommitInterval <= 0: + case c.Consumer.Offsets.AutoCommit.Interval <= 0: return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0") case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest: return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest") diff --git a/offset_manager.go b/offset_manager.go index 4471c0627..0d47219b2 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -66,8 +66,8 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client closing: make(chan none), closed: make(chan none), } - if conf.Consumer.Offsets.Enable { - om.ticker = time.NewTicker(conf.Consumer.Offsets.CommitInterval) + if conf.Consumer.Offsets.AutoCommit.Enable { + om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) go withRecover(om.mainLoop) } @@ -98,7 +98,7 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti } func (om *offsetManager) Close() error { - if !om.conf.Consumer.Offsets.Enable { + if !om.conf.Consumer.Offsets.AutoCommit.Enable { return nil } @@ -240,10 +240,6 @@ func (om *offsetManager) mainLoop() { } func (om *offsetManager) flushToBroker() { - if om.conf.Consumer.Offsets.Enable == false { - return - } - req := om.constructRequest() if req == nil { return From 34b422979e0da296aa68a3c2230dad8d6318f302 Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Fri, 15 Nov 2019 14:47:35 +0100 Subject: [PATCH 4/7] started on unittest for Consumer.Offsets.AutoCommit.Enable --- offset_manager_test.go | 78 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/offset_manager_test.go b/offset_manager_test.go index 5fb6b21c6..ed885bc2f 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -7,15 +7,14 @@ import ( ) func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, - backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager, + backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { - config := NewConfig() config.Metadata.Retry.Max = 1 if backoffFunc != nil { config.Metadata.Retry.BackoffFunc = backoffFunc } - config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond + config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond config.Version = V0_9_0_0 if retention > 0 { config.Consumer.Offsets.Retention = retention @@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { - return initOffsetManagerWithBackoffFunc(t, retention, nil) + return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig()) } func initPartitionOffsetManager(t *testing.T, om OffsetManager, @@ -97,6 +96,75 @@ func TestNewOffsetManager(t *testing.T) { } } +var offsetsautocommitTestTable = []struct { + name string + set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable + enable bool +}{ + { + "AutoCommit (default)", + false, // use default + true, + }, + { + "AutoCommit Enabled", + true, + true, + }, + { + "AutoCommit Disabled", + true, + false, + }, +} + +func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { + // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` + for _, tt := range offsetsautocommitTestTable { + config := NewConfig() + if tt.set { + config.Consumer.Offsets.AutoCommit.Enable = tt.enable + } + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + // we will wait for twice the interval + timeout := 2 * config.Consumer.Offsets.AutoCommit.Interval + called := make(chan none) + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + handler := func(req *request) (res encoder) { + close(called) + return ocResponse + } + coordinator.setHandler(handler) + + // Should force an offset commit, if auto-commit is enabled. + expected := int64(1) + pom.ResetOffset(expected, "modified_meta") + _, _ = pom.NextOffset() + + select { + case <-called: + // OffsetManager called on the wire. + if !config.Consumer.Offsets.AutoCommit.Enable { + t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) + } + case <-time.After(timeout): + // Timeout waiting for OffsetManager to call on the wire. + if config.Consumer.Offsets.AutoCommit.Enable { + t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) + } + } + safeClose(t, pom) + safeClose(t, om) + safeClose(t, testClient) + broker.Close() + coordinator.Close() + } +} + // Test recovery from ErrNotCoordinatorForConsumer // on first fetchInitialOffset call func TestOffsetManagerFetchInitialFail(t *testing.T) { @@ -148,7 +216,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { atomic.AddInt32(&retryCount, 1) return 0 } - om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff) + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig()) // Error on first fetchInitialOffset call responseBlock := OffsetFetchResponseBlock{ From 9a246b5b32cd1a390bda38dbc3aa3bd0a11e9b19 Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Mon, 18 Nov 2019 20:05:57 +0100 Subject: [PATCH 5/7] Moved check for Consumer.Offsets.AutoCommit.Enable to offsetManager.flushToBroker to keep mainLoop --- offset_manager.go | 15 ++++---- offset_manager_test.go | 84 ++++++++++++++++++++++-------------------- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/offset_manager.go b/offset_manager.go index 0d47219b2..edec700f9 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -66,10 +66,8 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client closing: make(chan none), closed: make(chan none), } - if conf.Consumer.Offsets.AutoCommit.Enable { - om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) - go withRecover(om.mainLoop) - } + om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) + go withRecover(om.mainLoop) return om, nil } @@ -98,10 +96,6 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti } func (om *offsetManager) Close() error { - if !om.conf.Consumer.Offsets.AutoCommit.Enable { - return nil - } - om.closeOnce.Do(func() { // exit the mainLoop close(om.closing) @@ -239,7 +233,12 @@ func (om *offsetManager) mainLoop() { } } +// flushToBroker is ignored if auto-commit offsets is disabled func (om *offsetManager) flushToBroker() { + if !om.conf.Consumer.Offsets.AutoCommit.Enable { + return + } + req := om.constructRequest() if req == nil { return diff --git a/offset_manager_test.go b/offset_manager_test.go index ed885bc2f..bd50a25b6 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -121,47 +121,53 @@ var offsetsautocommitTestTable = []struct { func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` for _, tt := range offsetsautocommitTestTable { - config := NewConfig() - if tt.set { - config.Consumer.Offsets.AutoCommit.Enable = tt.enable - } - om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) - pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") - - // we will wait for twice the interval - timeout := 2 * config.Consumer.Offsets.AutoCommit.Interval - called := make(chan none) - - ocResponse := new(OffsetCommitResponse) - ocResponse.AddError("my_topic", 0, ErrNoError) - handler := func(req *request) (res encoder) { - close(called) - return ocResponse - } - coordinator.setHandler(handler) - - // Should force an offset commit, if auto-commit is enabled. - expected := int64(1) - pom.ResetOffset(expected, "modified_meta") - _, _ = pom.NextOffset() - - select { - case <-called: - // OffsetManager called on the wire. - if !config.Consumer.Offsets.AutoCommit.Enable { - t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) + t.Run(tt.name, func(t *testing.T) { + + config := NewConfig() + if tt.set { + config.Consumer.Offsets.AutoCommit.Enable = tt.enable } - case <-time.After(timeout): - // Timeout waiting for OffsetManager to call on the wire. - if config.Consumer.Offsets.AutoCommit.Enable { - t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + // we will wait for twice the interval + timeout := 2 * config.Consumer.Offsets.AutoCommit.Interval + called := make(chan none) + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + handler := func(req *request) (res encoder) { + close(called) + return ocResponse } - } - safeClose(t, pom) - safeClose(t, om) - safeClose(t, testClient) - broker.Close() - coordinator.Close() + coordinator.setHandler(handler) + + // Should force an offset commit, if auto-commit is enabled. + expected := int64(1) + pom.ResetOffset(expected, "modified_meta") + _, _ = pom.NextOffset() + + select { + case <-called: + // OffsetManager called on the wire. + if !config.Consumer.Offsets.AutoCommit.Enable { + t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) + } + case <-time.After(timeout): + // Timeout waiting for OffsetManager to call on the wire. + if config.Consumer.Offsets.AutoCommit.Enable { + t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) + } + } + + broker.Close() + coordinator.Close() + + // !! om must be closed before the pom so pom.release() is called before pom.Close() + safeClose(t, om) + safeClose(t, pom) + safeClose(t, testClient) + }) } } From 26bf4fda4abbf36333da89fcde1fa233623463fc Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Mon, 18 Nov 2019 20:17:47 +0100 Subject: [PATCH 6/7] moved ticker back to struct --- offset_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/offset_manager.go b/offset_manager.go index edec700f9..e40f42967 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -58,6 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client client: client, conf: conf, group: group, + ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval), poms: make(map[string]map[int32]*partitionOffsetManager), memberID: memberID, @@ -66,7 +67,6 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client closing: make(chan none), closed: make(chan none), } - om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) go withRecover(om.mainLoop) return om, nil From 61c57e3a080f6244e5d7b6f3dce2b04c08501e2c Mon Sep 17 00:00:00 2001 From: Kjell Tore Fossbakk Date: Mon, 18 Nov 2019 21:33:04 +0100 Subject: [PATCH 7/7] Fixed TestNewOffsetManagerOffsetsAutoCommit fails because of low timeout --- offset_manager_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/offset_manager_test.go b/offset_manager_test.go index bd50a25b6..765db166d 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -130,8 +130,9 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") - // we will wait for twice the interval - timeout := 2 * config.Consumer.Offsets.AutoCommit.Interval + // Wait long enough for the test not to fail.. + timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval + called := make(chan none) ocResponse := new(OffsetCommitResponse)