From fb434b961e4f85030f1f95f52579626b1c2e7525 Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Thu, 3 Feb 2022 14:33:42 -0800 Subject: [PATCH] fix: conditions reset does not work if the service is down at the triggering time (#1585) * incorporating new struct TimeBasedReset to encapsulate reset state/spec Signed-off-by: Julie Vogelman --- common/cronutil.go | 134 ++++++++++++++++++++++++++++++++++++++ common/cronutil_test.go | 106 ++++++++++++++++++++++++++++++ eventbus/driver/driver.go | 3 +- eventbus/driver/nats.go | 121 ++++++++++++++++++++++++---------- sensors/listener.go | 54 ++++++++++++++- 5 files changed, 381 insertions(+), 37 deletions(-) create mode 100644 common/cronutil.go create mode 100644 common/cronutil_test.go diff --git a/common/cronutil.go b/common/cronutil.go new file mode 100644 index 0000000000..a46d8b032f --- /dev/null +++ b/common/cronutil.go @@ -0,0 +1,134 @@ +package common + +import ( + "fmt" + "time" + + cronlib "github.com/robfig/cron/v3" +) + +const ( + // Set the top bit if a star was included in the expression. + starBit = 1 << 63 +) + +// For a given cron specification, return the previous activation time +// If no time can be found to satisfy the schedule, return the zero time. 
+func PrevCronTime(cronSpec string, parser cronlib.Parser, t time.Time) (time.Time, error) { + var tm time.Time + sched, err := parser.Parse(cronSpec) + if err != nil { + return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s; couldn't parse; err=%v", cronSpec, err) + } + s, castOk := sched.(*cronlib.SpecSchedule) + if !castOk { + return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: unexpected type for %v", cronSpec, sched) + } + + // General approach is based on approach to SpecSchedule.Next() implementation + + origLocation := t.Location() + loc := s.Location + if loc == time.Local { + loc = t.Location() + } + if s.Location != time.Local { + t = t.In(s.Location) + } + + // Start at the previous second + t = t.Add(-1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) + + // If no time is found within five years, return zero. + yearLimit := t.Year() - 5 + +WRAP: + if t.Year() < yearLimit { + return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: no time found within %d years", cronSpec, yearLimit) + } + + // Find the first applicable month. + // If it's this month, then do nothing. 
+ for 1< 12 { + t = t.Add(time.Duration(24-t.Hour()) * time.Hour) + } else { + t = t.Add(time.Duration(-t.Hour()) * time.Hour) + } + } + + t = t.Add(-1 * time.Second) + + if saveMonth != t.Month() { + goto WRAP + } + } + + for 1< 0 + dowMatch bool = 1< 0 + ) + if s.Dom&starBit > 0 || s.Dow&starBit > 0 { + return domMatch && dowMatch + } + return domMatch || dowMatch +} diff --git a/common/cronutil_test.go b/common/cronutil_test.go new file mode 100644 index 0000000000..cf89912b5c --- /dev/null +++ b/common/cronutil_test.go @@ -0,0 +1,106 @@ +package common + +import ( + "strings" + "testing" + "time" + + cronlib "github.com/robfig/cron/v3" +) + +func TestPrevCronTime(t *testing.T) { + runs := []struct { + time, spec string + expected string + expectedErr bool + }{ + // Simple cases + {"Mon Jul 9 15:00 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false}, + {"Mon Jul 9 14:59 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false}, + {"Mon Jul 9 15:01:59 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012", false}, + + // Wrap around hours + {"Mon Jul 9 15:10 2012", "0 20-35/15 * * * *", "Mon Jul 9 14:35 2012", false}, + + // Wrap around days + {"Tue Jul 10 00:00 2012", "0 */15 * * * *", "Tue Jul 9 23:45 2012", false}, + {"Tue Jul 10 00:00 2012", "0 20-35/15 * * * *", "Tue Jul 9 23:35 2012", false}, + + // Wrap around months + {"Mon Jul 9 09:35 2012", "0 0 12 9 Apr-Oct ?", "Sat Jun 9 12:00 2012", false}, + + // Leap year + {"Mon Jul 9 23:35 2018", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016", false}, + + // Daylight savings time 3am EDT (-4) -> 2am EST (-5) + {"2013-03-11T02:30:00-0400", "TZ=America/New_York 0 0 12 9 Mar ?", "2013-03-09T12:00:00-0500", false}, + + // hourly job + {"2012-03-11T01:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T00:00:00-0500", false}, + + // 2am nightly job (skipped) + {"2012-03-12T00:00:00-0400", "TZ=America/New_York 0 0 2 * * ?", "2012-03-10T02:00:00-0500", false}, + + // 2am nightly job + {"2012-11-04T02:00:00-0500", 
"TZ=America/New_York 0 0 0 * * ?", "2012-11-04T00:00:00-0400", false}, + {"2012-11-05T02:00:00-0500", "TZ=America/New_York 0 0 2 * * ?", "2012-11-04T02:00:00-0500", false}, + + // Unsatisfiable + {"Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", "", true}, + {"Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", "", true}, + + // Monthly job + {"TZ=America/New_York 2012-12-03T00:00:00-0500", "0 0 3 3 * ?", "2012-11-03T03:00:00-0400", false}, + } + + parser := cronlib.NewParser(cronlib.Second | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.DowOptional | cronlib.Descriptor) + + for _, c := range runs { + actual, err := PrevCronTime(c.spec, parser, getTime(c.time)) + if c.expectedErr { + if err == nil { + t.Errorf("%s, \"%s\": should have received error but didn't", c.time, c.spec) + } + } else { + if err != nil { + t.Errorf("%s, \"%s\": error: %v", c.time, c.spec, err) + } else { + expected := getTime(c.expected) + if !actual.Equal(expected) { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) + } + } + } + } +} + +func getTime(value string) time.Time { + if value == "" { + return time.Time{} + } + + var location = time.Local + if strings.HasPrefix(value, "TZ=") { + parts := strings.Fields(value) + loc, err := time.LoadLocation(parts[0][len("TZ="):]) + if err != nil { + panic("could not parse location:" + err.Error()) + } + location = loc + value = parts[1] + } + + var layouts = []string{ + "Mon Jan 2 15:04 2006", + "Mon Jan 2 15:04:05 2006", + } + for _, layout := range layouts { + if t, err := time.ParseInLocation(layout, value, location); err == nil { + return t + } + } + if t, err := time.ParseInLocation("2006-01-02T15:04:05-0700", value, location); err == nil { + return t + } + panic("could not parse time value " + value) +} diff --git a/eventbus/driver/driver.go b/eventbus/driver/driver.go index ef10930d50..1c1ae32d78 100644 --- a/eventbus/driver/driver.go +++ b/eventbus/driver/driver.go @@ -2,6 +2,7 @@ package driver 
import ( "context" + "time" eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -20,7 +21,7 @@ type Driver interface { // Parameter - dependencies, array of dependencies information // Parameter - filter, a function used to filter the message // Parameter - action, a function to be triggered after all conditions meet - SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error + SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error // Publish a message Publish(conn Connection, message []byte) error diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 33687ff9e6..edb64d74bf 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -129,13 +129,15 @@ func (n *natsStreaming) Publish(conn Connection, message []byte) error { // Parameter - conn, eventbus connection // Parameter - group, queue group name // Parameter - closeCh, channel to indicate to close the subscription +// Parameter - resetConditionsCh, channel to indicate to reset trigger conditions +// Parameter - lastResetTime, the last time reset would have occurred, if any // Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3" // Parameter - dependencies, array of dependencies information // Parameter - filter, a function used to filter the message // Parameter - action, a 
function to be triggered after all conditions meet -func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error { +func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error { log := n.logger.With("clientID", n.clientID) - msgHolder, err := newEventSourceMessageHolder(dependencyExpr, dependencies) + msgHolder, err := newEventSourceMessageHolder(log, dependencyExpr, dependencies, lastResetTime) if err != nil { return err } @@ -225,6 +227,8 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } + log.Debugf("New incoming Event Source Message, dependency name=%s", depName) + if depName == "" { _ = m.Ack() return @@ -239,6 +243,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc if !filter(depName, *event) { // message not interested + log.Debugf("not interested in dependency %s", depName) _ = m.Ack() return } @@ -260,18 +265,22 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } - // Clean up old messages before starting a new round - if msgHolder.lastMeetTime > 0 || msgHolder.latestGoodMsgTimestamp > 0 { - // ACK all the old messages after conditions meet - if m.Timestamp <= msgHolder.latestGoodMsgTimestamp { - if depName != "" { - msgHolder.reset(depName) - } - msgHolder.ackAndCache(m, 
event.ID()) - return + // Acknowledge any old messages that occurred before the last reset (standard reset after trigger or conditional reset) + if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { + if depName != "" { + msgHolder.reset(depName) } + msgHolder.ackAndCache(m, event.ID()) + + log.Debugf("reset and acked dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", + depName, m.Timestamp, msgHolder.getLastResetTime()) return } + // make sure that everything has been cleared within a certain amount of time + if msgHolder.fullResetTimeout() { + log.Infof("ATTENTION: Resetting the flags because they didn't get cleared before the timeout: msgHolder=%+v", msgHolder) + msgHolder.resetAll() + } now := time.Now().Unix() @@ -281,6 +290,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // Re-delivered latest messge, update delivery timestamp and return existingMsg.lastDeliveredTime = now msgHolder.msgs[depName] = existingMsg + log.Debugf("Updating timestamp for dependency=%s", depName) return } else if m.Timestamp < existingMsg.timestamp { // Re-delivered old message, ack and return @@ -292,9 +302,6 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // New message, set and check msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now} msgHolder.parameters[depName] = true - if msgHolder.latestGoodMsgTimestamp < m.Timestamp { - msgHolder.latestGoodMsgTimestamp = m.Timestamp - } // Check if there's any stale message being held. 
// Stale message could be message age has been longer than NATS streaming max message age, @@ -320,6 +327,14 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } if result != true { + // Log current meet dependency information + meetDeps := []string{} + meetMsgIds := []string{} + for k, v := range msgHolder.msgs { + meetDeps = append(meetDeps, k) + meetMsgIds = append(meetMsgIds, v.event.ID()) + } + log.Infow("trigger conditions not met", zap.Any("meetDependencies", meetDeps), zap.Any("meetEvents", meetMsgIds)) return } msgHolder.latestGoodMsgTimestamp = m.Timestamp @@ -348,21 +363,26 @@ type eventSourceMessage struct { // eventSourceMessageHolder is a struct used to hold the message information of subscribed dependencies type eventSourceMessageHolder struct { - // time that all conditions meet - lastMeetTime int64 - // timestamp of last msg when all the conditions meet - latestGoodMsgTimestamp int64 - expr *govaluate.EvaluableExpression - depNames []string + // time that resets conditions, usually the time all conditions meet, + // or the time getting an external signal to reset. 
+ lastResetTime int64 + // if we reach this time, we reset everything (occurs 60 seconds after lastResetTime) + resetTimeout int64 + expr *govaluate.EvaluableExpression + depNames []string // Mapping of [eventSourceName + eventName]dependencyName sourceDepMap map[string]string parameters map[string]interface{} msgs map[string]*eventSourceMessage // A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering - smap *sync.Map + smap *sync.Map + lock sync.RWMutex + timeoutLock sync.RWMutex + + logger *zap.SugaredLogger } -func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependency) (*eventSourceMessageHolder, error) { +func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr string, dependencies []Dependency, lastResetTime time.Time) (*eventSourceMessageHolder, error) { dependencyExpr = strings.ReplaceAll(dependencyExpr, "-", "\\-") expression, err := govaluate.NewEvaluableExpression(dependencyExpr) if err != nil { @@ -386,17 +406,51 @@ func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependenc } return &eventSourceMessageHolder{ - lastMeetTime: int64(0), - latestGoodMsgTimestamp: int64(0), - expr: expression, - depNames: deps, - sourceDepMap: srcDepMap, - parameters: parameters, - msgs: msgs, - smap: new(sync.Map), + lastResetTime: lastResetTime.Unix(), + expr: expression, + depNames: deps, + sourceDepMap: srcDepMap, + parameters: parameters, + msgs: msgs, + smap: new(sync.Map), + lock: sync.RWMutex{}, + logger: logger, }, nil } +func (mh *eventSourceMessageHolder) getLastResetTime() int64 { + mh.lock.RLock() + defer mh.lock.RUnlock() + return mh.lastResetTime +} + +func (mh *eventSourceMessageHolder) setLastResetTime(t int64) { + { + mh.lock.Lock() // since this can be called asynchronously as part of a ConditionReset, we need to lock this code + defer mh.lock.Unlock() + mh.lastResetTime = t + } + mh.setResetTimeout(t + 60) // failsafe condition: detect the case where, for some reason,
we haven't acknowledged all dependencies within 60 seconds of the lastResetTime +} + +func (mh *eventSourceMessageHolder) setResetTimeout(t int64) { + mh.timeoutLock.Lock() // since this can be called asynchronously as part of a ConditionReset, we need to lock this code + defer mh.timeoutLock.Unlock() + mh.resetTimeout = t +} + +func (mh *eventSourceMessageHolder) getResetTimeout() int64 { + mh.timeoutLock.RLock() + defer mh.timeoutLock.RUnlock() + return mh.resetTimeout +} + +// failsafe condition after lastResetTime +func (mh *eventSourceMessageHolder) fullResetTimeout() bool { + resetTimeout := mh.getResetTimeout() + return resetTimeout != 0 && time.Now().Unix() > resetTimeout +} + func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName string) (string, error) { for k, v := range mh.sourceDepMap { sourceGlob, err := glob.Compile(k) @@ -420,9 +474,9 @@ func (mh *eventSourceMessageHolder) ackAndCache(m *stan.Msg, id string) { func (mh *eventSourceMessageHolder) reset(depName string) { mh.parameters[depName] = false delete(mh.msgs, depName) + if mh.isCleanedUp() { - mh.lastMeetTime = 0 - mh.latestGoodMsgTimestamp = 0 + mh.setResetTimeout(0) } } @@ -433,8 +487,7 @@ func (mh *eventSourceMessageHolder) resetAll() { for k := range mh.parameters { mh.parameters[k] = false } - mh.lastMeetTime = 0 - mh.latestGoodMsgTimestamp = 0 + mh.setResetTimeout(0) } // Check if all the parameters and messages have been cleaned up diff --git a/sensors/listener.go b/sensors/listener.go index a28700f767..42604a6fea 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -228,6 +228,56 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { wg1 := &sync.WaitGroup{} closeSubCh := make(chan struct{}) + resetConditionsCh := make(chan struct{}) + var lastResetTime time.Time + if len(trigger.Template.ConditionsReset) > 0 { + for _, c := range trigger.Template.ConditionsReset { + + if c.ByTime == nil { + continue + } + cronParser := 
cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow) + opts := []cronlib.Option{ + cronlib.WithParser(cronParser), + cronlib.WithChain(cronlib.Recover(cronlib.DefaultLogger)), + } + nowTime := time.Now() + if c.ByTime.Timezone != "" { + location, err := time.LoadLocation(c.ByTime.Timezone) + if err != nil { + logger.Errorw("failed to load timezone", zap.Error(err)) + continue + } + opts = append(opts, cronlib.WithLocation(location)) + nowTime = nowTime.In(location) + } + cr := cronlib.New(opts...) + _, err = cr.AddFunc(c.ByTime.Cron, func() { + resetConditionsCh <- struct{}{} + }) + if err != nil { + logger.Errorw("failed to add cron schedule", zap.Error(err)) + continue + } + cr.Start() + + logger.Debugf("just started cron job; entries=%v", cr.Entries()) + + // set lastResetTime (the last time this would've been triggered) + if len(cr.Entries()) > 0 { + prevTriggerTime, err := common.PrevCronTime(c.ByTime.Cron, cronParser, nowTime) + if err != nil { + logger.Errorw("couldn't get previous cron trigger time", zap.Error(err)) + continue + } + logger.Infof("previous trigger time: %v", prevTriggerTime) + if prevTriggerTime.After(lastResetTime) { + lastResetTime = prevTriggerTime + } + } + } + } + subscribeFunc := func() { wg1.Add(1) go func() { @@ -235,9 +285,9 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { // release the lock when goroutine exits defer atomic.StoreUint32(&subLock, 0) - logger.Infof("started subscribing to events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID) + logger.Infof("started subscribing to events for trigger %s with client %s", trigger.Template.Name, clientID) - err = ebDriver.SubscribeEventSources(ctx, conn, group, closeSubCh, resetConditionsCh, depExpression, deps, transformFunc, filterFunc, actionFunc) + err = ebDriver.SubscribeEventSources(ctx, conn, group, closeSubCh, resetConditionsCh, lastResetTime, depExpression, deps, 
transformFunc, filterFunc, actionFunc) if err != nil { logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err)) return