From 39cc66a4594c07b4f8eada621145fa0d092a4506 Mon Sep 17 00:00:00 2001 From: aiquestion Date: Tue, 7 Jun 2022 13:04:07 +0800 Subject: [PATCH] Fix: fix describe group failed --- describe_groups_response.go | 59 ++++++++++++++++---------------- describe_groups_response_test.go | 24 ++++++------- functional_admin_test.go | 56 ++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 43 deletions(-) diff --git a/describe_groups_response.go b/describe_groups_response.go index 21c9cbc46..7e0daa111 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -1,10 +1,9 @@ package sarama type DescribeGroupsResponse struct { - Version int16 - ThrottleTimeMs int32 - Groups []*GroupDescription - AuthorizedOperations int32 + Version int16 + ThrottleTimeMs int32 + Groups []*GroupDescription } func (r *DescribeGroupsResponse) encode(pe packetEncoder) error { @@ -21,9 +20,6 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error { return err } } - if r.Version >= 3 { - pe.putInt32(r.AuthorizedOperations) - } return nil } @@ -48,11 +44,6 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er return err } } - if r.Version >= 3 { - if r.AuthorizedOperations, err = pd.getInt32(); err != nil { - return err - } - } return nil } @@ -80,12 +71,13 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { type GroupDescription struct { Version int16 - Err KError - GroupId string - State string - ProtocolType string - Protocol string - Members map[string]*GroupMemberDescription + Err KError + GroupId string + State string + ProtocolType string + Protocol string + Members map[string]*GroupMemberDescription + AuthorizedOperations int32 } func (gd *GroupDescription) encode(pe packetEncoder) error { @@ -119,6 +111,10 @@ func (gd *GroupDescription) encode(pe packetEncoder) error { } } + if gd.Version >= 3 { + pe.putInt32(gd.AuthorizedOperations) + } + return nil } @@ -147,20 +143,25 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) { if err != nil { return err } - if n == 0 { - return nil - } - gd.Members = make(map[string]*GroupMemberDescription) - for i := 0; i < n; i++ { - memberId, err := pd.getString() - if err != nil { - return err + if n > 0 { + gd.Members = make(map[string]*GroupMemberDescription) + for i := 0; i < n; i++ { + memberId, err := pd.getString() + if err != nil { + return err + } + + gd.Members[memberId] = new(GroupMemberDescription) + gd.Members[memberId].Version = gd.Version + if err := gd.Members[memberId].decode(pd); err != nil { + return err + } } + } - gd.Members[memberId] = new(GroupMemberDescription) - gd.Members[memberId].Version = gd.Version - if err := gd.Members[memberId].decode(pd); err != nil { + if gd.Version >= 3 { + if gd.AuthorizedOperations, err = pd.getInt32(); err != nil { return err } } diff --git a/describe_groups_response_test.go b/describe_groups_response_test.go index 75e903600..fbbab815a 100644 --- a/describe_groups_response_test.go +++ b/describe_groups_response_test.go @@ -95,7 +95,6 @@ var ( describeGroupsResponseEmptyV3 = []byte{ 0, 0, 0, 0, // throttle time 0 0, 0, 0, 0, // no groups - 0, 0, 0, 0, // authorizedOperations 0 } describeGroupsResponsePopulatedV3 = []byte{ @@ -113,6 +112,7 @@ var ( 0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host 0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata 0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment + 0, 0, 0, 0, // authorizedOperations 0 0, 30, // ErrGroupAuthorizationFailed 0, 0, @@ -120,14 +120,13 @@ var ( 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, // authorizedOperations 0 + } describeGroupsResponseEmptyV4 = []byte{ 0, 0, 0, 0, // throttle time 0 0, 0, 0, 0, // no groups - 0, 0, 0, 0, // authorizedOperations 0 } describeGroupsResponsePopulatedV4 = []byte{ @@ -146,6 +145,7 @@ var ( 0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host 0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata 0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment + 0, 0, 0, 0, // authorizedOperations 0 0, 30, // ErrGroupAuthorizationFailed 0, 0, @@ -153,8 +153,8 @@ var ( 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, // authorizedOperations 0 + } ) @@ -171,10 +171,9 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) { 3, describeGroupsResponseEmptyV3, &DescribeGroupsResponse{ - Version: 3, - ThrottleTimeMs: int32(0), - Groups: []*GroupDescription{}, - AuthorizedOperations: int32(0), + Version: 3, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{}, }, }, { @@ -212,7 +211,6 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) { Members: nil, }, }, - AuthorizedOperations: int32(0), }, }, { @@ -220,10 +218,9 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) { 4, describeGroupsResponseEmptyV4, &DescribeGroupsResponse{ - Version: 4, - ThrottleTimeMs: int32(0), - Groups: []*GroupDescription{}, - AuthorizedOperations: int32(0), + Version: 4, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{}, }, }, { @@ -262,7 +259,6 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) { Members: nil, }, }, - AuthorizedOperations: int32(0), }, }, } diff --git a/functional_admin_test.go b/functional_admin_test.go index 8de6481a7..6d12261fe 100644 --- a/functional_admin_test.go +++ b/functional_admin_test.go @@ -123,3 +123,59 @@ func TestFuncAdminQuotas(t *testing.T) { t.Fatal(err) } } + +func TestFuncAdminDescribeGroups(t *testing.T) { + checkKafkaVersion(t, "2.3.0.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + group1 := testFuncConsumerGroupID(t) + group2 := testFuncConsumerGroupID(t) + + kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion) + if err != nil { + t.Fatal(err) + } + + config := NewTestConfig() + config.Version = kafkaVersion + adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + + config1 := NewTestConfig() + config1.ClientID = "M1" + config1.Version = V2_3_0_0 + config1.Consumer.Offsets.Initial = OffsetNewest + m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4") + defer m1.Close() + + config2 := NewTestConfig() + config2.ClientID = "M2" + config2.Version = V2_3_0_0 + config2.Consumer.Offsets.Initial = OffsetNewest + config2.Consumer.Group.InstanceId = "Instance2" + m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4") + defer m2.Close() + + m1.WaitForState(2) + m2.WaitForState(2) + + res, err := adminClient.DescribeConsumerGroups([]string{group1, group2}) + if err != nil { + t.Fatal(err) + } + if len(res) != 2 { + t.Errorf("group description should be 2, got %v\n", len(res)) + } + if len(res[0].Members) != 1 { + t.Errorf("should have 1 members in group , got %v\n", len(res[0].Members)) + } + if len(res[1].Members) != 1 { + t.Errorf("should have 1 members in group , got %v\n", len(res[1].Members)) + } + + m1.AssertCleanShutdown() + m2.AssertCleanShutdown() +}