feat: Prevent retry buffer in AsyncProducer from going OOM
This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer.
The change addresses issues IBM#1358 and IBM#1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive.

Key updates:
- Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer.
- Implemented overflow handling: when the buffer is full, the oldest non-flagged message is either re-submitted for retry or surfaced on the errors channel, while flagged messages are left in the buffer.

This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default).

Signed-off-by: Wenli Wan <[email protected]>
wanwenli committed Dec 5, 2024
1 parent 4178837 commit c1c3fa4
Showing 2 changed files with 23 additions and 0 deletions.
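
As a usage illustration (not part of the diff), here is a minimal sketch of opting in to the new limit; the broker address, topic name, and the 10,000-message cap are placeholder assumptions:

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Cap the bridging retry buffer at 10,000 messages (placeholder value).
	// Leaving it at 0 keeps the previous unlimited behaviour.
	cfg.Producer.Retry.MaxBufferLength = 10000

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.AsyncClose()

	// Publish as usual; remember to drain Errors() (see the sketch after the
	// async_producer.go diff below).
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello"),
	}
}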
17 changes: 17 additions & 0 deletions async_producer.go
@@ -13,6 +13,9 @@ import (
"github.com/rcrowley/go-metrics"
)

// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
var ErrProducerRetryBufferOverflow = errors.New("discard a message and surface it as an error to prevent the retry buffer from overflowing")

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
@@ -1207,6 +1210,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
var msg *ProducerMessage
buf := queue.New()

@@ -1227,6 +1231,19 @@
}

buf.Add(msg)

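	// Once the buffer reaches the configured limit, shed load from its head:
	// a message with no flags set is pushed back to the input channel if that
	// send would not block, otherwise it is removed and surfaced as
	// ErrProducerRetryBufferOverflow. Messages with flags set are left in the buffer.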
if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
msgToHandle := buf.Peek().(*ProducerMessage)
if msgToHandle.flags == 0 {
select {
case p.input <- msgToHandle:
buf.Remove()
default:
buf.Remove()
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
}
}
}
}
}

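The diff only covers the producer internals, so here is a hedged sketch of how an application might recognize the new sentinel error on the Errors() channel. The drainErrors helper, the broker address, and the logging are illustrative assumptions, not part of this commit:

package main

import (
	"errors"
	"log"

	"github.com/IBM/sarama"
)

// drainErrors reads producer errors and singles out messages that were dropped
// because the retry buffer hit Producer.Retry.MaxBufferLength.
func drainErrors(producer sarama.AsyncProducer) {
	for pErr := range producer.Errors() {
		if errors.Is(pErr.Err, sarama.ErrProducerRetryBufferOverflow) {
			// The message was discarded to protect memory; decide whether to
			// re-publish it, persist it elsewhere, or simply account for the loss.
			log.Printf("retry buffer overflow, dropped message for topic %q", pErr.Msg.Topic)
			continue
		}
		log.Printf("produce error: %v", pErr.Err)
	}
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Retry.MaxBufferLength = 10000 // placeholder limit, as above

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.AsyncClose()

	go drainErrors(producer)
	// ... publish to producer.Input() as in the earlier sketch ...
}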
6 changes: 6 additions & 0 deletions config.go
@@ -269,6 +269,12 @@ type Config struct {
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
// The maximum length of the bridging buffer between `input` and `retries` channels
// in AsyncProducer#retryHandler.
// The limit prevents this buffer from growing without bound and causing OOM.
// Defaults to 0; a zero or negative value means unlimited.
MaxBufferLength int
}

// Interceptors to be called when the producer dispatcher reads the
