Introduce per domain metrics for task processing. (#3467)
* Differentiate domain-tagged (per-domain) task metrics from the existing task operation metrics
anish531213 committed Aug 25, 2020
1 parent 1ac5539 commit 8de8f99
Showing 10 changed files with 83 additions and 82 deletions.
48 changes: 38 additions & 10 deletions common/metrics/defs.go
@@ -1681,6 +1681,19 @@ const (
     TaskBatchCompleteCounter
     TaskProcessingLatency
     TaskQueueLatency
+
+    TaskRequestsPerDomain
+    TaskLatencyPerDomain
+    TaskFailuresPerDomain
+    TaskDiscardedPerDomain
+    TaskAttemptTimerPerDomain
+    TaskStandbyRetryCounterPerDomain
+    TaskNotActiveCounterPerDomain
+    TaskLimitExceededCounterPerDomain
+    TaskProcessingLatencyPerDomain
+    TaskQueueLatencyPerDomain
+    TransferTaskMissingEventCounterPerDomain
+
     TaskRedispatchQueuePendingTasksTimer

     TransferTaskThrottledCounter
@@ -2119,16 +2132,31 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
         CadenceShardFailureGauge: {metricName: "cadence_shard_failure", metricType: Gauge},
     },
     History: {
-        TaskRequests:                         {metricName: "task_requests", metricType: Counter},
-        TaskLatency:                          {metricName: "task_latency", metricType: Timer},
-        TaskAttemptTimer:                     {metricName: "task_attempt", metricType: Timer},
-        TaskFailures:                         {metricName: "task_errors", metricType: Counter},
-        TaskDiscarded:                        {metricName: "task_errors_discarded", metricType: Counter},
-        TaskStandbyRetryCounter:              {metricName: "task_errors_standby_retry_counter", metricType: Counter},
-        TaskNotActiveCounter:                 {metricName: "task_errors_not_active_counter", metricType: Counter},
-        TaskLimitExceededCounter:             {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
-        TaskProcessingLatency:                {metricName: "task_latency_processing", metricType: Timer},
-        TaskQueueLatency:                     {metricName: "task_latency_queue", metricType: Timer},
+        TaskRequests:             {metricName: "task_requests", metricType: Counter},
+        TaskLatency:              {metricName: "task_latency", metricType: Timer},
+        TaskAttemptTimer:         {metricName: "task_attempt", metricType: Timer},
+        TaskFailures:             {metricName: "task_errors", metricType: Counter},
+        TaskDiscarded:            {metricName: "task_errors_discarded", metricType: Counter},
+        TaskStandbyRetryCounter:  {metricName: "task_errors_standby_retry_counter", metricType: Counter},
+        TaskNotActiveCounter:     {metricName: "task_errors_not_active_counter", metricType: Counter},
+        TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
+        TaskProcessingLatency:    {metricName: "task_latency_processing", metricType: Timer},
+        TaskQueueLatency:         {metricName: "task_latency_queue", metricType: Timer},
+
+        // per domain task metrics
+
+        TaskRequestsPerDomain:                    {metricName: "task_requests_per_domain", metricRollupName: "task_requests", metricType: Counter},
+        TaskLatencyPerDomain:                     {metricName: "task_latency_per_domain", metricRollupName: "task_latency", metricType: Timer},
+        TaskAttemptTimerPerDomain:                {metricName: "task_attempt_per_domain", metricRollupName: "task_attempt", metricType: Timer},
+        TaskFailuresPerDomain:                    {metricName: "task_errors_per_domain", metricRollupName: "task_errors", metricType: Counter},
+        TaskDiscardedPerDomain:                   {metricName: "task_errors_discarded_per_domain", metricRollupName: "task_errors_discarded", metricType: Counter},
+        TaskStandbyRetryCounterPerDomain:         {metricName: "task_errors_standby_retry_counter_per_domain", metricRollupName: "task_errors_standby_retry_counter", metricType: Counter},
+        TaskNotActiveCounterPerDomain:            {metricName: "task_errors_not_active_counter_per_domain", metricRollupName: "task_errors_not_active_counter", metricType: Counter},
+        TaskLimitExceededCounterPerDomain:        {metricName: "task_errors_limit_exceeded_counter_per_domain", metricRollupName: "task_errors_limit_exceeded_counter", metricType: Counter},
+        TaskProcessingLatencyPerDomain:           {metricName: "task_latency_processing_per_domain", metricRollupName: "task_latency_processing", metricType: Timer},
+        TaskQueueLatencyPerDomain:                {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
+        TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},
+
         TaskBatchCompleteCounter:             {metricName: "task_batch_complete_counter", metricType: Counter},
         TaskRedispatchQueuePendingTasksTimer: {metricName: "task_redispatch_queue_pending_tasks", metricType: Timer},
         TransferTaskThrottledCounter:         {metricName: "transfer_task_throttled_counter", metricType: Counter},
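Note how each new *PerDomain definition pairs a metricName with a metricRollupName: the per-domain series is emitted under the new name with a domain tag, while the rollup name keeps feeding the old, untagged all-domain series, so dashboards built on the original names keep working. A minimal sketch of that emission pattern — the metricDefinition shape is taken from defs.go above, but the recordTimer function and its printed output are illustrative stand-ins, not Cadence's actual metrics client:

package main

import (
    "fmt"
    "time"
)

// metricDefinition mirrors the relevant fields from defs.go.
type metricDefinition struct {
    metricName       string
    metricRollupName string
}

// recordTimer sketches what a rollup-aware scope could do: write the
// domain-tagged series, then the untagged rollup series if one is defined.
func recordTimer(def metricDefinition, domain string, d time.Duration) {
    fmt.Printf("%s{domain=%q} %v\n", def.metricName, domain, d)
    if def.metricRollupName != "" {
        fmt.Printf("%s %v\n", def.metricRollupName, d) // all-domain rollup
    }
}

func main() {
    taskLatencyPerDomain := metricDefinition{
        metricName:       "task_latency_per_domain",
        metricRollupName: "task_latency",
    }
    recordTimer(taskLatencyPerDomain, "sample-domain", 42*time.Millisecond)
}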
3 changes: 0 additions & 3 deletions common/service/dynamicconfig/constants.go
@@ -171,7 +171,6 @@ var keys = map[Key]string{
     ActiveTaskRedispatchInterval:              "history.activeTaskRedispatchInterval",
     StandbyTaskRedispatchInterval:             "history.standbyTaskRedispatchInterval",
     TaskRedispatchIntervalJitterCoefficient:   "history.taskRedispatchIntervalJitterCoefficient",
-    QueueProcessorEnableDomainTaggedMetrics:   "history.queueProcessorEnableDomainTaggedMetrics",
     QueueProcessorEnableSplit:                 "history.queueProcessorEnableSplit",
     QueueProcessorSplitMaxLevel:               "history.queueProcessorSplitMaxLevel",
     QueueProcessorEnableRandomSplitByDomainID: "history.queueProcessorEnableRandomSplitByDomain",
@@ -573,8 +572,6 @@ const (
     StandbyTaskRedispatchInterval
     // TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient
     TaskRedispatchIntervalJitterCoefficient
-    // QueueProcessorEnableDomainTaggedMetrics indicates whether task processing metrics should include domain tag
-    QueueProcessorEnableDomainTaggedMetrics
     // QueueProcessorEnableSplit indicates whether processing queue split policy should be enabled
     QueueProcessorEnableSplit
     // QueueProcessorSplitMaxLevel is the max processing queue level
42 changes: 22 additions & 20 deletions service/history/config/config.go
@@ -92,16 +92,17 @@ type Config struct {
     EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

     // QueueProcessor settings
-    QueueProcessorEnableDomainTaggedMetrics        dynamicconfig.BoolPropertyFn
-    QueueProcessorEnableSplit                      dynamicconfig.BoolPropertyFn
-    QueueProcessorSplitMaxLevel                    dynamicconfig.IntPropertyFn
-    QueueProcessorEnableRandomSplitByDomainID      dynamicconfig.BoolPropertyFnWithDomainIDFilter
-    QueueProcessorRandomSplitProbability           dynamicconfig.FloatPropertyFn
-    QueueProcessorEnablePendingTaskSplit           dynamicconfig.BoolPropertyFn
-    QueueProcessorPendingTaskSplitThreshold        dynamicconfig.MapPropertyFn
-    QueueProcessorEnableStuckTaskSplit             dynamicconfig.BoolPropertyFn
-    QueueProcessorStuckTaskSplitThreshold          dynamicconfig.MapPropertyFn
-    QueueProcessorSplitLookAheadDurationByDomainID dynamicconfig.DurationPropertyFnWithDomainIDFilter
+    QueueProcessorEnableSplit                          dynamicconfig.BoolPropertyFn
+    QueueProcessorSplitMaxLevel                        dynamicconfig.IntPropertyFn
+    QueueProcessorEnableRandomSplitByDomainID          dynamicconfig.BoolPropertyFnWithDomainIDFilter
+    QueueProcessorRandomSplitProbability               dynamicconfig.FloatPropertyFn
+    QueueProcessorEnablePendingTaskSplit               dynamicconfig.BoolPropertyFn
+    QueueProcessorPendingTaskSplitThreshold            dynamicconfig.MapPropertyFn
+    QueueProcessorEnableStuckTaskSplit                 dynamicconfig.BoolPropertyFn
+    QueueProcessorStuckTaskSplitThreshold              dynamicconfig.MapPropertyFn
+    QueueProcessorSplitLookAheadDurationByDomainID     dynamicconfig.DurationPropertyFnWithDomainIDFilter
+    QueueProcessorPollBackoffInterval                  dynamicconfig.DurationPropertyFn
+    QueueProcessorPollBackoffIntervalJitterCoefficient dynamicconfig.FloatPropertyFn

     // TimerQueueProcessor settings
     TimerTaskBatchSize dynamicconfig.IntPropertyFn
@@ -333,16 +334,17 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
         TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorSplitQueueIntervalJitterCoefficient, 0.15),
         EnableDropStuckTaskByDomainID:           dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),

-        QueueProcessorEnableDomainTaggedMetrics:        dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableDomainTaggedMetrics, false),
-        QueueProcessorEnableSplit:                      dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
-        QueueProcessorSplitMaxLevel:                    dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
-        QueueProcessorEnableRandomSplitByDomainID:      dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.QueueProcessorEnableRandomSplitByDomainID, false),
-        QueueProcessorRandomSplitProbability:           dc.GetFloat64Property(dynamicconfig.QueueProcessorRandomSplitProbability, 0.01),
-        QueueProcessorEnablePendingTaskSplit:           dc.GetBoolProperty(dynamicconfig.QueueProcessorEnablePendingTaskSplit, false),
-        QueueProcessorPendingTaskSplitThreshold:        dc.GetMapProperty(dynamicconfig.QueueProcessorPendingTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultPendingTaskSplitThreshold)),
-        QueueProcessorEnableStuckTaskSplit:             dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableStuckTaskSplit, false),
-        QueueProcessorStuckTaskSplitThreshold:          dc.GetMapProperty(dynamicconfig.QueueProcessorStuckTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultStuckTaskSplitThreshold)),
-        QueueProcessorSplitLookAheadDurationByDomainID: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.QueueProcessorSplitLookAheadDurationByDomainID, 20*time.Minute),
+        QueueProcessorEnableSplit:                          dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
+        QueueProcessorSplitMaxLevel:                        dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
+        QueueProcessorEnableRandomSplitByDomainID:          dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.QueueProcessorEnableRandomSplitByDomainID, false),
+        QueueProcessorRandomSplitProbability:               dc.GetFloat64Property(dynamicconfig.QueueProcessorRandomSplitProbability, 0.01),
+        QueueProcessorEnablePendingTaskSplit:               dc.GetBoolProperty(dynamicconfig.QueueProcessorEnablePendingTaskSplit, false),
+        QueueProcessorPendingTaskSplitThreshold:            dc.GetMapProperty(dynamicconfig.QueueProcessorPendingTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultPendingTaskSplitThreshold)),
+        QueueProcessorEnableStuckTaskSplit:                 dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableStuckTaskSplit, false),
+        QueueProcessorStuckTaskSplitThreshold:              dc.GetMapProperty(dynamicconfig.QueueProcessorStuckTaskSplitThreshold, common.ConvertIntMapToDynamicConfigMapProperty(DefaultStuckTaskSplitThreshold)),
+        QueueProcessorSplitLookAheadDurationByDomainID:     dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.QueueProcessorSplitLookAheadDurationByDomainID, 20*time.Minute),
+        QueueProcessorPollBackoffInterval:                  dc.GetDurationProperty(dynamicconfig.QueueProcessorPollBackoffInterval, 5*time.Second),
+        QueueProcessorPollBackoffIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.QueueProcessorPollBackoffIntervalJitterCoefficient, 0.15),

         TimerTaskBatchSize:   dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
         TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
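The deleted QueueProcessorEnableDomainTaggedMetrics entries above are the whole lifecycle of a dynamic config knob: a key string in constants.go, a typed field in Config, and a dc.Get*Property binding in New. Each binding returns a property function that re-reads the live value on every call — which is why the callers removed later in this commit had cached the bool once at startup. A rough, self-contained illustration of that pattern, with simplified types standing in for the real dynamicconfig package:

package main

import "fmt"

// BoolPropertyFn mimics dynamicconfig.BoolPropertyFn: a closure that
// returns the current value of a dynamic config key on each call.
type BoolPropertyFn func() bool

// collection is a toy stand-in for dynamicconfig.Collection.
type collection struct{ values map[string]bool }

// GetBoolProperty returns a closure over the key, so callers observe
// config changes without holding a stale snapshot.
func (c *collection) GetBoolProperty(key string, defaultValue bool) BoolPropertyFn {
    return func() bool {
        if v, ok := c.values[key]; ok {
            return v
        }
        return defaultValue
    }
}

func main() {
    dc := &collection{values: map[string]bool{}}
    enableSplit := dc.GetBoolProperty("history.queueProcessorEnableSplit", false)
    fmt.Println(enableSplit()) // false: default until an override appears

    dc.values["history.queueProcessorEnableSplit"] = true
    fmt.Println(enableSplit()) // true: the closure re-reads on every call
}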
4 changes: 0 additions & 4 deletions service/history/queue/timer_queue_processor_base.go
@@ -113,9 +113,6 @@ func newTimerQueueProcessorBase(
         queueType = task.QueueTypeStandbyTimer
     }

-    // read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-    emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
-
     return &timerQueueProcessorBase{
         processorBase: processorBase,

@@ -130,7 +127,6 @@
                 processorBase.redispatcher.AddTask,
                 shard.GetTimeSource(),
                 shard.GetConfig().TimerTaskMaxRetryCount,
-                emitDomainTag,
                 nil,
             )
         },
4 changes: 0 additions & 4 deletions service/history/queue/transfer_queue_processor_base.go
@@ -94,9 +94,6 @@ func newTransferQueueProcessorBase(
         queueType = task.QueueTypeStandbyTransfer
     }

-    // read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-    emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
-
     return &transferQueueProcessorBase{
         processorBase: processorBase,

@@ -111,7 +108,6 @@
                 processorBase.redispatcher.AddTask,
                 shard.GetTimeSource(),
                 shard.GetConfig().TransferTaskMaxRetryCount,
-                emitDomainTag,
                 nil,
             )
         },
3 changes: 0 additions & 3 deletions service/history/queueProcessor.go
@@ -143,8 +143,6 @@ func newQueueProcessorBase(
     }

     if options.QueueType != task.QueueTypeReplication {
-        // read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
-        emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
         p.queueTaskInitializer = func(taskInfo task.Info) task.Task {
             return task.NewTransferTask(
                 shard,
@@ -156,7 +154,6 @@
                 p.redispatcher.AddTask,
                 p.timeSource,
                 options.MaxRetryCount,
-                emitDomainTag,
                 p.ackMgr,
             )
         }
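In all three processors above, the deleted lines are the same startup-time snapshot: the property function was evaluated once (per the removed comment, to avoid re-reading dynamic config on every task) and the frozen bool was threaded into each task constructor. With domain tagging now unconditional, both the snapshot and the extra constructor argument disappear. A toy contrast of the two capture styles — the names here are illustrative, not from the codebase:

package main

import "fmt"

func main() {
    reads := 0
    // propertyFn stands in for a dynamicconfig BoolPropertyFn.
    propertyFn := func() bool { reads++; return reads > 1 }

    // Removed pattern: snapshot once at processor startup; tasks capture
    // the frozen bool, so later config changes are never observed.
    snapshot := propertyFn()
    taskSeesSnapshot := func() bool { return snapshot }

    // Alternative: capture the fn itself; every task re-reads the value.
    taskSeesLive := func() bool { return propertyFn() }

    fmt.Println(taskSeesSnapshot(), taskSeesLive()) // false true
}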
35 changes: 12 additions & 23 deletions service/history/task/task.go
@@ -75,7 +75,6 @@ type (
         submitTime    time.Time
         logger        log.Logger
         scopeIdx      int
-        emitDomainTag bool
         scope         metrics.Scope // initialized when processing task to make the initialization parallel
         taskExecutor  Executor
         maxRetryCount dynamicconfig.IntPropertyFn
@@ -115,7 +114,6 @@ func NewTimerTask(
     redispatchFn func(task Task),
     timeSource clock.TimeSource,
     maxRetryCount dynamicconfig.IntPropertyFn,
-    emitDomainTag bool,
     ackMgr TimerQueueAckMgr,
 ) Task {
     return &timerTask{
@@ -129,7 +127,6 @@
             taskExecutor,
             timeSource,
             maxRetryCount,
-            emitDomainTag,
         ),
         ackMgr:       ackMgr,
         redispatchFn: redispatchFn,
@@ -147,7 +144,6 @@ func NewTransferTask(
     redispatchFn func(task Task),
     timeSource clock.TimeSource,
     maxRetryCount dynamicconfig.IntPropertyFn,
-    emitDomainTag bool,
     ackMgr QueueAckMgr,
 ) Task {
     return &transferTask{
@@ -161,7 +157,6 @@
             taskExecutor,
             timeSource,
             maxRetryCount,
-            emitDomainTag,
         ),
         ackMgr:       ackMgr,
         redispatchFn: redispatchFn,
@@ -178,7 +173,6 @@
     taskExecutor Executor,
     timeSource clock.TimeSource,
     maxRetryCount dynamicconfig.IntPropertyFn,
-    emitDomainTag bool,
 ) *taskBase {
     return &taskBase{
         Info:          taskInfo,
@@ -187,7 +181,6 @@
         priority:      ctask.NoPriority,
         queueType:     queueType,
         scopeIdx:      scopeIdx,
-        emitDomainTag: emitDomainTag,
         scope:         nil,
         logger:        logger,
         attempt:       0,
@@ -242,11 +235,7 @@ func (t *taskBase) Execute() error {
     // processed as active or standby and use the corresponding
     // task executor.
     if t.scope == nil {
-        if t.emitDomainTag {
-            t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
-        } else {
-            t.scope = t.shard.GetMetricsClient().Scope(t.scopeIdx)
-        }
+        t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
     }

     var err error
@@ -260,8 +249,8 @@

     defer func() {
         if t.shouldProcessTask {
-            t.scope.IncCounter(metrics.TaskRequests)
-            t.scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(executionStartTime))
+            t.scope.IncCounter(metrics.TaskRequestsPerDomain)
+            t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(executionStartTime))
         }
     }()

@@ -278,7 +267,7 @@ func (t *taskBase) HandleErr(

     t.attempt++
     if t.attempt > t.maxRetryCount() {
-        t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
+        t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
         t.logger.Error("Critical error processing task, retrying.",
             tag.Error(err), tag.OperationCritical, tag.TaskType(t.GetTaskType()))
     }
@@ -297,19 +286,19 @@
         transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
         err == execution.ErrMissingWorkflowStartEvent &&
         t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
-        t.scope.IncCounter(metrics.TransferTaskMissingEventCounter)
+        t.scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
         t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
         return nil
     }

     // this is a transient error
     if err == ErrTaskRedispatch {
-        t.scope.IncCounter(metrics.TaskStandbyRetryCounter)
+        t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
         return err
     }

     if err == ErrTaskDiscarded {
-        t.scope.IncCounter(metrics.TaskDiscarded)
+        t.scope.IncCounter(metrics.TaskDiscardedPerDomain)
         err = nil
     }

@@ -318,14 +307,14 @@
     // since the new task life cycle will not give up until task processed / verified
     if _, ok := err.(*workflow.DomainNotActiveError); ok {
         if t.timeSource.Now().Sub(t.submitTime) > 2*cache.DomainCacheRefreshInterval {
-            t.scope.IncCounter(metrics.TaskNotActiveCounter)
+            t.scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
             return nil
         }

         return err
     }

-    t.scope.IncCounter(metrics.TaskFailures)
+    t.scope.IncCounter(metrics.TaskFailuresPerDomain)

     if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
         t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
@@ -352,9 +341,9 @@ func (t *taskBase) Ack() {

     t.state = ctask.TaskStateAcked
     if t.shouldProcessTask {
-        t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
-        t.scope.RecordTimer(metrics.TaskLatency, time.Since(t.submitTime))
-        t.scope.RecordTimer(metrics.TaskQueueLatency, time.Since(t.GetVisibilityTimestamp()))
+        t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
+        t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
+        t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))
     }
 }
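task.go is where the flag actually took effect: Execute previously chose between a domain-tagged scope and a plain per-scope-index scope, and now always resolves the domain-tagged one lazily on first execution, emitting the *PerDomain variants (and, via their rollup names, the old series) in Execute, HandleErr, and Ack. GetOrCreateDomainTaggedScope itself is not part of this diff; a plausible reading of what such a helper does — purely an assumption for illustration — is a double-checked cache keyed by scope index and domain, so the domain tag is resolved once per (scopeIdx, domain) pair rather than once per task:

package main

import (
    "fmt"
    "sync"
)

// scope is a stand-in for metrics.Scope.
type scope struct {
    idx    int
    domain string
}

// scopeCache sketches a GetOrCreateDomainTaggedScope-style helper
// (assumed implementation — the real one lives outside this diff).
type scopeCache struct {
    mu     sync.RWMutex
    scopes map[string]*scope
}

func (c *scopeCache) getOrCreate(scopeIdx int, domainID string) *scope {
    key := fmt.Sprintf("%d_%s", scopeIdx, domainID)

    c.mu.RLock()
    s, ok := c.scopes[key]
    c.mu.RUnlock()
    if ok {
        return s // fast path: scope already tagged for this domain
    }

    c.mu.Lock()
    defer c.mu.Unlock()
    if s, ok := c.scopes[key]; ok {
        return s // another goroutine created it first
    }
    s = &scope{idx: scopeIdx, domain: domainID}
    c.scopes[key] = s
    return s
}

func main() {
    cache := &scopeCache{scopes: map[string]*scope{}}
    a := cache.getOrCreate(0, "domain-a")
    b := cache.getOrCreate(0, "domain-a")
    fmt.Println(a == b) // true — one scope per (scopeIdx, domain)
}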
1 change: 0 additions & 1 deletion service/history/task/task_test.go
@@ -218,7 +218,6 @@ func (s *taskSuite) newTestQueueTaskBase(
         s.mockTaskExecutor,
         s.timeSource,
         s.maxRetryCount,
-        true,
     )
     taskBase.scope = s.mockShard.GetMetricsClient().Scope(0)
     return taskBase