Skip to content
This repository has been archived by the owner on May 13, 2019. It is now read-only.

Commit

Permalink
Better locking and logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 8, 2015
1 parent adddc9a commit d929679
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 32 deletions.
36 changes: 10 additions & 26 deletions consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,26 +243,7 @@ func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
return nil
}

func (cg *ConsumerGroup) closeOnPanic() {
if err := recover(); err != nil {
cg.Logf("Error: %s\n", err)

// Try to produce an error event on the channel so we can inform the consumer.
// If that doesn't work, continue.
ce := &sarama.ConsumerError{Err: fmt.Errorf("%s", err)}
select {
case cg.errors <- ce:
default:
}

// Now, close the consumer
cg.Close()
}
}

func (cg *ConsumerGroup) topicListConsumer(topics []string) {
defer cg.closeOnPanic()

for {
select {
case <-cg.stopper:
Expand All @@ -272,7 +253,8 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {

consumers, consumerChanges, err := cg.zk.Consumers(cg.name)
if err != nil {
panic(err)
cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
return
}

cg.consumers = consumers
Expand All @@ -299,7 +281,6 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
}

func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}) {
defer cg.closeOnPanic()
defer cg.wg.Done()

select {
Expand All @@ -313,7 +294,8 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
// Fetch a list of partition IDs
partitions, err := cg.zk.Partitions(topic)
if err != nil {
panic(err)
cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
return
}

dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitions)
Expand All @@ -334,7 +316,6 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con

// Consumes a partition
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}) {
defer cg.closeOnPanic()
defer wg.Done()

select {
Expand All @@ -345,13 +326,15 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag

err := cg.zk.Claim(cg.name, topic, partition, cg.id)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
return
}
defer cg.zk.Release(cg.name, topic, partition, cg.id)

nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
return
}

if nextOffset > 0 {
Expand All @@ -367,7 +350,8 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag

consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
if err != nil {
panic(err)
cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
return
}
defer consumer.Close()

Expand Down
14 changes: 8 additions & 6 deletions consumergroup/offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type OffsetManager interface {
}

var (
UncleanClose = errors.New("Not all offsets were committed before shutdown was completed.")
UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)

// OffsetManagerConfig holds configuration setting son how the offset manager should behave.
Expand Down Expand Up @@ -124,10 +124,10 @@ func (zom *zookeeperOffsetManager) InitializePartition(topic string, partition i
}

func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
zom.l.Lock()
defer zom.l.Unlock()

zom.l.RLock()
tracker := zom.offsets[topic][partition]
zom.l.RUnlock()

if !tracker.waitForOffset(lastOffset, timeout) {
return fmt.Errorf("TIMEOUT waiting for offset %d. Last committed offset: %d", lastOffset, tracker.lastCommittedOffset)
}
Expand All @@ -136,7 +136,10 @@ func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int
return fmt.Errorf("FAILED to commit offset %d to Zookeeper. Last committed offset: %d", tracker.highestProcessedOffset, tracker.lastCommittedOffset)
}

zom.l.Lock()
delete(zom.offsets[topic], partition)
zom.l.Unlock()

return nil
}

Expand All @@ -154,10 +157,9 @@ func (zom *zookeeperOffsetManager) Close() error {
defer zom.l.Unlock()

var closeError error
for topic, partitionOffsets := range zom.offsets {
for _, partitionOffsets := range zom.offsets {
if len(partitionOffsets) > 0 {
closeError = UncleanClose
zom.cg.Logf("WARNING: uncommitted offsets for topic %s.\n", topic)
}
}

Expand Down

0 comments on commit d929679

Please sign in to comment.