Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support userscramcredentials apis #1168

Merged
merged 13 commits into from
Jul 28, 2023
115 changes: 115 additions & 0 deletions alteruserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alteruserscramcredentials"
)

// AlterUserScramCredentialsRequest represents a request sent to a kafka broker to
// alter user scram credentials.
type AlterUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of credentials to delete.
Deletions []UserScramCredentialsDeletion

// List of credentials to upsert.
Upsertions []UserScramCredentialsUpsertion
}

type ScramMechanism int8

const (
ScramMechanismUnknown ScramMechanism = iota // 0
ScramMechanismSha256 // 1
ScramMechanismSha512 // 2
)

type UserScramCredentialsDeletion struct {
Name string
Mechanism ScramMechanism
}

type UserScramCredentialsUpsertion struct {
Name string
Mechanism ScramMechanism
Iterations int32
rhansen2 marked this conversation as resolved.
Show resolved Hide resolved
Salt []byte
SaltedPassword []byte
}

// AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user
// credentials request.
type AlterUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of errors that occurred while attempting to alter
// the user scram credentials.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Errors []error
rhansen2 marked this conversation as resolved.
Show resolved Hide resolved

// List of altered user scram credentials.
Results []AlterUserScramCredentialsResponseUser
}

type AlterUserScramCredentialsResponseUser struct {
User string
}

// AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions))
upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions))

for deletionIdx, deletion := range req.Deletions {
deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{
Name: deletion.Name,
Mechanism: int8(deletion.Mechanism),
}
}

for upsertionIdx, upsertion := range req.Upsertions {
upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{
Name: upsertion.Name,
Mechanism: int8(upsertion.Mechanism),
Iterations: upsertion.Iterations,
Salt: upsertion.Salt,
SaltedPassword: upsertion.SaltedPassword,
}
}

m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{
Deletions: deletions,
Upsertions: upsertions,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err)
}

res := m.(*alteruserscramcredentials.Response)
responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results))
responseErrors := make([]error, len(res.Results))

for responseIdx, responseResult := range res.Results {
responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{
User: responseResult.User,
}
responseErrors[responseIdx] = makeError(responseResult.ErrorCode, responseResult.ErrorMessage)
}
ret := &AlterUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Errors: responseErrors,
Results: responseEntries,
}

return ret, nil
}
142 changes: 142 additions & 0 deletions alteruserscramcredentials_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package kafka

import (
"context"
"errors"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
"github.com/stretchr/testify/assert"
)

func TestAlterUserScramCredentials(t *testing.T) {
petedannemann marked this conversation as resolved.
Show resolved Hide resolved
// https://issues.apache.org/jira/browse/KAFKA-10259
if !ktesting.KafkaIsAtLeast("2.7.0") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

createRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Upsertions: []UserScramCredentialsUpsertion{
{
Name: "alice",
petedannemann marked this conversation as resolved.
Show resolved Hide resolved
Mechanism: ScramMechanismSha512,
Iterations: 15000,
Salt: []byte("my-salt"),
SaltedPassword: []byte("my-salted-password"),
},
},
})

if err != nil {
t.Fatal(err)
}

for _, err := range createRes.Errors {
if err != nil {
t.Error(err)
}
}

if len(createRes.Results) != 1 {
t.Fatalf("expected 1 createResult; got %d", len(createRes.Results))
}

if createRes.Results[0].User != "alice" {
t.Fatalf("expected createResult with user alice, got %s", createRes.Results[0].User)
}

describeCreationRes, err := client.DescribeUserScramCredentials(context.Background(), &DescribeUserScramCredentialsRequest{
Users: []UserScramCredentialsUser{
{
Name: "alice",
},
},
})

if err != nil {
t.Fatal(err)
}

expectedCreation := DescribeUserScramCredentialsResponse{
Throttle: makeDuration(0),
Error: makeError(0, ""),
Results: []DescribeUserScramCredentialsResponseResult{
{
User: "alice",
CredentialInfos: []DescribeUserScramCredentialsCredentialInfo{
{
Mechanism: ScramMechanismSha512,
Iterations: 15000,
},
},
Error: makeError(0, ""),
},
},
}

assert.Equal(t, expectedCreation, *describeCreationRes)

deleteRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Deletions: []UserScramCredentialsDeletion{
{
Name: "alice",
Mechanism: ScramMechanismSha512,
},
},
})

if err != nil {
t.Fatal(err)
}

for _, err := range deleteRes.Errors {
if err != nil {
t.Error(err)
}
}

if len(deleteRes.Results) != 1 {
t.Fatalf("expected 1 deleteResult; got %d", len(deleteRes.Results))
}

if deleteRes.Results[0].User != "alice" {
t.Fatalf("expected deleteResult with user alice, got %s", deleteRes.Results[0].User)
}

describeDeletionRes, err := client.DescribeUserScramCredentials(context.Background(), &DescribeUserScramCredentialsRequest{
petedannemann marked this conversation as resolved.
Show resolved Hide resolved
Users: []UserScramCredentialsUser{
{
Name: "alice",
},
},
})

if err != nil {
t.Fatal(err)
}

if !errors.Is(describeDeletionRes.Error, makeError(0, "")) {
t.Fatalf("didn't expect a top level error on describe results after deletion, got %v", describeDeletionRes.Error)
}

if len(describeDeletionRes.Results) != 1 {
t.Fatalf("expected one describe results after deletion, got %d describe results", len(describeDeletionRes.Results))
}

result := describeDeletionRes.Results[0]

if result.User != "alice" {
t.Fatalf("expected describeResult with user alice, got %s", result.User)
}

if len(result.CredentialInfos) != 0 {
t.Fatalf("didn't expect describeResult credential infos, got %v", result.CredentialInfos)
}

if !errors.Is(result.Error, ResourceNotFound) {
t.Fatalf("expected describeResult resourcenotfound error, got %s", result.Error)
}
}
97 changes: 97 additions & 0 deletions describeuserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/describeuserscramcredentials"
)

// DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to
// describe user scram credentials.
type DescribeUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of Scram users to describe
Users []UserScramCredentialsUser
}

type UserScramCredentialsUser struct {
Name string
}

// DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user
// credentials request.
type DescribeUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// Top level error that occurred while attempting to describe
// the user scram credentials.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Error error

// List of described user scram credentials.
Results []DescribeUserScramCredentialsResponseResult
}

type DescribeUserScramCredentialsResponseResult struct {
User string
CredentialInfos []DescribeUserScramCredentialsCredentialInfo
Error error
}

type DescribeUserScramCredentialsCredentialInfo struct {
Mechanism ScramMechanism
Iterations int32
}

// DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns
// the response.
func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
users := make([]describeuserscramcredentials.RequestUser, len(req.Users))

for userIdx, user := range req.Users {
users[userIdx] = describeuserscramcredentials.RequestUser{
Name: user.Name,
}
}

m, err := c.roundTrip(ctx, req.Addr, &describeuserscramcredentials.Request{
Users: users,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeUserScramCredentials: %w", err)
}

res := m.(*describeuserscramcredentials.Response)
responseResults := make([]DescribeUserScramCredentialsResponseResult, len(res.Results))

for responseIdx, responseResult := range res.Results {
credentialInfos := make([]DescribeUserScramCredentialsCredentialInfo, len(responseResult.CredentialInfos))

for credentialInfoIdx, credentialInfo := range responseResult.CredentialInfos {
credentialInfos[credentialInfoIdx] = DescribeUserScramCredentialsCredentialInfo{
Mechanism: ScramMechanism(credentialInfo.Mechanism),
Iterations: credentialInfo.Iterations,
}
}
responseResults[responseIdx] = DescribeUserScramCredentialsResponseResult{
User: responseResult.User,
CredentialInfos: credentialInfos,
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
}
}
ret := &DescribeUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Results: responseResults,
}

return ret, nil
}
Loading