Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log/metrics for decision attempts and force schedule new decision #3840

Merged
merged 2 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,7 @@ const (
EmptyCompletionDecisionsCounter
MultipleCompletionDecisionsCounter
FailedDecisionsCounter
DecisionAttemptTimer
StaleMutableStateCounter
AutoResetPointsLimitExceededCounter
AutoResetPointCorruptionCounter
Expand Down Expand Up @@ -2222,6 +2223,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
EmptyCompletionDecisionsCounter: {metricName: "empty_completion_decisions", metricType: Counter},
MultipleCompletionDecisionsCounter: {metricName: "multiple_completion_decisions", metricType: Counter},
FailedDecisionsCounter: {metricName: "failed_decisions", metricType: Counter},
DecisionAttemptTimer: {metricName: "decision_attempt", metricType: Timer},
StaleMutableStateCounter: {metricName: "stale_mutable_state", metricType: Counter},
AutoResetPointsLimitExceededCounter: {metricName: "auto_reset_points_exceed_limit", metricType: Counter},
AutoResetPointCorruptionCounter: {metricName: "auto_reset_point_corruption", metricType: Counter},
Expand Down
9 changes: 9 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ var keys = map[Key]string{
HistoryThrottledLogRPS: "history.throttledLogRPS",
StickyTTL: "history.stickyTTL",
DecisionHeartbeatTimeout: "history.decisionHeartbeatTimeout",
DecisionRetryCriticalAttempts: "history.decisionRetryCriticalAttempts",
DecisionRetryMaxAttempts: "history.decisionRetryMaxAttempts",
EnableForceScheduleNewDecision: "history.enableForceScheduleNewDecision",
ParentClosePolicyThreshold: "history.parentClosePolicyThreshold",
NumParentClosePolicySystemWorkflows: "history.numParentClosePolicySystemWorkflows",
ReplicationTaskFetcherParallelism: "history.ReplicationTaskFetcherParallelism",
Expand Down Expand Up @@ -758,6 +761,12 @@ const (
StickyTTL
// DecisionHeartbeatTimeout for decision heartbeat
DecisionHeartbeatTimeout
// DecisionRetryCriticalAttempts is the decision attempt threshold for logging and emitting metrics
DecisionRetryCriticalAttempts
// DecisionRetryMaxAttempts is the decision attempt threshold for scheduling a new decision if EnableForceScheduleNewDecision is enabled
DecisionRetryMaxAttempts
// EnableForceScheduleNewDecision is whether to force schedule a new decision when the decision attempt reaches DecisionRetryMaxAttempts
EnableForceScheduleNewDecision

// EnableDropStuckTaskByDomainID is whether stuck timer/transfer task should be dropped for a domain
EnableDropStuckTaskByDomainID
Expand Down
7 changes: 7 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ type Config struct {
DecisionHeartbeatTimeout dynamicconfig.DurationPropertyFnWithDomainFilter
// MaxDecisionStartToCloseSeconds is the StartToCloseSeconds for decision
MaxDecisionStartToCloseSeconds dynamicconfig.IntPropertyFnWithDomainFilter
DecisionRetryCriticalAttempts dynamicconfig.IntPropertyFn
DecisionRetryMaxAttempts dynamicconfig.IntPropertyFn
// EnableForceScheduleNewDecision is whether to force schedule a new decision when the decision attempt reaches DecisionRetryMaxAttempts
EnableForceScheduleNewDecision dynamicconfig.BoolPropertyFnWithDomainFilter

// The following is used by the new RPC replication stack
ReplicationTaskFetcherParallelism dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -438,6 +442,9 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
StickyTTL: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.StickyTTL, time.Hour*24*365),
DecisionHeartbeatTimeout: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.DecisionHeartbeatTimeout, time.Minute*30),
DecisionRetryCriticalAttempts: dc.GetIntProperty(dynamicconfig.DecisionRetryCriticalAttempts, 10), // about 30m
DecisionRetryMaxAttempts: dc.GetIntProperty(dynamicconfig.DecisionRetryMaxAttempts, 30), // about 2hrs
EnableForceScheduleNewDecision: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableForceScheduleNewDecision, false),

ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 1),
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
Expand Down
25 changes: 25 additions & 0 deletions service/history/execution/mutable_state_decision_task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
)

Expand Down Expand Up @@ -319,6 +320,19 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskScheduledEventAsHea
}
}

domainName := m.msb.GetDomainEntry().GetInfo().Name
if m.msb.executionInfo.DecisionAttempt > 0 &&
m.msb.shard.GetConfig().EnableForceScheduleNewDecision(domainName) &&
m.msb.executionInfo.DecisionAttempt >= int64(m.msb.shard.GetConfig().DecisionRetryMaxAttempts()) {
// Force schedule new non-transient decision
m.msb.executionInfo.DecisionAttempt = 0
m.msb.logger.Info("Force schedule new decision task.",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(m.msb.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(m.msb.GetExecutionInfo().RunID),
)
}

var newDecisionEvent *workflow.HistoryEvent
scheduleID := m.msb.GetNextEventID() // we will generate the schedule event later for repeatedly failing decisions
// Avoid creating new history events when decisions are continuously failing
Expand Down Expand Up @@ -584,6 +598,17 @@ func (m *mutableStateDecisionTaskManagerImpl) FailDecision(
if incrementAttempt {
failDecisionInfo.Attempt = m.msb.executionInfo.DecisionAttempt + 1
failDecisionInfo.ScheduledTimestamp = m.msb.timeSource.Now().UnixNano()

if failDecisionInfo.Attempt >= int64(m.msb.shard.GetConfig().DecisionRetryCriticalAttempts()) {
domainName := m.msb.GetDomainEntry().GetInfo().Name
domainTag := metrics.DomainTag(domainName)
m.msb.metricsClient.Scope(metrics.WorkflowContextScope, domainTag).RecordTimer(metrics.DecisionAttemptTimer, time.Duration(failDecisionInfo.Attempt))
m.msb.logger.Warn("Critical error processing decision task, retrying.",
tag.WorkflowDomainName(m.msb.GetDomainEntry().GetInfo().Name),
tag.WorkflowID(m.msb.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(m.msb.GetExecutionInfo().RunID),
)
}
}
m.UpdateDecision(failDecisionInfo)
}
Expand Down