Skip to content

Commit

Permalink
fix(kafka): use non-global BalanceStrategy values
Browse files Browse the repository at this point in the history
Sarama deprecated the old global BalanceStrategy global instances (which
would inadvertently share state) in favour of creating dedicated
instances. Use the replacement func.

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Sep 25, 2023
1 parent 8a3aa9c commit c43957a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
6 changes: 3 additions & 3 deletions eventbus/kafka/base/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (k *Kafka) Config() (*sarama.Config, error) {

switch k.config.ConsumerGroup.RebalanceStrategy {
case "sticky":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
case "roundrobin":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
default:
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
}

// producer config
Expand Down
12 changes: 5 additions & 7 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.Sug

switch kafkaEventSource.ConsumerGroup.RebalanceStrategy {
case "sticky":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
case "roundrobin":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
case "range":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
default:
log.Info("Invalid rebalance strategy, using default: range")
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
}

consumer := Consumer{
Expand Down Expand Up @@ -354,9 +354,7 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

var (
eventBusErr *eventbuscommon.EventBusError
)
var eventBusErr *eventbuscommon.EventBusError

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
Expand Down

0 comments on commit c43957a

Please sign in to comment.