Skip to content

Commit

Permalink
Merge pull request #2247 from aiquestion/fix_describe_group
Browse files Browse the repository at this point in the history
fix(protocol): move AuthorizedOperations into GroupDescription of DescribeGroupsResponse
  • Loading branch information
dnwe authored Jun 7, 2022
2 parents 0c4ba61 + 39cc66a commit 9bf344f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 43 deletions.
59 changes: 30 additions & 29 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package sarama

type DescribeGroupsResponse struct {
Version int16
ThrottleTimeMs int32
Groups []*GroupDescription
AuthorizedOperations int32
Version int16
ThrottleTimeMs int32
Groups []*GroupDescription
}

func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
Expand All @@ -21,9 +20,6 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
return err
}
}
if r.Version >= 3 {
pe.putInt32(r.AuthorizedOperations)
}

return nil
}
Expand All @@ -48,11 +44,6 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er
return err
}
}
if r.Version >= 3 {
if r.AuthorizedOperations, err = pd.getInt32(); err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -80,12 +71,13 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
type GroupDescription struct {
Version int16

Err KError
GroupId string
State string
ProtocolType string
Protocol string
Members map[string]*GroupMemberDescription
Err KError
GroupId string
State string
ProtocolType string
Protocol string
Members map[string]*GroupMemberDescription
AuthorizedOperations int32
}

func (gd *GroupDescription) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -119,6 +111,10 @@ func (gd *GroupDescription) encode(pe packetEncoder) error {
}
}

if gd.Version >= 3 {
pe.putInt32(gd.AuthorizedOperations)
}

return nil
}

Expand Down Expand Up @@ -147,20 +143,25 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
if err != nil {
return err
}
if n == 0 {
return nil
}

gd.Members = make(map[string]*GroupMemberDescription)
for i := 0; i < n; i++ {
memberId, err := pd.getString()
if err != nil {
return err
if n > 0 {
gd.Members = make(map[string]*GroupMemberDescription)
for i := 0; i < n; i++ {
memberId, err := pd.getString()
if err != nil {
return err
}

gd.Members[memberId] = new(GroupMemberDescription)
gd.Members[memberId].Version = gd.Version
if err := gd.Members[memberId].decode(pd); err != nil {
return err
}
}
}

gd.Members[memberId] = new(GroupMemberDescription)
gd.Members[memberId].Version = gd.Version
if err := gd.Members[memberId].decode(pd); err != nil {
if gd.Version >= 3 {
if gd.AuthorizedOperations, err = pd.getInt32(); err != nil {
return err
}
}
Expand Down
24 changes: 10 additions & 14 deletions describe_groups_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ var (
describeGroupsResponseEmptyV3 = []byte{
0, 0, 0, 0, // throttle time 0
0, 0, 0, 0, // no groups
0, 0, 0, 0, // authorizedOperations 0
}

describeGroupsResponsePopulatedV3 = []byte{
Expand All @@ -113,21 +112,21 @@ var (
0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment
0, 0, 0, 0, // authorizedOperations 0

0, 30, // ErrGroupAuthorizationFailed
0, 0,
0, 0,
0, 0,
0, 0,
0, 0, 0, 0,

0, 0, 0, 0, // authorizedOperations 0

}

describeGroupsResponseEmptyV4 = []byte{
0, 0, 0, 0, // throttle time 0
0, 0, 0, 0, // no groups
0, 0, 0, 0, // authorizedOperations 0
}

describeGroupsResponsePopulatedV4 = []byte{
Expand All @@ -146,15 +145,16 @@ var (
0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment
0, 0, 0, 0, // authorizedOperations 0

0, 30, // ErrGroupAuthorizationFailed
0, 0,
0, 0,
0, 0,
0, 0,
0, 0, 0, 0,

0, 0, 0, 0, // authorizedOperations 0

}
)

Expand All @@ -171,10 +171,9 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
3,
describeGroupsResponseEmptyV3,
&DescribeGroupsResponse{
Version: 3,
ThrottleTimeMs: int32(0),
Groups: []*GroupDescription{},
AuthorizedOperations: int32(0),
Version: 3,
ThrottleTimeMs: int32(0),
Groups: []*GroupDescription{},
},
},
{
Expand Down Expand Up @@ -212,18 +211,16 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
Members: nil,
},
},
AuthorizedOperations: int32(0),
},
},
{
"empty",
4,
describeGroupsResponseEmptyV4,
&DescribeGroupsResponse{
Version: 4,
ThrottleTimeMs: int32(0),
Groups: []*GroupDescription{},
AuthorizedOperations: int32(0),
Version: 4,
ThrottleTimeMs: int32(0),
Groups: []*GroupDescription{},
},
},
{
Expand Down Expand Up @@ -262,7 +259,6 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
Members: nil,
},
},
AuthorizedOperations: int32(0),
},
},
}
Expand Down
56 changes: 56 additions & 0 deletions functional_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,59 @@ func TestFuncAdminQuotas(t *testing.T) {
t.Fatal(err)
}
}

func TestFuncAdminDescribeGroups(t *testing.T) {
checkKafkaVersion(t, "2.3.0.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

group1 := testFuncConsumerGroupID(t)
group2 := testFuncConsumerGroupID(t)

kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
if err != nil {
t.Fatal(err)
}

config := NewTestConfig()
config.Version = kafkaVersion
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}

config1 := NewTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
defer m1.Close()

config2 := NewTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4")
defer m2.Close()

m1.WaitForState(2)
m2.WaitForState(2)

res, err := adminClient.DescribeConsumerGroups([]string{group1, group2})
if err != nil {
t.Fatal(err)
}
if len(res) != 2 {
t.Errorf("group description should be 2, got %v\n", len(res))
}
if len(res[0].Members) != 1 {
t.Errorf("should have 1 members in group , got %v\n", len(res[0].Members))
}
if len(res[1].Members) != 1 {
t.Errorf("should have 1 members in group , got %v\n", len(res[1].Members))
}

m1.AssertCleanShutdown()
m2.AssertCleanShutdown()
}

0 comments on commit 9bf344f

Please sign in to comment.