diff --git a/consumer_example.go b/consumer_example.go
index 70d7b31..11bd7b8 100644
--- a/consumer_example.go
+++ b/consumer_example.go
@@ -6,6 +6,7 @@ import (
     "os"
     "os/signal"
     "strings"
+    "time"

     "github.com/Shopify/sarama"
     "github.com/wvanbergen/kafka/consumergroup"
@@ -41,8 +42,10 @@ func init() {
 }

 func main() {
-    config := consumergroup.NewConsumerGroupConfig()
-    config.InitialOffset = sarama.OffsetNewest
+    config := consumergroup.NewConfig()
+    config.Offsets.Initial = sarama.OffsetNewest
+    config.Offsets.ProcessingTimeout = 10 * time.Second
+
     consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroup, kafkaTopics, zookeeper, config)
     if consumerErr != nil {
         log.Fatalln(consumerErr)
@@ -76,6 +79,9 @@ func main() {
             log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
         }

+        // Simulate processing time
+        time.Sleep(10 * time.Millisecond)
+
         offsets[message.Topic][message.Partition] = message.Offset
         consumer.CommitUpto(message)
     }
diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go
index 64d7d66..b4d9313 100644
--- a/consumergroup/consumer_group.go
+++ b/consumergroup/consumer_group.go
@@ -14,53 +14,55 @@ var (
     FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
 )

-type ConsumerGroupConfig struct {
-    // The Zookeeper read timeout. Defaults to 1 second
-    ZookeeperTimeout time.Duration
+type Config struct {
+    *sarama.Config

-    // Zookeeper chroot to use. Should not include a trailing slash.
-    // Leave this empty when your Kafka install does not use a chroot.
-    ZookeeperChroot string
+    Zookeeper struct {
+        // The Zookeeper read timeout. Defaults to 1 second
+        Timeout time.Duration

-    SaramaConfig *sarama.Config // This will be passed to Sarama when creating a new consumer
+        // Zookeeper chroot to use. Should not include a trailing slash.
+        // Leave this empty when your Kafka install does not use a chroot.
+        Chroot string
+    }

-    ChannelBufferSize int           // The buffer size of the channel for the messages coming from Kafka. Zero means no buffering. Default is 256.
-    CommitInterval    time.Duration // The interval between which the prossed offsets are commited to Zookeeper.
-    InitialOffset     int64         // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetMethodOldest (default) or sarama.OffsetMethodNewest.
+    Offsets struct {
+        Initial           int64         // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
+        ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
+        CommitInterval    time.Duration // The interval between which the processed offsets are committed.
+    }
 }

-func NewConsumerGroupConfig() *ConsumerGroupConfig {
-    return &ConsumerGroupConfig{
-        ZookeeperTimeout:  1 * time.Second,
-        ChannelBufferSize: 256,
-        CommitInterval:    10 * time.Second,
-        InitialOffset:     sarama.OffsetOldest,
-    }
+func NewConfig() *Config {
+    config := &Config{}
+    config.Config = sarama.NewConfig()
+    config.Zookeeper.Timeout = 1 * time.Second
+    config.Offsets.Initial = sarama.OffsetOldest
+    config.Offsets.ProcessingTimeout = 60 * time.Second
+    config.Offsets.CommitInterval = 10 * time.Second
+
+    return config
 }

-func (cgc *ConsumerGroupConfig) Validate() error {
-    if cgc.ZookeeperTimeout <= 0 {
-        return errors.New("ZookeeperTimeout should have a duration > 0")
+func (cgc *Config) Validate() error {
+    if cgc.Zookeeper.Timeout <= 0 {
+        return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
     }

-    if cgc.CommitInterval <= 0 {
-        return errors.New("CommitInterval should have a duration > 0")
+    if cgc.Offsets.CommitInterval <= 0 {
+        return sarama.ConfigurationError("CommitInterval should have a duration > 0")
     }

-    if cgc.ChannelBufferSize < 0 {
-        return errors.New("ChannelBufferSize should be >= 0.")
+    if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
+        return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
     }

-    if cgc.SaramaConfig != nil {
-        if err := cgc.SaramaConfig.Validate(); err != nil {
+    if cgc.Config != nil {
+        if err := cgc.Config.Validate(); err != nil {
             return err
         }
     }

-    if cgc.InitialOffset != sarama.OffsetOldest && cgc.InitialOffset != sarama.OffsetNewest {
-        return errors.New("InitialOffset should OffsetMethodNewest or OffsetMethodOldest.")
-    }
-
     return nil
 }

@@ -69,7 +71,7 @@ func (cgc *ConsumerGroupConfig) Validate() error {
 type ConsumerGroup struct {
     id, name string

-    config *ConsumerGroupConfig
+    config *Config

     consumer *sarama.Consumer
     zk       *ZK
@@ -88,7 +90,7 @@ type ConsumerGroup struct {
 }

 // Connects to a consumer group, using Zookeeper for auto-discovery
-func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *ConsumerGroupConfig) (cg *ConsumerGroup, err error) {
+func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {

     if name == "" {
         return nil, sarama.ConfigurationError("Empty consumergroup name")
@@ -103,12 +105,9 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
     }

     if config == nil {
-        config = NewConsumerGroupConfig()
-    }
-    if config.SaramaConfig == nil {
-        config.SaramaConfig = sarama.NewConfig()
+        config = NewConfig()
     }
-    config.SaramaConfig.ClientID = name
+    config.ClientID = name

     // Validate configuration
     if err = config.Validate(); err != nil {
@@ -116,7 +115,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
     }

     var zk *ZK
-    if zk, err = NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout); err != nil {
+    if zk, err = NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout); err != nil {
         return
     }

@@ -132,7 +131,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
     }

     var consumer *sarama.Consumer
-    if consumer, err = sarama.NewConsumer(brokerList, config.SaramaConfig); err != nil {
+    if consumer, err = sarama.NewConsumer(brokerList, config.Config); err != nil {
         zk.Close()
         return
     }
@@ -171,7 +170,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
         cg.Logf("Consumer instance registered (%s).", cg.id)
     }

-    offsetConfig := OffsetManagerConfig{CommitInterval: config.CommitInterval}
+    offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
     cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

     go cg.topicListConsumer(topics)
@@ -210,7 +209,7 @@ func (cg *ConsumerGroup) Close() error {
         if shutdownError = cg.zk.DeregisterConsumer(cg.name, cg.id); shutdownError != nil {
             cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
         } else {
-            cg.Logf("Deregistered consumer istance %s.\n", cg.id)
+            cg.Logf("Deregistered consumer instance %s.\n", cg.id)
         }

         if shutdownError = cg.consumer.Close(); shutdownError != nil {
@@ -241,26 +240,7 @@ func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
     return nil
 }

-func (cg *ConsumerGroup) closeOnPanic() {
-    if err := recover(); err != nil {
-        cg.Logf("Error: %s\n", err)
-
-        // Try to produce an error event on the channel so we can inform the consumer.
-        // If that doesn't work, continue.
-        ce := &sarama.ConsumerError{Err: fmt.Errorf("%s", err)}
-        select {
-        case cg.errors <- ce:
-        default:
-        }
-
-        // Now, close the consumer
-        cg.Close()
-    }
-}
-
 func (cg *ConsumerGroup) topicListConsumer(topics []string) {
-    defer cg.closeOnPanic()
-
     for {
         select {
         case <-cg.stopper:
@@ -270,7 +250,8 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {

         consumers, consumerChanges, err := cg.zk.Consumers(cg.name)
         if err != nil {
-            panic(err)
+            cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
+            return
         }

         cg.consumers = consumers
@@ -289,7 +270,7 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
             return

         case <-consumerChanges:
-            cg.Logf("Triggering rebalance due to consumer list change.\n")
+            cg.Logf("Triggering rebalance due to consumer list change\n")
             close(stopper)
             cg.wg.Wait()
         }
@@ -297,7 +278,6 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
 }

 func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}) {
-    defer cg.closeOnPanic()
     defer cg.wg.Done()

     select {
@@ -306,17 +286,18 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
     default:
     }

-    cg.Logf("Started topic consumer for %s\n", topic)
+    cg.Logf("%s :: Started topic consumer\n", topic)

     // Fetch a list of partition IDs
     partitions, err := cg.zk.Partitions(topic)
     if err != nil {
-        panic(err)
+        cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
+        return
     }

     dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitions)
     myPartitions := dividedPartitions[cg.id]
-    cg.Logf("Claiming %d of %d partitions for topic %s.", len(myPartitions), len(partitions), topic)
+    cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitions))

     // Consume all the assigned partitions
     var wg sync.WaitGroup
@@ -327,12 +308,11 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
     }
     wg.Wait()

-    cg.Logf("Stopped topic consumer for %s\n", topic)
+    cg.Logf("%s :: Stopped topic consumer\n", topic)
 }

 // Consumes a partition
 func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}) {
-    defer cg.closeOnPanic()
     defer wg.Done()

     select {
@@ -343,29 +323,32 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag

     err := cg.zk.Claim(cg.name, topic, partition, cg.id)
     if err != nil {
-        panic(err)
+        cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
+        return
     }
     defer cg.zk.Release(cg.name, topic, partition, cg.id)

     nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
     if err != nil {
-        panic(err)
+        cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
+        return
     }

     if nextOffset > 0 {
-        cg.Logf("Partition consumer for %s/%d starting at offset %d.\n", topic, partition, nextOffset)
+        cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
     } else {
-        nextOffset = cg.config.InitialOffset
+        nextOffset = cg.config.Offsets.Initial
         if nextOffset == sarama.OffsetOldest {
-            cg.Logf("Partition consumer for %s/%d starting at the oldest available offset.\n", topic, partition)
+            cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
         } else if nextOffset == sarama.OffsetNewest {
-            cg.Logf("Partition consumer for %s/%d for new messages only.\n", topic, partition)
+            cg.Logf("%s/%d :: Partition consumer listening for new messages only.\n", topic, partition)
         }
     }

     consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
     if err != nil {
-        panic(err)
+        cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
+        return
     }
     defer consumer.Close()

@@ -374,6 +357,9 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
 partitionConsumerLoop:
     for {
         select {
+        case <-stopper:
+            break partitionConsumerLoop
+
         case err := <-consumer.Errors():
             for {
                 select {
@@ -388,23 +374,19 @@ partitionConsumerLoop:
         case message := <-consumer.Messages():
             for {
                 select {
+                case <-stopper:
+                    break partitionConsumerLoop
+
                 case messages <- message:
                     lastOffset = message.Offset
                     continue partitionConsumerLoop
-
-                case <-stopper:
-                    break partitionConsumerLoop
                 }
             }
-
-        case <-stopper:
-            break partitionConsumerLoop
         }
     }

-    if lastCommittedOffset, err := cg.offsetManager.FinalizePartition(topic, partition); err == nil {
-        cg.Logf("Stopping partition consumer for %s/%d at offset %d.\n", topic, partition, lastCommittedOffset)
-    } else {
-        cg.Logf("FAILED to commit offset %d when stopping partition consumer for %s/%d.\n", lastOffset, topic, partition)
+    cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
+    if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
+        cg.Logf("%s/%d :: %s\n", topic, partition, err)
     }
 }
diff --git a/consumergroup/consumergroup_integration_test.go b/consumergroup/consumergroup_integration_test.go
index 10b3629..94554a6 100644
--- a/consumergroup/consumergroup_integration_test.go
+++ b/consumergroup/consumergroup_integration_test.go
@@ -148,7 +148,7 @@ func TestSingleTopicSequentialConsumer(t *testing.T) {
     // If the channel is buffered, the consumer will enqueue more events in the channel,
     // which assertEvents will simply skip. When consumer 2 starts it will skip a bunch of
     // events because of this. Transactional processing will fix this.
-    config := NewConsumerGroupConfig()
+    config := NewConfig()
     config.ChannelBufferSize = 0

     consumer1, err := JoinConsumerGroup(consumerGroup, []string{TopicWithSinglePartition}, zookeeper, config)
diff --git a/consumergroup/monitor.go b/consumergroup/monitor.go
index 5e02f26..79a63b7 100644
--- a/consumergroup/monitor.go
+++ b/consumergroup/monitor.go
@@ -22,13 +22,13 @@ type ConsumerGroupProcessingLag map[string]TopicProcessingLag // The number of m

 // Instantiates a new consumergroup monitor. Retuns the number of messages the consumergroup is behind
 // the latest offset in Kafka for every topic/partition the consumergroup is consuming.
-func NewMonitor(name string, consumergroup string, zookeeper []string, config *ConsumerGroupConfig) (*Monitor, error) {
+func NewMonitor(name string, consumergroup string, zookeeper []string, config *Config) (*Monitor, error) {
     if config == nil {
-        config = NewConsumerGroupConfig()
+        config = NewConfig()
     }
-    config.SaramaConfig.ClientID = name
+    config.ClientID = name

-    zkConn, err := NewZK(zookeeper, config.ZookeeperChroot, config.ZookeeperTimeout)
+    zkConn, err := NewZK(zookeeper, config.Zookeeper.Chroot, config.Zookeeper.Timeout)
     if err != nil {
         return nil, err
     }
@@ -43,7 +43,7 @@ func NewMonitor(name string, consumergroup string, zookeeper []string, config *C
         brokerList = append(brokerList, broker)
     }

-    saramaClient, err := sarama.NewClient(brokerList, config.SaramaConfig)
+    saramaClient, err := sarama.NewClient(brokerList, config.Config)
     if err != nil {
         return nil, err
     }
diff --git a/consumergroup/offset_manager.go b/consumergroup/offset_manager.go
index 20faff7..41e3362 100644
--- a/consumergroup/offset_manager.go
+++ b/consumergroup/offset_manager.go
@@ -2,6 +2,7 @@ package consumergroup

 import (
     "errors"
+    "fmt"
     "sync"
     "time"
 )
@@ -28,10 +29,10 @@ type OffsetManager interface {

     // FinalizePartition is called when the consumergroup is done consuming a
     // partition. In this method, the offset manager can flush any remaining offsets to its
-    // backend store. It should return the last committed offset, and an error if there
-    // was any problem flushing the offset. Note: it's possible that the consumergroup
-    // instance will start to consume the same partition again after this function is called.
-    FinalizePartition(topic string, partition int32) (int64, error)
+    // backend store. It should return an error if it was not able to commit the offset.
+    // Note: it's possible that the consumergroup instance will start to consume the same
+    // partition again after this function is called.
+    FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

     // Close is called when the consumergroup is shutting down. In normal circumstances, all
     // offsets are committed because FinalizePartition is called for all the running partition
@@ -41,7 +42,7 @@ type OffsetManager interface {
 }

 var (
-    UncleanClose = errors.New("Not all offsets were committed before shutdown was completed.")
+    UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
 )

 // OffsetManagerConfig holds configuration setting son how the offset manager should behave.
@@ -65,8 +66,10 @@ type (

 type partitionOffsetTracker struct {
     l                      sync.Mutex
+    waitingForOffset       int64
     highestProcessedOffset int64
     lastCommittedOffset    int64
+    done                   chan struct{}
 }

 type zookeeperOffsetManager struct {
@@ -114,22 +117,33 @@ func (zom *zookeeperOffsetManager) InitializePartition(topic string, partition i
     zom.offsets[topic][partition] = &partitionOffsetTracker{
         highestProcessedOffset: nextOffset - 1,
         lastCommittedOffset:    nextOffset - 1,
+        done:                   make(chan struct{}),
     }

     return nextOffset, nil
 }

-func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int32) (int64, error) {
-    zom.l.Lock()
-    defer zom.l.Unlock()
-
+func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
+    zom.l.RLock()
     tracker := zom.offsets[topic][partition]
-    err := zom.commitOffset(topic, partition, tracker)
-    if err == nil {
-        delete(zom.offsets[topic], partition)
+    zom.l.RUnlock()
+
+    if lastOffset-tracker.highestProcessedOffset > 0 {
+        zom.cg.Logf("%s/%d :: Last processed offset: %d. Waiting up to %ds for another %d messages to process...", topic, partition, tracker.highestProcessedOffset, timeout/time.Second, lastOffset-tracker.highestProcessedOffset)
+        if !tracker.waitForOffset(lastOffset, timeout) {
+            return fmt.Errorf("TIMEOUT waiting for offset %d. Last committed offset: %d", lastOffset, tracker.lastCommittedOffset)
+        }
     }

-    return tracker.lastCommittedOffset, err
+    if err := zom.commitOffset(topic, partition, tracker); err != nil {
+        return fmt.Errorf("FAILED to commit offset %d to Zookeeper. Last committed offset: %d", tracker.highestProcessedOffset, tracker.lastCommittedOffset)
+    }
+
+    zom.l.Lock()
+    delete(zom.offsets[topic], partition)
+    zom.l.Unlock()
+
+    return nil
 }

 func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
@@ -145,19 +159,14 @@ func (zom *zookeeperOffsetManager) Close() error {
     zom.l.Lock()
     defer zom.l.Unlock()

-    var uncleanClose bool
-    for topic, partitionOffsets := range zom.offsets {
+    var closeError error
+    for _, partitionOffsets := range zom.offsets {
         if len(partitionOffsets) > 0 {
-            uncleanClose = true
-            zom.cg.Logf("Uncommitted offsets for topic %s! Committing now...\n", topic)
+            closeError = UncleanClose
         }
     }

-    if uncleanClose {
-        return UncleanClose
-    } else {
-        return nil
-    }
+    return closeError
 }

 func (zom *zookeeperOffsetManager) offsetCommitter() {
@@ -215,6 +224,9 @@ func (pot *partitionOffsetTracker) markAsProcessed(offset int64) bool {
     defer pot.l.Unlock()
     if offset > pot.highestProcessedOffset {
         pot.highestProcessedOffset = offset
+        if pot.waitingForOffset == pot.highestProcessedOffset {
+            close(pot.done)
+        }
         return true
     } else {
         return false
@@ -237,3 +249,20 @@ func (pot *partitionOffsetTracker) commit(committer offsetCommitter) error {
         return nil
     }
 }
+
+func (pot *partitionOffsetTracker) waitForOffset(offset int64, timeout time.Duration) bool {
+    pot.l.Lock()
+    if offset > pot.highestProcessedOffset {
+        pot.waitingForOffset = offset
+        pot.l.Unlock()
+        select {
+        case <-pot.done:
+            return true
+        case <-time.After(timeout):
+            return false
+        }
+    } else {
+        pot.l.Unlock()
+        return true
+    }
+}
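
Reviewer note (illustrative, not part of the patch): the new shutdown path hinges on the waitForOffset/markAsProcessed hand-off above -- FinalizePartition now waits up to Offsets.ProcessingTimeout for in-flight messages to be marked as processed before committing and releasing the partition. The standalone Go sketch below mirrors that mechanism with a simplified tracker; the tracker type and the scenario in main() are made up for illustration, and waitingForOffset is seeded with -1 here purely to keep the sketch self-contained.

// sketch.go -- illustrative only; a simplified stand-in for partitionOffsetTracker.
package main

import (
    "fmt"
    "sync"
    "time"
)

type tracker struct {
    l                      sync.Mutex
    waitingForOffset       int64
    highestProcessedOffset int64
    done                   chan struct{}
}

// markAsProcessed records a processed offset and unblocks waitForOffset
// once the awaited offset has been reached.
func (t *tracker) markAsProcessed(offset int64) {
    t.l.Lock()
    defer t.l.Unlock()
    if offset > t.highestProcessedOffset {
        t.highestProcessedOffset = offset
        if t.waitingForOffset == t.highestProcessedOffset {
            close(t.done)
        }
    }
}

// waitForOffset blocks until the given offset has been processed or the
// timeout expires, like FinalizePartition does with Offsets.ProcessingTimeout.
func (t *tracker) waitForOffset(offset int64, timeout time.Duration) bool {
    t.l.Lock()
    if offset <= t.highestProcessedOffset {
        t.l.Unlock()
        return true
    }
    t.waitingForOffset = offset
    t.l.Unlock()

    select {
    case <-t.done:
        return true
    case <-time.After(timeout):
        return false
    }
}

func main() {
    // waitingForOffset starts at -1 so a processed offset of 0 is not mistaken
    // for the awaited offset (simplification for this sketch).
    t := &tracker{highestProcessedOffset: -1, waitingForOffset: -1, done: make(chan struct{})}

    // Simulate a handler that is still working through in-flight messages
    // while the partition consumer shuts down.
    go func() {
        for offset := int64(0); offset <= 9; offset++ {
            time.Sleep(10 * time.Millisecond) // simulated processing time
            t.markAsProcessed(offset)
        }
    }()

    // The consumer stopped at offset 9; wait up to one second for processing to catch up.
    if t.waitForOffset(9, 1*time.Second) {
        fmt.Println("all offsets processed; safe to commit and release the partition")
    } else {
        fmt.Println("TIMEOUT waiting for offset 9; last processed:", t.highestProcessedOffset)
    }
}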