From e93d1d633d99b0f5ac64e8cebd31a8674191267d Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Fri, 11 Oct 2019 14:44:24 +0200 Subject: [PATCH 1/4] Add rateLimit option to amqp gateway (#353) Signed-off-by: Damien Grisonnet --- gateways/core/stream/amqp/config.go | 2 ++ gateways/core/stream/amqp/config_test.go | 1 + gateways/core/stream/amqp/start.go | 32 ++++++++++++++++++++++++ 3 files changed, 35 insertions(+) diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 508a93f0df..6bdfc6d264 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -46,6 +46,8 @@ type amqp struct { // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. conn *amqplib.Connection + // Maximum number of events that can be consumed from the queue per minute. + RateLimit uint32 `json:"rateLimit,omitempty"` } func parseEventSource(eventSource string) (interface{}, error) { diff --git a/gateways/core/stream/amqp/config_test.go b/gateways/core/stream/amqp/config_test.go index 890604fd75..189e058cda 100644 --- a/gateways/core/stream/amqp/config_test.go +++ b/gateways/core/stream/amqp/config_test.go @@ -26,6 +26,7 @@ url: amqp://amqp.argo-events:5672 exchangeName: fooExchangeName exchangeType: fanout routingKey: fooRoutingKey +rateLimit: 50 ` func TestParseConfig(t *testing.T) { diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index c351507d9b..c243aa9a8c 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -18,6 +18,7 @@ package amqp import ( "fmt" + "time" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" amqplib "github.com/streadway/amqp" @@ -44,6 +45,31 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, ese.Log) } +func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) { + for { + var elapsedTime int64 + startTime := time.Now().Unix() + + for i := uint32(0); i < a.RateLimit; i++ { + msg, ok, err := ch.Get(queue, true) + elapsedTime = time.Now().Unix() - startTime + + if err != nil || ok == false { + break + } + delivery <- msg + + if elapsedTime >= 60 { + startTime = time.Now().Unix() + elapsedTime = 0 + i = 0 + } + } + + time.Sleep(time.Duration(60 - elapsedTime) * time.Second) + } +} + func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) { err := ch.ExchangeDeclare(a.ExchangeName, a.ExchangeType, true, false, false, false, nil) if err != nil { @@ -60,6 +86,12 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) return nil, fmt.Errorf("failed to bind %s exchange '%s' to queue with routingKey: %s: %s", a.ExchangeType, a.ExchangeName, a.RoutingKey, err) } + if a.RateLimit != 0 { + delivery := make(chan amqplib.Delivery) + go getLimitedDelivery(ch, a, delivery, q.Name) + return delivery, nil + } + delivery, err := ch.Consume(q.Name, "", true, false, false, false, nil) if err != nil { return nil, fmt.Errorf("failed to begin consuming messages: %s", err) From f318af7554c5f3420cf695fbfc88dc25fd809e7e Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Wed, 16 Oct 2019 00:24:38 +0200 Subject: [PATCH 2/4] Add ratePeriod option to amqp gateway Signed-off-by: Damien Grisonnet --- gateways/core/stream/amqp/config.go | 13 ++++++++++++- gateways/core/stream/amqp/config_test.go | 1 - gateways/core/stream/amqp/start.go | 13 ++++++------- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 6bdfc6d264..6b62e8f8e6 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -17,6 +17,7 @@ limitations under the License. package amqp import ( + "errors" "github.com/argoproj/argo-events/common" "github.com/ghodss/yaml" "github.com/sirupsen/logrus" @@ -46,8 +47,10 @@ type amqp struct { // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. conn *amqplib.Connection - // Maximum number of events that can be consumed from the queue per minute. + // Maximum number of events consumed from the queue per RatePeriod. RateLimit uint32 `json:"rateLimit,omitempty"` + // Number of seconds between two consumptions. + RatePeriod uint32 `json:"ratePeriod,omitempty"` } func parseEventSource(eventSource string) (interface{}, error) { @@ -58,3 +61,11 @@ func parseEventSource(eventSource string) (interface{}, error) { } return a, nil } + +// Validate validates amqp +func (a *amqp) Validate() error { + if (a.RateLimit == 0) != (a.RatePeriod == 0) { + return errors.New("RateLimit and RatePeriod must be either set or omitted") + } + return nil +} diff --git a/gateways/core/stream/amqp/config_test.go b/gateways/core/stream/amqp/config_test.go index 189e058cda..890604fd75 100644 --- a/gateways/core/stream/amqp/config_test.go +++ b/gateways/core/stream/amqp/config_test.go @@ -26,7 +26,6 @@ url: amqp://amqp.argo-events:5672 exchangeName: fooExchangeName exchangeType: fanout routingKey: fooRoutingKey -rateLimit: 50 ` func TestParseConfig(t *testing.T) { diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index c243aa9a8c..3e126abe67 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -46,27 +46,26 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event } func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) { + period := time.Duration(a.RatePeriod) for { - var elapsedTime int64 - startTime := time.Now().Unix() + startTime := time.Now() for i := uint32(0); i < a.RateLimit; i++ { msg, ok, err := ch.Get(queue, true) - elapsedTime = time.Now().Unix() - startTime if err != nil || ok == false { break } delivery <- msg - if elapsedTime >= 60 { - startTime = time.Now().Unix() - elapsedTime = 0 + if time.Now().After(startTime.Add(period)) { + startTime = time.Now() i = 0 } } - time.Sleep(time.Duration(60 - elapsedTime) * time.Second) + remainingTime := startTime.Add(period).Sub(time.Now()) + time.Sleep(remainingTime) } } From 22743b7fd4d335556832c1f054438ddb6f808fbd Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Wed, 16 Oct 2019 01:41:11 +0200 Subject: [PATCH 3/4] Fix period value Signed-off-by: Damien Grisonnet --- gateways/core/stream/amqp/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index 3e126abe67..ce383e84aa 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -46,7 +46,7 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event } func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) { - period := time.Duration(a.RatePeriod) + period := time.Duration(a.RatePeriod) * time.Second for { startTime := time.Now() From 4483c3207d6f33cbaafa70272c0010404a0496fe Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Wed, 16 Oct 2019 14:01:51 +0200 Subject: [PATCH 4/4] Fix errors import to keep the codebase consistent Signed-off-by: Damien Grisonnet --- gateways/core/stream/amqp/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 6b62e8f8e6..32656459ab 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -17,9 +17,9 @@ limitations under the License. package amqp import ( - "errors" "github.com/argoproj/argo-events/common" "github.com/ghodss/yaml" + "github.com/pkg/errors" "github.com/sirupsen/logrus" amqplib "github.com/streadway/amqp" )