Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(balance): sort and de-deplicate memberIDs #2285

Merged
merged 1 commit into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 16 additions & 32 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,27 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t
}
}

// Sort members for each topic
for topic, memberIDs := range mbt {
sort.Sort(&balanceStrategySortable{
topic: topic,
memberIDs: memberIDs,
})
// func to sort and de-duplicate a StringSlice
uniq := func(ss sort.StringSlice) []string {
if ss.Len() < 2 {
return ss
}
sort.Sort(ss)
var i, j int
for i = 1; i < ss.Len(); i++ {
if ss[i] == ss[j] {
continue
}
j++
ss.Swap(i, j)
}
return ss[:j+1]
}

// Assemble plan
plan := make(BalanceStrategyPlan, len(members))
for topic, memberIDs := range mbt {
s.coreFn(plan, memberIDs, topic, topics[topic])
s.coreFn(plan, uniq(memberIDs), topic, topics[topic])
}
return plan, nil
}
Expand All @@ -138,31 +147,6 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in
return nil, nil
}

type balanceStrategySortable struct {
topic string
memberIDs []string
}

func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
func (p balanceStrategySortable) Swap(i, j int) {
p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
}

func (p balanceStrategySortable) Less(i, j int) bool {
return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
}

func balanceStrategyHashValue(vv ...string) uint32 {
h := uint32(2166136261)
for _, s := range vv {
for _, c := range s {
h ^= uint32(c)
h *= 16777619
}
}
return h
}

type stickyBalanceStrategy struct {
movements partitionMovements
}
Expand Down
9 changes: 9 additions & 0 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func TestBalanceStrategyRange(t *testing.T) {
"M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
},
},
{
name: "2 members, 1 topic with duplicate assignments, 8 partitions each",
members: map[string][]string{"M1": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}, "M2": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3, 4, 5, 6, 7}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 1, 2, 3}},
"M2": map[string][]int32{"T1": {4, 5, 6, 7}},
},
},
}

strategy := BalanceStrategyRange
Expand Down