Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: conditions reset does not work if the service is down at the triggering time #1585

Merged
merged 29 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3e7d58b
incorporating new struct TimeBasedReset to encapsulate reset state/spec
juliev0 Feb 1, 2022
4692763
add capability to take a cron specification and determine the previou…
juliev0 Feb 2, 2022
462990f
removing the TimeBasedReset struct; instead taking advantage of the e…
juliev0 Feb 2, 2022
2a6e3f0
Parser can be passed into PrevCronTime()
juliev0 Feb 2, 2022
ddd70e3
fix log statement so it actually shows the reset time
juliev0 Feb 2, 2022
710c187
PrevCronTime returns error
juliev0 Feb 2, 2022
0f56078
removing everywhere we set lastResetTime to 0: doesn't seem necessary…
juliev0 Feb 2, 2022
a588e73
clarify comment
juliev0 Feb 2, 2022
1e32835
chore(deps): bump github.com/nsqio/go-nsq from 1.0.8 to 1.1.0 (#1569)
dependabot[bot] Jan 28, 2022
b7c53e9
chore(deps): bump github.com/nats-io/stan.go from 0.6.0 to 0.10.2 (#1…
dependabot[bot] Jan 28, 2022
45994a8
chore(deps): bump go.uber.org/zap from 1.19.0 to 1.20.0 (#1573)
dependabot[bot] Jan 31, 2022
9075307
chore: Update golangci-lint to 1.44.0 (#1578)
blkperl Feb 1, 2022
7b96c12
chore(deps): bump google.golang.org/grpc from v1.42.0 to v1.43.0 (#1579)
blkperl Feb 1, 2022
f91b89b
chore(deps): bump github.com/go-resty/resty/v2 from 2.3.0 to 2.7.0 (#…
dependabot[bot] Feb 1, 2022
35dcb13
fix: e2e test not recognizing .kube/config (#1581)
juliev0 Feb 1, 2022
c7dc358
chore(deps): bump cloud.google.com/go/compute from 0.1.0 to 1.1.0 (#1…
dependabot[bot] Feb 2, 2022
9d8e78f
chore(deps): bump github.com/spf13/viper from 1.10.0 to 1.10.1 (#1575)
dependabot[bot] Feb 2, 2022
d65b35a
adding fullResetOccurrenceTime to keep track of when the reset occurr…
juliev0 Feb 2, 2022
0b0282f
comments
juliev0 Feb 2, 2022
48dbb5e
removing commented out code
juliev0 Feb 2, 2022
2560cef
comment
juliev0 Feb 2, 2022
eb36a38
less kludgey way of doing fullResetTimeout()
juliev0 Feb 2, 2022
e51bb0c
cleaner way to handle the reset time
juliev0 Feb 3, 2022
dbab45b
chore(deps): bump github.com/nsqio/go-nsq from 1.0.8 to 1.1.0 (#1569)
dependabot[bot] Jan 28, 2022
ee5767a
chore(deps): bump github.com/nats-io/stan.go from 0.6.0 to 0.10.2 (#1…
dependabot[bot] Jan 28, 2022
0fa81a1
chore(deps): bump github.com/Shopify/sarama from 1.26.1 to 1.31.1 (#1…
dependabot[bot] Feb 2, 2022
1750197
chore(deps): bump github.com/xanzy/go-gitlab from 0.50.2 to 0.54.4 (#…
dependabot[bot] Feb 2, 2022
2563de3
chore:Updating security.md (#1588)
hblixt Feb 2, 2022
71f8eb4
adding mutex for eventSourceMessageHolder.resetTimeout since it can b…
juliev0 Feb 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ linters:
- gosimple
- govet
- ineffassign
- interfacer
- misspell
- nakedret
- rowserrcheck
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ start: image
kubectl -n argo-events wait --for=condition=Ready --timeout 60s pod --all

$(GOPATH)/bin/golangci-lint:
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.42.1
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.44.0

.PHONY: lint
lint: $(GOPATH)/bin/golangci-lint
Expand Down
19 changes: 13 additions & 6 deletions SECURITY.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
# Security
# Security policy for Argo Events

## Reporting Vulnerabilities
## Reporting a Vulnerability

Please report security vulnerabilities by e-mailing:
If you find a security related bug in Argo Events, we kindly ask you for responsible
disclosure and for giving us appropriate time to react, analyze and develop a
fix to mitigate the found security vulnerability.

* [[email protected]](mailto:[email protected])
* [[email protected]](mailto:[email protected])
Please report vulnerabilities by e-mail to the following address:

* [email protected]

All vulnerabilites and associated information will be treated with full confidentiality.

## Public Disclosure

Security vulnerabilities will be disclosed via [release notes](docs/releasing.md).
Security vulnerabilities will be disclosed via [release notes](docs/releasing.md) and using the
[GitHub Security Advisories](https://github.com/argoproj/argo-events/security/advisories)
feature to keep our community well informed, and will credit you for your findings (unless you prefer to stay anonymous, of course).

## Vulnerability Scanning

Expand Down
134 changes: 134 additions & 0 deletions common/cronutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package common

import (
"fmt"
"time"

cronlib "github.com/robfig/cron/v3"
)

const (
// Set the top bit if a star was included in the expression.
starBit = 1 << 63
)

// For a given cron specification, return the previous activation time
// If no time can be found to satisfy the schedule, return the zero time.
func PrevCronTime(cronSpec string, parser cronlib.Parser, t time.Time) (time.Time, error) {
var tm time.Time
sched, err := parser.Parse(cronSpec)
if err != nil {
return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s; couldn't parse; err=%v", cronSpec, err)
}
s, castOk := sched.(*cronlib.SpecSchedule)
if !castOk {
return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: unexpected type for %v", cronSpec, sched)
}

// General approach is based on approach to SpecSchedule.Next() implementation

origLocation := t.Location()
loc := s.Location
if loc == time.Local {
loc = t.Location()
}
if s.Location != time.Local {
t = t.In(s.Location)
}

// Start at the previous second
t = t.Add(-1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)

// If no time is found within five years, return zero.
yearLimit := t.Year() - 5

WRAP:
if t.Year() < yearLimit {
return tm, fmt.Errorf("can't derive previous Cron time for cron spec %s: no time found within %d years", cronSpec, yearLimit)
}

// Find the first applicable month.
// If it's this month, then do nothing.
for 1<<uint(t.Month())&s.Month == 0 {
// set t to the last second of the previous month
t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc)
t = t.Add(-1 * time.Second)

// Wrapped around.
if t.Month() == time.December {
goto WRAP
}
}

// Now get a day in that month.
for !dayMatches(s, t) {
// set t to the last second of the previous day

saveMonth := t.Month()
t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc)

// NOTE: This causes issues for daylight savings regimes where midnight does
// not exist. For example: Sao Paulo has DST that transforms midnight on
// 11/3 into 1am. Handle that by noticing when the Hour ends up != 0.

// Notice if the hour is no longer midnight due to DST.
// Add an hour if it's 23, subtract an hour if it's 1.
if t.Hour() != 0 {
if t.Hour() > 12 {
t = t.Add(time.Duration(24-t.Hour()) * time.Hour)
} else {
t = t.Add(time.Duration(-t.Hour()) * time.Hour)
}
}

t = t.Add(-1 * time.Second)

if saveMonth != t.Month() {
goto WRAP
}
}

for 1<<uint(t.Hour())&s.Hour == 0 {
// set t to the last second of the previous hour
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, loc)
t = t.Add(-1 * time.Second)

if t.Hour() == 23 {
goto WRAP
}
}

for 1<<uint(t.Minute())&s.Minute == 0 {
// set t to the last second of the previous minute
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, loc)
t = t.Add(-1 * time.Second)

if t.Minute() == 59 {
goto WRAP
}
}

for 1<<uint(t.Second())&s.Second == 0 {
// set t to the previous second
t = t.Add(-1 * time.Second)

if t.Second() == 59 {
goto WRAP
}
}

return t.In(origLocation), nil
}

// dayMatches returns true if the schedule's day-of-week and day-of-month
// restrictions are satisfied by the given time.
func dayMatches(s *cronlib.SpecSchedule, t time.Time) bool {
var (
domMatch bool = 1<<uint(t.Day())&s.Dom > 0
dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0
)
if s.Dom&starBit > 0 || s.Dow&starBit > 0 {
return domMatch && dowMatch
}
return domMatch || dowMatch
}
106 changes: 106 additions & 0 deletions common/cronutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package common

import (
"strings"
"testing"
"time"

cronlib "github.com/robfig/cron/v3"
)

func TestPrevCronTime(t *testing.T) {
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
runs := []struct {
time, spec string
expected string
expectedErr bool
}{
// Simple cases
{"Mon Jul 9 15:00 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false},
{"Mon Jul 9 14:59 2012", "0 0/15 * * * *", "Mon Jul 9 14:45 2012", false},
{"Mon Jul 9 15:01:59 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012", false},

// Wrap around hours
{"Mon Jul 9 15:10 2012", "0 20-35/15 * * * *", "Mon Jul 9 14:35 2012", false},

// Wrap around days
{"Tue Jul 10 00:00 2012", "0 */15 * * * *", "Tue Jul 9 23:45 2012", false},
{"Tue Jul 10 00:00 2012", "0 20-35/15 * * * *", "Tue Jul 9 23:35 2012", false},

// Wrap around months
{"Mon Jul 9 09:35 2012", "0 0 12 9 Apr-Oct ?", "Sat Jun 9 12:00 2012", false},

// Leap year
{"Mon Jul 9 23:35 2018", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016", false},

// Daylight savings time 3am EDT (-4) -> 2am EST (-5)
{"2013-03-11T02:30:00-0400", "TZ=America/New_York 0 0 12 9 Mar ?", "2013-03-09T12:00:00-0500", false},

// hourly job
{"2012-03-11T01:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T00:00:00-0500", false},

// 2am nightly job (skipped)
{"2012-03-12T00:00:00-0400", "TZ=America/New_York 0 0 2 * * ?", "2012-03-10T02:00:00-0500", false},

// 2am nightly job
{"2012-11-04T02:00:00-0500", "TZ=America/New_York 0 0 0 * * ?", "2012-11-04T00:00:00-0400", false},
{"2012-11-05T02:00:00-0500", "TZ=America/New_York 0 0 2 * * ?", "2012-11-04T02:00:00-0500", false},

// Unsatisfiable
{"Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", "", true},
{"Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", "", true},

// Monthly job
{"TZ=America/New_York 2012-12-03T00:00:00-0500", "0 0 3 3 * ?", "2012-11-03T03:00:00-0400", false},
}

parser := cronlib.NewParser(cronlib.Second | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.DowOptional | cronlib.Descriptor)

for _, c := range runs {
actual, err := PrevCronTime(c.spec, parser, getTime(c.time))
if c.expectedErr {
if err == nil {
t.Errorf("%s, \"%s\": should have received error but didn't", c.time, c.spec)
}
} else {
if err != nil {
t.Errorf("%s, \"%s\": error: %v", c.time, c.spec, err)
} else {
expected := getTime(c.expected)
if !actual.Equal(expected) {
t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual)
}
}
}
}
}

func getTime(value string) time.Time {
if value == "" {
return time.Time{}
}

var location = time.Local
if strings.HasPrefix(value, "TZ=") {
parts := strings.Fields(value)
loc, err := time.LoadLocation(parts[0][len("TZ="):])
if err != nil {
panic("could not parse location:" + err.Error())
}
location = loc
value = parts[1]
}

var layouts = []string{
"Mon Jan 2 15:04 2006",
"Mon Jan 2 15:04:05 2006",
}
for _, layout := range layouts {
if t, err := time.ParseInLocation(layout, value, location); err == nil {
return t
}
}
if t, err := time.ParseInLocation("2006-01-02T15:04:05-0700", value, location); err == nil {
return t
}
panic("could not parse time value " + value)
}
3 changes: 2 additions & 1 deletion eventbus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package driver

import (
"context"
"time"

eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -21,7 +22,7 @@ type Driver interface {
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, dependencyExpr string, dependencies []Dependency, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error

// Publish a message
Publish(conn Connection, message []byte) error
Expand Down
Loading