Skip to content

Commit

Permalink
Merge pull request #1936 from Shopify/dnwe/follow-the-leader
Browse files Browse the repository at this point in the history
fix(consumer): follow preferred broker
  • Loading branch information
dnwe authored May 7, 2021
2 parents 8dbbfb5 + 73be95a commit 83d633e
Show file tree
Hide file tree
Showing 101 changed files with 858 additions and 630 deletions.
6 changes: 3 additions & 3 deletions acl_bindings.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//Resource holds information about acl resource type
// Resource holds information about acl resource type
type Resource struct {
ResourceType AclResourceType
ResourceName string
Expand Down Expand Up @@ -46,7 +46,7 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
return nil
}

//Acl holds information about acl type
// Acl holds information about acl type
type Acl struct {
Principal string
Host string
Expand Down Expand Up @@ -93,7 +93,7 @@ func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
return nil
}

//ResourceAcls is an acl resource type
// ResourceAcls is an acl resource type
type ResourceAcls struct {
Resource
Acls []*Acl
Expand Down
4 changes: 2 additions & 2 deletions acl_create_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//CreateAclsRequest is an acl creation request
// CreateAclsRequest is an acl creation request
type CreateAclsRequest struct {
Version int16
AclCreations []*AclCreation
Expand Down Expand Up @@ -60,7 +60,7 @@ func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
}
}

//AclCreation is a wrapper around Resource and Acl type
// AclCreation is a wrapper around Resource and Acl type
type AclCreation struct {
Resource
Acl
Expand Down
46 changes: 25 additions & 21 deletions acl_create_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ var (
func TestCreateAclsRequestv0(t *testing.T) {
req := &CreateAclsRequest{
Version: 0,
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
AclCreations: []*AclCreation{
{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
},
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}

Expand All @@ -47,18 +49,20 @@ func TestCreateAclsRequestv0(t *testing.T) {
func TestCreateAclsRequestv1(t *testing.T) {
req := &CreateAclsRequest{
Version: 1,
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
ResourcePatternType: AclPatternLiteral,
AclCreations: []*AclCreation{
{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
ResourcePatternType: AclPatternLiteral,
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
},
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}

Expand Down
4 changes: 2 additions & 2 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sarama

import "time"

//CreateAclsResponse is a an acl response creation type
// CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
Expand Down Expand Up @@ -63,7 +63,7 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

//AclCreationResponse is an acl creation response type
// AclCreationResponse is an acl creation response type
type AclCreationResponse struct {
Err KError
ErrMsg *string
Expand Down
2 changes: 1 addition & 1 deletion acl_delete_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//DeleteAclsRequest is a delete acl request
// DeleteAclsRequest is a delete acl request
type DeleteAclsRequest struct {
Version int
Filters []*AclFilter
Expand Down
6 changes: 3 additions & 3 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sarama

import "time"

//DeleteAclsResponse is a delete acl response
// DeleteAclsResponse is a delete acl response
type DeleteAclsResponse struct {
Version int16
ThrottleTime time.Duration
Expand Down Expand Up @@ -64,7 +64,7 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

//FilterResponse is a filter response type
// FilterResponse is a filter response type
type FilterResponse struct {
Err KError
ErrMsg *string
Expand Down Expand Up @@ -115,7 +115,7 @@ func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
return nil
}

//MatchingAcl is a matching acl type
// MatchingAcl is a matching acl type
type MatchingAcl struct {
Err KError
ErrMsg *string
Expand Down
32 changes: 15 additions & 17 deletions acl_delete_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ import (
"time"
)

var (
deleteAclsResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 0, // no error
255, 255, // no error message
0, 0, 0, 1, // 1 matching acl
0, 0, // no error
255, 255, // no error message
2, // resource type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4,
3,
}
)
var deleteAclsResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 0, // no error
255, 255, // no error message
0, 0, 0, 1, // 1 matching acl
0, 0, // no error
255, 255, // no error message
2, // resource type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4,
3,
}

func TestDeleteAclsResponse(t *testing.T) {
resp := &DeleteAclsResponse{
Expand Down
2 changes: 1 addition & 1 deletion acl_describe_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//DescribeAclsRequest is a secribe acl request type
// DescribeAclsRequest is a secribe acl request type
type DescribeAclsRequest struct {
Version int
AclFilter
Expand Down
2 changes: 1 addition & 1 deletion acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sarama

import "time"

//DescribeAclsResponse is a describe acl response type
// DescribeAclsResponse is a describe acl response type
type DescribeAclsResponse struct {
Version int16
ThrottleTime time.Duration
Expand Down
1 change: 0 additions & 1 deletion acl_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {

if a.Version == 1 {
pattern, err := pd.getInt8()

if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions acl_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func (a *AclOperation) String() string {
return s
}

//MarshalText returns the text form of the AclOperation (name without prefix)
// MarshalText returns the text form of the AclOperation (name without prefix)
func (a *AclOperation) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation
// UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation
func (a *AclOperation) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclOperation{
Expand Down Expand Up @@ -109,12 +109,12 @@ func (a *AclPermissionType) String() string {
return s
}

//MarshalText returns the text form of the AclPermissionType (name without prefix)
// MarshalText returns the text form of the AclPermissionType (name without prefix)
func (a *AclPermissionType) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType
// UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType
func (a *AclPermissionType) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclPermissionType{
Expand Down Expand Up @@ -159,12 +159,12 @@ func (a *AclResourceType) String() string {
return s
}

//MarshalText returns the text form of the AclResourceType (name without prefix)
// MarshalText returns the text form of the AclResourceType (name without prefix)
func (a *AclResourceType) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType
// UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType
func (a *AclResourceType) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclResourceType{
Expand Down Expand Up @@ -209,12 +209,12 @@ func (a *AclResourcePatternType) String() string {
return s
}

//MarshalText returns the text form of the AclResourcePatternType (name without prefix)
// MarshalText returns the text form of the AclResourcePatternType (name without prefix)
func (a *AclResourcePatternType) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType
// UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType
func (a *AclResourcePatternType) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclResourcePatternType{
Expand Down
2 changes: 2 additions & 0 deletions acl_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestAclPermissionTypeTextMarshal(t *testing.T) {
}
}
}

func TestAclResourceTypeTextMarshal(t *testing.T) {
for i := AclResourceUnknown; i <= AclResourceTransactionalID; i++ {
text, err := i.MarshalText()
Expand All @@ -53,6 +54,7 @@ func TestAclResourceTypeTextMarshal(t *testing.T) {
}
}
}

func TestAclResourcePatternTypeTextMarshal(t *testing.T) {
for i := AclPatternUnknown; i <= AclPatternPrefixed; i++ {
text, err := i.MarshalText()
Expand Down
2 changes: 1 addition & 1 deletion add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//AddOffsetsToTxnRequest adds offsets to a transaction request
// AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
TransactionalID string
ProducerID int64
Expand Down
14 changes: 6 additions & 8 deletions add_offsets_to_txn_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package sarama

import "testing"

var (
addOffsetsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64,
0, 0,
0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
}
)
var addOffsetsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64,
0, 0,
0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
}

func TestAddOffsetsToTxnRequest(t *testing.T) {
req := &AddOffsetsToTxnRequest{
Expand Down
2 changes: 1 addition & 1 deletion add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"time"
)

//AddOffsetsToTxnResponse is a response type for adding offsets to txns
// AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
ThrottleTime time.Duration
Err KError
Expand Down
10 changes: 4 additions & 6 deletions add_offsets_to_txn_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import (
"time"
)

var (
addOffsetsToTxnResponse = []byte{
0, 0, 0, 100,
0, 47,
}
)
var addOffsetsToTxnResponse = []byte{
0, 0, 0, 100,
0, 47,
}

func TestAddOffsetsToTxnResponse(t *testing.T) {
resp := &AddOffsetsToTxnResponse{
Expand Down
2 changes: 1 addition & 1 deletion add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//AddPartitionsToTxnRequest is a add paartition request
// AddPartitionsToTxnRequest is a add paartition request
type AddPartitionsToTxnRequest struct {
TransactionalID string
ProducerID int64
Expand Down
18 changes: 8 additions & 10 deletions add_partitions_to_txn_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package sarama

import "testing"

var (
addPartitionsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
0, 0, 0, 0, // ProducerEpoch
0, 1, // 1 topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, 0, 0, 0, 1,
}
)
var addPartitionsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
0, 0, 0, 0, // ProducerEpoch
0, 1, // 1 topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, 0, 0, 0, 1,
}

func TestAddPartitionsToTxnRequest(t *testing.T) {
req := &AddPartitionsToTxnRequest{
Expand Down
Loading

0 comments on commit 83d633e

Please sign in to comment.