Skip to content

Commit

Permalink
Add support for manual commit to ConsumerGroup
Browse files Browse the repository at this point in the history
- expose a `Commit()` sync method on ConsumerGroupSession
- don't create mainLoop in OffsetManager unless AutoCommit is enabled
  • Loading branch information
wclaeys committed Jun 24, 2020
1 parent ad70aac commit cb9d1e8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 10 deletions.
9 changes: 9 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ type ConsumerGroupSession interface {
// message twice, and your processing should ideally be idempotent.
MarkOffset(topic string, partition int32, offset int64, metadata string)

// Commit the offset to the backend
//
// Note: calling Commit performs a blocking synchronous operation.
Commit()

// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
Expand Down Expand Up @@ -624,6 +629,10 @@ func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset
}
}

func (s *consumerGroupSession) Commit() {
s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
if pom := s.offsets.findPOM(topic, partition); pom != nil {
pom.ResetOffset(offset, metadata)
Expand Down
27 changes: 17 additions & 10 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type OffsetManager interface {
// will otherwise leak memory. You must call this after all the
// PartitionOffsetManagers are closed.
Close() error

// Commit commits the offsets. This method can be used if AutoCommit.Enable is
// set to false.
Commit()
}

type offsetManager struct {
Expand Down Expand Up @@ -58,7 +62,6 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
client: client,
conf: conf,
group: group,
ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
poms: make(map[string]map[int32]*partitionOffsetManager),

memberID: memberID,
Expand All @@ -67,7 +70,10 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
closing: make(chan none),
closed: make(chan none),
}
go withRecover(om.mainLoop)
if conf.Consumer.Offsets.AutoCommit.Enable {
om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
go withRecover(om.mainLoop)
}

return om, nil
}
Expand Down Expand Up @@ -99,7 +105,9 @@ func (om *offsetManager) Close() error {
om.closeOnce.Do(func() {
// exit the mainLoop
close(om.closing)
<-om.closed
if om.conf.Consumer.Offsets.AutoCommit.Enable {
<-om.closed
}

// mark all POMs as closed
om.asyncClosePOMs()
Expand Down Expand Up @@ -225,20 +233,19 @@ func (om *offsetManager) mainLoop() {
for {
select {
case <-om.ticker.C:
om.flushToBroker()
om.releasePOMs(false)
om.Commit()
case <-om.closing:
return
}
}
}

// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
return
}
func (om *offsetManager) Commit() {
om.flushToBroker()
om.releasePOMs(false)
}

func (om *offsetManager) flushToBroker() {
req := om.constructRequest()
if req == nil {
return
Expand Down
58 changes: 58 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,64 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
}
}

func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
// Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false
config := NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false

om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
called := make(chan none)
handler := func(req *request) (res encoderWithHeader) {
close(called)
return ocResponse
}
coordinator.setHandler(handler)

// Should not trigger an auto-commit
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
_, _ = pom.NextOffset()

select {
case <-called:
// OffsetManager called on the wire.
t.Errorf("Received request when AutoCommit is disabled")
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
// OK
}

// Setup again to test manual commit
called = make(chan none)

om.Commit()

select {
case <-called:
// OffsetManager called on the wire.
// OK
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
t.Errorf("No request received for after waiting for %v", timeout)
}

// Close up
broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
safeClose(t, testClient)
}

// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
Expand Down

0 comments on commit cb9d1e8

Please sign in to comment.