Skip to content

Commit

Permalink
Add batch CreateACLs call to ClusterAdmin
Browse files Browse the repository at this point in the history
  • Loading branch information
nkostoulas committed Mar 22, 2022
1 parent 27b8f1b commit a0d82e6
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
30 changes: 29 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,17 @@ type ClusterAdmin interface {
// The configs for a particular resource are updated automatically.
IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

// Creates an access control list (ACL) which is bound to a specific resource.
// This operation is not transactional so it may succeed or fail.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
CreateACL(resource Resource, acl Acl) error

// Creates access control lists (ACLs) which are bound to specific resources.
// This operation is not transactional so it may succeed for some ACLs while fail for others.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
CreateACL(resource Resource, acl Acl) error
CreateACLs([]*ResourceAcls) error

// Lists access control lists (ACLs) according to the supplied filter.
// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
Expand Down Expand Up @@ -799,6 +805,28 @@ func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
return err
}

func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
var acls []*AclCreation
for _, resourceACL := range resourceACLs {
for _, acl := range resourceACL.Acls {
acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
}
}
request := &CreateAclsRequest{AclCreations: acls}

if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}

b, err := ca.Controller()
if err != nil {
return err
}

_, err = b.CreateAcls(request)
return err
}

func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
request := &DescribeAclsRequest{AclFilter: filter}

Expand Down
44 changes: 44 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,50 @@ func TestClusterAdminCreateAcl(t *testing.T) {
}
}

func TestClusterAdminCreateAcls(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"CreateAclsRequest": NewMockCreateAclsResponse(t),
})

config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

rACLs := []*ResourceAcls{
&ResourceAcls{
Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"},
Acls: []*Acl{
&Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny},
},
},
&ResourceAcls{
Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "your_topic"},
Acls: []*Acl{
&Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny},
},
},
}

err = admin.CreateACLs(rACLs)
if err != nil {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListAcls(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down

0 comments on commit a0d82e6

Please sign in to comment.