Skip to content

Commit

Permalink
AWS SNS Gateway (#169)
Browse files Browse the repository at this point in the history
* Added support for backoff option when making connections in stream gateways

* Extracting common webhook functionality

* Modified storage grid and webhook implementation to take advantage of common webhook functionality

* Added sns gateway implementation

* Enhancing sns gateway

* Update deps
  • Loading branch information
VaibhavPage authored and magaldima committed Feb 17, 2019
1 parent fce79f1 commit c98e35c
Show file tree
Hide file tree
Showing 14 changed files with 791 additions and 345 deletions.
4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ required = [
name = "github.com/nats-io/go-nats-streaming"
branch = "master"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
branch = "master"

[[constraint]]
name = "github.com/smartystreets/goconvey"
version = "1.6.3"
Expand Down
18 changes: 18 additions & 0 deletions gateways/common/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This package contains structs and methods that are shared across different gateways.
package common
226 changes: 226 additions & 0 deletions gateways/common/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"fmt"
"net/http"
"sync"

"github.com/argoproj/argo-events/gateways"
"github.com/rs/zerolog"
)

// Webhook is a general purpose REST API
type Webhook struct {
// REST API endpoint
Endpoint string `json:"endpoint" protobuf:"bytes,1,opt,name=endpoint"`
// Method is HTTP request method that indicates the desired action to be performed for a given resource.
// See RFC7231 Hypertext Transfer Protocol (HTTP/1.1): Semantics and Content
Method string `json:"method" protobuf:"bytes,2,opt,name=method"`
// Port on which HTTP server is listening for incoming events.
Port string `json:"port" protobuf:"bytes,3,opt,name=port"`
// srv holds reference to http server
srv *http.Server `json:"srv,omitempty"`
mux *http.ServeMux `json:"mux,omitempty"`
}

// WebhookHelper is a helper struct
type WebhookHelper struct {
// Mutex synchronizes ActiveServers
Mutex sync.Mutex
// ActiveServers keeps track of currently running http servers.
ActiveServers map[string]*activeServer
// ActiveEndpoints keep track of endpoints that are already registered with server and their status active or inactive
ActiveEndpoints map[string]*Endpoint
// RouteActivateChan handles assigning new route to server.
RouteActivateChan chan *RouteConfig
// RouteDeactivateChan handles deactivating existing route
RouteDeactivateChan chan *RouteConfig
}

// HTTP Muxer
type server struct {
mux *http.ServeMux
}

// activeServer contains reference to server and an error channel that is shared across all functions registering endpoints for the server.
type activeServer struct {
srv *http.ServeMux
errChan chan error
}

// RouteConfig contains configuration about a http route
type RouteConfig struct {
Webhook *Webhook
Configs map[string]interface{}
EventSource *gateways.EventSource
Log zerolog.Logger
StartCh chan struct{}
RouteActiveHandler func(writer http.ResponseWriter, request *http.Request, rc *RouteConfig)
PostActivate func(rc *RouteConfig) error
PostStop func(rc *RouteConfig) error
}

// endpoint contains state of an http endpoint
type Endpoint struct {
// whether endpoint is active
Active bool
// data channel to receive data on this endpoint
DataCh chan []byte
}

// NewWebhookHelper returns new webhook helper
func NewWebhookHelper() *WebhookHelper {
return &WebhookHelper{
ActiveEndpoints: make(map[string]*Endpoint),
ActiveServers: make(map[string]*activeServer),
Mutex: sync.Mutex{},
RouteActivateChan: make(chan *RouteConfig),
RouteDeactivateChan: make(chan *RouteConfig),
}
}

// InitRouteChannels initializes route channels so they can activate and deactivate routes.
func InitRouteChannels(helper *WebhookHelper) {
for {
select {
case config := <-helper.RouteActivateChan:
// start server if it has not been started on this port
config.startHttpServer(helper)
config.StartCh <- struct{}{}

case config := <-helper.RouteDeactivateChan:
webhook := config.Webhook
_, ok := helper.ActiveServers[webhook.Port]
if ok {
helper.ActiveEndpoints[webhook.Endpoint].Active = false
}
}
}
}

// ServeHTTP implementation
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mux.ServeHTTP(w, r)
}

// starts a http server
func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
// start a http server only if no other configuration previously started the server on given port
helper.Mutex.Lock()
if _, ok := helper.ActiveServers[rc.Webhook.Port]; !ok {
s := &server{
mux: http.NewServeMux(),
}
rc.Webhook.mux = s.mux
rc.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", rc.Webhook.Port),
Handler: s,
}
errChan := make(chan error, 1)
helper.ActiveServers[rc.Webhook.Port] = &activeServer{
srv: s.mux,
errChan: errChan,
}

// start http server
go func() {
err := rc.Webhook.srv.ListenAndServe()
rc.Log.Info().Str("event-source", rc.EventSource.Name).Str("port", rc.Webhook.Port).Msg("http server stopped")
if err != nil {
errChan <- err
}
}()
}
helper.Mutex.Unlock()
}

// activateRoute activates route
func (rc *RouteConfig) activateRoute(helper *WebhookHelper) {
helper.RouteActivateChan <- rc

<-rc.StartCh

if rc.Webhook.mux == nil {
helper.Mutex.Lock()
rc.Webhook.mux = helper.ActiveServers[rc.Webhook.Port].srv
helper.Mutex.Unlock()
}

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("adding route handler")
if _, ok := helper.ActiveEndpoints[rc.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{
Active: true,
DataCh: make(chan []byte),
}
rc.Webhook.mux.HandleFunc(rc.Webhook.Endpoint, func(writer http.ResponseWriter, request *http.Request) {
rc.RouteActiveHandler(writer, request, rc)
})
}
helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("route handler added")
}

func (rc *RouteConfig) processChannels(helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
for {
select {
case data := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh:
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("new event received, dispatching to gateway client")
err := eventStream.Send(&gateways.Event{
Name: rc.EventSource.Name,
Payload: data,
})
if err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to send event")
return err
}

case <-eventStream.Context().Done():
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("connection is closed by client")
helper.RouteDeactivateChan <- rc
return nil

// this error indicates that the server has stopped running
case err := <-helper.ActiveServers[rc.Webhook.Port].errChan:
return err
}
}
}

func DefaultPostActivate(rc *RouteConfig) error {
return nil
}

func DefaultPostStop(rc *RouteConfig) error {
return nil
}

func ProcessRoute(rc *RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
rc.activateRoute(helper)
if err := rc.PostActivate(rc); err != nil {
return err
}
if err := rc.processChannels(helper, eventStream); err != nil {
return err
}
if err := rc.PostStop(rc); err != nil {
rc.Log.Error().Err(err).Msg("error occurred while executing post stop logic")
}
return nil
}
3 changes: 3 additions & 0 deletions gateways/community/aws-sns/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM scratch
COPY dist/sns-gateway /bin/
ENTRYPOINT [ "/bin/sns-gateway" ]
44 changes: 44 additions & 0 deletions gateways/community/aws-sns/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"os"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/gateways"
"github.com/argoproj/argo-events/gateways/community/aws-sns"
"k8s.io/client-go/kubernetes"
)

func main() {
kubeConfig, _ := os.LookupEnv(common.EnvVarKubeConfig)
restConfig, err := common.GetClientConfig(kubeConfig)
if err != nil {
panic(err)
}
clientset := kubernetes.NewForConfigOrDie(restConfig)
namespace, ok := os.LookupEnv(common.EnvVarGatewayNamespace)
if !ok {
panic("namespace is not provided")
}
gateways.StartGateway(&aws_sns.SNSEventSourceExecutor{
Log: common.GetLoggerContext(common.LoggerConf()).Logger(),
Clientset: clientset,
Namespace: namespace,
})
}
86 changes: 86 additions & 0 deletions gateways/community/aws-sns/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws_sns

import (
"k8s.io/client-go/kubernetes"
"time"

"github.com/ghodss/yaml"
"github.com/rs/zerolog"
corev1 "k8s.io/api/core/v1"
)

const (
MESSAGE_TYPE_SUBSCRIPTION_CONFIRMATION = "SubscriptionConfirmation"
MESSAGE_TYPE_UNSUBSCRIBE_CONFIRMATION = "UnsubscribeConfirmation"
MESSAGE_TYPE_NOTIFICATION = "Notification"
)

var (
snsProtocol = "http"
)

// SNSEventSourceExecutor implements Eventing
type SNSEventSourceExecutor struct {
Log zerolog.Logger
// Clientset is kubernetes client
Clientset kubernetes.Interface
// Namespace where gateway is deployed
Namespace string
}

// Json http notifications
// SNS posts those to your http url endpoint if http is selected as delivery method.
// http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-subscription-confirmation-json
// http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-notification-json
// http://docs.aws.amazon.com/sns/latest/dg/json-formats.html#http-unsubscribe-confirmation-json
type httpNotification struct {
Type string `json:"Type"`
MessageId string `json:"MessageId"`
Token string `json:"Token,omitempty"` // Only for subscribe and unsubscribe
TopicArn string `json:"TopicArn"`
Subject string `json:"Subject,omitempty"` // Only for Notification
Message string `json:"Message"`
SubscribeURL string `json:"SubscribeURL,omitempty"` // Only for subscribe and unsubscribe
Timestamp time.Time `json:"Timestamp"`
SignatureVersion string `json:"SignatureVersion"`
Signature string `json:"Signature"`
SigningCertURL string `json:"SigningCertURL"`
UnsubscribeURL string `json:"UnsubscribeURL,omitempty"` // Only for notifications
}

// snsConfig contains configuration to subscribe to SNS topic
type snsConfig struct {
TopicArn string `json:"topicArn"`
// Endpoint you wish to register
Endpoint string `json:"endpoint"`
// Port on which http server is running.
Port string `json:"port"`
AccessKey *corev1.SecretKeySelector `json:"accessKey" protobuf:"bytes,5,opt,name=accessKey"`
SecretKey *corev1.SecretKeySelector `json:"secretKey" protobuf:"bytes,6,opt,name=secretKey"`
Region string `json:"region"`
}

func parseEventSource(es string) (*snsConfig, error) {
var n *snsConfig
err := yaml.Unmarshal([]byte(es), &n)
if err != nil {
return nil, err
}
return n, nil
}
Loading

0 comments on commit c98e35c

Please sign in to comment.