Skip to content

Commit

Permalink
Add rateLimit option to amqp gateway (argoproj#353)
Browse files Browse the repository at this point in the history
Signed-off-by: Damien Grisonnet <[email protected]>
  • Loading branch information
dgrisonnet committed Oct 15, 2019
1 parent b7e13bf commit d265d96
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
2 changes: 2 additions & 0 deletions gateways/core/stream/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type amqp struct {
// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel.
conn *amqplib.Connection
// Maximum number of events that can be consumed from the queue per minute.
RateLimit uint32 `json:"rateLimit,omitempty"`
}

func parseEventSource(eventSource string) (interface{}, error) {
Expand Down
1 change: 1 addition & 0 deletions gateways/core/stream/amqp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ url: amqp://amqp.argo-events:5672
exchangeName: fooExchangeName
exchangeType: fanout
routingKey: fooRoutingKey
rateLimit: 50
`

func TestParseConfig(t *testing.T) {
Expand Down
32 changes: 32 additions & 0 deletions gateways/core/stream/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package amqp

import (
"fmt"
"time"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/gateways"
amqplib "github.com/streadway/amqp"
Expand All @@ -44,6 +45,31 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event
return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, ese.Log)
}

func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) {
for {
var elapsedTime int64
startTime := time.Now().Unix()

for i := uint32(0); i < a.RateLimit; i++ {
msg, ok, err := ch.Get(queue, true)
elapsedTime = time.Now().Unix() - startTime

if err != nil || ok == false {
break
}
delivery <- msg

if elapsedTime >= 60 {
startTime = time.Now().Unix()
elapsedTime %= 60
i = 0
}
}

time.Sleep(time.Duration(60 - elapsedTime) * time.Second)
}
}

func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) {
err := ch.ExchangeDeclare(a.ExchangeName, a.ExchangeType, true, false, false, false, nil)
if err != nil {
Expand All @@ -60,6 +86,12 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error)
return nil, fmt.Errorf("failed to bind %s exchange '%s' to queue with routingKey: %s: %s", a.ExchangeType, a.ExchangeName, a.RoutingKey, err)
}

if a.RateLimit != 0 {
delivery := make(chan amqplib.Delivery)
go getLimitedDelivery(ch, a, delivery, q.Name)
return delivery, nil
}

delivery, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
return nil, fmt.Errorf("failed to begin consuming messages: %s", err)
Expand Down

0 comments on commit d265d96

Please sign in to comment.