Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
aldelucca1 committed Jan 30, 2020
1 parent d819ac0 commit 46f804e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 8 deletions.
10 changes: 3 additions & 7 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type BalanceStrategy interface {

// AssignmentData returns the serialized assignment data for the specified
// memberID
AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error)
AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}

// --------------------------------------------------------------------
Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t
}

// AssignmentData simple strategies do not require any shared assignment data
func (s *balanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) {
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return nil, nil
}

Expand Down Expand Up @@ -279,11 +279,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad

// AssignmentData serializes the set of topics currently assigned to the
// specified member as part of the supplied balance plan
func (s *stickyBalanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) {
topics, ok := plan[memberID]
if !ok {
return nil, nil
}
func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return encode(&StickyAssignorUserDataV1{
Topics: topics,
Generation: generationID,
Expand Down
66 changes: 66 additions & 0 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"bytes"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -62,6 +63,27 @@ func TestBalanceStrategyRange(t *testing.T) {
}
}

func TestBalanceStrategyRangeAssignmentData(t *testing.T) {

strategy := BalanceStrategyRange

members := make(map[string]ConsumerGroupMemberMetadata, 2)
members["consumer1"] = ConsumerGroupMemberMetadata{
Topics: []string{"topic1"},
}
members["consumer2"] = ConsumerGroupMemberMetadata{
Topics: []string{"topic1"},
}

actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
if err != nil {
t.Errorf("Error building assignment data: %v", err)
}
if actual != nil {
t.Error("Invalid assignment data returned from AssignmentData")
}
}

func TestBalanceStrategyRoundRobin(t *testing.T) {
tests := []struct {
members map[string][]string
Expand Down Expand Up @@ -191,6 +213,27 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) {
}
}

func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {

strategy := BalanceStrategyRoundRobin

members := make(map[string]ConsumerGroupMemberMetadata, 2)
members["consumer1"] = ConsumerGroupMemberMetadata{
Topics: []string{"topic1"},
}
members["consumer2"] = ConsumerGroupMemberMetadata{
Topics: []string{"topic1"},
}

actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
if err != nil {
t.Errorf("Error building assignment data: %v", err)
}
if actual != nil {
t.Error("Invalid assignment data returned from AssignmentData")
}
}

func Test_prepopulateCurrentAssignments(t *testing.T) {
type args struct {
members map[string]ConsumerGroupMemberMetadata
Expand Down Expand Up @@ -1950,6 +1993,29 @@ func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T
verifyFullyBalanced(t, plan)
}

func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) {

s := &stickyBalanceStrategy{}

members := make(map[string]ConsumerGroupMemberMetadata, 2)
members["consumer1"] = ConsumerGroupMemberMetadata{
Topics: []string{"topic1"},
}
members["consumer2"] = ConsumerGroupMemberMetadata{
Topics: []string{"topic1"},
}

expected := encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1)

actual, err := s.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
if err != nil {
t.Errorf("Error building assignment data: %v", err)
}
if bytes.Compare(expected, actual) != 0 {
t.Error("Invalid assignment data returned from AssignmentData")
}
}

func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
s := &stickyBalanceStrategy{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down
2 changes: 1 addition & 1 deletion consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
strategy := c.config.Consumer.Group.Rebalance.Strategy
for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
userDataBytes, err := strategy.AssignmentData(plan, memberID, generationID)
userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 46f804e

Please sign in to comment.