Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batched CreateACLs func to ClusterAdmin #2191

Merged
merged 2 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,17 @@ type ClusterAdmin interface {
// The configs for a particular resource are updated automatically.
IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

// Creates an access control list (ACL) which is bound to a specific resource.
// This operation is not transactional so it may succeed or fail.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
CreateACL(resource Resource, acl Acl) error
nkostoulas marked this conversation as resolved.
Show resolved Hide resolved

// Creates access control lists (ACLs) which are bound to specific resources.
// This operation is not transactional so it may succeed for some ACLs while fail for others.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
CreateACL(resource Resource, acl Acl) error
CreateACLs([]*ResourceAcls) error

// Lists access control lists (ACLs) according to the supplied filter.
// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
Expand Down Expand Up @@ -803,6 +809,28 @@ func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
return err
}

func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
var acls []*AclCreation
for _, resourceACL := range resourceACLs {
for _, acl := range resourceACL.Acls {
acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
}
}
request := &CreateAclsRequest{AclCreations: acls}

if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}

b, err := ca.Controller()
if err != nil {
return err
}

_, err = b.CreateAcls(request)
return err
}

func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
request := &DescribeAclsRequest{AclFilter: filter}

Expand Down
44 changes: 44 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,50 @@ func TestClusterAdminCreateAclErrorHandling(t *testing.T) {
}
}

func TestClusterAdminCreateAcls(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"CreateAclsRequest": NewMockCreateAclsResponse(t),
})

config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

rACLs := []*ResourceAcls{
{
Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"},
Acls: []*Acl{
{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny},
},
},
{
Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "your_topic"},
Acls: []*Acl{
{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny},
},
},
}

err = admin.CreateACLs(rACLs)
if err != nil {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListAcls(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down