diff --git a/broker.go b/broker.go index f54dd3639..44e0c11a5 100644 --- a/broker.go +++ b/broker.go @@ -1,6 +1,7 @@ package sarama import ( + "crypto/tls" "fmt" "io" "net" @@ -11,7 +12,9 @@ import ( // BrokerConfig is used to pass multiple configuration options to Broker.Open. type BrokerConfig struct { - MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5). + MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5). + UseTLS bool // Whether or not to use TLS when connecting to the broker (defaults to false). + TLSConfig *tls.Config // The TLS configuration to use for secure connections if specified by UseTLS (defaults to nil). // All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka. DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s). @@ -104,7 +107,15 @@ func (b *Broker) Open(conf *BrokerConfig) error { go withRecover(func() { defer b.lock.Unlock() - b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout) + dialer := &net.Dialer{ + Timeout: conf.DialTimeout, + } + + if conf.UseTLS { + b.conn, b.connErr = tls.DialWithDialer(dialer, "tcp", b.addr, conf.TLSConfig) + } else { + b.conn, b.connErr = dialer.Dial("tcp", b.addr) + } if b.connErr != nil { b.conn = nil Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)