diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 7bb9fa0d576..02765c829a0 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -65,10 +65,10 @@ type ( } timeGate struct { - tNext, tNow, tEnd int64 // time (in 'UnixNano' units) for next, (last) now and end - timer *time.Timer // timer used to wake us up when the next message is ready to deliver - gateC chan struct{} - closeC chan struct{} + tNextNanos, tNowNanos int64 + timer *time.Timer // timer used to wake us up when the next message is ready to deliver + gateC chan struct{} + closeC chan struct{} } timerAckMgr struct { @@ -89,13 +89,12 @@ type ( func newTimeGate() *timeGate { tNow := time.Now() - // setup timeGate with timer set to fire at the 'end of time' + // Initialize timeGate to fire immediately to check for any outstanding timer which needs to be fired immediately t := &timeGate{ - tNow: tNow.UnixNano(), - tEnd: math.MaxInt64, - gateC: make(chan struct{}), - closeC: make(chan struct{}), - timer: time.NewTimer(time.Unix(0, math.MaxInt64).Sub(tNow)), + tNowNanos: tNow.UnixNano(), + gateC: make(chan struct{}), + closeC: make(chan struct{}), + timer: time.NewTimer(0), } // "Cast" chan Time to chan struct{}. 
@@ -120,27 +119,37 @@ func newTimeGate() *timeGate { return t } -func (t *timeGate) setEoxReached() { - t.tNext = t.tEnd -} - func (t *timeGate) beforeSleep() <-chan struct{} { - if t.engaged() && t.tNext != t.tEnd { - // reset timer to fire when the next message should be made 'visible' - tNow := time.Now() - t.tNow = tNow.UnixNano() - t.timer.Reset(time.Unix(0, t.tNext).Sub(tNow)) - } + // update current time on gate + tNow := time.Now() + t.tNowNanos = tNow.UnixNano() + return t.gateC } -func (t *timeGate) engaged() bool { - t.tNow = time.Now().UnixNano() - return t.tNext > t.tNow +func (t *timeGate) engaged(now int64) bool { + return t.tNextNanos > now } -func (t *timeGate) setNext(next time.Time) { - t.tNext = next.UnixNano() +// setNext is called by processor in the following 2 conditions: +// 1. Processor is woken up by new timer creation and new timer is scheduled to fire before current tNextNanos +// 2. Processor finds a lookAheadTask from cassandra and calls it using the visibility time of the task +func (t *timeGate) setNext(next time.Time) bool { + newNext := next.UnixNano() + + tNow := time.Now() + t.tNowNanos = tNow.UnixNano() + // Check to see if passed in next should become the next time timeGate notifies processor + if !t.engaged(t.tNowNanos) || t.tNextNanos > newNext { + t.tNextNanos = newNext + // reset timer to fire when the next message should be made 'visible' + t.timer.Reset(time.Unix(0, t.tNextNanos).Sub(tNow)) + + // Notifies caller that next notification is reset to fire at passed in 'next' visibility time + return true + } + + return false } func (t *timeGate) close() { @@ -148,7 +157,8 @@ func (t *timeGate) close() { } func (t *timeGate) String() string { - return fmt.Sprintf("timeGate [engaged=%v eox=%v tNext=%x tNow=%x]", t.engaged(), t.tNext == t.tEnd, t.tNext, t.tNow) + return fmt.Sprintf("timeGate [engaged=%v tNextNanos=%v tNowNanos=%v]", + t.engaged(time.Now().UnixNano()), time.Unix(0, t.tNextNanos), time.Unix(0, t.tNowNanos)) } func 
newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImpl, executionManager persistence.ExecutionManager, @@ -209,6 +219,7 @@ func (t *timerQueueProcessorImpl) NotifyNewTimer(timerTasks []persistence.Task) for _, task := range timerTasks { ts := persistence.GetVisibilityTSFrom(task) if t.minPendingTimer.IsZero() || ts.Before(t.minPendingTimer) { + // TODO: We should just send the visibility time through channel instead of setting minPendingTimer t.minPendingTimer = ts updatedMinTimer = true } @@ -281,7 +292,7 @@ continueProcessor: for { isWokeByNewTimer := false - if nextKeyTask == nil || gate.engaged() { + if nextKeyTask == nil || gate.engaged(time.Now().UnixNano()) { gateC := gate.beforeSleep() // Wait until one of four things occurs: @@ -315,9 +326,9 @@ continueProcessor: t.lock.Lock() newMinTimestamp := t.minPendingTimer - if !gate.engaged() || newMinTimestamp.UnixNano() < gate.tNext { + if gate.setNext(newMinTimestamp) { + // reset the nextKeyTask as the new timer is expected to fire before previously read nextKeyTask nextKeyTask = nil - gate.setNext(newMinTimestamp) } t.minPendingTimer = time.Time{} t.lock.Unlock() @@ -325,12 +336,13 @@ continueProcessor: t.logger.Debugf("%v: Next key after woke up by timer: %v", time.Now().UTC(), newMinTimestamp.UTC()) - if !t.isProcessNow(time.Unix(0, gate.tNext)) { - continue + if !t.isProcessNow(time.Unix(0, gate.tNextNanos)) { + continue continueProcessor } } // Either we have new timer (or) we are gated on timer to query for it. + ProcessPendingTimers: for { // Get next set of timer tasks. timerTasks, lookAheadTask, err := t.getTasksAndNextKey() @@ -346,7 +358,7 @@ continueProcessor: if lookAheadTask != nil || len(timerTasks) < t.config.TimerTaskBatchSize { // We have processed all the tasks. 
nextKeyTask = lookAheadTask - break + break ProcessPendingTimers } } @@ -976,6 +988,7 @@ func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persisten "Next timer task time stamp is less than current timer task read level. timer task: (%s), ReadLevel: (%s)", taskSeq, t.readLevel) } + if !t.processor.isProcessNow(task.VisibilityTimestamp) { lookAheadTask = task break diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index d1e0a11de7b..30918d85f53 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -218,8 +218,6 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() { EventID: di.ScheduleID} timerIndexResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{timerTask}} - s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil).Once() - ms := createMutableState(builder) wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms} s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once() @@ -234,9 +232,16 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() { }).Once() // Start timer Processor. + emptyResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}} + s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(emptyResponse, nil).Run(func(arguments mock.Arguments) { + // Done. + waitCh <- struct{}{} + }).Once() processor := newTimerQueueProcessor(s.mockShard, s.mockHistoryEngine, s.mockExecutionMgr, s.logger).(*timerQueueProcessorImpl) processor.Start() + <-waitCh + s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil) processor.NotifyNewTimer([]persistence.Task{&persistence.WorkflowTimeoutTask{ VisibilityTimestamp: timerTask.VisibilityTimestamp, }})