diff --git a/consumer_group.go b/consumer_group.go index 2bf236ae53..b64576aba0 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -7,6 +7,8 @@ import ( "sort" "sync" "time" + + "github.com/rcrowley/go-metrics" ) // ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed. @@ -212,12 +214,38 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return c.retryNewSession(ctx, topics, handler, retries, true) } + var ( + metricRegistry = c.config.MetricRegistry + consumerGroupJoinTotal metrics.Counter + consumerGroupJoinFailed metrics.Counter + consumerGroupSyncTotal metrics.Counter + consumerGroupSyncFailed metrics.Counter + ) + + if metricRegistry != nil { + consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry) + consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry) + consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry) + consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry) + } + // Join consumer group join, err := c.joinGroupRequest(coordinator, topics) + if consumerGroupJoinTotal != nil { + consumerGroupJoinTotal.Inc(1) + } if err != nil { _ = coordinator.Close() + if consumerGroupJoinFailed != nil { + consumerGroupJoinFailed.Inc(1) + } return nil, err } + if join.Err != ErrNoError { + if consumerGroupJoinFailed != nil { + consumerGroupJoinFailed.Inc(1) + } + } switch join.Err { case ErrNoError: c.memberID = join.MemberId @@ -256,10 +284,22 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler // Sync consumer group groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) + if consumerGroupSyncTotal != nil { + consumerGroupSyncTotal.Inc(1) + } if err != nil { _ = coordinator.Close() + if consumerGroupSyncFailed != nil { + consumerGroupSyncFailed.Inc(1) + } return nil, err } + if groupRequest.Err != ErrNoError { + if consumerGroupSyncFailed != nil { + consumerGroupSyncFailed.Inc(1) + } + } + switch groupRequest.Err { case ErrNoError: case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately diff --git a/sarama.go b/sarama.go index 48f362d287..7f5b6b60b6 100644 --- a/sarama.go +++ b/sarama.go @@ -68,6 +68,11 @@ Consumer related metrics: | Name | Type | Description | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | consumer-batch-size | histogram | Distribution of the number of messages in a batch | + | consumer-batch-size | histogram | Distribution of the number of messages in a batch | + | consumer-group-join-total- | counter | Total count of consumer group join attempts | + | consumer-group-join-failed- | counter | Total count of consumer group join failures | + | consumer-group-sync-total- | counter | Total count of consumer group sync attempts | + | consumer-group-sync-failed- | counter | Total count of consumer group sync failures | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ */