Skip to content

Commit

Permalink
fix: check session.Context().Done() in examples/consumergroup (#2240)
Browse files Browse the repository at this point in the history
  • Loading branch information
zxc111 authored Jun 6, 2022
1 parent 4c0bbf8 commit 0151486
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,17 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
for {
select {
case message := <-claim.Messages():
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}

return nil
}

0 comments on commit 0151486

Please sign in to comment.