From 7929485b66d66de59ce22958370e4269d662a5ab Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Sun, 17 Feb 2019 07:50:32 -0500 Subject: [PATCH] AWS SNS Gateway (#169) * Added support for backoff option when making connections in stream gateways * Extracting common webhook functionality * Modified storage grid and webhook implementation to take advantage of common webhook functionality * Added sns gateway implementation * Enhancing sns gateway * Update deps --- Gopkg.toml | 4 + gateways/common/doc.go | 18 ++ gateways/common/webhook.go | 226 ++++++++++++++++++++ gateways/community/aws-sns/Dockerfile | 3 + gateways/community/aws-sns/cmd/main.go | 44 ++++ gateways/community/aws-sns/config.go | 86 ++++++++ gateways/community/aws-sns/start.go | 188 ++++++++++++++++ gateways/community/aws-sns/validate.go | 74 +++++++ gateways/community/aws-sns/validate_test.go | 72 +++++++ gateways/community/storagegrid/config.go | 7 +- gateways/community/storagegrid/start.go | 206 ++++-------------- gateways/core/webhook/config.go | 24 +-- gateways/core/webhook/start.go | 181 +++------------- gateways/core/webhook/validate.go | 3 +- 14 files changed, 791 insertions(+), 345 deletions(-) create mode 100644 gateways/common/doc.go create mode 100644 gateways/common/webhook.go create mode 100644 gateways/community/aws-sns/Dockerfile create mode 100644 gateways/community/aws-sns/cmd/main.go create mode 100644 gateways/community/aws-sns/config.go create mode 100644 gateways/community/aws-sns/start.go create mode 100644 gateways/community/aws-sns/validate.go create mode 100644 gateways/community/aws-sns/validate_test.go diff --git a/Gopkg.toml b/Gopkg.toml index 8f415f8cc0..8109064692 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -20,6 +20,10 @@ required = [ name = "github.com/nats-io/go-nats-streaming" branch = "master" +[[constraint]] + name = "github.com/aws/aws-sdk-go" + branch = "master" + [[constraint]] name = "github.com/smartystreets/goconvey" version = "1.6.3" diff --git a/gateways/common/doc.go b/gateways/common/doc.go new file mode 100644 index 0000000000..3e9999cea6 --- /dev/null +++ b/gateways/common/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This package contains structs and methods that are shared across different gateways. +package common diff --git a/gateways/common/webhook.go b/gateways/common/webhook.go new file mode 100644 index 0000000000..f1b7c7be3e --- /dev/null +++ b/gateways/common/webhook.go @@ -0,0 +1,226 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "fmt" + "net/http" + "sync" + + "github.com/argoproj/argo-events/gateways" + "github.com/rs/zerolog" +) + +// Webhook is a general purpose REST API +type Webhook struct { + // REST API endpoint + Endpoint string `json:"endpoint" protobuf:"bytes,1,opt,name=endpoint"` + // Method is HTTP request method that indicates the desired action to be performed for a given resource. + // See RFC7231 Hypertext Transfer Protocol (HTTP/1.1): Semantics and Content + Method string `json:"method" protobuf:"bytes,2,opt,name=method"` + // Port on which HTTP server is listening for incoming events. + Port string `json:"port" protobuf:"bytes,3,opt,name=port"` + // srv holds reference to http server + srv *http.Server `json:"srv,omitempty"` + mux *http.ServeMux `json:"mux,omitempty"` +} + +// WebhookHelper is a helper struct +type WebhookHelper struct { + // Mutex synchronizes ActiveServers + Mutex sync.Mutex + // ActiveServers keeps track of currently running http servers. + ActiveServers map[string]*activeServer + // ActiveEndpoints keep track of endpoints that are already registered with server and their status active or inactive + ActiveEndpoints map[string]*Endpoint + // RouteActivateChan handles assigning new route to server. + RouteActivateChan chan *RouteConfig + // RouteDeactivateChan handles deactivating existing route + RouteDeactivateChan chan *RouteConfig +} + +// HTTP Muxer +type server struct { + mux *http.ServeMux +} + +// activeServer contains reference to server and an error channel that is shared across all functions registering endpoints for the server. +type activeServer struct { + srv *http.ServeMux + errChan chan error +} + +// RouteConfig contains configuration about a http route +type RouteConfig struct { + Webhook *Webhook + Configs map[string]interface{} + EventSource *gateways.EventSource + Log zerolog.Logger + StartCh chan struct{} + RouteActiveHandler func(writer http.ResponseWriter, request *http.Request, rc *RouteConfig) + PostActivate func(rc *RouteConfig) error + PostStop func(rc *RouteConfig) error +} + +// endpoint contains state of an http endpoint +type Endpoint struct { + // whether endpoint is active + Active bool + // data channel to receive data on this endpoint + DataCh chan []byte +} + +// NewWebhookHelper returns new webhook helper +func NewWebhookHelper() *WebhookHelper { + return &WebhookHelper{ + ActiveEndpoints: make(map[string]*Endpoint), + ActiveServers: make(map[string]*activeServer), + Mutex: sync.Mutex{}, + RouteActivateChan: make(chan *RouteConfig), + RouteDeactivateChan: make(chan *RouteConfig), + } +} + +// InitRouteChannels initializes route channels so they can activate and deactivate routes. +func InitRouteChannels(helper *WebhookHelper) { + for { + select { + case config := <-helper.RouteActivateChan: + // start server if it has not been started on this port + config.startHttpServer(helper) + config.StartCh <- struct{}{} + + case config := <-helper.RouteDeactivateChan: + webhook := config.Webhook + _, ok := helper.ActiveServers[webhook.Port] + if ok { + helper.ActiveEndpoints[webhook.Endpoint].Active = false + } + } + } +} + +// ServeHTTP implementation +func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.mux.ServeHTTP(w, r) +} + +// starts a http server +func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) { + // start a http server only if no other configuration previously started the server on given port + helper.Mutex.Lock() + if _, ok := helper.ActiveServers[rc.Webhook.Port]; !ok { + s := &server{ + mux: http.NewServeMux(), + } + rc.Webhook.mux = s.mux + rc.Webhook.srv = &http.Server{ + Addr: ":" + fmt.Sprintf("%s", rc.Webhook.Port), + Handler: s, + } + errChan := make(chan error, 1) + helper.ActiveServers[rc.Webhook.Port] = &activeServer{ + srv: s.mux, + errChan: errChan, + } + + // start http server + go func() { + err := rc.Webhook.srv.ListenAndServe() + rc.Log.Info().Str("event-source", rc.EventSource.Name).Str("port", rc.Webhook.Port).Msg("http server stopped") + if err != nil { + errChan <- err + } + }() + } + helper.Mutex.Unlock() +} + +// activateRoute activates route +func (rc *RouteConfig) activateRoute(helper *WebhookHelper) { + helper.RouteActivateChan <- rc + + <-rc.StartCh + + if rc.Webhook.mux == nil { + helper.Mutex.Lock() + rc.Webhook.mux = helper.ActiveServers[rc.Webhook.Port].srv + helper.Mutex.Unlock() + } + + rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("adding route handler") + if _, ok := helper.ActiveEndpoints[rc.Webhook.Endpoint]; !ok { + helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{ + Active: true, + DataCh: make(chan []byte), + } + rc.Webhook.mux.HandleFunc(rc.Webhook.Endpoint, func(writer http.ResponseWriter, request *http.Request) { + rc.RouteActiveHandler(writer, request, rc) + }) + } + helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true + + rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("route handler added") +} + +func (rc *RouteConfig) processChannels(helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error { + for { + select { + case data := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh: + rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("new event received, dispatching to gateway client") + err := eventStream.Send(&gateways.Event{ + Name: rc.EventSource.Name, + Payload: data, + }) + if err != nil { + rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to send event") + return err + } + + case <-eventStream.Context().Done(): + rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("connection is closed by client") + helper.RouteDeactivateChan <- rc + return nil + + // this error indicates that the server has stopped running + case err := <-helper.ActiveServers[rc.Webhook.Port].errChan: + return err + } + } +} + +func DefaultPostActivate(rc *RouteConfig) error { + return nil +} + +func DefaultPostStop(rc *RouteConfig) error { + return nil +} + +func ProcessRoute(rc *RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error { + rc.activateRoute(helper) + if err := rc.PostActivate(rc); err != nil { + return err + } + if err := rc.processChannels(helper, eventStream); err != nil { + return err + } + if err := rc.PostStop(rc); err != nil { + rc.Log.Error().Err(err).Msg("error occurred while executing post stop logic") + } + return nil +} diff --git a/gateways/community/aws-sns/Dockerfile b/gateways/community/aws-sns/Dockerfile new file mode 100644 index 0000000000..b6e15d1dfd --- /dev/null +++ b/gateways/community/aws-sns/Dockerfile @@ -0,0 +1,3 @@ +FROM scratch +COPY dist/sns-gateway /bin/ +ENTRYPOINT [ "/bin/sns-gateway" ] diff --git a/gateways/community/aws-sns/cmd/main.go b/gateways/community/aws-sns/cmd/main.go new file mode 100644 index 0000000000..62c2379a43 --- /dev/null +++ b/gateways/community/aws-sns/cmd/main.go @@ -0,0 +1,44 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "os" + + "github.com/argoproj/argo-events/common" + "github.com/argoproj/argo-events/gateways" + "github.com/argoproj/argo-events/gateways/community/aws-sns" + "k8s.io/client-go/kubernetes" +) + +func main() { + kubeConfig, _ := os.LookupEnv(common.EnvVarKubeConfig) + restConfig, err := common.GetClientConfig(kubeConfig) + if err != nil { + panic(err) + } + clientset := kubernetes.NewForConfigOrDie(restConfig) + namespace, ok := os.LookupEnv(common.EnvVarGatewayNamespace) + if !ok { + panic("namespace is not provided") + } + gateways.StartGateway(&aws_sns.SNSEventSourceExecutor{ + Log: common.GetLoggerContext(common.LoggerConf()).Logger(), + Clientset: clientset, + Namespace: namespace, + }) +} diff --git a/gateways/community/aws-sns/config.go b/gateways/community/aws-sns/config.go new file mode 100644 index 0000000000..f2683be239 --- /dev/null +++ b/gateways/community/aws-sns/config.go @@ -0,0 +1,86 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_sns + +import ( + "k8s.io/client-go/kubernetes" + "time" + + "github.com/ghodss/yaml" + "github.com/rs/zerolog" + corev1 "k8s.io/api/core/v1" +) + +const ( + MESSAGE_TYPE_SUBSCRIPTION_CONFIRMATION = "SubscriptionConfirmation" + MESSAGE_TYPE_UNSUBSCRIBE_CONFIRMATION = "UnsubscribeConfirmation" + MESSAGE_TYPE_NOTIFICATION = "Notification" +) + +var ( + snsProtocol = "http" +) + +// SNSEventSourceExecutor implements Eventing +type SNSEventSourceExecutor struct { + Log zerolog.Logger + // Clientset is kubernetes client + Clientset kubernetes.Interface + // Namespace where gateway is deployed + Namespace string +} + +// Json http notifications +// SNS posts those to your http url endpoint if http is selected as delivery method. +// http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-subscription-confirmation-json +// http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-notification-json +// http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-unsubscribe-confirmation-json +type httpNotification struct { + Type string `json:"Type"` + MessageId string `json:"MessageId"` + Token string `json:"Token,omitempty"` // Only for subscribe and unsubscribe + TopicArn string `json:"TopicArn"` + Subject string `json:"Subject,omitempty"` // Only for Notification + Message string `json:"Message"` + SubscribeURL string `json:"SubscribeURL,omitempty"` // Only for subscribe and unsubscribe + Timestamp time.Time `json:"Timestamp"` + SignatureVersion string `json:"SignatureVersion"` + Signature string `json:"Signature"` + SigningCertURL string `json:"SigningCertURL"` + UnsubscribeURL string `json:"UnsubscribeURL,omitempty"` // Only for notifications +} + +// snsConfig contains configuration to subscribe to SNS topic +type snsConfig struct { + TopicArn string `json:"topicArn"` + // Endpoint you wish to register + Endpoint string `json:"endpoint"` + // Port on which http server is running. + Port string `json:"port"` + AccessKey *corev1.SecretKeySelector `json:"accessKey" protobuf:"bytes,5,opt,name=accessKey"` + SecretKey *corev1.SecretKeySelector `json:"secretKey" protobuf:"bytes,6,opt,name=secretKey"` + Region string `json:"region"` +} + +func parseEventSource(es string) (*snsConfig, error) { + var n *snsConfig + err := yaml.Unmarshal([]byte(es), &n) + if err != nil { + return nil, err + } + return n, nil +} diff --git a/gateways/community/aws-sns/start.go b/gateways/community/aws-sns/start.go new file mode 100644 index 0000000000..93e773c42c --- /dev/null +++ b/gateways/community/aws-sns/start.go @@ -0,0 +1,188 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_sns + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/argoproj/argo-events/common" + "github.com/argoproj/argo-events/gateways" + gwcommon "github.com/argoproj/argo-events/gateways/common" + "github.com/argoproj/argo-events/store" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + snslib "github.com/aws/aws-sdk-go/service/sns" + "github.com/ghodss/yaml" +) + +const ( + LabelSNSConfig = "snsConfig" + LabelSNSSession = "snsSession" + LabelSubscriptionArn = "subscriptionArn" +) + +var ( + helper = gwcommon.NewWebhookHelper() +) + +func init() { + go gwcommon.InitRouteChannels(helper) +} + +// routeActiveHandler handles new route +func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { + var response string + + logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). + Str("port", rc.Webhook.Port). + Str("http-method", request.Method).Logger() + logger.Info().Msg("request received") + + if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { + response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) + logger.Info().Msg("endpoint is not active") + common.SendErrorResponse(writer, response) + return + } + + body, err := ioutil.ReadAll(request.Body) + if err != nil { + logger.Error().Err(err).Msg("failed to parse request body") + common.SendErrorResponse(writer, fmt.Sprintf("failed to parse request. err: %+v", err)) + return + } + + var snspayload *httpNotification + err = yaml.Unmarshal(body, &snspayload) + if err != nil { + logger.Error().Err(err).Msg("failed to convert request payload into snsConfig payload") + return + } + + sc := rc.Configs[LabelSNSConfig].(*snsConfig) + + switch snspayload.Type { + case MESSAGE_TYPE_SUBSCRIPTION_CONFIRMATION: + awsSession := rc.Configs[LabelSNSSession].(*snslib.SNS) + out, err := awsSession.ConfirmSubscription(&snslib.ConfirmSubscriptionInput{ + TopicArn: &sc.TopicArn, + Token: &snspayload.Token, + }) + if err != nil { + logger.Error().Err(err).Msg("failed to send confirmation response to amazon") + return + } + rc.Configs[LabelSubscriptionArn] = out.SubscriptionArn + + case MESSAGE_TYPE_NOTIFICATION: + helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body + } + + response = "request successfully processed" + logger.Info().Msg(response) + common.SendSuccessResponse(writer, response) +} + +func (ese *SNSEventSourceExecutor) PostActivate(rc *gwcommon.RouteConfig) error { + logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). + Str("port", rc.Webhook.Port).Logger() + + sc := rc.Configs[LabelSNSConfig].(*snsConfig) + // retrieve access key id and secret access key + accessKey, err := store.GetSecrets(ese.Clientset, ese.Namespace, sc.AccessKey.Name, sc.AccessKey.Key) + if err != nil { + logger.Error().Err(err).Msg("failed to retrieve access key") + return err + } + secretKey, err := store.GetSecrets(ese.Clientset, ese.Namespace, sc.SecretKey.Name, sc.SecretKey.Key) + if err != nil { + logger.Error().Err(err).Msg("failed to retrieve secret key") + return err + } + + creds := credentials.NewStaticCredentialsFromCreds(credentials.Value{ + AccessKeyID: accessKey, + SecretAccessKey: secretKey, + }) + awsSession, err := session.NewSession(&aws.Config{ + Endpoint: &rc.Webhook.Endpoint, + Region: &sc.Region, + Credentials: creds, + HTTPClient: &http.Client{}, + }) + if err != nil { + logger.Error().Err(err).Msg("failed to create new session") + return err + } + + snsSession := snslib.New(awsSession) + rc.Configs[LabelSNSSession] = snsSession + + logger.Info().Msg("subscribing to snsConfig topic") + if _, err := snsSession.Subscribe(&snslib.SubscribeInput{ + Endpoint: &rc.Webhook.Endpoint, + Protocol: &snsProtocol, + TopicArn: &sc.TopicArn, + }); err != nil { + logger.Error().Err(err).Msg("failed to send subscribe request") + return err + } + + return nil +} + +// PostStop unsubscribes the topic +func PostStop(rc *gwcommon.RouteConfig) error { + awsSession := rc.Configs[LabelSNSSession].(*snslib.SNS) + if _, err := awsSession.Unsubscribe(&snslib.UnsubscribeInput{ + SubscriptionArn: rc.Configs[LabelSubscriptionArn].(*string), + }); err != nil { + rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to unsubscribe") + return err + } + return nil +} + +// StartConfig runs a configuration +func (ese *SNSEventSourceExecutor) StartEventSource(eventSource *gateways.EventSource, eventStream gateways.Eventing_StartEventSourceServer) error { + defer gateways.Recover(eventSource.Name) + + ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("operating on event source") + sc, err := parseEventSource(eventSource.Data) + if err != nil { + return err + } + + return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ + Webhook: &gwcommon.Webhook{ + Endpoint: sc.Endpoint, + Port: sc.Port, + }, + Configs: map[string]interface{}{ + LabelSNSConfig: sc, + }, + Log: ese.Log, + EventSource: eventSource, + PostActivate: ese.PostActivate, + PostStop: PostStop, + RouteActiveHandler: RouteActiveHandler, + StartCh: make(chan struct{}), + }, helper, eventStream) +} diff --git a/gateways/community/aws-sns/validate.go b/gateways/community/aws-sns/validate.go new file mode 100644 index 0000000000..31db23c153 --- /dev/null +++ b/gateways/community/aws-sns/validate.go @@ -0,0 +1,74 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_sns + +import ( + "context" + "fmt" + "strings" + + "github.com/argoproj/argo-events/gateways" +) + +// ValidateEventSource validates gateway event source +func (ese *SNSEventSourceExecutor) ValidateEventSource(ctx context.Context, es *gateways.EventSource) (*gateways.ValidEventSource, error) { + sc, err := parseEventSource(es.Data) + if err != nil { + return &gateways.ValidEventSource{ + IsValid: false, + Reason: fmt.Sprintf("failed to parse event source. err: %+v", err), + }, nil + } + if err = validateSNSConfig(sc); err != nil { + return &gateways.ValidEventSource{ + Reason: err.Error(), + IsValid: false, + }, nil + } + return &gateways.ValidEventSource{ + IsValid: true, + Reason: "valid", + }, nil +} + +func validateSNSConfig(sc *snsConfig) error { + if sc == nil { + return gateways.ErrEmptyEventSource + } + if sc.Port == "" { + return fmt.Errorf("must specify port") + } + if sc.Endpoint == "" { + return fmt.Errorf("must specify endpoint") + } + if !strings.HasPrefix(sc.Endpoint, "/") { + return fmt.Errorf("endpoint must start with '/'") + } + if sc.TopicArn == "" { + return fmt.Errorf("must specify topic arn") + } + if sc.Region == "" { + return fmt.Errorf("must specify region") + } + if sc.AccessKey == nil { + return fmt.Errorf("must specify access key") + } + if sc.SecretKey == nil { + return fmt.Errorf("must specify secret key") + } + return nil +} diff --git a/gateways/community/aws-sns/validate_test.go b/gateways/community/aws-sns/validate_test.go new file mode 100644 index 0000000000..3babe68656 --- /dev/null +++ b/gateways/community/aws-sns/validate_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2018 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_sns + +import ( + "context" + "github.com/argoproj/argo-events/gateways" + "github.com/smartystreets/goconvey/convey" + "testing" +) + +var ( + configKey = "testConfig" + configId = "1234" + validConfig = ` +endpoint: "/" +port: "8080" +topicArn: "test-arn" +region: "us-east-1" +accessKey: + key: accesskey + name: sns +secretKey: + key: secretkey + name: sns + +` + + invalidConfig = ` +endpoint: "/" +port: "8080" +` +) + +func TestSNSEventSourceExecutor_ValidateEventSource(t *testing.T) { + convey.Convey("Given a valid sns event source spec, parse it and make sure no error occurs", t, func() { + ese := &SNSEventSourceExecutor{} + valid, _ := ese.ValidateEventSource(context.Background(), &gateways.EventSource{ + Name: configKey, + Id: configId, + Data: validConfig, + }) + convey.So(valid, convey.ShouldNotBeNil) + convey.So(valid.IsValid, convey.ShouldBeTrue) + }) + + convey.Convey("Given an invalid sns event source spec, parse it and make sure error occurs", t, func() { + ese := &SNSEventSourceExecutor{} + valid, _ := ese.ValidateEventSource(context.Background(), &gateways.EventSource{ + Data: invalidConfig, + Id: configId, + Name: configKey, + }) + convey.So(valid, convey.ShouldNotBeNil) + convey.So(valid.IsValid, convey.ShouldBeFalse) + convey.So(valid.Reason, convey.ShouldNotBeEmpty) + }) +} diff --git a/gateways/community/storagegrid/config.go b/gateways/community/storagegrid/config.go index 5f2725833b..9a3c36eeb5 100644 --- a/gateways/community/storagegrid/config.go +++ b/gateways/community/storagegrid/config.go @@ -24,13 +24,12 @@ import ( "github.com/rs/zerolog" ) -// StorageGridEventSourceExecutor implements ConfigExecutor interface +// StorageGridEventSourceExecutor implements Eventing type StorageGridEventSourceExecutor struct { Log zerolog.Logger } // storageGrid contains configuration for storage grid sns -// +k8s:openapi-gen=true type storageGrid struct { // Port to run web server on Port string `json:"port"` @@ -43,9 +42,7 @@ type storageGrid struct { // Filter on object key which caused the notification. Filter *Filter `json:"filter,omitempty"` // srv holds reference to http server - // +k8s:openapi-gen=false - srv *http.Server `json:"srv,omitempty"` - // +k8s:openapi-gen=false + srv *http.Server `json:"srv,omitempty"` mux *http.ServeMux `json:"mux,omitempty"` } diff --git a/gateways/community/storagegrid/start.go b/gateways/community/storagegrid/start.go index a8880775b7..ec748276b4 100644 --- a/gateways/community/storagegrid/start.go +++ b/gateways/community/storagegrid/start.go @@ -18,37 +18,25 @@ package storagegrid import ( "encoding/json" - "fmt" - "github.com/argoproj/argo-events/common" "io/ioutil" "net/http" "net/url" "strings" - "sync" + + "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" + gwcommon "github.com/argoproj/argo-events/gateways/common" "github.com/joncalhoun/qson" "github.com/satori/go.uuid" ) -var ( - // mutex synchronizes activeServers - mutex sync.Mutex - // activeServers keeps track of currently running http servers. - activeServers = make(map[string]*activeServer) - - // activeEndpoints keep track of endpoints that are already registered with server and their status active or deactive - activeEndpoints = make(map[string]*endpoint) - - // mutex synchronizes activeRoutes - routesMutex sync.Mutex - // activeRoutes keep track of active routes for a http server - activeRoutes = make(map[string]map[string]struct{}) - - // routeActivateChan handles assigning new route to server. - routeActivateChan = make(chan routeConfig) +const ( + LabelStorageGridConfig = "storageGridConfig" +) - routeDeactivateChan = make(chan routeConfig) +var ( + helper = gwcommon.NewWebhookHelper() respBody = ` @@ -61,52 +49,8 @@ var ( ` + "\n" ) -// HTTP Muxer -type server struct { - mux *http.ServeMux -} - -type routeConfig struct { - sgConfig *storageGrid - eventSource *gateways.EventSource - eventSourceExecutor *StorageGridEventSourceExecutor - startCh chan struct{} -} - -type endpoint struct { - active bool - dataCh chan []byte - errCh chan error -} - func init() { - go func() { - for { - select { - case config := <-routeActivateChan: - // start server if it has not been started on this port - config.startHttpServer() - config.startCh <- struct{}{} - - case config := <-routeDeactivateChan: - _, ok := activeServers[config.sgConfig.Port] - if ok { - activeEndpoints[config.sgConfig.Endpoint].active = false - } - } - } - }() -} - -// activeServer contains reference to server and an error channel that is shared across all functions registering endpoints for the server. -type activeServer struct { - srv *http.ServeMux - errChan chan error -} - -// ServeHTTP implementation -func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.mux.ServeHTTP(w, r) + go gwcommon.InitRouteChannels(helper) } // generateUUID returns a new uuid @@ -144,37 +88,6 @@ func filterName(notification *storageGridNotification, sg *storageGrid) bool { return true } -// starts a http server -func (rc *routeConfig) startHttpServer() { - // start a http server only if no other configuration previously started the server on given port - mutex.Lock() - if _, ok := activeServers[rc.sgConfig.Port]; !ok { - s := &server{ - mux: http.NewServeMux(), - } - rc.sgConfig.mux = s.mux - rc.sgConfig.srv = &http.Server{ - Addr: ":" + fmt.Sprintf("%s", rc.sgConfig.Port), - Handler: s, - } - errChan := make(chan error, 1) - activeServers[rc.sgConfig.Port] = &activeServer{ - srv: s.mux, - errChan: errChan, - } - - // start http server - go func() { - err := rc.sgConfig.srv.ListenAndServe() - rc.eventSourceExecutor.Log.Info().Str("event-source", rc.eventSource.Name).Str("port", rc.sgConfig.Port).Msg("http server stopped") - if err != nil { - errChan <- err - } - }() - } - mutex.Unlock() -} - // StartConfig runs a configuration func (ese *StorageGridEventSourceExecutor) StartEventSource(eventSource *gateways.EventSource, eventStream gateways.Eventing_StartEventSourceServer) error { defer gateways.Recover(eventSource.Name) @@ -185,76 +98,37 @@ func (ese *StorageGridEventSourceExecutor) StartEventSource(eventSource *gateway return err } - rc := routeConfig{ - sgConfig: sg, - eventSource: eventSource, - eventSourceExecutor: ese, - startCh: make(chan struct{}), - } - - routeActivateChan <- rc - - <-rc.startCh - - if rc.sgConfig.mux == nil { - mutex.Lock() - rc.sgConfig.mux = activeServers[rc.sgConfig.Port].srv - mutex.Unlock() - } - - ese.Log.Info().Str("event-source-name", eventSource.Name).Str("port", sg.Port).Str("endpoint", sg.Endpoint).Msg("adding route handler") - if _, ok := activeEndpoints[rc.sgConfig.Endpoint]; !ok { - activeEndpoints[rc.sgConfig.Endpoint] = &endpoint{ - active: true, - dataCh: make(chan []byte), - errCh: make(chan error), - } - rc.sgConfig.mux.HandleFunc(rc.sgConfig.Endpoint, rc.routeActiveHandler) - } - activeEndpoints[rc.sgConfig.Endpoint].active = true - - ese.Log.Info().Str("event-source-name", eventSource.Name).Str("port", sg.Port).Str("endpoint", sg.Endpoint).Msg("route handler added") - - for { - select { - case data := <-activeEndpoints[rc.sgConfig.Endpoint].dataCh: - ese.Log.Info().Msg("received data") - err := eventStream.Send(&gateways.Event{ - Name: eventSource.Name, - Payload: data, - }) - if err != nil { - ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Str("port", sg.Port).Str("endpoint", sg.Endpoint).Msg("failed to send event") - return err - } - - case err := <-activeEndpoints[rc.sgConfig.Endpoint].errCh: - ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Str("port", sg.Port).Str("endpoint", sg.Endpoint).Msg("internal error occurred") - - case <-eventStream.Context().Done(): - ese.Log.Info().Str("event-source-name", eventSource.Name).Str("port", sg.Port).Str("endpoint", sg.Endpoint).Msg("connection is closed by client") - routeDeactivateChan <- rc - return nil - - // this error indicates that the server has stopped running - case err := <-activeServers[rc.sgConfig.Port].errChan: - return err - } - } + return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ + Configs: map[string]interface{}{ + LabelStorageGridConfig: sg, + }, + Webhook: &gwcommon.Webhook{ + Port: sg.Port, + Endpoint: sg.Endpoint, + }, + Log: ese.Log, + EventSource: eventSource, + PostActivate: gwcommon.DefaultPostActivate, + PostStop: gwcommon.DefaultPostStop, + RouteActiveHandler: RouteActiveHandler, + StartCh: make(chan struct{}), + }, helper, eventStream) } // routeActiveHandler handles new route -func (rc *routeConfig) routeActiveHandler(writer http.ResponseWriter, request *http.Request) { - if !activeEndpoints[rc.sgConfig.Endpoint].active { - rc.eventSourceExecutor.Log.Info().Str("event-source-name", rc.eventSource.Name).Str("method", http.MethodHead).Msg("deactived route") +func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { + logger := rc.Log.With().Str("event-source-name", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint).Str("port", rc.Webhook.Port).Str("method", http.MethodHead).Logger() + + if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { + logger.Warn().Msg("inactive route") common.SendErrorResponse(writer, "route is not valid") return } - rc.eventSourceExecutor.Log.Info().Str("event-source-name", rc.eventSource.Name).Str("method", http.MethodHead).Msg("received a request") + rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("method", http.MethodHead).Msg("received a request") body, err := ioutil.ReadAll(request.Body) if err != nil { - rc.eventSourceExecutor.Log.Error().Err(err).Msg("failed to parse request body") + logger.Error().Err(err).Msg("failed to parse request body") common.SendErrorResponse(writer, "failed to parse request body") return } @@ -267,33 +141,33 @@ func (rc *routeConfig) routeActiveHandler(writer http.ResponseWriter, request *h writer.Header().Add("Content-Type", "text/plain") writer.Write([]byte(respBody)) - rc.eventSourceExecutor.Log.Info().Str("body", string(body)).Msg("response body") - // notification received from storage grid is url encoded. parsedURL, err := url.QueryUnescape(string(body)) if err != nil { - activeEndpoints[rc.sgConfig.Endpoint].errCh <- err + logger.Error().Err(err).Msg("failed to unescape request body url") return } b, err := qson.ToJSON(parsedURL) if err != nil { - activeEndpoints[rc.sgConfig.Endpoint].errCh <- err + logger.Error().Err(err).Msg("failed to convert request body in JSON format") return } var notification *storageGridNotification err = json.Unmarshal(b, ¬ification) if err != nil { - activeEndpoints[rc.sgConfig.Endpoint].errCh <- err + logger.Error().Err(err).Msg("failed to unmarshal request body") return } - if filterEvent(notification, rc.sgConfig) && filterName(notification, rc.sgConfig) { - rc.eventSourceExecutor.Log.Info().Str("event-source-name", rc.eventSource.Name).Msg("new event received, dispatching to gateway client") - activeEndpoints[rc.sgConfig.Endpoint].dataCh <- b + storageGridConfig := rc.Configs[LabelStorageGridConfig].(*storageGrid) + + if filterEvent(notification, storageGridConfig) && filterName(notification, storageGridConfig) { + logger.Info().Msg("new event received, dispatching to gateway client") + helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- b return } - rc.eventSourceExecutor.Log.Warn().Str("event-source-name", rc.eventSource.Name).Interface("notification", notification). + logger.Warn().Interface("notification", notification). Msg("discarding notification since it did not pass all filters") } diff --git a/gateways/core/webhook/config.go b/gateways/core/webhook/config.go index 890ed93ebf..c9bb44b355 100644 --- a/gateways/core/webhook/config.go +++ b/gateways/core/webhook/config.go @@ -17,8 +17,7 @@ limitations under the License. package webhook import ( - "net/http" - + "github.com/argoproj/argo-events/gateways/common" "github.com/ghodss/yaml" "github.com/rs/zerolog" ) @@ -28,25 +27,8 @@ type WebhookEventSourceExecutor struct { Log zerolog.Logger } -// webhook is a general purpose REST API -// +k8s:openapi-gen=true -type webhook struct { - // REST API endpoint - Endpoint string `json:"endpoint" protobuf:"bytes,1,opt,name=endpoint"` - // Method is HTTP request method that indicates the desired action to be performed for a given resource. - // See RFC7231 Hypertext Transfer Protocol (HTTP/1.1): Semantics and Content - Method string `json:"method" protobuf:"bytes,2,opt,name=method"` - // Port on which HTTP server is listening for incoming events. - Port string `json:"port" protobuf:"bytes,3,opt,name=port"` - // srv holds reference to http server - // +k8s:openapi-gen=false - srv *http.Server `json:"srv,omitempty"` - // +k8s:openapi-gen=false - mux *http.ServeMux `json:"mux,omitempty"` -} - -func parseEventSource(es string) (*webhook, error) { - var n *webhook +func parseEventSource(es string) (*common.Webhook, error) { + var n *common.Webhook err := yaml.Unmarshal([]byte(es), &n) if err != nil { return nil, err diff --git a/gateways/core/webhook/start.go b/gateways/core/webhook/start.go index a97a369af7..19a5faa026 100644 --- a/gateways/core/webhook/start.go +++ b/gateways/core/webhook/start.go @@ -19,134 +19,53 @@ package webhook import ( "fmt" "github.com/argoproj/argo-events/common" + gwcommon "github.com/argoproj/argo-events/gateways/common" "io/ioutil" "net/http" - "sync" "github.com/argoproj/argo-events/gateways" ) var ( - // mutex synchronizes activeServers - mutex sync.Mutex - // activeServers keeps track of currently running http servers. - activeServers = make(map[string]*activeServer) - - // activeEndpoints keep track of endpoints that are already registered with server and their status active or deactive - activeEndpoints = make(map[string]*endpoint) - - // routeActivateChan handles assigning new route to server. - routeActivateChan = make(chan *routeConfig) - - // routeDeactivateChan handles deactivating existing route - routeDeactivateChan = make(chan *routeConfig) + helper = gwcommon.NewWebhookHelper() ) -// HTTP Muxer -type server struct { - mux *http.ServeMux -} - -// activeServer contains reference to server and an error channel that is shared across all functions registering endpoints for the server. -type activeServer struct { - srv *http.ServeMux - errChan chan error -} - -type routeConfig struct { - wConfig *webhook - eventSource *gateways.EventSource - eventSourceExecutor *WebhookEventSourceExecutor - startCh chan struct{} -} - -type endpoint struct { - active bool - dataCh chan []byte -} - func init() { - go func() { - for { - select { - case config := <-routeActivateChan: - // start server if it has not been started on this port - config.startHttpServer() - config.startCh <- struct{}{} - - case config := <-routeDeactivateChan: - _, ok := activeServers[config.wConfig.Port] - if ok { - activeEndpoints[config.wConfig.Endpoint].active = false - } - } - } - }() -} - -// ServeHTTP implementation -func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.mux.ServeHTTP(w, r) -} - -// starts a http server -func (rc *routeConfig) startHttpServer() { - // start a http server only if no other configuration previously started the server on given port - mutex.Lock() - if _, ok := activeServers[rc.wConfig.Port]; !ok { - s := &server{ - mux: http.NewServeMux(), - } - rc.wConfig.mux = s.mux - rc.wConfig.srv = &http.Server{ - Addr: ":" + fmt.Sprintf("%s", rc.wConfig.Port), - Handler: s, - } - errChan := make(chan error, 1) - activeServers[rc.wConfig.Port] = &activeServer{ - srv: s.mux, - errChan: errChan, - } - - // start http server - go func() { - err := rc.wConfig.srv.ListenAndServe() - rc.eventSourceExecutor.Log.Info().Str("event-source", rc.eventSource.Name).Str("port", rc.wConfig.Port).Msg("http server stopped") - if err != nil { - errChan <- err - } - }() - } - mutex.Unlock() + go gwcommon.InitRouteChannels(helper) } // routeActiveHandler handles new route -func (rc *routeConfig) routeActiveHandler(writer http.ResponseWriter, request *http.Request) { +func RouteActiveHandler(writer http.ResponseWriter, request *http.Request, rc *gwcommon.RouteConfig) { var response string - if !activeEndpoints[rc.wConfig.Endpoint].active { - response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.wConfig.Endpoint, rc.wConfig.Method) - rc.eventSourceExecutor.Log.Info().Str("endpoint", rc.wConfig.Endpoint).Str("http-method", request.Method).Str("response", response).Msg("endpoint is not active") + + logger := rc.Log.With().Str("event-source", rc.EventSource.Name).Str("endpoint", rc.Webhook.Endpoint). + Str("port", rc.Webhook.Port). + Str("http-method", request.Method).Logger() + logger.Info().Msg("request received") + + if !helper.ActiveEndpoints[rc.Webhook.Endpoint].Active { + response = fmt.Sprintf("the route: endpoint %s and method %s is deactived", rc.Webhook.Endpoint, rc.Webhook.Method) + logger.Info().Msg("endpoint is not active") common.SendErrorResponse(writer, response) return } - if rc.wConfig.Method != request.Method { - msg := fmt.Sprintf("the method %s is not defined for endpoint %s", rc.wConfig.Method, rc.wConfig.Endpoint) - rc.eventSourceExecutor.Log.Info().Str("endpoint", rc.wConfig.Endpoint).Str("http-method", request.Method).Str("response", response).Msg("endpoint is not active") - common.SendErrorResponse(writer, msg) + + if rc.Webhook.Method != request.Method { + logger.Warn().Str("expected", rc.Webhook.Method).Str("actual", request.Method).Msg("method mismatch") + common.SendErrorResponse(writer, fmt.Sprintf("the method %s is not defined for endpoint %s", rc.Webhook.Method, rc.Webhook.Endpoint)) return } - rc.eventSourceExecutor.Log.Info().Str("endpoint", rc.wConfig.Endpoint).Str("http-method", request.Method).Msg("payload received") body, err := ioutil.ReadAll(request.Body) if err != nil { - rc.eventSourceExecutor.Log.Error().Err(err).Str("endpoint", rc.wConfig.Endpoint).Str("http-method", request.Method).Str("response", response).Msg("failed to parse request body") + logger.Error().Err(err).Msg("failed to parse request body") common.SendErrorResponse(writer, fmt.Sprintf("failed to parse request. err: %+v", err)) return } - activeEndpoints[rc.wConfig.Endpoint].dataCh <- body + helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh <- body response = "request successfully processed" - rc.eventSourceExecutor.Log.Info().Str("endpoint", rc.wConfig.Endpoint).Str("http-method", request.Method).Str("response", response).Msg("request payload parsed successfully") + logger.Info().Msg(response) common.SendSuccessResponse(writer, response) } @@ -160,55 +79,13 @@ func (ese *WebhookEventSourceExecutor) StartEventSource(eventSource *gateways.Ev return err } - rc := &routeConfig{ - wConfig: h, - eventSource: eventSource, - eventSourceExecutor: ese, - startCh: make(chan struct{}), - } - - routeActivateChan <- rc - - <-rc.startCh - - if rc.wConfig.mux == nil { - mutex.Lock() - rc.wConfig.mux = activeServers[rc.wConfig.Port].srv - mutex.Unlock() - } - - ese.Log.Info().Str("event-source-name", eventSource.Name).Str("port", h.Port).Str("endpoint", h.Endpoint).Str("method", h.Method).Msg("adding route handler") - if _, ok := activeEndpoints[rc.wConfig.Endpoint]; !ok { - activeEndpoints[rc.wConfig.Endpoint] = &endpoint{ - active: true, - dataCh: make(chan []byte), - } - rc.wConfig.mux.HandleFunc(rc.wConfig.Endpoint, rc.routeActiveHandler) - } - activeEndpoints[rc.wConfig.Endpoint].active = true - - ese.Log.Info().Str("event-source-name", eventSource.Name).Str("port", h.Port).Str("endpoint", h.Endpoint).Str("method", h.Method).Msg("route handler added") - for { - select { - case data := <-activeEndpoints[rc.wConfig.Endpoint].dataCh: - ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("new event received, dispatching to gateway client") - err := eventStream.Send(&gateways.Event{ - Name: eventSource.Name, - Payload: data, - }) - if err != nil { - ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Msg("failed to send event") - return err - } - - case <-eventStream.Context().Done(): - ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("connection is closed by client") - routeDeactivateChan <- rc - return nil - - // this error indicates that the server has stopped running - case err := <-activeServers[rc.wConfig.Port].errChan: - return err - } - } + return gwcommon.ProcessRoute(&gwcommon.RouteConfig{ + Webhook: h, + Log: ese.Log, + EventSource: eventSource, + PostActivate: gwcommon.DefaultPostActivate, + PostStop: gwcommon.DefaultPostStop, + RouteActiveHandler: RouteActiveHandler, + StartCh: make(chan struct{}), + }, helper, eventStream) } diff --git a/gateways/core/webhook/validate.go b/gateways/core/webhook/validate.go index 22fe1844ae..e6eb43bdd4 100644 --- a/gateways/core/webhook/validate.go +++ b/gateways/core/webhook/validate.go @@ -24,6 +24,7 @@ import ( "strings" "github.com/argoproj/argo-events/gateways" + gwcommon "github.com/argoproj/argo-events/gateways/common" ) // ValidateEventSource validates webhook event source @@ -48,7 +49,7 @@ func (ese *WebhookEventSourceExecutor) ValidateEventSource(ctx context.Context, }, nil } -func validateWebhook(w *webhook) error { +func validateWebhook(w *gwcommon.Webhook) error { if w == nil { return fmt.Errorf("%+v, configuration must be non empty", gateways.ErrInvalidEventSource) }