Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trigger conditions reset. Closes #1381 #1392

Merged
merged 7 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
cronlib "github.com/robfig/cron/v3"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/pkg/errors"
)

// ValidateSensor accepts a sensor and performs validation against it
Expand All @@ -38,7 +40,7 @@ func ValidateSensor(s *v1alpha1.Sensor) error {
s.Status.MarkDependenciesProvided()
err := validateTriggers(s.Spec.Triggers)
if err != nil {
s.Status.MarkTriggersNotProvided("InvalidTriggers", "Invalid triggers.")
s.Status.MarkTriggersNotProvided("InvalidTriggers", err.Error())
return err
}
s.Status.MarkTriggersProvided()
Expand Down Expand Up @@ -79,6 +81,20 @@ func validateTriggerTemplate(template *v1alpha1.TriggerTemplate) error {
if template.Name == "" {
return errors.Errorf("trigger must define a name")
}
if len(template.ConditionsReset) > 0 {
for _, c := range template.ConditionsReset {
if c.ByTime == nil {
return errors.Errorf("invalid conditionsReset")
}
parser := cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow)
if _, err := parser.Parse(c.ByTime.Cron); err != nil {
return errors.Errorf("invalid cron expression %q", c.ByTime.Cron)
}
if _, err := time.LoadLocation(c.ByTime.Timezone); err != nil {
return errors.Errorf("invalid timezone %q", c.ByTime.Timezone)
}
}
}
if template.K8s != nil {
if err := validateK8STrigger(template.K8s); err != nil {
return errors.Wrapf(err, "trigger for template %s is invalid", template.Name)
Expand Down
3 changes: 2 additions & 1 deletion eventbus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ type Driver interface {
// Parameter - conn, eventbus connection
// Parameter - group, NATS Streaming queue group or Kafka consumer group
// Parameter - closeCh, channel to indicate to close the subscription
// Parameter - resetConditionsCh, channel to indicate to reset trigger conditions
// Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error

// Publish a message
Publish(conn Connection, message []byte) error
Expand Down
Loading