Skip to content

Commit

Permalink
Add ratePeriod option to amqp gateway
Browse files Browse the repository at this point in the history
Signed-off-by: Damien Grisonnet <[email protected]>
  • Loading branch information
dgrisonnet committed Oct 15, 2019
1 parent f4b7634 commit c46ca67
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
13 changes: 12 additions & 1 deletion gateways/core/stream/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package amqp

import (
"errors"
"github.com/argoproj/argo-events/common"
"github.com/ghodss/yaml"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -46,8 +47,10 @@ type amqp struct {
// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel.
conn *amqplib.Connection
// Maximum number of events that can be consumed from the queue per minute.
// Maximum number of events consumed from the queue per RatePeriod.
RateLimit uint32 `json:"rateLimit,omitempty"`
// Number of seconds between two consumptions.
RatePeriod uint32 `json:"ratePeriod,omitempty"`
}

func parseEventSource(eventSource string) (interface{}, error) {
Expand All @@ -58,3 +61,11 @@ func parseEventSource(eventSource string) (interface{}, error) {
}
return a, nil
}

// Validate validates amqp
func (a *amqp) Validate() error {
if (a.RateLimit == 0) != (a.RatePeriod == 0) {
return errors.New("RateLimit and RatePeriod must be either set or omitted")
}
return nil
}
1 change: 0 additions & 1 deletion gateways/core/stream/amqp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ url: amqp://amqp.argo-events:5672
exchangeName: fooExchangeName
exchangeType: fanout
routingKey: fooRoutingKey
rateLimit: 50
`

func TestParseConfig(t *testing.T) {
Expand Down
13 changes: 6 additions & 7 deletions gateways/core/stream/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,26 @@ func (ese *AMQPEventSourceExecutor) StartEventSource(eventSource *gateways.Event
}

func getLimitedDelivery(ch *amqplib.Channel, a *amqp, delivery chan amqplib.Delivery, queue string) {
period := time.Duration(a.RatePeriod)
for {
var elapsedTime int64
startTime := time.Now().Unix()
startTime := time.Now()

for i := uint32(0); i < a.RateLimit; i++ {
msg, ok, err := ch.Get(queue, true)
elapsedTime = time.Now().Unix() - startTime

if err != nil || ok == false {
break
}
delivery <- msg

if elapsedTime >= 60 {
startTime = time.Now().Unix()
elapsedTime = 0
if time.Now().After(startTime.Add(period)) {
startTime = time.Now()
i = 0
}
}

time.Sleep(time.Duration(60 - elapsedTime) * time.Second)
remainingTime := startTime.Add(period).Sub(time.Now())
time.Sleep(remainingTime)
}
}

Expand Down

0 comments on commit c46ca67

Please sign in to comment.