Skip to content

Commit

Permalink
feat(cdc): Add support for SCRAM SASL mechanism (#7765)
Browse files Browse the repository at this point in the history
We used to support only the PLAIN mechanism for SASL in CDC-Kafka.
This commit adds support for SCRAM-SHA-256 and SCRAM-SHA-512.

(cherry picked from commit 8925049)
  • Loading branch information
ahsanbarkati committed Apr 30, 2021
1 parent bfc75c3 commit 8de5575
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 4 deletions.
2 changes: 2 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ they form a Raft group and provide synchronous replication.
"The SASL username for Kafka.").
Flag("sasl-password",
"The SASL password for Kafka.").
Flag("sasl-mechanism",
"The SASL mechanism for Kafka (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512)").
Flag("ca-cert",
"The path to CA cert file for TLS encryption.").
Flag("client-cert",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/twpayne/go-geom v1.0.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/etcd v0.0.0-20190228193606-a943ad0ee4c9
go.opencensus.io v0.22.5
go.uber.org/zap v1.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,9 @@ github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUd
github.com/vektah/gqlparser/v2 v2.1.0/go.mod h1:SyUiHgLATUR8BiYURfTirrTcGpcE+4XkV2se04Px1Ms=
github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
Expand Down
7 changes: 3 additions & 4 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ const (
BadgerDefaults = `compression=snappy; numgoroutines=8;`
RaftDefaults = `learner=false; snapshot-after-entries=10000; ` +
`snapshot-after-duration=30m; pending-proposals=256; idx=; group=;`
SecurityDefaults = `token=; whitelist=;`
LudicrousDefaults = `enabled=false; concurrency=2000;`
CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` +
`client_key=;`
SecurityDefaults = `token=; whitelist=;`
CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` +
`client_key=; sasl-mechanism=PLAIN;`
LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` +
`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` +
` max-retries=-1;max-pending-queries=10000`
Expand Down
48 changes: 48 additions & 0 deletions worker/sink_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package worker

import (
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"crypto/x509"
"encoding/binary"
Expand All @@ -27,6 +29,7 @@ import (
"strings"

"github.com/pkg/errors"
"github.com/xdg/scram"

"github.com/Shopify/sarama"

Expand Down Expand Up @@ -116,6 +119,27 @@ func newKafkaSink(config *z.SuperFlag) (Sink, error) {
saramaConf.Net.SASL.User = config.GetString("sasl-user")
saramaConf.Net.SASL.Password = config.GetString("sasl-password")
}
mechanism := config.GetString("sasl-mechanism")
if mechanism != "" {
switch mechanism {
case sarama.SASLTypeSCRAMSHA256:
saramaConf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
saramaConf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: sha256.New}
}
case sarama.SASLTypeSCRAMSHA512:
saramaConf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
saramaConf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: sha512.New}
}
case sarama.SASLTypePlaintext:
saramaConf.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return nil, errors.Errorf("Invalid SASL mechanism. Valid mechanisms are: %s, %s and %s",
sarama.SASLTypePlaintext, sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512)
}
}

brokers := strings.Split(config.GetString("kafka"), ",")
client, err := sarama.NewClient(brokers, saramaConf)
if err != nil {
Expand Down Expand Up @@ -195,3 +219,27 @@ func newFileSink(path *z.SuperFlag) (Sink, error) {
fileWriter: w,
}, nil
}

type scramClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (sc *scramClient) Begin(userName, password, authzID string) (err error) {
sc.Client, err = sc.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
sc.ClientConversation = sc.Client.NewConversation()
return nil
}

func (sc *scramClient) Step(challenge string) (response string, err error) {
response, err = sc.ClientConversation.Step(challenge)
return
}

func (sc *scramClient) Done() bool {
return sc.ClientConversation.Done()
}

0 comments on commit 8de5575

Please sign in to comment.