
AsyncProducer retries causing OOM #1358

Closed
hagen1778 opened this issue Apr 17, 2019 · 5 comments
Labels
help wanted, stale

Comments

@hagen1778

hagen1778 commented Apr 17, 2019

Versions

Sarama: 1.20.1
Kafka: 1.0.0
Go: 1.11.3
Configuration
        conf := sarama.NewConfig()
        conf.Version = sarama.V0_11_0_0
        conf.Net.MaxOpenRequests = 5
        conf.Net.ReadTimeout = 5 * time.Second
        conf.Net.DialTimeout = 5 * time.Second
        conf.Net.WriteTimeout = 5 * time.Second
        conf.Metadata.Retry.Max = 0
        conf.Producer.RequiredAcks = sarama.WaitForAll // -1
        conf.Producer.Timeout = 5 * time.Second
        conf.Producer.Flush.Bytes = 16 * 1024 * 1024 // 16 MB
        conf.Producer.Flush.Frequency = 5 * time.Second
        conf.Producer.Return.Errors = true
        conf.Producer.Return.Successes = true
Logs

Sarama logs recorded right before memory usage goes up:


producer/leader/<topic>/58 abandoning broker 10137
producer/leader/<topic>/14 selected broker 10128
producer/broker/10137 state change to [closed] on <topic>/80
producer/broker/10128 state change to [open] on <topic>/12
client/metadata fetching metadata for [<topic>] from broker <addr>:9092
producer/broker/10247 state change to [retrying] on <topic>/46 because kafka server: Request exceeded the user-specified time limit in the request.
producer/broker/10137 state change to [closed] on <topic>/18
producer/leader/<topic>/30 selected broker 10247
producer/broker/10247 state change to [retrying] on <topic>/30 because kafka server: Request exceeded the user-specified time limit in the request.
producer/broker/10137 state change to [open] on <topic>/58
producer/leader/<topic>/3 abandoning broker 10247
producer/broker/10247 state change to [closed] on <topic>/30
producer/broker/10247 state change to [closed] on <topic>/3
producer/leader/<topic>/55 state change to [normal]
producer/broker/10139 state change to [retrying] on <topic>/7 because kafka server: Request exceeded the user-specified time limit in the request.

Problem Description

The Go application writes messages into an AsyncProducer and reads from the Successes and Errors channels. Its average memory usage is about 8 GB.
The problem is that during peak load memory consumption can rise up to 4x, which causes OOM.

The log messages above were recorded during the incident. Messages containing "maximum request accumulated, waiting for space" were omitted. At first glance it looks like there is trouble writing to Kafka and the producer starts retrying messages.

Digging into AsyncProducer showed that retryHandler uses a buffer with no maximum size limit. So I added a metric exposing the buffer's Length() ("Length returns the number of elements currently stored in the queue").
It showed a perfect correlation between memory consumption and queue size:
[screenshots: memory usage and retry queue length graphs]

This makes me think that AsyncProducer keeps accepting incoming messages with no control over retry queue growth, which leads to unbounded memory consumption. I wasn't able to reproduce the issue in a local environment.

Expected behaviour

If AsyncProducer is unable to flush its retry queue, it should block on receiving new messages.
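
As an illustration, here is a minimal application-side back-pressure sketch; the package name, wrapper function, and maxInFlight limit are hypothetical, not part of Sarama. It caps the number of messages handed to the AsyncProducer that have not yet come back on Successes() or Errors(), so the caller blocks instead of letting the internal retry buffer grow without bound:

        package kafkautil

        import "github.com/Shopify/sarama"

        const maxInFlight = 10000 // illustrative limit, tune for your workload

        // produceWithBackpressure requires Producer.Return.Successes and
        // Producer.Return.Errors to be true (as in the configuration above).
        func produceWithBackpressure(p sarama.AsyncProducer, msgs <-chan *sarama.ProducerMessage) {
            sem := make(chan struct{}, maxInFlight)

            go func() {
                for range p.Successes() {
                    <-sem // free a slot when a message is acknowledged
                }
            }()
            go func() {
                for perr := range p.Errors() {
                    <-sem        // free a slot when a message finally fails
                    _ = perr.Err // log or handle the delivery error here
                }
            }()

            for msg := range msgs {
                sem <- struct{}{} // blocks once maxInFlight messages are outstanding
                p.Input() <- msg
            }
        }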

@qiangmzsx
Contributor

me too!!!

fatal error: runtime: out of memory

runtime stack:
runtime.throw(0x7de4b2, 0x16)
        /export/go/src/runtime/panic.go:616 +0x81
runtime.sysMap(0xc459710000, 0x100000, 0x7f8ab7ffec00, 0xbd0b98)
        /export/go/src/runtime/mem_linux.go:216 +0x20a
runtime.(*mheap).sysAlloc(0xbb7540, 0x100000, 0x7f8a9e1c61d0)
        /export/go/src/runtime/malloc.go:470 +0xd4
runtime.(*mheap).grow(0xbb7540, 0x1, 0x0)
        /export/go/src/runtime/mheap.go:907 +0x60
runtime.(*mheap).allocSpanLocked(0xbb7540, 0x1, 0xbd0ba8, 0x7f8a9e1c61d0)
        /export/go/src/runtime/mheap.go:820 +0x301
runtime.(*mheap).alloc_m(0xbb7540, 0x1, 0x7f8ab7ff0056, 0x7f8a9e1c61d0)
        /export/go/src/runtime/mheap.go:686 +0x118
runtime.(*mheap).alloc.func1()
        /export/go/src/runtime/mheap.go:753 +0x4d
runtime.(*mheap).alloc(0xbb7540, 0x1, 0xc420010056, 0x7f8ab7ffedf0)
        /export/go/src/runtime/mheap.go:752 +0x8a
runtime.(*mcentral).grow(0xbb9e10, 0x0)
        /export/go/src/runtime/mcentral.go:232 +0x94
runtime.(*mcentral).cacheSpan(0xbb9e10, 0x1ff)
        /export/go/src/runtime/mcentral.go:106 +0x2e4
runtime.(*mcache).refill(0x7f8b215ec000, 0xc420022056)
        /export/go/src/runtime/mcache.go:123 +0x9c
runtime.(*mcache).nextFree.func1()
        /export/go/src/runtime/malloc.go:556 +0x32
runtime.systemstack(0x0)
        /export/go/src/runtime/asm_amd64.s:409 +0x79
runtime.mstart()
        /export/go/src/runtime/proc.go:1175

goroutine 58 [running]:
runtime.systemstack_switch()
        /export/go/src/runtime/asm_amd64.s:363 fp=0xc42022cb48 sp=0xc42022cb40 pc=0x45ed70
runtime.(*mcache).nextFree(0x7f8b215ec000, 0xc459708756, 0x7f8a9e92093f, 0x775202, 0x79)
        /export/go/src/runtime/malloc.go:555 +0xa9 fp=0xc42022cba0 sp=0xc42022cb48 pc=0x41a929
runtime.mallocgc(0x1000, 0x77c500, 0xc458c31f01, 0xc459441000)
        /export/go/src/runtime/malloc.go:710 +0x79f fp=0xc42022cc40 sp=0xc42022cba0 pc=0x41b27f
runtime.growslice(0x77c500, 0xc4596b8000, 0x100, 0x100, 0x101, 0xa, 0x0, 0x0)
        /export/go/src/runtime/slice.go:179 +0x14a fp=0xc42022cca8 sp=0xc42022cc40 pc=0x44b15a
gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.(*MessageSet).addMessage(...)
        /export/data/gopath/src/gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama/message_set.go:107
gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.(*produceSet).add(0xc4595cc440, 0xc4596e10e0, 0x0, 0xc420148188)
        /export/data/gopath/src/gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama/produce_set.go:108 +0x851 fp=0xc42022cde0 sp=0xc42022cca8 pc=0x685501
gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.(*brokerProducer).run(0xc42020a720)
        /export/data/gopath/src/gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama/async_producer.go:744 +0x653 fp=0xc42022cfa0 sp=0xc42022cde0 pc=0x644ed3
gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.(*brokerProducer).(gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.run)-fm()
        /export/data/gopath/src/gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama/async_producer.go:651 +0x2a fp=0xc42022cfb8 sp=0xc42022cfa0 pc=0x69652a
gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.withRecover(0xc4201408e0)
        /export/data/gopath/src/gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama/utils.go:45 +0x43 fp=0xc42022cfd8 sp=0xc42022cfb8 pc=0x68fda3
runtime.goexit()
        /export/go/src/runtime/asm_amd64.s:2361 +0x1 fp=0xc42022cfe0 sp=0xc42022cfd8 pc=0x4618c1
created by gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
        /export/data/gopath/src/gitlab.xxxxxxx.com/zeroteam/ddkafka/vendor/github.com/Shopify/sarama/async_producer.go:651 +0x1b8

@d1egoaz
Contributor

d1egoaz commented Aug 22, 2019

Thanks for the error report @hagen1778

@d1egoaz
Contributor

d1egoaz commented Aug 22, 2019

Same error was also reported in #1372

@ghost

ghost commented Feb 21, 2020

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

@ghost added the stale label Feb 21, 2020
@ghost closed this as completed Mar 22, 2020
wanwenli added a commit to wanwenli/sarama that referenced this issue Dec 5, 2024
This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer.
The change addresses issues IBM#1358 and IBM#1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive.

Key updates:
- Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer.
- Implemented logic to handle overflow scenarios, ensuring non-flagged messages are either retried or sent to the errors channel, while flagged messages are re-queued.

This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default).

Signed-off-by: Wenli Wan <[email protected]>
@wanwenli
Contributor

wanwenli commented Dec 5, 2024

@hagen1778 @qiangmzsx @d1egoaz any progress or updates on this issue?

I've made #3026 to limit the buffer size as an option

dnwe pushed a commit that referenced this issue Dec 19, 2024
This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer.
The change addresses issues #1358 and #1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive.

Key updates:
- Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer.
- Implemented logic to handle overflow scenarios, ensuring non-flagged messages are either retried or sent to the errors channel, while flagged messages are re-queued.

This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default).

Signed-off-by: Wenli Wan <[email protected]>
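
For reference, a minimal configuration sketch using the `Producer.Retry.MaxBufferLength` field described in the commit above, assuming it behaves as the commit message states; the limit value is illustrative:

        conf := sarama.NewConfig()
        conf.Producer.Return.Errors = true
        // Illustrative cap on the number of messages held in the retry buffer;
        // 0 (the default) keeps the previous unbounded behaviour.
        conf.Producer.Retry.MaxBufferLength = 100000

Per the commit description, messages that overflow the limit and are not flagged for retry are sent to the errors channel, so the application should keep draining Errors().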