Skip to content

Commit

Permalink
feat: Prevent retry buffer in AsyncProducer to go OOM
Browse files Browse the repository at this point in the history
This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer.
The change addresses issues IBM#1358 and IBM#1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive.

Key updates:
- Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer.
- Implemented logic to handle overflow scenarios, ensuring non-flagged messages are either retried or sent to the errors channel, while flagged messages are re-queued.

This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default).

Signed-off-by: Wenli Wan <[email protected]>
  • Loading branch information
wanwenli committed Dec 6, 2024
1 parent 4178837 commit a0f5c5b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
26 changes: 26 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ import (
"github.com/rcrowley/go-metrics"
)

// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
var ErrProducerRetryBufferOverflow = errors.New("retry buffer is full: message discarded to " +
"prevent buffer overflow. Check other errors and logs to determine the cause of excessive retries")

// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function.
// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit.
const minFunctionalRetryBufferLength = 5000

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
Expand Down Expand Up @@ -1207,6 +1215,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
maxBufferSize = minFunctionalRetryBufferLength
}

var msg *ProducerMessage
buf := queue.New()

Expand All @@ -1227,6 +1240,19 @@ func (p *asyncProducer) retryHandler() {
}

buf.Add(msg)

if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
msgToHandle := buf.Peek().(*ProducerMessage)
if msgToHandle.flags == 0 {
select {
case p.input <- msgToHandle:
buf.Remove()
default:
buf.Remove()
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
}
}
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ type Config struct {
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
// The maximum length of the bridging buffer between `input` and `retries` channels
// in AsyncProducer#retryHandler.
// The limit is to prevent this buffer from overflowing or causing OOM.
// Defaults to 0 for unlimited.
// Any value between 0 and 5000 is pushed to 5000.
// A zero or negative value indicates unlimited.
MaxBufferLength int
}

// Interceptors to be called when the producer dispatcher reads the
Expand Down

0 comments on commit a0f5c5b

Please sign in to comment.