Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memory allocation optimisations? #707

Closed
Dieterbe opened this issue Jul 16, 2016 · 4 comments
Closed

memory allocation optimisations? #707

Dieterbe opened this issue Jul 16, 2016 · 4 comments

Comments

@Dieterbe
Copy link
Contributor

this is sarama, 1baff7c, aka a git checkout 10 days old.
i noticed my app's doing a lot of allocations and am trying to reduce them. so here's some insights from a heap profile while my app is running and consuming

go tool pprof --alloc_objects metrictank heap-prof
(pprof) top50
94470972 of 97026992 total (97.37%)
Dropped 162 nodes (cum <= 485134)
      flat  flat%   sum%        cum   cum%
  30229363 31.16% 31.16%   30229363 31.16%  github.com/tinylib/msgp/msgp.ReadStringBytes
  20623244 21.26% 52.41%   20623247 21.26%  time.NewTimer
  12075967 12.45% 64.86%   14107614 14.54%  github.com/Shopify/sarama.(*MessageBlock).decode
   7554518  7.79% 72.64%   21662132 22.33%  github.com/Shopify/sarama.(*MessageSet).decode
   7209180  7.43% 80.07%   37438543 38.59%  gopkg.in/raintank/schema%2ev0.(*MetricData).UnmarshalMsg
   7070097  7.29% 87.36%    7070097  7.29%  github.com/Shopify/sarama.(*partitionConsumer).parseResponse
   6586467  6.79% 94.15%    7384885  7.61%  github.com/raintank/metrictank/mdata.(*AggMetric).Add
   2031647  2.09% 96.24%    2031647  2.09%  github.com/Shopify/sarama.(*Message).decode
    614432  0.63% 96.88%    1204270  1.24%  github.com/raintank/metrictank/mdata.NewAggMetric
    393224  0.41% 97.28%    1089580  1.12%  github.com/raintank/metrictank/mdata.NewAggregator
     56176 0.058% 97.34%     503642  0.52%  github.com/raintank/metrictank/mdata.(*cassandraStore).processWriteQueue
      8740 0.009% 97.35%     493185  0.51%  encoding/json.Unmarshal
      8192 0.0084% 97.36%   21672845 22.34%  github.com/Shopify/sarama.versionedDecode
      7202 0.0074% 97.36%   21671855 22.34%  github.com/Shopify/sarama.(*brokerConsumer).fetchNewMessages
      2521 0.0026% 97.37%   21664653 22.33%  github.com/Shopify/sarama.(*FetchResponse).decode
         2 2.1e-06% 97.37%    1204272  1.24%  github.com/raintank/metrictank/mdata.(*AggMetrics).GetOrCreate
         0     0% 97.37%   21664653 22.33%  github.com/Shopify/sarama.(*Broker).Fetch
         0     0% 97.37%   21705613 22.37%  github.com/Shopify/sarama.(*Broker).sendAndReceive
         0     0% 97.37%   21662132 22.33%  github.com/Shopify/sarama.(*FetchResponseBlock).decode
         0     0% 97.37%   21671855 22.34%  github.com/Shopify/sarama.(*brokerConsumer).(github.com/Shopify/sarama.subscriptionConsumer)-fm
         0     0% 97.37%   21671855 22.34%  github.com/Shopify/sarama.(*brokerConsumer).subscriptionConsumer
         0     0% 97.37%   27663305 28.51%  github.com/Shopify/sarama.(*partitionConsumer).(github.com/Shopify/sarama.responseFeeder)-fm
         0     0% 97.37%   27663305 28.51%  github.com/Shopify/sarama.(*partitionConsumer).responseFeeder
         0     0% 97.37%   49336726 50.85%  github.com/Shopify/sarama.withRecover
         0     0% 97.37%     529350  0.55%  github.com/raintank/metrictank/defcache.(*DefCache).Add
         0     0% 97.37%   46519964 47.95%  github.com/raintank/metrictank/in.In.Handle
         0     0% 97.37%    9081421  9.36%  github.com/raintank/metrictank/in.In.process
         0     0% 97.37%   46519964 47.95%  github.com/raintank/metrictank/in/kafkamdm.(*KafkaMdm).consume
         0     0% 97.37%   97026992   100%  runtime.goexit
         0     0% 97.37%   20623247 21.26%  time.After

the NewTimer is from time.After which is from responseFeeder:

         .          .    421:       for i, msg := range msgs {
         .          .    422:           select {
         .          .    423:           case child.messages <- msg:
         .   20593208    424:           case <-time.After(child.conf.Consumer.MaxProcessingTime):
         .          .    425:               child.responseResult = errTimedOut
         .          .    426:               child.broker.acks.Done()

I tried the following patch, which unfortunately does not work.

diff --git a/consumer.go b/consumer.go
index c001794..a1d0f60 100644
--- a/consumer.go
+++ b/consumer.go
@@ -413,15 +413,17 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

 func (child *partitionConsumer) responseFeeder() {
        var msgs []*ConsumerMessage
+       var after <-chan time.Time

 feederLoop:
        for response := range child.feeder {
                msgs, child.responseResult = child.parseResponse(response)

                for i, msg := range msgs {
+                       after = time.After(child.conf.Consumer.MaxProcessingTime)
                        select {
                        case child.messages <- msg:
-                       case <-time.After(child.conf.Consumer.MaxProcessingTime):
+                       case <-after:
                                child.responseResult = errTimedOut
                                child.broker.acks.Done()
                                for _, msg = range msgs[i:] {

the reason is it will always allocate new stuff, apparently:

(pprof) list NewTimer
Total: 30747639
ROUTINE ======================== time.NewTimer in /home/dieter/code/go/src/time/sleep.go
   6108880    6108883 (flat, cum) 19.87% of Total
         .          .     70:}
         .          .     71:
         .          .     72:// NewTimer creates a new Timer that will send
         .          .     73:// the current time on its channel after at least duration d.
         .          .     74:func NewTimer(d Duration) *Timer {
   4134488    4134488     75:   c := make(chan Time, 1)
         .          .     76:   t := &Timer{
         .          .     77:       C: c,
         .          .     78:       r: runtimeTimer{
         .          .     79:           when: when(d),
         .          .     80:           f:    sendTime,
         .          .     81:           arg:  c,
   1974392    1974392     82:       },
         .          .     83:   }
         .          3     84:   startTimer(&t.r)
         .          .     85:   return t
         .          .     86:}
         .          .     87:
         .          .     88:// Reset changes the timer to expire after duration d.
         .          .     89:// It returns true if the timer had been active, false if the timer had

beyond that, you can see there's some more sarama stuff where allocations happen.
maybe those can be solved by some sync.Pool usage or some otherwise reusing of buffers?

@eapache
Copy link
Contributor

eapache commented Jul 18, 2016

I don't see an obvious way to remove the allocation from the timer path; Go doesn't expose any other methods for managing low-level timers without allocating a new one each time. I suppose in the common case we might be able to re-use the same time.After call for every message in the batch if you added additional retry logic in the case when the timer fired, but it would be kind of messy.

The protocol decoding could plausibly use pools in some places, but it's not high on our list of priorities right now. We'd be happy to take PRs though.

@Tevic
Copy link

Tevic commented Jul 26, 2016

Maybe use time.NewTimer replace time.After is better, we can control when to release the memory .If large numbers of partitionConsumer there,the memory rise very fast and can not be released immediately according to my test.

 func (child *partitionConsumer) responseFeeder() {
    var msgs []*ConsumerMessage
 +  tm := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
 +  defer tm.Stop()
  feederLoop:
    for response := range child.feeder {
        msgs, child.responseResult = child.parseResponse(response)
        for i, msg := range msgs {
 +          tm.Reset(child.conf.Consumer.MaxProcessingTime)
            select {
            case child.messages <- msg:
 -          case <-time.After(child.conf.Consumer.MaxProcessingTime):
 +          case <-tm.C:
                child.responseResult = errTimedOut
                child.broker.acks.Done()
                for _, msg = range msgs[i:] {

@eapache
Copy link
Contributor

eapache commented Jul 26, 2016

Oh good point, I somehow missed that Timer objects could be reset without an additional allocation.

eapache added a commit that referenced this issue Jul 26, 2016
Rather than allocating a new timer with `time.After` on every message we
consume, allocate one for the `responseFeeder` and just keep resetting it.
Thanks to @Tevic for suggesting this approach.

Fixes #707.
@Dieterbe
Copy link
Contributor Author

yesss. this looks awesome.
here's another profile run with a similar workload.
looks basically same as before but the timer allocations are obliterated 👍

~/g/s/g/r/metrictank ❯❯❯ go tool pprof --alloc_objects metrictank heap.o
Entering interactive mode (type "help" for commands)
(pprof) top30
30450999 of 30985299 total (98.28%)
Dropped 115 nodes (cum <= 154926)
Showing top 30 nodes out of 63 (cum >= 340261)
      flat  flat%   sum%        cum   cum%
  12217352 39.43% 39.43%   12217352 39.43%  github.com/tinylib/msgp/msgp.ReadStringBytes
   3735896 12.06% 51.49%    5243247 16.92%  github.com/Shopify/sarama.(*MessageBlock).decode
   2916396  9.41% 60.90%    3184513 10.28%  github.com/raintank/metrictank/mdata.(*AggMetric).Add
   2886946  9.32% 70.22%    2886946  9.32%  github.com/Shopify/sarama.(*partitionConsumer).parseResponse
   2534174  8.18% 78.39%   14751526 47.61%  gopkg.in/raintank/schema%2ev1.(*MetricData).UnmarshalMsg
   2367997  7.64% 86.04%    7611244 24.56%  github.com/Shopify/sarama.(*MessageSet).decode
   1507351  4.86% 90.90%    1507351  4.86%  github.com/Shopify/sarama.(*Message).decode
    610340  1.97% 92.87%    1110062  3.58%  github.com/raintank/metrictank/mdata.NewAggMetric
....
(pprof) list responseFeeder
Total: 30985299
(.. filtered out other routine that's not interesting)
ROUTINE ======================== github.com/Shopify/sarama.(*partitionConsumer).responseFeeder in /home/dieter/go/src/github.com/Shopify/sarama/consumer.go
         0    2886946 (flat, cum)  9.32% of Total
         .          .    415:   var msgs []*ConsumerMessage
         .          .    416:   expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
         .          .    417:
         .          .    418:feederLoop:
         .          .    419:   for response := range child.feeder {
         .    2886946    420:       msgs, child.responseResult = child.parseResponse(response)
         .          .    421:
         .          .    422:       for i, msg := range msgs {
         .          .    423:           expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
         .          .    424:
         .          .    425:           select {
(pprof) 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants