Introduce per domain metrics for task processing. (cadence-workflow#3467)

* Differentiate domain-tagged metrics from task operation metrics
anish531213 authored and mkolodezny committed Aug 24, 2020
1 parent 37d34ca commit d5328ec
Showing 10 changed files with 61 additions and 64 deletions.
48 changes: 38 additions & 10 deletions common/metrics/defs.go
@@ -1723,6 +1723,19 @@ const (
TaskBatchCompleteCounter
TaskProcessingLatency
TaskQueueLatency

TaskRequestsPerDomain
TaskLatencyPerDomain
TaskFailuresPerDomain
TaskDiscardedPerDomain
TaskAttemptTimerPerDomain
TaskStandbyRetryCounterPerDomain
TaskNotActiveCounterPerDomain
TaskLimitExceededCounterPerDomain
TaskProcessingLatencyPerDomain
TaskQueueLatencyPerDomain
TransferTaskMissingEventCounterPerDomain

TaskRedispatchQueuePendingTasksTimer

TransferTaskThrottledCounter
@@ -2162,16 +2175,31 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceShardFailureGauge: {metricName: "cadence_shard_failure", metricType: Gauge},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
TaskFailures: {metricName: "task_errors", metricType: Counter},
TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
TaskFailures: {metricName: "task_errors", metricType: Counter},
TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},

// per domain task metrics

TaskRequestsPerDomain: {metricName: "task_requests_per_domain", metricRollupName: "task_requests", metricType: Counter},
TaskLatencyPerDomain: {metricName: "task_latency_per_domain", metricRollupName: "task_latency", metricType: Timer},
TaskAttemptTimerPerDomain: {metricName: "task_attempt_per_domain", metricRollupName: "task_attempt", metricType: Timer},
TaskFailuresPerDomain: {metricName: "task_errors_per_domain", metricRollupName: "task_errors", metricType: Counter},
TaskDiscardedPerDomain: {metricName: "task_errors_discarded_per_domain", metricRollupName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounterPerDomain: {metricName: "task_errors_standby_retry_counter_per_domain", metricRollupName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounterPerDomain: {metricName: "task_errors_not_active_counter_per_domain", metricRollupName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounterPerDomain: {metricName: "task_errors_limit_exceeded_counter_per_domain", metricRollupName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatencyPerDomain: {metricName: "task_latency_processing_per_domain", metricRollupName: "task_latency_processing", metricType: Timer},
TaskQueueLatencyPerDomain: {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},

TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TaskRedispatchQueuePendingTasksTimer: {metricName: "task_redispatch_queue_pending_tasks", metricType: Timer},
TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
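
The new definitions pair each per-domain metric name with a metricRollupName. The intent is that the domain-tagged series is emitted under the new *_per_domain name, while the rollup name preserves the pre-existing untagged series so existing dashboards and alerts keep working at the original cardinality. A minimal, self-contained sketch of that pairing (not the actual common/metrics emission code; how the rollup series is produced is an assumption here):

package main

import "fmt"

type metricType int

const (
    Counter metricType = iota
    Timer
)

type metricDefinition struct {
    metricName       string
    metricRollupName string
    metricType       metricType
}

// emittedSeries lists the time series a single per-domain definition is meant
// to produce: one tagged with the domain, plus one rollup under the old name
// with no domain tag (an assumption about how the rollup is applied).
func emittedSeries(def metricDefinition, domain string) []string {
    series := []string{fmt.Sprintf("%s{domain=%q}", def.metricName, domain)}
    if def.metricRollupName != "" {
        series = append(series, def.metricRollupName)
    }
    return series
}

func main() {
    def := metricDefinition{
        metricName:       "task_requests_per_domain",
        metricRollupName: "task_requests",
        metricType:       Counter,
    }
    fmt.Println(emittedSeries(def, "sample-domain"))
    // Prints: [task_requests_per_domain{domain="sample-domain"} task_requests]
}
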
3 changes: 0 additions & 3 deletions common/service/dynamicconfig/constants.go
@@ -169,7 +169,6 @@ var keys = map[Key]string{
ActiveTaskRedispatchInterval: "history.activeTaskRedispatchInterval",
StandbyTaskRedispatchInterval: "history.standbyTaskRedispatchInterval",
TaskRedispatchIntervalJitterCoefficient: "history.taskRedispatchIntervalJitterCoefficient",
QueueProcessorEnableDomainTaggedMetrics: "history.queueProcessorEnableDomainTaggedMetrics",
QueueProcessorEnableSplit: "history.queueProcessorEnableSplit",
QueueProcessorSplitMaxLevel: "history.queueProcessorSplitMaxLevel",
QueueProcessorEnableRandomSplitByDomainID: "history.queueProcessorEnableRandomSplitByDomainID",
@@ -572,8 +571,6 @@ const (
StandbyTaskRedispatchInterval
// TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient
TaskRedispatchIntervalJitterCoefficient
// QueueProcessorEnableDomainTaggedMetrics indicates whether task processing metrics should include domain tag
QueueProcessorEnableDomainTaggedMetrics
// QueueProcessorEnableSplit indicates whether processing queue split policy should be enabled
QueueProcessorEnableSplit
// QueueProcessorSplitMaxLevel is the max processing queue level
2 changes: 0 additions & 2 deletions service/history/config/config.go
@@ -92,7 +92,6 @@ type Config struct {
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

// QueueProcessor settings
QueueProcessorEnableDomainTaggedMetrics dynamicconfig.BoolPropertyFn
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
QueueProcessorSplitMaxLevel dynamicconfig.IntPropertyFn
QueueProcessorEnableRandomSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
@@ -330,7 +329,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorSplitQueueIntervalJitterCoefficient, 0.15),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),

QueueProcessorEnableDomainTaggedMetrics: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableDomainTaggedMetrics, false),
QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
QueueProcessorEnableRandomSplitByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.QueueProcessorEnableRandomSplitByDomainID, false),
4 changes: 0 additions & 4 deletions service/history/queue/timer_queue_processor_base.go
@@ -113,9 +113,6 @@ func newTimerQueueProcessorBase(
queueType = task.QueueTypeStandbyTimer
}

// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()

return &timerQueueProcessorBase{
processorBase: processorBase,

@@ -130,7 +127,6 @@
processorBase.redispatcher.AddTask,
shard.GetTimeSource(),
shard.GetConfig().TimerTaskMaxRetryCount,
emitDomainTag,
nil,
)
},
4 changes: 0 additions & 4 deletions service/history/queue/transfer_queue_processor_base.go
@@ -97,9 +97,6 @@ func newTransferQueueProcessorBase(
queueType = task.QueueTypeStandbyTransfer
}

// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()

return &transferQueueProcessorBase{
processorBase: processorBase,

@@ -114,7 +111,6 @@
processorBase.redispatcher.AddTask,
shard.GetTimeSource(),
shard.GetConfig().TransferTaskMaxRetryCount,
emitDomainTag,
nil,
)
},
3 changes: 0 additions & 3 deletions service/history/queueProcessor.go
@@ -143,8 +143,6 @@ func newQueueProcessorBase(
}

if options.QueueType != task.QueueTypeReplication {
// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
p.queueTaskInitializer = func(taskInfo task.Info) task.Task {
return task.NewTransferTask(
shard,
@@ -156,7 +154,6 @@
p.redispatcher.AddTask,
p.timeSource,
options.MaxRetryCount,
emitDomainTag,
p.ackMgr,
)
}
35 changes: 12 additions & 23 deletions service/history/task/task.go
@@ -75,7 +75,6 @@ type (
submitTime time.Time
logger log.Logger
scopeIdx int
emitDomainTag bool
scope metrics.Scope // initialized when processing task to make the initialization parallel
taskExecutor Executor
maxRetryCount dynamicconfig.IntPropertyFn
@@ -115,7 +114,6 @@ func NewTimerTask(
redispatchFn func(task Task),
timeSource clock.TimeSource,
maxRetryCount dynamicconfig.IntPropertyFn,
emitDomainTag bool,
ackMgr TimerQueueAckMgr,
) Task {
return &timerTask{
@@ -129,7 +127,6 @@
taskExecutor,
timeSource,
maxRetryCount,
emitDomainTag,
),
ackMgr: ackMgr,
redispatchFn: redispatchFn,
@@ -147,7 +144,6 @@ func NewTransferTask(
redispatchFn func(task Task),
timeSource clock.TimeSource,
maxRetryCount dynamicconfig.IntPropertyFn,
emitDomainTag bool,
ackMgr QueueAckMgr,
) Task {
return &transferTask{
@@ -161,7 +157,6 @@
taskExecutor,
timeSource,
maxRetryCount,
emitDomainTag,
),
ackMgr: ackMgr,
redispatchFn: redispatchFn,
@@ -178,7 +173,6 @@ func newQueueTaskBase(
taskExecutor Executor,
timeSource clock.TimeSource,
maxRetryCount dynamicconfig.IntPropertyFn,
emitDomainTag bool,
) *taskBase {
return &taskBase{
Info: taskInfo,
@@ -187,7 +181,6 @@
priority: ctask.NoPriority,
queueType: queueType,
scopeIdx: scopeIdx,
emitDomainTag: emitDomainTag,
scope: nil,
logger: logger,
attempt: 0,
@@ -242,11 +235,7 @@ func (t *taskBase) Execute() error {
// processed as active or standby and use the corresponding
// task executor.
if t.scope == nil {
if t.emitDomainTag {
t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
} else {
t.scope = t.shard.GetMetricsClient().Scope(t.scopeIdx)
}
t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
}

var err error
@@ -260,8 +249,8 @@

defer func() {
if t.shouldProcessTask {
t.scope.IncCounter(metrics.TaskRequests)
t.scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(executionStartTime))
t.scope.IncCounter(metrics.TaskRequestsPerDomain)
t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(executionStartTime))
}
}()

@@ -278,7 +267,7 @@ func (t *taskBase) HandleErr(

t.attempt++
if t.attempt > t.maxRetryCount() {
t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
t.logger.Error("Critical error processing task, retrying.",
tag.Error(err), tag.OperationCritical, tag.TaskType(t.GetTaskType()))
}
@@ -297,19 +286,19 @@ func (t *taskBase) HandleErr(
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == execution.ErrMissingWorkflowStartEvent &&
t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
t.scope.IncCounter(metrics.TransferTaskMissingEventCounter)
t.scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == ErrTaskRedispatch {
t.scope.IncCounter(metrics.TaskStandbyRetryCounter)
t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
return err
}

if err == ErrTaskDiscarded {
t.scope.IncCounter(metrics.TaskDiscarded)
t.scope.IncCounter(metrics.TaskDiscardedPerDomain)
err = nil
}

@@ -318,14 +307,14 @@
// since the new task life cycle will not give up until task processed / verified
if _, ok := err.(*workflow.DomainNotActiveError); ok {
if t.timeSource.Now().Sub(t.submitTime) > 2*cache.DomainCacheRefreshInterval {
t.scope.IncCounter(metrics.TaskNotActiveCounter)
t.scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
return nil
}

return err
}

t.scope.IncCounter(metrics.TaskFailures)
t.scope.IncCounter(metrics.TaskFailuresPerDomain)

if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
@@ -359,9 +348,9 @@ func (t *taskBase) Ack() {

t.state = ctask.TaskStateAcked
if t.shouldProcessTask {
t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskLatency, time.Since(t.submitTime))
t.scope.RecordTimer(metrics.TaskQueueLatency, time.Since(t.GetVisibilityTimestamp()))
t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))
}
}

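
With the emitDomainTag flag gone, Execute() always resolves its metrics scope through GetOrCreateDomainTaggedScope, lazily, on the task's first execution. The helper itself is not part of this diff; the following is a rough sketch of the shape such a cached lookup typically takes (the key format, fallback tag value, and locking below are assumptions, not the real implementation):

package main

import (
    "fmt"
    "sync"
)

// scope stands in for a metrics.Scope already tagged with a domain.
type scope struct {
    idx    int
    domain string
}

// scopeCache caches one tagged scope per (scope index, domain ID) so the
// domain cache and tag allocation are not hit on every task execution.
type scopeCache struct {
    mu     sync.RWMutex
    scopes map[string]*scope
}

func (c *scopeCache) getOrCreate(scopeIdx int, domainID string, resolveName func(string) (string, error)) *scope {
    key := fmt.Sprintf("%d_%s", scopeIdx, domainID)

    c.mu.RLock()
    if s, ok := c.scopes[key]; ok {
        c.mu.RUnlock()
        return s
    }
    c.mu.RUnlock()

    domainName, err := resolveName(domainID)
    if err != nil {
        // If the domain cannot be resolved, still emit metrics under a
        // placeholder tag value rather than dropping them.
        domainName = "unknown"
    }

    c.mu.Lock()
    defer c.mu.Unlock()
    if s, ok := c.scopes[key]; ok {
        return s
    }
    s := &scope{idx: scopeIdx, domain: domainName}
    c.scopes[key] = s
    return s
}

func main() {
    cache := &scopeCache{scopes: map[string]*scope{}}
    s := cache.getOrCreate(0, "some-domain-uuid", func(string) (string, error) { return "sample-domain", nil })
    fmt.Printf("scope %d tagged with domain=%s\n", s.idx, s.domain)
}
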
1 change: 0 additions & 1 deletion service/history/task/task_test.go
@@ -218,7 +218,6 @@ func (s *taskSuite) newTestQueueTaskBase(
s.mockTaskExecutor,
s.timeSource,
s.maxRetryCount,
true,
)
taskBase.scope = s.mockShard.GetMetricsClient().Scope(0)
return taskBase
22 changes: 11 additions & 11 deletions service/history/taskProcessor.go
@@ -208,7 +208,7 @@ FilterLoop:
if err != nil {
task.attempt++
if task.attempt >= t.config.TimerTaskMaxRetryCount() {
scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(task.attempt))
scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(task.attempt))
task.logger.Error("Critical error processing task, retrying.",
tag.Error(err), tag.OperationCritical, tag.TaskType(task.task.GetTaskType()))
}
@@ -257,8 +257,8 @@ func (t *taskProcessor) processTaskOnce(

scope := task.GetOrCreateDomainTaggedScope(t.shard, scopeIdx, taskInfo.task.GetDomainID(), t.logger)
if taskInfo.shouldProcessTask {
scope.IncCounter(metrics.TaskRequests)
scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(startTime))
scope.IncCounter(metrics.TaskRequestsPerDomain)
scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(startTime))
}

return scope, err
@@ -283,14 +283,14 @@
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == execution.ErrMissingWorkflowStartEvent &&
t.config.EnableDropStuckTaskByDomainID(taskInfo.task.GetDomainID()) { // use domainID here to avoid accessing domainCache
scope.IncCounter(metrics.TransferTaskMissingEventCounter)
scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
taskInfo.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == task.ErrTaskRedispatch {
scope.IncCounter(metrics.TaskStandbyRetryCounter)
scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
select {
case <-notificationChan:
case <-t.shutdownCh:
Expand All @@ -299,7 +299,7 @@ func (t *taskProcessor) handleTaskError(
}

if err == task.ErrTaskDiscarded {
scope.IncCounter(metrics.TaskDiscarded)
scope.IncCounter(metrics.TaskDiscardedPerDomain)
err = nil
}

@@ -308,14 +308,14 @@
// since the new task life cycle will not give up until task processed / verified
if _, ok := err.(*workflow.DomainNotActiveError); ok {
if t.timeSource.Now().Sub(taskInfo.startTime) > 2*cache.DomainCacheRefreshInterval {
scope.IncCounter(metrics.TaskNotActiveCounter)
scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
return nil
}

return err
}

scope.IncCounter(metrics.TaskFailures)
scope.IncCounter(metrics.TaskFailuresPerDomain)

if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
taskInfo.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
@@ -340,8 +340,8 @@ func (t *taskProcessor) ackTaskOnce(

task.processor.complete(task)
if task.shouldProcessTask {
scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(task.attempt))
scope.RecordTimer(metrics.TaskLatency, time.Since(task.startTime))
scope.RecordTimer(metrics.TaskQueueLatency, time.Since(task.task.GetVisibilityTimestamp()))
scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(task.attempt))
scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(task.startTime))
scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(task.task.GetVisibilityTimestamp()))
}
}
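
After this change both paths — the new task lifecycle in service/history/task and this legacy taskProcessor — emit the same *_per_domain series from a domain-tagged scope. Cadence's metrics client is backed by uber-go/tally, so one quick way to see what the emission looks like is to drive a tally test scope directly. The metric names below are the ones added in defs.go, but the "domain" tag key and the use of raw tally instead of the common/metrics wrapper are assumptions for illustration only:

package main

import (
    "fmt"
    "time"

    "github.com/uber-go/tally"
)

func main() {
    // In-memory scope whose emitted metrics can be inspected via Snapshot().
    scope := tally.NewTestScope("history", nil)

    domainScope := scope.Tagged(map[string]string{"domain": "sample-domain"})
    domainScope.Counter("task_requests_per_domain").Inc(1)
    domainScope.Timer("task_latency_queue_per_domain").Record(25 * time.Millisecond)

    snap := scope.Snapshot()
    for _, c := range snap.Counters() {
        fmt.Println(c.Name(), c.Tags(), c.Value())
    }
    for _, t := range snap.Timers() {
        fmt.Println(t.Name(), t.Tags(), t.Values())
    }
}
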
3 changes: 0 additions & 3 deletions service/history/timerQueueProcessorBase.go
@@ -144,8 +144,6 @@ func newTimerQueueProcessorBase(
),
}

// read dynamic config only once on startup to avoid gc pressure caused by keeping reading dynamic config
emitDomainTag := config.QueueProcessorEnableDomainTaggedMetrics()
base.queueTaskInitializer = func(taskInfo task.Info) task.Task {
return task.NewTimerTask(
shard,
@@ -157,7 +155,6 @@
base.redispatcher.AddTask,
shard.GetTimeSource(),
config.TimerTaskMaxRetryCount,
emitDomainTag,
timerQueueAckMgr,
)
}
