Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding some description instrumentation #6242

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,9 @@ const (
FrontendQueryWorkflowScope
// FrontendDescribeWorkflowExecutionScope is the metric scope for frontend.DescribeWorkflowExecution
FrontendDescribeWorkflowExecutionScope
// FrontendDescribeWorkflowExecutionStatusScope is a custom metric scope that captures
// richer details about workflow description calls, including workflow open/closed status
FrontendDescribeWorkflowExecutionStatusScope
// FrontendDescribeTaskListScope is the metric scope for frontend.DescribeTaskList
FrontendDescribeTaskListScope
// FrontendResetStickyTaskListScope is the metric scope for frontend.ResetStickyTaskList
Expand Down Expand Up @@ -1801,6 +1804,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendDeprecateDomainScope: {operation: "DeprecateDomain"},
FrontendQueryWorkflowScope: {operation: "QueryWorkflow"},
FrontendDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
FrontendDescribeWorkflowExecutionStatusScope: {operation: "DescribeWorkflowExecutionStatus"},
FrontendListTaskListPartitionsScope: {operation: "FrontendListTaskListPartitions"},
FrontendGetTaskListsByDomainScope: {operation: "FrontendGetTaskListsByDomain"},
FrontendRefreshWorkflowTasksScope: {operation: "FrontendRefreshWorkflowTasks"},
Expand Down Expand Up @@ -2129,6 +2133,9 @@ const (
KafkaConsumerMessageNackDlqErr
KafkaConsumerSessionStart

DescribeWorkflowStatusCount
DescribeWorkflowStatusError

GracefulFailoverLatency
GracefulFailoverFailure

Expand Down Expand Up @@ -2878,6 +2885,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
HashringViewIdentifier: {metricName: "hashring_view_identifier", metricType: Counter},
DescribeWorkflowStatusError: {metricName: "describe_wf_error", metricType: Counter},
DescribeWorkflowStatusCount: {metricName: "describe_wf_status", metricType: Counter},

AsyncRequestPayloadSize: {metricName: "async_request_payload_size_per_domain", metricRollupName: "async_request_payload_size", metricType: Timer},

Expand Down
7 changes: 7 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
workflowTerminationReason = "workflow_termination_reason"
workflowCloseStatus = "workflow_close_status"

// limiter-side tags
globalRatelimitKey = "global_ratelimit_key"
Expand Down Expand Up @@ -285,6 +286,12 @@ func WorkflowTerminationReasonTag(value string) Tag {
return simpleMetric{key: workflowTerminationReason, value: value}
}

// WorkflowCloseStatusTag returns a Tag keyed by workflowCloseStatus for the
// given stringified workflow close status. Every match of
// safeAlphaNumericStringRE in value is replaced with "_" before the tag is built.
func WorkflowCloseStatusTag(value string) Tag {
	sanitized := safeAlphaNumericStringRE.ReplaceAllString(value, "_")
	return simpleMetric{key: workflowCloseStatus, value: sanitized}
}

// PartitionConfigTags returns a list of partition config tags
func PartitionConfigTags(partitionConfig map[string]string) []Tag {
tags := make([]Tag, 0, len(partitionConfig))
Expand Down
19 changes: 19 additions & 0 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3415,6 +3415,8 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(
Request: request,
})

wh.emitDescribeWorkflowExecutionMetrics(domainName, response, err)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4071,6 +4073,23 @@ func (hs HealthStatus) String() string {
}
}

// emitDescribeWorkflowExecutionMetrics records the outcome of a
// DescribeWorkflowExecution call under the
// FrontendDescribeWorkflowExecutionStatusScope scope, tagged with the domain.
//
// When err is nil and response is non-nil, DescribeWorkflowStatusCount is
// incremented with an additional close-status tag: the stringified CloseStatus
// when both WorkflowExecutionInfo and its CloseStatus are set, otherwise
// "unknown". In every other case DescribeWorkflowStatusError is incremented
// without the close-status tag.
func (wh *WorkflowHandler) emitDescribeWorkflowExecutionMetrics(domain string, response *types.DescribeWorkflowExecutionResponse, err error) {
	statusScope := wh.GetMetricsClient().Scope(metrics.FrontendDescribeWorkflowExecutionStatusScope, metrics.DomainTag(domain))

	if err == nil && response != nil {
		// Fall back to "unknown" whenever the close status is not populated.
		closeStatus := "unknown"
		if info := response.WorkflowExecutionInfo; info != nil && info.CloseStatus != nil {
			closeStatus = info.CloseStatus.String()
		}

		statusScope.Tagged(metrics.WorkflowCloseStatusTag(closeStatus)).IncCounter(metrics.DescribeWorkflowStatusCount)
		return
	}

	// Either the call failed or it produced no response to inspect.
	statusScope.IncCounter(metrics.DescribeWorkflowStatusError)
}

func getDomainWfIDRunIDTags(
domainName string,
wf *types.WorkflowExecution,
Expand Down
116 changes: 116 additions & 0 deletions service/frontend/api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
Expand Down Expand Up @@ -4363,3 +4364,118 @@ func (s *workflowHandlerSuite) TestTerminateWorkflowExecution() {
})

}

// TestWorkflowDescribeEmitStatusMetrics drives emitDescribeWorkflowExecutionMetrics
// with a table of response/error combinations and checks, via a tally test
// scope, that the expected counter (name, tags and value) was emitted for each.
func TestWorkflowDescribeEmitStatusMetrics(t *testing.T) {

	tests := map[string]struct {
		res              *types.DescribeWorkflowExecutionResponse
		err              error
		expectedCounters map[string]tally.CounterSnapshot
	}{
		"valid closed workflow": {
			res: &types.DescribeWorkflowExecutionResponse{
				WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
					CloseStatus: common.Ptr(types.WorkflowExecutionCloseStatusCompleted),
				},
			},
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_status+domain=some-domain,operation=DescribeWorkflowExecutionStatus,workflow_close_status=COMPLETED": &counterSnapshotMock{
					name: "describe_wf_status",
					tags: map[string]string{
						"domain":                "some-domain",
						"workflow_close_status": "COMPLETED",
						"operation":             "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
		"A workflow not found": {
			res: nil,
			err: &types.EntityNotExistsError{},
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_error+domain=some-domain,operation=DescribeWorkflowExecutionStatus": &counterSnapshotMock{
					name: "describe_wf_error",
					tags: map[string]string{
						"domain":    "some-domain",
						"operation": "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
		"A invalid input 1": {
			res: nil,
			err: nil,
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_error+domain=some-domain,operation=DescribeWorkflowExecutionStatus": &counterSnapshotMock{
					name: "describe_wf_error",
					tags: map[string]string{
						"domain":    "some-domain",
						"operation": "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
		"invalid input 2": {
			res: &types.DescribeWorkflowExecutionResponse{
				WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
					// intentionally nil
					// CloseStatus: common.Ptr(types.WorkflowExecutionCloseStatusCompleted),
				},
			},
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_status+domain=some-domain,operation=DescribeWorkflowExecutionStatus,workflow_close_status=unknown": &counterSnapshotMock{
					name: "describe_wf_status",
					tags: map[string]string{
						"domain":                "some-domain",
						"workflow_close_status": "unknown",
						"operation":             "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
	}

	for name, td := range tests {
		t.Run(name, func(t *testing.T) {

			// A fresh test scope per case keeps counters from leaking between cases.
			scope := tally.NewTestScope("", nil)
			mockR := resource.Test{
				MetricsScope:  scope,
				MetricsClient: metrics.NewClient(scope, 1),
			}

			wh := WorkflowHandler{
				Resource: &mockR,
			}

			wh.emitDescribeWorkflowExecutionMetrics("some-domain", td.res, td.err)
			counters := scope.Snapshot().Counters()

			for key, want := range td.expectedCounters {
				got, ok := counters[key]
				if !ok {
					t.Errorf("the metric string expected was not found. Expected a map with this key: %q\ngot %v", key, counters)
					return
				}

				// testify takes (expected, actual); keeping that order makes
				// failure output label the two values correctly.
				assert.Equal(t, want.Name(), got.Name())
				assert.Equal(t, want.Value(), got.Value())
				assert.Equal(t, want.Tags(), got.Tags())
			}
		})
	}
}

// counterSnapshotMock is a fixed-value counter snapshot; the tests use it to
// describe the name, tags and value they expect a counter to have.
type counterSnapshotMock struct {
	name  string
	tags  map[string]string
	value int64
}

// Name returns the counter name stored in the mock.
func (m *counterSnapshotMock) Name() string {
	return m.name
}

// Tags returns the tag map stored in the mock (not a copy).
func (m *counterSnapshotMock) Tags() map[string]string {
	return m.tags
}

// Value returns the counter value stored in the mock.
func (m *counterSnapshotMock) Value() int64 {
	return m.value
}
Loading