Skip to content

Commit

Permalink
Merge pull request #1231 from thomaslee/tom_fix_coordinator_changes
Browse files Browse the repository at this point in the history
Retry ErrNotCoordinatorForConsumer in ConsumerGroup.newSession
  • Loading branch information
bai authored Apr 3, 2019
2 parents a4a1b88 + 4ff43dd commit 9522409
Showing 1 changed file with 42 additions and 22 deletions.
64 changes: 42 additions & 22 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,8 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
return err
}

// Get coordinator
coordinator, err := c.client.Coordinator(c.groupID)
if err != nil {
return err
}

// Init session
sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
if err == ErrClosedClient {
return ErrClosedConsumerGroup
} else if err != nil {
Expand All @@ -184,7 +178,33 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
return sess.release(true)
}

func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
select {
case <-c.closed:
return nil, ErrClosedConsumerGroup
case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
}

if refreshCoordinator {
err := c.client.RefreshCoordinator(c.groupID)
if err != nil {
return c.retryNewSession(ctx, topics, handler, retries, true)
}
}

return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
coordinator, err := c.client.Coordinator(c.groupID)
if err != nil {
if retries <= 0 {
return nil, err
}

return c.retryNewSession(ctx, topics, handler, retries, true)
}

// Join consumer group
join, err := c.joinGroupRequest(coordinator, topics)
if err != nil {
Expand All @@ -196,19 +216,19 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
c.memberID = join.MemberId
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, join.Err
}

select {
case <-c.closed:
return nil, ErrClosedConsumerGroup
case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, join.Err
}

return c.newSession(ctx, coordinator, topics, handler, retries-1)
return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, join.Err
}
Expand Down Expand Up @@ -237,19 +257,19 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, sync.Err
}

select {
case <-c.closed:
return nil, ErrClosedConsumerGroup
case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, sync.Err
}

return c.newSession(ctx, coordinator, topics, handler, retries-1)
return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, sync.Err
}
Expand Down

0 comments on commit 9522409

Please sign in to comment.