Skip to content

Commit

Permalink
fix: conditions reset does not work if the service is down at the tri…
Browse files Browse the repository at this point in the history
…ggering time (#1585)

* incorporating new struct TimeBasedReset to encapsulate reset state/spec

Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 authored Feb 3, 2022
1 parent c0a28a6 commit 9606aed
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 64 deletions.
134 changes: 134 additions & 0 deletions common/cronutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package common

import (
"fmt"
"time"

cronlib "github.com/robfig/cron/v3"
)

const (
// Set the top bit if a star was included in the expression.
starBit = 1 << 63
)

// For a given cron specification, return the previous activation time
// If no time can be found to satisfy the schedule, return the zero time.
func PrevCronTime(cronSpec string, parser cronlib.Parser, t time.Time) (time.Time, error) {
var tm time.Time
sched, err := parser.Parse(cronSpec)
if err != nil {
return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s; couldn't parse; err=%v", cronSpec, err)
}
s, castOk := sched.(*cronlib.SpecSchedule)
if !castOk {
return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: unexpected type for %v", cronSpec, sched)
}

// General approach is based on approach to SpecSchedule.Next() implementation

origLocation := t.Location()
loc := s.Location
if loc == time.Local {
loc = t.Location()
}
if s.Location != time.Local {
t = t.In(s.Location)
}

// Start at the previous second
t = t.Add(-1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)

// If no time is found within five years, return zero.
yearLimit := t.Year() - 5

WRAP:
if t.Year() < yearLimit {
return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: no time found within %d years", cronSpec, yearLimit)
}

// Find the first applicable month.
// If it's this month, then do nothing.
for 1<<uint(t.Month())&s.Month == 0 {
// set t to the last second of the previous month
t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc)
t = t.Add(-1 * time.Second)

// Wrapped around.
if t.Month() == time.December {
goto WRAP
}
}

// Now get a day in that month.
for !dayMatches(s, t) {
// set t to the last second of the previous day

saveMonth := t.Month()
t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc)

// NOTE: This causes issues for daylight savings regimes where midnight does
// not exist. For example: Sao Paulo has DST that transforms midnight on
// 11/3 into 1am. Handle that by noticing when the Hour ends up != 0.

// Notice if the hour is no longer midnight due to DST.
// Add an hour if it's 23, subtract an hour if it's 1.
if t.Hour() != 0 {
if t.Hour() > 12 {
t = t.Add(time.Duration(24-t.Hour()) * time.Hour)
} else {
t = t.Add(time.Duration(-t.Hour()) * time.Hour)
}
}

t = t.Add(-1 * time.Second)

if saveMonth != t.Month() {
goto WRAP
}
}

for 1<<uint(t.Hour())&s.Hour == 0 {
// set t to the last second of the previous hour
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, loc)
t = t.Add(-1 * time.Second)

if t.Hour() == 23 {
goto WRAP
}
}

for 1<<uint(t.Minute())&s.Minute == 0 {
// set t to the last second of the previous minute
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, loc)
t = t.Add(-1 * time.Second)

if t.Minute() == 59 {
goto WRAP
}
}

for 1<<uint(t.Second())&s.Second == 0 {
// set t to the previous second
t = t.Add(-1 * time.Second)

if t.Second() == 59 {
goto WRAP
}
}

return t.In(origLocation), nil
}

// dayMatches returns true if the schedule's day-of-week and day-of-month
// restrictions are satisfied by the given time.
func dayMatches(s *cronlib.SpecSchedule, t time.Time) bool {
var (
domMatch bool = 1<<uint(t.Day())&s.Dom > 0
dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0
)
if s.Dom&starBit > 0 || s.Dow&starBit > 0 {
return domMatch && dowMatch
}
return domMatch || dowMatch
}
106 changes: 106 additions & 0 deletions common/cronutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package common

import (
"strings"
"testing"
"time"

cronlib "github.com/robfig/cron/v3"
)

func TestPrevCronTime(t *testing.T) {
runs := []struct {
time, spec string
expected string
expectedErr bool
}{
// Simple cases
{"Mon Jul 9 15:00 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false},
{"Mon Jul 9 14:59 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false},
{"Mon Jul 9 15:01:59 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012", false},

// Wrap around hours
{"Mon Jul 9 15:10 2012", "0 20-35/15 * * * *", "Mon Jul 9 14:35 2012", false},

// Wrap around days
{"Tue Jul 10 00:00 2012", "0 */15 * * * *", "Tue Jul 9 23:45 2012", false},
{"Tue Jul 10 00:00 2012", "0 20-35/15 * * * *", "Tue Jul 9 23:35 2012", false},

// Wrap around months
{"Mon Jul 9 09:35 2012", "0 0 12 9 Apr-Oct ?", "Sat Jun 9 12:00 2012", false},

// Leap year
{"Mon Jul 9 23:35 2018", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016", false},

// Daylight savings time 3am EDT (-4) -> 2am EST (-5)
{"2013-03-11T02:30:00-0400", "TZ=America/New_York 0 0 12 9 Mar ?", "2013-03-09T12:00:00-0500", false},

// hourly job
{"2012-03-11T01:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T00:00:00-0500", false},

// 2am nightly job (skipped)
{"2012-03-12T00:00:00-0400", "TZ=America/New_York 0 0 2 * * ?", "2012-03-10T02:00:00-0500", false},

// 2am nightly job
{"2012-11-04T02:00:00-0500", "TZ=America/New_York 0 0 0 * * ?", "2012-11-04T00:00:00-0400", false},
{"2012-11-05T02:00:00-0500", "TZ=America/New_York 0 0 2 * * ?", "2012-11-04T02:00:00-0500", false},

// Unsatisfiable
{"Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", "", true},
{"Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", "", true},

// Monthly job
{"TZ=America/New_York 2012-12-03T00:00:00-0500", "0 0 3 3 * ?", "2012-11-03T03:00:00-0400", false},
}

parser := cronlib.NewParser(cronlib.Second | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.DowOptional | cronlib.Descriptor)

for _, c := range runs {
actual, err := PrevCronTime(c.spec, parser, getTime(c.time))
if c.expectedErr {
if err == nil {
t.Errorf("%s, \"%s\": should have received error but didn't", c.time, c.spec)
}
} else {
if err != nil {
t.Errorf("%s, \"%s\": error: %v", c.time, c.spec, err)
} else {
expected := getTime(c.expected)
if !actual.Equal(expected) {
t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual)
}
}
}
}
}

func getTime(value string) time.Time {
if value == "" {
return time.Time{}
}

var location = time.Local
if strings.HasPrefix(value, "TZ=") {
parts := strings.Fields(value)
loc, err := time.LoadLocation(parts[0][len("TZ="):])
if err != nil {
panic("could not parse location:" + err.Error())
}
location = loc
value = parts[1]
}

var layouts = []string{
"Mon Jan 2 15:04 2006",
"Mon Jan 2 15:04:05 2006",
}
for _, layout := range layouts {
if t, err := time.ParseInLocation(layout, value, location); err == nil {
return t
}
}
if t, err := time.ParseInLocation("2006-01-02T15:04:05-0700", value, location); err == nil {
return t
}
panic("could not parse time value " + value)
}
3 changes: 2 additions & 1 deletion eventbus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package driver

import (
"context"
"time"

eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -21,7 +22,7 @@ type Driver interface {
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error

// Publish a message
Publish(conn Connection, message []byte) error
Expand Down
Loading

0 comments on commit 9606aed

Please sign in to comment.