Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-554: Add Broker-side SCRAM Config API #1917

Merged
merged 1 commit into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ type ClusterAdmin interface {
// Get information about all log directories on the given set of brokers
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

// Get information about SCRAM users
DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)

// Delete SCRAM users
DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)

// Upsert SCRAM users
UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -936,3 +945,61 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
err = <-errChan
return
}

func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
req := &DescribeUserScramCredentialsRequest{}
for _, u := range users {
req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
Name: u,
})
}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.DescribeUserScramCredentials(req)
if err != nil {
return nil, err
}

return rsp.Results, nil
}

func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
res, err := ca.AlterUserScramCredentials(upsert, nil)
if err != nil {
return nil, err
}

return res, nil
}

func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
res, err := ca.AlterUserScramCredentials(nil, delete)
if err != nil {
return nil, err
}

return res, nil
}

func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
req := &AlterUserScramCredentialsRequest{
Deletions: d,
Upsertions: u,
}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.AlterUserScramCredentials(req)
if err != nil {
return nil, err
}

return rsp.Results, nil
}
142 changes: 142 additions & 0 deletions alter_user_scram_credentials_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package sarama

type AlterUserScramCredentialsRequest struct {
Version int16

// Deletions represent list of SCRAM credentials to remove
Deletions []AlterUserScramCredentialsDelete

// Upsertions represent list of SCRAM credentials to update/insert
Upsertions []AlterUserScramCredentialsUpsert
}

type AlterUserScramCredentialsDelete struct {
Name string
Mechanism ScramMechanismType
}

type AlterUserScramCredentialsUpsert struct {
Name string
Mechanism ScramMechanismType
Iterations int32
Salt []byte
saltedPassword []byte

// This field is never transmitted over the wire
// @see: https://tools.ietf.org/html/rfc5802
Password []byte
}

func (r *AlterUserScramCredentialsRequest) encode(pe packetEncoder) error {
pe.putCompactArrayLength(len(r.Deletions))
for _, d := range r.Deletions {
if err := pe.putCompactString(d.Name); err != nil {
return err
}
pe.putInt8(int8(d.Mechanism))
pe.putEmptyTaggedFieldArray()
}

pe.putCompactArrayLength(len(r.Upsertions))
for _, u := range r.Upsertions {
if err := pe.putCompactString(u.Name); err != nil {
return err
}
pe.putInt8(int8(u.Mechanism))
pe.putInt32(u.Iterations)

if err := pe.putCompactBytes(u.Salt); err != nil {
return err
}

// do not transmit the password over the wire
formatter := scramFormatter{mechanism: u.Mechanism}
salted, err := formatter.saltedPassword(u.Password, u.Salt, int(u.Iterations))
if err != nil {
return err
}

if err := pe.putCompactBytes(salted); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}

pe.putEmptyTaggedFieldArray()
return nil
}

func (r *AlterUserScramCredentialsRequest) decode(pd packetDecoder, version int16) error {
numDeletions, err := pd.getCompactArrayLength()
if err != nil {
return err
}

r.Deletions = make([]AlterUserScramCredentialsDelete, numDeletions)
for i := 0; i < numDeletions; i++ {
r.Deletions[i] = AlterUserScramCredentialsDelete{}
if r.Deletions[i].Name, err = pd.getCompactString(); err != nil {
return err
}
mechanism, err := pd.getInt8()
if err != nil {
return err
}
r.Deletions[i].Mechanism = ScramMechanismType(mechanism)
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

numUpsertions, err := pd.getCompactArrayLength()
if err != nil {
return err
}

r.Upsertions = make([]AlterUserScramCredentialsUpsert, numUpsertions)
for i := 0; i < numUpsertions; i++ {
r.Upsertions[i] = AlterUserScramCredentialsUpsert{}
if r.Upsertions[i].Name, err = pd.getCompactString(); err != nil {
return err
}
mechanism, err := pd.getInt8()
if err != nil {
return err
}

r.Upsertions[i].Mechanism = ScramMechanismType(mechanism)
if r.Upsertions[i].Iterations, err = pd.getInt32(); err != nil {
return err
}
if r.Upsertions[i].Salt, err = pd.getCompactBytes(); err != nil {
return err
}
if r.Upsertions[i].saltedPassword, err = pd.getCompactBytes(); err != nil {
return err
}
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}

func (r *AlterUserScramCredentialsRequest) key() int16 {
return 51
}

func (r *AlterUserScramCredentialsRequest) version() int16 {
return r.Version
}

func (r *AlterUserScramCredentialsRequest) headerVersion() int16 {
return 2
}

func (r *AlterUserScramCredentialsRequest) requiredVersion() KafkaVersion {
return V2_7_0_0
}
60 changes: 60 additions & 0 deletions alter_user_scram_credentials_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package sarama

import "testing"

var (
emptyAlterUserScramCredentialsRequest = []byte{
1, // Deletions
1, // Upsertions
0, // empty tagged fields
}
userAlterUserScramCredentialsRequest = []byte{
2, // Deletions array, length 1
7, // User name length 6
'd', 'e', 'l', 'e', 't', 'e', // User name
2, // SCRAM_SHA_512
0, // empty tagged fields
2, // Upsertions array, length 1
7, // User name length 6
'u', 'p', 's', 'e', 'r', 't',
1, // SCRAM_SHA_256
0, 0, 16, 0, // iterations: 4096
// salt bytes:
6, 119, 111, 114, 108, 100,
// saltedPassword:
33, 193, 85, 83, 3, 218, 48, 159, 107, 125, 30, 143,
228, 86, 54, 191, 221, 220, 75, 245, 100, 5, 231,
233, 78, 157, 21, 240, 231, 185, 203, 211, 128,
0, // empty tagged fields
0, // empty tagged fields
}
)

func TestAlterUserScramCredentialsRequest(t *testing.T) {
request := &AlterUserScramCredentialsRequest{
Version: 0,
Deletions: []AlterUserScramCredentialsDelete{},
Upsertions: []AlterUserScramCredentialsUpsert{},
}

// Password is not transmitted, will fail with `testRequest` and `DeepEqual` check
testRequestEncode(t, "no upsertions/deletions", request, emptyAlterUserScramCredentialsRequest)

request.Deletions = []AlterUserScramCredentialsDelete{
{
Name: "delete",
Mechanism: SCRAM_MECHANISM_SHA_512,
},
}
request.Upsertions = []AlterUserScramCredentialsUpsert{
{
Name: "upsert",
Mechanism: SCRAM_MECHANISM_SHA_256,
Iterations: 4096,
Salt: []byte("world"),
Password: []byte("hello"),
},
}
// Password is not transmitted, will fail with `testRequest` and `DeepEqual` check
testRequestEncode(t, "single deletion and upsertion", request, userAlterUserScramCredentialsRequest)
}
94 changes: 94 additions & 0 deletions alter_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

import "time"

type AlterUserScramCredentialsResponse struct {
Version int16

ThrottleTime time.Duration

Results []*AlterUserScramCredentialsResult
}

type AlterUserScramCredentialsResult struct {
User string

ErrorCode KError
ErrorMessage *string
}

func (r *AlterUserScramCredentialsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
pe.putCompactArrayLength(len(r.Results))

for _, u := range r.Results {
if err := pe.putCompactString(u.User); err != nil {
return err
}
pe.putInt16(int16(u.ErrorCode))
if err := pe.putNullableCompactString(u.ErrorMessage); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}

pe.putEmptyTaggedFieldArray()
return nil
}

func (r *AlterUserScramCredentialsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

numResults, err := pd.getCompactArrayLength()
if err != nil {
return err
}

if numResults > 0 {
r.Results = make([]*AlterUserScramCredentialsResult, numResults)
for i := 0; i < numResults; i++ {
r.Results[i] = &AlterUserScramCredentialsResult{}
if r.Results[i].User, err = pd.getCompactString(); err != nil {
return err
}

kerr, err := pd.getInt16()
if err != nil {
return err
}

r.Results[i].ErrorCode = KError(kerr)
if r.Results[i].ErrorMessage, err = pd.getCompactNullableString(); err != nil {
return err
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}

func (r *AlterUserScramCredentialsResponse) key() int16 {
return 51
}

func (r *AlterUserScramCredentialsResponse) version() int16 {
return r.Version
}

func (r *AlterUserScramCredentialsResponse) headerVersion() int16 {
return 2
}

func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}
Loading