Skip to content

Commit

Permalink
fix(balance): sort and de-deplicate memberIDs
Browse files Browse the repository at this point in the history
- remove balanceStrategySortable as:
  - it wasn't obviously being used (as the sort.Sort was called in a
    range closure over a map without updating the map value
  - it had implemented a hashing scheme over topic+memberID which didn't
    seem obviously superior to just using sort.StringSlice
- de-duplicate memberIDs from the consumer group metadata to avoid
  incorrectly counting a given member multiple times in the assignment
  • Loading branch information
dnwe committed Jul 20, 2022
1 parent 6228349 commit 320fada
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
37 changes: 16 additions & 21 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,27 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t
}
}

// Sort members for each topic
for topic, memberIDs := range mbt {
sort.Sort(&balanceStrategySortable{
topic: topic,
memberIDs: memberIDs,
})
// func to sort and de-duplicate a StringSlice
uniq := func(ss sort.StringSlice) []string {
if ss.Len() < 2 {
return ss
}
sort.Sort(ss)
var i, j int
for i = 1; i < ss.Len(); i++ {
if ss[i] == ss[j] {
continue
}
j++
ss.Swap(i, j)
}
return ss[:j+1]
}

// Assemble plan
plan := make(BalanceStrategyPlan, len(members))
for topic, memberIDs := range mbt {
s.coreFn(plan, memberIDs, topic, topics[topic])
s.coreFn(plan, uniq(memberIDs), topic, topics[topic])
}
return plan, nil
}
Expand All @@ -138,20 +147,6 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in
return nil, nil
}

type balanceStrategySortable struct {
topic string
memberIDs []string
}

func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
func (p balanceStrategySortable) Swap(i, j int) {
p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
}

func (p balanceStrategySortable) Less(i, j int) bool {
return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
}

func balanceStrategyHashValue(vv ...string) uint32 {
h := uint32(2166136261)
for _, s := range vv {
Expand Down
9 changes: 9 additions & 0 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func TestBalanceStrategyRange(t *testing.T) {
"M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
},
},
{
name: "2 members, 1 topic with duplicate assignments, 8 partitions each",
members: map[string][]string{"M1": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}, "M2": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3, 4, 5, 6, 7}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 1, 2, 3}},
"M2": map[string][]int32{"T1": {4, 5, 6, 7}},
},
},
}

strategy := BalanceStrategyRange
Expand Down

0 comments on commit 320fada

Please sign in to comment.