From c46ca679341754520d926eb7552b860472da81c6 Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Wed, 16 Oct 2019 00:24:38 +0200 Subject: [PATCH] Add ratePeriod option to amqp gateway Signed-off-by: Damien Grisonnet --- gateways/core/stream/amqp/config.go | 13 ++++++++++++- gateways/core/stream/amqp/config_test.go | 1 - gateways/core/stream/amqp/start.go | 13 ++++++------- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 6bdfc6d264..6b62e8f8e6 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -17,6 +17,7 @@ limitations under the License. package amqp import ( + "errors" "github.com/argoproj/argo-events/common" "github.com/ghodss/yaml" "github.com/sirupsen/logrus" @@ -46,8 +47,10 @@ type amqp struct { // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. conn *amqplib.Connection - // Maximum number of events that can be consumed from the queue per minute. + // Maximum number of events consumed from the queue per RatePeriod. RateLimit uint32 `json:"rateLimit,omitempty"` + // Number of seconds between two consumptions. + RatePeriod uint32 `json:"ratePeriod,omitempty"` } func parseEventSource(eventSource string) (interface{}, error) { @@ -58,3 +61,11 @@ func parseEventSource(eventSource string) (interface{}, error) { } return a, nil } + +// Validate validates amqp +func (a *amqp) Validate() error { + if (a.RateLimit == 0) != (a.RatePeriod == 0) { + return errors.New("RateLimit and RatePeriod must be either set or omitted") + } + return nil +} diff --git a/gateways/core/stream/amqp/config_test.go b/gateways/core/stream/amqp/config_test.go index 189e058cda..890604fd75 100644 --- a/gateways/core/stream/amqp/config_test.go +++ b/gateways/core/stream/amqp/config_test.go @@ -26,7 +26,6 @@ url: amqp://amqp.argo-events:5672 exchangeName: fooExchangeName exchangeType: fanout routingKey: fooRoutingKey -rateLimit: 50 ` func TestParseConfig(t *testing.T) { diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index c243aa9a8c..3e126abe67 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -46,27 +46,26 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event } func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) { + period := time.Duration(a.RatePeriod) for { - var elapsedTime int64 - startTime := time.Now().Unix() + startTime := time.Now() for i := uint32(0); i < a.RateLimit; i++ { msg, ok, err := ch.Get(queue, true) - elapsedTime = time.Now().Unix() - startTime if err != nil || ok == false { break } delivery <- msg - if elapsedTime >= 60 { - startTime = time.Now().Unix() - elapsedTime = 0 + if time.Now().After(startTime.Add(period)) { + startTime = time.Now() i = 0 } } - time.Sleep(time.Duration(60 - elapsedTime) * time.Second) + remainingTime := startTime.Add(period).Sub(time.Now()) + time.Sleep(remainingTime) } }