Skip to content

Commit

Permalink
feat: trigger conditions reset. Closes #1381 (#1392)
Browse files Browse the repository at this point in the history
* feat: trigger conditions reset

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Oct 26, 2021
1 parent 2251760 commit 65c49d1
Show file tree
Hide file tree
Showing 20 changed files with 1,353 additions and 448 deletions.
28 changes: 28 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
cronlib "github.com/robfig/cron/v3"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/pkg/errors"
)

// ValidateSensor accepts a sensor and performs validation against it
Expand All @@ -38,7 +40,7 @@ func ValidateSensor(s *v1alpha1.Sensor) error {
s.Status.MarkDependenciesProvided()
err := validateTriggers(s.Spec.Triggers)
if err != nil {
s.Status.MarkTriggersNotProvided("InvalidTriggers", "Invalid triggers.")
s.Status.MarkTriggersNotProvided("InvalidTriggers", err.Error())
return err
}
s.Status.MarkTriggersProvided()
Expand Down Expand Up @@ -79,6 +81,20 @@ func validateTriggerTemplate(template *v1alpha1.TriggerTemplate) error {
if template.Name == "" {
return errors.Errorf("trigger must define a name")
}
if len(template.ConditionsReset) > 0 {
for _, c := range template.ConditionsReset {
if c.ByTime == nil {
return errors.Errorf("invalid conditionsReset")
}
parser := cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow)
if _, err := parser.Parse(c.ByTime.Cron); err != nil {
return errors.Errorf("invalid cron expression %q", c.ByTime.Cron)
}
if _, err := time.LoadLocation(c.ByTime.Timezone); err != nil {
return errors.Errorf("invalid timezone %q", c.ByTime.Timezone)
}
}
}
if template.K8s != nil {
if err := validateK8STrigger(template.K8s); err != nil {
return errors.Wrapf(err, "trigger for template %s is invalid", template.Name)
Expand Down
51 changes: 51 additions & 0 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,55 @@ func TestValidTriggers(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "trigger template can't be nil"))
})

t.Run("invalid conditions reset - cron", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
Conditions: "A && B",
ConditionsReset: []v1alpha1.ConditionsResetCriteria{
{
ByTime: &v1alpha1.ConditionsResetByTime{
Cron: "a * * * *",
},
},
},
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "invalid cron expression"))
})

t.Run("invalid conditions reset - timezone", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
Conditions: "A && B",
ConditionsReset: []v1alpha1.ConditionsResetCriteria{
{
ByTime: &v1alpha1.ConditionsResetByTime{
Cron: "* * * * *",
Timezone: "fake",
},
},
},
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "invalid timezone"))
})
}
24 changes: 22 additions & 2 deletions docs/sensors/trigger-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

> v1.0 and after
`Conditions` is a new feature to replace `Circuit` and `Switch`. With
`conditions`, triggers can be executed based on different dependency conditions.
Triggers can be executed based on different dependency `conditions`.

An example with `conditions`:

Expand Down Expand Up @@ -55,3 +54,24 @@ won't be executed until the expression resolves to true. The operators in

If `conditions` is missing, the default conditions to execute the trigger is
`&&` logic of all the defined dependencies.

## Conditions Reset

When multiple dependencies are defined for a trigger, the trigger won't be executed until the condition expression is resolved to `true`. Sometimes you might want to reset all the stakeholders of the conditions, `conditions reset` is the way to do it.

For example, your trigger has a condtion as `A && B`, both `A` and `B` are expected to have an event everyday. One day for some reason, `A` gets an event but `B` does't, then it ends up with today's `A` and tomorrow's `B` triggering an action, which might not be something you want. To avoid that, you can reset the conditons as following:

```yaml
spec:
triggers:
- template:
conditions: "dep01 && dep02"
conditionsReset:
- byTime:
# Reset conditions as 23:59
cron: "59 23 * * *"
# Optional, defaults to UTC
# More info for timezone: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
timezone: America/Los_Angeles
name: trigger01
```
3 changes: 2 additions & 1 deletion eventbus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ type Driver interface {
// Parameter - conn, eventbus connection
// Parameter - group, NATS Streaming queue group or Kafka consumer group
// Parameter - closeCh, channel to indicate to close the subscription
// Parameter - resetConditionsCh, channel to indicate to reset trigger conditions
// Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error

// Publish a message
Publish(conn Connection, message []byte) error
Expand Down
Loading

0 comments on commit 65c49d1

Please sign in to comment.