Skip to content
This repository has been archived by the owner on May 13, 2019. It is now read-only.

Commit

Permalink
Embed sarama.Config in consumergroup.Config
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 8, 2015
1 parent c6b7353 commit 0829a58
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 50 deletions.
1 change: 1 addition & 0 deletions consumer_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
Expand Down
85 changes: 41 additions & 44 deletions consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,55 @@ var (
FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)

type ConsumerGroupConfig struct {
// The Zookeeper read timeout. Defaults to 1 second
ZookeeperTimeout time.Duration
type Config struct {
*sarama.Config

// Zookeeper chroot to use. Should not include a trailing slash.
// Leave this empty when your Kafka install does not use a chroot.
ZookeeperChroot string
Zookeeper struct {
// The Zookeeper read timeout. Defaults to 1 second
Timeout time.Duration

SaramaConfig *sarama.Config // This will be passed to Sarama when creating a new consumer
// Zookeeper chroot to use. Should not include a trailing slash.
// Leave this empty when your Kafka install does not use a chroot.
Chroot string
}

ChannelBufferSize int // The buffer size of the channel for the messages coming from Kafka. Zero means no buffering. Default is 256.
CommitInterval time.Duration // The interval between which the prossed offsets are commited to Zookeeper.
InitialOffset int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetMethodOldest (default) or sarama.OffsetMethodNewest.
FinalOffsetTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
Offsets struct {
Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
CommitInterval time.Duration // The interval between which the prossed offsets are commited.

This comment has been minimized.

Copy link
@rrh

rrh Sep 23, 2015

Contributor

I don't know what you mean by 'prossed'. 'processed' perhaps?

This comment has been minimized.

Copy link
@wvanbergen

wvanbergen Oct 15, 2015

Author Owner

Fixed!

}
}

func NewConsumerGroupConfig() *ConsumerGroupConfig {
return &ConsumerGroupConfig{
ZookeeperTimeout: 1 * time.Second,
ChannelBufferSize: 256,
CommitInterval: 10 * time.Second,
InitialOffset: sarama.OffsetOldest,
FinalOffsetTimeout: 60 * time.Second,
}
func NewConfig() *Config {
config := &Config{}
config.Config = sarama.NewConfig()
config.Zookeeper.Timeout = 1 * time.Second
config.Offsets.Initial = sarama.OffsetOldest
config.Offsets.ProcessingTimeout = 60 * time.Second
config.Offsets.CommitInterval = 10 * time.Second

return config
}

func (cgc *ConsumerGroupConfig) Validate() error {
if cgc.ZookeeperTimeout <= 0 {
return errors.New("ZookeeperTimeout should have a duration > 0")
func (cgc *Config) Validate() error {
if cgc.Zookeeper.Timeout <= 0 {
return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
}

if cgc.CommitInterval <= 0 {
return errors.New("CommitInterval should have a duration > 0")
if cgc.Offsets.CommitInterval <= 0 {
return sarama.ConfigurationError("CommitInterval should have a duration > 0")
}

if cgc.ChannelBufferSize < 0 {
return errors.New("ChannelBufferSize should be >= 0.")
if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
}

if cgc.SaramaConfig != nil {
if err := cgc.SaramaConfig.Validate(); err != nil {
if cgc.Config != nil {
if err := cgc.Config.Validate(); err != nil {
return err
}
}

if cgc.InitialOffset != sarama.OffsetOldest && cgc.InitialOffset != sarama.OffsetNewest {
return errors.New("InitialOffset should OffsetMethodNewest or OffsetMethodOldest.")
}

return nil
}

Expand All @@ -71,7 +71,7 @@ func (cgc *ConsumerGroupConfig) Validate() error {
type ConsumerGroup struct {
id, name string

config *ConsumerGroupConfig
config *Config

consumer *sarama.Consumer
zk *ZK
Expand All @@ -90,7 +90,7 @@ type ConsumerGroup struct {
}

// Connects to a consumer group, using Zookeeper for auto-discovery
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *ConsumerGroupConfig) (cg *ConsumerGroup, err error) {
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {

if name == "" {
return nil, sarama.ConfigurationError("Empty consumergroup name")
Expand All @@ -105,20 +105,17 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

if config == nil {
config = NewConsumerGroupConfig()
}
if config.SaramaConfig == nil {
config.SaramaConfig = sarama.NewConfig()
config = NewConfig()
}
config.SaramaConfig.ClientID = name
config.ClientID = name

// Validate configuration
if err = config.Validate(); err != nil {
return
}

var zk *ZK
if zk, err = NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout); err != nil {
if zk, err = NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout); err != nil {
return
}

Expand All @@ -134,7 +131,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

var consumer *sarama.Consumer
if consumer, err = sarama.NewConsumer(brokerList, config.SaramaConfig); err != nil {
if consumer, err = sarama.NewConsumer(brokerList, config.Config); err != nil {
zk.Close()
return
}
Expand Down Expand Up @@ -173,7 +170,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
cg.Logf("Consumer instance registered (%s).", cg.id)
}

offsetConfig := OffsetManagerConfig{CommitInterval: config.CommitInterval}
offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

go cg.topicListConsumer(topics)
Expand Down Expand Up @@ -340,7 +337,7 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
if nextOffset > 0 {
cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
} else {
nextOffset = cg.config.InitialOffset
nextOffset = cg.config.Offsets.Initial
if nextOffset == sarama.OffsetOldest {
cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
} else if nextOffset == sarama.OffsetNewest {
Expand Down Expand Up @@ -389,7 +386,7 @@ partitionConsumerLoop:
}

cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.FinalOffsetTimeout); err != nil {
if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
cg.Logf("%s/%d :: %s\n", topic, partition, err)
}
}
2 changes: 1 addition & 1 deletion consumergroup/consumergroup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestSingleTopicSequentialConsumer(t *testing.T) {
// If the channel is buffered, the consumer will enqueue more events in the channel,
// which assertEvents will simply skip. When consumer 2 starts it will skip a bunch of
// events because of this. Transactional processing will fix this.
config := NewConsumerGroupConfig()
config := NewConfig()
config.ChannelBufferSize = 0

consumer1, err := JoinConsumerGroup(consumerGroup, []string{TopicWithSinglePartition}, zookeeper, config)
Expand Down
10 changes: 5 additions & 5 deletions consumergroup/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type ConsumerGroupProcessingLag map[string]TopicProcessingLag // The number of m

// Instantiates a new consumergroup monitor. Returns the number of messages the consumergroup is behind
// the latest offset in Kafka for every topic/partition the consumergroup is consuming.
func NewMonitor(name string, consumergroup string, zookeeper []string, config *ConsumerGroupConfig) (*Monitor, error) {
func NewMonitor(name string, consumergroup string, zookeeper []string, config *Config) (*Monitor, error) {
if config == nil {
config = NewConsumerGroupConfig()
config = NewConfig()
}
config.SaramaConfig.ClientID = name
config.ClientID = name

zkConn, err := NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout)
zkConn, err := NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout)
if err != nil {
return nil, err
}
Expand All @@ -43,7 +43,7 @@ func NewMonitor(name string, consumergroup string, zookeeper []string, config *C
brokerList = append(brokerList, broker)
}

saramaClient, err := sarama.NewClient(brokerList, config.SaramaConfig)
saramaClient, err := sarama.NewClient(brokerList, config.Config)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0829a58

Please sign in to comment.