Slow consumers, subscription stuck problem #1897

Closed
alok87 opened this issue Mar 9, 2021 · 14 comments

@alok87

alok87 commented Mar 9, 2021

Our consumer group handles 100+ topics, each with a single partition (partition 0).

For example, our loader handler processes messages in batches, flushing either on message count or on elapsed time. The loop keeps hitting the ticker case, and since the batch size is 0 (no messages were inserted), nothing ever gets processed. We are stuck in this loop.

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	claimMsgChan := claim.Messages()
	for {
		select {
		case message, ok := <-claimMsgChan:
			if !ok {
				return nil // channel closed: the claim has ended
			}
			// insert message into the batch and process if batch > 0
		case <-maxWaitTicker.C:
			// process if batch > 0
		}
	}
}

Out of 100+ topics, only 57 logged the "[sarama] added subscription to" message. The remaining 43 were never subscribed, so their handlers are stuck in the endless loop, waiting for messages to arrive on the read channel.

Is this expected behaviour, and how can we fix it? Is there some configuration I am missing?

@alok87 alok87 changed the title Is there a limit on the reads from Messages() read channel? All topics in claim are not getting subscribed leading to stuck processing Mar 10, 2021
@alok87 alok87 changed the title All topics in claim are not getting subscribed leading to stuck processing Many topics in claim are not getting subscribed leading to stuck processing Mar 10, 2021
@alok87
Author

alok87 commented Mar 10, 2021

Can't we wait for all subscriptions to happen before starting?

@alok87
Author

alok87 commented Mar 10, 2021

@dim please help out here.

@dim
Contributor

dim commented Mar 10, 2021

@alok87 sorry, I have no idea what's going on here tbh. I think that there is a rebalancing issue in your cluster and your handlers cannot exit the loop quickly enough. Can you try and reduce your ConsumeClaim to just:

for msg := range claim.Messages() {
  ...
}

and see if this fixes things. If it does, you can try increasing your timeout configs (this is usually the culprit). If it doesn't, you will need to do some lower-level debugging yourself (sorry), as it's really impossible for me to go through your specific use case. I am more than happy to review the PR if you are able to isolate the problem.

@alok87
Author

alok87 commented Mar 10, 2021

@dim
thanks for the reply. I will try what you suggested.

I have been trying to figure out why only a few topics out of many receive this message
[sarama] 2021/03/10 10:52:20 consumer/broker/1 added subscription to ts.inventory.customers/0
but many never receive it. Why could this happen?

@alok87
Author

alok87 commented Mar 10, 2021

Reducing the loop to for msg := range claim.Messages() { did not help.

My consumers are really slow. Which values would require tuning?
I have set maxProcessTime to 20 minutes, sessionTimeout to 2 minutes, heartbeat to 2 seconds, and the rebalance timeout to 5 minutes.

I still keep seeing these messages: https://github.com/Shopify/sarama/blob/master/consumer.go#L826

Also, the "added subscription" message appears for only a few topics, not all. How does a topic/partition get added to a subscription?

@dim
Contributor

dim commented Mar 10, 2021

Sorry, I haven't worked with Kafka for quite some time, but timeouts are documented e.g. in https://github.com/Shopify/sarama/blob/master/config.go#L261 and you can find more information at https://kafka.apache.org/documentation/ as the server and client settings need to be aligned.

The "consumer/broker/%d closed dead subscription ..." messages are triggered when the subscription is closed in https://github.com/Shopify/sarama/blob/master/consumer_group.go#L686 or https://github.com/Shopify/sarama/blob/master/consumer_group.go#L695. This happens when the first of your ConsumeClaim loops exits. Please note that whatever happens in ConsumeClaim MUST be thread-safe, as it's called once per topic/partition claim.
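
Because ConsumeClaim runs in its own goroutine for each claimed topic/partition, any state shared between claims needs synchronization. Here is a minimal sketch using only the standard library. The `stats` type and `simulateClaims` are hypothetical, and the goroutines merely stand in for concurrent ConsumeClaim calls.

```go
package main

import (
	"fmt"
	"sync"
)

// stats is shared by all simulated claims, so access is guarded by a mutex.
type stats struct {
	mu        sync.Mutex
	processed map[string]int
}

// record counts one processed message for the given topic.
func (s *stats) record(topic string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.processed[topic]++
}

// total returns the number of messages recorded across all topics.
func (s *stats) total() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	sum := 0
	for _, n := range s.processed {
		sum += n
	}
	return sum
}

// simulateClaims starts one goroutine per topic, each recording perTopic messages.
func simulateClaims(topics []string, perTopic int) *stats {
	s := &stats{processed: make(map[string]int)}
	var wg sync.WaitGroup
	for _, topic := range topics {
		wg.Add(1)
		go func(topic string) {
			defer wg.Done()
			for i := 0; i < perTopic; i++ {
				s.record(topic)
			}
		}(topic)
	}
	wg.Wait()
	return s
}

func main() {
	s := simulateClaims([]string{"customers", "orders", "products"}, 1000)
	fmt.Println("total processed:", s.total())
}
```

Without the mutex, the concurrent map writes would be a data race, which `go run -race` reports.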

The fact that "added subscription" comes up for a few and not all topic/partitions could just mean that the other topic/partitions are still claimed by a previous process. The algorithm for consumer groups is complex, but TLDR:

  1. a consumer issues a JoinGroup request and receives a list of members + the ID of the elected group leader
  2. if the consumer is the leader, it works out an "assignment plan" to distribute the partitions across the members
  3. all consumers within a group issue a SyncGroup request, the elected leader also includes the "assignment plan"
  4. each consumer receives the "assignment plan" as part of the SyncGroup response and subscribes to the topics/partitions it has been allocated
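
Step 2 can be illustrated with a toy round-robin plan. This is only a sketch of the idea: `assign`, `topicPartition`, and the member names are hypothetical, and real assignment strategies handle many more cases than this.

```go
package main

import (
	"fmt"
	"sort"
)

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int32
}

// assign distributes partitions across members in round-robin order.
// Members are sorted first so the plan does not depend on input order.
func assign(members []string, partitions []topicPartition) map[string][]topicPartition {
	plan := make(map[string][]topicPartition, len(members))
	if len(members) == 0 {
		return plan
	}
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)
	for i, tp := range partitions {
		m := sorted[i%len(sorted)]
		plan[m] = append(plan[m], tp)
	}
	return plan
}

func main() {
	members := []string{"consumer-1", "consumer-2"}
	partitions := []topicPartition{
		{"ts.inventory.customers", 0},
		{"ts.inventory.orders", 0},
		{"ts.inventory.products", 0},
	}
	plan := assign(members, partitions)
	for _, m := range members {
		fmt.Println(m, plan[m])
	}
}
```

In the real protocol, only the elected leader computes such a plan, and every member learns its share from the SyncGroup response.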

Obviously, there are various timeouts, e.g. all consumers within a group must issue a SyncGroup request in time and also must continue their heartbeat, etc. You should check the logs of your broker too. They are verbose but they will eventually tell you what happens and why.

Hope that helps!

@alok87
Author

alok87 commented Mar 11, 2021

@dim Thank you for the TLDR; and taking time to explain this. Means a lot.

I tried to debug this issue and found that subscriptionManager() and subscriptionConsumer() are being called for all 3 brokers, and each is trying to bring the subscriptions live for the topics it is responsible for.

Below is the log for subscriptionManager() and subscriptionConsumer() for one of the brokers: Code for the extra debug prints.

I0311 15:19:56.201188  790] subscriptionConsumer(broker2) STARTING FOR LOOP
I0311 15:19:56.201251  792] subscriptionConsumer(broker2) waiting
I0311 15:19:56.201151  742] subscriptionManager(broker2) STARTING FOR LOOP
I0311 15:19:56.201330  768] subscriptionManager(broker2) received loader-5768b1-ts.inventory.payment_transactions, buffer(0)=[]
I0311 15:19:56.201377  770] subscriptionManager(broker2) received loader-5768b1-ts.inventory.payment_transactions, buffer(1)=[0xc000134c80] (appended)
I0311 15:19:56.201443  759] subscriptionManager(broker2) bc.wait()
I0311 15:19:56.201453  794] subscriptionConsumer(broker2) resuming...
I0311 15:19:56.201461  797] subscriptionConsumer(broker2) updating newSubscription=[0xc000134c80]
I0311 15:19:56.201468  829] updateSubscriptions(loader-5768b1-ts.inventory.payment_transactions)
[sarama] 2021/03/11 15:19:56 consumer/broker/2 added subscription to loader-5768b1-ts.inventory.payment_transactions/0
I0311 15:19:56.201483  808] subscriptionConsumer(broker2) fetchNewMessages()
I0311 15:19:56.201553  756] subscriptionManager(broker2) sent buffer(1)=[0xc000134c80], buffer set nil
I0311 15:19:56.201689  768] subscriptionManager(broker2) received loader-5768b1-ts.inventory.customers, buffer(0)=[]
I0311 15:19:56.201700  770] subscriptionManager(broker2) received loader-5768b1-ts.inventory.customers, buffer(1)=[0xc000134a00] (appended)
I0311 15:19:56.202710  752] subscriptionManager(broker2) received loader-5768b1-ts.inventory.orders, buffer(1)=[0xc000134a00]
I0311 15:19:56.202818  754] subscriptionManager(broker2) received loader-5768b1-ts.inventory.orders, buffer(broker2)=[0xc000134a00 0xc000134b40] (appended)
I0311 15:19:56.202831  752] subscriptionManager(broker2) received loader-5768b1-ts.inventory.products, buffer(broker2)=[0xc000134a00 0xc000134b40]
I0311 15:19:56.202839  754] subscriptionManager(broker2) received loader-5768b1-ts.inventory.products, buffer(3)=[0xc000134a00 0xc000134b40 0xc000134f00] (appended)
I0311 15:19:56.202846  752] subscriptionManager(broker2) received loader-5768b1-ts.inventory.user_profile, buffer(3)=[0xc000134a00 0xc000134b40 0xc000134f00]
I0311 15:19:56.202945  754] subscriptionManager(broker2) received loader-5768b1-ts.inventory.user_profile, buffer(4)=[0xc000134a00 0xc000134b40 0xc000134f00 0xc000135400] (appended)

This is happening for all 3 brokers we have.
The subscription manager is stuck waiting for the buffer to be sent to bc.newSubscriptions after we have set the buffer = nil once.

Please suggest if you see a problem here.

@alok87
Author

alok87 commented Mar 11, 2021

I found that the second batch of the buffer is not being sent because the previous subscriptionConsumer() for the broker is stuck waiting for the ACK at bc.acks.Wait() https://gist.github.com/alok87/d9d08a95b3af38eea7e374339f66c199#file-gistfile1-go-L83

Why is it stuck there? 😢

@alok87
Author

alok87 commented Mar 11, 2021

@dim I found the problem.

I have slow consumers, and my maxProcessingTime is huge because of that.

Problem
Say a broker is handling 10 topics and we want all of them to get subscribed. If only one topic makes it into the first flush of the buffer (bc.newSubscriptions <- buffer), the other 9 will not get subscribed until the maxProcessTime ticker fires for the second time.

This slows down consumption whenever the subscriptions do not all happen at once.
How can we work around this problem?

@alok87 alok87 changed the title Many topics in claim are not getting subscribed leading to stuck processing Slow consumers, subscription stuck problem Mar 11, 2021
alok87 added a commit to practo/tipoca-stream that referenced this issue Mar 12, 2021
Sarama updated from 1.27.2 to master
Add changes to use ticker instead IBM/sarama#1899 to solve #1897

Fixes IBM/sarama#1897
Fixes #160
@dim
Contributor

dim commented Mar 15, 2021

@alok87 glad to hear. As said before, I am not a core maintainer of Sarama, I just contributed the original consumer group patch, but looking at #1899 you seem to be in good hands :)

@alok87
Author

alok87 commented Mar 15, 2021

thanks @dim again for the help.

alok87 added a commit to practo/tipoca-stream that referenced this issue Jun 5, 2021
alok87 added a commit to practo/tipoca-stream that referenced this issue Jun 7, 2021
alok87 added a commit to practo/tipoca-stream that referenced this issue Jun 17, 2021
@jasimsulaiman

jasimsulaiman commented Sep 19, 2021

Use the session.Context().Done() channel to decide when to stop processing. For long-running processing, also check the context's Done() channel between steps. This is the only way processing goroutines can exit early on a rebalance. If a goroutine does not exit, keep in mind that the same message can end up being processed in parallel, because its offset was not committed.

func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			return nil
		case message := <-claim.Messages():
			if message == nil || session.Context().Err() != nil { // if context was cancelled and claim has messages select is random
				return nil
			}
			ch := make(chan bool)
			go func() {
				ch <- c.handleMessage(session.Context(), message)
			}()
			select {
			case <-session.Context().Done():
				shouldCommit := <-ch // wait for current processing message to complete - do not do this if processing time can be more than rebalance timeout
				commitOffset(shouldCommit, session, message)
				return nil
			case shouldCommit := <-ch:
				commitOffset(shouldCommit, session, message)
				continue
			}
		}
	}
}

@SakalyaDeshpande

I also have a long-running process that takes 5 minutes, and the max processing time is adjusted accordingly (set to 10 minutes). Our Kafka consumer group consumes the same message again and again when our application crashes/panics and Kubernetes restarts it. It generally works fine if we restart the application ourselves, but when the application is in a crash state, it keeps replaying the same last message.

alok87 added a commit to practo/tipoca-stream that referenced this issue Jan 5, 2022
Upgraded to use Kafka 3.0.0

Not using the fork as we are running separate consumer groups
for every table and IBM/sarama#1897 issue does not happen to us.

We are not impacted by this bug anymore: IBM/sarama#1897
dnwe pushed a commit that referenced this issue Jan 12, 2022
The first few fetches from Kafka may only fetch data from one or two
partitions, starving the rest for a very long time (depending on message
size / processing time)

Once a member joins the consumer groups and receives its partitions,
they are fed into the "subscription manager" from different go routines.
The subscription manager then performs batching and executes a fetch for
all the partitions. However, it seems like the batching logic in
`subscriptionManager` is faulty, perhaps assuming that `case:` order
prioritizes which `case` should be handled when all are signaled which
is not the case, according to the go docs
(https://golang.org/ref/spec#Select_statements):
```
If one or more of the communications can proceed, a single one that can
proceed is chosen via a uniform pseudo-random selection. Otherwise, if
there is a default case, that case is chosen. If there is no default
case, the "select" statement blocks until at least one of the
communications can proceed.
```

For example - if you receive 64 partitions, each will be handled in
their own go routine which sends the partition information to the
`bc.input` channel. After an iteration there is a race between `case
event, ok := <-bc.input` which will batch the request and `case
bc.newSubscriptions <- buffer` which will trigger an immediate fetch of
the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If
the condition happens with 1 partition being in the batch (even though
63 extra partitions have been claimed but didn't make it into the batch)
the fetch will ask for 1MB (by default) of messages from that single
partition. If the messages are only a few bytes long and processing time
is minutes, you will not perform another fetch for hours.

Contributes-to: #1608 #1897
dnwe pushed a commit that referenced this issue Jan 31, 2022
dnwe pushed a commit that referenced this issue Feb 24, 2022
dnwe pushed a commit that referenced this issue Feb 24, 2022
dnwe pushed a commit that referenced this issue Feb 24, 2022
dnwe pushed a commit that referenced this issue Feb 25, 2022
dnwe added a commit that referenced this issue Feb 25, 2022
Co-authored-by: Dominic Evans <[email protected]>
@dnwe
Collaborator

dnwe commented Feb 25, 2022

This should now be fixed on main

@dnwe dnwe closed this as completed Feb 25, 2022
chrisxrepo pushed a commit to chrisxrepo/sarama that referenced this issue Apr 11, 2022