Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-345: support static membership #2230

Merged
merged 1 commit into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ type ClusterAdmin interface {
// locally cached value if it's available.
Controller() (*Broker, error)

// Remove members from the consumer group by given member identities.
// This operation is supported by brokers with version 2.3 or higher
// This is for static membership feature. KIP-345
RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -900,9 +905,13 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group
}

for broker, brokerGroups := range groupsPerBroker {
response, err := broker.DescribeGroups(&DescribeGroupsRequest{
describeReq := &DescribeGroupsRequest{
Groups: brokerGroups,
})
}
if ca.conf.Version.IsAtLeast(V2_3_0_0) {
describeReq.Version = 4
}
response, err := broker.DescribeGroups(describeReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1196,3 +1205,21 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie

return nil
}

func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
controller, err := ca.client.Coordinator(groupId)
if err != nil {
return nil, err
}
request := &LeaveGroupRequest{
Version: 3,
GroupId: groupId,
}
for _, instanceId := range groupInstanceIds {
groupInstanceId := instanceId
request.Members = append(request.Members, MemberIdentity{
GroupInstanceId: &groupInstanceId,
})
}
return controller.LeaveGroup(request)
}
31 changes: 31 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ type Config struct {
// coordinator for the group.
UserData []byte
}
// support KIP-345
InstanceId string
}

Retry struct {
Expand Down Expand Up @@ -509,6 +511,7 @@ func NewConfig() *Config {

// Validate checks a Config instance. It will return a
// ConfigurationError if the specified values don't make sense.
//nolint:gocyclo // This function's cyclomatic complexity has go beyond 100
func (c *Config) Validate() error {
// some configuration values should be warned on but not fail completely, do those first
if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
Expand Down Expand Up @@ -754,6 +757,14 @@ func (c *Config) Validate() error {
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}
if c.Consumer.Group.InstanceId != "" {
if !c.Version.IsAtLeast(V2_3_0_0) {
return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")
}
if err := validateGroupInstanceId(c.Consumer.Group.InstanceId); err != nil {
return err
}
}

// validate misc shared values
switch {
Expand All @@ -778,3 +789,23 @@ func (c *Config) getDialer() proxy.Dialer {
}
}
}

const MAX_GROUP_INSTANCE_ID_LENGTH = 249

var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`)

func validateGroupInstanceId(id string) error {
if id == "" {
return ConfigurationError("Group instance id must be non-empty string")
}
if id == "." || id == ".." {
return ConfigurationError(`Group instance id cannot be "." or ".."`)
}
if len(id) > MAX_GROUP_INSTANCE_ID_LENGTH {
return ConfigurationError(fmt.Sprintf(`Group instance id cannot be longer than %v, characters: %s`, MAX_GROUP_INSTANCE_ID_LENGTH, id))
}
if !GROUP_INSTANCE_ID_REGEXP.MatchString(id) {
return ConfigurationError(fmt.Sprintf(`Group instance id %s is illegal, it contains a character other than, '.', '_' and '-'`, id))
}
return nil
}
45 changes: 45 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"os"
"strings"
"testing"

"github.com/rcrowley/go-metrics"
Expand Down Expand Up @@ -501,6 +502,50 @@ func TestZstdConfigValidation(t *testing.T) {
}
}

func TestValidGroupInstanceId(t *testing.T) {
tests := []struct {
grouptInstanceId string
shouldHaveErr bool
}{
{"groupInstanceId1", false},
{"", true},
{".", true},
{"..", true},
{strings.Repeat("a", 250), true},
{"group_InstanceId.1", false},
{"group-InstanceId1", false},
{"group#InstanceId1", true},
}
for _, testcase := range tests {
err := validateGroupInstanceId(testcase.grouptInstanceId)
if !testcase.shouldHaveErr {
if err != nil {
t.Errorf("Expected validGroupInstanceId %s to pass, got error %v", testcase.grouptInstanceId, err)
}
} else {
if err == nil {
t.Errorf("Expected validGroupInstanceId %s to be error, got nil", testcase.grouptInstanceId)
}
var target ConfigurationError
if !errors.As(err, &target) {
t.Errorf("Excepted err to be ConfigurationError, got %v", err)
}
}
}
}

func TestGroupInstanceIdAndVersionValidation(t *testing.T) {
config := NewTestConfig()
config.Consumer.Group.InstanceId = "groupInstanceId1"
if err := config.Validate(); !strings.Contains(err.Error(), "Consumer.Group.InstanceId need Version >= 2.3") {
t.Error("Expected invalid group instance error, got ", err)
}
config.Version = V2_3_0_0
if err := config.Validate(); err != nil {
t.Error("Expected group instance to work, got ", err)
}
}

// This example shows how to integrate with an existing registry as well as publishing metrics
// on the standard output
func ExampleConfig_metrics() {
Expand Down
99 changes: 72 additions & 27 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ type ConsumerGroup interface {
type consumerGroup struct {
client Client

config *Config
consumer Consumer
groupID string
memberID string
errors chan error
config *Config
consumer Consumer
groupID string
groupInstanceId *string
memberID string
errors chan error

lock sync.Mutex
closed chan none
Expand Down Expand Up @@ -127,15 +128,19 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
return nil, err
}

return &consumerGroup{
cg := &consumerGroup{
client: client,
consumer: consumer,
config: config,
groupID: groupID,
errors: make(chan error, config.ChannelBufferSize),
closed: make(chan none),
userData: config.Consumer.Group.Member.UserData,
}, nil
}
if client.Config().Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) {
cg.groupInstanceId = &client.Config().Consumer.Group.InstanceId
}
return cg, nil
}

// Errors implements ConsumerGroup.
Expand Down Expand Up @@ -300,6 +305,16 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, join.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrMemberIdRequired:
// from JoinGroupRequest v4, if client start with empty member id,
// it need to get member id from response and send another join request to join group
c.memberID = join.MemberId
return c.retryNewSession(ctx, topics, handler, retries+1 /*keep retry time*/, false)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
}
return nil, join.Err
default:
return nil, join.Err
}
Expand Down Expand Up @@ -348,6 +363,11 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, groupRequest.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
}
return nil, groupRequest.Err
default:
return nil, groupRequest.Err
}
Expand Down Expand Up @@ -389,6 +409,10 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
req.Version = 1
req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
}
if c.groupInstanceId != nil {
req.Version = 5
req.GroupInstanceId = c.groupInstanceId
}

meta := &ConsumerGroupMemberMetadata{
Topics: topics,
Expand All @@ -409,6 +433,10 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if c.groupInstanceId != nil {
req.Version = 3
req.GroupInstanceId = c.groupInstanceId
}
for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
Expand All @@ -429,6 +457,10 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g
MemberId: memberID,
GenerationId: generationID,
}
if c.groupInstanceId != nil {
req.Version = 3
req.GroupInstanceId = c.groupInstanceId
}

return coordinator.Heartbeat(req)
}
Expand Down Expand Up @@ -466,25 +498,32 @@ func (c *consumerGroup) leave() error {
return err
}

resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
})
if err != nil {
_ = coordinator.Close()
return err
}
// KIP-345 if groupInstanceId is set, don not leave group when consumer closed.
// Since we do not discover ApiVersion for brokers, LeaveGroupRequest still use the old version request for now
if c.groupInstanceId == nil {
resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
})
if err != nil {
_ = coordinator.Close()
return err
}

// Unset memberID
c.memberID = ""
// Unset memberID
c.memberID = ""

// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
}
} else {
c.memberID = ""
}
return nil
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
Expand Down Expand Up @@ -628,15 +667,15 @@ type consumerGroupSession struct {
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
// init context
ctx, cancel := context.WithCancel(ctx)

// init offset manager
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel)
if err != nil {
return nil, err
}

// init context
ctx, cancel := context.WithCancel(ctx)

// init session
sess := &consumerGroupSession{
parent: parent,
Expand Down Expand Up @@ -862,6 +901,12 @@ func (s *consumerGroupSession) heartbeatLoop() {
s.cancel()
case ErrUnknownMemberId, ErrIllegalGeneration:
return
case ErrFencedInstancedId:
if s.parent.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *s.parent.groupInstanceId)
}
s.parent.handleError(resp.Err, "", -1)
return
default:
s.parent.handleError(resp.Err, "", -1)
return
Expand Down
Loading