Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
nkostoulas committed Mar 29, 2022
2 parents a0d82e6 + f07b7b8 commit d670624
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 0 deletions.
32 changes: 32 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,38 @@ func TestClusterAdminCreateAcls(t *testing.T) {
}
}

func TestClusterAdminCreateAclErrorHandling(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"CreateAclsRequest": NewMockCreateAclsResponseWithError(t),
})

config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}

err = admin.CreateACL(r, a)
if err == nil {
t.Fatal(errors.New("error should have been thrown"))
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListAcls(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,17 @@ func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, er
return nil, err
}

errs := make([]error, 0)
for _, res := range response.AclCreationResponses {
if !errors.Is(res.Err, ErrNoError) {
errs = append(errs, res.Err)
}
}

if len(errs) > 0 {
return response, Wrap(ErrCreateACLs, errs...)
}

return response, nil
}

Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var ErrReassignPartitions = errors.New("failed to reassign partitions for topic"
// ErrDeleteRecords is the type of error returned when fail to delete the required records
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")

// ErrCreateACLs is the type of error returned when ACL creation failed
var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")

// MultiErrorFormat specifies the formatter applied to format multierrors. The
// default implementation is a consensed version of the hashicorp/go-multierror
// default one
Expand Down
22 changes: 22 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ func (cc CompressionCodec) String() string {
}[int(cc)]
}

// UnmarshalText returns a CompressionCodec from its string representation.
func (cc *CompressionCodec) UnmarshalText(text []byte) error {
codecs := map[string]CompressionCodec{
"none": CompressionNone,
"gzip": CompressionGZIP,
"snappy": CompressionSnappy,
"lz4": CompressionLZ4,
"zstd": CompressionZSTD,
}
codec, ok := codecs[string(text)]
if !ok {
return fmt.Errorf("cannot parse %q as a compression codec", string(text))
}
*cc = codec
return nil
}

// MarshalText transforms a CompressionCodec into its string representation.
func (cc CompressionCodec) MarshalText() ([]byte, error) {
return []byte(cc.String()), nil
}

// Message is a kafka message type
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
Expand Down
29 changes: 29 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,32 @@ func TestMessageDecodingUnknownVersions(t *testing.T) {
t.Error("Decoding an unknown magic byte produced an unknown error ", err)
}
}

func TestCompressionCodecUnmarshal(t *testing.T) {
cases := []struct {
Input string
Expected CompressionCodec
ExpectedError bool
}{
{"none", CompressionNone, false},
{"zstd", CompressionZSTD, false},
{"gzip", CompressionGZIP, false},
{"unknown", CompressionNone, true},
}
for _, c := range cases {
var cc CompressionCodec
err := cc.UnmarshalText([]byte(c.Input))
if err != nil && !c.ExpectedError {
t.Errorf("UnmarshalText(%q) error:\n%+v", c.Input, err)
continue
}
if err == nil && c.ExpectedError {
t.Errorf("UnmarshalText(%q) got %v but expected error", c.Input, cc)
continue
}
if cc != c.Expected {
t.Errorf("UnmarshalText(%q) got %v but expected %v", c.Input, cc, c.Expected)
continue
}
}
}
18 changes: 18 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,24 @@ func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeade
return res
}

type MockCreateAclsResponseError struct {
t TestReporter
}

func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError {
return &MockCreateAclsResponseError{t: t}
}

func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreateAclsRequest)
res := &CreateAclsResponse{}

for range req.AclCreations {
res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrInvalidRequest})
}
return res
}

type MockListAclsResponse struct {
t TestReporter
}
Expand Down

0 comments on commit d670624

Please sign in to comment.