Skip to content

Commit

Permalink
KIP-554: Add Broker-side SCRAM Config API
Browse files Browse the repository at this point in the history
  • Loading branch information
arkady-emelyanov committed Apr 21, 2021
1 parent 9d205e2 commit ff98e50
Show file tree
Hide file tree
Showing 19 changed files with 936 additions and 0 deletions.
67 changes: 67 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ type ClusterAdmin interface {
// Get information about all log directories on the given set of brokers
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

// Get information about SCRAM users
DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)

// Delete SCRAM users
DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)

// Upsert SCRAM users
UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -936,3 +945,61 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
err = <-errChan
return
}

func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
req := &DescribeUserScramCredentialsRequest{}
for _, u := range users {
req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
Name: u,
})
}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.DescribeUserScramCredentials(req)
if err != nil {
return nil, err
}

return rsp.Results, nil
}

func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
res, err := ca.AlterUserScramCredentials(upsert, nil)
if err != nil {
return nil, err
}

return res, nil
}

func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
res, err := ca.AlterUserScramCredentials(nil, delete)
if err != nil {
return nil, err
}

return res, nil
}

func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
req := &AlterUserScramCredentialsRequest{
Deletions: d,
Upsertions: u,
}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.AlterUserScramCredentials(req)
if err != nil {
return nil, err
}

return rsp.Results, nil
}
142 changes: 142 additions & 0 deletions alter_user_scram_credentials_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package sarama

type AlterUserScramCredentialsRequest struct {
Version int16

// Deletions represent list of SCRAM credentials to remove
Deletions []AlterUserScramCredentialsDelete

// Upsertions represent list of SCRAM credentials to update/insert
Upsertions []AlterUserScramCredentialsUpsert
}

type AlterUserScramCredentialsDelete struct {
Name string
Mechanism ScramMechanismType
}

type AlterUserScramCredentialsUpsert struct {
Name string
Mechanism ScramMechanismType
Iterations int32
Salt []byte
saltedPassword []byte

// This field is never transmitted over the wire
// @see: https://tools.ietf.org/html/rfc5802
Password []byte
}

func (r *AlterUserScramCredentialsRequest) encode(pe packetEncoder) error {
pe.putCompactArrayLength(len(r.Deletions))
for _, d := range r.Deletions {
if err := pe.putCompactString(d.Name); err != nil {
return err
}
pe.putInt8(int8(d.Mechanism))
pe.putEmptyTaggedFieldArray()
}

pe.putCompactArrayLength(len(r.Upsertions))
for _, u := range r.Upsertions {
if err := pe.putCompactString(u.Name); err != nil {
return err
}
pe.putInt8(int8(u.Mechanism))
pe.putInt32(u.Iterations)

if err := pe.putCompactBytes(u.Salt); err != nil {
return err
}

// do not transmit the password over the wire
formatter := scramFormatter{mechanism: u.Mechanism}
salted, err := formatter.saltedPassword(u.Password, u.Salt, int(u.Iterations))
if err != nil {
return err
}

if err := pe.putCompactBytes(salted); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}

pe.putEmptyTaggedFieldArray()
return nil
}

func (r *AlterUserScramCredentialsRequest) decode(pd packetDecoder, version int16) error {
numDeletions, err := pd.getCompactArrayLength()
if err != nil {
return err
}

r.Deletions = make([]AlterUserScramCredentialsDelete, numDeletions)
for i := 0; i < numDeletions; i++ {
r.Deletions[i] = AlterUserScramCredentialsDelete{}
if r.Deletions[i].Name, err = pd.getCompactString(); err != nil {
return err
}
mechanism, err := pd.getInt8()
if err != nil {
return err
}
r.Deletions[i].Mechanism = ScramMechanismType(mechanism)
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

numUpsertions, err := pd.getCompactArrayLength()
if err != nil {
return err
}

r.Upsertions = make([]AlterUserScramCredentialsUpsert, numUpsertions)
for i := 0; i < numUpsertions; i++ {
r.Upsertions[i] = AlterUserScramCredentialsUpsert{}
if r.Upsertions[i].Name, err = pd.getCompactString(); err != nil {
return err
}
mechanism, err := pd.getInt8()
if err != nil {
return err
}

r.Upsertions[i].Mechanism = ScramMechanismType(mechanism)
if r.Upsertions[i].Iterations, err = pd.getInt32(); err != nil {
return err
}
if r.Upsertions[i].Salt, err = pd.getCompactBytes(); err != nil {
return err
}
if r.Upsertions[i].saltedPassword, err = pd.getCompactBytes(); err != nil {
return err
}
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}

func (r *AlterUserScramCredentialsRequest) key() int16 {
return 51
}

func (r *AlterUserScramCredentialsRequest) version() int16 {
return r.Version
}

func (r *AlterUserScramCredentialsRequest) headerVersion() int16 {
return 2
}

func (r *AlterUserScramCredentialsRequest) requiredVersion() KafkaVersion {
return V2_7_0_0
}
60 changes: 60 additions & 0 deletions alter_user_scram_credentials_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package sarama

import "testing"

var (
emptyAlterUserScramCredentialsRequest = []byte{
1, // Deletions
1, // Upsertions
0, // empty tagged fields
}
userAlterUserScramCredentialsRequest = []byte{
2, // Deletions array, length 1
7, // User name length 6
'd', 'e', 'l', 'e', 't', 'e', // User name
2, // SCRAM_SHA_512
0, // empty tagged fields
2, // Upsertions array, length 1
7, // User name length 6
'u', 'p', 's', 'e', 'r', 't',
1, // SCRAM_SHA_256
0, 0, 16, 0, // iterations: 4096
// salt bytes:
6, 119, 111, 114, 108, 100,
// saltedPassword:
33, 193, 85, 83, 3, 218, 48, 159, 107, 125, 30, 143,
228, 86, 54, 191, 221, 220, 75, 245, 100, 5, 231,
233, 78, 157, 21, 240, 231, 185, 203, 211, 128,
0, // empty tagged fields
0, // empty tagged fields
}
)

func TestAlterUserScramCredentialsRequest(t *testing.T) {
request := &AlterUserScramCredentialsRequest{
Version: 0,
Deletions: []AlterUserScramCredentialsDelete{},
Upsertions: []AlterUserScramCredentialsUpsert{},
}

// Password is not transmitted, will fail with `testRequest` and `DeepEqual` check
testRequestEncode(t, "no upsertions/deletions", request, emptyAlterUserScramCredentialsRequest)

request.Deletions = []AlterUserScramCredentialsDelete{
{
Name: "delete",
Mechanism: SCRAM_MECHANISM_SHA_512,
},
}
request.Upsertions = []AlterUserScramCredentialsUpsert{
{
Name: "upsert",
Mechanism: SCRAM_MECHANISM_SHA_256,
Iterations: 4096,
Salt: []byte("world"),
Password: []byte("hello"),
},
}
// Password is not transmitted, will fail with `testRequest` and `DeepEqual` check
testRequestEncode(t, "single deletion and upsertion", request, userAlterUserScramCredentialsRequest)
}
94 changes: 94 additions & 0 deletions alter_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

import "time"

type AlterUserScramCredentialsResponse struct {
Version int16

ThrottleTime time.Duration

Results []*AlterUserScramCredentialsResult
}

type AlterUserScramCredentialsResult struct {
User string

ErrorCode KError
ErrorMessage *string
}

func (r *AlterUserScramCredentialsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
pe.putCompactArrayLength(len(r.Results))

for _, u := range r.Results {
if err := pe.putCompactString(u.User); err != nil {
return err
}
pe.putInt16(int16(u.ErrorCode))
if err := pe.putNullableCompactString(u.ErrorMessage); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}

pe.putEmptyTaggedFieldArray()
return nil
}

func (r *AlterUserScramCredentialsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

numResults, err := pd.getCompactArrayLength()
if err != nil {
return err
}

if numResults > 0 {
r.Results = make([]*AlterUserScramCredentialsResult, numResults)
for i := 0; i < numResults; i++ {
r.Results[i] = &AlterUserScramCredentialsResult{}
if r.Results[i].User, err = pd.getCompactString(); err != nil {
return err
}

kerr, err := pd.getInt16()
if err != nil {
return err
}

r.Results[i].ErrorCode = KError(kerr)
if r.Results[i].ErrorMessage, err = pd.getCompactNullableString(); err != nil {
return err
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}

func (r *AlterUserScramCredentialsResponse) key() int16 {
return 51
}

func (r *AlterUserScramCredentialsResponse) version() int16 {
return r.Version
}

func (r *AlterUserScramCredentialsResponse) headerVersion() int16 {
return 2
}

func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}
Loading

0 comments on commit ff98e50

Please sign in to comment.