Endless loop of subscriptionManager #692

Closed
josselin-c opened this issue Jul 5, 2016 · 7 comments

@josselin-c

Versions

Sarama Version: 1.8
Kafka Version: 0.9.2
Go Version: 1.6.2

Configuration

What configuration values are you using for Sarama and Kafka?

I'm running a 3 node kafka & zookeeper cluster.
Sarama is used with github.com/wvanbergen/kafka for the consumer-group management.
I realize we are not using the latest sarama lib, and the wvanbergen consumer-group library is probably deprecated as well. If you think this issue isn't relevant, I can understand.

Logs
(pprof) list .subscriptionManager$
Total: 1.20mins
ROUTINE ======================== github.com/Shopify/sarama.(*brokerConsumer).subscriptionManager in /go/src/josselin/mykafkacode/Godeps/_workspace/src/github.com/Shopify/sarama/consumer.go
     2.86s     42.33s (flat, cum) 58.93% of Total
         .          .    540: //  goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
         .          .    541: // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
         .          .    542: // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
         .          .    543: // so the main goroutine can block waiting for work if it has none.
         .          .    544: for {
     200ms      200ms    545:  if len(buffer) > 0 {
         .          .    546:   select {
         .          .    547:   case event, ok := <-bc.input:
         .          .    548:    if !ok {
         .          .    549:     goto done
         .          .    550:    }
         .          .    551:    buffer = append(buffer, event)
         .          .    552:   case bc.newSubscriptions <- buffer:
         .          .    553:    buffer = nil
         .          .    554:   case bc.wait <- none{}:
         .          .    555:   }
         .          .    556:  } else {
     420ms     36.29s    557:   select {
     1.08s      2.81s    558:   case event, ok := <-bc.input:
         .          .    559:    if !ok {
         .          .    560:     goto done
         .          .    561:    }
         .          .    562:    buffer = append(buffer, event)
     1.16s      3.03s    563:   case bc.newSubscriptions <- nil:
         .          .    564:   }
         .          .    565:  }
         .          .    566: }
         .          .    567:
         .          .    568:done:
Problem Description

After the machine running sarama lost its connection to our kafka/zookeeper cluster for a few minutes, sarama started using all the available CPUs.
Running pprof on the instance shows that most of the time is spent in the Go runtime doing channel management (selectgoImpl, atomic.Xchg); the only sarama function appearing in the profile is brokerConsumer.subscriptionManager. It looks like subscriptionManager is busy-looping forever.
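
For illustration, here is a minimal, self-contained sketch of how the empty-buffer branch in the listing above can spin. It is not sarama's code: manager and countNilBatches are hypothetical names, and the plain channels stand in for bc.input and bc.newSubscriptions. It assumes the receiving side keeps reading batches without blocking anywhere else; under that assumption every receive is satisfied immediately with nil, so both goroutines stay busy.

```go
package main

import "fmt"

// manager mimics the shape of the profiled loop (hypothetical, simplified):
// with an empty buffer it offers nil on out whenever a receiver is ready.
func manager(input <-chan int, out chan<- []int, done chan<- struct{}) {
	defer close(done)
	var buffer []int
	for {
		if len(buffer) > 0 {
			select {
			case ev, ok := <-input:
				if !ok {
					return
				}
				buffer = append(buffer, ev)
			case out <- buffer:
				buffer = nil
			}
		} else {
			select {
			case ev, ok := <-input:
				if !ok {
					return
				}
				buffer = append(buffer, ev)
			case out <- nil: // always succeeds if the receiver is waiting
			}
		}
	}
}

// countNilBatches receives n batches back to back, without ever blocking on
// anything else, and reports how many of them were nil.
func countNilBatches(n int) int {
	input := make(chan int)
	out := make(chan []int)
	done := make(chan struct{})
	go manager(input, out, done)

	nils := 0
	for i := 0; i < n; i++ {
		if batch := <-out; batch == nil {
			nils++
		}
	}
	close(input) // shut the manager down cleanly
	<-done
	return nils
}

func main() {
	fmt.Println(countNilBatches(1000))
}
```

Since no events are ever sent on input, every one of the batches is nil and no receive ever blocks, which is the kind of loop that would show up in a profile as time spent in select.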

The svg profile from pprof: pprof001.zip

@eapache
Contributor

eapache commented Jul 5, 2016

Thanks for the detailed report, I believe there are multiple related issues at play here. The tight loop is fairly obvious given the SVG you provided so I will have a fix for that shortly. What I am still trying to figure out is how it got into that state in the first place. I have a few hypotheses but if you could provide the sarama logs from the incident that would be very helpful.

@josselin-c
Author

Here are the logs around the time of the incident.
The logs come from one of our services using Kafka. The service has a lot of Kafka consumers (on the order of 100), so the logs are unfortunately quite big.
Problem starts at 9:56:38: sarama.zip

@eapache
Contributor

eapache commented Jul 5, 2016

Hmm... at some point prior to the incident, were there any sarama logs that contained the phrase "abandoned subscription"? If so, then I'm pretty sure I understand what happened. Either way, #693 should fix it.

@josselin-c
Author

Yes, this message appears periodically in the logs, both before and after the problem.

@eapache
Contributor

eapache commented Jul 6, 2016

Good to know, thanks. If those logs also say "because consuming was taking too long" then you may want to adjust your config.Consumer.MaxProcessingTime value.

@josselin-c
Author

I'm not sure that would work in our case: we basically take data from Kafka and push it to other systems that we don't control.
Some of these systems can be down for several days at a time. When the target system is down, we stop consuming and loop, retrying the data we already got from Kafka until the send succeeds; then we start consuming again.

Pseudo code is: for msg := range consumer.Messages() { doSend(msg) } with doSend sometimes taking days to return.
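
A slightly fuller sketch of that loop, using only the standard library, might look like the following. The names sendUntilSuccess and errTargetDown are hypothetical, the fixed sleep between attempts is an assumption (the real retry policy is not described here), and a plain channel stands in for consumer.Messages().

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTargetDown is a hypothetical error standing in for "target system is down".
var errTargetDown = errors.New("target system unavailable")

// sendUntilSuccess calls send until it returns nil, sleeping for backoff
// between attempts, and returns the number of attempts it took.
// It never gives up, which matches the "retry until it succeeds" behavior.
func sendUntilSuccess(send func() error, backoff time.Duration) int {
	attempts := 0
	for {
		attempts++
		if err := send(); err == nil {
			return attempts
		}
		time.Sleep(backoff)
	}
}

func main() {
	// Stand-in for consumer.Messages(): two messages, then the stream ends.
	messages := make(chan string, 2)
	messages <- "a"
	messages <- "b"
	close(messages)

	failuresLeft := 2 // simulate a target that rejects the first two attempts
	for msg := range messages {
		n := sendUntilSuccess(func() error {
			if failuresLeft > 0 {
				failuresLeft--
				return errTargetDown
			}
			return nil
		}, time.Millisecond)
		fmt.Printf("sent %q after %d attempt(s)\n", msg, n)
	}
}
```

While sendUntilSuccess is blocked, nothing is read from the messages channel, which is the situation described above where consumption pauses until the target system comes back.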

Should we worry about these "abandoned subscription because consuming was taking too long" messages in our case? I didn't see any data loss in our application.

@eapache
Contributor

eapache commented Jul 6, 2016

It won't drop any data; it's just an efficiency thing. If you knew that your processing takes e.g. 10 seconds per message, it would be more efficient not to constantly drop and re-establish the subscription. In your case it sounds like the slow cases are not predictable enough to matter.
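
To make the trade-off concrete, here is a simplified model of a hand-off with a timeout. This is a sketch, not sarama's implementation: deliver and demoTimeout are hypothetical names, and the assumption is that a message which cannot be handed to the consumer within the configured time causes the subscription to be abandoned and re-established later, with the message itself kept for redelivery.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout is a hypothetical stand-in for a configured maximum processing time.
const demoTimeout = 50 * time.Millisecond

// deliver tries to hand msg to the consumer's channel. It returns true if the
// consumer accepted it in time, and false if the timeout fired first (the case
// in which this model would abandon the subscription).
func deliver(messages chan<- string, msg string, maxProcessingTime time.Duration) bool {
	timer := time.NewTimer(maxProcessingTime)
	defer timer.Stop()
	select {
	case messages <- msg:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	ready := make(chan string, 1) // consumer with room: the hand-off succeeds
	busy := make(chan string)     // nobody receiving: the hand-off times out

	fmt.Println("ready consumer delivered:", deliver(ready, "m1", demoTimeout))
	fmt.Println("busy consumer delivered:", deliver(busy, "m2", demoTimeout))
}
```

In this model a larger timeout only helps when the slow case has a known upper bound; if processing can stall for days, any practical value will still be exceeded.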
