Skip to content

Commit

Permalink
Merge pull request #2285 from Shopify/dnwe/de-duplicate-balance-strategy
Browse files Browse the repository at this point in the history
fix(balance): sort and de-deplicate memberIDs
  • Loading branch information
dnwe authored Jul 20, 2022
2 parents c46db55 + 6750d92 commit bd9d8f1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
48 changes: 16 additions & 32 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,27 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t
}
}

// Sort members for each topic
for topic, memberIDs := range mbt {
sort.Sort(&balanceStrategySortable{
topic: topic,
memberIDs: memberIDs,
})
// func to sort and de-duplicate a StringSlice
uniq := func(ss sort.StringSlice) []string {
if ss.Len() < 2 {
return ss
}
sort.Sort(ss)
var i, j int
for i = 1; i < ss.Len(); i++ {
if ss[i] == ss[j] {
continue
}
j++
ss.Swap(i, j)
}
return ss[:j+1]
}

// Assemble plan
plan := make(BalanceStrategyPlan, len(members))
for topic, memberIDs := range mbt {
s.coreFn(plan, memberIDs, topic, topics[topic])
s.coreFn(plan, uniq(memberIDs), topic, topics[topic])
}
return plan, nil
}
Expand All @@ -138,31 +147,6 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in
return nil, nil
}

type balanceStrategySortable struct {
topic string
memberIDs []string
}

func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
func (p balanceStrategySortable) Swap(i, j int) {
p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
}

func (p balanceStrategySortable) Less(i, j int) bool {
return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
}

func balanceStrategyHashValue(vv ...string) uint32 {
h := uint32(2166136261)
for _, s := range vv {
for _, c := range s {
h ^= uint32(c)
h *= 16777619
}
}
return h
}

type stickyBalanceStrategy struct {
movements partitionMovements
}
Expand Down
9 changes: 9 additions & 0 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func TestBalanceStrategyRange(t *testing.T) {
"M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
},
},
{
name: "2 members, 1 topic with duplicate assignments, 8 partitions each",
members: map[string][]string{"M1": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}, "M2": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3, 4, 5, 6, 7}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 1, 2, 3}},
"M2": map[string][]int32{"T1": {4, 5, 6, 7}},
},
},
}

strategy := BalanceStrategyRange
Expand Down

0 comments on commit bd9d8f1

Please sign in to comment.