diff --git a/consumer_group.go b/consumer_group.go index b974dd9af..97c08e9cc 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -175,6 +175,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co // loop check topic partition numbers changed // will trigger rebalance when any topic partitions number had changed + // avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine go c.loopCheckPartitionNumbers(topics, sess) // Wait for session exit signal @@ -448,7 +449,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { } func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) { - pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2) + pause := time.NewTicker(c.config.Metadata.RefreshFrequency) defer session.cancel() defer pause.Stop() var oldTopicToPartitionNum map[string]int @@ -468,6 +469,10 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons } select { case <-pause.C: + case <-session.ctx.Done(): + Logger.Printf("loop check partition number coroutine will exit, topics %s", topics) + // if session closed by other, should be exited + return case <-c.closed: return } @@ -475,10 +480,6 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons } func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) { - if err := c.client.RefreshMetadata(topics...); err != nil { - Logger.Printf("Consumer Group refresh metadata failed %v", err) - return nil, err - } topicToPartitionNum := make(map[string]int, len(topics)) for _, topic := range topics { if partitionNum, err := c.client.Partitions(topic); err != nil {