Skip to content

Commit

Permalink
Porting Shopify#1164 onto v1.24.1
Browse files Browse the repository at this point in the history
  • Loading branch information
kjelle committed Nov 18, 2019
1 parent 675b0b1 commit 2a23f12
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 10 deletions.
16 changes: 12 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,15 @@ type Config struct {
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// How frequently to commit updated offsets. Defaults to 1s.
CommitInterval time.Duration
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool

// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}

// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Expand Down Expand Up @@ -423,7 +430,8 @@ func NewConfig() *Config {
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3

Expand Down Expand Up @@ -650,7 +658,7 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
Expand Down
7 changes: 6 additions & 1 deletion offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
client: client,
conf: conf,
group: group,
ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
poms: make(map[string]map[int32]*partitionOffsetManager),

memberID: memberID,
Expand Down Expand Up @@ -233,7 +233,12 @@ func (om *offsetManager) mainLoop() {
}
}

// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
return
}

req := om.constructRequest()
if req == nil {
return
Expand Down
84 changes: 79 additions & 5 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
)

func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {

config := NewConfig()
config.Metadata.Retry.Max = 1
if backoffFunc != nil {
config.Metadata.Retry.BackoffFunc = backoffFunc
}
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
config.Version = V0_9_0_0
if retention > 0 {
config.Consumer.Offsets.Retention = retention
Expand Down Expand Up @@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
return initOffsetManagerWithBackoffFunc(t, retention, nil)
return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
Expand Down Expand Up @@ -97,6 +96,81 @@ func TestNewOffsetManager(t *testing.T) {
}
}

var offsetsautocommitTestTable = []struct {
name string
set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
enable bool
}{
{
"AutoCommit (default)",
false, // use default
true,
},
{
"AutoCommit Enabled",
true,
true,
},
{
"AutoCommit Disabled",
true,
false,
},
}

func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
for _, tt := range offsetsautocommitTestTable {
t.Run(tt.name, func(t *testing.T) {

config := NewConfig()
if tt.set {
config.Consumer.Offsets.AutoCommit.Enable = tt.enable
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// we will wait for twice the interval
timeout := 2 * config.Consumer.Offsets.AutoCommit.Interval
called := make(chan none)

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
close(called)
return ocResponse
}
coordinator.setHandler(handler)

// Should force an offset commit, if auto-commit is enabled.
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
_, _ = pom.NextOffset()

select {
case <-called:
// OffsetManager called on the wire.
if !config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
}
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
if config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
}
}

broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
safeClose(t, testClient)
})
}
}

// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
Expand Down Expand Up @@ -148,7 +222,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
atomic.AddInt32(&retryCount, 1)
return 0
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand Down

0 comments on commit 2a23f12

Please sign in to comment.