Skip to content

Commit

Permalink
Add topic tag to producer metrics (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Aug 26, 2024
1 parent fbae51a commit ac349fc
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 15 deletions.
7 changes: 6 additions & 1 deletion common/asyncworkflow/queue/kafka/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/cadence/common/asyncworkflow/queue/consumer"
"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -85,6 +86,7 @@ func (q *queueImpl) CreateConsumer(p *provider.Params) (provider.Consumer, error
if err != nil {
return nil, fmt.Errorf("failed to create kafka consumer: %w", err)
}
p.Logger.Info("Creating async wf consumer", tag.KafkaTopicName(q.config.Topic))
return consumer.New(q.ID(), kafkaConsumer, p.Logger, p.MetricsClient, p.FrontendClient), nil
}

Expand All @@ -94,6 +96,7 @@ func (q *queueImpl) CreateProducer(p *provider.Params) (messaging.Producer, erro
return nil, err
}
config.Producer.Return.Successes = true
p.Logger.Info("Creating async wf producer", tag.KafkaTopicName(q.config.Topic))
return newProducer(q.config.Topic, q.config.Connection.Brokers, config, p.MetricsClient, p.Logger)
}

Expand All @@ -102,5 +105,7 @@ func newProducer(topic string, brokers []string, saramaConfig *sarama.Config, me
if err != nil {
return nil, err
}
return messaging.NewMetricProducer(kafka.NewKafkaProducer(topic, p, logger), metricsClient), nil

withMetricsOpt := messaging.WithMetricTags(metrics.TopicTag(topic))
return messaging.NewMetricProducer(kafka.NewKafkaProducer(topic, p, logger), metricsClient, withMetricsOpt), nil
}
3 changes: 2 additions & 1 deletion common/messaging/kafka/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (c *clientImpl) newProducerByTopic(topic string) (messaging.Producer, error

if c.metricsClient != nil {
c.logger.Info("Create producer with metricsClient")
return messaging.NewMetricProducer(NewKafkaProducer(topic, producer, c.logger), c.metricsClient), nil
withMetricsOpt := messaging.WithMetricTags(metrics.TopicTag(topic))
return messaging.NewMetricProducer(NewKafkaProducer(topic, producer, c.logger), c.metricsClient, withMetricsOpt), nil
}
return NewKafkaProducer(topic, producer, c.logger), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,52 @@ import (
"github.com/uber/cadence/common/metrics"
)

type (
metricsProducer struct {
producer Producer
metricsClient metrics.Client
type MetricsProducer struct {
producer Producer
scope metrics.Scope
tags []metrics.Tag
}

type MetricProducerOptions func(*MetricsProducer)

func WithMetricTags(tags ...metrics.Tag) MetricProducerOptions {
return func(p *MetricsProducer) {
p.tags = tags
}
)
}

// NewMetricProducer creates a new instance of producer that emits metrics
func NewMetricProducer(
producer Producer,
metricsClient metrics.Client,
opts ...MetricProducerOptions,
) Producer {
return &metricsProducer{
producer: producer,
metricsClient: metricsClient,
p := &MetricsProducer{
producer: producer,
}

for _, opt := range opts {
opt(p)
}

p.scope = metricsClient.Scope(metrics.MessagingClientPublishScope, p.tags...)
return p
}

func (p *metricsProducer) Publish(ctx context.Context, msg interface{}) error {
p.metricsClient.IncCounter(metrics.MessagingClientPublishScope, metrics.CadenceClientRequests)
func (p *MetricsProducer) Publish(ctx context.Context, msg interface{}) error {
p.scope.IncCounter(metrics.CadenceClientRequests)

sw := p.metricsClient.StartTimer(metrics.MessagingClientPublishScope, metrics.CadenceClientLatency)
sw := p.scope.StartTimer(metrics.CadenceClientLatency)
err := p.producer.Publish(ctx, msg)
sw.Stop()

if err != nil {
p.metricsClient.IncCounter(metrics.MessagingClientPublishScope, metrics.CadenceClientFailures)
p.scope.IncCounter(metrics.CadenceClientFailures)
}
return err
}

func (p *metricsProducer) Close() error {
func (p *MetricsProducer) Close() error {
if closeableProducer, ok := p.producer.(CloseableProducer); ok {
return closeableProducer.Close()
}
Expand Down
112 changes: 112 additions & 0 deletions common/messaging/metrics_producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package messaging

import (
"context"
"errors"
"testing"

"github.com/golang/mock/gomock"

"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/metrics/mocks"
)

func TestPublish(t *testing.T) {
tests := []struct {
desc string
tags []metrics.Tag
producerFails bool
metricsClientMockFn func() *mocks.Client
}{
{
desc: "success",
producerFails: false,
tags: []metrics.Tag{
metrics.TopicTag("test-topic-1"),
},
metricsClientMockFn: func() *mocks.Client {
metricsClient := &mocks.Client{}
metricsScope := &mocks.Scope{}
metricsClient.
On("Scope", metrics.MessagingClientPublishScope, metrics.TopicTag("test-topic-1")).
Return(metricsScope).
Once()
metricsScope.On("IncCounter", metrics.CadenceClientRequests).Once()

sw := metrics.NoopScope(metrics.MessagingClientPublishScope).StartTimer(-1)
metricsScope.On("StartTimer", metrics.CadenceClientLatency).Return(sw).Once()
return metricsClient
},
},
{
desc: "failure",
producerFails: true,
tags: []metrics.Tag{
metrics.TopicTag("test-topic-2"),
},
metricsClientMockFn: func() *mocks.Client {
metricsClient := &mocks.Client{}
metricsScope := &mocks.Scope{}
metricsClient.
On("Scope", metrics.MessagingClientPublishScope, metrics.TopicTag("test-topic-2")).
Return(metricsScope).
Once()
metricsScope.On("IncCounter", metrics.CadenceClientRequests).Once()
metricsScope.On("IncCounter", metrics.CadenceClientFailures).Once()

sw := metrics.NoopScope(metrics.MessagingClientPublishScope).StartTimer(-1)
metricsScope.On("StartTimer", metrics.CadenceClientLatency).Return(sw).Once()
return metricsClient
},
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
// setup
ctrl := gomock.NewController(t)
mockProducer := NewMockProducer(ctrl)
msg := "custom-message"
if tc.producerFails {
mockProducer.EXPECT().Publish(gomock.Any(), msg).Return(errors.New("publish failed")).Times(1)
} else {
mockProducer.EXPECT().Publish(gomock.Any(), msg).Return(nil).Times(1)
}
metricsClient := tc.metricsClientMockFn()

// create producer and call publish
p := NewMetricProducer(mockProducer, metricsClient, WithMetricTags(tc.tags...))
err := p.Publish(context.Background(), msg)

// validations
if tc.producerFails != (err != nil) {
t.Errorf("expected producer to fail: %v, got: %v", tc.producerFails, err)
}
if err != nil {
return
}

metricsClient.AssertExpectations(t)
})
}
}
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
workflowTerminationReason = "workflow_termination_reason"
workflowCloseStatus = "workflow_close_status"
isolationEnabled = "isolation_enabled"
topic = "topic"

// limiter-side tags
globalRatelimitKey = "global_ratelimit_key"
Expand Down Expand Up @@ -316,3 +317,7 @@ func IsolationEnabledTag(enabled bool) Tag {
}
return simpleMetric{key: isolationEnabled, value: v}
}

func TopicTag(value string) Tag {
return metricWithUnknown(topic, value)
}

0 comments on commit ac349fc

Please sign in to comment.