diff --git a/Gopkg.lock b/Gopkg.lock index ae8e87c98f..84f321d6b1 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,7 +3,7 @@ [[projects]] branch = "master" - digest = "1:e4c7e170dbc2f12d6b7ee4fd51dccfb912da27048be45f51156ae62063e088ad" + digest = "1:3ea5d102b6bbcd4b846a74a1b9c0ce40d35b6bbc31a425690d1260f86a7b304f" name = "cloud.google.com/go" packages = [ ".", @@ -17,7 +17,7 @@ "pubsub/internal/scheduler", ] pruneopts = "UT" - revision = "09ba3f8614bab8557260ba47b7bd3a7ec711991a" + revision = "3e8951f5e3d821b4b0f8bcd2ab6b768a25b6f8b1" [[projects]] digest = "1:487dc37a77bbba996bf4ddae0bff1c69fde98027d507e75eca317ca7c94483c3" @@ -52,15 +52,15 @@ version = "v1.3.1" [[projects]] - digest = "1:1f16df85b55dd491a5268ecbd7851d7ae84d76b9f18e925ddbe380cde8bb3c9c" + digest = "1:40e781e7e5a64819eafe9d686951dac546d2dc726431c43793a32a3ec16ed9f4" name = "github.com/Azure/azure-sdk-for-go" packages = [ "services/eventhub/mgmt/2017-04-01/eventhub", "version", ] pruneopts = "UT" - revision = "f1b3b9b7d4962c94c77d16ce75f9a8e981eb2488" - version = "v40.4.0" + revision = "4acdfccd3e1bd96cf929459f72f25cb0dd1f6ce8" + version = "v40.5.0" [[projects]] digest = "1:24f3d5aa436b7cf0244efff65e6346602eb0af08e1026bb9d1940b458c5326f9" @@ -170,7 +170,7 @@ [[projects]] branch = "master" - digest = "1:43b5cd058cb76f9f2edee6ff60f1f16027c0df08b9051bd1fd3be7d9011e1ffc" + digest = "1:530b6a024e530a11c33843312d1e666d1a54399fc773cf8c747ee5f0299fb284" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -215,7 +215,7 @@ "service/sts/stsiface", ] pruneopts = "UT" - revision = "6194da3047dec68bb49483652866575d1841ea3d" + revision = "d6d63adc1c143626733f1ab3d575b6f777357d04" [[projects]] digest = "1:b3593a62778035a246ff254b02ac1289862dbfc52a3bea493565ff24b4dd4059" @@ -588,12 +588,12 @@ version = "v1.7.4" [[projects]] - digest = "1:e62657cca9badaa308d86e7716083e4c5933bb78e30a17743fc67f50be26f6f4" + digest = "1:6d29f02f0f01c627c2be40fb7347669a9ff2aa215cb97747294c1d13ffa74bdd" name = "github.com/gorilla/websocket" packages = ["."] pruneopts = "UT" - revision = "c3e18be99d19e6b3e8f1559eea2c161a665c4b6b" - version = "v1.4.1" + revision = "b65e62901fc1c0d968042419e74789f6af455eb9" + version = "v1.4.2" [[projects]] digest = "1:6eb58a12ff2e21abe77271d29981dfc74211ae13500a5f34d34c5a098945d8b7" @@ -774,12 +774,12 @@ version = "v1.1.0" [[projects]] - digest = "1:53bc4cd4914cd7cd52139990d5170d6dc99067ae31c56530621b18b35fc30318" + digest = "1:6ea2d98ba8ae7c06a918970661b744be5d1a385c40917b9c317e9a4f2c36ac6f" name = "github.com/mitchellh/mapstructure" packages = ["."] pruneopts = "UT" - revision = "3536a929edddb9a5b34bd6861dc4a9647cb459fe" - version = "v1.1.2" + revision = "694aaefbc689be1915c2ef39e880409057931a94" + version = "v1.2.2" [[projects]] digest = "1:2a7e6f8bebdca6bd8bc359c37f01ae1c4ea4f8481eaabf93b1ae4863f15b72c7" @@ -876,28 +876,23 @@ version = "v1.0.8" [[projects]] - digest = "1:208f4dd7e6312b7d56a810a0fc98e10f9c2d239409d058f46411f59c496e53c6" + digest = "1:23c090b7e676e1bb49ab9a87562156fd49d6f4cd28eabb1f23010695fbd1a325" name = "github.com/openfaas-incubator/connector-sdk" packages = ["types"] pruneopts = "UT" - revision = "d4baf95d4c62f6295c5d4af1d36bb2b9d03a32d4" - version = "0.4.1" + revision = "df5d76475412b74c3516ba912ae4522793780994" + version = "0.5.5" [[projects]] - digest = "1:3f69624cb4ae8ab815a672f7d169d64786ee027a9d42eefab3c9d4e9debe0750" - name = "github.com/openfaas/faas" - packages = ["gateway/requests"] - pruneopts = "UT" - revision = "a65df4795bc66147c41161c48bfd4c72f60c7434" - version = "0.9.14" - -[[projects]] - digest = "1:deb76da5396c9f641ddea9ca79e31a14bdb09c787cdfda90488768b7539b1fd6" + digest = "1:5ef12e7154e453638378dfc853ca53d2d775aee2d7a6da535df639d5933b39c7" name = "github.com/openfaas/faas-provider" - packages = ["auth"] + packages = [ + "auth", + "types", + ] pruneopts = "UT" - revision = "220324e98f5db5aa61f02d1ab13f03e91310796c" - version = "0.8.1" + revision = "d6579bdcf7c85f4d01f398d65ea0cab37e9633d0" + version = "0.13.3" [[projects]] digest = "1:0d21409d74019b65b488ecb02bed6d0f539d8d8563b86aac5734dfdbdf860cdf" @@ -911,12 +906,12 @@ version = "v2.4.1" [[projects]] - digest = "1:9e1d37b58d17113ec3cb5608ac0382313c5b59470b94ed97d0976e69c7022314" + digest = "1:cf31692c14422fa27c83a05292eb5cbe0fb2775972e8f1f8446a71549bd8980b" name = "github.com/pkg/errors" packages = ["."] pruneopts = "UT" - revision = "614d223910a179a466c1767a985424175c39b465" - version = "v0.9.1" + revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4" + version = "v0.8.1" [[projects]] digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe" @@ -952,11 +947,11 @@ [[projects]] branch = "master" - digest = "1:d9191c35dc4c1a7f438d8a3b5397b0ec538836c9d44e1f0c17ca8fb01a66c3b6" + digest = "1:0599141a8403114d34f1e546604ad6c5361b70dfa80e80c635f438cdbf71b43a" name = "github.com/sirupsen/logrus" packages = ["."] pruneopts = "UT" - revision = "7ea96a3284ed9d25de5056804d60426cb75833c7" + revision = "d417be0fe654de640a82370515129985b407c7e3" [[projects]] digest = "1:237af0cf68bac89e21af72e6cd6b64f388854895e75f82ad08c6c011e1a8286c" @@ -1021,7 +1016,7 @@ version = "v1.5.1" [[projects]] - digest = "1:dcf5a344bc2c01d5b53c8550ef5bd5969f9953a7b7d37b71f3560fe5d54d551c" + digest = "1:ceccfece4b4f63bdbd22526e748bcfaadf582a1ded13774984a934631df5ff15" name = "github.com/stripe/stripe-go" packages = [ ".", @@ -1029,8 +1024,8 @@ "webhookendpoint", ] pruneopts = "UT" - revision = "023fd8d3b5675a207862c946ebcaefa8d07fe89e" - version = "v70.5.0" + revision = "a40cb527655f4062e8f35abddc6efc80f78fb31c" + version = "v70.6.0" [[projects]] digest = "1:358f9a53d3bb46b9dd07b4490f9b30f9f85218b41f6bbb48d6503668a45721a6" @@ -1168,7 +1163,7 @@ "ssh/terminal", ] pruneopts = "UT" - revision = "1b76d66859c6111b3d5c3ea6600ea44dc188bf12" + revision = "891825fb96dfc23279b4a42f710d49e78e05b333" [[projects]] branch = "master" @@ -1194,7 +1189,7 @@ [[projects]] branch = "master" - digest = "1:43976cc7ddf827649d5a8c839b56fa6c9d5309e86425b90cf672e75d6dbff6d9" + digest = "1:60a15ff07ff3b717610aa80964788f1e2134e6d185241a4eea49aaba99f09dcf" name = "golang.org/x/net" packages = [ "context", @@ -1211,7 +1206,7 @@ "websocket", ] pruneopts = "UT" - revision = "244492dfa37ae2ce87222fd06250a03160745faa" + revision = "118fecf932d8fea033f3b7aca66fba22cfdd517d" [[projects]] branch = "master" @@ -1240,7 +1235,7 @@ [[projects]] branch = "master" - digest = "1:017491fb7a712fbb21cd733af00c3a9032c193ea9ae51fa0403b1b33a1b487b0" + digest = "1:b226fc389ec0c44a315f2e3cbbea1701b287fad2fef8247245f00ddbb3ae094a" name = "golang.org/x/sys" packages = [ "cpu", @@ -1248,7 +1243,7 @@ "windows", ] pruneopts = "UT" - revision = "5766fd39f98ddd8d769ad4aedcee557dd28de90f" + revision = "328b4cd54aaeaba222936f16d417999f7e521f7b" [[projects]] digest = "1:66a2f252a58b4fbbad0e4e180e1d85a83c222b6bce09c3dcdef3dc87c72eda7c" @@ -1311,7 +1306,7 @@ "internal/packagesinternal", ] pruneopts = "UT" - revision = "2944c61d58b4b44936759b37e08703fc37be20e2" + revision = "0d839f3cf2ed7d63fd98875ea3589e5bf4b64de7" [[projects]] branch = "master" @@ -1361,7 +1356,7 @@ [[projects]] branch = "master" - digest = "1:54299ac581b70a93f7f1d42f67e857e82fbca7c87e389f2949aeb989143ec7fe" + digest = "1:1d80b254efb45b0d2da79aa938ca5eeab01a164c332bd8de643042a77a03027d" name = "google.golang.org/genproto" packages = [ "googleapis/api/annotations", @@ -1372,7 +1367,7 @@ "protobuf/field_mask", ] pruneopts = "UT" - revision = "1f3552e48f24ac35a91be3fd5ea65959428857e4" + revision = "08878b785e9c1c5903852e4cb9277fce9d1e16d5" [[projects]] digest = "1:c5cb55dc71404526bd9002c7b0f63c8910f51b19751248e15dcba2c46210662a" @@ -2065,7 +2060,7 @@ "trace", ] pruneopts = "UT" - revision = "138672cc43f8470f6802e22baa03a8c20ada54bd" + revision = "327a8059b905f89edaae6412417fb2354b188f5d" [[projects]] digest = "1:caa7785e6f87eb44fe0688b7eaea489a7f904ed8cbb09eb2d963bd5cde592601" diff --git a/api/event-source.html b/api/event-source.html index 3fdd61b7ea..35bc81c321 100644 --- a/api/event-source.html +++ b/api/event-source.html @@ -1822,7 +1822,8 @@

ResourceEventSource (Optional) -

Filter is applied on the metadata of the resource

+

Filter is applied on the metadata of the resource +If you apply filter, then the internal event informer will only monitor objects that pass the filter.

@@ -1843,17 +1844,16 @@

ResourceEventSource -eventType
+eventTypes
-ResourceEventType +[]ResourceEventType -(Optional) -

Type is the event type. -If not provided, the gateway will watch all events for a resource.

+

EventTypes is the list of event type to watch. +Possible values are - ADD, UPDATE and DELETE.

diff --git a/api/event-source.md b/api/event-source.md index e072a762da..6d36edc37d 100644 --- a/api/event-source.md +++ b/api/event-source.md @@ -3605,7 +3605,9 @@ Namespace where resource is deployed

-Filter is applied on the metadata of the resource +Filter is applied on the metadata of the resource If you apply filter, +then the internal event informer will only monitor objects that pass the +filter.

@@ -3646,20 +3648,18 @@ Group of the resource -eventType
- ResourceEventType +eventTypes
+
\[\]ResourceEventType -(Optional) -

-Type is the event type. If not provided, the gateway will watch all -events for a resource. +EventTypes is the list of event type to watch. Possible values are - +ADD, UPDATE and DELETE.

@@ -4775,5 +4775,4 @@ all types of events will be processed. More info at Generated with gen-crd-api-reference-docs on git commit 1782e0f. -

diff --git a/api/gateway.md b/api/gateway.md index 3f2be472d8..5f91474494 100644 --- a/api/gateway.md +++ b/api/gateway.md @@ -1322,5 +1322,4 @@ NATS refers to the subscribers over NATS protocol. Generated with gen-crd-api-reference-docs on git commit 1782e0f. -

diff --git a/api/sensor.md b/api/sensor.md index 8fb0603526..3a124f7f78 100644 --- a/api/sensor.md +++ b/api/sensor.md @@ -6240,5 +6240,4 @@ VerifyCert decides whether the connection is secure or not Generated with gen-crd-api-reference-docs on git commit 1782e0f. -

diff --git a/docs/setup/resource.md b/docs/setup/resource.md index a3089ff9da..e7dbe1a07c 100644 --- a/docs/setup/resource.md +++ b/docs/setup/resource.md @@ -59,7 +59,7 @@ The structure of an event dispatched by the gateway to the sensor looks like fol metadata: name: my-workflow labels: - app: my-workflow + name: my-workflow spec: entrypoint: whalesay templates: diff --git a/gateways/server/resource/start.go b/gateways/server/resource/start.go index e848b5d07f..a8f50e2567 100644 --- a/gateways/server/resource/start.go +++ b/gateways/server/resource/start.go @@ -111,7 +111,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c } if resourceEventSource.Filter != nil && resourceEventSource.Filter.Fields != nil { - sel, err := LabelSelector(resourceEventSource.Filter.Fields) + sel, err := FieldSelector(resourceEventSource.Filter.Fields) if err != nil { return errors.Wrapf(err, "failed to create the field selector for the event source %s", eventSource.Name) } @@ -128,66 +128,88 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c informer := factory.ForResource(gvr) informerEventCh := make(chan *InformerEvent) + stopCh := make(chan struct{}) go func() { logger.Infoln("listening to resource events...") - for event := range informerEventCh { - objBody, err := json.Marshal(event.Obj) - if err != nil { - logger.WithError(err).Errorln("failed to marshal the resource, rejecting the event...") - continue - } + for { + select { + case event := <-informerEventCh: + objBody, err := json.Marshal(event.Obj) + if err != nil { + logger.WithError(err).Errorln("failed to marshal the resource, rejecting the event...") + continue + } - eventData := &events.ResourceEventData{ - EventType: string(event.Type), - Body: (*json.RawMessage)(&objBody), - Group: resourceEventSource.Group, - Version: resourceEventSource.Version, - Resource: resourceEventSource.Resource, - } - eventBody, err := json.Marshal(eventData) - if err != nil { - logger.WithError(err).Errorln("failed to marshal the event. rejecting the event...") - continue - } - if err := passFilters(event, resourceEventSource.Filter, resourceEventSource.EventType); err != nil { - logger.WithError(err).Warnln("failed to apply the filter, rejecting the event...") - continue + eventData := &events.ResourceEventData{ + EventType: string(event.Type), + Body: (*json.RawMessage)(&objBody), + Group: resourceEventSource.Group, + Version: resourceEventSource.Version, + Resource: resourceEventSource.Resource, + } + eventBody, err := json.Marshal(eventData) + if err != nil { + logger.WithError(err).Errorln("failed to marshal the event. rejecting the event...") + continue + } + if err := passFilters(event, resourceEventSource.Filter); err != nil { + logger.WithError(err).Warnln("failed to apply the filter, rejecting the event...") + continue + } + channels.Data <- eventBody + case <-stopCh: + return } - channels.Data <- eventBody } }() - sharedInformer := informer.Informer() - sharedInformer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { + handlerFuncs := cache.ResourceEventHandlerFuncs{} + + for _, eventType := range resourceEventSource.EventTypes { + switch eventType { + case v1alpha1.ADD: + handlerFuncs.AddFunc = func(obj interface{}) { + logger.Infoln("detected create event") informerEventCh <- &InformerEvent{ Obj: obj, Type: v1alpha1.ADD, } - }, - UpdateFunc: func(oldObj, newObj interface{}) { + } + case v1alpha1.UPDATE: + handlerFuncs.UpdateFunc = func(oldObj, newObj interface{}) { + logger.Infoln("detected update event") informerEventCh <- &InformerEvent{ Obj: newObj, OldObj: oldObj, Type: v1alpha1.UPDATE, } - }, - DeleteFunc: func(obj interface{}) { + } + case v1alpha1.DELETE: + handlerFuncs.DeleteFunc = func(obj interface{}) { + logger.Infoln("detected delete event") informerEventCh <- &InformerEvent{ Obj: obj, Type: v1alpha1.DELETE, } - }, - }, - ) + } + default: + stopCh <- struct{}{} + return errors.Errorf("unknown event type: %s", string(eventType)) + } + } + + sharedInformer := informer.Informer() + sharedInformer.AddEventHandler(handlerFuncs) doneCh := make(chan struct{}) + + logger.Infoln("running informer...") sharedInformer.Run(doneCh) <-channels.Done doneCh <- struct{}{} + stopCh <- struct{}{} logger.Infoln("event source is stopped") close(informerEventCh) @@ -231,7 +253,7 @@ func FieldSelector(fieldSelectors map[string]string) (fields.Selector, error) { } // helper method to check if the object passed the user defined filters -func passFilters(event *InformerEvent, filter *v1alpha1.ResourceFilter, eventType v1alpha1.ResourceEventType) error { +func passFilters(event *InformerEvent, filter *v1alpha1.ResourceFilter) error { uObj := event.Obj.(*unstructured.Unstructured) // no filters are applied. if filter == nil { @@ -244,8 +266,5 @@ func passFilters(event *InformerEvent, filter *v1alpha1.ResourceFilter, eventTyp if !filter.CreatedBy.IsZero() && created.UTC().After(filter.CreatedBy.UTC()) { return errors.Errorf("resource is created after filter time. creation-timestamp: %s, filter-creation-timestamp: %s", created.UTC().String(), filter.CreatedBy.UTC().String()) } - if eventType != "" && event.Type != eventType { - return errors.Errorf("resource event type mismatch. expected: %s, actual: %s", string(eventType), string(event.Type)) - } return nil } diff --git a/gateways/server/resource/start_test.go b/gateways/server/resource/start_test.go index fa24110812..6383f942c7 100644 --- a/gateways/server/resource/start_test.go +++ b/gateways/server/resource/start_test.go @@ -64,13 +64,13 @@ func TestFilter(t *testing.T) { err = passFilters(&InformerEvent{ Obj: &unstructured.Unstructured{Object: outmap}, Type: "ADD", - }, resourceEventSource.Filter, v1alpha1.ADD) + }, resourceEventSource.Filter) convey.So(err, convey.ShouldBeNil) err = passFilters(&InformerEvent{ Obj: &unstructured.Unstructured{Object: outmap}, Type: "ADD", - }, resourceEventSource.Filter, v1alpha1.UPDATE) + }, resourceEventSource.Filter) convey.So(err, convey.ShouldNotBeNil) }) } diff --git a/gateways/server/resource/validate.go b/gateways/server/resource/validate.go index 727dec6652..1a0bb5133f 100644 --- a/gateways/server/resource/validate.go +++ b/gateways/server/resource/validate.go @@ -68,5 +68,8 @@ func validate(eventSource *v1alpha1.ResourceEventSource) error { if eventSource.Resource == "" { return fmt.Errorf("resource must be specified") } + if eventSource.EventTypes == nil { + return fmt.Errorf("event types must be specified") + } return nil } diff --git a/pkg/apis/eventsources/v1alpha1/openapi_generated.go b/pkg/apis/eventsources/v1alpha1/openapi_generated.go index 5ac0f4785e..7b9f610453 100644 --- a/pkg/apis/eventsources/v1alpha1/openapi_generated.go +++ b/pkg/apis/eventsources/v1alpha1/openapi_generated.go @@ -1450,7 +1450,7 @@ func schema_pkg_apis_eventsources_v1alpha1_ResourceEventSource(ref common.Refere }, "filter": { SchemaProps: spec.SchemaProps{ - Description: "Filter is applied on the metadata of the resource", + Description: "Filter is applied on the metadata of the resource If you apply filter, then the internal event informer will only monitor objects that pass the filter.", Ref: ref("github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.ResourceFilter"), }, }, @@ -1472,15 +1472,22 @@ func schema_pkg_apis_eventsources_v1alpha1_ResourceEventSource(ref common.Refere Format: "", }, }, - "eventType": { + "eventTypes": { SchemaProps: spec.SchemaProps{ - Description: "Type is the event type. If not provided, the gateway will watch all events for a resource.", - Type: []string{"string"}, - Format: "", + Description: "EventTypes is the list of event type to watch. Possible values are - ADD, UPDATE and DELETE.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, }, }, }, - Required: []string{"namespace", "group", "version", "resource"}, + Required: []string{"namespace", "group", "version", "resource", "eventTypes"}, }, }, Dependencies: []string{ diff --git a/pkg/apis/eventsources/v1alpha1/types.go b/pkg/apis/eventsources/v1alpha1/types.go index 78eb26ab6f..a83e66ea16 100644 --- a/pkg/apis/eventsources/v1alpha1/types.go +++ b/pkg/apis/eventsources/v1alpha1/types.go @@ -140,14 +140,14 @@ type ResourceEventSource struct { // Namespace where resource is deployed Namespace string `json:"namespace" protobuf:"bytes,1,name=namespace"` // Filter is applied on the metadata of the resource + // If you apply filter, then the internal event informer will only monitor objects that pass the filter. // +optional Filter *ResourceFilter `json:"filter,omitempty" protobuf:"bytes,2,opt,name=filter"` // Group of the resource metav1.GroupVersionResource `json:",inline"` - // Type is the event type. - // If not provided, the gateway will watch all events for a resource. - // +optional - EventType ResourceEventType `json:"eventType,omitempty" protobuf:"bytes,3,opt,name=eventType"` + // EventTypes is the list of event type to watch. + // Possible values are - ADD, UPDATE and DELETE. + EventTypes []ResourceEventType `json:"eventTypes" protobuf:"bytes,3,name=eventTypes"` } // ResourceFilter contains K8 ObjectMeta information to further filter resource event objects diff --git a/pkg/apis/eventsources/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventsources/v1alpha1/zz_generated.deepcopy.go index f4945e49c1..ad5630492c 100644 --- a/pkg/apis/eventsources/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventsources/v1alpha1/zz_generated.deepcopy.go @@ -664,6 +664,11 @@ func (in *ResourceEventSource) DeepCopyInto(out *ResourceEventSource) { (*in).DeepCopyInto(*out) } out.GroupVersionResource = in.GroupVersionResource + if in.EventTypes != nil { + in, out := &in.EventTypes, &out.EventTypes + *out = make([]ResourceEventType, len(*in)) + copy(*out, *in) + } return }