feat(producer): add retry buffer tuning option to prevent OOM (#3026)
This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer.
The change addresses issues #1358 and #1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive.

Key updates:
- Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer.
- Implemented overflow handling: once the buffer reaches the configured limit, the oldest non-flagged message is re-submitted to the input channel if that does not block, otherwise it is dropped and reported on the Errors channel with `ErrProducerRetryBufferOverflow`; flagged messages are left in the buffer.

This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default).
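Not part of the commit itself, but as a quick orientation: a minimal sketch of how an application could opt in to the new cap. The broker address and the 8192 value are placeholders; the rest uses the standard Sarama API.

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Cap the retry buffer at 8192 messages: positive values below 4096 are raised
	// to 4096, and the default of 0 keeps the previous unlimited behaviour.
	cfg.Producer.Retry.MaxBufferLength = 8192
	cfg.Producer.Return.Errors = true // read Errors() to observe overflow drops

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.AsyncClose()
	// ... feed producer.Input() and drain producer.Errors() ...
}
```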

Signed-off-by: Wenli Wan <[email protected]>
wanwenli authored Dec 19, 2024
1 parent 91728dc commit 7c75cc4
Showing 3 changed files with 108 additions and 0 deletions.
25 changes: 25 additions & 0 deletions async_producer.go
@@ -13,6 +13,13 @@ import (
"github.com/rcrowley/go-metrics"
)

// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and a message is dropped to prevent OOM.
var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")

// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for the cap to function.
// Any non-zero value below this limit is raised to it.
const minFunctionalRetryBufferLength = 4 * 1024

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
@@ -1207,6 +1214,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
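// A positive MaxBufferLength below minFunctionalRetryBufferLength (4096) is raised
// to that minimum so the cap stays large enough for the retry machinery to function.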
maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
maxBufferSize = minFunctionalRetryBufferLength
}

var msg *ProducerMessage
buf := queue.New()

@@ -1227,6 +1239,19 @@
}

buf.Add(msg)

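// The buffer has hit its cap: inspect the oldest message. A message without flags is
// handed straight back to the input channel when that send would not block; otherwise
// it is dropped and reported via ErrProducerRetryBufferOverflow. Flagged messages are
// left in the buffer untouched.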
if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
msgToHandle := buf.Peek().(*ProducerMessage)
if msgToHandle.flags == 0 {
select {
case p.input <- msgToHandle:
buf.Remove()
default:
buf.Remove()
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
}
}
}
}
}

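Outside the diff, a hedged sketch of how an application draining the Errors() channel might single out the new overflow error; `drainProducerErrors` is a hypothetical helper name and the logging is illustrative:

```go
// drainProducerErrors assumes the imports from the sketch above plus "errors",
// and a producer built with Producer.Retry.MaxBufferLength set.
func drainProducerErrors(producer sarama.AsyncProducer) {
	for pe := range producer.Errors() {
		if errors.Is(pe.Err, sarama.ErrProducerRetryBufferOverflow) {
			// The retry buffer reached Producer.Retry.MaxBufferLength and this message
			// was evicted; decide whether to re-publish it or record the loss.
			log.Printf("dropped %s/%d: %v", pe.Msg.Topic, pe.Msg.Partition, pe.Err)
			continue
		}
		log.Printf("produce error: %v", pe.Err)
	}
}
```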
76 changes: 76 additions & 0 deletions async_producer_test.go
@@ -2244,6 +2244,82 @@ func TestTxnCanAbort(t *testing.T) {
require.NoError(t, err)
}

func TestPreventRetryBufferOverflow(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()
topic := "test-topic"

metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
r := new(MetadataResponse)
r.AddBroker(broker.Addr(), broker.BrokerID())
r.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
return r
}

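// Every produce request fails with ErrNotLeaderForPartition, so all published
// messages end up in the retry path and keep filling the retry buffer.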
produceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
r := new(ProduceResponse)
r.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
return r
}

broker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": produceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

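// Flush one message per request and cap the retry buffer at its functional
// minimum (4096) so the overflow path triggers quickly.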
config := NewTestConfig()
config.Producer.Flush.MaxMessages = 1
config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
config.Producer.Return.Successes = true

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var (
wg sync.WaitGroup
successes, producerErrors int
errorFound bool
)

wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for errMsg := range producer.Errors() {
if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
errorFound = true
}
producerErrors++
}
}()

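// Publish far more messages than the 4096-message cap so at least one of them
// overflows and surfaces ErrProducerRetryBufferOverflow.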
numMessages := 100000
for i := 0; i < numMessages; i++ {
kv := StringEncoder(strconv.Itoa(i))
producer.Input() <- &ProducerMessage{
Topic: topic,
Key: kv,
Value: kv,
Metadata: i,
}
}

producer.AsyncClose()
wg.Wait()

require.Equal(t, successes+producerErrors, numMessages, "Expected all messages to be processed")
require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
7 changes: 7 additions & 0 deletions config.go
@@ -269,6 +269,13 @@ type Config struct {
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
// The maximum length of the bridging buffer between the `input` and `retries` channels
// in AsyncProducer#retryHandler.
// The limit prevents the buffer from growing without bound and eventually causing OOM.
// Defaults to 0, which (like any negative value) means unlimited.
// Any positive value below 4096 is raised to 4096.
MaxBufferLength int
}

// Interceptors to be called when the producer dispatcher reads the