Skip to content

Commit

Permalink
fix: prevent empty requests from paused consumer
Browse files Browse the repository at this point in the history
When using the new "pause consumer" support (IBM#2005), Sarama would
incorrectly submit empty FetchRequests if all of the assigned partitions
were paused. This is because we only used Pause to skip adding the
topicPartition blocks to the FetchRequest and still went ahead and sent
the Fetch even if it was essentially empty:
  • Loading branch information
raulnegreiros authored and dnwe committed Apr 13, 2022
1 parent 8f8d8da commit cc1420b
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,12 @@ func (bc *brokerConsumer) subscriptionConsumer() {
return
}

// if there isn't response, it means that not fetch was made
// so we don't need to handle any response
if response == nil {
continue
}

bc.acks.Add(len(bc.subscriptions))
for child := range bc.subscriptions {
child.feeder <- response
Expand Down Expand Up @@ -1036,6 +1042,8 @@ func (bc *brokerConsumer) abort(err error) {
}
}

// fetchResponse can be nil if no fetch is made, it can occur when
// all partitions are paused
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request := &FetchRequest{
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
Expand Down Expand Up @@ -1077,5 +1085,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
}
}

// avoid to fetch when there is no block
if len(request.blocks) == 0 {
return nil, nil
}

return bc.broker.Fetch(request)
}

0 comments on commit cc1420b

Please sign in to comment.