
Paused Consumer still sends FetchRequest (with empty blocks) #2125

Closed
dnwe opened this issue Jan 31, 2022 · 8 comments
@dnwe
Collaborator

dnwe commented Jan 31, 2022

Whilst trying out the new "pause consumer" support (#2005) on main, I noticed that Sarama will still submit empty FetchRequests if all of the assigned partitions are paused. This is because we only use Pause to skip adding the topicPartition blocks to the FetchRequest, and we still go ahead and send the Fetch even though it is essentially empty:

https://github.com/Shopify/sarama/blob/f214fdd60afa75e28e1d40308b64029033fc6f9d/consumer.go#L1071-L1077

Ideally we should behave the same way as the Java client and not send any FetchRequest at all when there are no active assigned partitions to consume from.

@raulnegreiros might you be able to take a look into what it would take to achieve this?

@raulnegreiros
Contributor

Sure, I'll try to look into it over the weekend.

@raulnegreiros
Contributor

Hi @dnwe, I've submitted a proposal — please check whether this is a valid approach. I'm worried that skipping handleResponses could have some side effects; I'm not sure about it. Some guidance on whether this is the best way forward would help.

@pkoutsovasilis
Contributor

Hello, I stumbled upon this issue as well. I tested the associated PR and it solves it.

About your concerns around skipping handleResponses, @raulnegreiros: I think this is actually an appropriate solution. From what I see in the code, handleResponses acts on the following, based on the child.responseResult set in responseFeeder():

  • checks whether the consumer is assigned to the preferredBroker if there is no error [ no need to do this since we sent no request, and it can always happen on the next request we send ]
  • handles errTimedOut, ErrOffsetOutOfRange, ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable [ again, all of these errors relate to an actual fetch, e.g. message processing being slow, which is not relevant since we didn't make any actual request ]

I don't see anything else critical inside handleResponses that may result in erroneous behavior by skipping it.

So if there is any further assistance that I could provide here which could potentially help expedite the PR status, please do let me know 🙂

@pkoutsovasilis
Contributor

pkoutsovasilis commented Aug 11, 2022

OK @dnwe, I see that you merged this one, but there is a corner case which I am not sure is covered by this issue.

When all topics and their respective partitions are paused, the PR addresses that case and no request is sent — so far so good. But if a ConsumerGroup consumes from two or more topics, and all the partitions of just one topic are paused, the same error resurfaces. My proposed fix to cover that case as well would be the following:

                for child := range bc.subscriptions {
+                       if _, ok := response.Blocks[child.topic]; !ok {
+                               bc.acks.Done()
+                               continue
+                       }
+
+                       if _, ok := response.Blocks[child.topic][child.partition]; !ok {
+                               bc.acks.Done()
+                               continue
+                       }
+
                        child.feeder <- response
                }
                bc.acks.Wait()

would you like me to do a separate issue and maybe a PR about that case?

@dnwe
Collaborator Author

dnwe commented Aug 11, 2022

@pkoutsovasilis yes that sounds interesting, please do raise an issue.

Hearing from users who are consuming from multiple topics (potentially with differing partition counts) from the same consumer group is always interesting, as this isn't as common a scenario and it usually teases out new and unforeseen behaviour.

@hawaiyon

hawaiyon commented Nov 1, 2022

@raulnegreiros Hi, using the latest v1.37.2 with this PR (#2143) merged, I found that when all the partitions are paused, we observe high CPU load. (Using the example consumergroup and sending SIGUSR1 to the pid reproduces this problem.)

Digging into the code, I think it's caused by the following code introduced by that PR. With if response == nil being true, the for loop just spins on and on, with no new messages, and drains the CPU.

Do you think a short sleep would help? As in time.Sleep(partitionConsumersBatchTimeout).

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Take a small nap to avoid burning the CPU.
			time.Sleep(partitionConsumersBatchTimeout)
			continue
		}

		response, err := bc.fetchNewMessages()
		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		// if there is no response, it means that no fetch was made,
		// so we don't need to handle any response
		if response == nil {
			continue
		}

hawaiyon pushed a commit to hawaiyon/sarama that referenced this issue Nov 1, 2022
@implmnt

implmnt commented Jun 18, 2023

I have the same issue.

sarama v1.38.1

@dnwe
Collaborator Author

dnwe commented Aug 31, 2023

The CPU load issue should have been fixed by #2532 in v1.41.0 and newer.

@dnwe dnwe closed this as completed Aug 31, 2023