Skip to content

Commit

Permalink
Add configuration to support broker SSL
Browse files Browse the repository at this point in the history
Possibly implements #154, if my assumptions about the implementation are
correct.
  • Loading branch information
eapache committed Feb 24, 2015
1 parent 5d1f38e commit 867a00a
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"crypto/tls"
"fmt"
"io"
"net"
Expand All @@ -11,7 +12,9 @@ import (

// BrokerConfig is used to pass multiple configuration options to Broker.Open.
type BrokerConfig struct {
MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
UseTLS bool // Whether or not to use TLS when connecting to the broker (defaults to false).
TLSConfig *tls.Config // The TLS configuration to use for secure connections if specified by UseTLS (defaults to nil).

// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
Expand Down Expand Up @@ -104,7 +107,15 @@ func (b *Broker) Open(conf *BrokerConfig) error {
go withRecover(func() {
defer b.lock.Unlock()

b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
dialer := &net.Dialer{
Timeout: conf.DialTimeout,
}

if conf.UseTLS {
b.conn, b.connErr = tls.DialWithDialer(dialer, "tcp", b.addr, conf.TLSConfig)
} else {
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
}
if b.connErr != nil {
b.conn = nil
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
Expand Down

0 comments on commit 867a00a

Please sign in to comment.