From cc1420bbe6473ae1c895bf27d16b2119a3c6029b Mon Sep 17 00:00:00 2001 From: Raul Negreiros Date: Fri, 11 Feb 2022 20:57:38 -0300 Subject: [PATCH] fix: prevent empty requests from paused consumer When using the new "pause consumer" support (#2005), Sarama would incorrectly submit empty FetchRequests if all of the assigned partitions were paused. This is because we only used Pause to skip adding the topicPartition blocks to the FetchRequest and still went ahead and sent the Fetch even if it was essentially empty: --- consumer.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/consumer.go b/consumer.go index 46bdadeee..0a9a6c31b 100644 --- a/consumer.go +++ b/consumer.go @@ -936,6 +936,12 @@ func (bc *brokerConsumer) subscriptionConsumer() { return } + // if there isn't response, it means that not fetch was made + // so we don't need to handle any response + if response == nil { + continue + } + bc.acks.Add(len(bc.subscriptions)) for child := range bc.subscriptions { child.feeder <- response @@ -1036,6 +1042,8 @@ func (bc *brokerConsumer) abort(err error) { } } +// fetchResponse can be nil if no fetch is made, it can occur when +// all partitions are paused func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { request := &FetchRequest{ MinBytes: bc.consumer.conf.Consumer.Fetch.Min, @@ -1077,5 +1085,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { } } + // avoid to fetch when there is no block + if len(request.blocks) == 0 { + return nil, nil + } + return bc.broker.Fetch(request) }