From f7d9acd9b7cd899906ff069c409e4fbe0f5e9b87 Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Wed, 2 Feb 2022 09:03:52 -0800 Subject: [PATCH] removing everywhere we set lastResetTime to 0: doesn't seem necessary and causes issues in this scenario: 'dependency=(A && B && C); trigger A and B, scale deployment to 0 before condition reset time; scale back to 1 after'; result: A gets reset, then sets lastResetTime to 0 because it's unknown that B still exists and B never gets acked Signed-off-by: Julie Vogelman --- eventbus/driver/nats.go | 63 +++++++++++------------------------------ 1 file changed, 17 insertions(+), 46 deletions(-) diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 5d15253a6d..bd936f2965 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -259,27 +259,16 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc } // Clean up old messages before starting a new round - if msgHolder.getLastResetTime() > 0 { - // ACK all the old messages after conditions are met - //if m.Timestamp <= msgHolder.latestGoodMsgTimestamp || m.Timestamp/1e9 <= msgHolder.getLastResetTime() { - if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { - log.Debugf("About to reset and ack dependency=%s due to condition reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", - depName, m.Timestamp, msgHolder.getLastResetTime()) + // ACK all the old messages after conditions are met + if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { + log.Debugf("About to reset and ack dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", + depName, m.Timestamp, msgHolder.getLastResetTime()) - if depName != "" { - msgHolder.reset(depName) - } - msgHolder.ackAndCache(m, event.ID()) - return - } - - // Old redelivered messages should be able to be acked within 60 seconds. - // Reset if the flag didn't get cleared in that period for some reasons. - if time.Now().Unix()-msgHolder.getLastResetTime() > 60 { - msgHolder.resetAll() - log.Info("ATTENTION: Reset the flags because they didn't get cleared within 60 seconds...") + if depName != "" { + msgHolder.reset(depName) } + msgHolder.ackAndCache(m, event.ID()) return } @@ -303,9 +292,6 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // New message, set and check msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now} msgHolder.parameters[depName] = true - if msgHolder.latestGoodMsgTimestamp < m.Timestamp { - msgHolder.latestGoodMsgTimestamp = m.Timestamp - } // Check if there's any stale message being held. // Stale message could be message age has been longer than NATS streaming max message age, @@ -364,10 +350,8 @@ type eventSourceMessageHolder struct { // time that resets conditions, usually the time all conditions meet, // or the time getting an external signal to reset. lastResetTime int64 - // timestamp of last msg - latestGoodMsgTimestamp int64 - expr *govaluate.EvaluableExpression - depNames []string + expr *govaluate.EvaluableExpression + depNames []string // Mapping of [eventSourceName + eventName]dependencyName sourceDepMap map[string]string parameters map[string]interface{} @@ -401,15 +385,14 @@ func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependenc } return &eventSourceMessageHolder{ - lastResetTime: lastResetTime.Unix(), - latestGoodMsgTimestamp: int64(0), - expr: expression, - depNames: deps, - sourceDepMap: srcDepMap, - parameters: parameters, - msgs: msgs, - smap: new(sync.Map), - lock: sync.RWMutex{}, + lastResetTime: lastResetTime.Unix(), + expr: expression, + depNames: deps, + sourceDepMap: srcDepMap, + parameters: parameters, + msgs: msgs, + smap: new(sync.Map), + lock: sync.RWMutex{}, }, nil } @@ -448,9 +431,6 @@ func (mh *eventSourceMessageHolder) ackAndCache(m *stan.Msg, id string) { func (mh *eventSourceMessageHolder) reset(depName string) { mh.parameters[depName] = false delete(mh.msgs, depName) - if mh.isCleanedUp() { - mh.setLastResetTime(0) - } } func (mh *eventSourceMessageHolder) resetAll() { @@ -460,7 +440,6 @@ func (mh *eventSourceMessageHolder) resetAll() { for k := range mh.parameters { mh.parameters[k] = false } - mh.setLastResetTime(0) } // Check if all the parameters and messages have been cleaned up @@ -473,14 +452,6 @@ func (mh *eventSourceMessageHolder) isCleanedUp() bool { return len(mh.msgs) == 0 } -/* -func (mh *eventSourceMessageHolder) resetTimeOccurred(m *stan.Msg) bool { - - // go through all triggers - - return m.Timestamp <= mh.latestGoodMsgTimestamp //|| ??? -}*/ - func unique(stringSlice []string) []string { if len(stringSlice) == 0 { return stringSlice