Skip to content

Commit

Permalink
Merge pull request #1602 from Shopify/diego_improve-consumer-consume-…
Browse files Browse the repository at this point in the history
…docs

adds a note about consumer groups Consume method
  • Loading branch information
d1egoaz authored Feb 6, 2020
2 parents 9d2cdb2 + 8b72385 commit 3dfcc91
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
3 changes: 3 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type ConsumerGroup interface {
// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
// commit failures.
// This method should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims.
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

// Errors returns a read channel of errors that occurred during the consumer life-cycle.
Expand Down
16 changes: 5 additions & 11 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,11 @@ func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, cla
}

func ExampleConsumerGroup() {
// Init config, specify appropriate version
config := NewConfig()
config.Version = V1_0_0_0
config.Version = V2_0_0_0 // specify appropriate version
config.Consumer.Return.Errors = true

// Start with a client
client, err := NewClient([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer func() { _ = client.Close() }()

// Start a new consumer group
group, err := NewConsumerGroupFromClient("my-group", client)
group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
panic(err)
}
Expand All @@ -50,6 +41,9 @@ func ExampleConsumerGroup() {
topics := []string{"my-topic"}
handler := exampleConsumerGroupHandler{}

// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
err := group.Consume(ctx, topics, handler)
if err != nil {
panic(err)
Expand Down
3 changes: 3 additions & 0 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func main() {
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
Expand Down

0 comments on commit 3dfcc91

Please sign in to comment.