Skip to content

Commit

Permalink
fix(consumer): support JoinGroup V4
Browse files Browse the repository at this point in the history
Had previously commented this out until I'd looked into the behaviour
described in the protocol doc, but it looks like we should be safe to
support this.

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 3, 2023
1 parent 50609a0 commit 01ebd8c
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrMemberIdRequired:
// from JoinGroupRequest v4, if client start with empty member id,
// it need to get member id from response and send another join request to join group
// from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts
// with an empty member id, it needs to get the assigned id from the
// response and send another join request with that id to actually join the
// group
c.memberID = join.MemberId
return c.retryNewSession(ctx, topics, handler, retries+1 /*keep retry time*/, false)
case ErrFencedInstancedId:
Expand Down Expand Up @@ -442,14 +444,13 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
if c.config.Version.IsAtLeast(V2_0_0_0) {
req.Version = 3
}
// XXX: protocol states "Starting from version 4, the client needs to issue a
// second request to join group", so not enabling this until we can
// investigate
/*
if c.config.Version.IsAtLeast(V2_2_0_0) {
req.Version = 4
}
*/
// from JoinGroupRequest v4 onwards (due to KIP-394) the client will actually
// send two JoinGroupRequests, once with the empty member id, and then again
// with the assigned id from the first response. This is handled via the
// ErrMemberIdRequired case.
if c.config.Version.IsAtLeast(V2_2_0_0) {
req.Version = 4
}
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 5
req.GroupInstanceId = c.groupInstanceId
Expand Down

0 comments on commit 01ebd8c

Please sign in to comment.