Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TimerQueueProcessor to scan DB for existing timers on init #455

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 41 additions & 28 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ type (
}

timeGate struct {
tNext, tNow, tEnd int64 // time (in 'UnixNano' units) for next, (last) now and end
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
tNext, tNow int64 // time (in 'UnixNano' units) for next and (last) now
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix comment to remove "end" (or) better yet, remove the comment and rename the vars to suffix with Nanos

timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
}

timerAckMgr struct {
Expand All @@ -89,13 +89,12 @@ type (
func newTimeGate() *timeGate {
tNow := time.Now()

// setup timeGate with timer set to fire at the 'end of time'
// Initialize timeGate to fire immediately to check for any outstanding timer which needs to be fired immediately
t := &timeGate{
tNow: tNow.UnixNano(),
tEnd: math.MaxInt64,
gateC: make(chan struct{}),
closeC: make(chan struct{}),
timer: time.NewTimer(time.Unix(0, math.MaxInt64).Sub(tNow)),
timer: time.NewTimer(0),
}

// "Cast" chan Time to chan struct{}.
Expand All @@ -120,35 +119,46 @@ func newTimeGate() *timeGate {
return t
}

func (t *timeGate) setEoxReached() {
t.tNext = t.tEnd
}

// beforeSleep is called by the processor immediately before it blocks
// waiting for the gate. It refreshes the gate's notion of "now" and hands
// back the channel the processor should select on; the channel receives a
// signal when the underlying timer fires (see the forwarding goroutine set
// up in newTimeGate).
//
// NOTE(review): unlike the previous version, this no longer resets the
// timer here — setNext is now solely responsible for arming the timer.
func (t *timeGate) beforeSleep() <-chan struct{} {
	// update current time on gate
	tNow := time.Now()
	t.tNow = tNow.UnixNano()

	return t.gateC
}

func (t *timeGate) engaged() bool {
t.tNow = time.Now().UnixNano()
return t.tNext > t.tNow
// engaged reports whether the gate still has a pending fire time strictly
// after the supplied instant (wall-clock time in UnixNano units).
func (t *timeGate) engaged(now int64) bool {
	return now < t.tNext
}

func (t *timeGate) setNext(next time.Time) {
t.tNext = next.UnixNano()
// setNext is called by processor in following 2 conditions:
// 1. Processor is woken up by new timer creation and new timer is scheduled to fire before current tNext
// 2. Processor finds a lockAheadTask from cassandra and calls it using the visibility time of the task
func (t *timeGate) setNext(next time.Time) bool {
newNext := next.UnixNano()

tNow := time.Now()
t.tNow = tNow.UnixNano()
// Check to see if passed in next should become the next time timeGate notifies processor
if !t.engaged(t.tNow) || t.tNext > newNext {
t.tNext = newNext
// reset timer to fire when the next message should be made 'visible'
t.timer.Reset(time.Unix(0, t.tNext).Sub(tNow))

// Notifies caller that next notification is reset to fire at passed in 'next' visibility time
return true
}

return false
}

// close shuts the gate down by closing closeC, which signals the internal
// goroutine (started in newTimeGate) to stop forwarding timer fires.
func (t *timeGate) close() {
close(t.closeC)
}

// String renders the gate's state for debugging: whether it is currently
// engaged, plus the next-fire and last-observed times as readable timestamps.
func (t *timeGate) String() string {
	return fmt.Sprintf("timeGate [engaged=%v tNext=%v tNow=%v]",
		t.engaged(time.Now().UnixNano()), time.Unix(0, t.tNext), time.Unix(0, t.tNow))
}

func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImpl, executionManager persistence.ExecutionManager,
Expand Down Expand Up @@ -209,6 +219,7 @@ func (t *timerQueueProcessorImpl) NotifyNewTimer(timerTasks []persistence.Task)
for _, task := range timerTasks {
ts := persistence.GetVisibilityTSFrom(task)
if t.minPendingTimer.IsZero() || ts.Before(t.minPendingTimer) {
// TODO: We should just send the visibility time through channel instead setting minPendingTimer
t.minPendingTimer = ts
updatedMinTimer = true
}
Expand Down Expand Up @@ -281,7 +292,7 @@ continueProcessor:
for {
isWokeByNewTimer := false

if nextKeyTask == nil || gate.engaged() {
if nextKeyTask == nil || gate.engaged(time.Now().UnixNano()) {
gateC := gate.beforeSleep()

// Wait until one of four things occurs:
Expand Down Expand Up @@ -315,9 +326,9 @@ continueProcessor:

t.lock.Lock()
newMinTimestamp := t.minPendingTimer
if !gate.engaged() || newMinTimestamp.UnixNano() < gate.tNext {
if gate.setNext(newMinTimestamp) {
// reset the nextKeyTask as the new timer is expected to fire before previously read nextKeyTask
nextKeyTask = nil
gate.setNext(newMinTimestamp)
}
t.minPendingTimer = time.Time{}
t.lock.Unlock()
Expand All @@ -326,11 +337,12 @@ continueProcessor:
time.Now().UTC(), newMinTimestamp.UTC())

if !t.isProcessNow(time.Unix(0, gate.tNext)) {
continue
continue continueProcessor
}
}

// Either we have new timer (or) we are gated on timer to query for it.
ProcessPendingTimers:
for {
// Get next set of timer tasks.
timerTasks, lookAheadTask, err := t.getTasksAndNextKey()
Expand All @@ -346,7 +358,7 @@ continueProcessor:
if lookAheadTask != nil || len(timerTasks) < t.config.TimerTaskBatchSize {
// We have processed all the tasks.
nextKeyTask = lookAheadTask
break
break ProcessPendingTimers
}
}

Expand Down Expand Up @@ -976,6 +988,7 @@ func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persisten
"Next timer task time stamp is less than current timer task read level. timer task: (%s), ReadLevel: (%s)",
taskSeq, t.readLevel)
}

if !t.processor.isProcessNow(task.VisibilityTimestamp) {
lookAheadTask = task
break
Expand Down
9 changes: 7 additions & 2 deletions service/history/timerQueueProcessor2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
EventID: di.ScheduleID}
timerIndexResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{timerTask}}

s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil).Once()

ms := createMutableState(builder)
wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once()
Expand All @@ -234,9 +232,16 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
}).Once()

// Start timer Processor.
emptyResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}}
s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(emptyResponse, nil).Run(func(arguments mock.Arguments) {
// Done.
waitCh <- struct{}{}
}).Once()
processor := newTimerQueueProcessor(s.mockShard, s.mockHistoryEngine, s.mockExecutionMgr, s.logger).(*timerQueueProcessorImpl)
processor.Start()
<-waitCh

s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(timerIndexResponse, nil)
processor.NotifyNewTimer([]persistence.Task{&persistence.WorkflowTimeoutTask{
VisibilityTimestamp: timerTask.VisibilityTimestamp,
}})
Expand Down