diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 8df1c8361e..c4eabf8f37 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -261,13 +261,13 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // Acknowledge any old messages that occurred before the last reset (standard reset after trigger or conditional reset) if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { - log.Debugf("About to reset and ack dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", - depName, m.Timestamp, msgHolder.getLastResetTime()) - if depName != "" { msgHolder.reset(depName) } msgHolder.ackAndCache(m, event.ID()) + + log.Debugf("reset and acked dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", + depName, m.Timestamp, msgHolder.getLastResetTime()) return } // make sure that everything has been cleared within a certain amount of time @@ -354,12 +354,10 @@ type eventSourceMessageHolder struct { // time that resets conditions, usually the time all conditions meet, // or the time getting an external signal to reset. lastResetTime int64 - // after lastResetTime is set, this represents the time that a reset of all dependencies actually occurred - fullResetOccurrenceTime int64 - // does the lastResetTime represent something that occurred after startup? - lastResetTimeAfterStartup bool - expr *govaluate.EvaluableExpression - depNames []string + // if we reach this time, we reset everything (occurs 60 seconds after lastResetTime) + resetTimeout int64 + expr *govaluate.EvaluableExpression + depNames []string // Mapping of [eventSourceName + eventName]dependencyName sourceDepMap map[string]string parameters map[string]interface{} @@ -395,16 +393,15 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin } return &eventSourceMessageHolder{ - lastResetTime: lastResetTime.Unix(), - lastResetTimeAfterStartup: false, - expr: expression, - depNames: deps, - sourceDepMap: srcDepMap, - parameters: parameters, - msgs: msgs, - smap: new(sync.Map), - lock: sync.RWMutex{}, - logger: logger, + lastResetTime: lastResetTime.Unix(), + expr: expression, + depNames: deps, + sourceDepMap: srcDepMap, + parameters: parameters, + msgs: msgs, + smap: new(sync.Map), + lock: sync.RWMutex{}, + logger: logger, }, nil } @@ -418,16 +415,12 @@ func (mh *eventSourceMessageHolder) setLastResetTime(t int64) { mh.lock.Lock() defer mh.lock.Unlock() mh.lastResetTime = t - mh.lastResetTimeAfterStartup = true + mh.resetTimeout = t + 60 // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime } -// failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime +// failsafe condition after lastResetTime func (mh *eventSourceMessageHolder) fullResetTimeout() bool { - if mh.lastResetTimeAfterStartup { - return time.Now().Unix()-mh.getLastResetTime() > 60 && mh.fullResetOccurrenceTime < mh.lastResetTime - } else { - return false - } + return mh.resetTimeout != 0 && time.Now().Unix() > mh.resetTimeout } func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName string) (string, error) { @@ -455,7 +448,7 @@ func (mh *eventSourceMessageHolder) reset(depName string) { delete(mh.msgs, depName) if mh.isCleanedUp() { - mh.fullResetOccurrenceTime = time.Now().Unix() + mh.resetTimeout = 0 } } @@ -466,7 +459,7 @@ func (mh *eventSourceMessageHolder) resetAll() { for k := range mh.parameters { mh.parameters[k] = false } - mh.fullResetOccurrenceTime = time.Now().Unix() + mh.resetTimeout = 0 } // Check if all the parameters and messages have been cleaned up