diff --git a/common/cronutil.go b/common/cronutil.go new file mode 100644 index 0000000000..a46d8b032f --- /dev/null +++ b/common/cronutil.go @@ -0,0 +1,134 @@ +package common + +import ( + "fmt" + "time" + + cronlib "github.com/robfig/cron/v3" +) + +const ( + // Set the top bit if a star was included in the expression. + starBit = 1 << 63 +) + +// For a given cron specification, return the previous activation time +// If no time can be found to satisfy the schedule, return the zero time. +func PrevCronTime(cronSpec string, parser cronlib.Parser, t time.Time) (time.Time, error) { + var tm time.Time + sched, err := parser.Parse(cronSpec) + if err != nil { + return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s; couldn't parse; err=%v", cronSpec, err) + } + s, castOk := sched.(*cronlib.SpecSchedule) + if !castOk { + return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: unexpected type for %v", cronSpec, sched) + } + + // General approach is based on approach to SpecSchedule.Next() implementation + + origLocation := t.Location() + loc := s.Location + if loc == time.Local { + loc = t.Location() + } + if s.Location != time.Local { + t = t.In(s.Location) + } + + // Start at the previous second + t = t.Add(-1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) + + // If no time is found within five years, return zero. + yearLimit := t.Year() - 5 + +WRAP: + if t.Year() < yearLimit { + return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: no time found within %d years", cronSpec, yearLimit) + } + + // Find the first applicable month. + // If it's this month, then do nothing. + for 1< 12 { + t = t.Add(time.Duration(24-t.Hour()) * time.Hour) + } else { + t = t.Add(time.Duration(-t.Hour()) * time.Hour) + } + } + + t = t.Add(-1 * time.Second) + + if saveMonth != t.Month() { + goto WRAP + } + } + + for 1< 0 + dowMatch bool = 1< 0 + ) + if s.Dom&starBit > 0 || s.Dow&starBit > 0 { + return domMatch && dowMatch + } + return domMatch || dowMatch +} diff --git a/common/cronutil_test.go b/common/cronutil_test.go new file mode 100644 index 0000000000..cf89912b5c --- /dev/null +++ b/common/cronutil_test.go @@ -0,0 +1,106 @@ +package common + +import ( + "strings" + "testing" + "time" + + cronlib "github.com/robfig/cron/v3" +) + +func TestPrevCronTime(t *testing.T) { + runs := []struct { + time, spec string + expected string + expectedErr bool + }{ + // Simple cases + {"Mon Jul 9 15:00 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false}, + {"Mon Jul 9 14:59 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false}, + {"Mon Jul 9 15:01:59 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012", false}, + + // Wrap around hours + {"Mon Jul 9 15:10 2012", "0 20-35/15 * * * *", "Mon Jul 9 14:35 2012", false}, + + // Wrap around days + {"Tue Jul 10 00:00 2012", "0 */15 * * * *", "Tue Jul 9 23:45 2012", false}, + {"Tue Jul 10 00:00 2012", "0 20-35/15 * * * *", "Tue Jul 9 23:35 2012", false}, + + // Wrap around months + {"Mon Jul 9 09:35 2012", "0 0 12 9 Apr-Oct ?", "Sat Jun 9 12:00 2012", false}, + + // Leap year + {"Mon Jul 9 23:35 2018", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016", false}, + + // Daylight savings time 3am EDT (-4) -> 2am EST (-5) + {"2013-03-11T02:30:00-0400", "TZ=America/New_York 0 0 12 9 Mar ?", "2013-03-09T12:00:00-0500", false}, + + // hourly job + {"2012-03-11T01:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T00:00:00-0500", false}, + + // 2am nightly job (skipped) + {"2012-03-12T00:00:00-0400", "TZ=America/New_York 0 0 2 * * ?", "2012-03-10T02:00:00-0500", false}, + + // 2am nightly job + {"2012-11-04T02:00:00-0500", "TZ=America/New_York 0 0 0 * * ?", "2012-11-04T00:00:00-0400", false}, + {"2012-11-05T02:00:00-0500", "TZ=America/New_York 0 0 2 * * ?", "2012-11-04T02:00:00-0500", false}, + + // Unsatisfiable + {"Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", "", true}, + {"Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", "", true}, + + // Monthly job + {"TZ=America/New_York 2012-12-03T00:00:00-0500", "0 0 3 3 * ?", "2012-11-03T03:00:00-0400", false}, + } + + parser := cronlib.NewParser(cronlib.Second | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.DowOptional | cronlib.Descriptor) + + for _, c := range runs { + actual, err := PrevCronTime(c.spec, parser, getTime(c.time)) + if c.expectedErr { + if err == nil { + t.Errorf("%s, \"%s\": should have received error but didn't", c.time, c.spec) + } + } else { + if err != nil { + t.Errorf("%s, \"%s\": error: %v", c.time, c.spec, err) + } else { + expected := getTime(c.expected) + if !actual.Equal(expected) { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) + } + } + } + } +} + +func getTime(value string) time.Time { + if value == "" { + return time.Time{} + } + + var location = time.Local + if strings.HasPrefix(value, "TZ=") { + parts := strings.Fields(value) + loc, err := time.LoadLocation(parts[0][len("TZ="):]) + if err != nil { + panic("could not parse location:" + err.Error()) + } + location = loc + value = parts[1] + } + + var layouts = []string{ + "Mon Jan 2 15:04 2006", + "Mon Jan 2 15:04:05 2006", + } + for _, layout := range layouts { + if t, err := time.ParseInLocation(layout, value, location); err == nil { + return t + } + } + if t, err := time.ParseInLocation("2006-01-02T15:04:05-0700", value, location); err == nil { + return t + } + panic("could not parse time value " + value) +} diff --git a/eventbus/driver/driver.go b/eventbus/driver/driver.go index ae8b86faa4..b68af5cd3a 100644 --- a/eventbus/driver/driver.go +++ b/eventbus/driver/driver.go @@ -2,6 +2,7 @@ package driver import ( "context" + "time" eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -21,7 +22,7 @@ type Driver interface { // Parameter - dependencies, array of dependencies information // Parameter - filter, a function used to filter the message // Parameter - action, a function to be triggered after all conditions meet - SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error + SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error // Publish a message Publish(conn Connection, message []byte) error diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 07af83899a..b940bc5d3a 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -130,13 +130,14 @@ func (n *natsStreaming) Publish(conn Connection, message []byte) error { // Parameter - group, queue group name // Parameter - closeCh, channel to indicate to close the subscription // Parameter - resetConditionsCh, channel to indicate to reset trigger conditions +// Parameter - lastResetTime, the last time reset would have occurred, if any // Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3" // Parameter - dependencies, array of dependencies information // Parameter - filter, a function used to filter the message // Parameter - action, a function to be triggered after all conditions meet -func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error { +func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error { log := n.logger.With("clientID", n.clientID) - msgHolder, err := newEventSourceMessageHolder(dependencyExpr, dependencies) + msgHolder, err := newEventSourceMessageHolder(log, dependencyExpr, dependencies, lastResetTime) if err != nil { return err } @@ -229,6 +230,8 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } + log.Debugf("New incoming Event Source Message, dependency name=%s", depName) + if depName == "" { _ = m.Ack() return @@ -243,6 +246,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc if !filter(depName, *event) { // message not interested + log.Debugf("not interested in dependency %s", depName) _ = m.Ack() return } @@ -255,25 +259,22 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } - // Clean up old messages before starting a new round - if msgHolder.getLastResetTime() > 0 { - // ACK all the old messages after conditions meet - if m.Timestamp <= msgHolder.latestGoodMsgTimestamp { - if depName != "" { - msgHolder.reset(depName) - } - msgHolder.ackAndCache(m, event.ID()) - return + // Acknowledge any old messages that occurred before the last reset (standard reset after trigger or conditional reset) + if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { + if depName != "" { + msgHolder.reset(depName) } + msgHolder.ackAndCache(m, event.ID()) - // Old redelivered messages should be able to be acked in 60 seconds. - // Reset if the flag didn't get cleared in that period for some reasons. - if time.Now().Unix()-msgHolder.getLastResetTime() > 60 { - msgHolder.resetAll() - log.Info("ATTENTION: Reset the flags because they didn't get cleared in 60 seconds...") - } + log.Debugf("reset and acked dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", + depName, m.Timestamp, msgHolder.getLastResetTime()) return } + // make sure that everything has been cleared within a certain amount of time + if msgHolder.fullResetTimeout() { + log.Infof("ATTENTION: Resetting the flags because they didn't get cleared before the timeout: msgHolder=%+v", msgHolder) + msgHolder.resetAll() + } now := time.Now().Unix() @@ -283,6 +284,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // Re-delivered latest messge, update delivery timestamp and return existingMsg.lastDeliveredTime = now msgHolder.msgs[depName] = existingMsg + log.Debugf("Updating timestamp for dependency=%s", depName) return } else if m.Timestamp < existingMsg.timestamp { // Re-delivered old message, ack and return @@ -294,9 +296,6 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // New message, set and check msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now} msgHolder.parameters[depName] = true - if msgHolder.latestGoodMsgTimestamp < m.Timestamp { - msgHolder.latestGoodMsgTimestamp = m.Timestamp - } // Check if there's any stale message being held. // Stale message could be message age has been longer than NATS streaming max message age, @@ -324,7 +323,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc meetDeps = append(meetDeps, k) meetMsgIds = append(meetMsgIds, v.event.ID()) } - log.Infow("trigger conditions do not meet", zap.Any("meetDependencies", meetDeps), zap.Any("meetEvents", meetMsgIds)) + log.Infow("trigger conditions not met", zap.Any("meetDependencies", meetDeps), zap.Any("meetEvents", meetMsgIds)) return } msgHolder.setLastResetTime(now) @@ -355,20 +354,23 @@ type eventSourceMessageHolder struct { // time that resets conditions, usually the time all conditions meet, // or the time getting an external signal to reset. lastResetTime int64 - // timestamp of last msg - latestGoodMsgTimestamp int64 - expr *govaluate.EvaluableExpression - depNames []string + // if we reach this time, we reset everything (occurs 60 seconds after lastResetTime) + resetTimeout int64 + expr *govaluate.EvaluableExpression + depNames []string // Mapping of [eventSourceName + eventName]dependencyName sourceDepMap map[string]string parameters map[string]interface{} msgs map[string]*eventSourceMessage // A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering - smap *sync.Map - lock sync.RWMutex + smap *sync.Map + lock sync.RWMutex + timeoutLock sync.RWMutex + + logger *zap.SugaredLogger } -func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependency) (*eventSourceMessageHolder, error) { +func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr string, dependencies []Dependency, lastResetTime time.Time) (*eventSourceMessageHolder, error) { dependencyExpr = strings.ReplaceAll(dependencyExpr, "-", "\\-") expression, err := govaluate.NewEvaluableExpression(dependencyExpr) if err != nil { @@ -392,15 +394,15 @@ func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependenc } return &eventSourceMessageHolder{ - lastResetTime: int64(0), - latestGoodMsgTimestamp: int64(0), - expr: expression, - depNames: deps, - sourceDepMap: srcDepMap, - parameters: parameters, - msgs: msgs, - smap: new(sync.Map), - lock: sync.RWMutex{}, + lastResetTime: lastResetTime.Unix(), + expr: expression, + depNames: deps, + sourceDepMap: srcDepMap, + parameters: parameters, + msgs: msgs, + smap: new(sync.Map), + lock: sync.RWMutex{}, + logger: logger, }, nil } @@ -411,9 +413,30 @@ func (mh *eventSourceMessageHolder) getLastResetTime() int64 { } func (mh *eventSourceMessageHolder) setLastResetTime(t int64) { - mh.lock.Lock() - defer mh.lock.Unlock() - mh.lastResetTime = t + { + mh.lock.Lock() // since this can be called asyncronously as part of a ConditionReset, we neeed to lock this code + defer mh.lock.Unlock() + mh.lastResetTime = t + } + mh.setResetTimeout(t + 60) // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime +} + +func (mh *eventSourceMessageHolder) setResetTimeout(t int64) { + mh.timeoutLock.Lock() // since this can be called asyncronously as part of a ConditionReset, we neeed to lock this code + defer mh.timeoutLock.Unlock() + mh.resetTimeout = t +} + +func (mh *eventSourceMessageHolder) getResetTimeout() int64 { + mh.timeoutLock.RLock() + defer mh.timeoutLock.RUnlock() + return mh.resetTimeout +} + +// failsafe condition after lastResetTime +func (mh *eventSourceMessageHolder) fullResetTimeout() bool { + resetTimeout := mh.getResetTimeout() + return resetTimeout != 0 && time.Now().Unix() > resetTimeout } func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName string) (string, error) { @@ -439,8 +462,9 @@ func (mh *eventSourceMessageHolder) ackAndCache(m *stan.Msg, id string) { func (mh *eventSourceMessageHolder) reset(depName string) { mh.parameters[depName] = false delete(mh.msgs, depName) + if mh.isCleanedUp() { - mh.setLastResetTime(0) + mh.setResetTimeout(0) } } @@ -451,7 +475,7 @@ func (mh *eventSourceMessageHolder) resetAll() { for k := range mh.parameters { mh.parameters[k] = false } - mh.setLastResetTime(0) + mh.setResetTimeout(0) } // Check if all the parameters and messages have been cleaned up diff --git a/sensors/listener.go b/sensors/listener.go index a56d6dd993..fc389147f4 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -212,36 +212,21 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { var subLock uint32 wg1 := &sync.WaitGroup{} closeSubCh := make(chan struct{}) - resetConditionsCh := make(chan struct{}) - - subscribeFunc := func() { - wg1.Add(1) - go func() { - defer wg1.Done() - // release the lock when goroutine exits - defer atomic.StoreUint32(&subLock, 0) - - logger.Infof("started subscribing to events for trigger %s with client %s", trigger.Template.Name, clientID) - - err = ebDriver.SubscribeEventSources(ctx, conn, group, closeSubCh, resetConditionsCh, depExpression, deps, transformFunc, filterFunc, actionFunc) - if err != nil { - logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err)) - return - } - }() - } - - subscribeOnce(&subLock, subscribeFunc) + resetConditionsCh := make(chan struct{}) + var lastResetTime time.Time if len(trigger.Template.ConditionsReset) > 0 { for _, c := range trigger.Template.ConditionsReset { + if c.ByTime == nil { continue } + cronParser := cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow) opts := []cronlib.Option{ - cronlib.WithParser(cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow)), + cronlib.WithParser(cronParser), cronlib.WithChain(cronlib.Recover(cronlib.DefaultLogger)), } + nowTime := time.Now() if c.ByTime.Timezone != "" { location, err := time.LoadLocation(c.ByTime.Timezone) if err != nil { @@ -249,6 +234,7 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { continue } opts = append(opts, cronlib.WithLocation(location)) + nowTime = nowTime.In(location) } cr := cronlib.New(opts...) _, err = cr.AddFunc(c.ByTime.Cron, func() { @@ -259,9 +245,43 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { continue } cr.Start() + + logger.Debugf("just started cron job; entries=%v", cr.Entries()) + + // set lastResetTime (the last time this would've been triggered) + if len(cr.Entries()) > 0 { + prevTriggerTime, err := common.PrevCronTime(c.ByTime.Cron, cronParser, nowTime) + if err != nil { + logger.Errorw("couldn't get previous cron trigger time", zap.Error(err)) + continue + } + logger.Infof("previous trigger time: %v", prevTriggerTime) + if prevTriggerTime.After(lastResetTime) { + lastResetTime = prevTriggerTime + } + } } } + subscribeFunc := func() { + wg1.Add(1) + go func() { + defer wg1.Done() + // release the lock when goroutine exits + defer atomic.StoreUint32(&subLock, 0) + + logger.Infof("started subscribing to events for trigger %s with client %s", trigger.Template.Name, clientID) + + err = ebDriver.SubscribeEventSources(ctx, conn, group, closeSubCh, resetConditionsCh, lastResetTime, depExpression, deps, transformFunc, filterFunc, actionFunc) + if err != nil { + logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err)) + return + } + }() + } + + subscribeOnce(&subLock, subscribeFunc) + logger.Infof("starting eventbus connection daemon for client %s...", clientID) ticker := time.NewTicker(5 * time.Second) defer ticker.Stop()