From bf1b8fb51e59906904029fb14d318f06f9a58469 Mon Sep 17 00:00:00 2001 From: Al DeLucca Date: Thu, 30 Jan 2020 09:28:42 -0500 Subject: [PATCH 1/3] Allow BalanceStrategy to provide custom assignment data The StickyBalanceStrategy currently provides state information with member assignments. This is achieved through a custom name check in the `syncGroupRequest` of the consumer group. This works well for this case, but falls apart when trying to create additional stateful balance strategies. This update will make it possible to create new stateful balance strategies --- balance_strategy.go | 22 ++++++++++++++++++++++ consumer_group.go | 16 +++++----------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index 67c4d96d0..6261d44ff 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -47,6 +47,10 @@ type BalanceStrategy interface { // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions` // and returns a distribution plan. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) + + // Assignment data returns the serialized assignment data for the specified + // memberID + AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) } // -------------------------------------------------------------------- @@ -132,6 +136,11 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t return plan, nil } +// AssignmentData simple strategies do not require any shared assignment data +func (s *balanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) { + return nil, nil +} + type balanceStrategySortable struct { topic string memberIDs []string @@ -268,6 +277,19 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad return plan, nil } +// AssignmentData serializes the set of topics currently assigned to the +// specified member as part of the supplied balance plan +func (s *stickyBalanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) { + topics, ok := plan[memberID] + if !ok { + return nil, nil + } + return encode(&StickyAssignorUserDataV1{ + Topics: topics, + Generation: generationID, + }, nil) +} + func strsContains(s []string, value string) bool { for _, entry := range s { if entry == value { diff --git a/consumer_group.go b/consumer_group.go index fc95cd0df..81f2e2051 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -331,20 +331,14 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate MemberId: c.memberID, GenerationId: generationID, } + strategy := c.config.Consumer.Group.Rebalance.Strategy for memberID, topics := range plan { assignment := &ConsumerGroupMemberAssignment{Topics: topics} - - // Include topic assignments in group-assignment userdata for each consumer-group member - if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName { - userDataBytes, err := encode(&StickyAssignorUserDataV1{ - Topics: topics, - Generation: generationID, - }, nil) - if err != nil { - return nil, err - } - assignment.UserData = userDataBytes + userDataBytes, err := strategy.AssignmentData(plan, memberID, generationID) + if err != nil { + return nil, err } + assignment.UserData = userDataBytes if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil { return nil, err } From d819ac05be04bb78253bc0b0a9e4e3daaec6641d Mon Sep 17 00:00:00 2001 From: Al DeLucca Date: Thu, 30 Jan 2020 10:22:44 -0500 Subject: [PATCH 2/3] Fix doc comment --- balance_strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balance_strategy.go b/balance_strategy.go index 6261d44ff..626dd7dcc 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -48,7 +48,7 @@ type BalanceStrategy interface { // and returns a distribution plan. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) - // Assignment data returns the serialized assignment data for the specified + // AssignmentData returns the serialized assignment data for the specified // memberID AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) } From 46f804e149f7d53dc112a6c434ffbc7669d696a2 Mon Sep 17 00:00:00 2001 From: Al DeLucca Date: Thu, 30 Jan 2020 11:26:35 -0500 Subject: [PATCH 3/3] Address review comments --- balance_strategy.go | 10 ++---- balance_strategy_test.go | 66 ++++++++++++++++++++++++++++++++++++++++ consumer_group.go | 2 +- 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index 626dd7dcc..56da276d2 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -50,7 +50,7 @@ type BalanceStrategy interface { // AssignmentData returns the serialized assignment data for the specified // memberID - AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) + AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) } // -------------------------------------------------------------------- @@ -137,7 +137,7 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t } // AssignmentData simple strategies do not require any shared assignment data -func (s *balanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) { +func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { return nil, nil } @@ -279,11 +279,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad // AssignmentData serializes the set of topics currently assigned to the // specified member as part of the supplied balance plan -func (s *stickyBalanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) { - topics, ok := plan[memberID] - if !ok { - return nil, nil - } +func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { return encode(&StickyAssignorUserDataV1{ Topics: topics, Generation: generationID, diff --git a/balance_strategy_test.go b/balance_strategy_test.go index e9c72748b..f930d7663 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -1,6 +1,7 @@ package sarama import ( + "bytes" "fmt" "math" "math/rand" @@ -62,6 +63,27 @@ func TestBalanceStrategyRange(t *testing.T) { } } +func TestBalanceStrategyRangeAssignmentData(t *testing.T) { + + strategy := BalanceStrategyRange + + members := make(map[string]ConsumerGroupMemberMetadata, 2) + members["consumer1"] = ConsumerGroupMemberMetadata{ + Topics: []string{"topic1"}, + } + members["consumer2"] = ConsumerGroupMemberMetadata{ + Topics: []string{"topic1"}, + } + + actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1) + if err != nil { + t.Errorf("Error building assignment data: %v", err) + } + if actual != nil { + t.Error("Invalid assignment data returned from AssignmentData") + } +} + func TestBalanceStrategyRoundRobin(t *testing.T) { tests := []struct { members map[string][]string @@ -191,6 +213,27 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { } } +func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { + + strategy := BalanceStrategyRoundRobin + + members := make(map[string]ConsumerGroupMemberMetadata, 2) + members["consumer1"] = ConsumerGroupMemberMetadata{ + Topics: []string{"topic1"}, + } + members["consumer2"] = ConsumerGroupMemberMetadata{ + Topics: []string{"topic1"}, + } + + actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1) + if err != nil { + t.Errorf("Error building assignment data: %v", err) + } + if actual != nil { + t.Error("Invalid assignment data returned from AssignmentData") + } +} + func Test_prepopulateCurrentAssignments(t *testing.T) { type args struct { members map[string]ConsumerGroupMemberMetadata @@ -1950,6 +1993,29 @@ func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T verifyFullyBalanced(t, plan) } +func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) { + + s := &stickyBalanceStrategy{} + + members := make(map[string]ConsumerGroupMemberMetadata, 2) + members["consumer1"] = ConsumerGroupMemberMetadata{ + Topics: []string{"topic1"}, + } + members["consumer2"] = ConsumerGroupMemberMetadata{ + Topics: []string{"topic1"}, + } + + expected := encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1) + + actual, err := s.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1) + if err != nil { + t.Errorf("Error building assignment data: %v", err) + } + if bytes.Compare(expected, actual) != 0 { + t.Error("Invalid assignment data returned from AssignmentData") + } +} + func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) { s := &stickyBalanceStrategy{} r := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/consumer_group.go b/consumer_group.go index 81f2e2051..951f64b33 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -334,7 +334,7 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate strategy := c.config.Consumer.Group.Rebalance.Strategy for memberID, topics := range plan { assignment := &ConsumerGroupMemberAssignment{Topics: topics} - userDataBytes, err := strategy.AssignmentData(plan, memberID, generationID) + userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID) if err != nil { return nil, err }