TimerQueueProcessor to scan DB for existing timers on init
TimerQueueProcessor used to set up a very long timer on initialization to
notify the processor to perform a DB scan. This becomes problematic when
there are existing timers and the shard restarts: after shard initialization
there is no trigger for the processor to perform a DB scan, so it has to wait
for an outside trigger before it can perform the first scan and start
operating as usual.
This change removes the effectively infinite timer on initialization and
instead performs the first scan immediately to check for existing timers.
It also includes some code cleanup and removes some unnecessary code.
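
For illustration, a minimal standalone sketch of the difference in how the gate's timer is armed (hypothetical constructor names, not the actual Cadence types):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// timeGate here is a stripped-down stand-in, not the real Cadence type.
type timeGate struct {
	timer *time.Timer
}

// Old behavior: the timer is armed to fire at the 'end of time', so nothing
// wakes the processor on its own after a shard restart.
func newTimeGateOld() *timeGate {
	return &timeGate{timer: time.NewTimer(time.Unix(0, math.MaxInt64).Sub(time.Now()))}
}

// New behavior: a zero-duration timer fires at once, triggering the initial
// scan for timers that already exist in the DB.
func newTimeGateNew() *timeGate {
	return &timeGate{timer: time.NewTimer(0)}
}

func main() {
	g := newTimeGateNew() // swap in newTimeGateOld() to see the gate never fire
	select {
	case <-g.timer.C:
		fmt.Println("gate fired immediately; processor performs its first DB scan")
	case <-time.After(time.Second):
		fmt.Println("gate did not fire within a second")
	}
}
```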

fixes cadence-workflow#454
samarabbas committed Dec 8, 2017
1 parent 6ed2014 commit 4a215b7
Showing 2 changed files with 48 additions and 30 deletions.
69 changes: 41 additions & 28 deletions service/history/timerQueueProcessor.go
@@ -65,10 +65,10 @@ type (
}

timeGate struct {
tNext, tNow, tEnd int64 // time (in 'UnixNano' units) for next, (last) now and end
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
tNext, tNow int64 // time (in 'UnixNano' units) for next and (last) now
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
}

timerAckMgr struct {
@@ -89,13 +89,12 @@ type (
func newTimeGate() *timeGate {
tNow := time.Now()

// setup timeGate with timer set to fire at the 'end of time'
// Initialize timeGate to fire immediately so we check right away for any outstanding timers that need to fire
t := &timeGate{
tNow: tNow.UnixNano(),
tEnd: math.MaxInt64,
gateC: make(chan struct{}),
closeC: make(chan struct{}),
timer: time.NewTimer(time.Unix(0, math.MaxInt64).Sub(tNow)),
timer: time.NewTimer(0),
}

// "Cast" chan Time to chan struct{}.
@@ -120,35 +119,46 @@ func newTimeGate() *timeGate {
return t
}

func (t *timeGate) setEoxReached() {
t.tNext = t.tEnd
}

func (t *timeGate) beforeSleep() <-chan struct{} {
if t.engaged() && t.tNext != t.tEnd {
// reset timer to fire when the next message should be made 'visible'
tNow := time.Now()
t.tNow = tNow.UnixNano()
t.timer.Reset(time.Unix(0, t.tNext).Sub(tNow))
}
// update current time on gate
tNow := time.Now()
t.tNow = tNow.UnixNano()

return t.gateC
}

func (t *timeGate) engaged() bool {
t.tNow = time.Now().UnixNano()
return t.tNext > t.tNow
func (t *timeGate) engaged(now int64) bool {
return t.tNext > now
}

func (t *timeGate) setNext(next time.Time) {
t.tNext = next.UnixNano()
// setNext is called by the processor under the following 2 conditions:
// 1. Processor is woken up by new timer creation and the new timer is scheduled to fire before the current tNext
// 2. Processor finds a lookAheadTask from Cassandra and calls setNext with the visibility time of that task
func (t *timeGate) setNext(next time.Time) bool {
newNext := next.UnixNano()

tNow := time.Now()
t.tNow = tNow.UnixNano()
// Check to see if passed in next should become the next time timeGate notifies processor
if !t.engaged(t.tNow) || t.tNext > newNext {
t.tNext = newNext
// reset timer to fire when the next message should be made 'visible'
t.timer.Reset(time.Unix(0, t.tNext).Sub(tNow))

// Notifies caller that next notification is reset to fire at passed in 'next' visibility time
return true
}

return false
}

func (t *timeGate) close() {
close(t.closeC)
}

func (t *timeGate) String() string {
return fmt.Sprintf("timeGate [engaged=%v eox=%v tNext=%x tNow=%x]", t.engaged(), t.tNext == t.tEnd, t.tNext, t.tNow)
return fmt.Sprintf("timeGate [engaged=%v tNext=%v tNow=%v]",
t.engaged(time.Now().UnixNano()), time.Unix(0, t.tNext), time.Unix(0, t.tNow))
}

func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImpl, executionManager persistence.ExecutionManager,
@@ -209,6 +219,7 @@ func (t *timerQueueProcessorImpl) NotifyNewTimer(timerTasks []persistence.Task)
for _, task := range timerTasks {
ts := persistence.GetVisibilityTSFrom(task)
if t.minPendingTimer.IsZero() || ts.Before(t.minPendingTimer) {
// TODO: We should just send the visibility time through the channel instead of setting minPendingTimer
t.minPendingTimer = ts
updatedMinTimer = true
}
@@ -281,7 +292,7 @@ continueProcessor:
for {
isWokeByNewTimer := false

if nextKeyTask == nil || gate.engaged() {
if nextKeyTask == nil || gate.engaged(time.Now().UnixNano()) {
gateC := gate.beforeSleep()

// Wait until one of four things occurs:
@@ -315,9 +326,9 @@ continueProcessor:

t.lock.Lock()
newMinTimestamp := t.minPendingTimer
if !gate.engaged() || newMinTimestamp.UnixNano() < gate.tNext {
if gate.setNext(newMinTimestamp) {
// reset the nextKeyTask as the new timer is expected to fire before previously read nextKeyTask
nextKeyTask = nil
gate.setNext(newMinTimestamp)
}
t.minPendingTimer = time.Time{}
t.lock.Unlock()
@@ -326,11 +337,12 @@ continueProcessor:
time.Now().UTC(), newMinTimestamp.UTC())

if !t.isProcessNow(time.Unix(0, gate.tNext)) {
continue
continue continueProcessor
}
}

// Either we have new timer (or) we are gated on timer to query for it.
ProcessPendingTimers:
for {
// Get next set of timer tasks.
timerTasks, lookAheadTask, err := t.getTasksAndNextKey()
@@ -346,7 +358,7 @@ continueProcessor:
if lookAheadTask != nil || len(timerTasks) < t.config.TimerTaskBatchSize {
// We have processed all the tasks.
nextKeyTask = lookAheadTask
break
break ProcessPendingTimers
}
}

Expand Down Expand Up @@ -976,6 +988,7 @@ func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persisten
"Next timer task time stamp is less than current timer task read level. timer task: (%s), ReadLevel: (%s)",
taskSeq, t.readLevel)
}

if !t.processor.isProcessNow(task.VisibilityTimestamp) {
lookAheadTask = task
break
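Taken together, the timeGate changes above amount to the contract sketched below (a simplified, self-contained approximation with hypothetical names, not the production type): setNext re-arms the underlying timer only when the gate is idle or the new deadline is earlier than the currently scheduled one, and its boolean return tells the processor loop whether to discard the previously read look-ahead task.

```go
package main

import (
	"fmt"
	"time"
)

// gate is a simplified stand-in for timeGate, used only to illustrate the
// reworked setNext contract from the diff above.
type gate struct {
	tNext int64 // next wake-up time in UnixNano
	timer *time.Timer
}

func newGate() *gate {
	// Fire immediately on creation so the first scan is not blocked.
	return &gate{timer: time.NewTimer(0)}
}

// engaged reports whether a future wake-up is currently scheduled.
func (g *gate) engaged(now int64) bool {
	return g.tNext > now
}

// setNext re-arms the timer only if the gate is idle or 'next' is earlier
// than the currently scheduled wake-up; it returns true when it did so.
func (g *gate) setNext(next time.Time) bool {
	newNext := next.UnixNano()
	now := time.Now()
	if !g.engaged(now.UnixNano()) || g.tNext > newNext {
		g.tNext = newNext
		g.timer.Reset(time.Unix(0, g.tNext).Sub(now))
		return true
	}
	return false
}

func main() {
	g := newGate()
	<-g.timer.C // drain the immediate firing, as the first DB scan would

	inAnHour := time.Now().Add(time.Hour)
	inAMinute := time.Now().Add(time.Minute)

	fmt.Println(g.setNext(inAnHour))  // true: gate was idle, deadline accepted
	fmt.Println(g.setNext(inAMinute)) // true: earlier deadline re-arms the timer
	fmt.Println(g.setNext(inAnHour))  // false: later deadline is ignored
}
```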
9 changes: 7 additions & 2 deletions service/history/timerQueueProcessor2_test.go
@@ -218,8 +218,6 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
EventID: di.ScheduleID}
timerIndexResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{timerTask}}

s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil).Once()

ms := createMutableState(builder)
wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once()
@@ -234,9 +232,16 @@ }).Once()
}).Once()

// Start timer Processor.
emptyResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}}
s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(emptyResponse, nil).Run(func(arguments mock.Arguments) {
// Done.
waitCh <- struct{}{}
}).Once()
processor := newTimerQueueProcessor(s.mockShard, s.mockHistoryEngine, s.mockExecutionMgr, s.logger).(*timerQueueProcessorImpl)
processor.Start()
<-waitCh

s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil)
processor.NotifyNewTimer([]persistence.Task{&persistence.WorkflowTimeoutTask{
VisibilityTimestamp: timerTask.VisibilityTimestamp,
}})
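The test change follows a common synchronization pattern: the first GetTimerIndexTasks expectation returns an empty response and signals a channel from its Run callback, so the test blocks until the processor's now-immediate startup scan has happened before it registers the real timer response and notifies the processor. A rough sketch of the idea, with hypothetical stand-in types rather than the testify mocks used in the real test:

```go
package main

import (
	"fmt"
	"time"
)

// fakeExecutionManager stands in for the mocked persistence layer. Its first
// GetTimerIndexTasks call returns no timers and signals waitCh, letting the
// test wait for the processor's startup scan before injecting real timers.
type fakeExecutionManager struct {
	waitCh chan struct{}
	calls  int
}

func (f *fakeExecutionManager) GetTimerIndexTasks() []string {
	f.calls++
	if f.calls == 1 {
		defer func() { f.waitCh <- struct{}{} }()
		return nil // empty response for the startup scan
	}
	return []string{"workflow-timeout-timer"}
}

func main() {
	fake := &fakeExecutionManager{waitCh: make(chan struct{}, 1)}

	// Simulate the processor performing its initial DB scan on Start().
	go fake.GetTimerIndexTasks()

	select {
	case <-fake.waitCh:
		fmt.Println("startup scan observed; safe to notify a new timer")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for the startup scan")
	}
}
```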
