Skip to content

Commit

Permalink
Add metrics to measture the time a task waiting in history queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Jul 31, 2024
1 parent 38c295d commit d2ead6c
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 26 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,7 @@ const (
TaskBatchCompleteFailure
TaskProcessingLatency
TaskQueueLatency
ScheduleToStartHistoryQueueLatencyPerTaskList

TaskRequestsPerDomain
TaskLatencyPerDomain
Expand Down Expand Up @@ -2901,6 +2902,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},
ScheduleToStartHistoryQueueLatencyPerTaskList: {metricName: "schedule_to_start_history_queue_latency_per_tl", metricType: Timer},

// per domain task metrics

Expand Down
23 changes: 23 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var (
ErrContextTimeoutNotSet = &types.BadRequestError{Message: "Context timeout is not set."}
// ErrDecisionResultCountTooLarge error for decision result count exceeds limit
ErrDecisionResultCountTooLarge = &types.BadRequestError{Message: "Decision result count exceeds limit."}
stickyTaskListMetricTag = metrics.TaskListTag("__sticky__")
)

// AwaitWaitGroup calls Wait on the given wait
Expand Down Expand Up @@ -1038,3 +1039,25 @@ func IntersectionStringSlice(a, b []string) []string {
}
return result
}

// NewPerTaskListScope creates a tasklist metrics scope
func NewPerTaskListScope(
domainName string,
taskListName string,
taskListKind types.TaskListKind,
client metrics.Client,
scopeIdx int,
) metrics.Scope {
domainTag := metrics.DomainUnknownTag()
taskListTag := metrics.TaskListUnknownTag()
if domainName != "" {
domainTag = metrics.DomainTag(domainName)
}
if taskListName != "" && taskListKind != types.TaskListKindSticky {
taskListTag = metrics.TaskListTag(taskListName)
}
if taskListKind == types.TaskListKindSticky {
taskListTag = stickyTaskListMetricTag

Check warning on line 1060 in common/util.go

View check run for this annotation

Codecov / codecov/patch

common/util.go#L1060

Added line #L1060 was not covered by tests
}
return client.Scope(scopeIdx, domainTag, taskListTag)
}
4 changes: 4 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,3 +1633,7 @@ func TestSecondsToDuration(t *testing.T) {
})
}
}

func TestNewPerTaskListScope(t *testing.T) {
assert.NotNil(t, NewPerTaskListScope("test-domain", "test-tasklist", types.TaskListKindNormal, metrics.NewNoopMetricsClient(), 0))
}
1 change: 1 addition & 0 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ func (t *taskImpl) Ack() {
t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))

}

if t.eventLogger != nil && t.shouldProcessTask && t.attempt != 0 {
Expand Down
17 changes: 16 additions & 1 deletion service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (t *transferActiveTaskExecutor) processActivityTask(
return nil
}

domainName := mutableState.GetDomainEntry().GetInfo().Name
ai, ok := mutableState.GetActivityInfo(task.ScheduleID)
if !ok {
t.logger.Debug("Potentially duplicate ", tag.TaskID(task.TaskID), tag.WorkflowScheduleID(task.ScheduleID), tag.TaskType(persistence.TransferTaskTypeActivityTask))
Expand All @@ -209,7 +210,12 @@ func (t *transferActiveTaskExecutor) processActivityTask(
return errWorkflowRateLimited
}

return t.pushActivity(ctx, task, timeout, mutableState.GetExecutionInfo().PartitionConfig)
err = t.pushActivity(ctx, task, timeout, mutableState.GetExecutionInfo().PartitionConfig)
if err == nil {
scope := common.NewPerTaskListScope(domainName, task.TaskList, types.TaskListKindNormal, t.metricsClient, metrics.TransferActiveTaskActivityScope)
scope.RecordTimer(metrics.ScheduleToStartHistoryQueueLatencyPerTaskList, time.Since(task.GetVisibilityTimestamp()))
}
return err
}

func (t *transferActiveTaskExecutor) processDecisionTask(
Expand Down Expand Up @@ -248,6 +254,7 @@ func (t *transferActiveTaskExecutor) processDecisionTask(
return err
}

domainName := mutableState.GetDomainEntry().GetInfo().Name
executionInfo := mutableState.GetExecutionInfo()
workflowTimeout := executionInfo.WorkflowTimeout
decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout)
Expand Down Expand Up @@ -298,6 +305,14 @@ func (t *transferActiveTaskExecutor) processDecisionTask(
// will re-create a new non-sticky task and reset sticky.
err = t.pushDecision(ctx, task, taskList, decisionTimeout, mutableState.GetExecutionInfo().PartitionConfig)
}
if err == nil {
tlKind := types.TaskListKindNormal
if taskList.Kind != nil {
tlKind = *taskList.Kind
}
scope := common.NewPerTaskListScope(domainName, taskList.Name, tlKind, t.metricsClient, metrics.TransferActiveTaskDecisionScope)
scope.RecordTimer(metrics.ScheduleToStartHistoryQueueLatencyPerTaskList, time.Since(task.GetVisibilityTimestamp()))
}
return err
}

Expand Down
4 changes: 2 additions & 2 deletions service/matching/handler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"context"
"sync"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/tasklist"
)

type handlerContext struct {
Expand All @@ -47,7 +47,7 @@ func newHandlerContext(
) *handlerContext {
return &handlerContext{
Context: ctx,
scope: tasklist.NewPerTaskListScope(domainName, taskList.GetName(), taskList.GetKind(), metricsClient, metricsScope).Tagged(metrics.GetContextTags(ctx)...),
scope: common.NewPerTaskListScope(domainName, taskList.GetName(), taskList.GetKind(), metricsClient, metricsScope).Tagged(metrics.GetContextTags(ctx)...),
logger: logger.WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(taskList.GetName())),
}
}
Expand Down
24 changes: 1 addition & 23 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ var (
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
stickyTaskListMetricTag = metrics.TaskListTag("__sticky__")
)

type (
Expand Down Expand Up @@ -154,7 +153,7 @@ func NewManager(
taskListKind = &normalTaskListKind
}

scope := NewPerTaskListScope(domainName, taskList.GetName(), *taskListKind, metricsClient, metrics.MatchingTaskListMgrScope).
scope := common.NewPerTaskListScope(domainName, taskList.GetName(), *taskListKind, metricsClient, metrics.MatchingTaskListMgrScope).
Tagged(getTaskListTypeTag(taskList.GetType()))
db := newTaskListDB(taskManager, taskList.GetDomainID(), domainName, taskList.GetName(), taskList.GetType(), int(*taskListKind), logger)

Expand Down Expand Up @@ -730,27 +729,6 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
}
}

func NewPerTaskListScope(
domainName string,
taskListName string,
taskListKind types.TaskListKind,
client metrics.Client,
scopeIdx int,
) metrics.Scope {
domainTag := metrics.DomainUnknownTag()
taskListTag := metrics.TaskListUnknownTag()
if domainName != "" {
domainTag = metrics.DomainTag(domainName)
}
if taskListName != "" && taskListKind != types.TaskListKindSticky {
taskListTag = metrics.TaskListTag(taskListName)
}
if taskListKind == types.TaskListKindSticky {
taskListTag = stickyTaskListMetricTag
}
return client.Scope(scopeIdx, domainTag, taskListTag)
}

func IdentityFromContext(ctx context.Context) string {
val, ok := ctx.Value(identityCtxKey{}).(string)
if !ok {
Expand Down

0 comments on commit d2ead6c

Please sign in to comment.