Skip to content

Commit

Permalink
feat: Rate limit for Kafka and initial offset support and refactor co…
Browse files Browse the repository at this point in the history
…nfig (#829)

* add a rate limit to kafka event source

* refactor sarama config and move kafka version up a level

* fix most linter issues

* fix last lint issue by bypassing

* update example

* make requested changes

* update kafka example for new version

* add +optional back in after removal of comments

* add more optional based on settings that can be defaulted

* remove context based cancelable sleep

* remove unused context
  • Loading branch information
zachaller authored Aug 13, 2020
1 parent 85baae7 commit d14487d
Show file tree
Hide file tree
Showing 9 changed files with 567 additions and 378 deletions.
32 changes: 29 additions & 3 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 52 additions & 3 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 42 additions & 27 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"strconv"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/argoproj/argo-events/common"
Expand Down Expand Up @@ -81,14 +82,10 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

func (listener *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte) error) error {
config := sarama.NewConfig()

version, err := sarama.ParseKafkaVersion(kafkaEventSource.ConsumerGroup.KafkaVersion)
config, err := getSaramaConfig(kafkaEventSource, log)
if err != nil {
log.Errorf("Error parsing Kafka version: %v", err)
return err
}
config.Version = version

switch kafkaEventSource.ConsumerGroup.RebalanceStrategy {
case "sticky":
Expand All @@ -102,15 +99,6 @@ func (listener *EventListener) consumerGroupConsumer(ctx context.Context, log *z
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
}

if kafkaEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(kafkaEventSource.TLS.CACertPath, kafkaEventSource.TLS.ClientCertPath, kafkaEventSource.TLS.ClientKeyPath)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

consumer := Consumer{
ready: make(chan bool),
dispatch: dispatch,
Expand Down Expand Up @@ -169,20 +157,10 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
log.Info("connecting to Kafka cluster...")
if err := sources.Connect(common.GetConnectionBackoff(kafkaEventSource.ConnectionBackoff), func() error {
var err error
config := sarama.NewConfig()

if kafkaEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(kafkaEventSource.TLS.CACertPath, kafkaEventSource.TLS.ClientCertPath, kafkaEventSource.TLS.ClientKeyPath)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
} else {
consumer, err = sarama.NewConsumer([]string{kafkaEventSource.URL}, nil)
if err != nil {
return err
}
config, err := getSaramaConfig(kafkaEventSource, log)
if err != nil {
return err
}

consumer, err = sarama.NewConsumer([]string{kafkaEventSource.URL}, config)
Expand Down Expand Up @@ -259,6 +237,37 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
}
}

func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.SugaredLogger) (*sarama.Config, error) { //nolint:interfacer
config := sarama.NewConfig()

if kafkaEventSource.Version == "" {
config.Version = sarama.V1_0_0_0
} else {
version, err := sarama.ParseKafkaVersion(kafkaEventSource.Version)
if err != nil {
log.Errorf("Error parsing Kafka version: %v", err)
return nil, err
}
config.Version = version
}

if kafkaEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(kafkaEventSource.TLS.CACertPath, kafkaEventSource.TLS.ClientCertPath, kafkaEventSource.TLS.ClientKeyPath)
if err != nil {
return nil, errors.Wrap(err, "failed to get the tls configuration")
}
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

if kafkaEventSource.ConsumerGroup != nil {
if kafkaEventSource.ConsumerGroup.Oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
}
return config, nil
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
Expand Down Expand Up @@ -310,6 +319,12 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
} else {
session.MarkMessage(message, "")
}
if consumer.kafkaEventSource.LimitEventsPerSecond > 0 {
//1000000000 is 1 second in nanoseconds
d := (1000000000 / time.Duration(consumer.kafkaEventSource.LimitEventsPerSecond) * time.Nanosecond) * time.Nanosecond
consumer.logger.Infof("Sleeping for: %v.", d)
time.Sleep(d)
}
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion examples/event-sources/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ spec:
# Use a consumer group, if this is used you do not need to specify a "partition: <id>"
# consumerGroup:
# groupName: test-group
# kafkaVersion: "2.5.0"
# oldest: false
# rebalanceStrategy: range
# limitEventsPerSecond: 1
# version: "2.5.0"

# example-tls:
# url: "kafka.argo-events:9092"
Expand Down
Loading

0 comments on commit d14487d

Please sign in to comment.