diff --git a/broker.go b/broker.go index ed28af4da..e38bd58ef 100644 --- a/broker.go +++ b/broker.go @@ -1,6 +1,7 @@ package sarama import ( + "crypto/tls" "fmt" "io" "net" @@ -15,6 +16,8 @@ type BrokerConfig struct { DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error. ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error. WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error. + UseTLS bool // Whether or not to use TLS when connecting to the broker (defaults to false). + TLSConfig *tls.Config // The TLS configuration to use for secure connections if specified by UseTLS (defaults to nil). } // NewBrokerConfig returns a new broker configuration with sane defaults. @@ -99,7 +102,15 @@ func (b *Broker) Open(conf *BrokerConfig) error { go withRecover(func() { defer b.lock.Unlock() - b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout) + dialer := &net.Dialer{ + Timeout: conf.DialTimeout, + } + + if conf.UseTLS { + b.conn, b.connErr = tls.DialWithDialer(dialer, "tcp", b.addr, conf.TLSConfig) + } else { + b.conn, b.connErr = dialer.Dial("tcp", b.addr) + } if b.connErr != nil { Logger.Printf("Failed to connect to broker %s\n", b.addr) Logger.Println(b.connErr)