From 6750d926bb1148bab75ac05865df05b41013ce20 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 20 Jul 2022 14:46:56 +0100 Subject: [PATCH] fix(balance): sort and de-duplicate memberIDs - remove balanceStrategySortable as: - it wasn't obviously being used (as the sort.Sort was called in a range closure over a map without updating the map value) - it had implemented a hashing scheme over topic+memberID which didn't seem obviously superior to just using sort.StringSlice - de-duplicate memberIDs from the consumer group metadata to avoid incorrectly counting a given member multiple times in the assignment --- balance_strategy.go | 48 ++++++++++++++-------------------------- balance_strategy_test.go | 9 ++++++++ 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index 49c1a40bd..716da90c6 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -117,18 +117,27 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t } } - // Sort members for each topic - for topic, memberIDs := range mbt { - sort.Sort(&balanceStrategySortable{ - topic: topic, - memberIDs: memberIDs, - }) + // func to sort and de-duplicate a StringSlice + uniq := func(ss sort.StringSlice) []string { + if ss.Len() < 2 { + return ss + } + sort.Sort(ss) + var i, j int + for i = 1; i < ss.Len(); i++ { + if ss[i] == ss[j] { + continue + } + j++ + ss.Swap(i, j) + } + return ss[:j+1] } // Assemble plan plan := make(BalanceStrategyPlan, len(members)) for topic, memberIDs := range mbt { - s.coreFn(plan, memberIDs, topic, topics[topic]) + s.coreFn(plan, uniq(memberIDs), topic, topics[topic]) } return plan, nil } @@ -138,31 +147,6 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in return nil, nil } -type balanceStrategySortable struct { - topic string - memberIDs []string -} - -func (p balanceStrategySortable) Len() int { return len(p.memberIDs) } -func (p balanceStrategySortable) Swap(i, j 
int) { - p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i] -} - -func (p balanceStrategySortable) Less(i, j int) bool { - return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j]) -} - -func balanceStrategyHashValue(vv ...string) uint32 { - h := uint32(2166136261) - for _, s := range vv { - for _, c := range s { - h ^= uint32(c) - h *= 16777619 - } - } - return h -} - type stickyBalanceStrategy struct { movements partitionMovements } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index 2d2351d11..452a6a398 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -62,6 +62,15 @@ func TestBalanceStrategyRange(t *testing.T) { "M2": map[string][]int32{"T1": {1}, "T2": {0, 1}}, }, }, + { + name: "2 members, 1 topic with duplicate assignments, 8 partitions each", + members: map[string][]string{"M1": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}, "M2": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}}, + topics: map[string][]int32{"T1": {0, 1, 2, 3, 4, 5, 6, 7}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0, 1, 2, 3}}, + "M2": map[string][]int32{"T1": {4, 5, 6, 7}}, + }, + }, } strategy := BalanceStrategyRange