Skip to content

Commit

Permalink
Add rateLimit option to amqp gateway (#353) (#378)
Browse files Browse the repository at this point in the history
* Add rateLimit option to amqp gateway (#353)

Signed-off-by: Damien Grisonnet <[email protected]>

* Add ratePeriod option to amqp gateway

Signed-off-by: Damien Grisonnet <[email protected]>

* Fix period value

Signed-off-by: Damien Grisonnet <[email protected]>

* Fix errors import to keep the codebase consistent

Signed-off-by: Damien Grisonnet <[email protected]>
  • Loading branch information
dgrisonnet authored and VaibhavPage committed Oct 21, 2019
1 parent 13cb29d commit 70cca52
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
13 changes: 13 additions & 0 deletions gateways/core/stream/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package amqp
import (
"github.com/argoproj/argo-events/common"
"github.com/ghodss/yaml"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
amqplib "github.com/streadway/amqp"
)
Expand Down Expand Up @@ -46,6 +47,10 @@ type amqp struct {
// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel.
conn *amqplib.Connection
// Maximum number of events consumed from the queue per RatePeriod.
RateLimit uint32 `json:"rateLimit,omitempty"`
// Number of seconds between two consumptions.
RatePeriod uint32 `json:"ratePeriod,omitempty"`
}

func parseEventSource(eventSource string) (interface{}, error) {
Expand All @@ -56,3 +61,11 @@ func parseEventSource(eventSource string) (interface{}, error) {
}
return a, nil
}

// Validate validates amqp
func (a *amqp) Validate() error {
if (a.RateLimit == 0) != (a.RatePeriod == 0) {
return errors.New("RateLimit and RatePeriod must be either set or omitted")
}
return nil
}
31 changes: 31 additions & 0 deletions gateways/core/stream/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package amqp

import (
"fmt"
"time"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/gateways"
amqplib "github.com/streadway/amqp"
Expand All @@ -44,6 +45,30 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event
return gateways.HandleEventsFromEventSource(eventSource.Name, eventStream, dataCh, errorCh, doneCh, ese.Log)
}

func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) {
period := time.Duration(a.RatePeriod) * time.Second
for {
startTime := time.Now()

for i := uint32(0); i < a.RateLimit; i++ {
msg, ok, err := ch.Get(queue, true)

if err != nil || ok == false {
break
}
delivery <- msg

if time.Now().After(startTime.Add(period)) {
startTime = time.Now()
i = 0
}
}

remainingTime := startTime.Add(period).Sub(time.Now())
time.Sleep(remainingTime)
}
}

func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error) {
err := ch.ExchangeDeclare(a.ExchangeName, a.ExchangeType, true, false, false, false, nil)
if err != nil {
Expand All @@ -60,6 +85,12 @@ func getDelivery(ch *amqplib.Channel, a *amqp) (<-chan amqplib.Delivery, error)
return nil, fmt.Errorf("failed to bind %s exchange '%s' to queue with routingKey: %s: %s", a.ExchangeType, a.ExchangeName, a.RoutingKey, err)
}

if a.RateLimit != 0 {
delivery := make(chan amqplib.Delivery)
go getLimitedDelivery(ch, a, delivery, q.Name)
return delivery, nil
}

delivery, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
return nil, fmt.Errorf("failed to begin consuming messages: %s", err)
Expand Down

0 comments on commit 70cca52

Please sign in to comment.