Differentiate metrics for domainTagged metrics and task operation metrics
anish531213 committed Aug 20, 2020
1 parent 4836f98 commit 3995fdd
Showing 2 changed files with 106 additions and 52 deletions.
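What changed, in short: previously a task with domain tagging enabled emitted its operational metrics only on a domain-tagged scope, so enabling per-domain metrics replaced the untagged aggregate series. With this commit every task emits new *AllDomains variants on the untagged scope, and additionally emits the original metric names on a domain-tagged scope when emitDomainTag is set. A minimal sketch of the resulting pattern, shown with one counter (the helper function is hypothetical and written only to illustrate; metrics.Scope and the metric identifiers come from common/metrics):

```go
package task

import "github.com/uber/cadence/common/metrics"

// emitTaskRequest is a hypothetical helper illustrating the dual-emission
// pattern this commit applies to each task metric in the diffs below.
func emitTaskRequest(base, domainTagged metrics.Scope, emitDomainTag bool) {
	// Always emitted: low-cardinality aggregate across all domains.
	base.IncCounter(metrics.TaskRequestsAllDomains)
	// Emitted in addition, when per-domain visibility is enabled for this task.
	if emitDomainTag {
		domainTagged.IncCounter(metrics.TaskRequests)
	}
}
```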
46 changes: 36 additions & 10 deletions common/metrics/defs.go
@@ -1723,6 +1723,19 @@ const (
 	TaskBatchCompleteCounter
 	TaskProcessingLatency
 	TaskQueueLatency
+
+	TaskRequestsAllDomains
+	TaskLatencyAllDomains
+	TaskFailuresAllDomains
+	TaskDiscardedAllDomains
+	TaskAttemptTimerAllDomains
+	TaskStandbyRetryCounterAllDomains
+	TaskNotActiveCounterAllDomains
+	TaskLimitExceededCounterAllDomains
+	TaskProcessingLatencyAllDomains
+	TaskQueueLatencyAllDomains
+	TransferTaskMissingEventCounterAllDomains
+
 	TaskRedispatchQueuePendingTasksTimer
 
 	TransferTaskThrottledCounter
@@ -2162,16 +2175,29 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		CadenceShardFailureGauge: {metricName: "cadence_shard_failure", metricType: Gauge},
 	},
 	History: {
-		TaskRequests:             {metricName: "task_requests", metricType: Counter},
-		TaskLatency:              {metricName: "task_latency", metricType: Timer},
-		TaskAttemptTimer:         {metricName: "task_attempt", metricType: Timer},
-		TaskFailures:             {metricName: "task_errors", metricType: Counter},
-		TaskDiscarded:            {metricName: "task_errors_discarded", metricType: Counter},
-		TaskStandbyRetryCounter:  {metricName: "task_errors_standby_retry_counter", metricType: Counter},
-		TaskNotActiveCounter:     {metricName: "task_errors_not_active_counter", metricType: Counter},
-		TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
-		TaskProcessingLatency:    {metricName: "task_latency_processing", metricType: Timer},
-		TaskQueueLatency:         {metricName: "task_latency_queue", metricType: Timer},
+		TaskRequests:                              {metricName: "task_requests", metricType: Counter},
+		TaskLatency:                               {metricName: "task_latency", metricType: Timer},
+		TaskAttemptTimer:                          {metricName: "task_attempt", metricType: Timer},
+		TaskFailures:                              {metricName: "task_errors", metricType: Counter},
+		TaskDiscarded:                             {metricName: "task_errors_discarded", metricType: Counter},
+		TaskStandbyRetryCounter:                   {metricName: "task_errors_standby_retry_counter", metricType: Counter},
+		TaskNotActiveCounter:                      {metricName: "task_errors_not_active_counter", metricType: Counter},
+		TaskLimitExceededCounter:                  {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
+		TaskProcessingLatency:                     {metricName: "task_latency_processing", metricType: Timer},
+		TaskQueueLatency:                          {metricName: "task_latency_queue", metricType: Timer},
+
+		TaskRequestsAllDomains:                    {metricName: "task_requests_all_domains", metricType: Counter},
+		TaskLatencyAllDomains:                     {metricName: "task_latency_all_domains", metricType: Timer},
+		TaskAttemptTimerAllDomains:                {metricName: "task_attempt_all_domains", metricType: Timer},
+		TaskFailuresAllDomains:                    {metricName: "task_errors_all_domains", metricType: Counter},
+		TaskDiscardedAllDomains:                   {metricName: "task_errors_discarded_all_domains", metricType: Counter},
+		TaskStandbyRetryCounterAllDomains:         {metricName: "task_errors_standby_retry_counter_all_domains", metricType: Counter},
+		TaskNotActiveCounterAllDomains:            {metricName: "task_errors_not_active_counter_all_domains", metricType: Counter},
+		TaskLimitExceededCounterAllDomains:        {metricName: "task_errors_limit_exceeded_counter_all_domains", metricType: Counter},
+		TaskProcessingLatencyAllDomains:           {metricName: "task_latency_processing_all_domains", metricType: Timer},
+		TaskQueueLatencyAllDomains:                {metricName: "task_latency_queue_all_domains", metricType: Timer},
+		TransferTaskMissingEventCounterAllDomains: {metricName: "transfer_task_missing_event_counter_all_domains", metricType: Counter},
+
 		TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
 		TaskRedispatchQueuePendingTasksTimer: {metricName: "task_redispatch_queue_pending_tasks", metricType: Timer},
 		TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
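For orientation, defs.go pairs an iota-generated identifier block (first hunk) with a name table (second hunk): each int constant resolves to the metric name that is actually emitted, so the new constants map to names suffixed _all_domains. A condensed, self-contained sketch of that pattern with abridged types (the real definitions live in common/metrics):

```go
package main

import "fmt"

type MetricType int

const (
	Counter MetricType = iota
	Timer
)

type metricDefinition struct {
	metricName string
	metricType MetricType
}

// Identifiers are iota constants, mirroring the const block in the first hunk.
const (
	TaskRequests int = iota
	TaskRequestsAllDomains
)

// The definitions table maps each identifier to its emitted name, mirroring
// the MetricDefs entries in the second hunk.
var defs = map[int]metricDefinition{
	TaskRequests:           {metricName: "task_requests", metricType: Counter},
	TaskRequestsAllDomains: {metricName: "task_requests_all_domains", metricType: Counter},
}

func main() {
	fmt.Println(defs[TaskRequestsAllDomains].metricName) // task_requests_all_domains
}
```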
112 changes: 70 additions & 42 deletions service/history/task/task.go
@@ -67,18 +67,19 @@ type (
 		sync.Mutex
 		Info
 
-		shard         shard.Context
-		state         ctask.State
-		priority      int
-		attempt       int
-		timeSource    clock.TimeSource
-		submitTime    time.Time
-		logger        log.Logger
-		scopeIdx      int
-		emitDomainTag bool
-		scope         metrics.Scope // initialized when processing task to make the initialization parallel
-		taskExecutor  Executor
-		maxRetryCount dynamicconfig.IntPropertyFn
+		shard             shard.Context
+		state             ctask.State
+		priority          int
+		attempt           int
+		timeSource        clock.TimeSource
+		submitTime        time.Time
+		logger            log.Logger
+		scopeIdx          int
+		emitDomainTag     bool
+		scope             metrics.Scope // initialized when processing task to make the initialization parallel
+		domainTaggedScope metrics.Scope
+		taskExecutor      Executor
+		maxRetryCount     dynamicconfig.IntPropertyFn
 
 		// TODO: following three fields should be removed after new task lifecycle is implemented
 		taskFilter Filter
@@ -181,21 +182,22 @@ func newQueueTaskBase(
 	emitDomainTag bool,
 ) *taskBase {
 	return &taskBase{
-		Info:          taskInfo,
-		shard:         shard,
-		state:         ctask.TaskStatePending,
-		priority:      ctask.NoPriority,
-		queueType:     queueType,
-		scopeIdx:      scopeIdx,
-		emitDomainTag: emitDomainTag,
-		scope:         nil,
-		logger:        logger,
-		attempt:       0,
-		submitTime:    timeSource.Now(),
-		timeSource:    timeSource,
-		maxRetryCount: maxRetryCount,
-		taskFilter:    taskFilter,
-		taskExecutor:  taskExecutor,
+		Info:              taskInfo,
+		shard:             shard,
+		state:             ctask.TaskStatePending,
+		priority:          ctask.NoPriority,
+		queueType:         queueType,
+		scopeIdx:          scopeIdx,
+		emitDomainTag:     emitDomainTag,
+		scope:             nil,
+		domainTaggedScope: nil,
+		logger:            logger,
+		attempt:           0,
+		submitTime:        timeSource.Now(),
+		timeSource:        timeSource,
+		maxRetryCount:     maxRetryCount,
+		taskFilter:        taskFilter,
+		taskExecutor:      taskExecutor,
 	}
 }
 
@@ -242,10 +244,9 @@ func (t *taskBase) Execute() error {
 	// processed as active or standby and use the corresponding
 	// task executor.
 	if t.scope == nil {
-		if t.emitDomainTag {
-			t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
-		} else {
-			t.scope = t.shard.GetMetricsClient().Scope(t.scopeIdx)
+		t.scope = t.shard.GetMetricsClient().Scope(t.scopeIdx)
+		if t.emitDomainTag && t.domainTaggedScope == nil {
+			t.domainTaggedScope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
 		}
 	}
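Both scopes are created lazily on the first Execute rather than in the constructor; the existing comment on the scope field notes this keeps task construction cheap and lets initialization happen in parallel. GetOrCreateDomainTaggedScope itself is not shown in this diff; below is a plausible shape, written purely as an assumption about what it does (the domain-cache lookup, tag name, and fallback behavior are guesses, not code from the repository):

```go
package task

import (
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/service/history/shard"
)

// domainTaggedScopeFor is a hypothetical sketch of a domain-tagged scope
// factory. The real GetOrCreateDomainTaggedScope is defined elsewhere in
// this package and likely also caches scopes so repeated tasks for the
// same domain reuse one scope.
func domainTaggedScopeFor(shard shard.Context, scopeIdx int, domainID string, logger log.Logger) metrics.Scope {
	scope := shard.GetMetricsClient().Scope(scopeIdx)
	domainName, err := shard.GetDomainCache().GetDomainName(domainID) // assumed lookup
	if err != nil {
		// Fall back to the untagged scope rather than failing task processing.
		logger.Error("Unable to resolve domain name for metrics tag", tag.Error(err))
		return scope
	}
	return scope.Tagged(metrics.DomainTag(domainName))
}
```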

@@ -260,8 +261,12 @@ func (t *taskBase) Execute() error {
 
 	defer func() {
 		if t.shouldProcessTask {
-			t.scope.IncCounter(metrics.TaskRequests)
-			t.scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(executionStartTime))
+			t.scope.IncCounter(metrics.TaskRequestsAllDomains)
+			t.scope.RecordTimer(metrics.TaskProcessingLatencyAllDomains, time.Since(executionStartTime))
+			if t.emitDomainTag {
+				t.domainTaggedScope.IncCounter(metrics.TaskRequests)
+				t.domainTaggedScope.RecordTimer(metrics.TaskProcessingLatency, time.Since(executionStartTime))
+			}
 		}
 	}()
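The request counter and processing-latency timer are recorded in a deferred closure, so they are emitted on every exit path of Execute, including error returns, but only when the filter decided the task should be processed. A minimal standalone illustration of that pattern (the names here are invented for the example):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// executeOnce mimics the structure above: capture the start time, then
// record count and latency in a defer so error returns are still measured.
func executeOnce(shouldProcess bool, run func() error) error {
	start := time.Now()
	defer func() {
		if shouldProcess {
			fmt.Println("task_requests_all_domains += 1")
			fmt.Println("task_latency_processing_all_domains =", time.Since(start))
		}
	}()
	return run()
}

func main() {
	_ = executeOnce(true, func() error { return errors.New("executor failed") })
}
```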

@@ -278,7 +283,10 @@ func (t *taskBase) HandleErr(
 
 	t.attempt++
 	if t.attempt > t.maxRetryCount() {
-		t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
+		t.scope.RecordTimer(metrics.TaskAttemptTimerAllDomains, time.Duration(t.attempt))
+		if t.emitDomainTag {
+			t.domainTaggedScope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
+		}
 		t.logger.Error("Critical error processing task, retrying.",
 			tag.Error(err), tag.OperationCritical, tag.TaskType(t.GetTaskType()))
 	}
@@ -297,19 +305,28 @@ func (t *taskBase) HandleErr(
 		transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
 		err == execution.ErrMissingWorkflowStartEvent &&
 		t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
-		t.scope.IncCounter(metrics.TransferTaskMissingEventCounter)
+		t.scope.IncCounter(metrics.TransferTaskMissingEventCounterAllDomains)
+		if t.emitDomainTag {
+			t.domainTaggedScope.IncCounter(metrics.TransferTaskMissingEventCounter)
+		}
 		t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
 		return nil
 	}
 
 	// this is a transient error
 	if err == ErrTaskRedispatch {
-		t.scope.IncCounter(metrics.TaskStandbyRetryCounter)
+		t.scope.IncCounter(metrics.TaskStandbyRetryCounterAllDomains)
+		if t.emitDomainTag {
+			t.domainTaggedScope.IncCounter(metrics.TaskStandbyRetryCounter)
+		}
 		return err
 	}
 
 	if err == ErrTaskDiscarded {
-		t.scope.IncCounter(metrics.TaskDiscarded)
+		t.scope.IncCounter(metrics.TaskDiscardedAllDomains)
+		if t.emitDomainTag {
+			t.domainTaggedScope.IncCounter(metrics.TaskDiscarded)
+		}
 		err = nil
 	}
 
@@ -318,14 +335,20 @@ func (t *taskBase) HandleErr(
 	// since the new task life cycle will not give up until task processed / verified
 	if _, ok := err.(*workflow.DomainNotActiveError); ok {
 		if t.timeSource.Now().Sub(t.submitTime) > 2*cache.DomainCacheRefreshInterval {
-			t.scope.IncCounter(metrics.TaskNotActiveCounter)
+			t.scope.IncCounter(metrics.TaskNotActiveCounterAllDomains)
+			if t.emitDomainTag {
+				t.domainTaggedScope.IncCounter(metrics.TaskNotActiveCounter)
+			}
 			return nil
 		}
 
 		return err
 	}
 
-	t.scope.IncCounter(metrics.TaskFailures)
+	t.scope.IncCounter(metrics.TaskFailuresAllDomains)
+	if t.emitDomainTag {
+		t.domainTaggedScope.IncCounter(metrics.TaskFailures)
+	}
 
 	if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
 		t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
@@ -359,9 +382,14 @@ func (t *taskBase) Ack() {
 
 	t.state = ctask.TaskStateAcked
 	if t.shouldProcessTask {
-		t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
-		t.scope.RecordTimer(metrics.TaskLatency, time.Since(t.submitTime))
-		t.scope.RecordTimer(metrics.TaskQueueLatency, time.Since(t.GetVisibilityTimestamp()))
+		t.scope.RecordTimer(metrics.TaskAttemptTimerAllDomains, time.Duration(t.attempt))
+		t.scope.RecordTimer(metrics.TaskLatencyAllDomains, time.Since(t.submitTime))
+		t.scope.RecordTimer(metrics.TaskQueueLatencyAllDomains, time.Since(t.GetVisibilityTimestamp()))
+		if t.emitDomainTag {
+			t.domainTaggedScope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
+			t.domainTaggedScope.RecordTimer(metrics.TaskLatency, time.Since(t.submitTime))
+			t.domainTaggedScope.RecordTimer(metrics.TaskQueueLatency, time.Since(t.GetVisibilityTimestamp()))
+		}
 	}
 }

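Ack records three distinct latency timers, and the diff keeps their meanings while renaming the untagged variants: task_latency_queue spans from the task's visibility timestamp (total time in the queue), task_latency from submission to this processor, and task_latency_processing (recorded in Execute) covers execution alone. A standalone illustration of how the three relate, under assumed example timestamps:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	visibility := now.Add(-90 * time.Second) // task became visible in the queue
	submit := now.Add(-60 * time.Second)     // task handed to this processor
	execStart := now.Add(-5 * time.Second)   // Execute() began

	fmt.Println("task_latency_queue:      ", now.Sub(visibility)) // longest: whole queue-to-ack span
	fmt.Println("task_latency:            ", now.Sub(submit))     // processor-to-ack span
	fmt.Println("task_latency_processing: ", now.Sub(execStart))  // execution time only
}
```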
