Skip to content

Commit

Permalink
cleaner way to handle the reset time
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 committed Feb 3, 2022
1 parent 6eff289 commit 8791325
Showing 1 changed file with 21 additions and 28 deletions.
49 changes: 21 additions & 28 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc

// Acknowledge any old messages that occurred before the last reset (standard reset after trigger or conditional reset)
if m.Timestamp/1e9 <= msgHolder.getLastResetTime() {
log.Debugf("About to reset and ack dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d",
depName, m.Timestamp, msgHolder.getLastResetTime())

if depName != "" {
msgHolder.reset(depName)
}
msgHolder.ackAndCache(m, event.ID())

log.Debugf("reset and acked dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d",
depName, m.Timestamp, msgHolder.getLastResetTime())
return
}
// make sure that everything has been cleared within a certain amount of time
Expand Down Expand Up @@ -354,12 +354,10 @@ type eventSourceMessageHolder struct {
// time that resets conditions, usually the time all conditions meet,
// or the time getting an external signal to reset.
lastResetTime int64
// after lastResetTime is set, this represents the time that a reset of all dependencies actually occurred
fullResetOccurrenceTime int64
// does the lastResetTime represent something that occurred after startup?
lastResetTimeAfterStartup bool
expr *govaluate.EvaluableExpression
depNames []string
// if we reach this time, we reset everything (occurs 60 seconds after lastResetTime)
resetTimeout int64
expr *govaluate.EvaluableExpression
depNames []string
// Mapping of [eventSourceName + eventName]dependencyName
sourceDepMap map[string]string
parameters map[string]interface{}
Expand Down Expand Up @@ -395,16 +393,15 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin
}

return &eventSourceMessageHolder{
lastResetTime: lastResetTime.Unix(),
lastResetTimeAfterStartup: false,
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
lock: sync.RWMutex{},
logger: logger,
lastResetTime: lastResetTime.Unix(),
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
lock: sync.RWMutex{},
logger: logger,
}, nil
}

Expand All @@ -418,16 +415,12 @@ func (mh *eventSourceMessageHolder) setLastResetTime(t int64) {
mh.lock.Lock()
defer mh.lock.Unlock()
mh.lastResetTime = t
mh.lastResetTimeAfterStartup = true
mh.resetTimeout = t + 60 // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime
}

// failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime
// failsafe condition after lastResetTime
func (mh *eventSourceMessageHolder) fullResetTimeout() bool {
if mh.lastResetTimeAfterStartup {
return time.Now().Unix()-mh.getLastResetTime() > 60 && mh.fullResetOccurrenceTime < mh.lastResetTime
} else {
return false
}
return mh.resetTimeout != 0 && time.Now().Unix() > mh.resetTimeout
}

func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName string) (string, error) {
Expand Down Expand Up @@ -455,7 +448,7 @@ func (mh *eventSourceMessageHolder) reset(depName string) {
delete(mh.msgs, depName)

if mh.isCleanedUp() {
mh.fullResetOccurrenceTime = time.Now().Unix()
mh.resetTimeout = 0
}
}

Expand All @@ -466,7 +459,7 @@ func (mh *eventSourceMessageHolder) resetAll() {
for k := range mh.parameters {
mh.parameters[k] = false
}
mh.fullResetOccurrenceTime = time.Now().Unix()
mh.resetTimeout = 0
}

// Check if all the parameters and messages have been cleaned up
Expand Down

0 comments on commit 8791325

Please sign in to comment.