From 947c4f74766161deb3cebdbdf793af77f8c88df9 Mon Sep 17 00:00:00 2001
From: David Porter
Date: Wed, 21 Aug 2024 06:59:24 -0700
Subject: [PATCH] adding some description instrumentation (#6242)

This adds a metric about workflow describe status, because I need to get
this information for a future project. This is a functionally identical
metric to the existing frontend instrumentation, but adds the workflow
status as a dimension.
---
 common/metrics/defs.go               |   9 +++
 common/metrics/tags.go               |   7 ++
 service/frontend/api/handler.go      |  24 ++++++
 service/frontend/api/handler_test.go | 120 ++++++++++++++++++++++++++++
 4 files changed, 160 insertions(+)

diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index b61431b8881..8d9489d41f9 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -990,6 +990,9 @@ const (
 	FrontendQueryWorkflowScope
 	// FrontendDescribeWorkflowExecutionScope is the metric scope for frontend.DescribeWorkflowExecution
 	FrontendDescribeWorkflowExecutionScope
+	// FrontendDescribeWorkflowExecutionStatusScope is a custom metric scope for richer details
+	// about workflow description calls, including the workflow open/closed status
+	FrontendDescribeWorkflowExecutionStatusScope
 	// FrontendDescribeTaskListScope is the metric scope for frontend.DescribeTaskList
 	FrontendDescribeTaskListScope
 	// FrontendResetStickyTaskListScope is the metric scope for frontend.ResetStickyTaskList
@@ -1801,6 +1804,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		FrontendDeprecateDomainScope:                 {operation: "DeprecateDomain"},
 		FrontendQueryWorkflowScope:                   {operation: "QueryWorkflow"},
 		FrontendDescribeWorkflowExecutionScope:       {operation: "DescribeWorkflowExecution"},
+		FrontendDescribeWorkflowExecutionStatusScope: {operation: "DescribeWorkflowExecutionStatus"},
 		FrontendListTaskListPartitionsScope:          {operation: "FrontendListTaskListPartitions"},
 		FrontendGetTaskListsByDomainScope:            {operation: "FrontendGetTaskListsByDomain"},
 		FrontendRefreshWorkflowTasksScope:            {operation: "FrontendRefreshWorkflowTasks"},
@@ -2129,6 +2133,9 @@ const (
 	KafkaConsumerMessageNackDlqErr
 	KafkaConsumerSessionStart
 
+	DescribeWorkflowStatusCount
+	DescribeWorkflowStatusError
+
 	GracefulFailoverLatency
 	GracefulFailoverFailure
 
@@ -2880,6 +2887,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		IsolationGroupStateHealthy:  {metricName: "isolation_group_healthy", metricType: Counter},
 		ValidatedWorkflowCount:      {metricName: "task_validator_count", metricType: Counter},
 		HashringViewIdentifier:      {metricName: "hashring_view_identifier", metricType: Counter},
+		DescribeWorkflowStatusError: {metricName: "describe_wf_error", metricType: Counter},
+		DescribeWorkflowStatusCount: {metricName: "describe_wf_status", metricType: Counter},
 
 		AsyncRequestPayloadSize: {metricName: "async_request_payload_size_per_domain", metricRollupName: "async_request_payload_size", metricType: Timer},
 
diff --git a/common/metrics/tags.go b/common/metrics/tags.go
index 240ae3a180b..6da90c20350 100644
--- a/common/metrics/tags.go
+++ b/common/metrics/tags.go
@@ -61,6 +61,7 @@ const (
 	pollerIsolationGroup      = "poller_isolation_group"
 	asyncWFRequestType        = "async_wf_request_type"
 	workflowTerminationReason = "workflow_termination_reason"
+	workflowCloseStatus       = "workflow_close_status"
 
 	// limiter-side tags
 	globalRatelimitKey = "global_ratelimit_key"
@@ -285,6 +286,12 @@ func WorkflowTerminationReasonTag(value string) Tag {
 	return simpleMetric{key: workflowTerminationReason, value: value}
 }
 
+// WorkflowCloseStatusTag returns a tag for a stringified workflow close status; safeAlphaNumericStringRE matches become "_"
+func WorkflowCloseStatusTag(value string) Tag {
+	value = safeAlphaNumericStringRE.ReplaceAllString(value, "_")
+	return simpleMetric{key: workflowCloseStatus, value: value}
+}
+
 // PartitionConfigTags returns a list of partition config tags
 func PartitionConfigTags(partitionConfig map[string]string) []Tag {
 	tags := make([]Tag, 0, len(partitionConfig))
diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go
index ad4a1106a54..277074d27da 100644
--- a/service/frontend/api/handler.go
+++ b/service/frontend/api/handler.go
@@ -3415,6 +3415,8 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(
 		Request: request,
 	})
 
+	wh.emitDescribeWorkflowExecutionMetrics(domainName, response, err)
+
 	if err != nil {
 		return nil, err
 	}
@@ -4071,6 +4073,28 @@ func (hs HealthStatus) String() string {
 	}
 }
 
+// emitDescribeWorkflowExecutionMetrics records the outcome of a describe-workflow call on
+// the FrontendDescribeWorkflowExecutionStatusScope, tagged with the domain.
+// A non-nil err or a nil response increments DescribeWorkflowStatusError; otherwise
+// DescribeWorkflowStatusCount is incremented with the workflow close status added as a
+// tag ("unknown" when the response carries no execution info or no close status).
+func (wh *WorkflowHandler) emitDescribeWorkflowExecutionMetrics(domain string, response *types.DescribeWorkflowExecutionResponse, err error) {
+	scope := wh.GetMetricsClient().Scope(metrics.FrontendDescribeWorkflowExecutionStatusScope, metrics.DomainTag(domain))
+
+	if err != nil || response == nil {
+		scope.IncCounter(metrics.DescribeWorkflowStatusError)
+		return
+	}
+
+	status := "unknown"
+	if response.WorkflowExecutionInfo != nil && response.WorkflowExecutionInfo.CloseStatus != nil {
+		status = response.WorkflowExecutionInfo.CloseStatus.String()
+	}
+
+	scope = scope.Tagged(metrics.WorkflowCloseStatusTag(status))
+	scope.IncCounter(metrics.DescribeWorkflowStatusCount)
+}
+
 func getDomainWfIDRunIDTags(
 	domainName string,
 	wf *types.WorkflowExecution,
diff --git a/service/frontend/api/handler_test.go b/service/frontend/api/handler_test.go
index ba77d385b36..6c54827d793 100644
--- a/service/frontend/api/handler_test.go
+++ b/service/frontend/api/handler_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+	"github.com/uber-go/tally"
 
 	"github.com/uber/cadence/client/history"
 	"github.com/uber/cadence/client/matching"
@@ -4363,3 +4364,122 @@ func (s *workflowHandlerSuite) TestTerminateWorkflowExecution() {
 	})
 
 }
+
+// TestWorkflowDescribeEmitStatusMetrics verifies that emitDescribeWorkflowExecutionMetrics
+// emits the expected counter, with the expected tags, for each kind of describe result.
+func TestWorkflowDescribeEmitStatusMetrics(t *testing.T) {
+
+	tests := map[string]struct {
+		res              *types.DescribeWorkflowExecutionResponse
+		err              error
+		expectedCounters map[string]tally.CounterSnapshot
+	}{
+		"valid closed workflow": {
+			res: &types.DescribeWorkflowExecutionResponse{
+				WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
+					CloseStatus: common.Ptr(types.WorkflowExecutionCloseStatusCompleted),
+				},
+			},
+			expectedCounters: map[string]tally.CounterSnapshot{
+				"describe_wf_status+domain=some-domain,operation=DescribeWorkflowExecutionStatus,workflow_close_status=COMPLETED": &counterSnapshotMock{
+					name: "describe_wf_status",
+					tags: map[string]string{
+						"domain":                "some-domain",
+						"workflow_close_status": "COMPLETED",
+						"operation":             "DescribeWorkflowExecutionStatus",
+					},
+					value: 1,
+				},
+			},
+		},
+		"A workflow not found": {
+			res: nil,
+			err: &types.EntityNotExistsError{},
+			expectedCounters: map[string]tally.CounterSnapshot{
+				"describe_wf_error+domain=some-domain,operation=DescribeWorkflowExecutionStatus": &counterSnapshotMock{
+					name: "describe_wf_error",
+					tags: map[string]string{
+						"domain":    "some-domain",
+						"operation": "DescribeWorkflowExecutionStatus",
+					},
+					value: 1,
+				},
+			},
+		},
+		"A invalid input 1": {
+			res: nil,
+			err: nil,
+			expectedCounters: map[string]tally.CounterSnapshot{
+				"describe_wf_error+domain=some-domain,operation=DescribeWorkflowExecutionStatus": &counterSnapshotMock{
+					name: "describe_wf_error",
+					tags: map[string]string{
+						"domain":    "some-domain",
+						"operation": "DescribeWorkflowExecutionStatus",
+					},
+					value: 1,
+				},
+			},
+		},
+		"invalid input 2": {
+			res: &types.DescribeWorkflowExecutionResponse{
+				WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
+					// intentionally nil
+					// CloseStatus: common.Ptr(types.WorkflowExecutionCloseStatusCompleted),
+				},
+			},
+			expectedCounters: map[string]tally.CounterSnapshot{
+				"describe_wf_status+domain=some-domain,operation=DescribeWorkflowExecutionStatus,workflow_close_status=unknown": &counterSnapshotMock{
+					name: "describe_wf_status",
+					tags: map[string]string{
+						"domain":                "some-domain",
+						"workflow_close_status": "unknown",
+						"operation":             "DescribeWorkflowExecutionStatus",
+					},
+					value: 1,
+				},
+			},
+		},
+	}
+
+	for name, td := range tests {
+		t.Run(name, func(t *testing.T) {
+
+			scope := tally.NewTestScope("", nil)
+			mockR := resource.Test{
+				MetricsScope:  scope,
+				MetricsClient: metrics.NewClient(scope, 1),
+			}
+
+			wh := WorkflowHandler{
+				Resource: &mockR,
+			}
+
+			wh.emitDescribeWorkflowExecutionMetrics("some-domain", td.res, td.err)
+			snap := scope.Snapshot()
+
+			for k, v := range td.expectedCounters {
+				got, ok := snap.Counters()[k]
+				if !ok {
+					t.Errorf("the metric string expected was not found. Expected a map with this key: %q\ngot %v", k, snap.Counters())
+					return
+				}
+
+				assert.Equal(t, v.Name(), got.Name())
+				assert.Equal(t, v.Value(), got.Value())
+				assert.Equal(t, v.Tags(), got.Tags())
+			}
+		})
+	}
+}
+
+// counterSnapshotMock is a minimal stand-in for tally.CounterSnapshot, used to
+// describe the counters a test case expects to find in the scope snapshot.
+type counterSnapshotMock struct {
+	name  string
+	tags  map[string]string
+	value int64
+}
+
+func (cs *counterSnapshotMock) Name() string            { return cs.name }
+func (cs *counterSnapshotMock) Tags() map[string]string { return cs.tags }
+func (cs *counterSnapshotMock) Value() int64            { return cs.value }