Skip to content

Commit

Permalink
address code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
samarabbas committed Dec 9, 2017
1 parent 4a215b7 commit 5e7b8e9
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ type (
}

timeGate struct {
tNext, tNow int64 // time (in 'UnixNano' units) for next, (last) now and end
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
tNextNanos, tNowNanos int64
timer *time.Timer // timer used to wake us up when the next message is ready to deliver
gateC chan struct{}
closeC chan struct{}
}

timerAckMgr struct {
Expand All @@ -91,10 +91,10 @@ func newTimeGate() *timeGate {

// Initialize timeGate to fire immediately to check for any outstanding timer which needs to be fired immediately
t := &timeGate{
tNow: tNow.UnixNano(),
gateC: make(chan struct{}),
closeC: make(chan struct{}),
timer: time.NewTimer(0),
tNowNanos: tNow.UnixNano(),
gateC: make(chan struct{}),
closeC: make(chan struct{}),
timer: time.NewTimer(0),
}

// "Cast" chan Time to chan struct{}.
Expand Down Expand Up @@ -122,28 +122,28 @@ func newTimeGate() *timeGate {
func (t *timeGate) beforeSleep() <-chan struct{} {
// update current time on gate
tNow := time.Now()
t.tNow = tNow.UnixNano()
t.tNowNanos = tNow.UnixNano()

return t.gateC
}

func (t *timeGate) engaged(now int64) bool {
return t.tNext > now
return t.tNextNanos > now
}

// setNext is called by processor in following 2 conditions:
// 1. Processor is woken up by new timer creation and new timer is scheduled to fire before current tNext
// 1. Processor is woken up by new timer creation and new timer is scheduled to fire before current tNextNanos
// 2. Processor finds a lockAheadTask from cassandra and calls it using the visibility time of the task
func (t *timeGate) setNext(next time.Time) bool {
newNext := next.UnixNano()

tNow := time.Now()
t.tNow = tNow.UnixNano()
t.tNowNanos = tNow.UnixNano()
// Check to see if passed in next should become the next time timeGate notifies processor
if !t.engaged(t.tNow) || t.tNext > newNext {
t.tNext = newNext
if !t.engaged(t.tNowNanos) || t.tNextNanos > newNext {
t.tNextNanos = newNext
// reset timer to fire when the next message should be made 'visible'
t.timer.Reset(time.Unix(0, t.tNext).Sub(tNow))
t.timer.Reset(time.Unix(0, t.tNextNanos).Sub(tNow))

// Notifies caller that next notification is reset to fire at passed in 'next' visibility time
return true
Expand All @@ -157,8 +157,8 @@ func (t *timeGate) close() {
}

func (t *timeGate) String() string {
return fmt.Sprintf("timeGate [engaged=%v tNext=%v tNow=%v]",
t.engaged(time.Now().UnixNano()), time.Unix(0, t.tNext), time.Unix(0, t.tNow))
return fmt.Sprintf("timeGate [engaged=%v tNextNanos=%v tNowNanos=%v]",
t.engaged(time.Now().UnixNano()), time.Unix(0, t.tNextNanos), time.Unix(0, t.tNowNanos))
}

func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImpl, executionManager persistence.ExecutionManager,
Expand Down Expand Up @@ -336,7 +336,7 @@ continueProcessor:
t.logger.Debugf("%v: Next key after woke up by timer: %v",
time.Now().UTC(), newMinTimestamp.UTC())

if !t.isProcessNow(time.Unix(0, gate.tNext)) {
if !t.isProcessNow(time.Unix(0, gate.tNextNanos)) {
continue continueProcessor
}
}
Expand Down

0 comments on commit 5e7b8e9

Please sign in to comment.