From 3bc5bf4b4509da7f8fd9335a96a9ccab7302b0aa Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Wed, 2 Feb 2022 15:09:36 -0800 Subject: [PATCH] less kludgey way of doing fullResetTimeout() --- eventbus/driver/nats.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 445d7d3f95..8df1c8361e 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -356,8 +356,10 @@ type eventSourceMessageHolder struct { lastResetTime int64 // after lastResetTime is set, this represents the time that a reset of all dependencies actually occurred fullResetOccurrenceTime int64 - expr *govaluate.EvaluableExpression - depNames []string + // does the lastResetTime represent something that occurred after startup? + lastResetTimeAfterStartup bool + expr *govaluate.EvaluableExpression + depNames []string // Mapping of [eventSourceName + eventName]dependencyName sourceDepMap map[string]string parameters map[string]interface{} @@ -393,16 +395,16 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin } return &eventSourceMessageHolder{ - lastResetTime: lastResetTime.Unix(), - fullResetOccurrenceTime: lastResetTime.Unix(), // a little kludgey, but this is needed in order to prevent fullResetTimeout() from returning true at startup - expr: expression, - depNames: deps, - sourceDepMap: srcDepMap, - parameters: parameters, - msgs: msgs, - smap: new(sync.Map), - lock: sync.RWMutex{}, - logger: logger, + lastResetTime: lastResetTime.Unix(), + lastResetTimeAfterStartup: false, + expr: expression, + depNames: deps, + sourceDepMap: srcDepMap, + parameters: parameters, + msgs: msgs, + smap: new(sync.Map), + lock: sync.RWMutex{}, + logger: logger, }, nil } @@ -416,11 +418,16 @@ func (mh *eventSourceMessageHolder) setLastResetTime(t int64) { mh.lock.Lock() defer mh.lock.Unlock() mh.lastResetTime = t + mh.lastResetTimeAfterStartup = true } // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime func (mh *eventSourceMessageHolder) fullResetTimeout() bool { - return mh.getLastResetTime() != 0 && time.Now().Unix()-mh.getLastResetTime() > 60 && mh.fullResetOccurrenceTime < mh.lastResetTime + if mh.lastResetTimeAfterStartup { + return time.Now().Unix()-mh.getLastResetTime() > 60 && mh.fullResetOccurrenceTime < mh.lastResetTime + } else { + return false + } } func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName string) (string, error) {