diff --git a/common/asyncworkflow/queue/kafka/queue.go b/common/asyncworkflow/queue/kafka/queue.go index cd8ccf3f40c..2ee9567b3a7 100644 --- a/common/asyncworkflow/queue/kafka/queue.go +++ b/common/asyncworkflow/queue/kafka/queue.go @@ -32,6 +32,7 @@ import ( "github.com/uber/cadence/common/asyncworkflow/queue/consumer" "github.com/uber/cadence/common/asyncworkflow/queue/provider" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/messaging/kafka" "github.com/uber/cadence/common/metrics" @@ -85,6 +86,7 @@ func (q *queueImpl) CreateConsumer(p *provider.Params) (provider.Consumer, error if err != nil { return nil, fmt.Errorf("failed to create kafka consumer: %w", err) } + p.Logger.Info("Creating async wf consumer", tag.KafkaTopicName(q.config.Topic)) return consumer.New(q.ID(), kafkaConsumer, p.Logger, p.MetricsClient, p.FrontendClient), nil } @@ -94,6 +96,7 @@ func (q *queueImpl) CreateProducer(p *provider.Params) (messaging.Producer, erro return nil, err } config.Producer.Return.Successes = true + p.Logger.Info("Creating async wf producer", tag.KafkaTopicName(q.config.Topic)) return newProducer(q.config.Topic, q.config.Connection.Brokers, config, p.MetricsClient, p.Logger) } @@ -102,5 +105,7 @@ func newProducer(topic string, brokers []string, saramaConfig *sarama.Config, me if err != nil { return nil, err } - return messaging.NewMetricProducer(kafka.NewKafkaProducer(topic, p, logger), metricsClient), nil + + withMetricsOpt := messaging.WithMetricTags(metrics.TopicTag(topic)) + return messaging.NewMetricProducer(kafka.NewKafkaProducer(topic, p, logger), metricsClient, withMetricsOpt), nil } diff --git a/common/messaging/kafka/client_impl.go b/common/messaging/kafka/client_impl.go index 8be63febd30..dd10a182a9f 100644 --- a/common/messaging/kafka/client_impl.go +++ b/common/messaging/kafka/client_impl.go @@ -141,7 +141,8 @@ func (c *clientImpl) newProducerByTopic(topic string) (messaging.Producer, error if c.metricsClient != nil { c.logger.Info("Create producer with metricsClient") - return messaging.NewMetricProducer(NewKafkaProducer(topic, producer, c.logger), c.metricsClient), nil + withMetricsOpt := messaging.WithMetricTags(metrics.TopicTag(topic)) + return messaging.NewMetricProducer(NewKafkaProducer(topic, producer, c.logger), c.metricsClient, withMetricsOpt), nil } return NewKafkaProducer(topic, producer, c.logger), nil } diff --git a/common/messaging/metricsProducer.go b/common/messaging/metrics_producer.go similarity index 68% rename from common/messaging/metricsProducer.go rename to common/messaging/metrics_producer.go index 41351952da4..5797c68cfd5 100644 --- a/common/messaging/metricsProducer.go +++ b/common/messaging/metrics_producer.go @@ -26,38 +26,52 @@ import ( "github.com/uber/cadence/common/metrics" ) -type ( - metricsProducer struct { - producer Producer - metricsClient metrics.Client +type MetricsProducer struct { + producer Producer + scope metrics.Scope + tags []metrics.Tag +} + +type MetricProducerOptions func(*MetricsProducer) + +func WithMetricTags(tags ...metrics.Tag) MetricProducerOptions { + return func(p *MetricsProducer) { + p.tags = tags } -) +} // NewMetricProducer creates a new instance of producer that emits metrics func NewMetricProducer( producer Producer, metricsClient metrics.Client, + opts ...MetricProducerOptions, ) Producer { - return &metricsProducer{ - producer: producer, - metricsClient: metricsClient, + p := &MetricsProducer{ + producer: producer, + } + + for _, opt := range opts { + opt(p) } + + p.scope = metricsClient.Scope(metrics.MessagingClientPublishScope, p.tags...) + return p } -func (p *metricsProducer) Publish(ctx context.Context, msg interface{}) error { - p.metricsClient.IncCounter(metrics.MessagingClientPublishScope, metrics.CadenceClientRequests) +func (p *MetricsProducer) Publish(ctx context.Context, msg interface{}) error { + p.scope.IncCounter(metrics.CadenceClientRequests) - sw := p.metricsClient.StartTimer(metrics.MessagingClientPublishScope, metrics.CadenceClientLatency) + sw := p.scope.StartTimer(metrics.CadenceClientLatency) err := p.producer.Publish(ctx, msg) sw.Stop() if err != nil { - p.metricsClient.IncCounter(metrics.MessagingClientPublishScope, metrics.CadenceClientFailures) + p.scope.IncCounter(metrics.CadenceClientFailures) } return err } -func (p *metricsProducer) Close() error { +func (p *MetricsProducer) Close() error { if closeableProducer, ok := p.producer.(CloseableProducer); ok { return closeableProducer.Close() } diff --git a/common/messaging/metrics_producer_test.go b/common/messaging/metrics_producer_test.go new file mode 100644 index 00000000000..e1c4185fb2c --- /dev/null +++ b/common/messaging/metrics_producer_test.go @@ -0,0 +1,112 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package messaging + +import ( + "context" + "errors" + "testing" + + "github.com/golang/mock/gomock" + + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/mocks" +) + +func TestPublish(t *testing.T) { + tests := []struct { + desc string + tags []metrics.Tag + producerFails bool + metricsClientMockFn func() *mocks.Client + }{ + { + desc: "success", + producerFails: false, + tags: []metrics.Tag{ + metrics.TopicTag("test-topic-1"), + }, + metricsClientMockFn: func() *mocks.Client { + metricsClient := &mocks.Client{} + metricsScope := &mocks.Scope{} + metricsClient. + On("Scope", metrics.MessagingClientPublishScope, metrics.TopicTag("test-topic-1")). + Return(metricsScope). + Once() + metricsScope.On("IncCounter", metrics.CadenceClientRequests).Once() + + sw := metrics.NoopScope(metrics.MessagingClientPublishScope).StartTimer(-1) + metricsScope.On("StartTimer", metrics.CadenceClientLatency).Return(sw).Once() + return metricsClient + }, + }, + { + desc: "failure", + producerFails: true, + tags: []metrics.Tag{ + metrics.TopicTag("test-topic-2"), + }, + metricsClientMockFn: func() *mocks.Client { + metricsClient := &mocks.Client{} + metricsScope := &mocks.Scope{} + metricsClient. + On("Scope", metrics.MessagingClientPublishScope, metrics.TopicTag("test-topic-2")). + Return(metricsScope). + Once() + metricsScope.On("IncCounter", metrics.CadenceClientRequests).Once() + metricsScope.On("IncCounter", metrics.CadenceClientFailures).Once() + + sw := metrics.NoopScope(metrics.MessagingClientPublishScope).StartTimer(-1) + metricsScope.On("StartTimer", metrics.CadenceClientLatency).Return(sw).Once() + return metricsClient + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + // setup + ctrl := gomock.NewController(t) + mockProducer := NewMockProducer(ctrl) + msg := "custom-message" + if tc.producerFails { + mockProducer.EXPECT().Publish(gomock.Any(), msg).Return(errors.New("publish failed")).Times(1) + } else { + mockProducer.EXPECT().Publish(gomock.Any(), msg).Return(nil).Times(1) + } + metricsClient := tc.metricsClientMockFn() + + // create producer and call publish + p := NewMetricProducer(mockProducer, metricsClient, WithMetricTags(tc.tags...)) + err := p.Publish(context.Background(), msg) + + // validations + if tc.producerFails != (err != nil) { + t.Errorf("expected producer to fail: %v, got: %v", tc.producerFails, err) + } + if err != nil { + return + } + + metricsClient.AssertExpectations(t) + }) + } +} diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 91d30630b6e..63d7f51bb67 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -63,6 +63,7 @@ const ( workflowTerminationReason = "workflow_termination_reason" workflowCloseStatus = "workflow_close_status" isolationEnabled = "isolation_enabled" + topic = "topic" // limiter-side tags globalRatelimitKey = "global_ratelimit_key" @@ -316,3 +317,7 @@ func IsolationEnabledTag(enabled bool) Tag { } return simpleMetric{key: isolationEnabled, value: v} } + +func TopicTag(value string) Tag { + return metricWithUnknown(topic, value) +}