TimerQueueProcessor to scan DB for existing timers on init #455

Merged
77 changes: 45 additions & 32 deletions service/history/timerQueueProcessor.go
@@ -65,10 +65,10 @@ type (
}

timeGate struct {
tNext, tNow, tEnd int64 // time (in 'UnixNano' units) for next, (last) now and end
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
tNextNanos, tNowNanos int64
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
}

timerAckMgr struct {
@@ -89,13 +89,12 @@ type (
func newTimeGate() *timeGate {
tNow := time.Now()

// setup timeGate with timer set to fire at the 'end of time'
// Initialize timeGate to fire immediately so we check right away for any outstanding timers that are already due
t := &timeGate{
tNow: tNow.UnixNano(),
tEnd: math.MaxInt64,
gateC: make(chan struct{}),
closeC: make(chan struct{}),
timer: time.NewTimer(time.Unix(0, math.MaxInt64).Sub(tNow)),
tNowNanos: tNow.UnixNano(),
gateC: make(chan struct{}),
closeC: make(chan struct{}),
timer: time.NewTimer(0),
}

// "Cast" chan Time to chan struct{}.
@@ -120,35 +119,46 @@ func newTimeGate() {
return t
}
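The body of the forwarding goroutine referenced by the "Cast" comment above is collapsed in this view. A minimal sketch of that pattern, assuming it does nothing more than forward timer fires onto gateC until closeC is closed (gateC, closeC and timer are the struct fields above; the collapsed code may differ):

    go func() {
        defer close(t.gateC)
        for {
            select {
            case <-t.timer.C:
                // wake whoever is blocked in beforeSleep()
                t.gateC <- struct{}{}
            case <-t.closeC:
                t.timer.Stop()
                return
            }
        }
    }()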

func (t *timeGate) setEoxReached() {
t.tNext = t.tEnd
}

func (t *timeGate) beforeSleep() <-chan struct{} {
if t.engaged() && t.tNext != t.tEnd {
// reset timer to fire when the next message should be made 'visible'
tNow := time.Now()
t.tNow = tNow.UnixNano()
t.timer.Reset(time.Unix(0, t.tNext).Sub(tNow))
}
// update current time on gate
tNow := time.Now()
t.tNowNanos = tNow.UnixNano()

return t.gateC
}

func (t *timeGate) engaged() bool {
t.tNow = time.Now().UnixNano()
return t.tNext > t.tNow
func (t *timeGate) engaged(now int64) bool {
return t.tNextNanos > now
}

func (t *timeGate) setNext(next time.Time) {
t.tNext = next.UnixNano()
// setNext is called by the processor in the following 2 conditions:
// 1. The processor is woken up by the creation of a new timer that is scheduled to fire before the current tNextNanos
// 2. The processor finds a lookAheadTask in Cassandra and passes in the visibility time of that task
func (t *timeGate) setNext(next time.Time) bool {
newNext := next.UnixNano()

tNow := time.Now()
t.tNowNanos = tNow.UnixNano()
// Check to see if passed in next should become the next time timeGate notifies processor
if !t.engaged(t.tNowNanos) || t.tNextNanos > newNext {
t.tNextNanos = newNext
// reset timer to fire when the next message should be made 'visible'
t.timer.Reset(time.Unix(0, t.tNextNanos).Sub(tNow))

// Notifies caller that next notification is reset to fire at passed in 'next' visibility time
return true
}

return false
}
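As implied by the code above, setNext only ever pulls the wake-up earlier, and the boolean tells the caller whether the deadline actually moved. A small usage sketch (timestamps are arbitrary):

    g := newTimeGate()
    g.setNext(time.Now().Add(10 * time.Second)) // true: gate was not engaged yet
    g.setNext(time.Now().Add(5 * time.Second))  // true: earlier than the current deadline
    g.setNext(time.Now().Add(20 * time.Second)) // false: later than the current deadline, ignored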

func (t *timeGate) close() {
close(t.closeC)
}

func (t *timeGate) String() string {
return fmt.Sprintf("timeGate [engaged=%v eox=%v tNext=%x tNow=%x]", t.engaged(), t.tNext == t.tEnd, t.tNext, t.tNow)
return fmt.Sprintf("timeGate [engaged=%v tNextNanos=%v tNowNanos=%v]",
t.engaged(time.Now().UnixNano()), time.Unix(0, t.tNextNanos), time.Unix(0, t.tNowNanos))
}

func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImpl, executionManager persistence.ExecutionManager,
@@ -209,6 +219,7 @@ func (t *timerQueueProcessorImpl) NotifyNewTimer(timerTasks []persistence.Task)
for _, task := range timerTasks {
ts := persistence.GetVisibilityTSFrom(task)
if t.minPendingTimer.IsZero() || ts.Before(t.minPendingTimer) {
// TODO: We should just send the visibility time through a channel instead of setting minPendingTimer
t.minPendingTimer = ts
updatedMinTimer = true
}
Expand Down Expand Up @@ -281,7 +292,7 @@ continueProcessor:
for {
isWokeByNewTimer := false

if nextKeyTask == nil || gate.engaged() {
if nextKeyTask == nil || gate.engaged(time.Now().UnixNano()) {
gateC := gate.beforeSleep()

// Wait until one of four things occurs:
@@ -315,22 +326,23 @@

t.lock.Lock()
newMinTimestamp := t.minPendingTimer
if !gate.engaged() || newMinTimestamp.UnixNano() < gate.tNext {
if gate.setNext(newMinTimestamp) {
// reset the nextKeyTask as the new timer is expected to fire before previously read nextKeyTask
nextKeyTask = nil
gate.setNext(newMinTimestamp)
}
t.minPendingTimer = time.Time{}
t.lock.Unlock()

t.logger.Debugf("%v: Next key after woke up by timer: %v",
time.Now().UTC(), newMinTimestamp.UTC())

if !t.isProcessNow(time.Unix(0, gate.tNext)) {
continue
if !t.isProcessNow(time.Unix(0, gate.tNextNanos)) {
continue continueProcessor
}
}

// Either we have new timer (or) we are gated on timer to query for it.
ProcessPendingTimers:
for {
// Get next set of timer tasks.
timerTasks, lookAheadTask, err := t.getTasksAndNextKey()
@@ -346,7 +358,7 @@ continueProcessor:
if lookAheadTask != nil || len(timerTasks) < t.config.TimerTaskBatchSize {
// We have processed all the tasks.
nextKeyTask = lookAheadTask
break
break ProcessPendingTimers
}
}
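The bare break/continue statements above are now labeled so they unambiguously target the intended loop. A stripped-down sketch of that control-flow shape (ready and nextBatch are placeholders, not functions from this change):

    continueProcessor:
        for {
            if !ready() {
                continue continueProcessor // back to the top of the outer wait loop
            }
        ProcessPendingTimers:
            for {
                if len(nextBatch()) == 0 {
                    break ProcessPendingTimers // exits only the inner drain loop
                }
            }
        }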

@@ -976,6 +988,7 @@ func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persisten
"Next timer task time stamp is less than current timer task read level. timer task: (%s), ReadLevel: (%s)",
taskSeq, t.readLevel)
}

if !t.processor.isProcessNow(task.VisibilityTimestamp) {
lookAheadTask = task
break
9 changes: 7 additions & 2 deletions service/history/timerQueueProcessor2_test.go
@@ -218,8 +218,6 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
EventID: di.ScheduleID}
timerIndexResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{timerTask}}

s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil).Once()

ms := createMutableState(builder)
wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once()
@@ -234,9 +232,16 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
}).Once()

// Start timer Processor.
emptyResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}}
s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(emptyResponse, nil).Run(func(arguments mock.Arguments) {
// Done.
waitCh <- struct{}{}
}).Once()
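// Note: the empty response above satisfies the processor's initial DB scan kicked off by Start() and signals waitCh; the real timer task is wired up afterwards, below.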
processor := newTimerQueueProcessor(s.mockShard, s.mockHistoryEngine, s.mockExecutionMgr, s.logger).(*timerQueueProcessorImpl)
processor.Start()
<-waitCh

s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil)
processor.NotifyNewTimer([]persistence.Task{&persistence.WorkflowTimeoutTask{
VisibilityTimestamp: timerTask.VisibilityTimestamp,
}})