Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration to support broker SSL #493

Merged
merged 7 commits into from
Aug 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion broker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"crypto/tls"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -73,7 +74,11 @@ func (b *Broker) Open(conf *Config) error {
KeepAlive: conf.Net.KeepAlive,
}

b.conn, b.connErr = dialer.Dial("tcp", b.addr)
if conf.Net.TLS.Enable {
b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
} else {
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
}
if b.connErr != nil {
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
Expand Down
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "time"
import (
"crypto/tls"
"time"
)

// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
Expand All @@ -13,6 +16,11 @@ type Config struct {
ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).

TLS struct {
Enable bool // Whether or not to use TLS when connecting to the broker (defaults to false).
Config *tls.Config // The TLS configuration to use for secure connections if enabled (defaults to nil).
}

// KeepAlive specifies the keep-alive period for an active network connection.
// If zero, keep-alives are disabled. (default is 0: disabled).
KeepAlive time.Duration
Expand Down
48 changes: 45 additions & 3 deletions examples/http_server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package main
import (
"github.com/Shopify/sarama"

"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
Expand All @@ -14,9 +17,13 @@ import (
)

var (
addr = flag.String("addr", ":8080", "The address to bind to")
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
addr = flag.String("addr", ":8080", "The address to bind to")
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
keyFile = flag.String("key", "", "The optional key file for client authentication")
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)

func main() {
Expand Down Expand Up @@ -47,6 +54,31 @@ func main() {
log.Fatal(server.Run(*addr))
}

func createTlsConfiguration() (t *tls.Config) {
if *certFile != "" && *keyFile != "" && *caFile != "" {
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
if err != nil {
log.Fatal(err)
}

caCert, err := ioutil.ReadFile(*caFile)
if err != nil {
log.Fatal(err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: *verifySsl,
}
}
// will be nil by default if nothing is provided
return t
}

type Server struct {
DataCollector sarama.SyncProducer
AccessLogProducer sarama.AsyncProducer
Expand Down Expand Up @@ -164,6 +196,11 @@ func newDataCollector(brokerList []string) sarama.SyncProducer {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
tlsConfig := createTlsConfiguration()
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

// On the broker side, you may want to change the following settings to get
// stronger consistency guarantees:
Expand All @@ -183,6 +220,11 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
// For the access log, we are looking for AP semantics, with high throughput.
// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
config := sarama.NewConfig()
tlsConfig := createTlsConfiguration()
if tlsConfig != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
Expand Down