Skip to content

Commit

Permalink
Merge pull request #2111 from Shopify/dnwe/fix-describe-groups
Browse files Browse the repository at this point in the history
fix: correct bugs in DescribeGroupsResponse
  • Loading branch information
dnwe authored Jan 13, 2022
2 parents 0fcf75b + df0fb3d commit 15e9d7a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 13 deletions.
47 changes: 44 additions & 3 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package sarama

// ConsumerGroupMemberMetadata holds the metadata for consumer group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
Version int16
Topics []string
UserData []byte
OwnedPartitions []*OwnedPartition
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -33,11 +35,50 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.UserData, err = pd.getBytes(); err != nil {
return
}
if m.Version >= 1 {
n, err := pd.getArrayLength()
if err != nil {
// permit missing data here in case of misbehaving 3rd party
// clients who incorrectly marked the member metadata as V1 in
// their JoinGroup request
if err == ErrInsufficientData {
return nil
}
return err
}
if n == 0 {
return nil
}
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
}
}
}

return nil
}

type OwnedPartition struct {
Topic string
Partitions []int32
}

func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
if m.Topic, err = pd.getString(); err != nil {
return err
}
if m.Partitions, err = pd.getInt32Array(); err != nil {
return err
}

return nil
}

// ConsumerGroupMemberAssignment holds the member assignment for a consume group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
Expand Down
50 changes: 40 additions & 10 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,55 @@ import (
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
groupMemberMetadataV0 = []byte{
0, 0, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
groupMemberAssignmentV0 = []byte{
0, 0, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}

// notably it looks like the old 3rdparty bsm/sarama-cluster incorrectly
// set V1 in the member metadata when it sent the JoinGroup request so
// we need to cope with that one being too short
groupMemberMetadataV1Bad = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}

groupMemberMetadataV1 = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
meta := &ConsumerGroupMemberMetadata{
Version: 1,
Version: 0,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
} else if !bytes.Equal(groupMemberMetadataV0, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadataV0, buf)
}

meta2 := new(ConsumerGroupMemberMetadata)
Expand All @@ -47,9 +67,19 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
}
}

func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV1, meta); err != nil {
t.Error("Failed to decode V1 data", err)
}
if err := decode(groupMemberMetadataV1Bad, meta); err != nil {
t.Error("Failed to decode V1 'bad' data", err)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 1,
Version: 0,
Topics: map[string][]int32{
"one": {0, 2, 4},
},
Expand All @@ -59,8 +89,8 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
buf, err := encode(amt, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
} else if !bytes.Equal(groupMemberAssignmentV0, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignmentV0, buf)
}

amt2 := new(ConsumerGroupMemberAssignment)
Expand Down
6 changes: 6 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,18 @@ func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
}

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
if len(gmd.MemberAssignment) == 0 {
return nil, nil
}
assignment := new(ConsumerGroupMemberAssignment)
err := decode(gmd.MemberAssignment, assignment)
return assignment, err
}

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
if len(gmd.MemberMetadata) == 0 {
return nil, nil
}
metadata := new(ConsumerGroupMemberMetadata)
err := decode(gmd.MemberMetadata, metadata)
return metadata, err
Expand Down

0 comments on commit 15e9d7a

Please sign in to comment.