From 70cca52c76443936607a55e596fbc6e83b82cc35 Mon Sep 17 00:00:00 2001 From: Makunouchi Date: Mon, 21 Oct 2019 17:58:16 +0200 Subject: [PATCH] Add rateLimit option to amqp gateway (#353) (#378) * Add rateLimit option to amqp gateway (#353) Signed-off-by: Damien Grisonnet * Add ratePeriod option to amqp gateway Signed-off-by: Damien Grisonnet * Fix period value Signed-off-by: Damien Grisonnet * Fix errors import to keep the codebase consistent Signed-off-by: Damien Grisonnet --- gateways/core/stream/amqp/config.go | 13 ++++++++++++ gateways/core/stream/amqp/start.go | 31 +++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 508a93f0df..32656459ab 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -19,6 +19,7 @@ package amqp import ( "github.com/argoproj/argo-events/common" "github.com/ghodss/yaml" + "github.com/pkg/errors" "github.com/sirupsen/logrus" amqplib "github.com/streadway/amqp" ) @@ -46,6 +47,10 @@ type amqp struct { // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. conn *amqplib.Connection + // Maximum number of events consumed from the queue per RatePeriod. + RateLimit uint32 `json:"rateLimit,omitempty"` + // Number of seconds between two consumptions. + RatePeriod uint32 `json:"ratePeriod,omitempty"` } func parseEventSource(eventSource string) (interface{}, error) { @@ -56,3 +61,11 @@ func parseEventSource(eventSource string) (interface{}, error) { } return a, nil } + +// Validate validates amqp +func (a *amqp) Validate() error { + if (a.RateLimit == 0) != (a.RatePeriod == 0) { + return errors.New("RateLimit and RatePeriod must be either set or omitted") + } + return nil +} diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index c351507d9b..ce383e84aa 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -18,6 +18,7 @@ package amqp import ( "fmt" + "time" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" amqplib "github.com/streadway/amqp" @@ -44,6 +45,30 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, ese.Log) } +func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) { + period := time.Duration(a.RatePeriod) * time.Second + for { + startTime := time.Now() + + for i := uint32(0); i < a.RateLimit; i++ { + msg, ok, err := ch.Get(queue, true) + + if err != nil || ok == false { + break + } + delivery <- msg + + if time.Now().After(startTime.Add(period)) { + startTime = time.Now() + i = 0 + } + } + + remainingTime := startTime.Add(period).Sub(time.Now()) + time.Sleep(remainingTime) + } +} + func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) { err := ch.ExchangeDeclare(a.ExchangeName, a.ExchangeType, true, false, false, false, nil) if err != nil { @@ -60,6 +85,12 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) return nil, fmt.Errorf("failed to bind %s exchange '%s' to queue with routingKey: %s: %s", a.ExchangeType, a.ExchangeName, a.RoutingKey, err) } + if a.RateLimit != 0 { + delivery := make(chan amqplib.Delivery) + go getLimitedDelivery(ch, a, delivery, q.Name) + return delivery, nil + } + delivery, err := ch.Consume(q.Name, "", true, false, false, false, nil) if err != nil { return nil, fmt.Errorf("failed to begin consuming messages: %s", err)