Skip to content

Commit

Permalink
[WIP] Implement new Group protocol messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Dec 7, 2015
1 parent c36adfa commit c48c940
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 0 deletions.
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process
// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

// ErrMessageTooLarge is returned when a JoinGroup request returns a protocol type that is not supported by sarama.
var ErrUnknownGroupProtocol = errors.New("kafka: encountered an unknown group protocol")

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down
111 changes: 111 additions & 0 deletions join_group_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package sarama

type JoinGroupRequest struct {
GroupId string
SessionTimeout int32
MemberId string
ProtocolType string
GroupProtocols []GroupProtocol
}

func (r *JoinGroupRequest) encode(pe packetEncoder) error {
if err := pe.putString(r.GroupId); err != nil {
return err
}
pe.putInt32(r.SessionTimeout)
if err := pe.putString(r.MemberId); err != nil {
return err
}
if err := pe.putString(r.ProtocolType); err != nil {
return err
}

if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
return err
}
for _, groupProtocol := range r.GroupProtocols {
if err := groupProtocol.encodeGroupProtocol(pe); err != nil {
return err
}
}

return nil
}

func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
r.GroupId, err = pd.getString()
if err != nil {
return
}

r.SessionTimeout, err = pd.getInt32()
if err != nil {
return
}

r.MemberId, err = pd.getString()
if err != nil {
return
}

r.ProtocolType, err = pd.getString()
if err != nil {
return
}

switch r.ProtocolType {
case "consumer":
n, err := pd.getArrayLength()
if err != nil {
return err
}

r.GroupProtocols = make([]GroupProtocol, n)
for i := 0; i < n; i++ {
r.GroupProtocols[i] = new(ConsumerGroupProtocol)
if err := r.GroupProtocols[i].decodeGroupProtocol(pd); err != nil {
return nil
}
}

default:
return ErrUnknownGroupProtocol
}

return nil
}

func (r *JoinGroupRequest) key() int16 {
return 11
}

func (r *JoinGroupRequest) version() int16 {
return 0
}

type GroupProtocol interface {
encodeGroupProtocol(packetEncoder) error
decodeGroupProtocol(packetDecoder) error
}

type ConsumerGroupProtocol struct {
ProtocolName string
Version int16
Subscription []string
UserData []byte
}

func (cgp *ConsumerGroupProtocol) encodeGroupProtocol(pe packetEncoder) error {
if err := pe.putString(cgp.ProtocolName); err != nil {
return err
}
pe.putInt16(cgp.Version)
if err := pe.putStringArray(cgp.Subscription); err != nil {
return err
}
return pe.putBytes(cgp.UserData)
}

func (cgp *ConsumerGroupProtocol) decodeGroupProtocol(pd packetDecoder) error {
return nil
}
111 changes: 111 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package sarama

type JoinGroupResponse struct {
ErrorCode int16
GenerationId int32
GroupProtocol string
LeaderId string
MemberId string
Members []*GroupMember
}

func (r *JoinGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(r.ErrorCode)
pe.putInt32(r.GenerationId)

if err := pe.putString(r.GroupProtocol); err != nil {
return err
}

if err := pe.putString(r.LeaderId); err != nil {
return err
}

if err := pe.putString(r.MemberId); err != nil {
return err
}

if err := pe.putArrayLength(len(r.Members)); err != nil {
return err
}
for _, member := range r.Members {
if err := member.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
r.ErrorCode, err = pd.getInt16()
if err != nil {
return err
}

r.GenerationId, err = pd.getInt32()
if err != nil {
return err
}

r.GroupProtocol, err = pd.getString()
if err != nil {
return err
}

r.LeaderId, err = pd.getString()
if err != nil {
return err
}

r.MemberId, err = pd.getString()
if err != nil {
return err
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

r.Members = make([]*GroupMember, n)
for i := 0; i < n; i++ {
r.Members[i] = new(GroupMember)
if err := r.Members[i].decode(pd); err != nil {
return nil
}
}

return nil
}

type GroupMember struct {
MemberId string
MemberMetadata []byte
}

func (gm *GroupMember) encode(pe packetEncoder) error {
if err := pe.putString(gm.MemberId); err != nil {
return err
}

if err := pe.putBytes(gm.MemberMetadata); err != nil {
return err
}

return nil
}

func (gm *GroupMember) decode(pd packetDecoder) (err error) {
gm.MemberId, err = pd.getString()
if err != nil {
return err
}

gm.MemberMetadata, err = pd.getBytes()
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type packetEncoder interface {
putBytes(in []byte) error
putRawBytes(in []byte) error
putString(in string) error
putStringArray(in []string) error
putInt32Array(in []int32) error
putInt64Array(in []int64) error

Expand Down
15 changes: 15 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func (pe *prepEncoder) putString(in string) error {
return nil
}

func (pe *prepEncoder) putStringArray(in []string) error {
err := pe.putArrayLength(len(in))
if err != nil {
return err
}

for _, str := range in {
if err := pe.putString(str); err != nil {
return err
}
}

return nil
}

func (pe *prepEncoder) putInt32Array(in []int32) error {
err := pe.putArrayLength(len(in))
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions real_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ func (re *realEncoder) putString(in string) error {
return nil
}

func (re *realEncoder) putStringArray(in []string) error {
err := re.putArrayLength(len(in))
if err != nil {
return err
}

for _, val := range in {
if err := re.putString(val); err != nil {
return err
}
}

return nil
}

func (re *realEncoder) putInt32Array(in []int32) error {
err := re.putArrayLength(len(in))
if err != nil {
Expand Down

0 comments on commit c48c940

Please sign in to comment.