Skip to content

Commit

Permalink
Revert "Allow the Consumer to disable auto-commit offsets (IBM#1164)"
Browse files Browse the repository at this point in the history
This reverts commit 72a629d.
  • Loading branch information
souravdotsaha committed Nov 22, 2019
1 parent b3ef550 commit 46c52d0
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 98 deletions.
16 changes: 4 additions & 12 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,8 @@ type Config struct {
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool

// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}
// How frequently to commit updated offsets. Defaults to 1s.
CommitInterval time.Duration

// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Expand Down Expand Up @@ -430,8 +423,7 @@ func NewConfig() *Config {
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3

Expand Down Expand Up @@ -658,7 +650,7 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
case c.Consumer.Offsets.CommitInterval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
Expand Down
7 changes: 1 addition & 6 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
client: client,
conf: conf,
group: group,
ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
poms: make(map[string]map[int32]*partitionOffsetManager),

memberID: memberID,
Expand Down Expand Up @@ -233,12 +233,7 @@ func (om *offsetManager) mainLoop() {
}
}

// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
return
}

req := om.constructRequest()
if req == nil {
return
Expand Down
85 changes: 5 additions & 80 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
)

func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {

config := NewConfig()
config.Metadata.Retry.Max = 1
if backoffFunc != nil {
config.Metadata.Retry.BackoffFunc = backoffFunc
}
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
config.Version = V0_9_0_0
if retention > 0 {
config.Consumer.Offsets.Retention = retention
Expand Down Expand Up @@ -51,7 +52,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
return initOffsetManagerWithBackoffFunc(t, retention, nil)
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
Expand Down Expand Up @@ -96,82 +97,6 @@ func TestNewOffsetManager(t *testing.T) {
}
}

var offsetsautocommitTestTable = []struct {
name string
set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
enable bool
}{
{
"AutoCommit (default)",
false, // use default
true,
},
{
"AutoCommit Enabled",
true,
true,
},
{
"AutoCommit Disabled",
true,
false,
},
}

func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
for _, tt := range offsetsautocommitTestTable {
t.Run(tt.name, func(t *testing.T) {

config := NewConfig()
if tt.set {
config.Consumer.Offsets.AutoCommit.Enable = tt.enable
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

called := make(chan none)

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
close(called)
return ocResponse
}
coordinator.setHandler(handler)

// Should force an offset commit, if auto-commit is enabled.
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
_, _ = pom.NextOffset()

select {
case <-called:
// OffsetManager called on the wire.
if !config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
}
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
if config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
}
}

broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
safeClose(t, testClient)
})
}
}

// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
Expand Down Expand Up @@ -223,7 +148,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
atomic.AddInt32(&retryCount, 1)
return 0
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand Down

0 comments on commit 46c52d0

Please sign in to comment.