From 78b9be018ae81d3bfc38e7c6327b569da3a47b86 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 29 Jun 2020 12:22:40 -0700 Subject: [PATCH] Improve task latency metrics --- service/history/config/config.go | 2 +- service/history/queueProcessor.go | 5 ++++- .../history/replicatorQueueProcessor_test.go | 10 +++++----- service/history/taskProcessor.go | 3 ++- service/history/taskProcessor_test.go | 20 +++++++++---------- service/history/timerQueueProcessorBase.go | 5 ++++- 6 files changed, 26 insertions(+), 19 deletions(-) diff --git a/service/history/config/config.go b/service/history/config/config.go index 59abb502763..00a4225a98f 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -430,7 +430,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second), ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15), ReplicationTaskFetcherErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherErrorRetryWait, time.Second), - ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait, 50 * time.Millisecond), + ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait, 50*time.Millisecond), ReplicationTaskProcessorErrorRetryMaxAttempts: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts, 5), ReplicationTaskProcessorNoTaskRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorNoTaskInitialWait, 2*time.Second), ReplicationTaskProcessorCleanupInterval: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorCleanupInterval, 1*time.Minute), diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 1fbae3e9807..36d2d8d1f66 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -340,8 +340,9 @@ func (p *queueProcessorBase) processBatch() { return } + taskStartTime := p.timeSource.Now() for _, task := range tasks { - if submitted := p.submitTask(task); !submitted { + if submitted := p.submitTask(task, taskStartTime); !submitted { // not submitted since processor has been shutdown return } @@ -358,6 +359,7 @@ func (p *queueProcessorBase) processBatch() { func (p *queueProcessorBase) submitTask( taskInfo task.Info, + taskStartTime time.Time, ) bool { if !p.isPriorityTaskProcessorEnabled() { return p.taskProcessor.addTask( @@ -365,6 +367,7 @@ func (p *queueProcessorBase) submitTask( p.processor, taskInfo, task.InitializeLoggerForTask(p.shard.GetShardID(), taskInfo, p.logger), + taskStartTime, ), ) } diff --git a/service/history/replicatorQueueProcessor_test.go b/service/history/replicatorQueueProcessor_test.go index f7f6e0b39dd..9266a19750e 100644 --- a/service/history/replicatorQueueProcessor_test.go +++ b/service/history/replicatorQueueProcessor_test.go @@ -153,7 +153,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_WorkflowMissing() { nil, ), nil).AnyTimes() - _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger)) + _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now())) s.Nil(err) } @@ -201,7 +201,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_WorkflowCompleted() { nil, ), nil).AnyTimes() - _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger)) + _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now())) s.Nil(err) } @@ -250,7 +250,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_ActivityCompleted() { nil, ), nil).AnyTimes() - _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger)) + _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now())) s.Nil(err) } @@ -360,7 +360,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_ActivityRetry() { }, }).Return(nil).Once() - _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger)) + _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now())) s.Nil(err) } @@ -469,7 +469,7 @@ func (s *replicatorQueueProcessorSuite) TestSyncActivity_ActivityRunning() { }, }).Return(nil).Once() - _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger)) + _, err := s.replicatorQueueProcessor.process(newTaskInfo(nil, task, s.logger, s.mockShard.GetTimeSource().Now())) s.Nil(err) } diff --git a/service/history/taskProcessor.go b/service/history/taskProcessor.go index 5f4a3dda58b..0e9907388e8 100644 --- a/service/history/taskProcessor.go +++ b/service/history/taskProcessor.go @@ -82,12 +82,13 @@ func newTaskInfo( processor taskExecutor, task task.Info, logger log.Logger, + startTime time.Time, ) *taskInfo { return &taskInfo{ processor: processor, task: task, attempt: 0, - startTime: time.Now(), // used for metrics + startTime: startTime, // used for metrics logger: logger, shouldProcessTask: true, } diff --git a/service/history/taskProcessor_test.go b/service/history/taskProcessor_test.go index 57512b04639..b74e62cff13 100644 --- a/service/history/taskProcessor_test.go +++ b/service/history/taskProcessor_test.go @@ -124,7 +124,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_ShutDown() { } func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainErrRetry_ProcessNoErr() { - taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now()) var taskFilterErr task.Filter = func(task task.Info) (bool, error) { return false, errors.New("some random error") } @@ -143,7 +143,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainErrRetry_ProcessNoErr() } func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainFalse_ProcessNoErr() { - taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now()) taskInfo.shouldProcessTask = false var taskFilter task.Filter = func(task task.Info) (bool, error) { return false, nil @@ -159,7 +159,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainFalse_ProcessNoErr() { } func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessNoErr() { - taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now()) var taskFilter task.Filter = func(task task.Info) (bool, error) { return true, nil } @@ -175,7 +175,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessNoErr() { func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessErrNoErr() { err := errors.New("some random err") - taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, &persistence.TimerTaskInfo{TaskID: 12345, VisibilityTimestamp: time.Now()}, s.logger, s.mockShard.GetTimeSource().Now()) var taskFilter task.Filter = func(task task.Info) (bool, error) { return true, nil } @@ -193,7 +193,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_DomainTrue_ProcessErrNoErr() func (s *taskProcessorSuite) TestHandleTaskError_EntityNotExists() { err := &workflow.EntityNotExistsError{} - taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now()) s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err)) } @@ -201,7 +201,7 @@ func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskRetry() { err := task.ErrTaskRetry delay := time.Second - taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now()) go func() { time.Sleep(delay) s.notificationChan <- struct{}{} @@ -216,14 +216,14 @@ func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskRetry() { func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskDiscarded() { err := task.ErrTaskDiscarded - taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now()) s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err)) } func (s *taskProcessorSuite) TestHandleTaskError_DomainNotActiveError() { err := &workflow.DomainNotActiveError{} - taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now()) taskInfo.startTime = time.Now().Add(-cache.DomainCacheRefreshInterval * time.Duration(2)) s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err)) @@ -234,13 +234,13 @@ func (s *taskProcessorSuite) TestHandleTaskError_DomainNotActiveError() { func (s *taskProcessorSuite) TestHandleTaskError_CurrentWorkflowConditionFailedError() { err := &persistence.CurrentWorkflowConditionFailedError{} - taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now()) s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err)) } func (s *taskProcessorSuite) TestHandleTaskError_RandomErr() { err := errors.New("random error") - taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger) + taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger, s.mockShard.GetTimeSource().Now()) s.Equal(err, s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err)) } diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 39f689c5189..8d393e2e3b0 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -409,8 +409,9 @@ func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerT return nil, err } + taskStartTime := t.timeSource.Now() for _, task := range timerTasks { - if submitted := t.submitTask(task); !submitted { + if submitted := t.submitTask(task, taskStartTime); !submitted { // not submitted due to shard shutdown return nil, nil } @@ -426,6 +427,7 @@ func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerT func (t *timerQueueProcessorBase) submitTask( taskInfo task.Info, + taskStartTime time.Time, ) bool { if !t.isPriorityTaskProcessorEnabled() { return t.taskProcessor.addTask( @@ -433,6 +435,7 @@ func (t *timerQueueProcessorBase) submitTask( t.timerProcessor, taskInfo, task.InitializeLoggerForTask(t.shard.GetShardID(), taskInfo, t.logger), + taskStartTime, ), ) }