This repository has been archived by the owner on May 13, 2019. It is now read-only.

Merge pull request #37 from wvanbergen/fix_offset_committing
Wait for messages to be processed before committing during shutdown
wvanbergen committed Mar 9, 2015
2 parents 86b9f96 + 0829a58 commit 15a566f
Showing 5 changed files with 132 additions and 115 deletions.
10 changes: 8 additions & 2 deletions consumer_example.go
@@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
@@ -41,8 +42,10 @@ func init() {
}

func main() {
config := consumergroup.NewConsumerGroupConfig()
config.InitialOffset = sarama.OffsetNewest
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second

consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroup, kafkaTopics, zookeeper, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
@@ -76,6 +79,9 @@ func main() {
log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
}

// Simulate processing time
time.Sleep(10 * time.Millisecond)

offsets[message.Topic][message.Partition] = message.Offset
consumer.CommitUpto(message)
}
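
Pulled together, the updated example boils down to the loop below. This is a sketch assembled from the hunks above, with placeholder group, topic, and Zookeeper addresses; the Messages() channel and the surrounding error handling are assumed from the rest of consumer_example.go rather than shown in these hunks.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
)

func main() {
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	// Placeholder names; the real example derives these from command-line flags.
	consumer, err := consumergroup.JoinConsumerGroup("example_group", []string{"example_topic"}, []string{"localhost:2181"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	for message := range consumer.Messages() {
		time.Sleep(10 * time.Millisecond) // simulate processing time

		// Acknowledge the message; on shutdown the final commit now waits
		// (up to Offsets.ProcessingTimeout) for these acknowledgements.
		consumer.CommitUpto(message)
	}
}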
152 changes: 67 additions & 85 deletions consumergroup/consumer_group.go
@@ -14,53 +14,55 @@ var (
FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)

type ConsumerGroupConfig struct {
// The Zookeeper read timeout. Defaults to 1 second
ZookeeperTimeout time.Duration
type Config struct {
*sarama.Config

// Zookeeper chroot to use. Should not include a trailing slash.
// Leave this empty when your Kafka install does not use a chroot.
ZookeeperChroot string
Zookeeper struct {
// The Zookeeper read timeout. Defaults to 1 second
Timeout time.Duration

SaramaConfig *sarama.Config // This will be passed to Sarama when creating a new consumer
// Zookeeper chroot to use. Should not include a trailing slash.
// Leave this empty when your Kafka install does not use a chroot.
Chroot string
}

ChannelBufferSize int // The buffer size of the channel for the messages coming from Kafka. Zero means no buffering. Default is 256.
CommitInterval time.Duration // The interval at which the processed offsets are committed to Zookeeper.
InitialOffset int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetMethodOldest (default) or sarama.OffsetMethodNewest.
Offsets struct {
Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
CommitInterval time.Duration // The interval at which the processed offsets are committed.
}
}

func NewConsumerGroupConfig() *ConsumerGroupConfig {
return &ConsumerGroupConfig{
ZookeeperTimeout: 1 * time.Second,
ChannelBufferSize: 256,
CommitInterval: 10 * time.Second,
InitialOffset: sarama.OffsetOldest,
}
func NewConfig() *Config {
config := &Config{}
config.Config = sarama.NewConfig()
config.Zookeeper.Timeout = 1 * time.Second
config.Offsets.Initial = sarama.OffsetOldest
config.Offsets.ProcessingTimeout = 60 * time.Second
config.Offsets.CommitInterval = 10 * time.Second

return config
}

func (cgc *ConsumerGroupConfig) Validate() error {
if cgc.ZookeeperTimeout <= 0 {
return errors.New("ZookeeperTimeout should have a duration > 0")
func (cgc *Config) Validate() error {
if cgc.Zookeeper.Timeout <= 0 {
return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
}

if cgc.CommitInterval <= 0 {
return errors.New("CommitInterval should have a duration > 0")
if cgc.Offsets.CommitInterval <= 0 {
return sarama.ConfigurationError("CommitInterval should have a duration > 0")
}

if cgc.ChannelBufferSize < 0 {
return errors.New("ChannelBufferSize should be >= 0.")
if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
}

if cgc.SaramaConfig != nil {
if err := cgc.SaramaConfig.Validate(); err != nil {
if cgc.Config != nil {
if err := cgc.Config.Validate(); err != nil {
return err
}
}

if cgc.InitialOffset != sarama.OffsetOldest && cgc.InitialOffset != sarama.OffsetNewest {
return errors.New("InitialOffset should OffsetMethodNewest or OffsetMethodOldest.")
}

return nil
}
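
Because the new Config embeds *sarama.Config, sarama's own options become promoted fields on it — the config.ClientID = name assignment later in this diff relies on exactly that. A small illustrative sketch follows; the Consumer.Return.Errors field is a standard sarama option assumed here, not something taken from this diff.

package main

import (
	"fmt"
	"time"

	"github.com/wvanbergen/kafka/consumergroup"
)

func main() {
	config := consumergroup.NewConfig()

	// Consumergroup-specific settings live in the nested structs...
	config.Zookeeper.Timeout = 2 * time.Second
	config.Offsets.ProcessingTimeout = 30 * time.Second

	// ...while sarama settings are reachable directly via the embedded *sarama.Config.
	config.ClientID = "example-client"
	config.Consumer.Return.Errors = true // assumed sarama option, not part of this diff

	// Validate catches out-of-range values, e.g. an Offsets.Initial that is
	// neither sarama.OffsetOldest nor sarama.OffsetNewest.
	config.Offsets.Initial = 1234
	fmt.Println(config.Validate())
}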

@@ -69,7 +71,7 @@ func (cgc *ConsumerGroupConfig) Validate() error {
type ConsumerGroup struct {
id, name string

config *ConsumerGroupConfig
config *Config

consumer *sarama.Consumer
zk *ZK
@@ -88,7 +90,7 @@ type ConsumerGroup struct {
}

// Connects to a consumer group, using Zookeeper for auto-discovery
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *ConsumerGroupConfig) (cg *ConsumerGroup, err error) {
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {

if name == "" {
return nil, sarama.ConfigurationError("Empty consumergroup name")
@@ -103,20 +105,17 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

if config == nil {
config = NewConsumerGroupConfig()
}
if config.SaramaConfig == nil {
config.SaramaConfig = sarama.NewConfig()
config = NewConfig()
}
config.SaramaConfig.ClientID = name
config.ClientID = name

// Validate configuration
if err = config.Validate(); err != nil {
return
}

var zk *ZK
if zk, err = NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout); err != nil {
if zk, err = NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout); err != nil {
return
}

@@ -132,7 +131,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

var consumer *sarama.Consumer
if consumer, err = sarama.NewConsumer(brokerList, config.SaramaConfig); err != nil {
if consumer, err = sarama.NewConsumer(brokerList, config.Config); err != nil {
zk.Close()
return
}
@@ -171,7 +170,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
cg.Logf("Consumer instance registered (%s).", cg.id)
}

offsetConfig := OffsetManagerConfig{CommitInterval: config.CommitInterval}
offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

go cg.topicListConsumer(topics)
@@ -210,7 +209,7 @@ func (cg *ConsumerGroup) Close() error {
if shutdownError = cg.zk.DeregisterConsumer(cg.name, cg.id); shutdownError != nil {
cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
} else {
cg.Logf("Deregistered consumer istance %s.\n", cg.id)
cg.Logf("Deregistered consumer instance %s.\n", cg.id)
}

if shutdownError = cg.consumer.Close(); shutdownError != nil {
@@ -241,26 +240,7 @@ func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
return nil
}

func (cg *ConsumerGroup) closeOnPanic() {
if err := recover(); err != nil {
cg.Logf("Error: %s\n", err)

// Try to produce an error event on the channel so we can inform the consumer.
// If that doesn't work, continue.
ce := &sarama.ConsumerError{Err: fmt.Errorf("%s", err)}
select {
case cg.errors <- ce:
default:
}

// Now, close the consumer
cg.Close()
}
}

func (cg *ConsumerGroup) topicListConsumer(topics []string) {
defer cg.closeOnPanic()

for {
select {
case <-cg.stopper:
@@ -270,7 +250,8 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {

consumers, consumerChanges, err := cg.zk.Consumers(cg.name)
if err != nil {
panic(err)
cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
return
}

cg.consumers = consumers
Expand All @@ -289,15 +270,14 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
return

case <-consumerChanges:
cg.Logf("Triggering rebalance due to consumer list change.\n")
cg.Logf("Triggering rebalance due to consumer list change\n")
close(stopper)
cg.wg.Wait()
}
}
}

func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}) {
defer cg.closeOnPanic()
defer cg.wg.Done()

select {
@@ -306,17 +286,18 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
default:
}

cg.Logf("Started topic consumer for %s\n", topic)
cg.Logf("%s :: Started topic consumer\n", topic)

// Fetch a list of partition IDs
partitions, err := cg.zk.Partitions(topic)
if err != nil {
panic(err)
cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
return
}

dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitions)
myPartitions := dividedPartitions[cg.id]
cg.Logf("Claiming %d of %d partitions for topic %s.", len(myPartitions), len(partitions), topic)
cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitions))

// Consume all the assigned partitions
var wg sync.WaitGroup
@@ -327,12 +308,11 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
}

wg.Wait()
cg.Logf("Stopped topic consumer for %s\n", topic)
cg.Logf("%s :: Stopped topic consumer\n", topic)
}
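
dividePartitionsBetweenConsumers is where each instance works out its own share of the topic. The helper itself is not part of this diff, so the sketch below is only an illustration of the general idea — deterministically splitting a sorted partition list across a sorted list of consumer IDs, so that every running instance reaches the same conclusion without extra coordination.

package main

import (
	"fmt"
	"sort"
)

// divide is a hypothetical illustration, not the library's actual
// dividePartitionsBetweenConsumers: it hands out contiguous ranges of the
// sorted partition list to the sorted consumer IDs.
func divide(consumers []string, partitions []int32) map[string][]int32 {
	sort.Strings(consumers)
	sort.Slice(partitions, func(i, j int) bool { return partitions[i] < partitions[j] })

	result := make(map[string][]int32)
	n, c := len(partitions), len(consumers)
	for i, id := range consumers {
		start, end := i*n/c, (i+1)*n/c
		result[id] = partitions[start:end]
	}
	return result
}

func main() {
	fmt.Println(divide([]string{"consumer-b", "consumer-a"}, []int32{0, 1, 2, 3, 4}))
	// map[consumer-a:[0 1] consumer-b:[2 3 4]]
}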

// Consumes a partition
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}) {
defer cg.closeOnPanic()
defer wg.Done()

select {
@@ -343,29 +323,32 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag

err := cg.zk.Claim(cg.name, topic, partition, cg.id)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
return
}
defer cg.zk.Release(cg.name, topic, partition, cg.id)

nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
return
}

if nextOffset > 0 {
cg.Logf("Partition consumer for %s/%d starting at offset %d.\n", topic, partition, nextOffset)
cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
} else {
nextOffset = cg.config.InitialOffset
nextOffset = cg.config.Offsets.Initial
if nextOffset == sarama.OffsetOldest {
cg.Logf("Partition consumer for %s/%d starting at the oldest available offset.\n", topic, partition)
cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
} else if nextOffset == sarama.OffsetNewest {
cg.Logf("Partition consumer for %s/%d for new messages only.\n", topic, partition)
cg.Logf("%s/%d :: Partition consumer listening for new messages only.\n", topic, partition)
}
}

consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
return
}
defer consumer.Close()

@@ -374,6 +357,9 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
partitionConsumerLoop:
for {
select {
case <-stopper:
break partitionConsumerLoop

case err := <-consumer.Errors():
for {
select {
@@ -388,23 +374,19 @@
case message := <-consumer.Messages():
for {
select {
case <-stopper:
break partitionConsumerLoop

case messages <- message:
lastOffset = message.Offset
continue partitionConsumerLoop

case <-stopper:
break partitionConsumerLoop
}
}

case <-stopper:
break partitionConsumerLoop
}
}

if lastCommittedOffset, err := cg.offsetManager.FinalizePartition(topic, partition); err == nil {
cg.Logf("Stopping partition consumer for %s/%d at offset %d.\n", topic, partition, lastCommittedOffset)
} else {
cg.Logf("FAILED to commit offset %d when stopping partition consumer for %s/%d.\n", lastOffset, topic, partition)
cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
cg.Logf("%s/%d :: %s\n", topic, partition, err)
}
}
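
The new FinalizePartition(topic, partition, lastOffset, timeout) signature is the heart of the fix: before committing the final offset, the offset manager now waits until everything up to the last delivered offset has been marked processed, giving up after Offsets.ProcessingTimeout. The offset_manager.go side of the change is not rendered on this page, so the sketch below is only a guess at the shape of that wait, using a condition variable over a hypothetical per-partition tracker.

package main

import (
	"fmt"
	"sync"
	"time"
)

// offsetTracker is a hypothetical stand-in for the per-partition state the
// Zookeeper offset manager keeps; it is not the library's actual type.
type offsetTracker struct {
	mu        sync.Mutex
	cond      *sync.Cond
	processed int64 // highest offset acknowledged via CommitUpto
}

func newOffsetTracker() *offsetTracker {
	t := &offsetTracker{processed: -1}
	t.cond = sync.NewCond(&t.mu)
	return t
}

func (t *offsetTracker) markProcessed(offset int64) {
	t.mu.Lock()
	if offset > t.processed {
		t.processed = offset
	}
	t.cond.Broadcast()
	t.mu.Unlock()
}

// waitFor blocks until offset `last` has been processed or the timeout expires,
// mirroring what FinalizePartition presumably does before its final commit.
// (On timeout the helper goroutine keeps waiting; acceptable for a sketch.)
func (t *offsetTracker) waitFor(last int64, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		t.mu.Lock()
		for t.processed < last {
			t.cond.Wait()
		}
		t.mu.Unlock()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	t := newOffsetTracker()
	go func() {
		time.Sleep(50 * time.Millisecond)
		t.markProcessed(99) // the application finishes its in-flight message
	}()
	fmt.Println("caught up before timeout:", t.waitFor(99, time.Second)) // true
}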
2 changes: 1 addition & 1 deletion consumergroup/consumergroup_integration_test.go
@@ -148,7 +148,7 @@ func TestSingleTopicSequentialConsumer(t *testing.T) {
// If the channel is buffered, the consumer will enqueue more events in the channel,
// which assertEvents will simply skip. When consumer 2 starts it will skip a bunch of
// events because of this. Transactional processing will fix this.
config := NewConsumerGroupConfig()
config := NewConfig()
config.ChannelBufferSize = 0

consumer1, err := JoinConsumerGroup(consumerGroup, []string{TopicWithSinglePartition}, zookeeper, config)
10 changes: 5 additions & 5 deletions consumergroup/monitor.go
@@ -22,13 +22,13 @@ type ConsumerGroupProcessingLag map[string]TopicProcessingLag // The number of m

// Instantiates a new consumergroup monitor. Returns the number of messages the consumergroup is behind
// the latest offset in Kafka for every topic/partition the consumergroup is consuming.
func NewMonitor(name string, consumergroup string, zookeeper []string, config *ConsumerGroupConfig) (*Monitor, error) {
func NewMonitor(name string, consumergroup string, zookeeper []string, config *Config) (*Monitor, error) {
if config == nil {
config = NewConsumerGroupConfig()
config = NewConfig()
}
config.SaramaConfig.ClientID = name
config.ClientID = name

zkConn, err := NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout)
zkConn, err := NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout)
if err != nil {
return nil, err
}
@@ -43,7 +43,7 @@ func NewMonitor(name string, consumergroup string, zookeeper []string, config *C
brokerList = append(brokerList, broker)
}

saramaClient, err := sarama.NewClient(brokerList, config.SaramaConfig)
saramaClient, err := sarama.NewClient(brokerList, config.Config)
if err != nil {
return nil, err
}
