diff --git a/gateways/core/stream/amqp/config.go b/gateways/core/stream/amqp/config.go index 508a93f0df..6bdfc6d264 100644 --- a/gateways/core/stream/amqp/config.go +++ b/gateways/core/stream/amqp/config.go @@ -46,6 +46,8 @@ type amqp struct { // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. conn *amqplib.Connection + // Maximum number of events that can be consumed from the queue per minute. + RateLimit uint32 `json:"rateLimit,omitempty"` } func parseEventSource(eventSource string) (interface{}, error) { diff --git a/gateways/core/stream/amqp/config_test.go b/gateways/core/stream/amqp/config_test.go index 890604fd75..189e058cda 100644 --- a/gateways/core/stream/amqp/config_test.go +++ b/gateways/core/stream/amqp/config_test.go @@ -26,6 +26,7 @@ url: amqp://amqp.argo-events:5672 exchangeName: fooExchangeName exchangeType: fanout routingKey: fooRoutingKey +rateLimit: 50 ` func TestParseConfig(t *testing.T) { diff --git a/gateways/core/stream/amqp/start.go b/gateways/core/stream/amqp/start.go index c351507d9b..a59f470409 100644 --- a/gateways/core/stream/amqp/start.go +++ b/gateways/core/stream/amqp/start.go @@ -18,6 +18,7 @@ package amqp import ( "fmt" + "time" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" amqplib "github.com/streadway/amqp" @@ -44,6 +45,31 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, ese.Log) } +func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) { + for { + var elapsedTime int64 + startTime := time.Now().Unix() + + for i := uint32(0); i < a.RateLimit; i++ { + msg, ok, err := ch.Get(queue, true) + elapsedTime = time.Now().Unix() - startTime + + if err != nil || ok == false { + break + } + delivery <- msg + + if elapsedTime >= 60 { + startTime = time.Now().Unix() + elapsedTime %= 60 + i = 0 + } + } + + time.Sleep(time.Duration(60 - elapsedTime) * time.Second) + } +} + func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) { err := ch.ExchangeDeclare(a.ExchangeName, a.ExchangeType, true, false, false, false, nil) if err != nil { @@ -60,6 +86,12 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) return nil, fmt.Errorf("failed to bind %s exchange '%s' to queue with routingKey: %s: %s", a.ExchangeType, a.ExchangeName, a.RoutingKey, err) } + if a.RateLimit != 0 { + delivery := make(chan amqplib.Delivery) + go getLimitedDelivery(ch, a, delivery, q.Name) + return delivery, nil + } + delivery, err := ch.Consume(q.Name, "", true, false, false, false, nil) if err != nil { return nil, fmt.Errorf("failed to begin consuming messages: %s", err)