diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 6cfae4b4543..308680be96a 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -1681,6 +1681,19 @@ const (
 	TaskBatchCompleteCounter
 	TaskProcessingLatency
 	TaskQueueLatency
+
+	TaskRequestsPerDomain
+	TaskLatencyPerDomain
+	TaskFailuresPerDomain
+	TaskDiscardedPerDomain
+	TaskAttemptTimerPerDomain
+	TaskStandbyRetryCounterPerDomain
+	TaskNotActiveCounterPerDomain
+	TaskLimitExceededCounterPerDomain
+	TaskProcessingLatencyPerDomain
+	TaskQueueLatencyPerDomain
+	TransferTaskMissingEventCounterPerDomain
+
 	TaskRedispatchQueuePendingTasksTimer
 
 	TransferTaskThrottledCounter
@@ -2119,16 +2132,31 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		CadenceShardFailureGauge: {metricName: "cadence_shard_failure", metricType: Gauge},
 	},
 	History: {
-		TaskRequests:                         {metricName: "task_requests", metricType: Counter},
-		TaskLatency:                          {metricName: "task_latency", metricType: Timer},
-		TaskAttemptTimer:                     {metricName: "task_attempt", metricType: Timer},
-		TaskFailures:                         {metricName: "task_errors", metricType: Counter},
-		TaskDiscarded:                        {metricName: "task_errors_discarded", metricType: Counter},
-		TaskStandbyRetryCounter:              {metricName: "task_errors_standby_retry_counter", metricType: Counter},
-		TaskNotActiveCounter:                 {metricName: "task_errors_not_active_counter", metricType: Counter},
-		TaskLimitExceededCounter:             {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
-		TaskProcessingLatency:                {metricName: "task_latency_processing", metricType: Timer},
-		TaskQueueLatency:                     {metricName: "task_latency_queue", metricType: Timer},
+		TaskRequests:             {metricName: "task_requests", metricType: Counter},
+		TaskLatency:              {metricName: "task_latency", metricType: Timer},
+		TaskAttemptTimer:         {metricName: "task_attempt", metricType: Timer},
+		TaskFailures:             {metricName: "task_errors", metricType: Counter},
+		TaskDiscarded:            {metricName: "task_errors_discarded", metricType: Counter},
+		TaskStandbyRetryCounter:  {metricName: "task_errors_standby_retry_counter", metricType: Counter},
+		TaskNotActiveCounter:     {metricName: "task_errors_not_active_counter", metricType: Counter},
+		TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
+		TaskProcessingLatency:    {metricName: "task_latency_processing", metricType: Timer},
+		TaskQueueLatency:         {metricName: "task_latency_queue", metricType: Timer},
+
+		// per domain task metrics
+
+		TaskRequestsPerDomain:                    {metricName: "task_requests_per_domain", metricRollupName: "task_requests", metricType: Counter},
+		TaskLatencyPerDomain:                     {metricName: "task_latency_per_domain", metricRollupName: "task_latency", metricType: Timer},
+		TaskAttemptTimerPerDomain:                {metricName: "task_attempt_per_domain", metricRollupName: "task_attempt", metricType: Timer},
+		TaskFailuresPerDomain:                    {metricName: "task_errors_per_domain", metricRollupName: "task_errors", metricType: Counter},
+		TaskDiscardedPerDomain:                   {metricName: "task_errors_discarded_per_domain", metricRollupName: "task_errors_discarded", metricType: Counter},
+		TaskStandbyRetryCounterPerDomain:         {metricName: "task_errors_standby_retry_counter_per_domain", metricRollupName: "task_errors_standby_retry_counter", metricType: Counter},
+		TaskNotActiveCounterPerDomain:            {metricName: "task_errors_not_active_counter_per_domain", metricRollupName: "task_errors_not_active_counter", metricType: Counter},
+		TaskLimitExceededCounterPerDomain:        {metricName: "task_errors_limit_exceeded_counter_per_domain", metricRollupName: "task_errors_limit_exceeded_counter", metricType: Counter},
+		TaskProcessingLatencyPerDomain:           {metricName: "task_latency_processing_per_domain", metricRollupName: "task_latency_processing", metricType: Timer},
+		TaskQueueLatencyPerDomain:                {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
+		TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},
+
 		TaskBatchCompleteCounter:             {metricName: "task_batch_complete_counter", metricType: Counter},
 		TaskRedispatchQueuePendingTasksTimer: {metricName: "task_redispatch_queue_pending_tasks", metricType: Timer},
 		TransferTaskThrottledCounter:         {metricName: "transfer_task_throttled_counter", metricType: Counter},
diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go
index 92a9f9ffc91..847c936a80f 100644
--- a/common/service/dynamicconfig/constants.go
+++ b/common/service/dynamicconfig/constants.go
@@ -171,7 +171,6 @@ var keys = map[Key]string{
 	ActiveTaskRedispatchInterval:              "history.activeTaskRedispatchInterval",
 	StandbyTaskRedispatchInterval:             "history.standbyTaskRedispatchInterval",
 	TaskRedispatchIntervalJitterCoefficient:   "history.taskRedispatchIntervalJitterCoefficient",
-	QueueProcessorEnableDomainTaggedMetrics:   "history.queueProcessorEnableDomainTaggedMetrics",
 	QueueProcessorEnableSplit:                 "history.queueProcessorEnableSplit",
 	QueueProcessorSplitMaxLevel:               "history.queueProcessorSplitMaxLevel",
 	QueueProcessorEnableRandomSplitByDomainID: "history.queueProcessorEnableRandomSplitByDomain",
@@ -573,8 +572,6 @@ const (
 	StandbyTaskRedispatchInterval
 	// TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient
 	TaskRedispatchIntervalJitterCoefficient
-	// QueueProcessorEnableDomainTaggedMetrics indicates whether task processing metrics should include domain tag
-	QueueProcessorEnableDomainTaggedMetrics
 	// QueueProcessorEnableSplit indicates whether processing queue split policy should be enabled
 	QueueProcessorEnableSplit
 	// QueueProcessorSplitMaxLevel is the max processing queue level
diff --git a/service/history/config/config.go b/service/history/config/config.go
index 3c2246618bf..3f7f46dc3ba 100644
--- a/service/history/config/config.go
+++ b/service/history/config/config.go
@@ -92,16 +92,17 @@ type Config struct {
 	EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
 
 	// QueueProcessor settings
-	QueueProcessorEnableDomainTaggedMetrics        dynamicconfig.BoolPropertyFn
-	QueueProcessorEnableSplit                      dynamicconfig.BoolPropertyFn
-	QueueProcessorSplitMaxLevel                    dynamicconfig.IntPropertyFn
-	QueueProcessorEnableRandomSplitByDomainID      dynamicconfig.BoolPropertyFnWithDomainIDFilter
-	QueueProcessorRandomSplitProbability           dynamicconfig.FloatPropertyFn
-	QueueProcessorEnablePendingTaskSplit           dynamicconfig.BoolPropertyFn
-	QueueProcessorPendingTaskSplitThreshold        dynamicconfig.MapPropertyFn
-	QueueProcessorEnableStuckTaskSplit             dynamicconfig.BoolPropertyFn
-	QueueProcessorStuckTaskSplitThreshold          dynamicconfig.MapPropertyFn
-	QueueProcessorSplitLookAheadDurationByDomainID dynamicconfig.DurationPropertyFnWithDomainIDFilter
+	QueueProcessorEnableSplit                          dynamicconfig.BoolPropertyFn
+	QueueProcessorSplitMaxLevel                        dynamicconfig.IntPropertyFn
+	QueueProcessorEnableRandomSplitByDomainID          dynamicconfig.BoolPropertyFnWithDomainIDFilter
+	QueueProcessorRandomSplitProbability               dynamicconfig.FloatPropertyFn
+	QueueProcessorEnablePendingTaskSplit               dynamicconfig.BoolPropertyFn
+	QueueProcessorPendingTaskSplitThreshold            dynamicconfig.MapPropertyFn
+	QueueProcessorEnableStuckTaskSplit                 dynamicconfig.BoolPropertyFn
+	QueueProcessorStuckTaskSplitThreshold              dynamicconfig.MapPropertyFn
+	QueueProcessorSplitLookAheadDurationByDomainID     dynamicconfig.DurationPropertyFnWithDomainIDFilter
+	QueueProcessorPollBackoffInterval                  dynamicconfig.DurationPropertyFn
+	QueueProcessorPollBackoffIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
 
 	// TimerQueueProcessor settings
 	TimerTaskBatchSize dynamicconfig.IntPropertyFn
@@ -333,16 +334,17 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
 		TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorSplitQueueIntervalJitterCoefficient, 0.15),
 		EnableDropStuckTaskByDomainID:           dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),
 
-		QueueProcessorEnableDomainTaggedMetrics:        dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableDomainTaggedMetrics, false),
-		QueueProcessorEnableSplit:                      dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
-		QueueProcessorSplitMaxLevel:                    dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
-		QueueProcessorEnableRandomSplitByDomainID:      dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.QueueProcessorEnableRandomSplitByDomainID, false),
-		QueueProcessorRandomSplitProbability:           dc.GetFloat64Property(dynamicconfig.QueueProcessorRandomSplitProbability, 0.01),
-		QueueProcessorEnablePendingTaskSplit:           dc.GetBoolProperty(dynamicconfig.QueueProcessorEnablePendingTaskSplit, false),
-		QueueProcessorPendingTaskSplitThreshold:        dc.GetMapProperty(dynamicconfig.QueueProcessorPendingTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultPendingTaskSplitThreshold)),
-		QueueProcessorEnableStuckTaskSplit:             dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableStuckTaskSplit, false),
-		QueueProcessorStuckTaskSplitThreshold:          dc.GetMapProperty(dynamicconfig.QueueProcessorStuckTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultStuckTaskSplitThreshold)),
-		QueueProcessorSplitLookAheadDurationByDomainID: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.QueueProcessorSplitLookAheadDurationByDomainID, 20*time.Minute),
+		QueueProcessorEnableSplit:                          dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
+		QueueProcessorSplitMaxLevel:                        dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
+		QueueProcessorEnableRandomSplitByDomainID:          dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.QueueProcessorEnableRandomSplitByDomainID, false),
+		QueueProcessorRandomSplitProbability:               dc.GetFloat64Property(dynamicconfig.QueueProcessorRandomSplitProbability, 0.01),
+		QueueProcessorEnablePendingTaskSplit:               dc.GetBoolProperty(dynamicconfig.QueueProcessorEnablePendingTaskSplit, false),
+		QueueProcessorPendingTaskSplitThreshold:            dc.GetMapProperty(dynamicconfig.QueueProcessorPendingTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultPendingTaskSplitThreshold)),
+		QueueProcessorEnableStuckTaskSplit:                 dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableStuckTaskSplit, false),
+		QueueProcessorStuckTaskSplitThreshold:              dc.GetMapProperty(dynamicconfig.QueueProcessorStuckTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultStuckTaskSplitThreshold)),
+		QueueProcessorSplitLookAheadDurationByDomainID:     dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.QueueProcessorSplitLookAheadDurationByDomainID, 20*time.Minute),
+		QueueProcessorPollBackoffInterval:                  dc.GetDurationProperty(dynamicconfig.QueueProcessorPollBackoffInterval, 5*time.Second),
+		QueueProcessorPollBackoffIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.QueueProcessorPollBackoffIntervalJitterCoefficient, 0.15),
 
 		TimerTaskBatchSize:   dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
 		TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
diff --git a/service/history/queue/timer_queue_processor_base.go b/service/history/queue/timer_queue_processor_base.go
index c79fee70408..cfa80647896 100644
--- a/service/history/queue/timer_queue_processor_base.go
+++ b/service/history/queue/timer_queue_processor_base.go
@@ -113,9 +113,6 @@ func newTimerQueueProcessorBase(
 		queueType = task.QueueTypeStandbyTimer
 	}
 
-	// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-	emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
-
 	return &timerQueueProcessorBase{
 		processorBase: processorBase,
 
@@ -130,7 +127,6 @@ func newTimerQueueProcessorBase(
 				processorBase.redispatcher.AddTask,
 				shard.GetTimeSource(),
 				shard.GetConfig().TimerTaskMaxRetryCount,
-				emitDomainTag,
 				nil,
 			)
 		},
diff --git a/service/history/queue/transfer_queue_processor_base.go b/service/history/queue/transfer_queue_processor_base.go
index 9d0d7e7823e..78b58b105d4 100644
--- a/service/history/queue/transfer_queue_processor_base.go
+++ b/service/history/queue/transfer_queue_processor_base.go
@@ -94,9 +94,6 @@ func newTransferQueueProcessorBase(
 		queueType = task.QueueTypeStandbyTransfer
 	}
 
-	// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-	emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
-
 	return &transferQueueProcessorBase{
 		processorBase: processorBase,
 
@@ -111,7 +108,6 @@ func newTransferQueueProcessorBase(
 				processorBase.redispatcher.AddTask,
 				shard.GetTimeSource(),
 				shard.GetConfig().TransferTaskMaxRetryCount,
-				emitDomainTag,
 				nil,
 			)
 		},
diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go
index aa51fe870d4..cfbb53c5443 100644
--- a/service/history/queueProcessor.go
+++ b/service/history/queueProcessor.go
@@ -143,8 +143,6 @@ func newQueueProcessorBase(
 	}
 
 	if options.QueueType != task.QueueTypeReplication {
-		// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-		emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
 		p.queueTaskInitializer = func(taskInfo task.Info) task.Task {
 			return task.NewTransferTask(
 				shard,
@@ -156,7 +154,6 @@ func newQueueProcessorBase(
 				p.redispatcher.AddTask,
 				p.timeSource,
 				options.MaxRetryCount,
-				emitDomainTag,
 				p.ackMgr,
 			)
 		}
diff --git a/service/history/task/task.go b/service/history/task/task.go
index 14b237f21a8..e43534b8b05 100644
--- a/service/history/task/task.go
+++ b/service/history/task/task.go
@@ -75,7 +75,6 @@ type (
 		submitTime    time.Time
 		logger        log.Logger
 		scopeIdx      int
-		emitDomainTag bool
 		scope         metrics.Scope // initialized when processing task to make the initialization parallel
 		taskExecutor  Executor
 		maxRetryCount dynamicconfig.IntPropertyFn
@@ -115,7 +114,6 @@ func NewTimerTask(
 	redispatchFn func(task Task),
 	timeSource clock.TimeSource,
 	maxRetryCount dynamicconfig.IntPropertyFn,
-	emitDomainTag bool,
 	ackMgr TimerQueueAckMgr,
 ) Task {
 	return &timerTask{
@@ -129,7 +127,6 @@ func NewTimerTask(
 			taskExecutor,
 			timeSource,
 			maxRetryCount,
-			emitDomainTag,
 		),
 		ackMgr:       ackMgr,
 		redispatchFn: redispatchFn,
@@ -147,7 +144,6 @@ func NewTransferTask(
 	redispatchFn func(task Task),
 	timeSource clock.TimeSource,
 	maxRetryCount dynamicconfig.IntPropertyFn,
-	emitDomainTag bool,
 	ackMgr QueueAckMgr,
 ) Task {
 	return &transferTask{
@@ -161,7 +157,6 @@ func NewTransferTask(
 			taskExecutor,
 			timeSource,
 			maxRetryCount,
-			emitDomainTag,
 		),
 		ackMgr:       ackMgr,
 		redispatchFn: redispatchFn,
@@ -178,7 +173,6 @@ func newQueueTaskBase(
 	taskExecutor Executor,
 	timeSource clock.TimeSource,
 	maxRetryCount dynamicconfig.IntPropertyFn,
-	emitDomainTag bool,
 ) *taskBase {
 	return &taskBase{
 		Info: taskInfo,
@@ -187,7 +181,6 @@ func newQueueTaskBase(
 		priority:      ctask.NoPriority,
 		queueType:     queueType,
 		scopeIdx:      scopeIdx,
-		emitDomainTag: emitDomainTag,
 		scope:         nil,
 		logger:        logger,
 		attempt:       0,
@@ -242,11 +235,7 @@ func (t *taskBase) Execute() error {
 	// processed as active or standby and use the corresponding
 	// task executor.
 	if t.scope == nil {
-		if t.emitDomainTag {
-			t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
-		} else {
-			t.scope = t.shard.GetMetricsClient().Scope(t.scopeIdx)
-		}
+		t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
 	}
 
 	var err error
@@ -260,8 +249,8 @@ func (t *taskBase) Execute() error {
 
 	defer func() {
 		if t.shouldProcessTask {
-			t.scope.IncCounter(metrics.TaskRequests)
-			t.scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(executionStartTime))
+			t.scope.IncCounter(metrics.TaskRequestsPerDomain)
+			t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(executionStartTime))
 		}
 	}()
 
@@ -278,7 +267,7 @@ func (t *taskBase) HandleErr(
 
 	t.attempt++
 	if t.attempt > t.maxRetryCount() {
-		t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
+		t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
 		t.logger.Error("Critical error processing task, retrying.", tag.Error(err), tag.OperationCritical, tag.TaskType(t.GetTaskType()))
 	}
 
@@ -297,19 +286,19 @@ func (t *taskBase) HandleErr(
 		transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
 		err == execution.ErrMissingWorkflowStartEvent &&
 		t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
-		t.scope.IncCounter(metrics.TransferTaskMissingEventCounter)
+		t.scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
 		t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
 		return nil
 	}
 
 	// this is a transient error
 	if err == ErrTaskRedispatch {
-		t.scope.IncCounter(metrics.TaskStandbyRetryCounter)
+		t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
 		return err
 	}
 
 	if err == ErrTaskDiscarded {
-		t.scope.IncCounter(metrics.TaskDiscarded)
+		t.scope.IncCounter(metrics.TaskDiscardedPerDomain)
 		err = nil
 	}
 
@@ -318,14 +307,14 @@ func (t *taskBase) HandleErr(
 	// since the new task life cycle will not give up until task processed / verified
 	if _, ok := err.(*workflow.DomainNotActiveError); ok {
 		if t.timeSource.Now().Sub(t.submitTime) > 2*cache.DomainCacheRefreshInterval {
-			t.scope.IncCounter(metrics.TaskNotActiveCounter)
+			t.scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
 			return nil
 		}
 
 		return err
 	}
 
-	t.scope.IncCounter(metrics.TaskFailures)
+	t.scope.IncCounter(metrics.TaskFailuresPerDomain)
 
 	if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
 		t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
@@ -352,9 +341,9 @@ func (t *taskBase) Ack() {
 
 	t.state = ctask.TaskStateAcked
 	if t.shouldProcessTask {
-		t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
-		t.scope.RecordTimer(metrics.TaskLatency, time.Since(t.submitTime))
-		t.scope.RecordTimer(metrics.TaskQueueLatency, time.Since(t.GetVisibilityTimestamp()))
+		t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
+		t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
+		t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))
 	}
 }
 
diff --git a/service/history/task/task_test.go b/service/history/task/task_test.go
index c2abd01a84e..a4b22ec5180 100644
--- a/service/history/task/task_test.go
+++ b/service/history/task/task_test.go
@@ -218,7 +218,6 @@ func (s *taskSuite) newTestQueueTaskBase(
 		s.mockTaskExecutor,
 		s.timeSource,
 		s.maxRetryCount,
-		true,
 	)
 	taskBase.scope = s.mockShard.GetMetricsClient().Scope(0)
 	return taskBase
diff --git a/service/history/taskProcessor.go b/service/history/taskProcessor.go
index cde1e56972b..19d5f9a6278 100644
--- a/service/history/taskProcessor.go
+++ b/service/history/taskProcessor.go
@@ -208,7 +208,7 @@ FilterLoop:
 			if err != nil {
 				task.attempt++
 				if task.attempt >= t.config.TimerTaskMaxRetryCount() {
-					scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(task.attempt))
+					scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(task.attempt))
 					task.logger.Error("Critical error processing task, retrying.", tag.Error(err), tag.OperationCritical, tag.TaskType(task.task.GetTaskType()))
 				}
 
@@ -257,8 +257,8 @@ func (t *taskProcessor) processTaskOnce(
 	scope := task.GetOrCreateDomainTaggedScope(t.shard, scopeIdx, taskInfo.task.GetDomainID(), t.logger)
 
 	if taskInfo.shouldProcessTask {
-		scope.IncCounter(metrics.TaskRequests)
-		scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(startTime))
+		scope.IncCounter(metrics.TaskRequestsPerDomain)
+		scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(startTime))
 	}
 
 	return scope, err
@@ -283,14 +283,14 @@ func (t *taskProcessor) handleTaskError(
 		transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
 		err == execution.ErrMissingWorkflowStartEvent &&
 		t.config.EnableDropStuckTaskByDomainID(taskInfo.task.GetDomainID()) { // use domainID here to avoid accessing domainCache
-		scope.IncCounter(metrics.TransferTaskMissingEventCounter)
+		scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
 		taskInfo.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
 		return nil
 	}
 
 	// this is a transient error
 	if err == task.ErrTaskRedispatch {
-		scope.IncCounter(metrics.TaskStandbyRetryCounter)
+		scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
 		select {
 		case <-notificationChan:
 		case <-t.shutdownCh:
@@ -299,7 +299,7 @@ func (t *taskProcessor) handleTaskError(
 	}
 
 	if err == task.ErrTaskDiscarded {
-		scope.IncCounter(metrics.TaskDiscarded)
+		scope.IncCounter(metrics.TaskDiscardedPerDomain)
 		err = nil
 	}
 
@@ -308,14 +308,14 @@ func (t *taskProcessor) handleTaskError(
 	// since the new task life cycle will not give up until task processed / verified
 	if _, ok := err.(*workflow.DomainNotActiveError); ok {
 		if t.timeSource.Now().Sub(taskInfo.startTime) > 2*cache.DomainCacheRefreshInterval {
-			scope.IncCounter(metrics.TaskNotActiveCounter)
+			scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
 			return nil
 		}
 
 		return err
 	}
 
-	scope.IncCounter(metrics.TaskFailures)
+	scope.IncCounter(metrics.TaskFailuresPerDomain)
 
 	if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
 		taskInfo.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
@@ -333,8 +333,8 @@ func (t *taskProcessor) ackTaskOnce(
 	task.processor.complete(task)
 	if task.shouldProcessTask {
-		scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(task.attempt))
-		scope.RecordTimer(metrics.TaskLatency, time.Since(task.startTime))
-		scope.RecordTimer(metrics.TaskQueueLatency, time.Since(task.task.GetVisibilityTimestamp()))
+		scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(task.attempt))
+		scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(task.startTime))
+		scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(task.task.GetVisibilityTimestamp()))
 	}
 }
 
diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go
index f76d85b89d7..99b0f45e32f 100644
--- a/service/history/timerQueueProcessorBase.go
+++ b/service/history/timerQueueProcessorBase.go
@@ -144,8 +144,6 @@ func newTimerQueueProcessorBase(
 		),
 	}
 
-	// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-	emitDomainTag := config.QueueProcessorEnableDomainTaggedMetrics()
 	base.queueTaskInitializer = func(taskInfo task.Info) task.Task {
 		return task.NewTimerTask(
 			shard,
@@ -157,7 +155,6 @@ func newTimerQueueProcessorBase(
 			base.redispatcher.AddTask,
 			shard.GetTimeSource(),
 			config.TimerTaskMaxRetryCount,
-			emitDomainTag,
 			timerQueueAckMgr,
 		)
 	}
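
Reviewer note, not part of the patch: the new definitions pair a `_per_domain` metric name with a `metricRollupName`. My reading is that the rollup name keeps an aggregated, domain-agnostic series alive while the per-domain name carries the domain tag, so dashboards built on the old names keep working. The sketch below is a minimal, self-contained illustration of that assumption; `metricDef` and `emitCounter` are hypothetical stand-ins, not the actual Cadence metrics client.

```go
package main

import "fmt"

// metricDef mirrors the shape of the definitions added in defs.go: a per-domain
// metric name plus an optional rollup name for the aggregated, domain-agnostic series.
type metricDef struct {
	name       string
	rollupName string
}

// emitCounter is a hypothetical stand-in for the metrics client. The assumption
// illustrated here: a definition with a rollup name yields two series, the
// per-domain name carrying the domain tag and the rollup name with that tag dropped.
func emitCounter(def metricDef, domain string, delta int64) {
	fmt.Printf("%s{domain=%q} += %d\n", def.name, domain, delta)
	if def.rollupName != "" {
		fmt.Printf("%s += %d\n", def.rollupName, delta)
	}
}

func main() {
	taskRequestsPerDomain := metricDef{
		name:       "task_requests_per_domain",
		rollupName: "task_requests",
	}
	emitCounter(taskRequestsPerDomain, "sample-domain", 1)
	// Output:
	// task_requests_per_domain{domain="sample-domain"} += 1
	// task_requests += 1
}
```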