This repository has been archived by the owner on May 13, 2019. It is now read-only.

Wait for messages to be processed before committing during shutdown #37

Merged
merged 2 commits on Mar 9, 2015
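In short: the consumer group configuration gains an Offsets.ProcessingTimeout, and during shutdown each partition consumer now waits for already-delivered messages to be processed and their offsets committed before releasing its claim. Below is a condensed sketch of the updated usage, based on the revised consumer_example.go in this diff; the group name, topic, and Zookeeper address are placeholders, and the loop assumes the Messages() channel is closed on shutdown, as in the full example.

package main

import (
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
)

func main() {
	// The flat ConsumerGroupConfig is replaced by a nested Config.
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	// Upper bound on how long shutdown waits for in-flight messages
	// to be processed and their offsets committed.
	config.Offsets.ProcessingTimeout = 10 * time.Second

	consumer, err := consumergroup.JoinConsumerGroup(
		"example.group",            // placeholder group name
		[]string{"example.topic"},  // placeholder topic
		[]string{"localhost:2181"}, // placeholder Zookeeper node
		config,
	)
	if err != nil {
		log.Fatalln(err)
	}

	// On interrupt, Close() finalizes each claimed partition: it waits
	// (up to ProcessingTimeout) for processed offsets to be committed
	// before releasing the claim and returning.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		if err := consumer.Close(); err != nil {
			log.Println("Error closing the consumer:", err)
		}
	}()

	for message := range consumer.Messages() {
		// ... process the message ...
		consumer.CommitUpto(message) // mark it as processed
	}
}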
10 changes: 8 additions & 2 deletions consumer_example.go
@@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
@@ -41,8 +42,10 @@ func init() {
}

func main() {
config := consumergroup.NewConsumerGroupConfig()
config.InitialOffset = sarama.OffsetNewest
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second

consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroup, kafkaTopics, zookeeper, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
@@ -76,6 +79,9 @@ func main() {
log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
}

// Simulate processing time
time.Sleep(10 * time.Millisecond)

offsets[message.Topic][message.Partition] = message.Offset
consumer.CommitUpto(message)
}
152 changes: 67 additions & 85 deletions consumergroup/consumer_group.go
@@ -14,53 +14,55 @@ var (
FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)

type ConsumerGroupConfig struct {
// The Zookeeper read timeout. Defaults to 1 second
ZookeeperTimeout time.Duration
type Config struct {
*sarama.Config

// Zookeeper chroot to use. Should not include a trailing slash.
// Leave this empty when your Kafka install does not use a chroot.
ZookeeperChroot string
Zookeeper struct {
// The Zookeeper read timeout. Defaults to 1 second
Timeout time.Duration

SaramaConfig *sarama.Config // This will be passed to Sarama when creating a new consumer
// Zookeeper chroot to use. Should not include a trailing slash.
// Leave this empty when your Kafka install does not use a chroot.
Chroot string
}

ChannelBufferSize int // The buffer size of the channel for the messages coming from Kafka. Zero means no buffering. Default is 256.
CommitInterval time.Duration // The interval at which the processed offsets are committed to Zookeeper.
InitialOffset int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetMethodOldest (default) or sarama.OffsetMethodNewest.
Offsets struct {
Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
CommitInterval time.Duration // The interval at which the processed offsets are committed.
}
}

func NewConsumerGroupConfig() *ConsumerGroupConfig {
return &ConsumerGroupConfig{
ZookeeperTimeout: 1 * time.Second,
ChannelBufferSize: 256,
CommitInterval: 10 * time.Second,
InitialOffset: sarama.OffsetOldest,
}
func NewConfig() *Config {
config := &Config{}
config.Config = sarama.NewConfig()
config.Zookeeper.Timeout = 1 * time.Second
config.Offsets.Initial = sarama.OffsetOldest
config.Offsets.ProcessingTimeout = 60 * time.Second
config.Offsets.CommitInterval = 10 * time.Second

return config
}

func (cgc *ConsumerGroupConfig) Validate() error {
if cgc.ZookeeperTimeout <= 0 {
return errors.New("ZookeeperTimeout should have a duration > 0")
func (cgc *Config) Validate() error {
if cgc.Zookeeper.Timeout <= 0 {
return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
}

if cgc.CommitInterval <= 0 {
return errors.New("CommitInterval should have a duration > 0")
if cgc.Offsets.CommitInterval <= 0 {
return sarama.ConfigurationError("CommitInterval should have a duration > 0")
}

if cgc.ChannelBufferSize < 0 {
return errors.New("ChannelBufferSize should be >= 0.")
if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
}

if cgc.SaramaConfig != nil {
if err := cgc.SaramaConfig.Validate(); err != nil {
if cgc.Config != nil {
if err := cgc.Config.Validate(); err != nil {
return err
}
}

if cgc.InitialOffset != sarama.OffsetOldest && cgc.InitialOffset != sarama.OffsetNewest {
return errors.New("InitialOffset should OffsetMethodNewest or OffsetMethodOldest.")
}

return nil
}

@@ -69,7 +71,7 @@ func (cgc *ConsumerGroupConfig) Validate() error {
type ConsumerGroup struct {
id, name string

config *ConsumerGroupConfig
config *Config

consumer *sarama.Consumer
zk *ZK
@@ -88,7 +90,7 @@ type ConsumerGroup struct {
}

// Connects to a consumer group, using Zookeeper for auto-discovery
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *ConsumerGroupConfig) (cg *ConsumerGroup, err error) {
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {

if name == "" {
return nil, sarama.ConfigurationError("Empty consumergroup name")
@@ -103,20 +105,17 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

if config == nil {
config = NewConsumerGroupConfig()
}
if config.SaramaConfig == nil {
config.SaramaConfig = sarama.NewConfig()
config = NewConfig()
}
config.SaramaConfig.ClientID = name
config.ClientID = name

// Validate configuration
if err = config.Validate(); err != nil {
return
}

var zk *ZK
if zk, err = NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout); err != nil {
if zk, err = NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout); err != nil {
return
}

@@ -132,7 +131,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

var consumer *sarama.Consumer
if consumer, err = sarama.NewConsumer(brokerList, config.SaramaConfig); err != nil {
if consumer, err = sarama.NewConsumer(brokerList, config.Config); err != nil {
zk.Close()
return
}
@@ -171,7 +170,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
cg.Logf("Consumer instance registered (%s).", cg.id)
}

offsetConfig := OffsetManagerConfig{CommitInterval: config.CommitInterval}
offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

go cg.topicListConsumer(topics)
@@ -210,7 +209,7 @@ func (cg *ConsumerGroup) Close() error {
if shutdownError = cg.zk.DeregisterConsumer(cg.name, cg.id); shutdownError != nil {
cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
} else {
cg.Logf("Deregistered consumer istance %s.\n", cg.id)
cg.Logf("Deregistered consumer instance %s.\n", cg.id)
}

if shutdownError = cg.consumer.Close(); shutdownError != nil {
@@ -241,26 +240,7 @@ func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
return nil
}

func (cg *ConsumerGroup) closeOnPanic() {
if err := recover(); err != nil {
cg.Logf("Error: %s\n", err)

// Try to produce an error event on the channel so we can inform the consumer.
// If that doesn't work, continue.
ce := &sarama.ConsumerError{Err: fmt.Errorf("%s", err)}
select {
case cg.errors <- ce:
default:
}

// Now, close the consumer
cg.Close()
}
}

func (cg *ConsumerGroup) topicListConsumer(topics []string) {
defer cg.closeOnPanic()

for {
select {
case <-cg.stopper:
@@ -270,7 +250,8 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {

consumers, consumerChanges, err := cg.zk.Consumers(cg.name)
if err != nil {
panic(err)
cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
return
}

cg.consumers = consumers
@@ -289,15 +270,14 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
return

case <-consumerChanges:
cg.Logf("Triggering rebalance due to consumer list change.\n")
cg.Logf("Triggering rebalance due to consumer list change\n")
close(stopper)
cg.wg.Wait()
}
}
}

func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}) {
defer cg.closeOnPanic()
defer cg.wg.Done()

select {
@@ -306,17 +286,18 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
default:
}

cg.Logf("Started topic consumer for %s\n", topic)
cg.Logf("%s :: Started topic consumer\n", topic)

// Fetch a list of partition IDs
partitions, err := cg.zk.Partitions(topic)
if err != nil {
panic(err)
cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
return
}

dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitions)
myPartitions := dividedPartitions[cg.id]
cg.Logf("Claiming %d of %d partitions for topic %s.", len(myPartitions), len(partitions), topic)
cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitions))

// Consume all the assigned partitions
var wg sync.WaitGroup
@@ -327,12 +308,11 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
}

wg.Wait()
cg.Logf("Stopped topic consumer for %s\n", topic)
cg.Logf("%s :: Stopped topic consumer\n", topic)
}

// Consumes a partition
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}) {
defer cg.closeOnPanic()
defer wg.Done()

select {
@@ -343,29 +323,32 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag

err := cg.zk.Claim(cg.name, topic, partition, cg.id)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
return
}
defer cg.zk.Release(cg.name, topic, partition, cg.id)

nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
return
}

if nextOffset > 0 {
cg.Logf("Partition consumer for %s/%d starting at offset %d.\n", topic, partition, nextOffset)
cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
} else {
nextOffset = cg.config.InitialOffset
nextOffset = cg.config.Offsets.Initial
if nextOffset == sarama.OffsetOldest {
cg.Logf("Partition consumer for %s/%d starting at the oldest available offset.\n", topic, partition)
cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
} else if nextOffset == sarama.OffsetNewest {
cg.Logf("Partition consumer for %s/%d for new messages only.\n", topic, partition)
cg.Logf("%s/%d :: Partition consumer listening for new messages only.\n", topic, partition)
}
}

consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
return
}
defer consumer.Close()

@@ -374,6 +357,9 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
partitionConsumerLoop:
for {
select {
case <-stopper:
break partitionConsumerLoop

case err := <-consumer.Errors():
for {
select {
Expand All @@ -388,23 +374,19 @@ partitionConsumerLoop:
case message := <-consumer.Messages():
for {
select {
case <-stopper:
break partitionConsumerLoop

case messages <- message:
lastOffset = message.Offset
continue partitionConsumerLoop

case <-stopper:
break partitionConsumerLoop
}
}

case <-stopper:
break partitionConsumerLoop
}
}

if lastCommittedOffset, err := cg.offsetManager.FinalizePartition(topic, partition); err == nil {
cg.Logf("Stopping partition consumer for %s/%d at offset %d.\n", topic, partition, lastCommittedOffset)
} else {
cg.Logf("FAILED to commit offset %d when stopping partition consumer for %s/%d.\n", lastOffset, topic, partition)
cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
cg.Logf("%s/%d :: %s\n", topic, partition, err)
}
}
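The partition consumer no longer panics when finalization fails; it now hands FinalizePartition the last delivered offset and the configured ProcessingTimeout. The offset manager implementation is not part of this diff, so the following is only a plausible sketch of the waiting logic the new signature implies; the channel and function names are assumptions, not the actual offsetmanager code.

package consumergroup

import (
	"fmt"
	"time"
)

// waitForProcessing illustrates (hypothetically) what the new timeout
// parameter on FinalizePartition implies: before committing the final
// offset and releasing the partition claim, block until the application
// has marked every delivered message as processed via CommitUpto, or
// give up once the configured Offsets.ProcessingTimeout elapses.
func waitForProcessing(processed <-chan int64, lastOffset int64, timeout time.Duration) error {
	deadline := time.After(timeout)
	for {
		select {
		case offset := <-processed: // offsets reported via CommitUpto
			if offset >= lastOffset {
				return nil // everything delivered has been processed
			}
		case <-deadline:
			return fmt.Errorf("timed out waiting for offset %d to be processed", lastOffset)
		}
	}
}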
2 changes: 1 addition & 1 deletion consumergroup/consumergroup_integration_test.go
@@ -148,7 +148,7 @@ func TestSingleTopicSequentialConsumer(t *testing.T) {
// If the channel is buffered, the consumer will enqueue more events in the channel,
// which assertEvents will simply skip. When consumer 2 starts it will skip a bunch of
// events because of this. Transactional processing will fix this.
config := NewConsumerGroupConfig()
config := NewConfig()
config.ChannelBufferSize = 0

consumer1, err := JoinConsumerGroup(consumerGroup, []string{TopicWithSinglePartition}, zookeeper, config)
10 changes: 5 additions & 5 deletions consumergroup/monitor.go
@@ -22,13 +22,13 @@ type ConsumerGroupProcessingLag map[string]TopicProcessingLag // The number of m

// Instantiates a new consumergroup monitor. Returns the number of messages the consumergroup is behind
// the latest offset in Kafka for every topic/partition the consumergroup is consuming.
func NewMonitor(name string, consumergroup string, zookeeper []string, config *ConsumerGroupConfig) (*Monitor, error) {
func NewMonitor(name string, consumergroup string, zookeeper []string, config *Config) (*Monitor, error) {
if config == nil {
config = NewConsumerGroupConfig()
config = NewConfig()
}
config.SaramaConfig.ClientID = name
config.ClientID = name

zkConn, err := NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout)
zkConn, err := NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout)
if err != nil {
return nil, err
}
@@ -43,7 +43,7 @@ func NewMonitor(name string, consumergroup string, zookeeper []string, config *C
brokerList = append(brokerList, broker)
}

saramaClient, err := sarama.NewClient(brokerList, config.SaramaConfig)
saramaClient, err := sarama.NewClient(brokerList, config.Config)
if err != nil {
return nil, err
}
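For completeness, constructing a monitor with the new Config type would look roughly like the sketch below; the names are placeholders, and the call that actually reads the processing lag is outside this diff.

package main

import (
	"log"

	"github.com/wvanbergen/kafka/consumergroup"
)

func main() {
	// NewMonitor takes a client ID, the consumer group to watch, the
	// Zookeeper nodes, and the new nested Config (nil falls back to
	// NewConfig()).
	monitor, err := consumergroup.NewMonitor(
		"lag-monitor",              // placeholder client ID
		"example.group",            // placeholder consumer group name
		[]string{"localhost:2181"}, // placeholder Zookeeper node
		consumergroup.NewConfig(),
	)
	if err != nil {
		log.Fatalln(err)
	}
	_ = monitor // lag-retrieval methods are not shown in this diff
}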