Skip to content

Commit

Permalink
Added support for backoff option when making connections in stream ga…
Browse files Browse the repository at this point in the history
…teways (#168)
  • Loading branch information
VaibhavPage authored Feb 16, 2019
1 parent b09c405 commit fce79f1
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 33 deletions.
11 changes: 7 additions & 4 deletions gateways/core/stream/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package amqp
import (
"github.com/ghodss/yaml"
"github.com/rs/zerolog"
amqplib "github.com/streadway/amqp"
"k8s.io/apimachinery/pkg/util/wait"
)

// AMQPEventSourceExecutor implements Eventing
Expand All @@ -27,20 +29,21 @@ type AMQPEventSourceExecutor struct {
}

// amqp contains configuration required to connect to rabbitmq service and process messages
// +k8s:openapi-gen=true
type amqp struct {
// URL for rabbitmq service
URL string `json:"url"`

// ExchangeName is the exchange name
// For more information, visit https://www.rabbitmq.com/tutorials/amqp-concepts.html
ExchangeName string `json:"exchangeName"`

// ExchangeType is rabbitmq exchange type
ExchangeType string `json:"exchangeType"`

// Routing key for bindings
RoutingKey string `json:"routingKey"`
// Backoff holds parameters applied to connection.
Backoff *wait.Backoff `json:"backoff,omitempty"`
// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel.
conn *amqplib.Connection
}

func parseEventSource(eventSource string) (*amqp, error) {
Expand Down
14 changes: 10 additions & 4 deletions gateways/core/stream/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,19 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error)
func (ese *AMQPEventSourceExecutor) listenEvents(a *amqp, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) {
defer gateways.Recover(eventSource.Name)

conn, err := amqplib.Dial(a.URL)
if err != nil {
if err := gateways.Connect(a.Backoff, func() error {
var err error
a.conn, err = amqplib.Dial(a.URL)
if err != nil {
return err
}
return nil
}); err != nil {
errorCh <- err
return
}

ch, err := conn.Channel()
ch, err := a.conn.Channel()
if err != nil {
errorCh <- err
return
Expand All @@ -90,7 +96,7 @@ func (ese *AMQPEventSourceExecutor) listenEvents(a *amqp, eventSource *gateways.
case msg := <-delivery:
dataCh <- msg.Body
case <-doneCh:
err = conn.Close()
err = a.conn.Close()
if err != nil {
ese.Log.Error().Err(err).Str("event-stream-name", eventSource.Name).Msg("failed to close connection")
}
Expand Down
7 changes: 6 additions & 1 deletion gateways/core/stream/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package kafka

import (
"github.com/Shopify/sarama"
"github.com/ghodss/yaml"
"github.com/rs/zerolog"
"k8s.io/apimachinery/pkg/util/wait"
)

// KafkaEventSourceExecutor implements Eventing
Expand All @@ -27,14 +29,17 @@ type KafkaEventSourceExecutor struct {
}

// kafka defines configuration required to connect to kafka cluster
// +k8s:openapi-gen=true
type kafka struct {
// URL to kafka cluster
URL string `json:"url"`
// Partition name
Partition string `json:"partition"`
// Topic name
Topic string `json:"topic"`
// Backoff holds parameters applied to connection.
Backoff *wait.Backoff `json:"backoff,omitempty"`
// Consumer manages PartitionConsumers which process Kafka messages from brokers.
consumer sarama.Consumer
}

func parseEventSource(eventSource string) (*kafka, error) {
Expand Down
15 changes: 11 additions & 4 deletions gateways/core/stream/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,15 @@ func (ese *KafkaEventSourceExecutor) StartEventSource(eventSource *gateways.Even
func (ese *KafkaEventSourceExecutor) listenEvents(k *kafka, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) {
defer gateways.Recover(eventSource.Name)

consumer, err := sarama.NewConsumer([]string{k.URL}, nil)
if err != nil {
if err := gateways.Connect(k.Backoff, func() error {
var err error
k.consumer, err = sarama.NewConsumer([]string{k.URL}, nil)
if err != nil {
return err
}
return nil
}); err != nil {
ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Str("url", k.URL).Msg("failed to connect")
errorCh <- err
return
}
Expand All @@ -66,7 +73,7 @@ func (ese *KafkaEventSourceExecutor) listenEvents(k *kafka, eventSource *gateway
}
partition := int32(pInt)

availablePartitions, err := consumer.Partitions(k.Topic)
availablePartitions, err := k.consumer.Partitions(k.Topic)
if err != nil {
errorCh <- err
return
Expand All @@ -76,7 +83,7 @@ func (ese *KafkaEventSourceExecutor) listenEvents(k *kafka, eventSource *gateway
return
}

partitionConsumer, err := consumer.ConsumePartition(k.Topic, partition, sarama.OffsetNewest)
partitionConsumer, err := k.consumer.ConsumePartition(k.Topic, partition, sarama.OffsetNewest)
if err != nil {
errorCh <- err
return
Expand Down
7 changes: 6 additions & 1 deletion gateways/core/stream/mqtt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package mqtt

import (
mqttlib "github.com/eclipse/paho.mqtt.golang"
"github.com/ghodss/yaml"
"github.com/rs/zerolog"
"k8s.io/apimachinery/pkg/util/wait"
)

// MqttEventSourceExecutor implements Eventing
Expand All @@ -27,14 +29,17 @@ type MqttEventSourceExecutor struct {
}

// mqtt contains information to connect to MQTT broker
// +k8s:openapi-gen=true
type mqtt struct {
// URL to connect to broker
URL string `json:"url"`
// Topic name
Topic string `json:"topic"`
// Client ID
ClientId string `json:"clientId"`
// Backoff holds parameters applied to connection.
Backoff *wait.Backoff `json:"backoff,omitempty"`
// It is an MQTT client for communicating with an MQTT server
client mqttlib.Client
}

func parseEventSource(eventSource string) (*mqtt, error) {
Expand Down
26 changes: 17 additions & 9 deletions gateways/core/stream/mqtt/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package mqtt

import (
"github.com/argoproj/argo-events/gateways"
MQTTlib "github.com/eclipse/paho.mqtt.golang"
mqttlib "github.com/eclipse/paho.mqtt.golang"
)

// StartEventSource starts an event source
Expand All @@ -41,24 +41,32 @@ func (ese *MqttEventSourceExecutor) StartEventSource(eventSource *gateways.Event
func (ese *MqttEventSourceExecutor) listenEvents(m *mqtt, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) {
defer gateways.Recover(eventSource.Name)

handler := func(c MQTTlib.Client, msg MQTTlib.Message) {
handler := func(c mqttlib.Client, msg mqttlib.Message) {
dataCh <- msg.Payload()
}
opts := MQTTlib.NewClientOptions().AddBroker(m.URL).SetClientID(m.ClientId)
client := MQTTlib.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
errorCh <- token.Error()
opts := mqttlib.NewClientOptions().AddBroker(m.URL).SetClientID(m.ClientId)

if err := gateways.Connect(m.Backoff, func() error {
client := mqttlib.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}); err != nil {
ese.Log.Error().Err(err).Str("url", m.URL).Str("client-id", m.ClientId).Msg("failed to connect")
errorCh <- err
return
}

ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to subscribe to topic")
if token := client.Subscribe(m.Topic, 0, handler); token.Wait() && token.Error() != nil {
ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("subscribing to topic")
if token := m.client.Subscribe(m.Topic, 0, handler); token.Wait() && token.Error() != nil {
ese.Log.Error().Err(token.Error()).Str("url", m.URL).Str("client-id", m.ClientId).Msg("failed to subscribe")
errorCh <- token.Error()
return
}

<-doneCh
token := client.Unsubscribe(m.Topic)
token := m.client.Unsubscribe(m.Topic)
if token.Error() != nil {
ese.Log.Error().Err(token.Error()).Str("event-source-name", eventSource.Name).Msg("failed to unsubscribe client")
}
Expand Down
8 changes: 6 additions & 2 deletions gateways/core/stream/nats/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package nats

import (
"github.com/ghodss/yaml"
natslib "github.com/nats-io/go-nats"
"github.com/rs/zerolog"
"k8s.io/apimachinery/pkg/util/wait"
)

// NatsEventSourceExecutor implements Eventing
Expand All @@ -27,13 +29,15 @@ type NatsEventSourceExecutor struct {
}

// Nats contains configuration to connect to NATS cluster
// +k8s:openapi-gen=true
type natsConfig struct {
// URL to connect to natsConfig cluster
URL string `json:"url"`

// Subject name
Subject string `json:"subject"`
// Backoff holds parameters applied to connection.
Backoff *wait.Backoff `json:"backoff,omitempty"`
// conn represents a bare connection to a nats-server.
conn *natslib.Conn
}

func parseEventSource(es string) (*natsConfig, error) {
Expand Down
22 changes: 14 additions & 8 deletions gateways/core/stream/nats/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package nats

import (
"github.com/argoproj/argo-events/gateways"
"github.com/nats-io/go-nats"
natslib "github.com/nats-io/go-nats"
)

// StartEventSource starts an event source
Expand All @@ -41,23 +41,29 @@ func (ese *NatsEventSourceExecutor) StartEventSource(eventSource *gateways.Event
func (ese *NatsEventSourceExecutor) listenEvents(n *natsConfig, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) {
defer gateways.Recover(eventSource.Name)

nc, err := nats.Connect(n.URL)
if err != nil {
ese.Log.Error().Str("url", n.URL).Err(err).Msg("connection failed")
if err := gateways.Connect(n.Backoff, func() error {
var err error
if n.conn, err = natslib.Connect(n.URL); err != nil {
return err
}
return nil
}); err != nil {
ese.Log.Error().Str("event-source-name", eventSource.Name).Str("url", n.URL).Err(err).Msg("connection failed")
errorCh <- err
return
}

ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to subscribe to messages")
_, err = nc.Subscribe(n.Subject, func(msg *nats.Msg) {
ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("subscribing to messages")
_, err := n.conn.Subscribe(n.Subject, func(msg *natslib.Msg) {
dataCh <- msg.Data
})
if err != nil {
ese.Log.Error().Str("event-source-name", eventSource.Name).Str("url", n.URL).Str("subject", n.Subject).Err(err).Msg("failed to subscribe")
errorCh <- err
return
}
nc.Flush()
if err := nc.LastError(); err != nil {
n.conn.Flush()
if err := n.conn.LastError(); err != nil {
errorCh <- err
return
}
Expand Down
23 changes: 23 additions & 0 deletions gateways/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package gateways
import (
"fmt"
"hash/fnv"

"k8s.io/apimachinery/pkg/util/wait"
)

// Hasher hashes a string
Expand All @@ -33,3 +35,24 @@ func SetValidEventSource(v *ValidEventSource, reason string, valid bool) {
v.Reason = reason
v.IsValid = valid
}

// InitBackoff initializes backoff
func InitBackoff(backoff *wait.Backoff) {
if backoff == nil {
backoff = &wait.Backoff{
Steps: 1,
}
}
}

// General connection helper
func Connect(backoff *wait.Backoff, conn func() error) error {
InitBackoff(backoff)
err := wait.ExponentialBackoff(*backoff, func() (bool, error) {
if err := conn(); err != nil {
return false, nil
}
return true, nil
})
return err
}

0 comments on commit fce79f1

Please sign in to comment.