Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct bugs in DescribeGroupsResponse #2111

Merged
merged 1 commit into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package sarama

// ConsumerGroupMemberMetadata holds the metadata for consumer group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
Version int16
Topics []string
UserData []byte
OwnedPartitions []*OwnedPartition
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -33,11 +35,50 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.UserData, err = pd.getBytes(); err != nil {
return
}
if m.Version >= 1 {
n, err := pd.getArrayLength()
if err != nil {
// permit missing data here in case of misbehaving 3rd party
// clients who incorrectly marked the member metadata as V1 in
// their JoinGroup request
if err == ErrInsufficientData {
return nil
}
return err
}
if n == 0 {
return nil
}
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
}
}
}

return nil
}

type OwnedPartition struct {
Topic string
Partitions []int32
}

func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
if m.Topic, err = pd.getString(); err != nil {
return err
}
if m.Partitions, err = pd.getInt32Array(); err != nil {
return err
}

return nil
}

// ConsumerGroupMemberAssignment holds the member assignment for a consume group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
Expand Down
50 changes: 40 additions & 10 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,55 @@ import (
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
groupMemberMetadataV0 = []byte{
0, 0, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
groupMemberAssignmentV0 = []byte{
0, 0, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}

// notably it looks like the old 3rdparty bsm/sarama-cluster incorrectly
// set V1 in the member metadata when it sent the JoinGroup request so
// we need to cope with that one being too short
groupMemberMetadataV1Bad = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}

groupMemberMetadataV1 = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
meta := &ConsumerGroupMemberMetadata{
Version: 1,
Version: 0,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
} else if !bytes.Equal(groupMemberMetadataV0, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadataV0, buf)
}

meta2 := new(ConsumerGroupMemberMetadata)
Expand All @@ -47,9 +67,19 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
}
}

func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV1, meta); err != nil {
t.Error("Failed to decode V1 data", err)
}
if err := decode(groupMemberMetadataV1Bad, meta); err != nil {
t.Error("Failed to decode V1 'bad' data", err)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 1,
Version: 0,
Topics: map[string][]int32{
"one": {0, 2, 4},
},
Expand All @@ -59,8 +89,8 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
buf, err := encode(amt, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
} else if !bytes.Equal(groupMemberAssignmentV0, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignmentV0, buf)
}

amt2 := new(ConsumerGroupMemberAssignment)
Expand Down
6 changes: 6 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,18 @@ func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
}

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
if len(gmd.MemberAssignment) == 0 {
return nil, nil
}
assignment := new(ConsumerGroupMemberAssignment)
err := decode(gmd.MemberAssignment, assignment)
return assignment, err
}

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
if len(gmd.MemberMetadata) == 0 {
return nil, nil
}
metadata := new(ConsumerGroupMemberMetadata)
err := decode(gmd.MemberMetadata, metadata)
return metadata, err
Expand Down