Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer groups on Kafka 0.9 #588

Merged
merged 4 commits into from
Jan 4, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return response, nil
}

func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
48 changes: 48 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,52 @@ var brokerTestTable = []struct {
t.Error("Offset request got no response!")
}
}},

{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
response, err := broker.JoinGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("JoinGroup request got no response!")
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
response, err := broker.SyncGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("SyncGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
response, err := broker.LeaveGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("LeaveGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
response, err := broker.Heartbeat(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Heartbeat request got no response!")
}
}},
}
31 changes: 31 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ type Config struct {
RefreshFrequency time.Duration
}

// Group is the namespace for group management properties
Group struct {
Session struct {
// The allowed session timeout for registered consumers (defaults to 30s).
// Must be within the allowed server range.
Timeout time.Duration
}
Heartbeat struct {
// Interval between each heartbeat (defaults to 3s). It should be no more
// than 1/3rd of the Group.Session.Timout setting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check this in Config.Validate()

Interval time.Duration
}
}

// Producer is the namespace for configuration related to producing messages,
// used by the Producer.
Producer struct {
Expand Down Expand Up @@ -212,6 +226,9 @@ func NewConfig() *Config {
c.Metadata.Retry.Backoff = 250 * time.Millisecond
c.Metadata.RefreshFrequency = 10 * time.Minute

c.Group.Session.Timeout = 30 * time.Second
c.Group.Heartbeat.Interval = 3 * time.Second

c.Producer.MaxMessageBytes = 1000000
c.Producer.RequiredAcks = WaitForLocal
c.Producer.Timeout = 10 * time.Second
Expand Down Expand Up @@ -259,6 +276,12 @@ func (c *Config) Validate() error {
if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Group.Session.Timeout%time.Millisecond != 0 {
Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
}
if c.ClientID == "sarama" {
Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
}
Expand Down Expand Up @@ -287,6 +310,14 @@ func (c *Config) Validate() error {
return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
}

// validate the Group values
switch {
case c.Group.Heartbeat.Interval <= 0:
return ConfigurationError("Group.Heartbeat.Interval must be > 0")
case c.Group.Session.Timeout <= 0:
return ConfigurationError("Group.Session.Timeout must be > 0")
}

// validate the Producer values
switch {
case c.Producer.MaxMessageBytes <= 0:
Expand Down
94 changes: 94 additions & 0 deletions group_members.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

type GroupMemberMetadata struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for naming, we should probably call this ConsumerGroupMemberMetadata. This is a specialization of the generic group type.

Version int16
Topics []string
UserData []byte
}

func (m *GroupMemberMetadata) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putStringArray(m.Topics); err != nil {
return err
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

if m.Topics, err = pd.getStringArray(); err != nil {
return
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}

type GroupMemberAssignment struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, ConsumerGroupMemberAssignment

We should probably also name the file consumer_group_members.rb

Version int16
Topics map[string][]int32
UserData []byte
}

func (m *GroupMemberAssignment) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putArrayLength(len(m.Topics)); err != nil {
return err
}

for topic, partitions := range m.Topics {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

var topicLen int
if topicLen, err = pd.getArrayLength(); err != nil {
return
}

m.Topics = make(map[string][]int32, topicLen)
for i := 0; i < topicLen; i++ {
var topic string
if topic, err = pd.getString(); err != nil {
return
}
if m.Topics[topic], err = pd.getInt32Array(); err != nil {
return
}
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}
77 changes: 77 additions & 0 deletions group_members_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sarama

import (
"bytes"
"reflect"
"testing"
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 2, // Topic two, partition array length
0, 0, 0, 1, 0, 0, 0, 3, // 1, 3
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
)

func TestGroupMemberMetadata(t *testing.T) {
meta := &GroupMemberMetadata{
Version: 1,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
}

meta2 := new(GroupMemberMetadata)
err = decode(buf, meta2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2)
}
}

func TestGroupMemberAssignment(t *testing.T) {
amt := &GroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{
"one": []int32{0, 2, 4},
"two": []int32{1, 3},
},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(amt)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
}

amt2 := new(GroupMemberAssignment)
err = decode(buf, amt2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2)
}
}
10 changes: 10 additions & 0 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {

r.GroupProtocols[name] = metadata
}

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *GroupMemberMetadata) error {
bin, err := encode(metadata)
if err != nil {
return err
}

r.AddGroupProtocol(name, bin)
return nil
}
12 changes: 12 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ type JoinGroupResponse struct {
Members map[string][]byte
}

func (r *JoinGroupResponse) GetMembers() (map[string]GroupMemberMetadata, error) {
members := make(map[string]GroupMemberMetadata, len(r.Members))
for id, bin := range r.Members {
meta := new(GroupMemberMetadata)
if err := decode(bin, meta); err != nil {
return nil, err
}
members[id] = *meta
}
return members, nil
}

func (r *JoinGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
pe.putInt32(r.GenerationId)
Expand Down
10 changes: 10 additions & 0 deletions sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment

r.GroupAssignments[memberId] = memberAssignment
}

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *GroupMemberAssignment) error {
bin, err := encode(memberAssignment)
if err != nil {
return err
}

r.AddGroupAssignment(memberId, bin)
return nil
}
6 changes: 6 additions & 0 deletions sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ type SyncGroupResponse struct {
MemberAssignment []byte
}

func (r *SyncGroupResponse) GetMemberAssignment() (*GroupMemberAssignment, error) {
assignment := new(GroupMemberAssignment)
err := decode(r.MemberAssignment, assignment)
return assignment, err
}

func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putBytes(r.MemberAssignment)
Expand Down