Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: include assignment-less members in SyncGroup #2292

Merged
merged 1 commit into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler

// Prepare distribution plan if we joined as the leader
var plan BalanceStrategyPlan
var members map[string]ConsumerGroupMemberMetadata
if join.LeaderId == join.MemberId {
members, err := join.GetMembers()
members, err = join.GetMembers()
if err != nil {
return nil, err
}
Expand All @@ -334,7 +335,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}

// Sync consumer group
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
groupRequest, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId)
if consumerGroupSyncTotal != nil {
consumerGroupSyncTotal.Inc(1)
}
Expand Down Expand Up @@ -426,15 +427,22 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
func (c *consumerGroup) syncGroupRequest(
coordinator *Broker,
members map[string]ConsumerGroupMemberMetadata,
plan BalanceStrategyPlan,
generationID int32,
) (*SyncGroupResponse, error) {
req := &SyncGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if c.groupInstanceId != nil {
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
}
if c.groupInstanceId != nil {
req.GroupInstanceId = c.groupInstanceId
}
for memberID, topics := range plan {
Expand All @@ -447,7 +455,15 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
return nil, err
}
delete(members, memberID)
}
// add empty assignments for any remaining members
for memberID := range members {
if err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{}); err != nil {
return nil, err
}
}

return coordinator.SyncGroup(req)
}

Expand Down
4 changes: 2 additions & 2 deletions examples/consumergroup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ module github.com/Shopify/sarama/examples/consumer

go 1.16

replace github.com/Shopify/sarama => ../../
require github.com/Shopify/sarama v1.34.1

require github.com/Shopify/sarama v1.27.0
replace github.com/Shopify/sarama => ../../
486 changes: 458 additions & 28 deletions examples/consumergroup/go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []
}

func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
t.Helper()
if !rb.requiredVersion().IsAtLeast(MinVersion) {
t.Errorf("Request %s has invalid required version", name)
}
Expand Down Expand Up @@ -74,6 +75,7 @@ func testRequestEncode(t *testing.T, name string, rb protocolBody, expected []by
}

func testRequestDecode(t *testing.T, name string, rb protocolBody, packet []byte) {
t.Helper()
decoded, n, err := decodeRequest(bytes.NewReader(packet))
if err != nil {
t.Error("Failed to decode request", err)
Expand Down
135 changes: 82 additions & 53 deletions sync_group_request.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,111 @@
package sarama

type SyncGroupRequestAssignment struct {
// MemberId contains the ID of the member to assign.
MemberId string
// Assignment contains the member assignment.
Assignment []byte
}

func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) {
if err := pe.putString(a.MemberId); err != nil {
return err
}

if err := pe.putBytes(a.Assignment); err != nil {
return err
}

return nil
}

func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) {
if a.MemberId, err = pd.getString(); err != nil {
return err
}

if a.Assignment, err = pd.getBytes(); err != nil {
return err
}

return nil
}

type SyncGroupRequest struct {
Version int16
GroupId string
GenerationId int32
MemberId string
GroupInstanceId *string
GroupAssignments map[string][]byte
// Version defines the protocol version to use for encode and decode
Version int16
// GroupId contains the unique group identifier.
GroupId string
// GenerationId contains the generation of the group.
GenerationId int32
// MemberId contains the member ID assigned by the group.
MemberId string
// GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
GroupInstanceId *string
// GroupAssignments contains each assignment.
GroupAssignments []SyncGroupRequestAssignment
}

func (r *SyncGroupRequest) encode(pe packetEncoder) error {
if err := pe.putString(r.GroupId); err != nil {
func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
if err := pe.putString(s.GroupId); err != nil {
return err
}

pe.putInt32(r.GenerationId)
pe.putInt32(s.GenerationId)

if err := pe.putString(r.MemberId); err != nil {
if err := pe.putString(s.MemberId); err != nil {
return err
}

if r.Version >= 3 {
if err := pe.putNullableString(r.GroupInstanceId); err != nil {
if s.Version >= 3 {
if err := pe.putNullableString(s.GroupInstanceId); err != nil {
return err
}
}

if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil {
if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil {
return err
}
for memberId, memberAssignment := range r.GroupAssignments {
if err := pe.putString(memberId); err != nil {
return err
}
if err := pe.putBytes(memberAssignment); err != nil {
for _, block := range s.GroupAssignments {
if err := block.encode(pe, s.Version); err != nil {
return err
}
}

return nil
}

func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if r.GroupId, err = pd.getString(); err != nil {
return
}
if r.GenerationId, err = pd.getInt32(); err != nil {
return
}
if r.MemberId, err = pd.getString(); err != nil {
return
}
if r.Version >= 3 {
if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
return
}
func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
s.Version = version
if s.GroupId, err = pd.getString(); err != nil {
return err
}

n, err := pd.getArrayLength()
if err != nil {
if s.GenerationId, err = pd.getInt32(); err != nil {
return err
}
if n == 0 {
return nil

if s.MemberId, err = pd.getString(); err != nil {
return err
}

r.GroupAssignments = make(map[string][]byte)
for i := 0; i < n; i++ {
memberId, err := pd.getString()
if err != nil {
return err
}
memberAssignment, err := pd.getBytes()
if err != nil {
if s.Version >= 3 {
if s.GroupInstanceId, err = pd.getNullableString(); err != nil {
return err
}
}

r.GroupAssignments[memberId] = memberAssignment
if numAssignments, err := pd.getArrayLength(); err != nil {
return err
} else if numAssignments > 0 {
s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments)
for i := 0; i < numAssignments; i++ {
var block SyncGroupRequestAssignment
if err := block.decode(pd, s.Version); err != nil {
return err
}
s.GroupAssignments[i] = block
}
}

return nil
Expand All @@ -105,14 +132,16 @@ func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
}

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
if r.GroupAssignments == nil {
r.GroupAssignments = make(map[string][]byte)
}

r.GroupAssignments[memberId] = memberAssignment
r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{
MemberId: memberId,
Assignment: memberAssignment,
})
}

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
func (r *SyncGroupRequest) AddGroupAssignmentMember(
memberId string,
memberAssignment *ConsumerGroupMemberAssignment,
) error {
bin, err := encode(memberAssignment, nil)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions sync_group_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ func TestSyncGroupRequestV3AndPlus(t *testing.T) {
GenerationId: 0x00010203,
MemberId: "baz",
GroupInstanceId: &groupInstanceId,
GroupAssignments: map[string][]byte{
"baz": []byte("foo"),
GroupAssignments: []SyncGroupRequestAssignment{
{
MemberId: "baz",
Assignment: []byte("foo"),
},
},
},
},
Expand Down
12 changes: 9 additions & 3 deletions sync_group_response.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package sarama

type SyncGroupResponse struct {
Version int16
ThrottleTime int32
Err KError
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTimeMs contains the duration in milliseconds for which the
// request was throttled due to a quota violation, or zero if the request
// did not violate any quota.
ThrottleTime int32
// Err contains the error code, or 0 if there was no error.
Err KError
// MemberAssignment contains the member assignment.
MemberAssignment []byte
}

Expand Down