diff --git a/broker.go b/broker.go index 46f06a0f3..5bd1df2d6 100644 --- a/broker.go +++ b/broker.go @@ -239,6 +239,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, return response, nil } +func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) { + response := new(JoinGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) { + response := new(SyncGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) { + response := new(LeaveGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) { + response := new(HeartbeatResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/broker_test.go b/broker_test.go index f1aa2e8ba..590a4dc28 100644 --- a/broker_test.go +++ b/broker_test.go @@ -176,4 +176,52 @@ var brokerTestTable = []struct { t.Error("Offset request got no response!") } }}, + + {[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := JoinGroupRequest{} + response, err := broker.JoinGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("JoinGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := SyncGroupRequest{} + response, err := broker.SyncGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("SyncGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := LeaveGroupRequest{} + response, err := broker.LeaveGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("LeaveGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := HeartbeatRequest{} + response, err := broker.Heartbeat(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("Heartbeat request got no response!") + } + }}, } diff --git a/consumer_group_members.go b/consumer_group_members.go new file mode 100644 index 000000000..9d92d350a --- /dev/null +++ b/consumer_group_members.go @@ -0,0 +1,94 @@ +package sarama + +type ConsumerGroupMemberMetadata struct { + Version int16 + Topics []string + UserData []byte +} + +func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { + pe.putInt16(m.Version) + + if err := pe.putStringArray(m.Topics); err != nil { + return err + } + + if err := pe.putBytes(m.UserData); err != nil { + return err + } + + return nil +} + +func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { + if m.Version, err = pd.getInt16(); err != nil { + return + } + + if m.Topics, err = pd.getStringArray(); err != nil { + return + } + + if m.UserData, err = pd.getBytes(); err != nil { + return + } + + return nil +} + +type ConsumerGroupMemberAssignment struct { + Version int16 + Topics map[string][]int32 + UserData []byte +} + +func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error { + pe.putInt16(m.Version) + + if err := pe.putArrayLength(len(m.Topics)); err != nil { + return err + } + + for topic, partitions := range m.Topics { + if err := pe.putString(topic); err != nil { + return err + } + if err := pe.putInt32Array(partitions); err != nil { + return err + } + } + + if err := pe.putBytes(m.UserData); err != nil { + return err + } + + return nil +} + +func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) { + if m.Version, err = pd.getInt16(); err != nil { + return + } + + var topicLen int + if topicLen, err = pd.getArrayLength(); err != nil { + return + } + + m.Topics = make(map[string][]int32, topicLen) + for i := 0; i < topicLen; i++ { + var topic string + if topic, err = pd.getString(); err != nil { + return + } + if m.Topics[topic], err = pd.getInt32Array(); err != nil { + return + } + } + + if m.UserData, err = pd.getBytes(); err != nil { + return + } + + return nil +} diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go new file mode 100644 index 000000000..4213d81ee --- /dev/null +++ b/consumer_group_members_test.go @@ -0,0 +1,77 @@ +package sarama + +import ( + "bytes" + "reflect" + "testing" +) + +var ( + groupMemberMetadata = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } + groupMemberAssignment = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 0, 0, 3, // Topic one, partition array length + 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4 + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 2, // Topic two, partition array length + 0, 0, 0, 1, 0, 0, 0, 3, // 1, 3 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } +) + +func TestConsumerGroupMemberMetadata(t *testing.T) { + meta := &ConsumerGroupMemberMetadata{ + Version: 1, + Topics: []string{"one", "two"}, + UserData: []byte{0x01, 0x02, 0x03}, + } + + buf, err := encode(meta) + if err != nil { + t.Error("Failed to encode data", err) + } else if !bytes.Equal(groupMemberMetadata, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf) + } + + meta2 := new(ConsumerGroupMemberMetadata) + err = decode(buf, meta2) + if err != nil { + t.Error("Failed to decode data", err) + } else if !reflect.DeepEqual(meta, meta2) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2) + } +} + +func TestConsumerGroupMemberAssignment(t *testing.T) { + amt := &ConsumerGroupMemberAssignment{ + Version: 1, + Topics: map[string][]int32{ + "one": []int32{0, 2, 4}, + "two": []int32{1, 3}, + }, + UserData: []byte{0x01, 0x02, 0x03}, + } + + buf, err := encode(amt) + if err != nil { + t.Error("Failed to encode data", err) + } else if !bytes.Equal(groupMemberAssignment, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf) + } + + amt2 := new(ConsumerGroupMemberAssignment) + err = decode(buf, amt2) + if err != nil { + t.Error("Failed to decode data", err) + } else if !reflect.DeepEqual(amt, amt2) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2) + } +} diff --git a/join_group_request.go b/join_group_request.go index 8bb5ce826..5884d79d4 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { r.GroupProtocols[name] = metadata } + +func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error { + bin, err := encode(metadata) + if err != nil { + return err + } + + r.AddGroupProtocol(name, bin) + return nil +} diff --git a/join_group_response.go b/join_group_response.go index 037a9cd26..16f6b9b40 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -9,6 +9,18 @@ type JoinGroupResponse struct { Members map[string][]byte } +func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) { + members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members)) + for id, bin := range r.Members { + meta := new(ConsumerGroupMemberMetadata) + if err := decode(bin, meta); err != nil { + return nil, err + } + members[id] = *meta + } + return members, nil +} + func (r *JoinGroupResponse) encode(pe packetEncoder) error { pe.putInt16(int16(r.Err)) pe.putInt32(r.GenerationId) diff --git a/sync_group_request.go b/sync_group_request.go index 60be6f3f3..031cf0f2f 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment r.GroupAssignments[memberId] = memberAssignment } + +func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error { + bin, err := encode(memberAssignment) + if err != nil { + return err + } + + r.AddGroupAssignment(memberId, bin) + return nil +} diff --git a/sync_group_response.go b/sync_group_response.go index e10685ef8..49c86922d 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -5,6 +5,12 @@ type SyncGroupResponse struct { MemberAssignment []byte } +func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) { + assignment := new(ConsumerGroupMemberAssignment) + err := decode(r.MemberAssignment, assignment) + return assignment, err +} + func (r *SyncGroupResponse) encode(pe packetEncoder) error { pe.putInt16(int16(r.Err)) return pe.putBytes(r.MemberAssignment)