-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for SASL plain text authentication #648
Changes from 1 commit
3834ba1
369d017
d3b1e0e
5ac5287
0a075f2
2270962
6fbcb5f
a95632e
8b75447
9d69627
d0d6717
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ import ( | |
"sync" | ||
"sync/atomic" | ||
"time" | ||
"bytes" | ||
"encoding/binary" | ||
) | ||
|
||
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. | ||
|
@@ -82,6 +84,38 @@ func (b *Broker) Open(conf *Config) error { | |
b.conn = newBufConn(b.conn) | ||
|
||
b.conf = conf | ||
|
||
if conf.Net.SASL.Enable { | ||
// | ||
// Begin SASL/PLAIN authentication | ||
// | ||
authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) | ||
buf := new(bytes.Buffer) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you know how many bytes you're going to need to write, there's no need to use a dynamic buffer here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I admit that I am a bit new to Golang. Can you clarify what you mean by this comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. creating a new |
||
|
||
err = binary.Write(buf, binary.BigEndian, int32(len(authBytes))) | ||
if err != nil { | ||
Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error()) | ||
} | ||
|
||
err = binary.Write(buf, binary.BigEndian, authBytes) | ||
if err != nil { | ||
Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error()) | ||
} | ||
|
||
b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) | ||
b.conn.Write(buf.Bytes()) | ||
|
||
header := make([]byte, 4) | ||
n, err := io.ReadFull(b.conn, header) | ||
if err != nil { | ||
Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error()) | ||
} | ||
Logger.Printf("SASL authentication successful:\n%v\n%v\n%v", n, header, string(header)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know a whole lot about SASL but don't you need to actually check the response header in some way before saying it succeeded? |
||
// | ||
// End SASL/PLAIN authentication | ||
// | ||
} | ||
|
||
b.done = make(chan bool) | ||
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,17 @@ type Config struct { | |
Config *tls.Config | ||
} | ||
|
||
// SASL based authentication with broker. While there are multiple SASL authentication methods | ||
// the current implementation is limited to plaintext (SASL/PLAIN) authentication | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we currently only support plain-text, should we enforce that SASL is only used over TLS to avoid leaking credentials on the wire? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the comments. Will address your other comments and send out a new PR soon. On Thu, Apr 28, 2016 at 3:46 PM Evan Huus [email protected] wrote:
~shriram |
||
SASL struct { | ||
// Whether or not to use SASL authentication when connecting to the broker | ||
// (defaults to false). | ||
Enable bool | ||
//username and password for SASL/PLAIN authentication | ||
User string | ||
Password string | ||
} | ||
|
||
// KeepAlive specifies the keep-alive period for an active network connection. | ||
// If zero, keep-alives are disabled. (default is 0: disabled). | ||
KeepAlive time.Duration | ||
|
@@ -222,6 +233,7 @@ func NewConfig() *Config { | |
c.Net.DialTimeout = 30 * time.Second | ||
c.Net.ReadTimeout = 30 * time.Second | ||
c.Net.WriteTimeout = 30 * time.Second | ||
c.Net.SASL.Enable = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you don't need to set this, |
||
|
||
c.Metadata.Retry.Max = 3 | ||
c.Metadata.Retry.Backoff = 250 * time.Millisecond | ||
|
@@ -256,6 +268,14 @@ func (c *Config) Validate() error { | |
if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil { | ||
Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.") | ||
} | ||
if c.Net.SASL.Enable == false { | ||
if c.Net.SASL.User != "" { | ||
Logger.Println("Net.SASL is disabled but a non-empty username was provided.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these are good checks; I don't know a lot about SASL, but is it valid to have empty usernames and passwords? If not you should do the opposite check as well - if it's enabled but the user/pass is blank return a configuration error |
||
} | ||
if c.Net.SASL.Password != "" { | ||
Logger.Println("Net.SASL is disabled but a non-empty password was provided.") | ||
} | ||
} | ||
if c.Producer.RequiredAcks > 1 { | ||
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.") | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this block of SASL code be moved to a method please to keep
Open
fairly straightforward?