diff --git a/consumer_group_members.go b/consumer_group_members.go index 21b11e944..f4100f560 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -1,10 +1,12 @@ package sarama // ConsumerGroupMemberMetadata holds the metadata for consumer group +// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json type ConsumerGroupMemberMetadata struct { - Version int16 - Topics []string - UserData []byte + Version int16 + Topics []string + UserData []byte + OwnedPartitions []*OwnedPartition } func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { @@ -33,11 +35,50 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { if m.UserData, err = pd.getBytes(); err != nil { return } + if m.Version >= 1 { + n, err := pd.getArrayLength() + if err != nil { + // permit missing data here in case of misbehaving 3rd party + // clients who incorrectly marked the member metadata as V1 in + // their JoinGroup request + if err == ErrInsufficientData { + return nil + } + return err + } + if n == 0 { + return nil + } + m.OwnedPartitions = make([]*OwnedPartition, n) + for i := 0; i < n; i++ { + m.OwnedPartitions[i] = &OwnedPartition{} + if err := m.OwnedPartitions[i].decode(pd); err != nil { + return err + } + } + } + + return nil +} + +type OwnedPartition struct { + Topic string + Partitions []int32 +} + +func (m *OwnedPartition) decode(pd packetDecoder) (err error) { + if m.Topic, err = pd.getString(); err != nil { + return err + } + if m.Partitions, err = pd.getInt32Array(); err != nil { + return err + } return nil } // ConsumerGroupMemberAssignment holds the member assignment for a consume group +// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index d65e8adc4..a99de61c6 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -7,26 +7,46 @@ import ( ) var ( - groupMemberMetadata = []byte{ - 0, 1, // Version + groupMemberMetadataV0 = []byte{ + 0, 0, // Version 0, 0, 0, 2, // Topic array length 0, 3, 'o', 'n', 'e', // Topic one 0, 3, 't', 'w', 'o', // Topic two 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata } - groupMemberAssignment = []byte{ - 0, 1, // Version + groupMemberAssignmentV0 = []byte{ + 0, 0, // Version 0, 0, 0, 1, // Topic array length 0, 3, 'o', 'n', 'e', // Topic one 0, 0, 0, 3, // Topic one, partition array length 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata } + + // notably it looks like the old 3rdparty bsm/sarama-cluster incorrectly + // set V1 in the member metadata when it sent the JoinGroup request so + // we need to cope with that one being too short + groupMemberMetadataV1Bad = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } + + groupMemberMetadataV1 = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + 0, 0, 0, 0, // OwnedPartitions KIP-429 + } ) func TestConsumerGroupMemberMetadata(t *testing.T) { meta := &ConsumerGroupMemberMetadata{ - Version: 1, + Version: 0, Topics: []string{"one", "two"}, UserData: []byte{0x01, 0x02, 0x03}, } @@ -34,8 +54,8 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { buf, err := encode(meta, nil) if err != nil { t.Error("Failed to encode data", err) - } else if !bytes.Equal(groupMemberMetadata, buf) { - t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf) + } else if !bytes.Equal(groupMemberMetadataV0, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadataV0, buf) } meta2 := new(ConsumerGroupMemberMetadata) @@ -47,9 +67,19 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { } } +func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { + meta := new(ConsumerGroupMemberMetadata) + if err := decode(groupMemberMetadataV1, meta); err != nil { + t.Error("Failed to decode V1 data", err) + } + if err := decode(groupMemberMetadataV1Bad, meta); err != nil { + t.Error("Failed to decode V1 'bad' data", err) + } +} + func TestConsumerGroupMemberAssignment(t *testing.T) { amt := &ConsumerGroupMemberAssignment{ - Version: 1, + Version: 0, Topics: map[string][]int32{ "one": {0, 2, 4}, }, @@ -59,8 +89,8 @@ func TestConsumerGroupMemberAssignment(t *testing.T) { buf, err := encode(amt, nil) if err != nil { t.Error("Failed to encode data", err) - } else if !bytes.Equal(groupMemberAssignment, buf) { - t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf) + } else if !bytes.Equal(groupMemberAssignmentV0, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignmentV0, buf) } amt2 := new(ConsumerGroupMemberAssignment) diff --git a/describe_groups_response.go b/describe_groups_response.go index bc242e421..1027b33d0 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -179,12 +179,18 @@ func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) { } func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) { + if len(gmd.MemberAssignment) == 0 { + return nil, nil + } assignment := new(ConsumerGroupMemberAssignment) err := decode(gmd.MemberAssignment, assignment) return assignment, err } func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) { + if len(gmd.MemberMetadata) == 0 { + return nil, nil + } metadata := new(ConsumerGroupMemberMetadata) err := decode(gmd.MemberMetadata, metadata) return metadata, err