Skip to content

Commit

Permalink
fix: redis event source to be able to parameterize (#1754)
Browse files Browse the repository at this point in the history
* fix: redis events to be able to use json

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Mar 28, 2022
1 parent 16a449f commit 3406270
Show file tree
Hide file tree
Showing 42 changed files with 600 additions and 453 deletions.
13 changes: 13 additions & 0 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,10 @@
"description": "HostAddress refers to the address of the Redis host/server",
"type": "string"
},
"jsonBody": {
"description": "JSONBody specifies that all event body payload coming from this source will be JSON",
"type": "boolean"
},
"metadata": {
"additionalProperties": {
"type": "string"
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions eventsources/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package common

import "github.com/cloudevents/sdk-go/v2/event"

type Options func(*event.Event) error
type Option func(*event.Event) error

// Option to set different ID for event
func WithID(id string) Options {
func WithID(id string) Option {
return func(e *event.Event) error {
e.SetID(id)
return nil
Expand Down
4 changes: 2 additions & 2 deletions eventsources/common/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func activateRoute(router Router, controller *Controller) {
}

// manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed
func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Options) error) {
func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) error) {
route := router.GetRoute()
logger := route.Logger
for {
Expand All @@ -199,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecomm
}

// ManagerRoute manages the lifecycle of a route
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
route := router.GetRoute()

logger := route.Logger
Expand Down
4 changes: 2 additions & 2 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type EventingServer interface {
GetEventSourceType() apicommon.EventSourceType

// Function to start listening events.
StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error
StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error
}

// GetEventingServers returns the mapping of event source type and list of eventing servers
Expand Down Expand Up @@ -477,7 +477,7 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
Jitter: &jitter,
}
if err = common.Connect(&backoff, func() error {
return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Options) error {
return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) error {
if filter, ok := filters[s.GetEventName()]; ok {
proceed, err := filterEvent(data, filter)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down Expand Up @@ -156,7 +156,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/awssns/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/awssqs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the AWS SQS event source...")
Expand Down Expand Up @@ -129,7 +129,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Options) error, ack func(), log *zap.SugaredLogger) {
func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) error, ack func(), log *zap.SugaredLogger) {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Azure Events Hub event source...")
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/bitbucket/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
defer sources.Recover(el.GetEventName())

bitbucketEventSource := &el.BitbucketEventSource
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/bitbucketserver/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
defer sources.Recover(el.GetEventName())

bitbucketserverEventSource := &el.BitbucketServerEventSource
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/calendar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
el.log = logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
el.log.Info("started processing the calendar event source...")
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Emitter event source...")
Expand Down
6 changes: 3 additions & 3 deletions eventsources/sources/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand All @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

// listenEvents listen to file related events.
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -162,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte,
}

// listenEvents listen to file related events using polling.
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/gcppubsub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to GCP PubSub events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
// In order to listen events from GCP PubSub,
// 1. Parse the event source that contains configuration to connect to GCP PubSub
// 2. Create a new PubSub client
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/generic/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to generic events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
logger := logging.FromContext(ctx).
With(zap.String(logging.LabelEventSourceType, string(el.GetEventSourceType())),
zap.String(logging.LabelEventName, el.GetEventName()),
Expand Down Expand Up @@ -95,7 +95,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Options) error, logger *zap.SugaredLogger) error {
func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Option) error, logger *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/github/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
logger.Info("started processing the Github event source...")
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/gitlab/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
logger.Info("started processing the Gitlab event source...")
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/hdfs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Emitter event source...")
Expand Down Expand Up @@ -157,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
8 changes: 4 additions & 4 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand All @@ -87,7 +87,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
config, err := getSaramaConfig(kafkaEventSource, log)
if err != nil {
return err
Expand Down Expand Up @@ -157,7 +157,7 @@ func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.Sug
return nil
}

func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
defer sources.Recover(el.GetEventName())

log.Info("start kafka event source...")
Expand Down Expand Up @@ -320,7 +320,7 @@ func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.Sugar
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
dispatch func([]byte, ...eventsourcecommon.Options) error
dispatch func([]byte, ...eventsourcecommon.Option) error
logger *zap.SugaredLogger
kafkaEventSource *v1alpha1.KafkaEventSource
eventSourceName string
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/minio/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName(),
zap.String("bucketName", el.MinioEventSource.Bucket.Name))
Expand Down Expand Up @@ -106,7 +106,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return nil
}

func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/mqtt/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/nats/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand Down
Loading

0 comments on commit 3406270

Please sign in to comment.