Skip to content

Commit

Permalink
feat: add counter metrics for consumer group join/sync and their fail…
Browse files Browse the repository at this point in the history
…ures
  • Loading branch information
gdm85 committed Aug 31, 2021
1 parent 839d956 commit 187207b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
40 changes: 40 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sort"
"sync"
"time"

"github.com/rcrowley/go-metrics"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
Expand Down Expand Up @@ -212,12 +214,38 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return c.retryNewSession(ctx, topics, handler, retries, true)
}

var (
metricRegistry = c.config.MetricRegistry
consumerGroupJoinTotal metrics.Counter
consumerGroupJoinFailed metrics.Counter
consumerGroupSyncTotal metrics.Counter
consumerGroupSyncFailed metrics.Counter
)

if metricRegistry != nil {
consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
}

// Join consumer group
join, err := c.joinGroupRequest(coordinator, topics)
if consumerGroupJoinTotal != nil {
consumerGroupJoinTotal.Inc(1)
}
if err != nil {
_ = coordinator.Close()
if consumerGroupJoinFailed != nil {
consumerGroupJoinFailed.Inc(1)
}
return nil, err
}
if join.Err != ErrNoError {
if consumerGroupJoinFailed != nil {
consumerGroupJoinFailed.Inc(1)
}
}
switch join.Err {
case ErrNoError:
c.memberID = join.MemberId
Expand Down Expand Up @@ -256,10 +284,22 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler

// Sync consumer group
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
if consumerGroupSyncTotal != nil {
consumerGroupSyncTotal.Inc(1)
}
if err != nil {
_ = coordinator.Close()
if consumerGroupSyncFailed != nil {
consumerGroupSyncFailed.Inc(1)
}
return nil, err
}
if groupRequest.Err != ErrNoError {
if consumerGroupSyncFailed != nil {
consumerGroupSyncFailed.Inc(1)
}
}

switch groupRequest.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
Expand Down
5 changes: 5 additions & 0 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ Consumer related metrics:
| Name | Type | Description |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| consumer-batch-size | histogram | Distribution of the number of messages in a batch |
| consumer-batch-size | histogram | Distribution of the number of messages in a batch |
| consumer-group-join-total-<GroupID> | counter | Total count of consumer group join attempts |
| consumer-group-join-failed-<GroupID> | counter | Total count of consumer group join failures |
| consumer-group-sync-total-<GroupID> | counter | Total count of consumer group sync attempts |
| consumer-group-sync-failed-<GroupID> | counter | Total count of consumer group sync failures |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
*/
Expand Down

0 comments on commit 187207b

Please sign in to comment.