Skip to content

Commit

Permalink
less kludgey way of doing fullResetTimeout()
Browse files Browse the repository at this point in the history
  • Loading branch information
juliev0 committed Feb 2, 2022
1 parent d12c52b commit 3bc5bf4
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,10 @@ type eventSourceMessageHolder struct {
lastResetTime int64
// after lastResetTime is set, this represents the time that a reset of all dependencies actually occurred
fullResetOccurrenceTime int64
expr *govaluate.EvaluableExpression
depNames []string
// does the lastResetTime represent something that occurred after startup?
lastResetTimeAfterStartup bool
expr *govaluate.EvaluableExpression
depNames []string
// Mapping of [eventSourceName + eventName]dependencyName
sourceDepMap map[string]string
parameters map[string]interface{}
Expand Down Expand Up @@ -393,16 +395,16 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin
}

return &eventSourceMessageHolder{
lastResetTime: lastResetTime.Unix(),
fullResetOccurrenceTime: lastResetTime.Unix(), // a little kludgey, but this is needed in order to prevent fullResetTimeout() from returning true at startup
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
lock: sync.RWMutex{},
logger: logger,
lastResetTime: lastResetTime.Unix(),
lastResetTimeAfterStartup: false,
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
lock: sync.RWMutex{},
logger: logger,
}, nil
}

Expand All @@ -416,11 +418,16 @@ func (mh *eventSourceMessageHolder) setLastResetTime(t int64) {
mh.lock.Lock()
defer mh.lock.Unlock()
mh.lastResetTime = t
mh.lastResetTimeAfterStartup = true
}

// failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime
func (mh *eventSourceMessageHolder) fullResetTimeout() bool {
return mh.getLastResetTime() != 0 && time.Now().Unix()-mh.getLastResetTime() > 60 && mh.fullResetOccurrenceTime < mh.lastResetTime
if mh.lastResetTimeAfterStartup {
return time.Now().Unix()-mh.getLastResetTime() > 60 && mh.fullResetOccurrenceTime < mh.lastResetTime
} else {
return false
}
}

func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName string) (string, error) {
Expand Down

0 comments on commit 3bc5bf4

Please sign in to comment.