Skip to content

Commit

Permalink
removing everywhere we set lastResetTime to 0: doesn't seem necessary…
Browse files Browse the repository at this point in the history
… and causes issues in this scenario: 'dependency=(A && B && C); trigger A and B, scale deployment to 0 before condition reset time; scale back to 1 after'; result: A gets reset, then sets lastResetTime to 0 because it's unknown that B still exists and B never gets acked

Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 committed Feb 2, 2022
1 parent 4ed0d16 commit f7d9acd
Showing 1 changed file with 17 additions and 46 deletions.
63 changes: 17 additions & 46 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,27 +259,16 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
}

// Clean up old messages before starting a new round
if msgHolder.getLastResetTime() > 0 {

// ACK all the old messages after conditions are met
//if m.Timestamp <= msgHolder.latestGoodMsgTimestamp || m.Timestamp/1e9 <= msgHolder.getLastResetTime() {
if m.Timestamp/1e9 <= msgHolder.getLastResetTime() {
log.Debugf("About to reset and ack dependency=%s due to condition reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d",
depName, m.Timestamp, msgHolder.getLastResetTime())
// ACK all the old messages after conditions are met
if m.Timestamp/1e9 <= msgHolder.getLastResetTime() {
log.Debugf("About to reset and ack dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d",
depName, m.Timestamp, msgHolder.getLastResetTime())

if depName != "" {
msgHolder.reset(depName)
}
msgHolder.ackAndCache(m, event.ID())
return
}

// Old redelivered messages should be able to be acked within 60 seconds.
// Reset if the flag didn't get cleared in that period for some reasons.
if time.Now().Unix()-msgHolder.getLastResetTime() > 60 {
msgHolder.resetAll()
log.Info("ATTENTION: Reset the flags because they didn't get cleared within 60 seconds...")
if depName != "" {
msgHolder.reset(depName)
}
msgHolder.ackAndCache(m, event.ID())
return
}

Expand All @@ -303,9 +292,6 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
// New message, set and check
msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now}
msgHolder.parameters[depName] = true
if msgHolder.latestGoodMsgTimestamp < m.Timestamp {
msgHolder.latestGoodMsgTimestamp = m.Timestamp
}

// Check if there's any stale message being held.
// Stale message could be message age has been longer than NATS streaming max message age,
Expand Down Expand Up @@ -364,10 +350,8 @@ type eventSourceMessageHolder struct {
// time that resets conditions, usually the time all conditions meet,
// or the time getting an external signal to reset.
lastResetTime int64
// timestamp of last msg
latestGoodMsgTimestamp int64
expr *govaluate.EvaluableExpression
depNames []string
expr *govaluate.EvaluableExpression
depNames []string
// Mapping of [eventSourceName + eventName]dependencyName
sourceDepMap map[string]string
parameters map[string]interface{}
Expand Down Expand Up @@ -401,15 +385,14 @@ func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependenc
}

return &eventSourceMessageHolder{
lastResetTime: lastResetTime.Unix(),
latestGoodMsgTimestamp: int64(0),
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
lock: sync.RWMutex{},
lastResetTime: lastResetTime.Unix(),
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
lock: sync.RWMutex{},
}, nil
}

Expand Down Expand Up @@ -448,9 +431,6 @@ func (mh *eventSourceMessageHolder) ackAndCache(m *stan.Msg, id string) {
func (mh *eventSourceMessageHolder) reset(depName string) {
mh.parameters[depName] = false
delete(mh.msgs, depName)
if mh.isCleanedUp() {
mh.setLastResetTime(0)
}
}

func (mh *eventSourceMessageHolder) resetAll() {
Expand All @@ -460,7 +440,6 @@ func (mh *eventSourceMessageHolder) resetAll() {
for k := range mh.parameters {
mh.parameters[k] = false
}
mh.setLastResetTime(0)
}

// Check if all the parameters and messages have been cleaned up
Expand All @@ -473,14 +452,6 @@ func (mh *eventSourceMessageHolder) isCleanedUp() bool {
return len(mh.msgs) == 0
}

/*
func (mh *eventSourceMessageHolder) resetTimeOccurred(m *stan.Msg) bool {
// go through all triggers
return m.Timestamp <= mh.latestGoodMsgTimestamp //|| ???
}*/

func unique(stringSlice []string) []string {
if len(stringSlice) == 0 {
return stringSlice
Expand Down

0 comments on commit f7d9acd

Please sign in to comment.