Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differentiate metrics for domainTagged metrics and task operation metrics #3467

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,19 @@ const (
TaskBatchCompleteCounter
TaskProcessingLatency
TaskQueueLatency

TaskRequestsPerDomain
TaskLatencyPerDomain
TaskFailuresPerDomain
TaskDiscardedPerDomain
TaskAttemptTimerPerDomain
TaskStandbyRetryCounterPerDomain
TaskNotActiveCounterPerDomain
TaskLimitExceededCounterPerDomain
TaskProcessingLatencyPerDomain
TaskQueueLatencyPerDomain
TransferTaskMissingEventCounterPerDomain

TaskRedispatchQueuePendingTasksTimer

TransferTaskThrottledCounter
Expand Down Expand Up @@ -2162,16 +2175,31 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceShardFailureGauge: {metricName: "cadence_shard_failure", metricType: Gauge},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
TaskFailures: {metricName: "task_errors", metricType: Counter},
TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
TaskAttemptTimer: {metricName: "task_attempt", metricType: Timer},
TaskFailures: {metricName: "task_errors", metricType: Counter},
TaskDiscarded: {metricName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounter: {metricName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounter: {metricName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},

// per-domain task metrics

TaskRequestsPerDomain: {metricName: "task_requests_per_domain", metricRollupName: "task_requests", metricType: Counter},
TaskLatencyPerDomain: {metricName: "task_latency_per_domain", metricRollupName: "task_latency", metricType: Timer},
TaskAttemptTimerPerDomain: {metricName: "task_attempt_per_domain", metricRollupName: "task_attempt", metricType: Timer},
TaskFailuresPerDomain: {metricName: "task_errors_per_domain", metricRollupName: "task_errors", metricType: Counter},
TaskDiscardedPerDomain: {metricName: "task_errors_discarded_per_domain", metricRollupName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounterPerDomain: {metricName: "task_errors_standby_retry_counter_per_domain", metricRollupName: "task_errors_standby_retry_counter", metricType: Counter},
TaskNotActiveCounterPerDomain: {metricName: "task_errors_not_active_counter_per_domain", metricRollupName: "task_errors_not_active_counter", metricType: Counter},
TaskLimitExceededCounterPerDomain: {metricName: "task_errors_limit_exceeded_counter_per_domain", metricRollupName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatencyPerDomain: {metricName: "task_latency_processing_per_domain", metricRollupName: "task_latency_processing", metricType: Timer},
TaskQueueLatencyPerDomain: {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},

TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TaskRedispatchQueuePendingTasksTimer: {metricName: "task_redispatch_queue_pending_tasks", metricType: Timer},
TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
Expand Down
3 changes: 0 additions & 3 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ var keys = map[Key]string{
ActiveTaskRedispatchInterval: "history.activeTaskRedispatchInterval",
StandbyTaskRedispatchInterval: "history.standbyTaskRedispatchInterval",
TaskRedispatchIntervalJitterCoefficient: "history.taskRedispatchIntervalJitterCoefficient",
QueueProcessorEnableDomainTaggedMetrics: "history.queueProcessorEnableDomainTaggedMetrics",
QueueProcessorEnableSplit: "history.queueProcessorEnableSplit",
QueueProcessorSplitMaxLevel: "history.queueProcessorSplitMaxLevel",
QueueProcessorEnableRandomSplitByDomainID: "history.queueProcessorEnableRandomSplitByDomainID",
Expand Down Expand Up @@ -572,8 +571,6 @@ const (
StandbyTaskRedispatchInterval
// TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient
TaskRedispatchIntervalJitterCoefficient
// QueueProcessorEnableDomainTaggedMetrics indicates whether task processing metrics should include domain tag
QueueProcessorEnableDomainTaggedMetrics
// QueueProcessorEnableSplit indicates whether processing queue split policy should be enabled
QueueProcessorEnableSplit
// QueueProcessorSplitMaxLevel is the max processing queue level
Expand Down
2 changes: 0 additions & 2 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ type Config struct {
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

// QueueProcessor settings
QueueProcessorEnableDomainTaggedMetrics dynamicconfig.BoolPropertyFn
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
QueueProcessorSplitMaxLevel dynamicconfig.IntPropertyFn
QueueProcessorEnableRandomSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
Expand Down Expand Up @@ -330,7 +329,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorSplitQueueIntervalJitterCoefficient, 0.15),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),

QueueProcessorEnableDomainTaggedMetrics: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableDomainTaggedMetrics, false),
QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
QueueProcessorEnableRandomSplitByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.QueueProcessorEnableRandomSplitByDomainID, false),
Expand Down
4 changes: 0 additions & 4 deletions service/history/queue/timer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ func newTimerQueueProcessorBase(
queueType = task.QueueTypeStandbyTimer
}

// read dynamic config only once on startup to avoid the GC pressure caused by repeatedly reading dynamic config
emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()

return &timerQueueProcessorBase{
processorBase: processorBase,

Expand All @@ -130,7 +127,6 @@ func newTimerQueueProcessorBase(
processorBase.redispatcher.AddTask,
shard.GetTimeSource(),
shard.GetConfig().TimerTaskMaxRetryCount,
emitDomainTag,
nil,
)
},
Expand Down
4 changes: 0 additions & 4 deletions service/history/queue/transfer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ func newTransferQueueProcessorBase(
queueType = task.QueueTypeStandbyTransfer
}

// read dynamic config only once on startup to avoid the GC pressure caused by repeatedly reading dynamic config
emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()

return &transferQueueProcessorBase{
processorBase: processorBase,

Expand All @@ -114,7 +111,6 @@ func newTransferQueueProcessorBase(
processorBase.redispatcher.AddTask,
shard.GetTimeSource(),
shard.GetConfig().TransferTaskMaxRetryCount,
emitDomainTag,
nil,
)
},
Expand Down
3 changes: 0 additions & 3 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ func newQueueProcessorBase(
}

if options.QueueType != task.QueueTypeReplication {
// read dynamic config only once on startup to avoid the GC pressure caused by repeatedly reading dynamic config
emitDomainTag := shard.GetConfig().QueueProcessorEnableDomainTaggedMetrics()
p.queueTaskInitializer = func(taskInfo task.Info) task.Task {
return task.NewTransferTask(
shard,
Expand All @@ -156,7 +154,6 @@ func newQueueProcessorBase(
p.redispatcher.AddTask,
p.timeSource,
options.MaxRetryCount,
emitDomainTag,
p.ackMgr,
)
}
Expand Down
35 changes: 12 additions & 23 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type (
submitTime time.Time
logger log.Logger
scopeIdx int
emitDomainTag bool
scope metrics.Scope // initialized when processing task to make the initialization parallel
taskExecutor Executor
maxRetryCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -115,7 +114,6 @@ func NewTimerTask(
redispatchFn func(task Task),
timeSource clock.TimeSource,
maxRetryCount dynamicconfig.IntPropertyFn,
emitDomainTag bool,
ackMgr TimerQueueAckMgr,
) Task {
return &timerTask{
Expand All @@ -129,7 +127,6 @@ func NewTimerTask(
taskExecutor,
timeSource,
maxRetryCount,
emitDomainTag,
),
ackMgr: ackMgr,
redispatchFn: redispatchFn,
Expand All @@ -147,7 +144,6 @@ func NewTransferTask(
redispatchFn func(task Task),
timeSource clock.TimeSource,
maxRetryCount dynamicconfig.IntPropertyFn,
emitDomainTag bool,
ackMgr QueueAckMgr,
) Task {
return &transferTask{
Expand All @@ -161,7 +157,6 @@ func NewTransferTask(
taskExecutor,
timeSource,
maxRetryCount,
emitDomainTag,
),
ackMgr: ackMgr,
redispatchFn: redispatchFn,
Expand All @@ -178,7 +173,6 @@ func newQueueTaskBase(
taskExecutor Executor,
timeSource clock.TimeSource,
maxRetryCount dynamicconfig.IntPropertyFn,
emitDomainTag bool,
) *taskBase {
return &taskBase{
Info: taskInfo,
Expand All @@ -187,7 +181,6 @@ func newQueueTaskBase(
priority: ctask.NoPriority,
queueType: queueType,
scopeIdx: scopeIdx,
emitDomainTag: emitDomainTag,
yycptt marked this conversation as resolved.
Show resolved Hide resolved
scope: nil,
logger: logger,
attempt: 0,
Expand Down Expand Up @@ -242,11 +235,7 @@ func (t *taskBase) Execute() error {
// processed as active or standby and use the corresponding
// task executor.
if t.scope == nil {
if t.emitDomainTag {
t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
} else {
t.scope = t.shard.GetMetricsClient().Scope(t.scopeIdx)
}
t.scope = GetOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
}

var err error
Expand All @@ -260,8 +249,8 @@ func (t *taskBase) Execute() error {

defer func() {
if t.shouldProcessTask {
t.scope.IncCounter(metrics.TaskRequests)
t.scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(executionStartTime))
t.scope.IncCounter(metrics.TaskRequestsPerDomain)
t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(executionStartTime))
}
}()

Expand All @@ -278,7 +267,7 @@ func (t *taskBase) HandleErr(

t.attempt++
if t.attempt > t.maxRetryCount() {
t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
t.logger.Error("Critical error processing task, retrying.",
tag.Error(err), tag.OperationCritical, tag.TaskType(t.GetTaskType()))
}
Expand All @@ -297,19 +286,19 @@ func (t *taskBase) HandleErr(
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == execution.ErrMissingWorkflowStartEvent &&
t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
t.scope.IncCounter(metrics.TransferTaskMissingEventCounter)
t.scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == ErrTaskRedispatch {
t.scope.IncCounter(metrics.TaskStandbyRetryCounter)
t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
return err
}

if err == ErrTaskDiscarded {
t.scope.IncCounter(metrics.TaskDiscarded)
t.scope.IncCounter(metrics.TaskDiscardedPerDomain)
err = nil
}

Expand All @@ -318,14 +307,14 @@ func (t *taskBase) HandleErr(
// since the new task life cycle will not give up until the task is processed / verified
if _, ok := err.(*workflow.DomainNotActiveError); ok {
if t.timeSource.Now().Sub(t.submitTime) > 2*cache.DomainCacheRefreshInterval {
t.scope.IncCounter(metrics.TaskNotActiveCounter)
t.scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
return nil
}

return err
}

t.scope.IncCounter(metrics.TaskFailures)
t.scope.IncCounter(metrics.TaskFailuresPerDomain)

if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
Expand Down Expand Up @@ -359,9 +348,9 @@ func (t *taskBase) Ack() {

t.state = ctask.TaskStateAcked
if t.shouldProcessTask {
t.scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskLatency, time.Since(t.submitTime))
t.scope.RecordTimer(metrics.TaskQueueLatency, time.Since(t.GetVisibilityTimestamp()))
t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))
}
}

Expand Down
22 changes: 11 additions & 11 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ FilterLoop:
if err != nil {
task.attempt++
if task.attempt >= t.config.TimerTaskMaxRetryCount() {
scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(task.attempt))
scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(task.attempt))
task.logger.Error("Critical error processing task, retrying.",
tag.Error(err), tag.OperationCritical, tag.TaskType(task.task.GetTaskType()))
}
Expand Down Expand Up @@ -257,8 +257,8 @@ func (t *taskProcessor) processTaskOnce(

scope := task.GetOrCreateDomainTaggedScope(t.shard, scopeIdx, taskInfo.task.GetDomainID(), t.logger)
if taskInfo.shouldProcessTask {
scope.IncCounter(metrics.TaskRequests)
scope.RecordTimer(metrics.TaskProcessingLatency, time.Since(startTime))
scope.IncCounter(metrics.TaskRequestsPerDomain)
scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(startTime))
}

return scope, err
Expand All @@ -283,14 +283,14 @@ func (t *taskProcessor) handleTaskError(
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == execution.ErrMissingWorkflowStartEvent &&
t.config.EnableDropStuckTaskByDomainID(taskInfo.task.GetDomainID()) { // use domainID here to avoid accessing domainCache
scope.IncCounter(metrics.TransferTaskMissingEventCounter)
scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
taskInfo.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == task.ErrTaskRedispatch {
scope.IncCounter(metrics.TaskStandbyRetryCounter)
scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
select {
case <-notificationChan:
case <-t.shutdownCh:
Expand All @@ -299,7 +299,7 @@ func (t *taskProcessor) handleTaskError(
}

if err == task.ErrTaskDiscarded {
scope.IncCounter(metrics.TaskDiscarded)
scope.IncCounter(metrics.TaskDiscardedPerDomain)
err = nil
}

Expand All @@ -308,14 +308,14 @@ func (t *taskProcessor) handleTaskError(
// since the new task life cycle will not give up until the task is processed / verified
if _, ok := err.(*workflow.DomainNotActiveError); ok {
if t.timeSource.Now().Sub(taskInfo.startTime) > 2*cache.DomainCacheRefreshInterval {
scope.IncCounter(metrics.TaskNotActiveCounter)
scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
return nil
}

return err
}

scope.IncCounter(metrics.TaskFailures)
scope.IncCounter(metrics.TaskFailuresPerDomain)

if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
taskInfo.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
Expand All @@ -340,8 +340,8 @@ func (t *taskProcessor) ackTaskOnce(

task.processor.complete(task)
if task.shouldProcessTask {
scope.RecordTimer(metrics.TaskAttemptTimer, time.Duration(task.attempt))
scope.RecordTimer(metrics.TaskLatency, time.Since(task.startTime))
scope.RecordTimer(metrics.TaskQueueLatency, time.Since(task.task.GetVisibilityTimestamp()))
scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(task.attempt))
scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(task.startTime))
scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(task.task.GetVisibilityTimestamp()))
}
}
3 changes: 0 additions & 3 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ func newTimerQueueProcessorBase(
),
}

// read dynamic config only once on startup to avoid the GC pressure caused by repeatedly reading dynamic config
emitDomainTag := config.QueueProcessorEnableDomainTaggedMetrics()
base.queueTaskInitializer = func(taskInfo task.Info) task.Task {
return task.NewTimerTask(
shard,
Expand All @@ -157,7 +155,6 @@ func newTimerQueueProcessorBase(
base.redispatcher.AddTask,
shard.GetTimeSource(),
config.TimerTaskMaxRetryCount,
emitDomainTag,
timerQueueAckMgr,
)
}
Expand Down