From c1c3fa4c761722df6914c953cdc1bd3f05b74b23 Mon Sep 17 00:00:00 2001 From: Wenli Wan Date: Thu, 5 Dec 2024 17:32:50 +0800 Subject: [PATCH] feat: Prevent retry buffer in AsyncProducer to go OOM This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer. The change addresses issues #1358 and #1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive. Key updates: - Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer. - Implemented logic to handle overflow scenarios, ensuring non-flagged messages are either retried or sent to the errors channel, while flagged messages are re-queued. This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default). Signed-off-by: Wenli Wan --- async_producer.go | 17 +++++++++++++++++ config.go | 6 ++++++ 2 files changed, 23 insertions(+) diff --git a/async_producer.go b/async_producer.go index a6fa3d4a2..e330a1073 100644 --- a/async_producer.go +++ b/async_producer.go @@ -13,6 +13,9 @@ import ( "github.com/rcrowley/go-metrics" ) +// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +var ErrProducerRetryBufferOverflow = errors.New("discard a message and surface it as an error to prevent retry buffer to overflow") + // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the @@ -1207,6 +1210,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { + maxBufferSize := p.conf.Producer.Retry.MaxBufferLength var msg *ProducerMessage buf := queue.New() @@ -1227,6 +1231,19 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) + + if maxBufferSize > 0 && buf.Length() >= maxBufferSize { + msgToHandle := buf.Peek().(*ProducerMessage) + if msgToHandle.flags == 0 { + select { + case p.input <- msgToHandle: + buf.Remove() + default: + buf.Remove() + p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) + } + } + } } } diff --git a/config.go b/config.go index f2f197887..91bd61857 100644 --- a/config.go +++ b/config.go @@ -269,6 +269,12 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration + // The maximum length of the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler. + // Defaults to 0 for unlimited. + // The limit is to prevent this buffer from overflowing. + // A zero or negative number indicates unlimited. + MaxBufferLength int } // Interceptors to be called when the producer dispatcher reads the