Skip to content

Commit

Permalink
Emit workflow task attempt stats (#2487)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 11, 2022
1 parent 61dac76 commit 1552628
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ const (
StickyTTL = "history.stickyTTL"
// WorkflowTaskHeartbeatTimeout for workflow task heartbeat
WorkflowTaskHeartbeatTimeout = "history.workflowTaskHeartbeatTimeout"
// WorkflowTaskCriticalAttempts is the number of attempts for a workflow task that's regarded as critical
WorkflowTaskCriticalAttempts = "history.workflowTaskCriticalAttempt"
// DefaultWorkflowTaskTimeout for a workflow task
DefaultWorkflowTaskTimeout = "history.defaultWorkflowTaskTimeout"
// SkipReapplicationByNamespaceID is whether to skip event re-application for a namespace
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,7 @@ const (
EmptyCompletionCommandsCounter
MultipleCompletionCommandsCounter
FailedWorkflowTasksCounter
WorkflowTaskAttempt
StaleMutableStateCounter
AutoResetPointsLimitExceededCounter
AutoResetPointCorruptionCounter
Expand Down Expand Up @@ -2384,6 +2385,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
EmptyCompletionCommandsCounter: NewCounterDef("empty_completion_commands"),
MultipleCompletionCommandsCounter: NewCounterDef("multiple_completion_commands"),
FailedWorkflowTasksCounter: NewCounterDef("failed_workflow_tasks"),
// Histogram of workflow task attempt numbers; the emitted metric name was previously misspelled.
WorkflowTaskAttempt: NewDimensionlessHistogramDef("workflow_task_attempt"),
StaleMutableStateCounter: NewCounterDef("stale_mutable_state"),
AutoResetPointsLimitExceededCounter: NewCounterDef("auto_reset_points_exceed_limit"),
AutoResetPointCorruptionCounter: NewCounterDef("auto_reset_point_corruption"),
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ type Config struct {
// WorkflowTaskHeartbeatTimeout is the timeout for the behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any workflow tasks,
// so that the workflow task will be scheduled to another worker (by clearing stickiness)
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn

// The following is used by the new RPC replication stack
ReplicationTaskFetcherParallelism dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -377,6 +378,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365),
WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30),
WorkflowTaskCriticalAttempts: dc.GetIntProperty(dynamicconfig.WorkflowTaskCriticalAttempts, 10),

ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 4),
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
Expand Down
32 changes: 31 additions & 1 deletion service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
)

Expand All @@ -58,7 +59,15 @@ func newWorkflowTaskStateMachine(
}
}

func (m *workflowTaskStateMachine) ReplicateWorkflowTaskScheduledEvent(version int64, scheduleID int64, taskQueue *taskqueuepb.TaskQueue, startToCloseTimeoutSeconds int32, attempt int32, scheduleTimestamp *time.Time, originalScheduledTimestamp *time.Time) (*WorkflowTaskInfo, error) {
func (m *workflowTaskStateMachine) ReplicateWorkflowTaskScheduledEvent(
version int64,
scheduleID int64,
taskQueue *taskqueuepb.TaskQueue,
startToCloseTimeoutSeconds int32,
attempt int32,
scheduleTimestamp *time.Time,
originalScheduledTimestamp *time.Time,
) (*WorkflowTaskInfo, error) {

// set workflow state to running, since workflow task is scheduled
// NOTE: for zombie workflow, should not change the state
Expand Down Expand Up @@ -394,6 +403,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
}

workflowTask, err := m.ReplicateWorkflowTaskStartedEvent(workflowTask, m.ms.GetCurrentVersion(), scheduleID, startedID, requestID, startTime)

m.emitWorkflowTaskAttemptStats(workflowTask.Attempt)

// TODO merge active & passive task generation
if err := m.ms.taskGenerator.GenerateStartWorkflowTaskTasks(
startTime, // start time is now
Expand Down Expand Up @@ -739,3 +751,21 @@ func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent(
m.ms.executionInfo.LastWorkflowTaskStartId = event.GetWorkflowTaskCompletedEventAttributes().GetStartedEventId()
return m.ms.addBinaryCheckSumIfNotExists(event, maxResetPoints)
}

// emitWorkflowTaskAttemptStats records the attempt number of a workflow task
// and warns when that attempt number is considered critical.
//
// The attempt is recorded as a WorkflowTaskAttempt distribution under
// WorkflowContextScope, tagged with the namespace name. When attempt is at or
// above the WorkflowTaskCriticalAttempts config value, a warning carrying the
// namespace, workflow ID, run ID and attempt is written through the shard's
// throttled logger.
func (m *workflowTaskStateMachine) emitWorkflowTaskAttemptStats(
	attempt int32,
) {
	namespaceName := m.ms.GetNamespaceEntry().Name().String()
	m.ms.metricsClient.Scope(
		metrics.WorkflowContextScope,
		metrics.NamespaceTag(namespaceName),
	).RecordDistribution(metrics.WorkflowTaskAttempt, int(attempt))

	// Compare in int: widening attempt is lossless, whereas narrowing the
	// configured threshold to int32 could wrap for very large settings and
	// make every attempt look critical.
	criticalAttempts := m.ms.shard.GetConfig().WorkflowTaskCriticalAttempts()
	if int(attempt) < criticalAttempts {
		return
	}

	m.ms.shard.GetThrottledLogger().Warn("Critical attempts processing workflow task",
		tag.WorkflowNamespace(namespaceName),
		tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId),
		tag.WorkflowRunID(m.ms.GetExecutionState().RunId),
		tag.Attempt(attempt),
	)
}

0 comments on commit 1552628

Please sign in to comment.