diff --git a/consumer_group.go b/consumer_group.go
index 56f96ea13..8de95137e 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -163,14 +163,8 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
 		return err
 	}
 
-	// Get coordinator
-	coordinator, err := c.client.Coordinator(c.groupID)
-	if err != nil {
-		return err
-	}
-
 	// Init session
-	sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
+	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
 	if err == ErrClosedClient {
 		return ErrClosedConsumerGroup
 	} else if err != nil {
@@ -184,7 +178,47 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
 	return sess.release(true)
 }
 
-func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+// retryNewSession waits for the configured rebalance retry backoff and then
+// tries to create a session again with one retry fewer.
+//
+// When refreshCoordinator is true, the client is asked to refresh the group
+// coordinator before the next attempt. A failed refresh consumes a retry as
+// well, so the method gives up with that error once retries is exhausted
+// rather than retrying without bound. ErrClosedConsumerGroup is returned if
+// the consumer group is closed during the backoff.
+func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
+	select {
+	case <-c.closed:
+		return nil, ErrClosedConsumerGroup
+	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+	}
+
+	if refreshCoordinator {
+		err := c.client.RefreshCoordinator(c.groupID)
+		if err != nil {
+			// Give up once the retry budget is spent instead of
+			// recursing with an unchanged counter.
+			if retries <= 0 {
+				return nil, err
+			}
+
+			return c.retryNewSession(ctx, topics, handler, retries-1, true)
+		}
+	}
+
+	return c.newSession(ctx, topics, handler, retries-1)
+}
+
+func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+	coordinator, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		if retries <= 0 {
+			return nil, err
+		}
+
+		return c.retryNewSession(ctx, topics, handler, retries, true)
+	}
+
 	// Join consumer group
 	join, err := c.joinGroupRequest(coordinator, topics)
 	if err != nil {
@@ -196,19 +230,19 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 		c.memberID = join.MemberId
 	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
-		return c.newSession(ctx, coordinator, topics, handler, retries)
-	case ErrRebalanceInProgress: // retry after backoff
+		return c.newSession(ctx, topics, handler, retries)
+	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
 		if retries <= 0 {
 			return nil, join.Err
 		}
 
-		select {
-		case <-c.closed:
-			return nil, ErrClosedConsumerGroup
-		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+		return c.retryNewSession(ctx, topics, handler, retries, true)
+	case ErrRebalanceInProgress: // retry after backoff
+		if retries <= 0 {
+			return nil, join.Err
 		}
 
-		return c.newSession(ctx, coordinator, topics, handler, retries-1)
+		return c.retryNewSession(ctx, topics, handler, retries, false)
 	default:
 		return nil, join.Err
 	}
@@ -237,19 +271,19 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 	case ErrNoError:
 	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
-		return c.newSession(ctx, coordinator, topics, handler, retries)
-	case ErrRebalanceInProgress: // retry after backoff
+		return c.newSession(ctx, topics, handler, retries)
+	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
 		if retries <= 0 {
 			return nil, sync.Err
 		}
 
-		select {
-		case <-c.closed:
-			return nil, ErrClosedConsumerGroup
-		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+		return c.retryNewSession(ctx, topics, handler, retries, true)
+	case ErrRebalanceInProgress: // retry after backoff
+		if retries <= 0 {
+			return nil, sync.Err
 		}
 
-		return c.newSession(ctx, coordinator, topics, handler, retries-1)
+		return c.retryNewSession(ctx, topics, handler, retries, false)
 	default:
 		return nil, sync.Err
 	}