diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 70f1168531..4f556c6e26 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -19,6 +19,8 @@ package amqp import ( "github.com/ghodss/yaml" "github.com/rs/zerolog" + amqplib "github.com/streadway/amqp" + "k8s.io/apimachinery/pkg/util/wait" ) // AMQPEventSourceExecutor implements Eventing @@ -27,20 +29,21 @@ type AMQPEventSourceExecutor struct { } // amqp contains configuration required to connect to rabbitmq service and process messages -// +k8s:openapi-gen=true type amqp struct { // URL for rabbitmq service URL string `json:"url"` - // ExchangeName is the exchange name // For more information, visit https://www.rabbitmq.com/tutorials/amqp-concepts.html ExchangeName string `json:"exchangeName"` - // ExchangeType is rabbitmq exchange type ExchangeType string `json:"exchangeType"` - // Routing key for bindings RoutingKey string `json:"routingKey"` + // Backoff holds parameters applied to connection. + Backoff *wait.Backoff `json:"backoff,omitempty"` + // Connection manages the serialization and deserialization of frames from IO + // and dispatches the frames to the appropriate channel. + conn *amqplib.Connection } func parseEventSource(eventSource string) (*amqp, error) { diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index dd44fd9903..b45b578c2f 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -66,13 +66,19 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) func (ese *AMQPEventSourceExecutor) listenEvents(a *amqp, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { defer gateways.Recover(eventSource.Name) - conn, err := amqplib.Dial(a.URL) - if err != nil { + if err := gateways.Connect(a.Backoff, func() error { + var err error + a.conn, err = amqplib.Dial(a.URL) + if err != nil { + return err + } + return nil + }); err != nil { errorCh <- err return } - ch, err := conn.Channel() + ch, err := a.conn.Channel() if err != nil { errorCh <- err return @@ -90,7 +96,7 @@ func (ese *AMQPEventSourceExecutor) listenEvents(a *amqp, eventSource *gateways. case msg := <-delivery: dataCh <- msg.Body case <-doneCh: - err = conn.Close() + err = a.conn.Close() if err != nil { ese.Log.Error().Err(err).Str("event-stream-name", eventSource.Name).Msg("failed to close connection") } diff --git a/gateways/core/stream/kafka/config.go b/gateways/core/stream/kafka/config.go index e347a2cfdd..d937c6c3f7 100644 --- a/gateways/core/stream/kafka/config.go +++ b/gateways/core/stream/kafka/config.go @@ -17,8 +17,10 @@ limitations under the License. package kafka import ( + "github.com/Shopify/sarama" "github.com/ghodss/yaml" "github.com/rs/zerolog" + "k8s.io/apimachinery/pkg/util/wait" ) // KafkaEventSourceExecutor implements Eventing @@ -27,7 +29,6 @@ type KafkaEventSourceExecutor struct { } // kafka defines configuration required to connect to kafka cluster -// +k8s:openapi-gen=true type kafka struct { // URL to kafka cluster URL string `json:"url"` @@ -35,6 +36,10 @@ type kafka struct { Partition string `json:"partition"` // Topic name Topic string `json:"topic"` + // Backoff holds parameters applied to connection. + Backoff *wait.Backoff `json:"backoff,omitempty"` + // Consumer manages PartitionConsumers which process Kafka messages from brokers. + consumer sarama.Consumer } func parseEventSource(eventSource string) (*kafka, error) { diff --git a/gateways/core/stream/kafka/start.go b/gateways/core/stream/kafka/start.go index 8e9b444cb9..fdfadaa8de 100644 --- a/gateways/core/stream/kafka/start.go +++ b/gateways/core/stream/kafka/start.go @@ -53,8 +53,15 @@ func (ese *KafkaEventSourceExecutor) StartEventSource(eventSource *gateways.Even func (ese *KafkaEventSourceExecutor) listenEvents(k *kafka, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { defer gateways.Recover(eventSource.Name) - consumer, err := sarama.NewConsumer([]string{k.URL}, nil) - if err != nil { + if err := gateways.Connect(k.Backoff, func() error { + var err error + k.consumer, err = sarama.NewConsumer([]string{k.URL}, nil) + if err != nil { + return err + } + return nil + }); err != nil { + ese.Log.Error().Err(err).Str("event-source-name", eventSource.Name).Str("url", k.URL).Msg("failed to connect") errorCh <- err return } @@ -66,7 +73,7 @@ func (ese *KafkaEventSourceExecutor) listenEvents(k *kafka, eventSource *gateway } partition := int32(pInt) - availablePartitions, err := consumer.Partitions(k.Topic) + availablePartitions, err := k.consumer.Partitions(k.Topic) if err != nil { errorCh <- err return @@ -76,7 +83,7 @@ func (ese *KafkaEventSourceExecutor) listenEvents(k *kafka, eventSource *gateway return } - partitionConsumer, err := consumer.ConsumePartition(k.Topic, partition, sarama.OffsetNewest) + partitionConsumer, err := k.consumer.ConsumePartition(k.Topic, partition, sarama.OffsetNewest) if err != nil { errorCh <- err return diff --git a/gateways/core/stream/mqtt/config.go b/gateways/core/stream/mqtt/config.go index 4afd62da82..c8c93fa562 100644 --- a/gateways/core/stream/mqtt/config.go +++ b/gateways/core/stream/mqtt/config.go @@ -17,8 +17,10 @@ limitations under the License. package mqtt import ( + mqttlib "github.com/eclipse/paho.mqtt.golang" "github.com/ghodss/yaml" "github.com/rs/zerolog" + "k8s.io/apimachinery/pkg/util/wait" ) // MqttEventSourceExecutor implements Eventing @@ -27,7 +29,6 @@ type MqttEventSourceExecutor struct { } // mqtt contains information to connect to MQTT broker -// +k8s:openapi-gen=true type mqtt struct { // URL to connect to broker URL string `json:"url"` @@ -35,6 +36,10 @@ type mqtt struct { Topic string `json:"topic"` // Client ID ClientId string `json:"clientId"` + // Backoff holds parameters applied to connection. + Backoff *wait.Backoff `json:"backoff,omitempty"` + // It is an MQTT client for communicating with an MQTT server + client mqttlib.Client } func parseEventSource(eventSource string) (*mqtt, error) { diff --git a/gateways/core/stream/mqtt/start.go b/gateways/core/stream/mqtt/start.go index 81d341c2cb..3ecfb00947 100644 --- a/gateways/core/stream/mqtt/start.go +++ b/gateways/core/stream/mqtt/start.go @@ -18,7 +18,7 @@ package mqtt import ( "github.com/argoproj/argo-events/gateways" - MQTTlib "github.com/eclipse/paho.mqtt.golang" + mqttlib "github.com/eclipse/paho.mqtt.golang" ) // StartEventSource starts an event source @@ -41,24 +41,32 @@ func (ese *MqttEventSourceExecutor) StartEventSource(eventSource *gateways.Event func (ese *MqttEventSourceExecutor) listenEvents(m *mqtt, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { defer gateways.Recover(eventSource.Name) - handler := func(c MQTTlib.Client, msg MQTTlib.Message) { + handler := func(c mqttlib.Client, msg mqttlib.Message) { dataCh <- msg.Payload() } - opts := MQTTlib.NewClientOptions().AddBroker(m.URL).SetClientID(m.ClientId) - client := MQTTlib.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - errorCh <- token.Error() + opts := mqttlib.NewClientOptions().AddBroker(m.URL).SetClientID(m.ClientId) + + if err := gateways.Connect(m.Backoff, func() error { + client := mqttlib.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + return nil + }); err != nil { + ese.Log.Error().Err(err).Str("url", m.URL).Str("client-id", m.ClientId).Msg("failed to connect") + errorCh <- err return } - ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to subscribe to topic") - if token := client.Subscribe(m.Topic, 0, handler); token.Wait() && token.Error() != nil { + ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("subscribing to topic") + if token := m.client.Subscribe(m.Topic, 0, handler); token.Wait() && token.Error() != nil { + ese.Log.Error().Err(token.Error()).Str("url", m.URL).Str("client-id", m.ClientId).Msg("failed to subscribe") errorCh <- token.Error() return } <-doneCh - token := client.Unsubscribe(m.Topic) + token := m.client.Unsubscribe(m.Topic) if token.Error() != nil { ese.Log.Error().Err(token.Error()).Str("event-source-name", eventSource.Name).Msg("failed to unsubscribe client") } diff --git a/gateways/core/stream/nats/config.go b/gateways/core/stream/nats/config.go index df87805f4c..b439302293 100644 --- a/gateways/core/stream/nats/config.go +++ b/gateways/core/stream/nats/config.go @@ -18,7 +18,9 @@ package nats import ( "github.com/ghodss/yaml" + natslib "github.com/nats-io/go-nats" "github.com/rs/zerolog" + "k8s.io/apimachinery/pkg/util/wait" ) // NatsEventSourceExecutor implements Eventing @@ -27,13 +29,15 @@ type NatsEventSourceExecutor struct { } // Nats contains configuration to connect to NATS cluster -// +k8s:openapi-gen=true type natsConfig struct { // URL to connect to natsConfig cluster URL string `json:"url"` - // Subject name Subject string `json:"subject"` + // Backoff holds parameters applied to connection. + Backoff *wait.Backoff `json:"backoff,omitempty"` + // conn represents a bare connection to a nats-server. + conn *natslib.Conn } func parseEventSource(es string) (*natsConfig, error) { diff --git a/gateways/core/stream/nats/start.go b/gateways/core/stream/nats/start.go index 00f66d3a25..5c8f95f575 100644 --- a/gateways/core/stream/nats/start.go +++ b/gateways/core/stream/nats/start.go @@ -18,7 +18,7 @@ package nats import ( "github.com/argoproj/argo-events/gateways" - "github.com/nats-io/go-nats" + natslib "github.com/nats-io/go-nats" ) // StartEventSource starts an event source @@ -41,23 +41,29 @@ func (ese *NatsEventSourceExecutor) StartEventSource(eventSource *gateways.Event func (ese *NatsEventSourceExecutor) listenEvents(n *natsConfig, eventSource *gateways.EventSource, dataCh chan []byte, errorCh chan error, doneCh chan struct{}) { defer gateways.Recover(eventSource.Name) - nc, err := nats.Connect(n.URL) - if err != nil { - ese.Log.Error().Str("url", n.URL).Err(err).Msg("connection failed") + if err := gateways.Connect(n.Backoff, func() error { + var err error + if n.conn, err = natslib.Connect(n.URL); err != nil { + return err + } + return nil + }); err != nil { + ese.Log.Error().Str("event-source-name", eventSource.Name).Str("url", n.URL).Err(err).Msg("connection failed") errorCh <- err return } - ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to subscribe to messages") - _, err = nc.Subscribe(n.Subject, func(msg *nats.Msg) { + ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("subscribing to messages") + _, err := n.conn.Subscribe(n.Subject, func(msg *natslib.Msg) { dataCh <- msg.Data }) if err != nil { + ese.Log.Error().Str("event-source-name", eventSource.Name).Str("url", n.URL).Str("subject", n.Subject).Err(err).Msg("failed to subscribe") errorCh <- err return } - nc.Flush() - if err := nc.LastError(); err != nil { + n.conn.Flush() + if err := n.conn.LastError(); err != nil { errorCh <- err return } diff --git a/gateways/utils.go b/gateways/utils.go index e652be5776..f4a663eff7 100644 --- a/gateways/utils.go +++ b/gateways/utils.go @@ -19,6 +19,8 @@ package gateways import ( "fmt" "hash/fnv" + + "k8s.io/apimachinery/pkg/util/wait" ) // Hasher hashes a string @@ -33,3 +35,24 @@ func SetValidEventSource(v *ValidEventSource, reason string, valid bool) { v.Reason = reason v.IsValid = valid } + +// InitBackoff initializes backoff +func InitBackoff(backoff *wait.Backoff) { + if backoff == nil { + backoff = &wait.Backoff{ + Steps: 1, + } + } +} + +// General connection helper +func Connect(backoff *wait.Backoff, conn func() error) error { + InitBackoff(backoff) + err := wait.ExponentialBackoff(*backoff, func() (bool, error) { + if err := conn(); err != nil { + return false, nil + } + return true, nil + }) + return err +}