Skip to content

Commit

Permalink
Improve task latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jun 29, 2020
1 parent 6fcee79 commit 78b9be0
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15),
ReplicationTaskFetcherErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherErrorRetryWait, time.Second),
ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait, 50 * time.Millisecond),
ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait, 50*time.Millisecond),
ReplicationTaskProcessorErrorRetryMaxAttempts: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts, 5),
ReplicationTaskProcessorNoTaskRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorNoTaskInitialWait, 2*time.Second),
ReplicationTaskProcessorCleanupInterval: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorCleanupInterval, 1*time.Minute),
Expand Down
5 changes: 4 additions & 1 deletion service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,9 @@ func (p *queueProcessorBase) processBatch() {
return
}

taskStartTime := p.timeSource.Now()
for _, task := range tasks {
if submitted := p.submitTask(task); !submitted {
if submitted := p.submitTask(task, taskStartTime); !submitted {
// not submitted since processor has been shutdown
return
}
Expand All @@ -358,13 +359,15 @@ func (p *queueProcessorBase) processBatch() {

func (p *queueProcessorBase) submitTask(
taskInfo task.Info,
taskStartTime time.Time,
) bool {
if !p.isPriorityTaskProcessorEnabled() {
return p.taskProcessor.addTask(
newTaskInfo(
p.processor,
taskInfo,
task.InitializeLoggerForTask(p.shard.GetShardID(), taskInfo, p.logger),
taskStartTime,
),
)
}
Expand Down
10 changes: 5 additions & 5 deletions service/history/replicatorQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_WorkflowMissing() {
nil,
), nil).AnyTimes()

_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger))
_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now()))
s.Nil(err)
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_WorkflowCompleted() {
nil,
), nil).AnyTimes()

_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger))
_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now()))
s.Nil(err)
}

Expand Down Expand Up @@ -250,7 +250,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_ActivityCompleted() {
nil,
), nil).AnyTimes()

_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger))
_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now()))
s.Nil(err)
}

Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_ActivityRetry() {
},
}).Return(nil).Once()

_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger))
_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now()))
s.Nil(err)
}

Expand Down Expand Up @@ -469,7 +469,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_ActivityRunning() {
},
}).Return(nil).Once()

_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger))
_, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now()))
s.Nil(err)
}

Expand Down
3 changes: 2 additions & 1 deletion service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ func newTaskInfo(
processor taskExecutor,
task task.Info,
logger log.Logger,
startTime time.Time,
) *taskInfo {
return &taskInfo{
processor: processor,
task: task,
attempt: 0,
startTime: time.Now(), // used for metrics
startTime: startTime, // used for metrics
logger: logger,
shouldProcessTask: true,
}
Expand Down
20 changes: 10 additions & 10 deletions service/history/taskProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_ShutDown() {
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainErrRetry_ProcessNoErr() {
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now())
var taskFilterErr task.Filter = func(task task.Info) (bool, error) {
return false, errors.New("some random error")
}
Expand All @@ -143,7 +143,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainErrRetry_ProcessNoErr()
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainFalse_ProcessNoErr() {
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now())
taskInfo.shouldProcessTask = false
var taskFilter task.Filter = func(task task.Info) (bool, error) {
return false, nil
Expand All @@ -159,7 +159,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainFalse_ProcessNoErr() {
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessNoErr() {
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now())
var taskFilter task.Filter = func(task task.Info) (bool, error) {
return true, nil
}
Expand All @@ -175,7 +175,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessNoErr() {

func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessErrNoErr() {
err := errors.New("some random err")
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now())
var taskFilter task.Filter = func(task task.Info) (bool, error) {
return true, nil
}
Expand All @@ -193,15 +193,15 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessErrNoErr()
func (s *taskProcessorSuite) TestHandleTaskError_EntityNotExists() {
err := &workflow.EntityNotExistsError{}

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now())
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskRetry() {
err := task.ErrTaskRetry
delay := time.Second

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now())
go func() {
time.Sleep(delay)
s.notificationChan <- struct{}{}
Expand All @@ -216,14 +216,14 @@ func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskRetry() {
func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskDiscarded() {
err := task.ErrTaskDiscarded

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now())
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestHandleTaskError_DomainNotActiveError() {
err := &workflow.DomainNotActiveError{}

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now())
taskInfo.startTime = time.Now().Add(-cache.DomainCacheRefreshInterval * time.Duration(2))
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))

Expand All @@ -234,13 +234,13 @@ func (s *taskProcessorSuite) TestHandleTaskError_DomainNotActiveError() {
func (s *taskProcessorSuite) TestHandleTaskError_CurrentWorkflowConditionFailedError() {
err := &persistence.CurrentWorkflowConditionFailedError{}

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now())
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestHandleTaskError_RandomErr() {
err := errors.New("random error")

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now())
s.Equal(err, s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}
5 changes: 4 additions & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,9 @@ func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerT
return nil, err
}

taskStartTime := t.timeSource.Now()
for _, task := range timerTasks {
if submitted := t.submitTask(task); !submitted {
if submitted := t.submitTask(task, taskStartTime); !submitted {
// not submitted due to shard shutdown
return nil, nil
}
Expand All @@ -426,13 +427,15 @@ func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerT

func (t *timerQueueProcessorBase) submitTask(
taskInfo task.Info,
taskStartTime time.Time,
) bool {
if !t.isPriorityTaskProcessorEnabled() {
return t.taskProcessor.addTask(
newTaskInfo(
t.timerProcessor,
taskInfo,
task.InitializeLoggerForTask(t.shard.GetShardID(), taskInfo, t.logger),
taskStartTime,
),
)
}
Expand Down

0 comments on commit 78b9be0

Please sign in to comment.